# 0. Introduction - Purpose of the trading bot.
# This trading bot is designed to trade on Binance Futures using a combination
# of trading strategies and technical indicators like RSI, Bollinger Bands, and
# MACD. It also incorporates psychological trading patterns like double
# bottoms, head and shoulders, etc., to identify potential entry and exit
# points for long and short positions. Additionally, the bot features a
# double-down functionality, allowing users to average down their entry price
# when a predetermined percentage loss occurs.

# 1. Import necessary libraries and establish a connection to Binance API.
# 1.1. Importing Libraries
import datetime
import json
import math
import threading
import time
from functools import wraps
from math import floor
from queue import Queue

import ccxt
import keyboard
import numpy as np
import pandas as pd
import requests
import talib
import websocket
from binance.exceptions import BinanceAPIException
from flask import Flask, request, abort

# 1.2. Defining Input Parameters
quote_currency = 'USDT'
base_currency = 'BTC'
symbol = 'BTCUSDT'
trade_amount_pct = 0.10  # fraction of balance (10%) used as base trade before leverage
enable_leverage = True
leverage = 50
enable_double_down = True
double_down_percent = 5  # Percentage of balance to use for double-down functionality
stop_loss_pct = 5.0  # 5% stop loss
take_profit_pct = 25.0  # 25% take profit
trailing_stop_loss_pct = 2.0  # 2% trailing stop distance
interval = 1  # in seconds
timeframe = '5m'  # 5 minute chart
lookback = 35  # 30-50 short, 100 avg, 200-500 long
entry_prices = {}
# Per-asset order precision (decimal places) and minimum quantity increment.
asset_info = {
    'BTC': {'precision': 3, 'step_size': 0.001},
    'USDT': {'precision': 2, 'step_size': 0.01},
}

# Define profit lock ratios
enable_profit_lock_ratio = False
PLR_Short = 4.0  # Set the desired profit lock ratio for short positions (e.g., 4:1)
PLR_Long = 2.0  # Set the desired profit lock ratio for long positions (e.g., 2:1)
PLR_DDLong = 4.0  # Set the desired profit lock ratio for double-down long positions (e.g., 4:1)
PLR_DDShort = 5.0  # Set the desired profit lock ratio for double-down short positions
# (e.g., 5:1)

# 1.3. Order types
ORDER_TYPE_MARKET = 'MARKET'
ORDER_TYPE_LIMIT = 'LIMIT'
ORDER_TYPE_STOP_MARKET = 'STOP_MARKET'
ORDER_TYPE_TAKE_PROFIT_MARKET = 'TAKE_PROFIT_MARKET'
order_type = "market"  # or "limit" or any other valid order type


# 1.4 positions
def close_current_position():
    """Reset the locally tracked position state to "no open position".

    Only the in-memory ``position_info`` dict is touched; no order is sent.
    """
    position_info["direction"] = None
    position_info["open_price"] = None
    position_info["open_time"] = None
    # Reset other properties if needed


# Locally tracked state of the bot's current position.
position_info = {
    "direction": None,
    "open_price": None,
    "open_time": None,
    # Add more properties if needed
}


# 2.1 WebSocket Connection and Callback Function
def on_message(ws, message):
    """Queue every *closed* candle received on the kline stream."""
    data = json.loads(message)
    if 'k' in data:
        kline = data['k']
        if kline['x']:  # Check if the candle is closed
            q.put(kline)  # Add the closed candle to the queue


def on_close(ws, close_status_code=None, close_msg=None):
    """Log that the stream closed.

    Current websocket-client releases pass the close status and message as
    extra positional arguments; both are optional so older releases that pass
    only ``ws`` keep working.
    """
    print("WebSocket connection closed")


def on_open(ws):
    """Log that the stream opened."""
    print("WebSocket connection opened")


def create_websocket_connection(on_message, on_error, on_close, on_open,
                                stream_symbol=None, stream_timeframe=None):
    """Open the kline stream and block until the connection ends.

    Args:
        on_message, on_error, on_close, on_open: websocket-client callbacks.
        stream_symbol: Market to subscribe to; defaults to the global ``symbol``.
        stream_timeframe: Candle interval such as ``'5m'``; defaults to the
            global ``timeframe``.
    """
    stream_symbol = symbol if stream_symbol is None else stream_symbol
    stream_timeframe = timeframe if stream_timeframe is None else stream_timeframe
    # The stream name needs a candle interval ('5m'), not the polling
    # `interval` in seconds, which produced the invalid stream '@kline_1'.
    # NOTE(review): this is the spot stream host; confirm whether the futures
    # stream host should be used for a futures bot.
    url = (f"wss://stream.binance.com:9443/ws/"
           f"{stream_symbol.lower()}@kline_{stream_timeframe}")
    ws = websocket.WebSocketApp(url, on_message=on_message, on_error=on_error,
                                on_close=on_close, on_open=on_open)
    ws.run_forever()


def websocket_thread(symbol, timeframe):
    """Run the kline stream for ``symbol``/``timeframe`` in a background thread."""
    threading.Thread(
        target=create_websocket_connection,
        args=(on_message, None, on_close, on_open),
        kwargs={"stream_symbol": symbol, "stream_timeframe": timeframe},
    ).start()


q = Queue()
websocket_thread(symbol, timeframe)

# 2.2 Webhook & signals from it
# 2.2.1 Webhook
app = Flask(__name__)


# Webhook
@app.route('/webhook', methods=['POST'])
def webhook():
    """Handle an alert POST: decode the JSON payload and act on its signal.

    Returns:
        ``('success', 200)`` once the signal has been handed to
        ``manage_position``; ``('failure', 400)`` for undecodable payloads.
    """
    if request.method != 'POST':
        return 'failure', 400

    alert_payload_str = request.data.decode('utf-8')
    try:
        alert_payload = json.loads(alert_payload_str)
    except json.JSONDecodeError:
        print("Error: Failed to decode JSON payload")
        # Log the raw text: the parsed payload does not exist on this path.
        print(f"Received alert payload: {alert_payload_str}")
        return 'failure', 400
    print(f"Received alert payload: {alert_payload}")

    last_signal = determine_last_signal(alert_payload)
    print(f"Determined last signal: {last_signal}")
    manage_position(last_signal, symbol, stop_loss_pct, trailing_stop_loss_pct,
                    take_profit_pct, trade_amount_pct)
    return 'success', 200


# Determine Last Signal
def determine_last_signal(alert_payload):
    """Map an alert payload's ``type`` field to ``'buy'``, ``'sell'`` or ``None``."""
    if not isinstance(alert_payload, dict):
        return None
    signal_type = alert_payload.get('type')
    if signal_type is None:
        return None
    signals = {'open_short': 'sell', 'open_long': 'buy'}
    if signal_type not in signals:
        print(f"Unknown signal type: {signal_type}")
        return None
    return signals[signal_type]


# 3 Data Fetch & Initializing Binance Client
# 3.1. API keys
# SECURITY: credentials must never live in source control. They are read from
# the environment; any key pair that was previously committed to this file is
# compromised and must be revoked and re-issued.
import os

