Skip to main content

Building a Trading Agent

Learn how to build an AI trading agent that can analyze market conditions, make trading decisions, and execute swaps through your ZeroQuant vault.

Architecture Overview

┌─────────────────────────────────────────────────────────┐
│ Trading Agent │
├──────────────┬──────────────┬──────────────┬───────────┤
│ Market │ Decision │ Execution │ Risk │
│ Analysis │ Engine │ Layer │ Manager │
├──────────────┼──────────────┼──────────────┼───────────┤
│ • Price feeds│ • LLM-based │ • Swap exec │ • Position│
│ • Indicators │ • Rule-based │ • Batch ops │ limits │
│ • Sentiment │ • Hybrid │ • Gas optim │ • Stop- │
│ │ │ │ loss │
└──────────────┴──────────────┴──────────────┴───────────┘


┌─────────────────┐
│ ZeroQuant SDK │
│ + Vault │
└─────────────────┘

Prerequisites

pip install zeroquant langchain langchain-openai web3 aiohttp python-dotenv

Step 1: Market Data Integration

First, create a service to fetch market data:

# src/services/market_data.py
import aiohttp
from dataclasses import dataclass
from typing import Optional


@dataclass
class PriceData:
symbol: str
price: float
change_24h: float
volume_24h: float


@dataclass
class MarketIndicators:
rsi: float
macd: dict # {"value": float, "signal": float, "histogram": float}
sma20: float
sma50: float


class MarketDataService:
def __init__(self):
self.base_url = "https://api.coingecko.com/api/v3"

async def get_price(self, token_id: str) -> PriceData:
async with aiohttp.ClientSession() as session:
url = f"{self.base_url}/simple/price"
params = {
"ids": token_id,
"vs_currencies": "usd",
"include_24hr_change": "true",
"include_24hr_vol": "true",
}
async with session.get(url, params=params) as response:
data = await response.json()

token_data = data[token_id]
return PriceData(
symbol=token_id.upper(),
price=token_data["usd"],
change_24h=token_data.get("usd_24h_change", 0),
volume_24h=token_data.get("usd_24h_vol", 0),
)

async def get_historical_prices(self, token_id: str, days: int = 30) -> list[float]:
async with aiohttp.ClientSession() as session:
url = f"{self.base_url}/coins/{token_id}/market_chart"
params = {"vs_currency": "usd", "days": days}
async with session.get(url, params=params) as response:
data = await response.json()

return [p[1] for p in data["prices"]]

def calculate_rsi(self, prices: list[float], period: int = 14) -> float:
if len(prices) < period + 1:
return 50.0

gains = 0.0
losses = 0.0

for i in range(1, period + 1):
change = prices[i] - prices[i - 1]
if change > 0:
gains += change
else:
losses -= change

avg_gain = gains / period
avg_loss = losses / period

if avg_loss == 0:
return 100.0
rs = avg_gain / avg_loss
return 100 - (100 / (1 + rs))

def calculate_sma(self, prices: list[float], period: int) -> float:
if len(prices) < period:
return prices[-1]
return sum(prices[-period:]) / period

def calculate_ema(self, prices: list[float], period: int) -> float:
multiplier = 2 / (period + 1)
ema = prices[0]

for price in prices[1:]:
ema = (price - ema) * multiplier + ema

return ema

def calculate_macd(self, prices: list[float]) -> dict:
ema12 = self.calculate_ema(prices, 12)
ema26 = self.calculate_ema(prices, 26)
macd_line = ema12 - ema26
signal = self.calculate_ema([macd_line] * 9, 9)

return {
"value": macd_line,
"signal": signal,
"histogram": macd_line - signal,
}

async def get_indicators(self, token_id: str) -> MarketIndicators:
prices = await self.get_historical_prices(token_id, 60)

return MarketIndicators(
rsi=self.calculate_rsi(prices[-15:]),
macd=self.calculate_macd(prices),
sma20=self.calculate_sma(prices, 20),
sma50=self.calculate_sma(prices, 50),
)

