AI Trading System
Build a complete, production-grade AI trading system that combines market analysis, LLM-powered decision making, risk management, and autonomous trade execution.
System Architecture
┌────────────────────────────────────────────────────────────────────┐
│ AI Trading System │
├────────────┬─────────────┬─────────────┬─────────────┬────────────┤
│ Data │ Analysis │ Decision │ Execution │ Monitor │
│ Layer │ Engine │ Engine │ Layer │ Layer │
├────────────┼─────────────┼─────────────┼─────────────┼────────────┤
│ • Prices │ • Technical │ • LLM Core │ • Swaps │ • PnL │
│ • Volume │ • Sentiment │ • Rules │ • Batching │ • Alerts │
│ • Events │ • On-chain │ • Risk Mgmt │ • Gas Opt │ • Logs │
└────────────┴─────────────┴─────────────┴─────────────┴────────────┘
│
▼
┌───────────────────────────────┐
│ ZeroQuant Vault │
│ (Secure Execution Layer) │
└───────────────────────────────┘
Installation
- Python
- TypeScript
pip install zeroquant langchain langchain-openai web3 aiohttp websockets python-dotenv
npm install @zeroquant/sdk @langchain/openai ethers ws dotenv
Core Components
1. Real-Time Price Feed
- Python
- TypeScript
# src/data/price_feed.py
import asyncio
import json
from dataclasses import dataclass
from typing import Callable, Optional
import websockets
@dataclass
class PriceUpdate:
    """Latest ticker snapshot for one symbol, as parsed by ``PriceFeed``."""

    symbol: str  # stream symbol with the trailing "USDT" removed, e.g. "ETH"
    price: float  # parsed from ticker field "c"
    volume_24h: float  # parsed from ticker field "v"
    change_24h: float  # parsed from ticker field "P"
    timestamp: int  # parsed from ticker field "E"


class PriceFeed:
    """Websocket ticker feed that caches the latest ``PriceUpdate`` per symbol.

    ``connect()`` runs until ``disconnect()`` is called and reconnects after
    any failure or server-side close. Registered callbacks are invoked
    synchronously for every successfully parsed update.
    """

    # Seconds to wait before reopening the socket.
    RECONNECT_DELAY = 5

    def __init__(self, symbols: list[str]):
        self.symbols = symbols
        self.prices: dict[str, PriceUpdate] = {}
        self.callbacks: list[Callable[[PriceUpdate], None]] = []
        self._running = False

    def on_price(self, callback: Callable[[PriceUpdate], None]) -> None:
        """Register ``callback`` to be called with each new ``PriceUpdate``."""
        self.callbacks.append(callback)

    async def connect(self) -> None:
        """Open the ticker stream and process messages until disconnected."""
        streams = "/".join(f"{s.lower()}usdt@ticker" for s in self.symbols)
        url = f"wss://stream.binance.com:9443/ws/{streams}"
        self._running = True
        while self._running:
            try:
                async with websockets.connect(url) as ws:
                    print("Price feed connected")
                    async for message in ws:
                        if not self._running:
                            break
                        await self._handle_message(message)
            except Exception as e:
                print(f"Price feed error: {e}")
            # Back off after errors AND after a clean close; otherwise a
            # server that keeps closing the socket causes a busy reconnect loop.
            if self._running:
                await asyncio.sleep(self.RECONNECT_DELAY)

    async def _handle_message(self, message: str) -> None:
        """Parse one raw message, cache the update and notify callbacks.

        Malformed or non-ticker messages are logged and skipped so that a
        single bad frame does not tear down the connection.
        """
        try:
            data = json.loads(message)
            update = PriceUpdate(
                # Strip only the quote-currency suffix, not every "USDT".
                symbol=data["s"].removesuffix("USDT"),
                price=float(data["c"]),
                volume_24h=float(data["v"]),
                change_24h=float(data["P"]),
                timestamp=int(data["E"]),
            )
        except (ValueError, KeyError, TypeError, AttributeError) as e:
            print(f"Price feed skipped malformed message: {e}")
            return

        self.prices[update.symbol] = update
        for callback in self.callbacks:
            # Isolate subscribers: one failing callback must not starve the
            # others or force a reconnect.
            try:
                callback(update)
            except Exception as e:
                print(f"Price callback error: {e}")

    def get_price(self, symbol: str) -> Optional[PriceUpdate]:
        """Return the most recent update for ``symbol``, or ``None`` if unseen."""
        return self.prices.get(symbol)

    def disconnect(self) -> None:
        """Ask ``connect()`` to stop; it exits at its next loop check."""
        self._running = False
// src/data/price-feed.ts
import WebSocket from 'ws';
import { EventEmitter } from 'events';
/** Latest ticker snapshot for one symbol, as emitted by `PriceFeed`. */
interface PriceUpdate {
  /** Stream symbol with the trailing `USDT` removed, e.g. `ETH`. */
  symbol: string;
  /** Parsed from ticker field `c`. */
  price: number;
  /** Parsed from ticker field `v`. */
  volume24h: number;
  /** Parsed from ticker field `P`. */
  change24h: number;
  /** Local receive time (`Date.now()`), in milliseconds. */
  timestamp: number;
}

