Building a Trading Agent
Learn how to build an AI trading agent that can analyze market conditions, make trading decisions, and execute swaps through your ZeroQuant vault.
Architecture Overview
┌────────────────────────────────────────────────────────┐
│                     Trading Agent                      │
├──────────────┬──────────────┬──────────────┬───────────┤
│    Market    │   Decision   │  Execution   │   Risk    │
│   Analysis   │    Engine    │    Layer     │  Manager  │
├──────────────┼──────────────┼──────────────┼───────────┤
│ • Price feeds│ • LLM-based  │ • Swap exec  │ • Position│
│ • Indicators │ • Rule-based │ • Batch ops  │   limits  │
│ • Sentiment  │ • Hybrid     │ • Gas optim  │ • Stop-   │
│              │              │              │   loss    │
└──────────────┴──────────────┴──────────────┴───────────┘
                             │
                             ▼
                    ┌─────────────────┐
                    │  ZeroQuant SDK  │
                    │     + Vault     │
                    └─────────────────┘
Prerequisites
- Python
- TypeScript
pip install zeroquant langchain langchain-openai web3 aiohttp python-dotenv
npm install @zeroquant/sdk @zeroquant/langchain @langchain/openai ethers axios
Step 1: Market Data Integration
First, create a service to fetch market data:
- Python
- TypeScript
# src/services/market_data.py
import aiohttp
from dataclasses import dataclass
from typing import Optional
@dataclass
class PriceData:
    """Spot-market snapshot for one token, as returned by the price lookup."""

    symbol: str        # upper-cased token id
    price: float       # quoted in USD
    change_24h: float  # 24-hour change; rendered with a "%" suffix in the prompt
    volume_24h: float  # 24-hour volume in USD
@dataclass
class MarketIndicators:
    """Bundle of technical indicators derived from a historical price series."""

    rsi: float
    macd: dict  # {"value": float, "signal": float, "histogram": float}
    sma20: float
    sma50: float
class MarketDataService:
    """Fetch prices from the CoinGecko REST API and derive technical indicators.

    The ``calculate_*`` helpers perform no I/O. They treat ``prices`` as a
    series whose last element is the most recent observation.
    """

    def __init__(self):
        self.base_url = "https://api.coingecko.com/api/v3"

    async def _get_json(self, path: str, params: dict) -> dict:
        """GET ``base_url + path`` and return the decoded JSON body.

        Raises:
            aiohttp.ClientResponseError: if the server answers with an HTTP
                error status (for example when rate limited).
        """
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_url}{path}", params=params) as response:
                # Without this an error payload would be parsed as price data
                # and surface later as a confusing KeyError.
                response.raise_for_status()
                return await response.json()

    async def get_price(self, token_id: str) -> PriceData:
        """Return the current USD price, 24h change and 24h volume for ``token_id``.

        Raises:
            KeyError: if the response contains no entry for ``token_id``.
        """
        data = await self._get_json(
            "/simple/price",
            {
                "ids": token_id,
                "vs_currencies": "usd",
                "include_24hr_change": "true",
                "include_24hr_vol": "true",
            },
        )
        token_data = data.get(token_id)
        if not token_data:
            raise KeyError(f"No price data returned for token id {token_id!r}")
        return PriceData(
            symbol=token_id.upper(),
            price=token_data["usd"],
            # ``or 0`` also covers a JSON null, which ``.get(key, 0)`` lets through.
            change_24h=token_data.get("usd_24h_change") or 0,
            volume_24h=token_data.get("usd_24h_vol") or 0,
        )

    async def get_historical_prices(self, token_id: str, days: int = 30) -> list[float]:
        """Return the price component of each ``[timestamp, price]`` sample."""
        data = await self._get_json(
            f"/coins/{token_id}/market_chart",
            {"vs_currency": "usd", "days": days},
        )
        return [sample[1] for sample in data["prices"]]

    def calculate_rsi(self, prices: list[float], period: int = 14) -> float:
        """Return the simple-average RSI over the most recent ``period`` changes.

        Returns the neutral value 50.0 when fewer than ``period + 1`` prices
        are available, and 100.0 when the window contains no losses.
        """
        if len(prices) < period + 1:
            return 50.0
        # Use the newest window; indexing from 0 would read the oldest data
        # whenever the caller passes more than ``period + 1`` prices.
        window = prices[-(period + 1):]
        gains = 0.0
        losses = 0.0
        for previous, current in zip(window, window[1:]):
            change = current - previous
            if change > 0:
                gains += change
            else:
                losses -= change
        avg_gain = gains / period
        avg_loss = losses / period
        if avg_loss == 0:
            return 100.0
        rs = avg_gain / avg_loss
        return 100 - (100 / (1 + rs))

    def calculate_sma(self, prices: list[float], period: int) -> float:
        """Return the mean of the last ``period`` prices.

        Falls back to the latest price when the series is shorter than
        ``period``.

        Raises:
            ValueError: if ``prices`` is empty.
        """
        if not prices:
            raise ValueError("prices must not be empty")
        if len(prices) < period:
            return prices[-1]
        return sum(prices[-period:]) / period

    @staticmethod
    def _ema_series(prices: list[float], period: int) -> list[float]:
        """Return the running EMA for every element, seeded with ``prices[0]``."""
        if not prices:
            raise ValueError("prices must not be empty")
        multiplier = 2 / (period + 1)
        ema = prices[0]
        series = [ema]
        for price in prices[1:]:
            ema = (price - ema) * multiplier + ema
            series.append(ema)
        return series

    def calculate_ema(self, prices: list[float], period: int) -> float:
        """Return the final value of the exponential moving average.

        Raises:
            ValueError: if ``prices`` is empty.
        """
        return self._ema_series(prices, period)[-1]

    def calculate_macd(self, prices: list[float]) -> dict:
        """Return MACD ``value``, ``signal`` and ``histogram`` for ``prices``.

        The signal line is the 9-period EMA of the whole MACD series. Taking
        the EMA of nine copies of the latest MACD value would simply return
        that value and pin the histogram at zero.
        """
        fast = self._ema_series(prices, 12)
        slow = self._ema_series(prices, 26)
        macd_series = [f - s for f, s in zip(fast, slow)]
        macd_line = macd_series[-1]
        signal = self._ema_series(macd_series, 9)[-1]
        return {
            "value": macd_line,
            "signal": signal,
            "histogram": macd_line - signal,
        }

    async def get_indicators(self, token_id: str) -> MarketIndicators:
        """Fetch 60 days of history and compute RSI, MACD, SMA-20 and SMA-50."""
        # NOTE(review): the indicator periods count samples, not days. Confirm
        # the sampling interval CoinGecko uses for a 60-day request.
        prices = await self.get_historical_prices(token_id, 60)
        return MarketIndicators(
            rsi=self.calculate_rsi(prices),
            macd=self.calculate_macd(prices),
            sma20=self.calculate_sma(prices, 20),
            sma50=self.calculate_sma(prices, 50),
        )
