sparrow
🧩 Syntax:
package com.bagdouri.targha.service;
import com.bagdouri.targha.client.exchange.BinanceFuturesClient;
import com.bagdouri.targha.dto.Price;
import com.bagdouri.targha.dto.Symbol;
import com.bagdouri.targha.dto.exchange.*;
import com.bagdouri.targha.math.SimpleMovingSequence;
import com.bagdouri.targha.util.ThreadUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Stream;
@Slf4j
@Service
@Profile("sparrow")
public class SparrowTradingService extends AbstractService {
private static final ExecutorService executorService = Executors.newSingleThreadExecutor(
new CustomizableThreadFactory("sparrow-loop-"));
private final BinanceFuturesClient binanceFuturesClient;
private final Symbol symbol = Symbol.of("BTC", "USDC");
private final LongAdder priceChanges = new LongAdder();
private final double quantity = 0.002;
private final double spread = 10;
private final double skewRatio = spread / 20.0;
private final AtomicLong attemptedOrders = new AtomicLong();
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final AlertService alertService;
private final Collection<OrderResponse> doneOrders = ConcurrentHashMap.newKeySet();
private final AtomicReference<OrderResponse> lastFilledOrder = new AtomicReference<>();
private volatile Price lastPrice = null;
private final AtomicReference<OrderResponse> askOrder = new AtomicReference<>();
private final AtomicReference<OrderResponse> bidOrder = new AtomicReference<>();
private final SimpleMovingSequence tradesMovingSequence = new SimpleMovingSequence(20, SimpleMovingSequence.NO_FLAGS);
private final double tradeAvgLimit = 2.0;
public SparrowTradingService(
Environment environment,
BinanceFuturesClient binanceFuturesClient,
AlertService alertService) {
super(environment);
this.binanceFuturesClient = binanceFuturesClient;
this.alertService = alertService;
}
@Override
protected void init() throws Exception {
cancelAllOrders();
binanceFuturesClient.addAggTradeListener(
e -> tradesMovingSequence.add(e.isBuyerMaker() ? -1.0 : 1.0));
ThreadUtils.submitTask("sparrow-loop", symbol.toString(), executorService, this::loop);
}
private void cancelAllOrders() {
binanceFuturesClient.getOpenOrders().stream()
.filter(order -> order.getSymbol().equals(symbol.getBase() + symbol.getQuote()))
.forEach(order -> {
this.alertService.alert("targha_cycles", "Cancelling order: " + order);
log.info("Cancelling order: {}", order);
binanceFuturesClient.cancelOrder(symbol.getBase() + symbol.getQuote(), order.getOrderId());
});
}
private void loop() {
log.info("Starting Sparrow loop");
try {
while (!Thread.currentThread().isInterrupted() && !stopped.get()) {
try {
checkOrder("ASK", askOrder, bidOrder);
checkOrder("BID", bidOrder, askOrder);
if (askOrder.get() == null || bidOrder.get() == null) {
Price price = binanceFuturesClient.getPrice(symbol.getBase(), symbol.getQuote());
if (price == null) continue;
if (!price.equals(lastPrice)) priceChanges.increment();
lastPrice = price;
double tradeAvg = getTradeAverage();
if(tradeAvg < tradeAvgLimit && askOrder.get() == null) {
double position = binanceFuturesClient.getPosition(symbol);
log.info("Price: {}, Position: {}, Trade avg: {}", price, position, tradeAvg);
sendAskOrder(price, position);
}
if(tradeAvg > -tradeAvgLimit && bidOrder.get() == null) {
double position = binanceFuturesClient.getPosition(symbol);
log.info("Price: {}, Position: {}, Trade avg: {}", price, position, tradeAvg);
sendBidOrder(price, position);
}
}
} catch (Exception e) {
log.error("Sparrow iteration failed. Sleeping for 1 second", e);
ThreadUtils.sleep(Duration.ofSeconds(1L));
}
}
} finally {
log.info("Sparrow loop stopped");
}
}
private void sendBidOrder(Price price, double position) {
sendOrder("BID", price, position, bidOrder, true);
}
private void sendAskOrder(Price price, double position) {
sendOrder("ASK", price, position, askOrder, false);
}
private void sendOrder(String tag, Price currentPrice, double position, AtomicReference<OrderResponse> order, boolean bid) {
if(order.get() != null) {
throw new IllegalStateException("Order already exists: " + order.get());
}
double sign = bid ? 1.0 : -1.0;
double skew = Math.round(position / quantity) * skewRatio;
double orderPrice = (currentPrice.mid() - sign * spread) - skew;
log.info("Skew: {}, Position: {}, Initial order price: {}", skew, position, orderPrice);
NewOrder newOrder = NewOrder.builder()
.baseCurrency(symbol.getBase())
.quoteCurrency(symbol.getQuote())
.type(OrderType.LIMIT)
.timeInForce(TimeInForce.GTX)
.quantity(sign * quantity)
.price(bid ? Math.min(orderPrice, currentPrice.bid()) : Math.max(orderPrice, currentPrice.ask()))
.leverage(20.0)
.marginType(MarginType.CROSSED)
.build();
alertService.alert("targha_cycles", "Send new " + tag + " order: " + newOrder);
log.info("Send new {} order: {}", tag, newOrder);
var orderResponse = binanceFuturesClient.newOrder(newOrder);
log.info("New {} order response: {}", tag, orderResponse);
order.set(orderResponse);
}
private boolean checkOrder(String tag, AtomicReference<OrderResponse> order, AtomicReference<OrderResponse> otherOrder) {
var orderResponse = order.get();
if(orderResponse == null) {
return false;
}
OrderStatus orderStatus = binanceFuturesClient.getOrderStatus(
orderResponse.getOrderId(), symbol.getBase(), symbol.getQuote(), MarginType.CROSSED);
if(orderStatus != orderResponse.getStatus()) {
order.set(binanceFuturesClient.getOrder(
orderResponse.getOrderId(), symbol.getBase(), symbol.getQuote(), MarginType.CROSSED));
this.alertService.alert(
"targha_cycles", tag + " order status changed: " + orderResponse);
log.info("{} order status changed: {}", tag, orderResponse);
}
if(orderStatus.isDone()) {
this.alertService.alert(
"targha_cycles", tag + " order is done: " + orderResponse);
log.info("{} order is done: {}", tag, orderResponse);
order.set(null);
doneOrders.add(orderResponse);
lastFilledOrder.set(orderResponse);
if(otherOrder.get() != null) {
log.info("Cancel other order {}", otherOrder.get());
binanceFuturesClient.cancelOrder(
symbol.getBase() + symbol.getQuote(), Long.parseLong(otherOrder.get().getOrderId()));
otherOrder.set(null);
}
return true;
}
return false;
}
public void stop() {
stopped.set(true);
}
public OrderResponse getOpenAskOrder() {
return askOrder.get();
}
public OrderResponse getOpenBidOrder() {
return bidOrder.get();
}
public List<OrderResponse> getAllOrders() {
return Stream.concat(
Stream.of(askOrder, bidOrder).map(AtomicReference::get).filter(Objects::nonNull),
doneOrders.stream()).toList();
}
public OrderResponse getLastFilledOrder() {
return lastFilledOrder.get();
}
public double getPosition() {
return binanceFuturesClient.getPosition(symbol);
}
public double getTradeAverage() {
return tradesMovingSequence.isFull() ? tradesMovingSequence.getMean() : Double.NaN;
}
@Override
public Object getMonitoring() {
return new Monitoring(
lastPrice, priceChanges.sum(), doneOrders.size(), attemptedOrders.get());
}
public record Monitoring(
Price lastPrice, long priceChanges, int doneOrders, long attemptedOrders
) {}
}