/**
 * Websocket ticker feed that caches the latest update per symbol and emits a
 * `'price'` event for each one.
 *
 * After `connect()` the socket is reopened automatically when it closes,
 * until `disconnect()` is called.
 */
export class PriceFeed extends EventEmitter {
  private static readonly RECONNECT_DELAY_MS = 5000;

  private ws: WebSocket | null = null;
  private prices: Map<string, PriceUpdate> = new Map();
  private shouldRun = false;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;

  constructor(private symbols: string[]) {
    super();
  }

  /** Open the ticker stream and keep it open until `disconnect()`. */
  connect(): void {
    this.shouldRun = true;
    this.open();
  }

  /** Return the most recent update for `symbol`, if any was received. */
  getPrice(symbol: string): PriceUpdate | undefined {
    return this.prices.get(symbol);
  }

  /** Close the socket and cancel any pending reconnect. */
  disconnect(): void {
    this.shouldRun = false;
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    this.ws?.close();
  }

  /** Create one socket and wire its handlers. */
  private open(): void {
    const streams = this.symbols
      .map(s => `${s.toLowerCase()}usdt@ticker`)
      .join('/');

    const ws = new WebSocket(
      `wss://stream.binance.com:9443/ws/${streams}`
    );
    this.ws = ws;

    ws.on('open', () => console.log('Price feed connected'));
    ws.on('message', (data: Buffer) => this.handleMessage(data));
    // Without an 'error' listener the socket's error event is thrown as an
    // unhandled exception. Log it; the 'close' handler does the recovery.
    ws.on('error', (err: Error) => {
      console.error(`Price feed error: ${err.message}`);
    });
    ws.on('close', () => {
      if (this.ws === ws) this.ws = null;
      if (!this.shouldRun) return;
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null;
        if (this.shouldRun) this.open();
      }, PriceFeed.RECONNECT_DELAY_MS);
    });
  }

  /**
   * Parse one raw frame, cache the update and emit `'price'`.
   * Malformed or non-ticker frames are ignored so they cannot crash the feed.
   */
  private handleMessage(data: Buffer): void {
    let parsed: unknown;
    try {
      parsed = JSON.parse(data.toString());
    } catch {
      console.error('Price feed skipped malformed message');
      return;
    }
    if (typeof parsed !== 'object' || parsed === null) return;

    const ticker = parsed as Record<string, unknown>;
    if (typeof ticker.s !== 'string') return;

    const price = parseFloat(String(ticker.c));
    const volume24h = parseFloat(String(ticker.v));
    const change24h = parseFloat(String(ticker.P));
    if (![price, volume24h, change24h].every(Number.isFinite)) return;

    const update: PriceUpdate = {
      // Strip only the quote-currency suffix.
      symbol: ticker.s.replace(/USDT$/, ''),
      price,
      volume24h,
      change24h,
      timestamp: Date.now(),
    };
    this.prices.set(update.symbol, update);
    this.emit('price', update);
  }
}
2. Technical Analysis
- Python
- TypeScript
# src/analysis/technical.py
from dataclasses import dataclass
@dataclass
class TechnicalSignals:
    """Indicator summary returned by ``TechnicalAnalyzer.analyze``."""

    rsi: float
    rsi_signal: str  # "OVERSOLD", "NEUTRAL", "OVERBOUGHT"
    macd_trend: str  # "BULLISH", "BEARISH"
    overall_signal: str  # "STRONG_BUY", "BUY", "NEUTRAL", "SELL", "STRONG_SELL"
    confidence: float