// src/services/market-data.ts
import axios from 'axios';
/**
 * Spot-market snapshot for one token.
 * Exported because decision-engine.ts imports this type from this module.
 */
export interface PriceData {
  /** Upper-cased token id. */
  symbol: string;
  /** Price quoted in USD. */
  price: number;
  change24h: number;
  volume24h: number;
}
/**
 * Technical indicators derived from a historical price series.
 * Exported because decision-engine.ts imports this type from this module.
 */
export interface MarketIndicators {
  rsi: number;
  macd: { value: number; signal: number; histogram: number };
  sma20: number;
  sma50: number;
}
/**
 * Fetches prices from the CoinGecko REST API and derives technical indicators.
 *
 * The calculate* helpers perform no I/O and treat the last array element as
 * the most recent observation.
 */
export class MarketDataService {
  private baseUrl = 'https://api.coingecko.com/api/v3';

  /**
   * Returns the current USD price, 24h change and 24h volume for `tokenId`.
   * @throws Error when the response has no entry for `tokenId`.
   */
  async getPrice(tokenId: string): Promise<PriceData> {
    // Pass the query through `params` so axios encodes the values.
    const response = await axios.get(`${this.baseUrl}/simple/price`, {
      params: {
        ids: tokenId,
        vs_currencies: 'usd',
        include_24hr_change: 'true',
        include_24hr_vol: 'true',
      },
    });
    const data = response.data[tokenId];
    if (!data) {
      throw new Error(`No price data returned for token id "${tokenId}"`);
    }
    return {
      symbol: tokenId.toUpperCase(),
      price: data.usd,
      // Missing or null fields would otherwise break `.toFixed()` downstream.
      change24h: data.usd_24h_change ?? 0,
      volume24h: data.usd_24h_vol ?? 0,
    };
  }

  /** Returns the price component of each `[timestamp, price]` sample. */
  async getHistoricalPrices(tokenId: string, days: number = 30): Promise<number[]> {
    const response = await axios.get(
      `${this.baseUrl}/coins/${encodeURIComponent(tokenId)}/market_chart`,
      { params: { vs_currency: 'usd', days } }
    );
    return response.data.prices.map((p: [number, number]) => p[1]);
  }

  /**
   * Simple-average RSI over the most recent `period` changes.
   * Returns 50 when there are fewer than `period + 1` prices and 100 when the
   * window contains no losses.
   */
  calculateRSI(prices: number[], period: number = 14): number {
    if (prices.length < period + 1) return 50;

    // Use the newest window; indexing from 0 would read the oldest data
    // whenever the caller passes more than `period + 1` prices.
    const window = prices.slice(-(period + 1));
    let gains = 0;
    let losses = 0;
    for (let i = 1; i < window.length; i++) {
      const change = window[i] - window[i - 1];
      if (change > 0) gains += change;
      else losses -= change;
    }

    const avgGain = gains / period;
    const avgLoss = losses / period;
    if (avgLoss === 0) return 100;
    const rs = avgGain / avgLoss;
    return 100 - (100 / (1 + rs));
  }

  /**
   * Mean of the last `period` prices. Falls back to the latest price when the
   * series is shorter than `period`.
   */
  calculateSMA(prices: number[], period: number): number {
    if (prices.length < period) return prices[prices.length - 1];
    const slice = prices.slice(-period);
    return slice.reduce((a, b) => a + b, 0) / period;
  }

  /** Fetches 60 days of history and computes RSI, MACD, SMA-20 and SMA-50. */
  async getIndicators(tokenId: string): Promise<MarketIndicators> {
    // NOTE(review): the indicator periods count samples, not days. Confirm the
    // sampling interval CoinGecko uses for a 60-day request.
    const prices = await this.getHistoricalPrices(tokenId, 60);
    return {
      rsi: this.calculateRSI(prices),
      macd: this.calculateMACD(prices),
      sma20: this.calculateSMA(prices, 20),
      sma50: this.calculateSMA(prices, 50),
    };
  }

  /**
   * MACD value, signal and histogram. The signal line is the 9-period EMA of
   * the whole MACD series. An EMA over nine copies of the latest MACD value
   * would just return that value and pin the histogram at zero.
   */
  private calculateMACD(prices: number[]) {
    const fast = this.emaSeries(prices, 12);
    const slow = this.emaSeries(prices, 26);
    const macdSeries = fast.map((value, i) => value - slow[i]);
    const macdLine = macdSeries[macdSeries.length - 1];
    const signalSeries = this.emaSeries(macdSeries, 9);
    const signal = signalSeries[signalSeries.length - 1];
    return {
      value: macdLine,
      signal: signal,
      histogram: macdLine - signal,
    };
  }

  /** Final value of the exponential moving average. */
  private calculateEMA(prices: number[], period: number): number {
    const series = this.emaSeries(prices, period);
    return series[series.length - 1];
  }

  /** Running EMA for every element, seeded with `prices[0]`. */
  private emaSeries(prices: number[], period: number): number[] {
    if (prices.length === 0) {
      throw new Error('prices must not be empty');
    }
    const multiplier = 2 / (period + 1);
    let ema = prices[0];
    const series = [ema];
    for (let i = 1; i < prices.length; i++) {
      ema = (prices[i] - ema) * multiplier + ema;
      series.push(ema);
    }
    return series;
  }
}
Step 2: Trading Decision Engine
Create an LLM-powered decision engine:
- Python
- TypeScript
# src/services/decision_engine.py
import asyncio
import json
from dataclasses import dataclass
from typing import Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from .market_data import MarketDataService
@dataclass
class TradingSignal:
    """Trading recommendation produced by the decision engine."""

    action: str  # "BUY", "SELL", or "HOLD"
    confidence: int  # the prompt asks the model for a value in 0-100
    reasoning: str
    suggested_amount: Optional[str] = None  # free-form percentage, e.g. "10%"