api_key = os.environ.get('BINANCE_API_KEY', '')
api_secret = os.environ.get('BINANCE_API_SECRET', '')
if not api_key or not api_secret:
    print("Warning: BINANCE_API_KEY / BINANCE_API_SECRET are not set; "
          "authenticated requests will fail")
exchange = ccxt.binance({
    'apiKey': api_key,
    'secret': api_secret,
    'enableRateLimit': True,
    'options': {'defaultType': 'future'},
})


# 3.2 Set leverage
def set_leverage(symbol, leverage):
    """Request ``leverage``x for ``symbol``; errors are logged, not raised."""
    try:
        exchange.fapiPrivate_post_leverage({
            'symbol': symbol.replace('/', ''),
            'leverage': leverage,
        })
        print(f"Changed leverage for {symbol} to {leverage}x")
    except ccxt.BaseError as e:
        print(f"Error setting leverage for {symbol}: {e}")


set_leverage(symbol, leverage)  # Set leverage defined in inputs


# 3.3 Fetch Historics data
def fetch_historical_data(symbol, timeframe, limit=500):
    """Fetch up to ``limit`` OHLCV candles as a DataFrame.

    Returns:
        A DataFrame with ``open_time`` (datetime) and float64 ``open``,
        ``high``, ``low``, ``close``, ``volume`` columns, or ``None`` when the
        exchange request fails.
    """
    try:
        klines = exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
    except ccxt.BaseError as e:
        print(f"Error fetching historical data: {e}")
        return None
    price_columns = ['open', 'high', 'low', 'close', 'volume']
    df = pd.DataFrame(klines, columns=['open_time'] + price_columns)
    df['open_time'] = pd.to_datetime(df['open_time'], unit='ms')
    df[price_columns] = df[price_columns].astype(np.float64)
    print(df.head())
    return df


# 3.3.
# Get Symbol Info
def get_symbol_info(symbol):
    """Return ``(max_qty, min_qty, step_size)`` from the market's LOT_SIZE filter.

    Returns ``(None, None, None)`` when the market, the filter or one of its
    fields is unavailable.
    """
    try:
        exchange.load_markets()
        market = exchange.market(symbol)
        filters = market['info']['filters']
        lot_size_filter = next(
            (f for f in filters if f['filterType'] == 'LOT_SIZE'), None)
        if lot_size_filter is None:
            return None, None, None
        return (float(lot_size_filter['maxQty']),
                float(lot_size_filter['minQty']),
                float(lot_size_filter['stepSize']))
    except (KeyError, ValueError, ccxt.BaseError):
        # ccxt raises its own errors for unknown symbols and network faults;
        # treat them like a missing filter instead of crashing the bot.
        print(f"Error fetching symbol info for {symbol}")
        return None, None, None


# 3.4 Fetching account balance % of which will be used as base calc
def get_balance():
    """Return ``(total, free)`` USDT futures balance, or ``(0, 0)`` on failure."""
    try:
        futures_account = exchange.fetch_balance()
        total_balance = float(futures_account['total']['USDT'])
        available_balance = float(futures_account['free']['USDT'])
    except (ccxt.BaseError, KeyError, TypeError, ValueError) as e:
        print(f"Error getting futures account balance: {e}")
        return 0, 0
    print(f"Total Futures Account Balance: {total_balance}")
    print(f"Available Margin Balance: {available_balance}")
    return total_balance, available_balance


# Fetch the balance
total_balance, available_balance = get_balance()
balance = total_balance  # Use the total balance as the balance


# 3.5 Fetch current Mark price used in base calc trailing stop pct & entry point
def fetch_mark_price(symbol):
    """Return the ticker's last traded price for ``symbol`` or ``None`` on error.

    NOTE(review): despite the name this reads ``ticker['last']``, not the
    exchange's mark price.
    """
    # Ensure that the symbol variable is a string
    if not isinstance(symbol, str):
        print(f"Invalid symbol type: {type(symbol)}. Symbol must be a string.")
        return None

    print(f"Fetching mark price for symbol: {symbol}")
    try:
        ticker = exchange.fetch_ticker(symbol)
        mark_price = float(ticker['last'])
    except ccxt.BaseError as e:
        print(f"Error fetching mark price for {symbol}: {e}")
        return None
    except KeyError:
        print(f"'last' key not found in the response for {symbol}")
        return None
    except (TypeError, ValueError) as e:
        # 'last' can be None/non-numeric when the market has no trades yet.
        print(e)
        return None
    print(f"Mark price for {symbol}: {mark_price}")
    return mark_price


# 3.5.1. step size and rounding
def get_asset_step_size(symbol):
    """Return the market's minimum order amount, used here as the step size.

    NOTE(review): ``limits.amount.min`` is the minimum quantity, which is not
    necessarily the quantity increment; confirm against the LOT_SIZE filter.
    """
    try:
        exchange.load_markets()
        market = exchange.market(symbol)
        return market['limits']['amount']['min']
    except (KeyError, ccxt.BaseError):
        print(f"Error fetching step size for {symbol}")
        return None


def round_step_size(value, step_size):
    """Round ``value`` to the nearest multiple of ``step_size``.

    Decimal arithmetic keeps the result exact for any positive step; the old
    float formula rounded 1.5 with step 0.5 to 2.0 and divided by zero for
    steps of 1 or more. Ties round to the even multiple, as before.

    Raises:
        ValueError: if ``step_size`` is ``None`` or not positive.
    """
    from decimal import Decimal, ROUND_HALF_EVEN

    if step_size is None:
        raise ValueError("step_size is required")
    step = Decimal(str(step_size))
    if step <= 0:
        raise ValueError(f"step_size must be positive, got {step_size}")
    units = (Decimal(str(value)) / step).to_integral_value(rounding=ROUND_HALF_EVEN)
    return float(units * step)


# 3.5.2.
# Calculate Quantity
def calculate_quantity(symbol, position_size):
    """Convert a quote-currency ``position_size`` into a base quantity.

    Returns ``None`` (after logging) when the price cannot be fetched.
    """
    try:
        ticker_info = exchange.fetch_ticker(symbol)
        price = ticker_info['last']
        return position_size / price
    except Exception as e:
        print(f"Error calculating quantity: {e}")
        return None


# 3.5.3 Adjusted Balance /leveraged amount
def calculate_adjusted_balance(account_balance, leverage, trade_amount_pct):
    """Return ``account_balance * trade_amount_pct * leverage``."""
    trade_amount = account_balance * trade_amount_pct
    return trade_amount * leverage


# Calculate adjusted balance
adjusted_balance = calculate_adjusted_balance(balance, leverage, trade_amount_pct)


# 3.5.4 Position calculation and management
def calculate_trading_size(balance, trade_amount_pct, symbol):
    """Return the position size in base units for an (adjusted) ``balance``.

    The ``balance`` argument is now honoured; previously the module-level
    ``adjusted_balance`` captured at start-up was used regardless of what the
    caller passed.

    NOTE(review): callers pass a balance that already includes
    ``trade_amount_pct``, and the value is a fraction (0.10) yet is divided by
    100 here; the resulting size looks ~1000x too small. Formula left as-is
    pending confirmation.
    """
    entry_price = fetch_mark_price(symbol)
    if not entry_price:
        print(f"Error: Unable to calculate trading size for {symbol}")
        return None
    return (balance * trade_amount_pct / 100) / entry_price


