Skip to main content

AI Trading System

Build a complete, production-grade AI trading system that combines market analysis, LLM-powered decision making, risk management, and autonomous trade execution.

System Architecture

┌────────────────────────────────────────────────────────────────────┐
│ AI Trading System │
├────────────┬─────────────┬─────────────┬─────────────┬────────────┤
│ Data │ Analysis │ Decision │ Execution │ Monitor │
│ Layer │ Engine │ Engine │ Layer │ Layer │
├────────────┼─────────────┼─────────────┼─────────────┼────────────┤
│ • Prices │ • Technical │ • LLM Core │ • Swaps │ • PnL │
│ • Volume │ • Sentiment │ • Rules │ • Batching │ • Alerts │
│ • Events │ • On-chain │ • Risk Mgmt │ • Gas Opt │ • Logs │
└────────────┴─────────────┴─────────────┴─────────────┴────────────┘


┌───────────────────────────────┐
│ ZeroQuant Vault │
│ (Secure Execution Layer) │
└───────────────────────────────┘

Installation

pip install zeroquant langchain langchain-openai web3 aiohttp websockets python-dotenv

Core Components

1. Real-Time Price Feed

# src/data/price_feed.py
import asyncio
import json
from dataclasses import dataclass
from typing import Callable, Optional
import websockets


@dataclass
class PriceUpdate:
symbol: str
price: float
volume_24h: float
change_24h: float
timestamp: int


class PriceFeed:
def __init__(self, symbols: list[str]):
self.symbols = symbols
self.prices: dict[str, PriceUpdate] = {}
self.callbacks: list[Callable[[PriceUpdate], None]] = []
self._running = False

def on_price(self, callback: Callable[[PriceUpdate], None]) -> None:
self.callbacks.append(callback)

async def connect(self) -> None:
streams = "/".join(f"{s.lower()}usdt@ticker" for s in self.symbols)
url = f"wss://stream.binance.com:9443/ws/{streams}"

self._running = True

while self._running:
try:
async with websockets.connect(url) as ws:
print("Price feed connected")
async for message in ws:
if not self._running:
break
await self._handle_message(message)
except Exception as e:
print(f"Price feed error: {e}")
if self._running:
await asyncio.sleep(5)

async def _handle_message(self, message: str) -> None:
data = json.loads(message)
update = PriceUpdate(
symbol=data["s"].replace("USDT", ""),
price=float(data["c"]),
volume_24h=float(data["v"]),
change_24h=float(data["P"]),
timestamp=int(data["E"]),
)
self.prices[update.symbol] = update

for callback in self.callbacks:
callback(update)

def get_price(self, symbol: str) -> Optional[PriceUpdate]:
return self.prices.get(symbol)

def disconnect(self) -> None:
self._running = False

2. Technical Analysis

# src/analysis/technical.py
from dataclasses import dataclass


@dataclass
class TechnicalSignals:
rsi: float
rsi_signal: str # "OVERSOLD", "NEUTRAL", "OVERBOUGHT"
macd_trend: str # "BULLISH", "BEARISH"
overall_signal: str # "STRONG_BUY", "BUY", "NEUTRAL", "SELL", "STRONG_SELL"
confidence: float


class TechnicalAnalyzer:
def analyze(self, closes: list[float]) -> TechnicalSignals:
rsi = self._calculate_rsi(closes, 14)
macd = self._calculate_macd(closes)
ema9 = self._ema(closes, 9)
ema21 = self._ema(closes, 21)

# Calculate score
score = 0
if rsi < 30:
score += 2
elif rsi > 70:
score -= 2

if macd["histogram"] > 0:
score += 2
else:
score -= 2

if ema9 > ema21:
score += 1
else:
score -= 1

# Determine signal
if score >= 4:
overall = "STRONG_BUY"
elif score >= 2:
overall = "BUY"
elif score <= -4:
overall = "STRONG_SELL"
elif score <= -2:
overall = "SELL"
else:
overall = "NEUTRAL"