class DecisionEngine:
    """Ask an LLM for a BUY/SELL/HOLD recommendation based on market data."""

    _VALID_ACTIONS = ("BUY", "SELL", "HOLD")

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4", temperature=0)
        self.market_data = MarketDataService()

    async def analyze(self, token_id: str) -> TradingSignal:
        """Fetch price and indicators for ``token_id`` and return the LLM's signal.

        A reply that cannot be parsed or validated yields a zero-confidence
        HOLD instead of raising, so one bad completion never triggers a trade.
        """
        price, indicators = await asyncio.gather(
            self.market_data.get_price(token_id),
            self.market_data.get_indicators(token_id),
        )
        prompt = PromptTemplate.from_template("""
You are a professional crypto trading analyst. Analyze the following market data and provide a trading recommendation.
Current Market Data for {symbol}:
- Price: ${price:.2f}
- 24h Change: {change_24h:.2f}%
- 24h Volume: ${volume_24h:,.0f}
Technical Indicators:
- RSI (14): {rsi:.2f} (Oversold < 30, Overbought > 70)
- MACD: {macd_value:.4f} (Signal: {macd_signal:.4f})
- SMA 20: {sma20:.2f}
- SMA 50: {sma50:.2f}
- Trend: {trend}
Based on this analysis, provide a recommendation in the following JSON format:
{{
"action": "BUY" | "SELL" | "HOLD",
"confidence": 0-100,
"reasoning": "Brief explanation",
"suggested_amount": "percentage of portfolio (e.g., '10%')"
}}
Only respond with valid JSON.
""")
        trend = "BULLISH" if indicators.sma20 > indicators.sma50 else "BEARISH"
        response = await self.llm.ainvoke(
            prompt.format(
                symbol=price.symbol,
                price=price.price,
                change_24h=price.change_24h,
                volume_24h=price.volume_24h,
                rsi=indicators.rsi,
                macd_value=indicators.macd["value"],
                macd_signal=indicators.macd["signal"],
                sma20=indicators.sma20,
                sma50=indicators.sma50,
                trend=trend,
            )
        )
        return self._parse_signal(response.content)

    @classmethod
    def _parse_signal(cls, content) -> TradingSignal:
        """Turn the raw LLM reply into a validated ``TradingSignal``."""
        fallback = TradingSignal(
            action="HOLD",
            confidence=0,
            reasoning="Failed to parse LLM response",
        )
        if not isinstance(content, str):
            return fallback

        # Models often wrap JSON in a code fence or add a preamble, so keep
        # only the outermost {...} span before decoding.
        start = content.find("{")
        end = content.rfind("}")
        if start == -1 or end <= start:
            return fallback

        try:
            data = json.loads(content[start:end + 1])
            action = str(data["action"]).upper()
            confidence = int(float(data["confidence"]))
            reasoning = str(data["reasoning"])
            suggested = data.get("suggested_amount")
        except (json.JSONDecodeError, KeyError, TypeError, ValueError, AttributeError):
            return fallback

        # An unknown action would otherwise pass through unchecked.
        if action not in cls._VALID_ACTIONS:
            return fallback

        return TradingSignal(
            action=action,
            confidence=max(0, min(100, confidence)),
            reasoning=reasoning,
            # Downstream code runs a regex over this value, so keep it textual.
            suggested_amount=None if suggested is None else str(suggested),
        )
// src/services/decision-engine.ts
import { ChatOpenAI } from '@langchain/openai';
import { PromptTemplate } from '@langchain/core/prompts';
import { MarketDataService, MarketIndicators, PriceData } from './market-data';
// Trading recommendation returned by DecisionEngine.analyze().
export interface TradingSignal {
action: 'BUY' | 'SELL' | 'HOLD';
// The prompt asks the model for a value in 0-100.
confidence: number;
reasoning: string;
// Free-form percentage of the portfolio, e.g. '10%'.
suggestedAmount?: string;
}
/** Asks an LLM for a BUY/SELL/HOLD recommendation based on market data. */
export class DecisionEngine {
  private llm: ChatOpenAI;
  private marketData: MarketDataService;

  constructor() {
    this.llm = new ChatOpenAI({
      modelName: 'gpt-4',
      temperature: 0,
    });
    this.marketData = new MarketDataService();
  }

  /**
   * Fetches price and indicators for `tokenId` and returns the LLM's signal.
   * A reply that cannot be parsed or validated yields a zero-confidence HOLD.
   */
  async analyze(tokenId: string): Promise<TradingSignal> {
    const [price, indicators] = await Promise.all([
      this.marketData.getPrice(tokenId),
      this.marketData.getIndicators(tokenId),
    ]);

    // `\$` keeps the dollar sign literal. An unescaped `${...}` inside a
    // backtick string is JavaScript interpolation, which dropped the "$" and
    // baked the values into the template text.
    const prompt = PromptTemplate.fromTemplate(`
You are a professional crypto trading analyst. Analyze the following market data and provide a trading recommendation.
Current Market Data for {symbol}:
- Price: \${price}
- 24h Change: {change24h}%
- 24h Volume: \${volume24h}
Technical Indicators:
- RSI (14): {rsi} (Oversold < 30, Overbought > 70)
- MACD: {macd_value} (Signal: {macd_signal})
- SMA 20: {sma20}
- SMA 50: {sma50}
- Trend: {trend}
Based on this analysis, provide a recommendation in the following JSON format:
{{
"action": "BUY" | "SELL" | "HOLD",
"confidence": 0-100,
"reasoning": "Brief explanation",
"suggestedAmount": "percentage of portfolio (e.g., '10%')"
}}
Only respond with valid JSON.
`);

    const trend = indicators.sma20 > indicators.sma50 ? 'BULLISH' : 'BEARISH';
    const response = await this.llm.invoke(
      await prompt.format({
        symbol: price.symbol,
        price: price.price.toFixed(2),
        change24h: price.change24h.toFixed(2),
        volume24h: price.volume24h.toLocaleString(),
        rsi: indicators.rsi.toFixed(2),
        macd_value: indicators.macd.value.toFixed(4),
        macd_signal: indicators.macd.signal.toFixed(4),
        sma20: indicators.sma20.toFixed(2),
        sma50: indicators.sma50.toFixed(2),
        trend,
      })
    );

    return this.parseSignal(response.content);
  }