# Calculate the position size based on the adjusted balance and trade amount percentage
mark_price = fetch_mark_price(symbol)
position_size = calculate_trading_size(adjusted_balance, trade_amount_pct, symbol)


# 3.6 Defining double-down functionality and re-entry logic (optional)
def double_down_position(direction, symbol, quantity, leverage, take_profit_pct):
    """Open a position of twice ``quantity`` in ``direction``.

    Raises:
        ValueError: if ``direction`` is not 'LONG' or 'SHORT' (any case).
    """
    direction = direction.upper()
    if direction not in ("LONG", "SHORT"):
        raise ValueError("Invalid direction. Use either 'LONG' or 'SHORT'.")
    print(f"Doubling down on {direction} position")
    # open_position expects a lower-case direction plus stop-loss and order
    # type; the caller's take_profit_pct is used instead of a hard-coded 10.
    open_position(direction.lower(), quantity * 2, symbol, take_profit_pct,
                  stop_loss_pct, order_type)


# 3.5 fetch_latest_prices
def fetch_latest_prices(symbol):
    """Return ``(bid, ask, mark_price)``; all ``None`` when the fetch fails."""
    try:
        ticker = exchange.fetch_ticker(symbol)
        # Use markPrice if available, else use the last traded price
        mark_price = ticker.get('markPrice', ticker['last'])
        return ticker['bid'], ticker['ask'], mark_price
    except (ccxt.BaseError, KeyError) as e:
        print(f"Error fetching latest prices for {symbol}: {e}")
        return None, None, None


# 3.6 Calculate position size
def calc_position_size(account_balance, risk_percent, stop_loss_percent):
    """Return risk amount divided by the stop-loss amount.

    Algebraically this is ``risk_percent / stop_loss_percent``; it raises
    ``ZeroDivisionError`` when the balance or the stop-loss is zero.
    """
    risk_amount = account_balance * risk_percent
    return risk_amount / (stop_loss_percent * account_balance)


# 3.7 Calculate order quantity
def calc_order_quantity(balance, trade_amount_pct, symbol, leverage):
    """Return the leveraged order quantity, floored to the asset's step size.

    Precision and step size come from the module-level ``asset_info`` table
    for ``base_currency``; the previous ``exchange.fetch_contract`` call is
    not part of ccxt, so this function always failed.
    NOTE(review): the table is keyed by ``base_currency``, so this assumes
    ``symbol`` trades that asset.

    Returns ``None`` (after logging) on any failure.
    """
    try:
        price = exchange.fetch_ticker(symbol)['last']
        notional = (balance * leverage) * trade_amount_pct
        base_info = asset_info[base_currency]
        precision = base_info['precision']
        step_size = base_info['step_size']
        quantity = notional / price
        # round() first so 0.123 / 0.001 == 122.999... still floors to 123.
        steps = floor(round(quantity / step_size, 9))
        return float(format(steps * step_size, f'.{precision}f'))
    except Exception as e:
        print(f"Error calculating quantity: {e}")
        return None


# 3.8 Calculate stop loss price
def calc_stop_loss_price(entry_price, stop_loss_pct, position_type):
    """Return the stop price ``stop_loss_pct`` percent against the position.

    Returns ``None`` for a ``position_type`` other than 'long' or 'short'.
    """
    if position_type == "long":
        return entry_price * (1 - stop_loss_pct / 100)
    if position_type == "short":
        return entry_price * (1 + stop_loss_pct / 100)
    return None


# 3.9 Calculate take profit price
def calc_take_profit_price(entry_price, take_profit_percent, position):
    """Return the take-profit price ``take_profit_percent`` percent in favour.

    The percentage is divided by 100 like every other ``*_pct`` value in this
    file; without it a 25% target produced 26x (long) or a negative price
    (short). Any ``position`` other than 'long' is treated as short.
    """
    fraction = take_profit_percent / 100
    if position == "long":
        return entry_price * (1 + fraction)
    return entry_price * (1 - fraction)


# 3.5.4 Calc loss profists stopped
def calculate_stop_loss(mark_price, stop_loss_pct):
    """Return the long-side stop price ``stop_loss_pct`` percent below ``mark_price``."""
    return mark_price * (1 - stop_loss_pct / 100)


# 3.10 calculate_stop_loss_take_profit_prices
def calculate_stop_loss_take_profit_prices(symbol, position_direction, stop_loss_pct,
                                           take_profit_pct, entry_price,
                                           oco_distance_pct=None):
    """Return ``(stop_loss_price, take_profit_price)`` around ``entry_price``.

    With ``oco_distance_pct`` both prices sit that percentage away from the
    entry (the original behaviour). When it is omitted -- as every other call
    in this file does -- ``stop_loss_pct`` and ``take_profit_pct`` are used.
    Any direction other than 'long' is treated as short.
    """
    if oco_distance_pct is None:
        stop_distance = entry_price * (stop_loss_pct / 100)
        profit_distance = entry_price * (take_profit_pct / 100)
    else:
        stop_distance = profit_distance = entry_price * (oco_distance_pct / 100)

    if position_direction == "long":
        return entry_price - stop_distance, entry_price + profit_distance
    return entry_price + stop_distance, entry_price - profit_distance


def set_stop_loss_take_profit(symbol, position_size, entry_price, take_profit_pct,
                              stop_loss_pct, position_direction,
                              oco_distance_pct=None, is_double_down=False):
    """Place the stop-loss / take-profit pair protecting a position.

    This replaces two conflicting definitions. When the profit lock ratio is
    enabled, the take-profit distance becomes ``stop_loss_pct`` times the
    matching ``PLR_*`` constant. ``is_double_down`` used to be an undefined
    global; it is now an optional argument.
    """
    if enable_profit_lock_ratio:
        if position_direction == "long":
            plr = PLR_DDLong if is_double_down else PLR_Long
        else:
            plr = PLR_DDShort if is_double_down else PLR_Short
        take_profit_pct = stop_loss_pct * plr

    stop_loss_price, take_profit_price = calculate_stop_loss_take_profit_prices(
        symbol, position_direction, stop_loss_pct, take_profit_pct, entry_price,
        oco_distance_pct)
    side = "SELL" if position_direction == "long" else "BUY"
    try:
        # create_custom_oco_order is the helper this file defines with exactly
        # these keyword parameters (the old name create_oco_order is undefined).
        stop_loss_order, take_profit_order = create_custom_oco_order(
            symbol=symbol.replace("/", ""),
            side=side,
            stop_loss_price=stop_loss_price,
            take_profit_price=take_profit_price,
            quantity=position_size,
        )
    except (ccxt.BaseError, BinanceAPIException) as e:
        print(f"Error setting OCO order: {e}")
        return
    print("Stop Loss Order:", stop_loss_order)
    print("Take Profit Order:", take_profit_order)
    print(f"OCO order created for {symbol} with stop loss at {stop_loss_price} "
          f"and take profit at {take_profit_price}")