class TechnicalAnalyzer:
    """Scores a series of closing prices using RSI, a simplified MACD and two EMAs."""

    def analyze(self, closes: list[float]) -> TechnicalSignals:
        """Compute indicators over ``closes`` (oldest first) and combine them.

        Scoring: RSI below 30 adds 2 and above 70 subtracts 2; the MACD
        histogram sign adds or subtracts 2; EMA9 vs EMA21 adds or subtracts 1.
        Exact ties contribute nothing, so a flat market scores 0.

        Raises:
            ValueError: if ``closes`` is empty.
        """
        if not closes:
            raise ValueError("closes must contain at least one price")

        rsi = self._calculate_rsi(closes, 14)
        macd = self._calculate_macd(closes)
        histogram = macd["histogram"]
        ema9 = self._ema(closes, 9)
        ema21 = self._ema(closes, 21)

        # Calculate score
        score = 0
        if rsi < 30:
            score += 2
        elif rsi > 70:
            score -= 2

        # A zero histogram / equal EMAs is "no information", not bearish.
        if histogram > 0:
            score += 2
        elif histogram < 0:
            score -= 2

        if ema9 > ema21:
            score += 1
        elif ema9 < ema21:
            score -= 1

        # Determine signal
        if score >= 4:
            overall = "STRONG_BUY"
        elif score >= 2:
            overall = "BUY"
        elif score <= -4:
            overall = "STRONG_SELL"
        elif score <= -2:
            overall = "SELL"
        else:
            overall = "NEUTRAL"

        return TechnicalSignals(
            rsi=rsi,
            rsi_signal="OVERSOLD" if rsi < 30 else "OVERBOUGHT" if rsi > 70 else "NEUTRAL",
            macd_trend="BULLISH" if histogram > 0 else "BEARISH",
            overall_signal=overall,
            confidence=min(100, abs(score) * 20 + 40),
        )

    def _calculate_rsi(self, closes: list[float], period: int) -> float:
        """RSI from simple averages of the last ``period`` price changes.

        Returns 50.0 when there are fewer than ``period + 1`` closes or when
        the window has no movement at all, and 100.0 when it has gains only.
        """
        if len(closes) < period + 1:
            return 50.0

        gains = losses = 0.0
        for i in range(len(closes) - period, len(closes)):
            change = closes[i] - closes[i - 1]
            if change > 0:
                gains += change
            else:
                losses -= change

        # No movement in either direction: neutral, not "overbought".
        if gains == 0 and losses == 0:
            return 50.0

        avg_gain = gains / period
        avg_loss = losses / period
        if avg_loss == 0:
            return 100.0
        rs = avg_gain / avg_loss
        return 100 - (100 / (1 + rs))

    def _calculate_macd(self, closes: list[float]) -> dict:
        """Return ``value``, ``signal`` and ``histogram`` of a simplified MACD.

        The signal line is approximated as 0.8 x the MACD line rather than an
        EMA of the MACD series, so the histogram always has the MACD line's sign.
        """
        ema12 = self._ema(closes, 12)
        ema26 = self._ema(closes, 26)
        macd_line = ema12 - ema26
        signal_line = macd_line * 0.8  # Simplified
        return {
            "value": macd_line,
            "signal": signal_line,
            "histogram": macd_line - signal_line,
        }

    def _ema(self, data: list[float], period: int) -> float:
        """Final value of an EMA seeded with ``data[0]``, smoothing 2/(period+1)."""
        k = 2 / (period + 1)
        ema = data[0]
        for price in data[1:]:
            # Incremental form: a constant series stays exactly constant,
            # so tie checks in analyze() are not disturbed by rounding noise.
            ema += k * (price - ema)
        return ema
// src/analysis/technical.ts
/** Indicator summary returned by `TechnicalAnalyzer.analyze`. */
interface TechnicalSignals {
  rsi: number;
  rsiSignal: 'OVERSOLD' | 'NEUTRAL' | 'OVERBOUGHT';
  macdTrend: 'BULLISH' | 'BEARISH';
  overallSignal: 'STRONG_BUY' | 'BUY' | 'NEUTRAL' | 'SELL' | 'STRONG_SELL';
  confidence: number;
}

/** Scores a series of closing prices using RSI, a simplified MACD and two EMAs. */
export class TechnicalAnalyzer {
  /**
   * Compute indicators over `closes` (oldest first) and combine them.
   *
   * Scoring: RSI below 30 adds 2 and above 70 subtracts 2; the MACD histogram
   * sign adds or subtracts 2; EMA9 vs EMA21 adds or subtracts 1. Exact ties
   * contribute nothing, so a flat market scores 0.
   *
   * @throws Error if `closes` is empty (the indicators would otherwise be NaN
   *   and silently score as a sell).
   */
  analyze(closes: number[]): TechnicalSignals {
    if (closes.length === 0) {
      throw new Error('closes must contain at least one price');
    }

    const rsi = this.calculateRSI(closes, 14);
    const { histogram } = this.calculateMACD(closes);
    const ema9 = this.ema(closes, 9);
    const ema21 = this.ema(closes, 21);

    let score = 0;
    if (rsi < 30) score += 2;
    else if (rsi > 70) score -= 2;

    // A zero histogram / equal EMAs is "no information", not bearish.
    if (histogram > 0) score += 2;
    else if (histogram < 0) score -= 2;

    if (ema9 > ema21) score += 1;
    else if (ema9 < ema21) score -= 1;

    let overallSignal: TechnicalSignals['overallSignal'];
    if (score >= 4) overallSignal = 'STRONG_BUY';
    else if (score >= 2) overallSignal = 'BUY';
    else if (score <= -4) overallSignal = 'STRONG_SELL';
    else if (score <= -2) overallSignal = 'SELL';
    else overallSignal = 'NEUTRAL';

    return {
      rsi,
      rsiSignal: rsi < 30 ? 'OVERSOLD' : rsi > 70 ? 'OVERBOUGHT' : 'NEUTRAL',
      macdTrend: histogram > 0 ? 'BULLISH' : 'BEARISH',
      overallSignal,
      confidence: Math.min(100, Math.abs(score) * 20 + 40),
    };
  }