  /** Turns the raw LLM reply into a validated TradingSignal. */
  private parseSignal(content: unknown): TradingSignal {
    const fallback: TradingSignal = {
      action: 'HOLD',
      confidence: 0,
      reasoning: 'Failed to parse LLM response',
    };
    if (typeof content !== 'string') return fallback;

    // Models often wrap JSON in a code fence or add a preamble, so keep only
    // the outermost {...} span before decoding.
    const start = content.indexOf('{');
    const end = content.lastIndexOf('}');
    if (start === -1 || end <= start) return fallback;

    try {
      const data = JSON.parse(content.slice(start, end + 1));
      const action = String(data.action).toUpperCase();
      const confidence = Number(data.confidence);
      // JSON.parse returns `any`, so the shape must be checked at runtime.
      if (
        !['BUY', 'SELL', 'HOLD'].includes(action) ||
        !Number.isFinite(confidence) ||
        typeof data.reasoning !== 'string'
      ) {
        return fallback;
      }
      return {
        action: action as TradingSignal['action'],
        confidence: Math.max(0, Math.min(100, confidence)),
        reasoning: data.reasoning,
        suggestedAmount:
          data.suggestedAmount == null ? undefined : String(data.suggestedAmount),
      };
    } catch {
      return fallback;
    }
  }
}
Step 3: Risk Manager
Implement safety controls:
- Python
- TypeScript
# src/services/risk_manager.py
from dataclasses import dataclass
from typing import Optional
from web3 import Web3
from zeroquant import ZeroQuantClient
@dataclass
class RiskConfig:
    """Tunable risk limits; every field has a default."""

    max_position_size_pct: float = 10.0  # Max % of portfolio per trade
    max_daily_loss_pct: float = 5.0  # Max daily loss before stopping
    stop_loss_pct: float = 2.0  # Per-trade stop loss
    take_profit_pct: float = 5.0  # Per-trade take profit
    max_slippage_bps: int = 100  # Max acceptable slippage
@dataclass
class TradeValidation:
    """Outcome of a risk check on a proposed trade."""

    approved: bool
    reason: Optional[str] = None  # set on rejection or when the size was adjusted
    adjusted_amount: Optional[int] = None  # capped amount, when the size was reduced