# 3.11 caclulate_position_loss
def calculate_position_loss(symbol, position_direction):
    """Return the position's price move in percent, signed in its favour.

    Positive values are gains for the given direction. Returns ``None`` when
    the price or the live position cannot be read.
    """
    mark_price = fetch_mark_price(symbol)
    if mark_price is None:
        print(f"Error fetching mark price for {symbol}. Aborting the operation.")
        return None
    live_position = get_position_info(symbol)
    if not live_position or not live_position["entryPrice"]:
        return None
    entry_price = live_position["entryPrice"]
    position_loss = (mark_price - entry_price) / entry_price * 100
    if position_direction == "short":
        position_loss = -position_loss
    return position_loss


# 3.12 set_trailing_stop_loss
def set_native_trailing_stop_loss(symbol, order_quantity, trailing_stop_loss_pct, position):
    """Place an exchange-side TRAILING_STOP_MARKET order.

    This is the first of two functions that were both named
    ``set_trailing_stop_loss``; the later definition silently replaced it, so
    it is kept here under its own name.
    """
    side = {"long": "SELL", "short": "BUY"}.get(position)
    if side is None:
        print("Invalid position")
        return
    params = {
        "symbol": symbol.replace("/", ""),
        "quantity": order_quantity,
        "side": side,
        "type": "TRAILING_STOP_MARKET",
        "callbackRate": trailing_stop_loss_pct,
    }
    try:
        exchange.fapiPrivatePostOrder(params)
        print(f"Trailing stop loss set for {symbol} at {trailing_stop_loss_pct}%")
    except ccxt.BaseError as e:
        print(f"Error setting trailing stop loss for {symbol}: {e}")


# .3.12.1
def update_trailing_stop_loss(symbol, entry_price, trailing_stop_loss_pct, stop_loss_pct, interval):
    """Refresh the stop order now and again every ``interval`` seconds.

    Rescheduling stops once no position is open; previously the timer chain
    ran forever.
    NOTE(review): each run submits a new stop order without cancelling the
    previous one; confirm how stale stops should be cleaned up.
    """
    live_position = get_position_info(symbol)
    if not live_position or live_position["positionAmt"] == 0:
        print("Not in position, skipping trailing stop loss update")
        return

    position_amt = live_position["positionAmt"]
    update_position(symbol, entry_price, stop_loss_pct, trailing_stop_loss_pct,
                    get_position_direction(position_amt), abs(position_amt))
    print("Updating trailing stop loss")
    # Schedule the function to run again after a specified interval
    threading.Timer(
        interval, update_trailing_stop_loss,
        args=[symbol, entry_price, trailing_stop_loss_pct, stop_loss_pct, interval],
    ).start()


# 4.0 Position Management - Evaluate Existing Positioning
def get_position_direction(position_amt):
    """Return 'long', 'short' or ``None`` from the sign of ``position_amt``."""
    if position_amt > 0:
        return "long"
    if position_amt < 0:
        return "short"
    return None


# 4.1 reenter_position
def get_position_info(symbol):
    """Return the exchange's position record for ``symbol`` as a typed dict.

    Uses the ccxt ``exchange`` already configured in this file (the former
    ``client`` object was never created). Returns ``None`` when the request
    fails or no record matches.
    """
    market_id = symbol.replace("/", "")
    try:
        records = exchange.fapiPrivate_get_positionrisk({"symbol": market_id})
    except ccxt.BaseError as e:
        print(f"Error fetching position info for {symbol}: {e}")
        return None

    for record in records or []:
        if record["symbol"] != market_id:
            continue
        return {
            "symbol": record["symbol"],
            "positionAmt": float(record["positionAmt"]),
            "entryPrice": float(record["entryPrice"]),
            "unrealizedProfit": float(record["unRealizedProfit"]),
            "liquidationPrice": float(record["liquidationPrice"]),
            "leverage": float(record["leverage"]),
            "maxNotionalValue": float(record["maxNotionalValue"]),
            "marginType": record["marginType"],
            "isolatedMargin": float(record["isolatedMargin"]),
            "isAutoAddMargin": record["isAutoAddMargin"],
            "positionSide": record["positionSide"],
        }
    return None


# get_position_info returns one dict (or None), so the direction is derived
# from it instead of being tuple-unpacked.
position = get_position_info(symbol)
position_direction = get_position_direction(position["positionAmt"]) if position else None


# 3.1 open_position
def open_position(position_direction, position_size, symbol, take_profit_pct, stop_loss_pct, order_type):
    """Place the protective orders for a new position at the current price.

    Returns the reference entry price, or ``None`` when it cannot be fetched.
    NOTE(review): as in the original, no entry order is submitted here and
    ``order_type`` is unused; confirm whether ``create_order`` should run first.
    """
    entry_price = fetch_mark_price(symbol)
    if entry_price is None:
        print(f"Error fetching mark price for {symbol}. Aborting the operation.")
        return None
    # One protective pair only; the original placed it twice, the second time
    # with long-only prices.
    set_stop_loss_take_profit(symbol, position_size, entry_price, take_profit_pct,
                              stop_loss_pct, position_direction)
    return entry_price


def reenter_position(position_direction, position_size, symbol, stop_loss_pct, take_profit_pct,
                     order_type, enable_profit_lock_ratio=False, profit_lock_ratio=None):
    """Close any live position on ``symbol`` and open a fresh one.

    Returns the new entry price (``None`` when opening failed).
    """
    live_position = get_position_info(symbol)
    if live_position and live_position["positionAmt"] != 0:
        # The third argument is unused by close_position.
        close_position(symbol, live_position, None)

    entry_price = open_position(position_direction, position_size, symbol,
                                take_profit_pct, stop_loss_pct, order_type)
    if position_direction in ("long", "short"):
        print(f"Reentered {position_direction} position with {position_size} {symbol}, "
              f"stop loss: {stop_loss_pct}%, take profit: {take_profit_pct}%")
    if enable_profit_lock_ratio:
        print(f"Profit lock ratio: {profit_lock_ratio}")
    return entry_price


def set_trailing_stop_loss(side, position_size, symbol, stop_loss_pct):
    """Place a STOP_MARKET order ``stop_loss_pct`` percent away from the price.

    Args:
        side: 'LONG' or 'SHORT' (any case) -- the side being protected.

    The stop sits below the price for longs and above it for shorts; the
    original used the long-side formula for both.
    """
    side = str(side).upper()
    if side not in ("LONG", "SHORT"):
        print(f"Invalid position side for trailing stop loss: {side}")
        return
    mark_price = fetch_mark_price(symbol)
    if mark_price is None:
        print(f"Error fetching mark price for {symbol}. Aborting the operation.")
        return

    stop_loss = calc_stop_loss_price(mark_price, stop_loss_pct, side.lower())
    stop_side = "SELL" if side == "LONG" else "BUY"
    try:
        exchange.create_order(
            symbol=symbol,
            side=stop_side,
            type=ORDER_TYPE_STOP_MARKET,
            amount=position_size,
            params={'stopPrice': stop_loss},
        )
        print(f"Set trailing stop loss for {symbol} to {stop_loss}")
    except (ccxt.BaseError, BinanceAPIException) as e:
        # ccxt raises its own exception hierarchy, not BinanceAPIException.
        print(f"Error setting trailing stop loss for {symbol}: {e}")