  /**
   * RSI from simple averages of the last `period` price changes.
   * Returns 50 with fewer than `period + 1` closes or when the window has no
   * movement at all, and 100 when it has gains only.
   */
  private calculateRSI(closes: number[], period: number): number {
    if (closes.length < period + 1) return 50;

    let gains = 0;
    let losses = 0;
    for (let i = closes.length - period; i < closes.length; i++) {
      const change = closes[i] - closes[i - 1];
      if (change > 0) gains += change;
      else losses -= change;
    }

    // No movement in either direction: neutral, not "overbought".
    if (gains === 0 && losses === 0) return 50;

    const avgGain = gains / period;
    const avgLoss = losses / period;
    if (avgLoss === 0) return 100;
    const rs = avgGain / avgLoss;
    return 100 - (100 / (1 + rs));
  }

  /**
   * Simplified MACD: the signal line is approximated as 0.8 x the MACD line
   * rather than an EMA of the MACD series, so the histogram always has the
   * MACD line's sign.
   */
  private calculateMACD(closes: number[]) {
    const ema12 = this.ema(closes, 12);
    const ema26 = this.ema(closes, 26);
    const macdLine = ema12 - ema26;
    const signalLine = macdLine * 0.8;
    return {
      value: macdLine,
      signal: signalLine,
      histogram: macdLine - signalLine,
    };
  }

  /** Final value of an EMA seeded with `data[0]`, smoothing 2/(period+1). */
  private ema(data: number[], period: number): number {
    const k = 2 / (period + 1);
    let ema = data[0];
    for (let i = 1; i < data.length; i++) {
      // Incremental form: a constant series stays exactly constant, so tie
      // checks in analyze() are not disturbed by rounding noise.
      ema += k * (data[i] - ema);
    }
    return ema;
  }
}
3. LLM Decision Engine
- Python
- TypeScript
# src/decision/llm_engine.py
import json
from dataclasses import dataclass
from typing import Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from ..analysis.technical import TechnicalSignals
@dataclass
class TradingDecision:
    """Validated trading decision extracted from the LLM response."""

    action: str  # "LONG", "SHORT", "HOLD", "CLOSE"
    confidence: int  # clamped to 0-100
    size: int  # Percentage of portfolio, clamped to 0-100
    reasoning: str
    stop_loss: Optional[float] = None
    take_profit: Optional[float] = None


class LLMDecisionEngine:
    """Asks a chat model for a trading decision and validates its JSON reply."""

    VALID_ACTIONS = ("LONG", "SHORT", "HOLD", "CLOSE")

    def __init__(self, model: str = "gpt-4", temperature: float = 0):
        self.llm = ChatOpenAI(model=model, temperature=temperature)
        self.system_prompt = """You are an expert quantitative trader. Analyze market data and provide trading recommendations.
Key principles:
1. Never chase pumps - wait for pullbacks
2. Risk management is paramount - always define stop losses
3. Volume confirms price action
4. Be patient - no trade is better than a bad trade"""

    async def analyze(
        self,
        symbol: str,
        price: float,
        technicals: "TechnicalSignals",
    ) -> TradingDecision:
        """Prompt the model with price and technicals and return its decision.

        Errors raised by the model call propagate to the caller. A reply that
        cannot be parsed or validated yields a zero-size HOLD decision.
        """
        prompt = PromptTemplate.from_template("""
{system_prompt}
Analyzing {symbol}:
- Current Price: ${price:.2f}
- RSI(14): {rsi:.2f} ({rsi_signal})
- MACD Trend: {macd_trend}
- Overall Technical Signal: {overall_signal} ({confidence:.0f}% confidence)
Provide a trading decision in this JSON format:
{{
"action": "LONG" | "SHORT" | "HOLD" | "CLOSE",
"confidence": 0-100,
"size": 0-100,
"reasoning": "explanation",
"stop_loss": price or null,
"take_profit": price or null
}}
Only recommend trades with confidence > 65%.
""")
        response = await self.llm.ainvoke(
            prompt.format(
                system_prompt=self.system_prompt,
                symbol=symbol,
                price=price,
                rsi=technicals.rsi,
                rsi_signal=technicals.rsi_signal,
                macd_trend=technicals.macd_trend,
                overall_signal=technicals.overall_signal,
                confidence=technicals.confidence,
            )
        )
        content = response.content
        if not isinstance(content, str):
            # Message content is not guaranteed to be a plain string.
            content = str(content)
        return self._parse_decision(content)

    @staticmethod
    def _parse_decision(content: str) -> TradingDecision:
        """Extract and validate the JSON decision embedded in ``content``.

        The model output is untrusted: the action must be one of
        ``VALID_ACTIONS`` (case-insensitive), confidence and size are coerced
        to integers and clamped to 0-100, and stop/target prices are coerced
        to floats. Any violation falls back to a zero-size HOLD.
        """
        try:
            # Extract JSON from response
            start, end = content.find("{"), content.rfind("}")
            if start == -1 or end <= start:
                raise ValueError("no JSON object in response")
            data = json.loads(content[start:end + 1])

            action = str(data["action"]).strip().upper()
            if action not in LLMDecisionEngine.VALID_ACTIONS:
                raise ValueError(f"unknown action {action!r}")

            return TradingDecision(
                action=action,
                confidence=LLMDecisionEngine._clamp_percent(data["confidence"]),
                size=LLMDecisionEngine._clamp_percent(data["size"]),
                reasoning=str(data.get("reasoning", "")),
                stop_loss=LLMDecisionEngine._optional_price(data.get("stop_loss")),
                take_profit=LLMDecisionEngine._optional_price(data.get("take_profit")),
            )
        except Exception as e:
            print(f"Failed to parse LLM response: {e}")
            return TradingDecision(
                action="HOLD",
                confidence=0,
                size=0,
                reasoning="Failed to parse decision",
            )

    @staticmethod
    def _clamp_percent(value) -> int:
        """Coerce ``value`` to an int in [0, 100]; raises if not numeric."""
        return max(0, min(100, int(float(value))))

    @staticmethod
    def _optional_price(value) -> Optional[float]:
        """Return ``None`` for a missing price, otherwise ``float(value)``."""
        return None if value is None else float(value)