class RiskManager:
    """Apply position-size and daily-loss limits before a trade is executed."""

    def __init__(self, client: ZeroQuantClient, config: Optional[RiskConfig] = None):
        self.client = client
        self.config = config or RiskConfig()
        self.daily_pnl: float = 0.0

    async def validate_trade(
        self,
        amount_in: int,
        token_in: str,
        token_out: str,
    ) -> TradeValidation:
        """Approve, cap or reject a trade of ``amount_in``.

        ``token_in`` and ``token_out`` are accepted for interface symmetry but
        are not consulted by the current checks.

        Returns:
            A rejection when the daily loss limit is exceeded, the amount is
            not positive, or the vault balance is empty. An approval carrying
            ``adjusted_amount`` when the request exceeds the position limit.
            A plain approval otherwise.
        """
        # Check daily loss limit
        if self.daily_pnl < -self.config.max_daily_loss_pct:
            return TradeValidation(
                approved=False,
                reason=f"Daily loss limit reached ({self.daily_pnl:.2f}%)",
            )

        if amount_in <= 0:
            return TradeValidation(approved=False, reason="Trade amount must be positive")

        # Get vault balance
        balance = await self.client.get_balance()
        if balance <= 0:
            # Dividing by an empty balance would raise ZeroDivisionError.
            return TradeValidation(approved=False, reason="Vault balance is empty")

        # Work in basis points with integer maths: wei-sized balances exceed
        # the 53 bits a float can represent exactly.
        limit_bps = round(self.config.max_position_size_pct * 100)
        if amount_in * 10_000 > balance * limit_bps:
            position_size_pct = (amount_in / balance) * 100
            adjusted_amount = int(balance * limit_bps // 10_000)
            if adjusted_amount <= 0:
                return TradeValidation(
                    approved=False,
                    reason="Position limit leaves nothing to trade",
                )
            return TradeValidation(
                approved=True,
                reason=f"Position size adjusted from {position_size_pct:.1f}% to {self.config.max_position_size_pct}%",
                adjusted_amount=adjusted_amount,
            )
        return TradeValidation(approved=True)

    def update_pnl(self, pnl: float) -> None:
        """Add ``pnl`` to the running daily total."""
        self.daily_pnl += pnl

    def reset_daily(self) -> None:
        """Reset the running daily total to zero."""
        self.daily_pnl = 0.0

    def get_stop_loss(self, entry_price: float) -> float:
        """Return the price ``stop_loss_pct`` percent below ``entry_price``."""
        return entry_price * (1 - self.config.stop_loss_pct / 100)

    def get_take_profit(self, entry_price: float) -> float:
        """Return the price ``take_profit_pct`` percent above ``entry_price``."""
        return entry_price * (1 + self.config.take_profit_pct / 100)
// src/services/risk-manager.ts
import { ethers } from 'ethers';
import { ZeroQuantClient } from '@zeroquant/sdk';
// Risk limits consumed by RiskManager. Callers may pass a partial object;
// the RiskManager constructor fills in defaults for omitted fields.
export interface RiskConfig {
maxPositionSizePct: number; // Max % of portfolio per trade
maxDailyLossPct: number; // Max daily loss before stopping
stopLossPct: number; // Per-trade stop loss
takeProfitPct: number; // Per-trade take profit
maxSlippageBps: number; // Max acceptable slippage
}
// Outcome of RiskManager.validateTrade().
export interface TradeValidation {
approved: boolean;
// Set on rejection, or when the trade was approved with a reduced size.
reason?: string;
// Capped amount to trade instead of the requested one, when the size was reduced.
adjustedAmount?: bigint;
}
/** Applies position-size and daily-loss limits before a trade is executed. */
export class RiskManager {
  private config: RiskConfig;
  private client: ZeroQuantClient;
  private dailyPnL: number = 0;

  constructor(client: ZeroQuantClient, config: Partial<RiskConfig> = {}) {
    this.client = client;
    this.config = {
      maxPositionSizePct: 10,
      maxDailyLossPct: 5,
      stopLossPct: 2,
      takeProfitPct: 5,
      maxSlippageBps: 100,
      ...config,
    };
  }

  /**
   * Approves, caps or rejects a trade of `amountIn`.
   *
   * `tokenIn` and `tokenOut` are accepted for interface symmetry but are not
   * consulted by the current checks.
   */
  async validateTrade(
    amountIn: bigint,
    tokenIn: string,
    tokenOut: string
  ): Promise<TradeValidation> {
    // Check daily loss limit
    if (this.dailyPnL < -this.config.maxDailyLossPct) {
      return {
        approved: false,
        reason: `Daily loss limit reached (${this.dailyPnL.toFixed(2)}%)`,
      };
    }

    if (amountIn <= 0n) {
      return { approved: false, reason: 'Trade amount must be positive' };
    }

    // Get vault balance
    const balance = await this.client.getBalance();
    if (balance <= 0n) {
      // An empty balance would make the percentage NaN or Infinity.
      return { approved: false, reason: 'Vault balance is empty' };
    }

    // Convert the limit to whole basis points first: BigInt() throws a
    // RangeError on a fractional number such as 2.5.
    const limitBps = BigInt(Math.round(this.config.maxPositionSizePct * 100));
    if (amountIn * 10000n > balance * limitBps) {
      const positionSizePct = (Number(amountIn) / Number(balance)) * 100;
      const adjustedAmount = (balance * limitBps) / 10000n;
      if (adjustedAmount <= 0n) {
        return { approved: false, reason: 'Position limit leaves nothing to trade' };
      }
      return {
        approved: true,
        reason: `Position size adjusted from ${positionSizePct.toFixed(1)}% to ${this.config.maxPositionSizePct}%`,
        adjustedAmount,
      };
    }

    return { approved: true };
  }

  /** Adds `pnl` to the running daily total. */
  updatePnL(pnl: number): void {
    this.dailyPnL += pnl;
  }

  /** Resets the running daily total to zero. */
  resetDaily(): void {
    this.dailyPnL = 0;
  }

  /** Price `stopLossPct` percent below `entryPrice`. */
  getStopLoss(entryPrice: number): number {
    return entryPrice * (1 - this.config.stopLossPct / 100);
  }

  /** Price `takeProfitPct` percent above `entryPrice`. */
  getTakeProfit(entryPrice: number): number {
    return entryPrice * (1 + this.config.takeProfitPct / 100);
  }
}
Step 4: Complete Trading Agent
Bring it all together:
- Python
- TypeScript
# src/trading_agent.py
import asyncio
import re
from datetime import datetime
from dataclasses import dataclass
from typing import Optional
from web3 import Web3
from zeroquant import ZeroQuantClient
from .services.decision_engine import DecisionEngine, TradingSignal
from .services.risk_manager import RiskManager, RiskConfig
from .services.market_data import MarketDataService
@dataclass
class TradingPair:
    """One market the agent watches and trades."""

    token_in: str   # token address passed as the input side of a BUY
    token_out: str  # token address passed as the output side of a BUY
    token_id: str   # identifier handed to the decision engine for market data
@dataclass
class TradingAgentConfig:
    """Constructor arguments for ``TradingAgent``."""

    client: ZeroQuantClient
    vault_address: str
    trading_pairs: list[TradingPair]
    risk_config: Optional[RiskConfig] = None  # None lets RiskManager use its defaults
    check_interval_ms: int = 60000  # pause between trading cycles, in milliseconds
class TradingAgent:
    """Periodically analyse each trading pair and execute approved swaps.

    Each cycle asks the decision engine for a signal, sizes the trade from the
    vault balance, runs it past the risk manager and, if approved, submits a
    swap through the ZeroQuant client.
    """

    # Accepts "10%", "12.5%" and "10 %". The previous pattern "(\d+)%" read
    # "12.5%" as 5%.
    _PCT_PATTERN = re.compile(r"(\d+(?:\.\d+)?)\s*%")
    _DEFAULT_TRADE_PCT = 5.0
    _MIN_CONFIDENCE = 70

    def __init__(self, config: TradingAgentConfig):
        self.client = config.client
        self.vault_address = config.vault_address
        self.trading_pairs = config.trading_pairs
        self.check_interval = config.check_interval_ms / 1000  # Convert to seconds
        self.decision_engine = DecisionEngine()
        self.risk_manager = RiskManager(self.client, config.risk_config)
        self.market_data = MarketDataService()
        self.is_running = False

    async def start(self) -> None:
        """Run trading cycles until ``stop()`` is called."""
        print("Trading Agent started")
        self.is_running = True
        while self.is_running:
            await self._run_trading_cycle()
            await asyncio.sleep(self.check_interval)

    def stop(self) -> None:
        """Ask the loop in ``start()`` to exit after the current iteration."""
        self.is_running = False
        print("Trading Agent stopped")

    async def _run_trading_cycle(self) -> None:
        """Process every configured pair once, isolating per-pair failures."""
        print("\n--- Trading Cycle ---")
        print(f"Time: {datetime.now().isoformat()}")
        for pair in self.trading_pairs:
            try:
                await self._process_pair(pair)
            except Exception as e:
                # Deliberate best effort: one failing pair must not stop the rest.
                print(f"Error processing {pair.token_id}: {e}")

    async def _process_pair(self, pair: TradingPair) -> None:
        """Analyse one pair and, if the signal passes all checks, trade it."""
        # Analyze market
        signal = await self.decision_engine.analyze(pair.token_id)
        print(f"\n{pair.token_id.upper()} Analysis:")
        print(f" Action: {signal.action}")
        print(f" Confidence: {signal.confidence}%")
        print(f" Reasoning: {signal.reasoning}")

        # Only act on high-confidence signals
        if signal.confidence < self._MIN_CONFIDENCE:
            print(" -> Skipping: Low confidence")
            return
        if signal.action == "HOLD":
            print(" -> Holding position")
            return

        # Calculate trade amount
        balance = await self.client.get_balance()
        trade_amount = self._calculate_trade_amount(balance, signal)

        # Validate with risk manager
        validation = await self.risk_manager.validate_trade(
            trade_amount,
            pair.token_in,
            pair.token_out,
        )
        if not validation.approved:
            print(f" -> Trade rejected: {validation.reason}")
            return

        # Compare against None: ``adjusted_amount or trade_amount`` would drop
        # a cap of 0 and fall back to the uncapped amount.
        if validation.adjusted_amount is not None:
            final_amount = validation.adjusted_amount
        else:
            final_amount = trade_amount
        if validation.reason:
            print(f" -> {validation.reason}")
        if final_amount <= 0:
            print(" -> Skipping: Nothing to trade")
            return

        # Execute trade
        if signal.action == "BUY":
            await self._execute_buy(pair.token_in, pair.token_out, final_amount)
        elif signal.action == "SELL":
            # NOTE(review): final_amount is sized from client.get_balance(), yet
            # a SELL spends pair.token_out. Confirm both use the same unit.
            await self._execute_sell(pair.token_out, pair.token_in, final_amount)

    def _calculate_trade_amount(self, balance: int, signal: TradingSignal) -> int:
        """Return the share of ``balance`` named by ``signal.suggested_amount``.

        Uses 5% when no percentage can be found and caps the share at 100%.
        """
        # Parse suggested amount (e.g., "10%")
        match = self._PCT_PATTERN.search(str(signal.suggested_amount or ""))
        pct = float(match.group(1)) if match else self._DEFAULT_TRADE_PCT
        pct = min(pct, 100.0)
        # Integer basis-point maths keeps wei-sized balances exact.
        return balance * round(pct * 100) // 10_000

    async def _execute_swap(self, token_in: str, token_out: str, amount: int) -> None:
        """Submit a swap and wait for it, reporting rather than raising on failure."""
        try:
            tx = await self.client.execute_swap(
                amount_in=amount,
                path=[token_in, token_out],
                # Honour the configured limit; its default is the 100 bps that
                # used to be hard-coded here.
                slippage_bps=self.risk_manager.config.max_slippage_bps,
            )
            print(f" -> Transaction: {tx.hash}")
            await tx.wait()
            print(" -> Trade completed")
        except Exception as e:
            print(f" -> Trade failed: {e}")

    async def _execute_buy(
        self,
        token_in: str,
        token_out: str,
        amount: int,
    ) -> None:
        """Swap ``amount`` of ``token_in`` for ``token_out``."""
        print(f" -> Executing BUY: {Web3.from_wei(amount, 'ether')} ETH -> {token_out}")
        await self._execute_swap(token_in, token_out, amount)

    async def _execute_sell(
        self,
        token_in: str,
        token_out: str,
        amount: int,
    ) -> None:
        """Swap ``amount`` of ``token_in`` back into ``token_out``."""
        print(f" -> Executing SELL: {token_in} -> ETH")
        await self._execute_swap(token_in, token_out, amount)
// src/trading-agent.ts
import { ethers } from 'ethers';
import { ZeroQuantClient } from '@zeroquant/sdk';
import { DecisionEngine, TradingSignal } from './services/decision-engine';
import { RiskManager, RiskConfig } from './services/risk-manager';
import { MarketDataService } from './services/market-data';
// Constructor options for TradingAgent.
interface TradingAgentConfig {
client: ZeroQuantClient;
vaultAddress: string;
// Passed through to RiskManager, which fills in defaults for omitted fields.
riskConfig?: Partial<RiskConfig>;
// tokenId is the identifier handed to the decision engine for market data.
tradingPairs: Array<{ tokenIn: string; tokenOut: string; tokenId: string }>;
// Pause between trading cycles in milliseconds; the agent falls back to 60000.
checkIntervalMs?: number;
}
/**
 * Periodically analyses each trading pair and executes approved swaps.
 *
 * Each cycle asks the decision engine for a signal, sizes the trade from the
 * vault balance, runs it past the risk manager and, if approved, submits a
 * swap through the ZeroQuant client.
 */
export class TradingAgent {
  private client: ZeroQuantClient;
  private vaultAddress: string;
  private decisionEngine: DecisionEngine;
  private riskManager: RiskManager;
  private marketData: MarketDataService;
  private tradingPairs: Array<{ tokenIn: string; tokenOut: string; tokenId: string }>;
  private checkInterval: number;
  private maxSlippageBps: number;
  private isRunning: boolean = false;

  constructor(config: TradingAgentConfig) {
    this.client = config.client;
    this.vaultAddress = config.vaultAddress;
    this.tradingPairs = config.tradingPairs;
    this.checkInterval = config.checkIntervalMs ?? 60000;
    // Honour the configured slippage limit; 100 bps matches the value that
    // used to be hard-coded in every swap.
    this.maxSlippageBps = config.riskConfig?.maxSlippageBps ?? 100;
    this.decisionEngine = new DecisionEngine();
    this.riskManager = new RiskManager(this.client, config.riskConfig);
    this.marketData = new MarketDataService();
  }

  /** Runs trading cycles until stop() is called. */
  async start(): Promise<void> {
    console.log('Trading Agent started');
    this.isRunning = true;

    while (this.isRunning) {
      await this.runTradingCycle();
      await this.sleep(this.checkInterval);
    }
  }

  /** Asks the loop in start() to exit after the current iteration. */
  stop(): void {
    this.isRunning = false;
    console.log('Trading Agent stopped');
  }

  /** Processes every configured pair once, isolating per-pair failures. */
  private async runTradingCycle(): Promise<void> {
    console.log('\n--- Trading Cycle ---');
    console.log(`Time: ${new Date().toISOString()}`);

    for (const pair of this.tradingPairs) {
      try {
        await this.processPair(pair);
      } catch (error) {
        // Deliberate best effort: one failing pair must not stop the rest.
        console.error(`Error processing ${pair.tokenId}:`, error);
      }
    }
  }

  /** Analyses one pair and, if the signal passes all checks, trades it. */
  private async processPair(pair: {
    tokenIn: string;
    tokenOut: string;
    tokenId: string;
  }): Promise<void> {
    // Analyze market
    const signal = await this.decisionEngine.analyze(pair.tokenId);
    console.log(`\n${pair.tokenId.toUpperCase()} Analysis:`);
    console.log(` Action: ${signal.action}`);
    console.log(` Confidence: ${signal.confidence}%`);
    console.log(` Reasoning: ${signal.reasoning}`);

    // Only act on high-confidence signals
    if (signal.confidence < 70) {
      console.log(' -> Skipping: Low confidence');
      return;
    }
    if (signal.action === 'HOLD') {
      console.log(' -> Holding position');
      return;
    }

    // Calculate trade amount
    const balance = await this.client.getBalance();
    const tradeAmount = this.calculateTradeAmount(balance, signal);

    // Validate with risk manager
    const validation = await this.riskManager.validateTrade(
      tradeAmount,
      pair.tokenIn,
      pair.tokenOut
    );
    if (!validation.approved) {
      console.log(` -> Trade rejected: ${validation.reason}`);
      return;
    }

    // `??` rather than `||`: a cap of 0n is falsy and would otherwise fall
    // back to the uncapped amount.
    const finalAmount = validation.adjustedAmount ?? tradeAmount;
    if (validation.reason) {
      console.log(` -> ${validation.reason}`);
    }
    if (finalAmount <= 0n) {
      console.log(' -> Skipping: Nothing to trade');
      return;
    }

    // Execute trade
    if (signal.action === 'BUY') {
      await this.executeBuy(pair.tokenIn, pair.tokenOut, finalAmount);
    } else if (signal.action === 'SELL') {
      // NOTE(review): finalAmount is sized from client.getBalance(), yet a
      // SELL spends pair.tokenOut. Confirm both use the same unit.
      await this.executeSell(pair.tokenOut, pair.tokenIn, finalAmount);
    }
  }

  /**
   * Share of `balance` named by `signal.suggestedAmount`. Uses 5% when no
   * percentage can be found and caps the share at 100%.
   */
  private calculateTradeAmount(balance: bigint, signal: TradingSignal): bigint {
    // Accept "12.5%" as well as "10%"; /(\d+)%/ read "12.5%" as 5%.
    const pctMatch = signal.suggestedAmount?.match(/(\d+(?:\.\d+)?)\s*%/);
    const pct = Math.min(pctMatch ? parseFloat(pctMatch[1]) : 5, 100);
    // Basis points keep the bigint maths exact for fractional percentages.
    return (balance * BigInt(Math.round(pct * 100))) / 10000n;
  }

  /** Submits a swap and waits for it, logging rather than throwing on failure. */
  private async executeSwap(
    tokenIn: string,
    tokenOut: string,
    amount: bigint
  ): Promise<void> {
    try {
      const tx = await this.client.executeSwap({
        amountIn: amount,
        path: [tokenIn, tokenOut],
        slippageBps: this.maxSlippageBps,
      });
      console.log(` -> Transaction: ${tx.hash}`);
      await tx.wait();
      console.log(' -> Trade completed');
    } catch (error) {
      console.error(' -> Trade failed:', error);
    }
  }

  private async executeBuy(
    tokenIn: string,
    tokenOut: string,
    amount: bigint
  ): Promise<void> {
    console.log(` -> Executing BUY: ${ethers.formatEther(amount)} ETH -> ${tokenOut}`);
    await this.executeSwap(tokenIn, tokenOut, amount);
  }

  private async executeSell(
    tokenIn: string,
    tokenOut: string,
    amount: bigint
  ): Promise<void> {
    console.log(` -> Executing SELL: ${tokenIn} -> ETH`);
    await this.executeSwap(tokenIn, tokenOut, amount);
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
Step 5: Run the Agent
- Python
- TypeScript
# src/main.py
import os
import asyncio
import signal
from dotenv import load_dotenv
from web3 import Web3
from zeroquant import ZeroQuantClient
from trading_agent import TradingAgent, TradingAgentConfig, TradingPair
from services.risk_manager import RiskConfig
load_dotenv()


def _require_env(name: str) -> str:
    """Return the environment variable ``name`` or fail with a clear message."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


async def main():
    """Build the client and agent from environment settings and run the agent."""
    # Read everything up front so a missing setting fails before any
    # connection is attempted, instead of surfacing as ``None`` downstream.
    rpc_url = _require_env("RPC_URL")
    private_key = _require_env("PRIVATE_KEY")
    factory_address = _require_env("FACTORY_ADDRESS")
    permission_manager_address = _require_env("PERMISSION_MANAGER_ADDRESS")
    vault_address = _require_env("VAULT_ADDRESS")

    w3 = Web3(Web3.HTTPProvider(rpc_url))
    client = ZeroQuantClient(
        web3=w3,
        private_key=private_key,
        factory_address=factory_address,
        permission_manager_address=permission_manager_address,
    )
    await client.connect()

    # Connect to vault
    await client.connect_vault(vault_address)

    agent = TradingAgent(TradingAgentConfig(
        client=client,
        vault_address=vault_address,
        trading_pairs=[
            TradingPair(
                token_in="0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2",  # WETH
                token_out="0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",  # USDC
                token_id="ethereum",
            ),
        ],
        risk_config=RiskConfig(
            max_position_size_pct=10,
            max_daily_loss_pct=5,
            stop_loss_pct=3,
            take_profit_pct=8,
        ),
        check_interval_ms=300000,  # 5 minutes
    ))

    # Handle shutdown: stop() only clears the run flag, so the loop exits
    # after the current cycle and sleep have finished.
    def shutdown_handler(sig, frame):
        agent.stop()

    signal.signal(signal.SIGINT, shutdown_handler)
    await agent.start()


if __name__ == "__main__":
    asyncio.run(main())
// src/index.ts
import { ethers } from 'ethers';
import { ZeroQuantClient } from '@zeroquant/sdk';
import { TradingAgent } from './trading-agent';
import 'dotenv/config';
/** Returns the environment variable `name` or throws with a clear message. */
function requireEnv(name: string): string {
  const value = process.env[name];
  if (!value) {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

/** Builds the client and agent from environment settings and runs the agent. */
async function main() {
  // Read everything up front so a missing setting fails before any connection
  // is attempted. The `!` assertions silenced the compiler but let
  // `undefined` through at runtime.
  const rpcUrl = requireEnv('RPC_URL');
  const privateKey = requireEnv('PRIVATE_KEY');
  const factoryAddress = requireEnv('FACTORY_ADDRESS');
  const permissionManagerAddress = requireEnv('PERMISSION_MANAGER_ADDRESS');
  const vaultAddress = requireEnv('VAULT_ADDRESS');

  const provider = new ethers.JsonRpcProvider(rpcUrl);
  const signer = new ethers.Wallet(privateKey, provider);
  const client = new ZeroQuantClient(provider, {
    factoryAddress,
    permissionManagerAddress,
  });
  await client.connect(signer);

  // Connect to vault
  await client.connectVault(vaultAddress);

  const agent = new TradingAgent({
    client,
    vaultAddress,
    tradingPairs: [
      {
        tokenIn: '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', // WETH
        tokenOut: '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48', // USDC
        tokenId: 'ethereum',
      },
    ],
    riskConfig: {
      maxPositionSizePct: 10,
      maxDailyLossPct: 5,
      stopLossPct: 3,
      takeProfitPct: 8,
    },
    checkIntervalMs: 300000, // 5 minutes
  });

  // Handle shutdown
  process.on('SIGINT', () => {
    agent.stop();
    process.exit(0);
  });

  await agent.start();
}

main().catch(console.error);
Key Concepts
Stateless vs Stateful Decisions
Stateless: Each decision is made independently, based only on the current market state.
- Python
- TypeScript
# Every call fetches fresh market data and decides from scratch.
signal = await decision_engine.analyze("ethereum")
# No memory of previous decisions
// Every call fetches fresh market data and decides from scratch.
const signal = await decisionEngine.analyze('ethereum');
// No memory of previous decisions
Stateful: Track history for pattern recognition.
- Python
- TypeScript
class StatefulDecisionEngine:
    """Decision engine that remembers past signals to damp rapid reversals.

    Expects ``_base_analyze(token_id)`` to be provided elsewhere; it supplies
    the raw signal that this class post-processes.
    """

    def __init__(self):
        self.history: list[TradingSignal] = []

    async def analyze(self, token_id: str) -> TradingSignal:
        """Return the base signal, downgraded to HOLD on a weak reversal.

        A signal whose action differs from the previous one is rewritten to
        HOLD unless its confidence is at least 80. Every returned signal is
        appended to ``history``.
        """
        current = await self._base_analyze(token_id)

        # Avoid flip-flopping
        previous = self.history[-1] if self.history else None
        is_reversal = previous is not None and previous.action != current.action
        if is_reversal and current.confidence < 80:
            current.action = "HOLD"
            current.reasoning = "Avoiding rapid position change"

        self.history.append(current)
        return current
/**
 * Decision engine that remembers past signals to damp rapid reversals.
 * Expects `baseAnalyze(tokenId)` to be provided elsewhere.
 */
class StatefulDecisionEngine {
  private history: TradingSignal[] = [];

  /**
   * Returns the base signal, rewritten to HOLD when its action differs from
   * the previous one and its confidence is below 80. Every returned signal is
   * appended to the history.
   */
  async analyze(tokenId: string): Promise<TradingSignal> {
    const current = await this.baseAnalyze(tokenId);

    // Avoid flip-flopping
    const previous = this.history.at(-1);
    const isReversal = previous !== undefined && previous.action !== current.action;
    if (isReversal && current.confidence < 80) {
      current.action = 'HOLD';
      current.reasoning = 'Avoiding rapid position change';
    }

    this.history.push(current);
    return current;
  }
}
Position Management
Track open positions and their performance:
- Python
- TypeScript
from dataclasses import dataclass
@dataclass
class Position:
    """An open position together with its exit thresholds."""

    token: str
    entry_price: float
    amount: int
    stop_loss: float    # close when the price falls to or below this level
    take_profit: float  # close when the price rises to or above this level
    opened_at: float    # timestamp recorded when the position was opened
class PositionManager:
    """Keep at most one open position per token and decide when to close it."""

    def __init__(self):
        self.positions: dict[str, Position] = {}

    def open_position(
        self,
        token: str,
        price: float,
        amount: int,
        risk_manager: RiskManager,
    ) -> None:
        """Record a position for ``token``, replacing any existing entry.

        Exit thresholds come from ``risk_manager`` at the entry ``price``.
        """
        stop_level = risk_manager.get_stop_loss(price)
        target_level = risk_manager.get_take_profit(price)
        opened = datetime.now().timestamp()
        self.positions[token] = Position(
            token=token,
            entry_price=price,
            amount=amount,
            stop_loss=stop_level,
            take_profit=target_level,
            opened_at=opened,
        )

    def should_close(self, token: str, current_price: float) -> Optional[str]:
        """Return "STOP_LOSS", "TAKE_PROFIT", or None if nothing should close.

        The stop-loss check runs first, so it wins when both thresholds match.
        """
        held = self.positions.get(token)
        if held:
            if current_price <= held.stop_loss:
                return "STOP_LOSS"
            if current_price >= held.take_profit:
                return "TAKE_PROFIT"
        return None
// An open position together with its exit thresholds.
interface Position {
token: string;
entryPrice: number;
amount: bigint;
// Close when the price falls to or below this level.
stopLoss: number;
// Close when the price rises to or above this level.
takeProfit: number;
// Date.now() value recorded when the position was opened.
openedAt: number;
}
/** Keeps at most one open position per token and decides when to close it. */
class PositionManager {
  private positions: Map<string, Position> = new Map();

  /**
   * Records a position for `token`, replacing any existing entry. Exit
   * thresholds come from `riskManager` at the entry `price`.
   */
  openPosition(token: string, price: number, amount: bigint, riskManager: RiskManager) {
    const stopLoss = riskManager.getStopLoss(price);
    const takeProfit = riskManager.getTakeProfit(price);
    const opened: Position = {
      token,
      entryPrice: price,
      amount,
      stopLoss,
      takeProfit,
      openedAt: Date.now(),
    };
    this.positions.set(token, opened);
  }

  /**
   * Returns which threshold the current price has crossed, or null. The
   * stop-loss check runs first, so it wins when both thresholds match.
   */
  shouldClose(token: string, currentPrice: number): 'STOP_LOSS' | 'TAKE_PROFIT' | null {
    const held = this.positions.get(token);
    if (held) {
      if (currentPrice <= held.stopLoss) {
        return 'STOP_LOSS';
      }
      if (currentPrice >= held.takeProfit) {
        return 'TAKE_PROFIT';
      }
    }
    return null;
  }
}
What's Next?
- Stateful Agents - Add memory and learning
- AI Trading System - Production-grade system with local LLMs
- Multi-Agent Coordination - Multiple specialized agents
Warning: This is for educational purposes. Trading involves significant risk. Always test thoroughly on testnets before using real funds.