def positioning(signal, symbol, balance, leverage, trade_amount_pct, stop_loss_pct,
                take_profit_pct, enable_profit_lock_ratio=False):
    """Act on a buy or sell signal given the current exchange position.

    'b'/'buy' targets a long and 's'/'sell' a short; any other signal is
    ignored. Holding the target direction re-enters with twice the size;
    holding the opposite closes it before entering.
    """
    entry_price = fetch_mark_price(symbol)
    if entry_price is None:
        print(f"Error: Unable to fetch mark price for {symbol}")
        return

    if signal in ("b", "buy"):
        target, enter, reenter_ratio, enter_ratio = "long", enter_long_position, 4.0, 3.0
    elif signal in ("s", "sell"):
        target, enter, reenter_ratio, enter_ratio = "short", enter_short_position, 5.0, 4.0
    else:
        return

    position_info = get_position_info(symbol)
    position_amt = float(position_info["positionAmt"]) if position_info else 0.0
    current_direction = get_position_direction(position_amt)

    if current_direction == target:
        # abs(): a short's positionAmt is negative, a size must not be.
        reenter_position(target, abs(position_amt) * 2, symbol, stop_loss_pct,
                         take_profit_pct, order_type, enable_profit_lock_ratio,
                         reenter_ratio)
        return

    if current_direction is not None:
        close_position(symbol, position_info, None)
    new_position_amt = calculate_trading_size(
        calculate_adjusted_balance(balance, leverage, trade_amount_pct),
        trade_amount_pct, symbol)
    enter(new_position_amt, symbol, stop_loss_pct, take_profit_pct,
          enable_profit_lock_ratio, enter_ratio)


# 4.4 Reposition Management with trailing stop loss % from mark_price
def update_position(symbol, entry_price, stop_loss_pct, trailing_stop_loss_pct,
                    position_type=None, position_size=None):
    """Submit a close-position STOP_MARKET order for the current position.

    ``position_type`` ('LONG'/'SHORT', any case) and ``position_size`` are
    read from the live position when omitted, because the callers in this
    file pass only the first four arguments. Returns ``None`` in all cases.

    NOTE(review): the stop is derived from ``entry_price`` and
    ``stop_loss_pct``; ``trailing_stop_loss_pct`` is not used yet.
    """
    mark_price = fetch_mark_price(symbol)
    if mark_price is None:
        print(f"Error fetching mark price for {symbol}. Aborting the operation.")
        return None

    if position_type is None or position_size is None:
        live_position = get_position_info(symbol)
        live_amt = live_position["positionAmt"] if live_position else 0
        if position_type is None:
            position_type = get_position_direction(live_amt)
        if position_size is None and live_amt:
            position_size = abs(live_amt)
    if position_type is None:
        print("Not in position, skipping trailing stop loss update")
        return None
    if position_size is None:
        account_total, _ = get_balance()
        position_size = calculate_trading_size(
            calculate_adjusted_balance(account_total, leverage, trade_amount_pct),
            trade_amount_pct, symbol)

    is_long = str(position_type).upper() == "LONG"
    stop_loss = calc_stop_loss_price(entry_price, stop_loss_pct,
                                     "long" if is_long else "short")
    if stop_loss is None:
        print("Error calculating stop loss. Aborting the operation.")
        return None
    stop_side = "SELL" if is_long else "BUY"

    try:
        exchange.create_order(
            symbol=symbol.replace("/", ""),
            side=stop_side,
            type=ORDER_TYPE_STOP_MARKET,
            amount=position_size,
            params={'stopPrice': stop_loss, 'closePosition': True},
        )
        print(f"Updated trailing stop loss for {symbol} to {stop_loss}")
    except (ccxt.BaseError, BinanceAPIException) as e:
        print(f"Error updating trailing stop loss for {symbol}: {e}")
    return None


# 4.6 Enter Position based on triggers
def _enter_position(direction, new_position_amt, symbol, stop_loss_pct, take_profit_pct,
                    enable_profit_lock_ratio, profit_lock_ratio):
    """Shared body of enter_long_position / enter_short_position.

    Submits the market entry, then the stop, the periodic stop refresh and
    the stop-loss / take-profit pair. Returns the entry price, or ``None``
    when sizing or the entry order failed.
    """
    label = direction.lower()
    print(f"Entered {label} position with {new_position_amt} {symbol}, "
          f"stop loss: {stop_loss_pct}%, take profit: {take_profit_pct}%")
    if enable_profit_lock_ratio:
        print(f"Profit lock ratio: {profit_lock_ratio}")

    # Size from a fresh balance; the original read an unassigned local here.
    account_total, _ = get_balance()
    calculated_position_size = calculate_trading_size(
        calculate_adjusted_balance(account_total, leverage, trade_amount_pct),
        trade_amount_pct, symbol)
    if not calculated_position_size:
        return None

    print(f"Entering {label} position...")
    entry_side, exit_side = ("BUY", "SELL") if direction == "LONG" else ("SELL", "BUY")
    entry_price = create_order(entry_side, "MARKET", calculated_position_size, None)
    if entry_price is None:
        return None

    set_trailing_stop_loss(direction, calculated_position_size, symbol, stop_loss_pct)
    # This call submits the first stop update itself, then reschedules.
    update_trailing_stop_loss(symbol, entry_price, trailing_stop_loss_pct,
                              stop_loss_pct, interval)

    stop_loss, take_profit = calculate_stop_loss_take_profit_prices(
        symbol, label, stop_loss_pct, take_profit_pct, entry_price)
    try:
        create_custom_oco_order(symbol=symbol, side=exit_side,
                                stop_loss_price=stop_loss,
                                take_profit_price=take_profit,
                                quantity=calculated_position_size)
    except (ccxt.BaseError, BinanceAPIException) as e:
        print(f"Error setting OCO order: {e}")
    return entry_price


def enter_long_position(new_position_amt, symbol, stop_loss_pct, take_profit_pct,
                        enable_profit_lock_ratio, profit_lock_ratio):
    """Enter a long position; see ``_enter_position`` for the steps taken."""
    return _enter_position("LONG", new_position_amt, symbol, stop_loss_pct,
                           take_profit_pct, enable_profit_lock_ratio, profit_lock_ratio)


def enter_short_position(new_position_amt, symbol, stop_loss_pct, take_profit_pct,
                         enable_profit_lock_ratio, profit_lock_ratio):
    """Enter a short position; see ``_enter_position`` for the steps taken."""
    return _enter_position("SHORT", new_position_amt, symbol, stop_loss_pct,
                           take_profit_pct, enable_profit_lock_ratio, profit_lock_ratio)