return TechnicalSignals(
rsi=rsi,
rsi_signal="OVERSOLD" if rsi < 30 else "OVERBOUGHT" if rsi > 70 else "NEUTRAL",
macd_trend="BULLISH" if macd["histogram"] > 0 else "BEARISH",
overall_signal=overall,
confidence=min(100, abs(score) * 20 + 40),
)

def _calculate_rsi(self, closes: list[float], period: int) -> float:
if len(closes) < period + 1:
return 50.0

gains = losses = 0.0
for i in range(len(closes) - period, len(closes)):
change = closes[i] - closes[i - 1]
if change > 0:
gains += change
else:
losses -= change

avg_gain = gains / period
avg_loss = losses / period

if avg_loss == 0:
return 100.0

rs = avg_gain / avg_loss
return 100 - (100 / (1 + rs))

def _calculate_macd(self, closes: list[float]) -> dict:
ema12 = self._ema(closes, 12)
ema26 = self._ema(closes, 26)
macd_line = ema12 - ema26
signal_line = macd_line * 0.8 # Simplified

return {
"value": macd_line,
"signal": signal_line,
"histogram": macd_line - signal_line,
}

def _ema(self, data: list[float], period: int) -> float:
k = 2 / (period + 1)
ema = data[0]
for price in data[1:]:
ema = price * k + ema * (1 - k)
return ema

3. LLM Decision Engine

# src/decision/llm_engine.py
import json
from dataclasses import dataclass
from typing import Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate

from ..analysis.technical import TechnicalSignals


@dataclass
class TradingDecision:
action: str # "LONG", "SHORT", "HOLD", "CLOSE"
confidence: int
size: int # Percentage of portfolio
reasoning: str
stop_loss: Optional[float] = None
take_profit: Optional[float] = None


class LLMDecisionEngine:
def __init__(self, model: str = "gpt-4", temperature: float = 0):
self.llm = ChatOpenAI(model=model, temperature=temperature)
self.system_prompt = """You are an expert quantitative trader. Analyze market data and provide trading recommendations.

Key principles:
1. Never chase pumps - wait for pullbacks
2. Risk management is paramount - always define stop losses
3. Volume confirms price action
4. Be patient - no trade is better than a bad trade"""

async def analyze(
self,
symbol: str,
price: float,
technicals: TechnicalSignals,
) -> TradingDecision:
prompt = PromptTemplate.from_template("""
{system_prompt}

Analyzing {symbol}:
- Current Price: ${price:.2f}
- RSI(14): {rsi:.2f} ({rsi_signal})
- MACD Trend: {macd_trend}
- Overall Technical Signal: {overall_signal} ({confidence:.0f}% confidence)

Provide a trading decision in this JSON format:
{{
"action": "LONG" | "SHORT" | "HOLD" | "CLOSE",
"confidence": 0-100,
"size": 0-100,
"reasoning": "explanation",
"stop_loss": price or null,
"take_profit": price or null
}}

Only recommend trades with confidence > 65%.
""")

response = await self.llm.ainvoke(
prompt.format(
system_prompt=self.system_prompt,
symbol=symbol,
price=price,
rsi=technicals.rsi,
rsi_signal=technicals.rsi_signal,
macd_trend=technicals.macd_trend,
overall_signal=technicals.overall_signal,
confidence=technicals.confidence,
)
)

try:
# Extract JSON from response
content = response.content
json_match = content[content.find("{"):content.rfind("}") + 1]
data = json.loads(json_match)

return TradingDecision(
action=data["action"],
confidence=data["confidence"],
size=data["size"],
reasoning=data["reasoning"],
stop_loss=data.get("stop_loss"),
take_profit=data.get("take_profit"),
)
except Exception as e:
print(f"Failed to parse LLM response: {e}")
return TradingDecision(
action="HOLD",
confidence=0,
size=0,
reasoning="Failed to parse decision",
)