Step 2: Trading Decision Engine

Create an LLM-powered decision engine:

# src/services/decision_engine.py
import asyncio
import json
from dataclasses import dataclass
from typing import Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate

from .market_data import MarketDataService


@dataclass
class TradingSignal:
action: str # "BUY", "SELL", or "HOLD"
confidence: int
reasoning: str
suggested_amount: Optional[str] = None


class DecisionEngine:
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4", temperature=0)
self.market_data = MarketDataService()

async def analyze(self, token_id: str) -> TradingSignal:
price, indicators = await asyncio.gather(
self.market_data.get_price(token_id),
self.market_data.get_indicators(token_id),
)

prompt = PromptTemplate.from_template("""
You are a professional crypto trading analyst. Analyze the following market data and provide a trading recommendation.

Current Market Data for {symbol}:
- Price: ${price:.2f}
- 24h Change: {change_24h:.2f}%
- 24h Volume: ${volume_24h:,.0f}

Technical Indicators:
- RSI (14): {rsi:.2f} (Oversold < 30, Overbought > 70)
- MACD: {macd_value:.4f} (Signal: {macd_signal:.4f})
- SMA 20: {sma20:.2f}
- SMA 50: {sma50:.2f}
- Trend: {trend}

Based on this analysis, provide a recommendation in the following JSON format:
{{
"action": "BUY" | "SELL" | "HOLD",
"confidence": 0-100,
"reasoning": "Brief explanation",
"suggested_amount": "percentage of portfolio (e.g., '10%')"
}}

Only respond with valid JSON.
""")

trend = "BULLISH" if indicators.sma20 > indicators.sma50 else "BEARISH"

response = await self.llm.ainvoke(
prompt.format(
symbol=price.symbol,
price=price.price,
change_24h=price.change_24h,
volume_24h=price.volume_24h,
rsi=indicators.rsi,
macd_value=indicators.macd["value"],
macd_signal=indicators.macd["signal"],
sma20=indicators.sma20,
sma50=indicators.sma50,
trend=trend,
)
)

try:
data = json.loads(response.content)
return TradingSignal(
action=data["action"],
confidence=data["confidence"],
reasoning=data["reasoning"],
suggested_amount=data.get("suggested_amount"),
)
except (json.JSONDecodeError, KeyError):
return TradingSignal(
action="HOLD",
confidence=0,
reasoning="Failed to parse LLM response",
)

Step 3: Risk Manager

Implement safety controls:

# src/services/risk_manager.py
from dataclasses import dataclass
from typing import Optional
from web3 import Web3
from zeroquant import ZeroQuantClient


@dataclass
class RiskConfig:
max_position_size_pct: float = 10.0 # Max % of portfolio per trade
max_daily_loss_pct: float = 5.0 # Max daily loss before stopping
stop_loss_pct: float = 2.0 # Per-trade stop loss
take_profit_pct: float = 5.0 # Per-trade take profit
max_slippage_bps: int = 100 # Max acceptable slippage


@dataclass
class TradeValidation:
approved: bool
reason: Optional[str] = None
adjusted_amount: Optional[int] = None


class RiskManager:
def __init__(self, client: ZeroQuantClient, config: Optional[RiskConfig] = None):
self.client = client
self.config = config or RiskConfig()
self.daily_pnl: float = 0.0

async def validate_trade(
self,
amount_in: int,
token_in: str,
token_out: str,
) -> TradeValidation:
# Check daily loss limit
if self.daily_pnl < -self.config.max_daily_loss_pct:
return TradeValidation(
approved=False,
reason=f"Daily loss limit reached ({self.daily_pnl:.2f}%)",
)

# Get vault balance
balance = await self.client.get_balance()