# 4.
# Positioning based on triggers from webhook
def _open_directional_position(symbol, direction, order_quantity, stop_loss_pct,
                               take_profit_pct, double_down):
    """Submit a market entry and compute its stop-loss / take-profit prices.

    Returns ``(entry_price, stop_loss_price, take_profit_price)`` or ``None``
    when prices are unavailable or the order is rejected.
    NOTE(review): as in the original, the computed prices are not turned
    into orders here.
    """
    is_long = direction == "long"
    # fetch_latest_prices returns a (bid, ask, mark) tuple, not a mapping.
    bid, ask, _ = fetch_latest_prices(symbol)
    entry_price = ask if is_long else bid
    if entry_price is None:
        print(f"Error fetching latest prices for {symbol}. Aborting the operation.")
        return None

    try:
        # Same market-order call the rest of this file uses
        # (execute_market_order is not defined anywhere in it).
        exchange.create_market_order(symbol, 'buy' if is_long else 'sell', order_quantity)
    except ccxt.BaseError as e:
        print(f"Error opening {direction} position: {e}")
        return None

    if enable_profit_lock_ratio:
        if is_long:
            plr = PLR_DDLong if double_down else PLR_Long
        else:
            plr = PLR_DDShort if double_down else PLR_Short
        take_profit_pct = stop_loss_pct * plr

    sign = 1 if is_long else -1
    stop_loss_price = entry_price * (1 - sign * stop_loss_pct / 100)
    take_profit_price = entry_price * (1 + sign * take_profit_pct / 100)
    return entry_price, stop_loss_price, take_profit_price


# 4.1 Open Long Position
def open_long_position(symbol, order_quantity, asset_info, stop_loss_pct, take_profit_pct,
                       position, double_down=False):
    """Market-buy ``order_quantity`` and return the entry and target prices.

    ``asset_info`` and ``position`` are accepted for compatibility and unused.
    """
    return _open_directional_position(symbol, "long", order_quantity, stop_loss_pct,
                                      take_profit_pct, double_down)


# 4.2. Create Order
def create_order(side, order_type, quantity, price):
    """Submit an order on the configured ``symbol`` and return its fill price.

    Args:
        side: 'BUY' or 'SELL'.
        order_type: e.g. 'MARKET' or 'LIMIT'.
        quantity: Base quantity; rounded to the market's step size.
        price: Limit price, or ``None`` for market orders.

    Returns:
        The average fill price (falling back to the order price, then to the
        current price when the response carries neither), or ``None`` when
        the order could not be placed.
    """
    step_size = get_asset_step_size(symbol)
    if not step_size:
        print(f"Error creating order: step size unavailable for {symbol}")
        return None
    rounded_quantity = round_step_size(quantity, step_size)

    try:
        print(f"Placing order: symbol={symbol}, side={side}, "
              f"quantity={rounded_quantity}, order_type={order_type}")
        order = exchange.create_order(
            symbol=symbol.replace("/", ""),
            type=order_type,
            side=side,
            amount=rounded_quantity,
            price=price,  # the price is a first-class argument, not a param
        )
        print(f"Order placed successfully: {order}")
    except (ccxt.BaseError, BinanceAPIException) as e:
        print(f"Error creating order: {e}")
        return None

    # Market-order responses often carry no price; never float(None), and
    # never report a placed order as failed.
    fill_price = order.get("average") or order.get("price")
    if fill_price is None:
        return fetch_mark_price(symbol)
    return float(fill_price)


def create_custom_oco_order(symbol, side, stop_loss_price, take_profit_price, quantity):
    """Place a STOP_MARKET and a TAKE_PROFIT_MARKET order for ``quantity``.

    Orders go through the ccxt ``exchange`` (the former ``client`` object was
    never created). The two orders are independent: filling one does not
    cancel the other.

    Returns:
        ``(stop_loss_order, take_profit_order)`` as returned by ccxt.
    """
    def _conditional_order(conditional_type, trigger_price):
        return exchange.create_order(
            symbol=symbol,
            type=conditional_type,
            side=side,
            amount=quantity,
            # Futures orders accept ACK/RESULT response types (FULL is spot-only).
            params={"stopPrice": trigger_price, "newOrderRespType": "RESULT"},
        )

    stop_loss_order = _conditional_order("STOP_MARKET", stop_loss_price)
    take_profit_order = _conditional_order("TAKE_PROFIT_MARKET", take_profit_price)
    return stop_loss_order, take_profit_order


# 4.2 Open Short Position
def open_short_position(symbol, order_quantity, asset_info, stop_loss_pct, take_profit_pct,
                        position, double_down=False):
    """Market-sell ``order_quantity`` and return the entry and target prices.

    ``asset_info`` and ``position`` are accepted for compatibility and unused.
    """
    return _open_directional_position(symbol, "short", order_quantity, stop_loss_pct,
                                      take_profit_pct, double_down)


# 4.3 Close Long Position
def close_long_position(symbol, order_quantity):
    """Close ``order_quantity`` of a long position at market."""
    # close_order takes (symbol, position, quantity); it sends a market order,
    # so no price is needed.
    close_order(symbol, "long", order_quantity)


# 4.4 Close Short Position
def close_short_position(symbol, order_quantity):
    """Close ``order_quantity`` of a short position at market."""
    close_order(symbol, "short", order_quantity)


def close_position(symbol, position_info=None, stop_loss=None):
    """Flatten the position described by ``position_info`` with a market order.

    Args:
        symbol: Market to close.
        position_info: Mapping with a ``positionAmt`` entry; fetched from the
            exchange when omitted.
        stop_loss: Unused; kept for compatibility with existing callers.

    Returns:
        The ccxt order, or ``None`` when flat or on error.
    """
    print("Closing position")
    if position_info is None:
        position_info = get_position_info(symbol)
    if not position_info:
        print(f"Position amount is zero for {symbol}")
        return None

    try:
        position_amt = float(position_info["positionAmt"])
        if position_amt == 0:
            print(f"Position amount is zero for {symbol}")
            return None
        order = exchange.create_order(
            symbol=symbol,
            side="sell" if position_amt > 0 else "buy",
            type="market",
            # Amounts are always positive; the original negated the long amount.
            amount=abs(position_amt),
            params={"reduceOnly": True},
        )
    except ccxt.BaseError as e:
        print(f"Error closing position for {symbol}: {e}")
        return None
    print(f"Closed position: {order}")
    return order


# Call the positioning function
positioning(position_direction, symbol, balance, leverage, trade_amount_pct,
            stop_loss_pct, take_profit_pct, enable_profit_lock_ratio)


def terminate_bot():
    """Stop the whole process immediately.

    The hotkey callback runs in the keyboard listener thread, where
    ``exit(0)`` would end only that thread, so ``os._exit`` is used.
    """
    import os

    print("Terminating bot...")
    os._exit(0)


# Hotkey callbacks receive no arguments of their own, so the entry functions
# get theirs through ``args``.
keyboard.add_hotkey('ctrl+alt+l', enter_long_position,
                    args=(position_size, symbol, stop_loss_pct, take_profit_pct,
                          enable_profit_lock_ratio, PLR_Long))