// src/decision/llm-engine.ts
import { ChatOpenAI } from '@langchain/openai';
import { PromptTemplate } from '@langchain/core/prompts';
/** Validated trading decision extracted from the LLM response. */
interface TradingDecision {
  action: 'LONG' | 'SHORT' | 'HOLD' | 'CLOSE';
  /** Clamped to 0-100. */
  confidence: number;
  /** Percentage of portfolio, clamped to 0-100. */
  size: number;
  reasoning: string;
  stopLoss?: number;
  takeProfit?: number;
}

/**
 * Asks a chat model for a trading decision and validates its JSON reply.
 *
 * NOTE(review): `TechnicalSignals` is referenced but not imported in this
 * listing — confirm it is exported from the analysis module and import it.
 */
export class LLMDecisionEngine {
  private static readonly ACTIONS: readonly string[] = [
    'LONG',
    'SHORT',
    'HOLD',
    'CLOSE',
  ];

  private llm: ChatOpenAI;

  constructor(model: string = 'gpt-4') {
    this.llm = new ChatOpenAI({ modelName: model, temperature: 0 });
  }

  /**
   * Prompt the model with price and technicals and return its decision.
   * Errors from the model call propagate; a reply that cannot be parsed or
   * validated yields a zero-size HOLD decision.
   */
  async analyze(
    symbol: string,
    price: number,
    technicals: TechnicalSignals
  ): Promise<TradingDecision> {
    // `price` is interpolated by the template literal; the single-brace
    // placeholders are filled in by PromptTemplate.format below.
    const prompt = PromptTemplate.fromTemplate(`
You are an expert quantitative trader.
Analyzing {symbol}:
- Current Price: $${price.toFixed(2)}
- RSI(14): {rsi} ({rsiSignal})
- MACD Trend: {macdTrend}
- Overall Signal: {overallSignal} ({confidence}% confidence)
Provide a trading decision in JSON format:
{{
"action": "LONG" | "SHORT" | "HOLD" | "CLOSE",
"confidence": 0-100,
"size": 0-100,
"reasoning": "explanation",
"stopLoss": price or null,
"takeProfit": price or null
}}
`);
    const response = await this.llm.invoke(
      await prompt.format({
        symbol,
        rsi: technicals.rsi.toFixed(2),
        rsiSignal: technicals.rsiSignal,
        macdTrend: technicals.macdTrend,
        overallSignal: technicals.overallSignal,
        confidence: technicals.confidence.toFixed(0),
      })
    );

    // Message content is not guaranteed to be a plain string.
    const content =
      typeof response.content === 'string'
        ? response.content
        : JSON.stringify(response.content);
    return LLMDecisionEngine.parseDecision(content);
  }

  /**
   * Extract and validate the JSON decision embedded in `content`.
   *
   * The model output is untrusted: the action must be a known value
   * (case-insensitive), confidence and size are clamped to 0-100, and
   * `null` stops/targets become `undefined`. Any violation falls back to a
   * zero-size HOLD.
   */
  private static parseDecision(content: string): TradingDecision {
    try {
      const jsonMatch = content.match(/\{[\s\S]*\}/);
      if (!jsonMatch) throw new Error('No JSON found');

      const raw: unknown = JSON.parse(jsonMatch[0]);
      if (typeof raw !== 'object' || raw === null) {
        throw new Error('Decision is not an object');
      }
      const fields = raw as Record<string, unknown>;

      const action = String(fields.action).trim().toUpperCase();
      if (!LLMDecisionEngine.ACTIONS.includes(action)) {
        throw new Error(`Unknown action: ${action}`);
      }

      return {
        // Safe: membership in ACTIONS was checked just above.
        action: action as TradingDecision['action'],
        confidence: LLMDecisionEngine.clampPercent(fields.confidence),
        size: LLMDecisionEngine.clampPercent(fields.size),
        reasoning: typeof fields.reasoning === 'string' ? fields.reasoning : '',
        stopLoss: LLMDecisionEngine.optionalPrice(fields.stopLoss),
        takeProfit: LLMDecisionEngine.optionalPrice(fields.takeProfit),
      };
    } catch (error: unknown) {
      const reason = error instanceof Error ? error.message : String(error);
      console.error(`Failed to parse LLM response: ${reason}`);
      return {
        action: 'HOLD',
        confidence: 0,
        size: 0,
        reasoning: 'Failed to parse decision',
      };
    }
  }