# Check position size
position_size_pct = (amount_in / balance) * 100
if position_size_pct > self.config.max_position_size_pct:
adjusted_amount = int(balance * self.config.max_position_size_pct / 100)
return TradeValidation(
approved=True,
reason=f"Position size adjusted from {position_size_pct:.1f}% to {self.config.max_position_size_pct}%",
adjusted_amount=adjusted_amount,
)

return TradeValidation(approved=True)

def update_pnl(self, pnl: float) -> None:
self.daily_pnl += pnl

def reset_daily(self) -> None:
self.daily_pnl = 0.0

def get_stop_loss(self, entry_price: float) -> float:
return entry_price * (1 - self.config.stop_loss_pct / 100)

def get_take_profit(self, entry_price: float) -> float:
return entry_price * (1 + self.config.take_profit_pct / 100)

Step 4: Complete Trading Agent

Bring it all together:

# src/trading_agent.py
import asyncio
import re
from datetime import datetime
from dataclasses import dataclass
from typing import Optional
from web3 import Web3

from zeroquant import ZeroQuantClient
from .services.decision_engine import DecisionEngine, TradingSignal
from .services.risk_manager import RiskManager, RiskConfig
from .services.market_data import MarketDataService


@dataclass
class TradingPair:
token_in: str
token_out: str
token_id: str


@dataclass
class TradingAgentConfig:
client: ZeroQuantClient
vault_address: str
trading_pairs: list[TradingPair]
risk_config: Optional[RiskConfig] = None
check_interval_ms: int = 60000


class TradingAgent:
def __init__(self, config: TradingAgentConfig):
self.client = config.client
self.vault_address = config.vault_address
self.trading_pairs = config.trading_pairs
self.check_interval = config.check_interval_ms / 1000 # Convert to seconds

self.decision_engine = DecisionEngine()
self.risk_manager = RiskManager(self.client, config.risk_config)
self.market_data = MarketDataService()
self.is_running = False

async def start(self) -> None:
print("Trading Agent started")
self.is_running = True

while self.is_running:
await self._run_trading_cycle()
await asyncio.sleep(self.check_interval)

def stop(self) -> None:
self.is_running = False
print("Trading Agent stopped")

async def _run_trading_cycle(self) -> None:
print("\n--- Trading Cycle ---")
print(f"Time: {datetime.now().isoformat()}")

for pair in self.trading_pairs:
try:
# Analyze market
signal = await self.decision_engine.analyze(pair.token_id)
print(f"\n{pair.token_id.upper()} Analysis:")
print(f" Action: {signal.action}")
print(f" Confidence: {signal.confidence}%")
print(f" Reasoning: {signal.reasoning}")

# Only act on high-confidence signals
if signal.confidence < 70:
print(" -> Skipping: Low confidence")
continue

if signal.action == "HOLD":
print(" -> Holding position")
continue

# Calculate trade amount
balance = await self.client.get_balance()
trade_amount = self._calculate_trade_amount(balance, signal)

# Validate with risk manager
validation = await self.risk_manager.validate_trade(
trade_amount,
pair.token_in,
pair.token_out,
)

if not validation.approved:
print(f" -> Trade rejected: {validation.reason}")
continue

final_amount = validation.adjusted_amount or trade_amount
if validation.reason:
print(f" -> {validation.reason}")

# Execute trade
if signal.action == "BUY":
await self._execute_buy(pair.token_in, pair.token_out, final_amount)
elif signal.action == "SELL":
await self._execute_sell(pair.token_out, pair.token_in, final_amount)

except Exception as e:
print(f"Error processing {pair.token_id}: {e}")

def _calculate_trade_amount(self, balance: int, signal: TradingSignal) -> int:
# Parse suggested amount (e.g., "10%")
match = re.search(r"(\d+)%", signal.suggested_amount or "")
pct = int(match.group(1)) if match else 5 # Default 5%

return int(balance * pct / 100)

async def _execute_buy(
self,
token_in: str,
token_out: str,
amount: int,
) -> None:
print(f" -> Executing BUY: {Web3.from_wei(amount, 'ether')} ETH -> {token_out}")