keyboard.add_hotkey('ctrl+alt+s', enter_short_position,
                    args=(position_size, symbol, stop_loss_pct, take_profit_pct,
                          enable_profit_lock_ratio, PLR_Short))
# NOTE(review): `close_current_position` is redefined further down; the hotkey
# on the next line binds whichever definition exists when this line runs.
keyboard.add_hotkey('ctrl+alt+c', close_current_position)
keyboard.add_hotkey('ctrl+alt+x', terminate_bot)


# 4.5 place_order
def place_order(symbol, position, order_quantity, entry_price, stop_loss_price, take_profit_price):
    """Open a position with a market order, then attach stop-loss/take-profit.

    Args:
        symbol: Market symbol to trade.
        position: ``"long"`` (market buy) or ``"short"`` (market sell); any
            other value prints ``Invalid position`` and returns.
        order_quantity: Amount passed to ``exchange.create_market_order``.
        entry_price: Accepted for the caller's convenience; not used here.
        stop_loss_price: Forwarded to ``set_stop_loss_take_profit``.
        take_profit_price: Forwarded to ``set_stop_loss_take_profit``.

    ``ccxt.BaseError`` raised by either step is caught and printed.
    """
    if position == "long":
        side = "buy"
    elif position == "short":
        side = "sell"
    else:
        print("Invalid position")
        return

    try:
        exchange.create_market_order(symbol, side, order_quantity)
        print(f"{position.capitalize()} position opened for {symbol} with amount {order_quantity}")

        # Set stop loss and take profit prices
        set_stop_loss_take_profit(symbol, order_quantity, stop_loss_price, take_profit_price, position)
    except ccxt.BaseError as e:
        print(f"Error placing order: {e}")


# 4.6 close_order
def close_order(symbol, position, order_quantity):
    """Close ``order_quantity`` of a position with an opposite-side market order.

    ``position`` is ``"long"`` (closed by selling) or ``"short"`` (closed by
    buying); any other value prints ``Invalid position`` and returns.
    ``ccxt.BaseError`` is caught and printed.
    """
    if position == "long":
        side = "sell"
    elif position == "short":
        side = "buy"
    else:
        print("Invalid position")
        return

    try:
        exchange.create_market_order(symbol, side, order_quantity)
        print(f"{position.capitalize()} position closed for {symbol} with amount {order_quantity}")
    except ccxt.BaseError as e:
        print(f"Error closing order: {e}")


def place_stop_loss_order(symbol, amount, stop_price, side):
    """Place a stop-market order for ``amount`` of ``symbol`` triggering at ``stop_price``.

    The order goes through the unified ``create_order`` call, whose positional
    layout is ``(symbol, type, side, amount, price, params)``. No limit price
    is supplied, so the stop-market order type is used and the trigger price
    travels in ``params``. ``ccxt.BaseError`` is caught and printed.
    """
    try:
        # Place stop-loss order
        exchange.create_order(
            symbol,
            ORDER_TYPE_STOP_MARKET,
            side,
            amount,
            None,
            {'stopPrice': stop_price},
        )
        print(f"Stop-loss order placed for {symbol} at price {stop_price}")
    except ccxt.BaseError as e:
        print(f"Error placing stop-loss order: {e}")


def close_current_position():
    """Close the open position on the module-level ``symbol``, if there is one.

    Looks the position up with ``get_position_info`` and hands it to
    ``close_position``; prints ``No position found.`` when the lookup result
    is falsy.
    """
    print("Closing position...")
    position_info = get_position_info(symbol)
    if position_info:
        # The stop-loss value is computed only because close_position takes it
        # as its third argument.
        stop_loss, _ = calculate_stop_loss_take_profit(fetch_mark_price(symbol), stop_loss_pct, take_profit_pct)
        close_position(symbol, position_info, stop_loss)
    else:
        print("No position found.")


def close_position_with_limit_order(direction, calculated_position_size, symbol, take_profit_price):
    """Create a limit order at ``take_profit_price`` that closes a position.

    Args:
        direction: Direction of the position being closed. ``"short"`` in any
            letter case is closed with a buy; anything else with a sell.
        calculated_position_size: Order amount.
        symbol: Market symbol; any ``/`` is stripped before it is sent.
        take_profit_price: Limit price of the closing order.

    ``ccxt.BaseError`` is caught and printed.
    """
    # Define the side (buy or sell) based on the direction. The comparison is
    # case-insensitive because the rest of the file passes lower-case
    # directions ("long"/"short").
    side = 'buy' if str(direction).upper() == 'SHORT' else 'sell'

    # Create the limit order to close the position at the take profit price
    try:
        order = exchange.create_order(
            symbol=symbol.replace("/", ""),  # Update here
            type='limit',
            side=side,
            amount=calculated_position_size,
            price=take_profit_price
        )
        print(f'Limit order to close position created: {order}')
    except ccxt.BaseError as e:
        print(f'Error creating limit order to close position: {e}')


# Call the positioning function
# NOTE(review): the same call also appears earlier in the file - confirm that
# running it twice at import time is intended.
positioning(position_direction, symbol, balance, leverage, trade_amount_pct, stop_loss_pct, take_profit_pct)


def terminate_bot():
    """Print a shutdown message and call ``exit(0)``."""
    print("Terminating bot...")
    exit(0)


# NOTE(review): these hotkeys are also registered earlier in the file - confirm
# that registering each combination twice is intended.
keyboard.add_hotkey('ctrl+alt+l', enter_long_position)
keyboard.add_hotkey('ctrl+alt+s', enter_short_position)
keyboard.add_hotkey('ctrl+alt+c', close_current_position)
keyboard.add_hotkey('ctrl+alt+x', terminate_bot)


# 5 Position management & Functions
def manage_position(signal, symbol, stop_loss_pct, trailing_stop_loss_pct, take_profit_pct, trade_amount_pct=None):
    """Act on a ``"buy"``/``"sell"`` signal given the currently open position.

    * opposite-direction position open: close it, then re-enter in the
      signalled direction;
    * no position open: open one in the signalled direction;
    * same-direction position open: add to it ("double down").

    Any other signal prints ``No action required``. When a position is
    expected to be open afterwards, a trailing stop is set for it.

    Args:
        signal: ``"buy"`` or ``"sell"``; other values trigger no order.
        symbol: Market symbol to manage.
        stop_loss_pct: Forwarded to ``open_position``/``reenter_position``.
        trailing_stop_loss_pct: Forwarded to ``set_trailing_stop_loss``.
        take_profit_pct: Forwarded to ``open_position``/``reenter_position``.
        trade_amount_pct: Share of balance used to size the order. Optional
            because the webhook handler calls this function without it; the
            module-level ``trade_amount_pct`` is used when omitted.
    """
    if trade_amount_pct is None:
        trade_amount_pct = globals()["trade_amount_pct"]

    position_info = get_position_info(symbol)
    position_direction = None
    if position_info is not None:
        position_amt = float(position_info["positionAmt"])
        if position_amt > 0:
            position_direction = "long"
        elif position_amt < 0:
            position_direction = "short"
        # A zero amount means nothing is open, so it is handled as no position.
    position_exists = position_direction is not None

    balance = get_balance()[0]
    order_quantity = calc_order_quantity(balance, trade_amount_pct, symbol, leverage)

    if signal == "buy":
        target_direction = "long"
    elif signal == "sell":
        target_direction = "short"
    else:
        target_direction = None
        print("No action required")

    if target_direction is not None:
        if not position_exists:
            print(f"Opening {target_direction} position")
            open_position(target_direction, order_quantity, symbol, take_profit_pct, stop_loss_pct, order_type)
        elif position_direction != target_direction:
            print(f"Closing {position_direction} position and opening {target_direction} position")
            # close_position does not read its third argument.
            close_position(symbol, position_info, None)
            reenter_position(target_direction, order_quantity, symbol, take_profit_pct, stop_loss_pct, order_type)
        else:
            print(f"Double down {target_direction} position")
            open_position(target_direction, order_quantity, symbol, take_profit_pct, stop_loss_pct, order_type)
        position_direction = target_direction

    # Set the trailing stop loss (only when a position should be open)
    if position_direction is not None:
        set_trailing_stop_loss(symbol, order_quantity, trailing_stop_loss_pct, position_direction)