  /** Coerce to a whole number in [0, 100]; throws if not a finite number. */
  private static clampPercent(value: unknown): number {
    const n = Number(value);
    if (!Number.isFinite(n)) throw new Error(`Not a number: ${String(value)}`);
    return Math.min(100, Math.max(0, Math.trunc(n)));
  }

  /** `undefined` for a missing/null price, otherwise a finite number. */
  private static optionalPrice(value: unknown): number | undefined {
    if (value == null) return undefined;
    const n = Number(value);
    if (!Number.isFinite(n)) throw new Error(`Invalid price: ${String(value)}`);
    return n;
  }
}
4. Complete Trading System
- Python
- TypeScript
# src/main.py
import os
import asyncio
from dotenv import load_dotenv
from web3 import Web3
from zeroquant import ZeroQuantClient
from data.price_feed import PriceFeed
from analysis.technical import TechnicalAnalyzer
from decision.llm_engine import LLMDecisionEngine
load_dotenv()
class TradingSystem:
    """Polls prices once a minute, analyses each symbol and executes swaps.

    Wires together the price feed, the technical analyzer, the LLM decision
    engine and the ZeroQuant client passed in by the caller.
    """

    def __init__(self, client: "ZeroQuantClient", symbols: list[str]):
        self.client = client
        self.symbols = symbols
        self.price_feed = PriceFeed(symbols)
        self.analyzer = TechnicalAnalyzer()
        self.decision_engine = LLMDecisionEngine()
        # One price sample per loop iteration, capped at 100 per symbol.
        self.candle_history: dict[str, list[float]] = {s: [] for s in symbols}
        self._running = False
        self._feed_task = None

    async def start(self) -> None:
        """Start the price feed and run the analysis loop until ``stop()``."""
        print("Starting AI Trading System...")
        print(f"Symbols: {', '.join(self.symbols)}")

        # Start price feed. Keep a reference: the event loop only holds weak
        # references to tasks, so an unreferenced task may be collected.
        self._feed_task = asyncio.create_task(self.price_feed.connect())

        # Start trading loop
        self._running = True
        while self._running:
            for symbol in self.symbols:
                try:
                    await self._analyze_and_trade(symbol)
                except Exception as e:
                    # One failing symbol (e.g. an LLM API error) must not
                    # stop the whole system.
                    print(f"Analysis failed for {symbol}: {e}")
            await asyncio.sleep(60)

    async def _analyze_and_trade(self, symbol: str) -> None:
        """Run one analysis cycle for ``symbol`` and trade if confident."""
        print(f"\n--- Analyzing {symbol} ---")
        price_data = self.price_feed.get_price(symbol)
        if not price_data:
            print("No price data")
            return

        # Update candle history
        self.candle_history[symbol].append(price_data.price)
        if len(self.candle_history[symbol]) > 100:
            self.candle_history[symbol] = self.candle_history[symbol][-100:]

        closes = self.candle_history[symbol]
        if len(closes) < 30:
            print("Insufficient history")
            return

        # Analyze
        technicals = self.analyzer.analyze(closes)
        print(f"Signal: {technicals.overall_signal} ({technicals.confidence:.0f}%)")

        # Get LLM decision
        decision = await self.decision_engine.analyze(
            symbol, price_data.price, technicals
        )
        print(f"Decision: {decision.action} ({decision.confidence}%)")
        print(f"Reasoning: {decision.reasoning}")

        # Execute if confident
        if decision.confidence >= 65 and decision.action in ("LONG", "SHORT"):
            await self._execute_trade(symbol, decision, price_data.price)

    async def _execute_trade(self, symbol: str, decision, price: float) -> None:
        """Swap ``decision.size`` percent of the balance between WETH and the token.

        LONG swaps WETH -> token, anything else token -> WETH. Failures are
        logged rather than raised.
        NOTE(review): the amount is derived from ``client.get_balance()`` for
        both directions — confirm its unit matches the input token for SHORT.
        """
        print("Executing trade...")
        try:
            WETH = os.getenv("WETH_ADDRESS")
            token = self._get_token_address(symbol)
            if not WETH or not token:
                # Never submit a swap whose path contains an empty address.
                print(f"Trade skipped: no token address configured for {symbol} or WETH")
                return

            balance = await self.client.get_balance()
            trade_amount = self._compute_trade_amount(balance, decision.size)
            if trade_amount <= 0:
                print("Trade skipped: computed amount is zero")
                return

            path = [WETH, token] if decision.action == "LONG" else [token, WETH]
            tx = await self.client.execute_swap(
                amount_in=trade_amount,
                path=path,
                slippage_bps=100,
            )
            print(f"Transaction: {tx.hash}")
        except Exception as e:
            print(f"Trade failed: {e}")

    @staticmethod
    def _compute_trade_amount(balance: int, size_pct: float) -> int:
        """Return ``size_pct`` percent of ``balance`` using integer arithmetic.

        The percentage is clamped to 0-100 and applied in basis points, so
        large integer balances are never routed through a float.
        Non-numeric sizes yield 0.
        """
        try:
            bps = int(float(size_pct) * 100)
        except (TypeError, ValueError, OverflowError):
            return 0
        bps = max(0, min(10_000, bps))
        return int(balance) * bps // 10_000

    def _get_token_address(self, symbol: str) -> str:
        """Return the configured token address for ``symbol``, or ``""``."""
        addresses = {
            "USDC": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
        }
        return addresses.get(symbol, "")

    def stop(self) -> None:
        """Stop the trading loop after its current sleep and stop the feed."""
        self._running = False
        self.price_feed.disconnect()
