package com.bagdouri.targha.service; import com.bagdouri.targha.client.exchange.BinanceFuturesClient; import com.bagdouri.targha.dto.Price; import com.bagdouri.targha.dto.Symbol; import com.bagdouri.targha.dto.exchange.*; import com.bagdouri.targha.math.SimpleMovingSequence; import com.bagdouri.targha.util.ThreadUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Profile; import org.springframework.core.env.Environment; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.stereotype.Service; import java.time.Duration; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.stream.Stream; @Slf4j @Service @Profile("sparrow") public class SparrowTradingService extends AbstractService { private static final ExecutorService executorService = Executors.newSingleThreadExecutor( new CustomizableThreadFactory("sparrow-loop-")); private final BinanceFuturesClient binanceFuturesClient; private final Symbol symbol = Symbol.of("BTC", "USDC"); private final LongAdder priceChanges = new LongAdder(); private final double quantity = 0.002; private final double spread = 10; private final double skewRatio = spread / 20.0; private final AtomicLong attemptedOrders = new AtomicLong(); private final AtomicBoolean stopped = new AtomicBoolean(false); private final AlertService alertService; private final Collection doneOrders = ConcurrentHashMap.newKeySet(); private final AtomicReference lastFilledOrder = new AtomicReference<>(); private volatile Price lastPrice = null; private final AtomicReference askOrder = new AtomicReference<>(); private final AtomicReference bidOrder = new AtomicReference<>(); private final SimpleMovingSequence tradesMovingSequence = new SimpleMovingSequence(20, SimpleMovingSequence.NO_FLAGS); private final double tradeAvgLimit = 2.0; public SparrowTradingService( Environment environment, BinanceFuturesClient binanceFuturesClient, AlertService alertService) { super(environment); this.binanceFuturesClient = binanceFuturesClient; this.alertService = alertService; } @Override protected void init() throws Exception { cancelAllOrders(); binanceFuturesClient.addAggTradeListener( e -> tradesMovingSequence.add(e.isBuyerMaker() ? -1.0 : 1.0)); ThreadUtils.submitTask("sparrow-loop", symbol.toString(), executorService, this::loop); } private void cancelAllOrders() { binanceFuturesClient.getOpenOrders().stream() .filter(order -> order.getSymbol().equals(symbol.getBase() + symbol.getQuote())) .forEach(order -> { this.alertService.alert("targha_cycles", "Cancelling order: " + order); log.info("Cancelling order: {}", order); binanceFuturesClient.cancelOrder(symbol.getBase() + symbol.getQuote(), order.getOrderId()); }); } private void loop() { log.info("Starting Sparrow loop"); try { while (!Thread.currentThread().isInterrupted() && !stopped.get()) { try { checkOrder("ASK", askOrder, bidOrder); checkOrder("BID", bidOrder, askOrder); if (askOrder.get() == null || bidOrder.get() == null) { Price price = binanceFuturesClient.getPrice(symbol.getBase(), symbol.getQuote()); if (price == null) continue; if (!price.equals(lastPrice)) priceChanges.increment(); lastPrice = price; double tradeAvg = getTradeAverage(); if(tradeAvg < tradeAvgLimit && askOrder.get() == null) { double position = binanceFuturesClient.getPosition(symbol); log.info("Price: {}, Position: {}, Trade avg: {}", price, position, tradeAvg); sendAskOrder(price, position); } if(tradeAvg > -tradeAvgLimit && bidOrder.get() == null) { double position = binanceFuturesClient.getPosition(symbol); log.info("Price: {}, Position: {}, Trade avg: {}", price, position, tradeAvg); sendBidOrder(price, position); } } } catch (Exception e) { log.error("Sparrow iteration failed. Sleeping for 1 second", e); ThreadUtils.sleep(Duration.ofSeconds(1L)); } } } finally { log.info("Sparrow loop stopped"); } } private void sendBidOrder(Price price, double position) { sendOrder("BID", price, position, bidOrder, true); } private void sendAskOrder(Price price, double position) { sendOrder("ASK", price, position, askOrder, false); } private void sendOrder(String tag, Price currentPrice, double position, AtomicReference order, boolean bid) { if(order.get() != null) { throw new IllegalStateException("Order already exists: " + order.get()); } double sign = bid ? 1.0 : -1.0; double skew = Math.round(position / quantity) * skewRatio; double orderPrice = (currentPrice.mid() - sign * spread) - skew; log.info("Skew: {}, Position: {}, Initial order price: {}", skew, position, orderPrice); NewOrder newOrder = NewOrder.builder() .baseCurrency(symbol.getBase()) .quoteCurrency(symbol.getQuote()) .type(OrderType.LIMIT) .timeInForce(TimeInForce.GTX) .quantity(sign * quantity) .price(bid ? Math.min(orderPrice, currentPrice.bid()) : Math.max(orderPrice, currentPrice.ask())) .leverage(20.0) .marginType(MarginType.CROSSED) .build(); alertService.alert("targha_cycles", "Send new " + tag + " order: " + newOrder); log.info("Send new {} order: {}", tag, newOrder); var orderResponse = binanceFuturesClient.newOrder(newOrder); log.info("New {} order response: {}", tag, orderResponse); order.set(orderResponse); } private boolean checkOrder(String tag, AtomicReference order, AtomicReference otherOrder) { var orderResponse = order.get(); if(orderResponse == null) { return false; } OrderStatus orderStatus = binanceFuturesClient.getOrderStatus( orderResponse.getOrderId(), symbol.getBase(), symbol.getQuote(), MarginType.CROSSED); if(orderStatus != orderResponse.getStatus()) { order.set(binanceFuturesClient.getOrder( orderResponse.getOrderId(), symbol.getBase(), symbol.getQuote(), MarginType.CROSSED)); this.alertService.alert( "targha_cycles", tag + " order status changed: " + orderResponse); log.info("{} order status changed: {}", tag, orderResponse); } if(orderStatus.isDone()) { this.alertService.alert( "targha_cycles", tag + " order is done: " + orderResponse); log.info("{} order is done: {}", tag, orderResponse); order.set(null); doneOrders.add(orderResponse); lastFilledOrder.set(orderResponse); if(otherOrder.get() != null) { log.info("Cancel other order {}", otherOrder.get()); binanceFuturesClient.cancelOrder( symbol.getBase() + symbol.getQuote(), Long.parseLong(otherOrder.get().getOrderId())); otherOrder.set(null); } return true; } return false; } public void stop() { stopped.set(true); } public OrderResponse getOpenAskOrder() { return askOrder.get(); } public OrderResponse getOpenBidOrder() { return bidOrder.get(); } public List getAllOrders() { return Stream.concat( Stream.of(askOrder, bidOrder).map(AtomicReference::get).filter(Objects::nonNull), doneOrders.stream()).toList(); } public OrderResponse getLastFilledOrder() { return lastFilledOrder.get(); } public double getPosition() { return binanceFuturesClient.getPosition(symbol); } public double getTradeAverage() { return tradesMovingSequence.isFull() ? tradesMovingSequence.getMean() : Double.NaN; } @Override public Object getMonitoring() { return new Monitoring( lastPrice, priceChanges.sum(), doneOrders.size(), attemptedOrders.get()); } public record Monitoring( Price lastPrice, long priceChanges, int doneOrders, long attemptedOrders ) {} }