try:
tx = await self.client.execute_swap(
amount_in=amount,
path=[token_in, token_out],
slippage_bps=100,
)

print(f" -> Transaction: {tx.hash}")
await tx.wait()
print(" -> Trade completed")
except Exception as e:
print(f" -> Trade failed: {e}")

async def _execute_sell(
self,
token_in: str,
token_out: str,
amount: int,
) -> None:
print(f" -> Executing SELL: {token_in} -> ETH")

try:
tx = await self.client.execute_swap(
amount_in=amount,
path=[token_in, token_out],
slippage_bps=100,
)

print(f" -> Transaction: {tx.hash}")
await tx.wait()
print(" -> Trade completed")
except Exception as e:
print(f" -> Trade failed: {e}")

Step 5: Run the Agent

# src/main.py
import os
import asyncio
import signal
from dotenv import load_dotenv
from web3 import Web3

from zeroquant import ZeroQuantClient
from trading_agent import TradingAgent, TradingAgentConfig, TradingPair
from services.risk_manager import RiskConfig

load_dotenv()


async def main():
w3 = Web3(Web3.HTTPProvider(os.getenv("RPC_URL")))

client = ZeroQuantClient(
web3=w3,
private_key=os.getenv("PRIVATE_KEY"),
factory_address=os.getenv("FACTORY_ADDRESS"),
permission_manager_address=os.getenv("PERMISSION_MANAGER_ADDRESS"),
)
await client.connect()

# Connect to vault
await client.connect_vault(os.getenv("VAULT_ADDRESS"))

agent = TradingAgent(TradingAgentConfig(
client=client,
vault_address=os.getenv("VAULT_ADDRESS"),
trading_pairs=[
TradingPair(
token_in="0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", # WETH
token_out="0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", # USDC
token_id="ethereum",
),
],
risk_config=RiskConfig(
max_position_size_pct=10,
max_daily_loss_pct=5,
stop_loss_pct=3,
take_profit_pct=8,
),
check_interval_ms=300000, # 5 minutes
))

# Handle shutdown
def shutdown_handler(sig, frame):
agent.stop()

signal.signal(signal.SIGINT, shutdown_handler)

await agent.start()


if __name__ == "__main__":
asyncio.run(main())

Key Concepts

Stateless vs Stateful Decisions

Stateless: Each decision is independent based on current market state.

signal = await decision_engine.analyze("ethereum")
# No memory of previous decisions

Stateful: Track history for pattern recognition.

class StatefulDecisionEngine:
def __init__(self):
self.history: list[TradingSignal] = []

async def analyze(self, token_id: str) -> TradingSignal:
signal = await self._base_analyze(token_id)

# Avoid flip-flopping
if self.history:
last_action = self.history[-1].action
if last_action != signal.action and signal.confidence < 80:
signal.action = "HOLD"
signal.reasoning = "Avoiding rapid position change"

self.history.append(signal)
return signal

Position Management

Track open positions and their performance:

from dataclasses import dataclass


@dataclass
class Position:
token: str
entry_price: float
amount: int
stop_loss: float
take_profit: float
opened_at: float


class PositionManager:
def __init__(self):
self.positions: dict[str, Position] = {}

def open_position(
self,
token: str,
price: float,
amount: int,
risk_manager: RiskManager,
) -> None:
self.positions[token] = Position(
token=token,
entry_price=price,
amount=amount,
stop_loss=risk_manager.get_stop_loss(price),
take_profit=risk_manager.get_take_profit(price),
opened_at=datetime.now().timestamp(),
)

def should_close(self, token: str, current_price: float) -> Optional[str]:
position = self.positions.get(token)
if not position:
return None

if current_price <= position.stop_loss:
return "STOP_LOSS"
if current_price >= position.take_profit:
return "TAKE_PROFIT"
return None

What's Next?


Warning: This is for educational purposes. Trading involves significant risk. Always test thoroughly on testnets before using real funds.