def require_env(name: str) -> str:
    """Return environment variable ``name`` or raise if it is unset or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


async def main():
    """Build the client from environment configuration and run the system.

    Reads RPC_URL, PRIVATE_KEY, FACTORY_ADDRESS, PERMISSION_MANAGER_ADDRESS
    and VAULT_ADDRESS, failing fast with a clear error if any is missing.
    """
    w3 = Web3(Web3.HTTPProvider(require_env("RPC_URL")))
    client = ZeroQuantClient(
        web3=w3,
        private_key=require_env("PRIVATE_KEY"),
        factory_address=require_env("FACTORY_ADDRESS"),
        permission_manager_address=require_env("PERMISSION_MANAGER_ADDRESS"),
    )
    await client.connect()
    await client.connect_vault(require_env("VAULT_ADDRESS"))

    system = TradingSystem(client, ["ETH", "BTC"])
    try:
        await system.start()
    finally:
        # Also runs when the task is cancelled (e.g. Ctrl+C under asyncio.run).
        system.stop()


if __name__ == "__main__":
    asyncio.run(main())
// src/index.ts
import { ethers } from 'ethers';
import { ZeroQuantClient } from '@zeroquant/sdk';
import { PriceFeed } from './data/price-feed';
import { TechnicalAnalyzer } from './analysis/technical';
import { LLMDecisionEngine } from './decision/llm-engine';
import 'dotenv/config';
/**
 * Polls prices once a minute, analyses each symbol and executes swaps.
 *
 * Wires together the price feed, the technical analyzer, the LLM decision
 * engine and the ZeroQuant client passed in by the caller.
 */
class TradingSystem {
  private static readonly MAX_HISTORY = 100;
  private static readonly MIN_HISTORY = 30;
  private static readonly MIN_CONFIDENCE = 65;
  private static readonly POLL_INTERVAL_MS = 60000;

  private client: ZeroQuantClient;
  private priceFeed: PriceFeed;
  private analyzer: TechnicalAnalyzer;
  private decisionEngine: LLMDecisionEngine;
  /** One price sample per loop iteration, capped at MAX_HISTORY per symbol. */
  private candleHistory: Map<string, number[]> = new Map();
  private isRunning = false;

  constructor(client: ZeroQuantClient, private symbols: string[]) {
    this.client = client;
    this.priceFeed = new PriceFeed(symbols);
    this.analyzer = new TechnicalAnalyzer();
    this.decisionEngine = new LLMDecisionEngine();

    for (const s of symbols) {
      this.candleHistory.set(s, []);
    }
  }

  /** Start the price feed and run the analysis loop until `stop()`. */
  async start(): Promise<void> {
    console.log('Starting AI Trading System...');
    console.log(`Symbols: ${this.symbols.join(', ')}`);

    this.priceFeed.connect();
    this.isRunning = true;

    while (this.isRunning) {
      for (const symbol of this.symbols) {
        try {
          await this.analyzeAndTrade(symbol);
        } catch (error: unknown) {
          // One failing symbol (e.g. an LLM API error) must not stop the loop.
          console.error(`Analysis failed for ${symbol}:`, error);
        }
      }
      await this.sleep(TradingSystem.POLL_INTERVAL_MS);
    }
  }

  /** Stop the trading loop after its current sleep and close the feed. */
  stop(): void {
    this.isRunning = false;
    this.priceFeed.disconnect();
  }

  /** Run one analysis cycle for `symbol` and trade if confident. */
  private async analyzeAndTrade(symbol: string): Promise<void> {
    console.log(`\n--- Analyzing ${symbol} ---`);

    const priceData = this.priceFeed.getPrice(symbol);
    if (!priceData) {
      console.log('No price data');
      return;
    }

    // Update history
    const history = this.candleHistory.get(symbol) ?? [];
    history.push(priceData.price);
    if (history.length > TradingSystem.MAX_HISTORY) history.shift();
    this.candleHistory.set(symbol, history);

    if (history.length < TradingSystem.MIN_HISTORY) {
      console.log('Insufficient history');
      return;
    }

    // Analyze
    const technicals = this.analyzer.analyze(history);
    console.log(`Signal: ${technicals.overallSignal} (${technicals.confidence}%)`);

    // Get LLM decision
    const decision = await this.decisionEngine.analyze(
      symbol,
      priceData.price,
      technicals
    );
    console.log(`Decision: ${decision.action} (${decision.confidence}%)`);
    console.log(`Reasoning: ${decision.reasoning}`);

    // Execute if confident. Only LONG and SHORT map to a swap direction;
    // CLOSE must not fall through and be executed as a sell.
    const isTrade = decision.action === 'LONG' || decision.action === 'SHORT';
    if (decision.confidence >= TradingSystem.MIN_CONFIDENCE && isTrade) {
      await this.executeTrade(symbol, decision, priceData.price);
    }
  }

  /**
   * Swap `decision.size` percent of the balance between WETH and the token.
   * LONG swaps WETH -> token, anything else token -> WETH. Failures are
   * logged rather than thrown.
   *
   * NOTE(review): the amount is derived from `client.getBalance()` for both
   * directions — confirm its unit matches the input token for SHORT.
   */
  private async executeTrade(
    symbol: string,
    decision: { action: string; size: number },
    price: number
  ): Promise<void> {
    console.log('Executing trade...');

    try {
      const WETH = process.env.WETH_ADDRESS;
      const token = this.getTokenAddress(symbol);
      if (!WETH || !token) {
        // Never submit a swap whose path contains an empty address.
        console.error(`Trade skipped: no token address configured for ${symbol} or WETH`);
        return;
      }

      const balance = await this.client.getBalance();
      const tradeAmount = TradingSystem.computeTradeAmount(balance, decision.size);
      if (tradeAmount <= 0n) {
        console.log('Trade skipped: computed amount is zero');
        return;
      }

      const path = decision.action === 'LONG'
        ? [WETH, token]
        : [token, WETH];

      const tx = await this.client.executeSwap({
        amountIn: tradeAmount,
        path,
        slippageBps: 100,
      });

      console.log(`Transaction: ${tx.hash}`);
    } catch (error) {
      console.error('Trade failed:', error);
    }
  }

  /**
   * Return `sizePct` percent of `balance`. The percentage is clamped to
   * 0-100 and applied in basis points, so fractional sizes do not make
   * `BigInt()` throw. Non-finite sizes yield 0n.
   */
  private static computeTradeAmount(balance: bigint, sizePct: number): bigint {
    if (!Number.isFinite(sizePct)) return 0n;
    const bps = Math.min(10000, Math.max(0, Math.floor(sizePct * 100)));
    return (balance * BigInt(bps)) / 10000n;
  }

  /** Return the configured token address for `symbol`, or `''`. */
  private getTokenAddress(symbol: string): string {
    const addresses: Record<string, string> = {
      'USDC': '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48',
    };
    return addresses[symbol] ?? '';
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
/**
 * Return environment variable `name`.
 * @throws Error if the variable is unset or empty.
 */
function requireEnv(name: string): string {
  const value = process.env[name];
  if (!value) {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

/**
 * Build the client from environment configuration and run the system.
 *
 * Reads RPC_URL, PRIVATE_KEY, FACTORY_ADDRESS, PERMISSION_MANAGER_ADDRESS and
 * VAULT_ADDRESS, failing fast with a clear error if any is missing instead of
 * passing `undefined` through non-null assertions.
 */
async function main() {
  // Resolve all configuration first so a missing value is reported before
  // any connection is attempted.
  const rpcUrl = requireEnv('RPC_URL');
  const privateKey = requireEnv('PRIVATE_KEY');
  const factoryAddress = requireEnv('FACTORY_ADDRESS');
  const permissionManagerAddress = requireEnv('PERMISSION_MANAGER_ADDRESS');
  const vaultAddress = requireEnv('VAULT_ADDRESS');

  const provider = new ethers.JsonRpcProvider(rpcUrl);
  const signer = new ethers.Wallet(privateKey, provider);

  const client = new ZeroQuantClient(provider, {
    factoryAddress,
    permissionManagerAddress,
  });

  await client.connect(signer);
  await client.connectVault(vaultAddress);

  const system = new TradingSystem(client, ['ETH', 'BTC']);

  // Stop the loop and close the feed on Ctrl+C before exiting.
  process.on('SIGINT', () => {
    system.stop();
    process.exit(0);
  });

  await system.start();
}

main().catch(console.error);
Sample Output
Starting AI Trading System...
Symbols: ETH, BTC
--- Analyzing ETH ---
Signal: BUY (80%)
Decision: LONG (78%)
Reasoning: RSI at 35 showing oversold conditions, MACD histogram turning positive,
price bouncing with above-average volume. Setting stop loss at recent swing low.
Executing trade...
Transaction: 0x123...
What's Next?
- Multi-Agent Systems - Coordinate specialized agents
- Local LLM Setup - Run privately with Ollama
Risk Warning: This system is for educational purposes. Always test on testnets first. Trading involves significant financial risk.