4. Complete Trading System

# src/main.py
import os
import asyncio
from dotenv import load_dotenv
from web3 import Web3

from zeroquant import ZeroQuantClient
from data.price_feed import PriceFeed
from analysis.technical import TechnicalAnalyzer
from decision.llm_engine import LLMDecisionEngine

load_dotenv()


class TradingSystem:
def __init__(self, client: ZeroQuantClient, symbols: list[str]):
self.client = client
self.symbols = symbols
self.price_feed = PriceFeed(symbols)
self.analyzer = TechnicalAnalyzer()
self.decision_engine = LLMDecisionEngine()
self.candle_history: dict[str, list[float]] = {s: [] for s in symbols}
self._running = False

async def start(self) -> None:
print("Starting AI Trading System...")
print(f"Symbols: {', '.join(self.symbols)}")

# Start price feed
asyncio.create_task(self.price_feed.connect())

# Start trading loop
self._running = True
while self._running:
for symbol in self.symbols:
await self._analyze_and_trade(symbol)
await asyncio.sleep(60)

async def _analyze_and_trade(self, symbol: str) -> None:
print(f"\n--- Analyzing {symbol} ---")

price_data = self.price_feed.get_price(symbol)
if not price_data:
print("No price data")
return

# Update candle history
self.candle_history[symbol].append(price_data.price)
if len(self.candle_history[symbol]) > 100:
self.candle_history[symbol] = self.candle_history[symbol][-100:]

closes = self.candle_history[symbol]
if len(closes) < 30:
print("Insufficient history")
return

# Analyze
technicals = self.analyzer.analyze(closes)
print(f"Signal: {technicals.overall_signal} ({technicals.confidence:.0f}%)")

# Get LLM decision
decision = await self.decision_engine.analyze(
symbol, price_data.price, technicals
)
print(f"Decision: {decision.action} ({decision.confidence}%)")
print(f"Reasoning: {decision.reasoning}")

# Execute if confident
if decision.confidence >= 65 and decision.action in ("LONG", "SHORT"):
await self._execute_trade(symbol, decision, price_data.price)

async def _execute_trade(self, symbol: str, decision, price: float) -> None:
print("Executing trade...")
try:
balance = await self.client.get_balance()
trade_amount = int(balance * decision.size / 100)

WETH = os.getenv("WETH_ADDRESS")
token = self._get_token_address(symbol)

path = [WETH, token] if decision.action == "LONG" else [token, WETH]

tx = await self.client.execute_swap(
amount_in=trade_amount,
path=path,
slippage_bps=100,
)
print(f"Transaction: {tx.hash}")
except Exception as e:
print(f"Trade failed: {e}")

def _get_token_address(self, symbol: str) -> str:
addresses = {
"USDC": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
}
return addresses.get(symbol, "")

def stop(self) -> None:
self._running = False
self.price_feed.disconnect()


async def main():
w3 = Web3(Web3.HTTPProvider(os.getenv("RPC_URL")))

client = ZeroQuantClient(
web3=w3,
private_key=os.getenv("PRIVATE_KEY"),
factory_address=os.getenv("FACTORY_ADDRESS"),
permission_manager_address=os.getenv("PERMISSION_MANAGER_ADDRESS"),
)
await client.connect()
await client.connect_vault(os.getenv("VAULT_ADDRESS"))

system = TradingSystem(client, ["ETH", "BTC"])
await system.start()


if __name__ == "__main__":
asyncio.run(main())

Sample Output

Starting AI Trading System...
Symbols: ETH, BTC

--- Analyzing ETH ---
Signal: BUY (72%)
Decision: LONG (78% confidence)
Reasoning: RSI at 35 showing oversold conditions, MACD histogram turning positive,
price bouncing with above-average volume. Setting stop loss at recent swing low.
Executing trade...
Transaction: 0x123...

What's Next?


Risk Warning: This system is for educational purposes. Always test on testnets first. Trading involves significant financial risk.