# 5.2 double_down
def double_down_long_position(symbol, balance, double_down_percent, asset_info, stop_loss_pct, take_profit_pct, leverage):
    """Add to a long position, sized from ``double_down_percent`` of ``balance``.

    NOTE(review): ``double_down_percent`` is passed to ``calc_order_quantity``
    in the slot where other callers pass ``trade_amount_pct`` - confirm both
    use the same unit (fraction vs. percent).
    """
    print("Double Down Long")
    order_quantity = calc_order_quantity(balance, double_down_percent, symbol, leverage)
    open_long_position(symbol, order_quantity, asset_info, stop_loss_pct, take_profit_pct, 'long', double_down=True)  # Change here


def double_down_short_position(symbol, balance, double_down_percent, asset_info, stop_loss_pct, take_profit_pct, leverage):
    """Add to a short position, sized from ``double_down_percent`` of ``balance``.

    NOTE(review): ``double_down_percent`` is passed to ``calc_order_quantity``
    in the slot where other callers pass ``trade_amount_pct`` - confirm both
    use the same unit (fraction vs. percent).
    """
    print("Double Down Short")
    order_quantity = calc_order_quantity(balance, double_down_percent, symbol, leverage)
    open_short_position(symbol, order_quantity, asset_info, stop_loss_pct, take_profit_pct, 'short', double_down=True)  # Change here


# 4.5.
# Execute Market Order
def _place_market_order(symbol, side, order_quantity):
    """Send a market order for ``order_quantity`` of ``symbol`` on ``side``."""
    return exchange.create_market_order(symbol, side, order_quantity)


def execute_market_order(last_signal, side=None, order_quantity=None, asset_info=None):
    """Dispatch a trading signal, or place a single market order.

    The name is called in two ways in this file, and both are supported:

    * ``execute_market_order(signal)`` - signal dispatch. ``"buy"`` and
      ``"sell"`` open a long/short position, ``"b+b"`` and ``"s+s"`` double
      down on a long/short position. Sizes and risk settings come from the
      module-level configuration. Any other signal is reported and ignored.
    * ``execute_market_order(symbol, side, order_quantity, asset_info)`` -
      direct order. The first argument is the market symbol and a market
      order is sent for it. ``asset_info`` is accepted for these callers but
      not consulted here.

    Returns:
        ``None`` in both modes. Exchange errors from the direct form are not
        caught, so a caller does not go on to attach stops to an order that
        was never placed.
    """
    if side is not None:
        _place_market_order(last_signal, side, order_quantity)
        return

    if last_signal not in ("buy", "sell", "b+b", "s+s"):
        print(f"Unknown last signal: {last_signal}")
        return

    # The asset_info parameter shadows the module-level table and is unset in
    # dispatch mode, so read the table from module scope.
    assets = globals()["asset_info"]
    balance = get_balance()[0]

    if last_signal == "buy":
        quantity = calc_order_quantity(balance, trade_amount_pct, symbol, leverage)
        # Argument order follows the open_long_position call made by
        # double_down_long_position.
        open_long_position(symbol, quantity, assets, stop_loss_pct, take_profit_pct, 'long')
    elif last_signal == "sell":
        quantity = calc_order_quantity(balance, trade_amount_pct, symbol, leverage)
        open_short_position(symbol, quantity, assets, stop_loss_pct, take_profit_pct, 'short')
    elif last_signal == "b+b":  # or any condition you want to use for double down long
        double_down_long_position(symbol, balance, double_down_percent, assets, stop_loss_pct, take_profit_pct, leverage)
    else:  # "s+s", or any condition you want to use for double down short
        double_down_short_position(symbol, balance, double_down_percent, assets, stop_loss_pct, take_profit_pct, leverage)


def manual_buy(symbol, balance, trade_amount_pct, asset_info, stop_loss_pct, take_profit_pct, leverage):
    """Market-buy a position sized from ``balance`` and attach stop-loss/take-profit orders."""
    order_quantity = calc_order_quantity(balance, trade_amount_pct, symbol, leverage)
    execute_market_order(symbol, 'buy', order_quantity, asset_info)
    stop_loss_price, take_profit_price = calculate_stop_loss_take_profit_prices(symbol, 'long', stop_loss_pct, take_profit_pct)
    set_stop_loss_take_profit(symbol, order_quantity, stop_loss_price, take_profit_price, 'long')


def manual_sell(symbol, balance, trade_amount_pct, asset_info, stop_loss_pct, take_profit_pct, leverage):
    """Market-sell a position sized from ``balance`` and attach stop-loss/take-profit orders."""
    order_quantity = calc_order_quantity(balance, trade_amount_pct, symbol, leverage)
    execute_market_order(symbol, 'sell', order_quantity, asset_info)
    stop_loss_price, take_profit_price = calculate_stop_loss_take_profit_prices(symbol, 'short', stop_loss_pct, take_profit_pct)
    set_stop_loss_take_profit(symbol, order_quantity, stop_loss_price, take_profit_price, 'short')


if __name__ == "__main__":
    # app.run() blocks until the server stops, so the webhook server runs in a
    # background thread; otherwise the keyboard loop below is never reached.
    # The reloader is disabled because the server is not on the main thread.
    threading.Thread(
        target=app.run,
        kwargs={'host': '0.0.0.0', 'port': 80, 'use_reloader': False},
        daemon=True,
    ).start()

    while True:
        try:
            # NOTE(review): `balance` is read from module scope here - confirm
            # it is refreshed after trades.
            if keyboard.is_pressed('b'):
                print("Manual Buy")
                manual_buy(symbol, balance, trade_amount_pct, asset_info, stop_loss_pct, take_profit_pct, leverage)
            elif keyboard.is_pressed('s'):
                print("Manual Sell")
                manual_sell(symbol, balance, trade_amount_pct, asset_info, stop_loss_pct, take_profit_pct, leverage)
        except ccxt.BaseError as e:
            # Keep the loop alive when the exchange rejects a manual order.
            print(f"Manual order failed: {e}")
        time.sleep(1)