Multi-Agent Coordination
Build sophisticated trading systems where multiple specialized agents collaborate, each focusing on what they do best.
Architecture
┌────────────────────────────────────────────────────────────────┐
│ Orchestrator Agent │
│ (Coordinates all agents) │
└───────────────────────┬────────────────────────────────────────┘
│
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Analyst │ │ Trader │ │ Risk │
│ Agent │ │ Agent │ │ Agent │
├───────────────┤ ├───────────────┤ ├───────────────┤
│ • Market data │ │ • Execution │ │ • Position │
│ • Technicals │ │ • Timing │ │ limits │
│ • Sentiment │ │ • Slippage │ │ • Stop-loss │
│ • Signals │ │ • Gas optim │ │ • Drawdown │
└───────────────┘ └───────────────┘ └───────────────┘
│
▼
┌─────────────────┐
│ ZeroQuant Vault │
└─────────────────┘
Agent Roles
| Agent | Role | Responsibility |
|---|---|---|
| Analyst | Market analysis | Produces signals, never executes |
| Trader | Execution | Optimizes entry/exit timing |
| Risk | Portfolio guard | Vetoes trades that violate limits |
| Orchestrator | Coordinator | Makes final decisions |
Implementation
Base Agent Class
- Python
- TypeScript
# src/agents/base_agent.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Callable
from enum import Enum
class MessageType(Enum):
    """Categories of messages exchanged between agents.

    Each member's value is the same string as its name.
    """

    # Analysis result sent by the Analyst to the Orchestrator.
    SIGNAL = "SIGNAL"
    # Ask another agent to do something (the payload's "action" says what).
    REQUEST = "REQUEST"
    # Outcome of a completed action, e.g. an executed trade.
    RESPONSE = "RESPONSE"
    # Risk agent's rejection of a proposed trade.
    VETO = "VETO"
    # Risk agent's clearance of a proposed trade.
    APPROVAL = "APPROVAL"
@dataclass
class AgentMessage:
    """Envelope for one message passed from one agent to another."""

    from_agent: str  # name of the sending agent
    to_agent: str  # name of the intended recipient
    type: MessageType  # message category
    payload: Any  # message body; its shape depends on the category
    timestamp: float  # creation time; senders in this file use time.time()
class BaseAgent(ABC):
    """Common base for agents: a named inbox plus an outbound hook.

    Subclasses implement ``process`` to consume ``message_queue``.
    """

    def __init__(self, name: str):
        self.name = name
        # Inbox; filled by receive_message().
        self.message_queue: list[AgentMessage] = []
        # Delivery hook; while it is unset, outgoing messages go nowhere.
        self.on_send: Callable[[AgentMessage], None] | None = None

    def receive_message(self, message: AgentMessage) -> None:
        """Append *message* to this agent's inbox."""
        self.message_queue.append(message)

    def send_message(self, to: str, type: MessageType, payload: Any) -> None:
        """Build a message from this agent to *to* and pass it to ``on_send``."""
        import time

        outgoing = AgentMessage(
            from_agent=self.name,
            to_agent=to,
            type=type,
            payload=payload,
            timestamp=time.time(),
        )
        deliver = self.on_send
        if not deliver:
            return
        deliver(outgoing)

    @abstractmethod
    async def process(self) -> None:
        """Handle the queued messages; every concrete agent must implement this."""
// src/agents/base-agent.ts
import { EventEmitter } from 'events';
/**
 * Every message category the agents exchange.
 *
 * Exported under this name because analyst-agent.ts imports `MessageType`
 * from this module.
 */
export type MessageType = 'SIGNAL' | 'REQUEST' | 'RESPONSE' | 'VETO' | 'APPROVAL';

/** Envelope for one message passed from one agent to another. */
export interface AgentMessage {
  /** Name of the sending agent. */
  from: string;
  /** Name of the intended recipient. */
  to: string;
  /** Message category. */
  type: MessageType;
  /** Message body; its shape depends on the category. */
  payload: any;
  /** Creation time; senders in this file use Date.now(). */
  timestamp: number;
}
/**
 * Common base for agents: a named inbox plus event-based outbound delivery.
 *
 * Incoming messages are queued and announced with a 'message' event;
 * outgoing messages are emitted as 'send' events for a listener to route.
 */
export abstract class BaseAgent extends EventEmitter {
  /** Inbox; filled by receiveMessage(). */
  protected messageQueue: AgentMessage[] = [];

  constructor(public readonly name: string) {
    super();
  }

  /** Queue `message` and emit it as a 'message' event. */
  receiveMessage(message: AgentMessage): void {
    this.messageQueue.push(message);
    this.emit('message', message);
  }

  /** Build a message from this agent to `to` and emit it as a 'send' event. */
  protected sendMessage(to: string, type: AgentMessage['type'], payload: any): void {
    const timestamp = Date.now();
    const outgoing: AgentMessage = { from: this.name, to, type, payload, timestamp };
    this.emit('send', outgoing);
  }

  /** Handle the queued messages; every concrete agent must implement this. */
  abstract process(): Promise<void>;
}
Analyst Agent
- Python
- TypeScript
# src/agents/analyst_agent.py
import json
from dataclasses import dataclass
from langchain_openai import ChatOpenAI
from .base_agent import BaseAgent, MessageType
@dataclass
class AnalystSignal:
    """Outcome of analysing a single symbol."""

    symbol: str  # asset the signal refers to
    direction: str  # one of "LONG", "SHORT", "NEUTRAL"
    strength: int  # the analysis prompt asks the model for a value in 0-100
    reasoning: str  # free-text explanation
class AnalystAgent(BaseAgent):
    """Agent that answers ANALYZE requests with LLM-produced signals.

    Each result is sent to the Orchestrator as a ``SIGNAL`` message; this
    agent never executes trades itself.
    """

    # The only directions the rest of the system understands.
    _VALID_DIRECTIONS = ("LONG", "SHORT", "NEUTRAL")

    def __init__(self, watchlist: list[str]):
        super().__init__("Analyst")
        self.watchlist = watchlist
        self.llm = ChatOpenAI(model="gpt-4", temperature=0)

    async def process(self) -> None:
        """Answer every queued ANALYZE request, then clear the queue."""
        for message in self.message_queue:
            if message.type != MessageType.REQUEST:
                continue
            if message.payload.get("action") != "ANALYZE":
                continue
            signal = await self.analyze_symbol(message.payload["symbol"])
            self.send_message("Orchestrator", MessageType.SIGNAL, signal.__dict__)
        self.message_queue = []

    async def analyze_symbol(self, symbol: str) -> AnalystSignal:
        """Ask the LLM about *symbol* and return a validated signal.

        Any reply that cannot be parsed or validated yields a NEUTRAL signal
        with strength 0, so a malformed reply can never trigger a trade.
        """
        # In production, fetch real data
        prompt = f"""
        Analyze {symbol} market conditions.
        Consider technicals, sentiment, and on-chain metrics.
        Respond in JSON:
        {{"direction": "LONG" | "SHORT" | "NEUTRAL", "strength": 0-100, "reasoning": "explanation"}}
        """
        response = await self.llm.ainvoke(prompt)
        try:
            return self._parse_signal(symbol, response.content)
        except (
            json.JSONDecodeError,
            KeyError,
            TypeError,
            ValueError,
            OverflowError,
        ) as e:
            print(f"Analysis failed for {symbol}: {e}")
            return AnalystSignal(
                symbol=symbol,
                direction="NEUTRAL",
                strength=0,
                reasoning="Analysis failed",
            )

    @staticmethod
    def _parse_signal(symbol: str, content: object) -> AnalystSignal:
        """Turn the raw LLM reply into an ``AnalystSignal``.

        Raises:
            json.JSONDecodeError: the reply is not valid JSON.
            KeyError: a required field is missing.
            TypeError: the reply is not a string or not a JSON object.
            ValueError: ``direction`` is unknown or ``strength`` is not numeric.
            OverflowError: ``strength`` is an infinite number.
        """
        if not isinstance(content, str):
            raise TypeError("LLM response content is not a string")

        text = content.strip()
        if text.startswith("```"):
            # Drop a Markdown code fence (``` or ```json) wrapped around the JSON.
            _, _, text = text.partition("\n")
            text = text.rsplit("```", 1)[0]

        data = json.loads(text)
        if not isinstance(data, dict):
            raise TypeError("LLM response is not a JSON object")

        direction = str(data["direction"]).upper()
        if direction not in AnalystAgent._VALID_DIRECTIONS:
            raise ValueError(f"unknown direction {direction!r}")

        # Downstream code compares and divides strength, so force it to an
        # int inside the 0-100 range the prompt asks for.
        strength = max(0, min(100, int(data["strength"])))

        return AnalystSignal(
            symbol=symbol,
            direction=direction,
            strength=strength,
            reasoning=str(data["reasoning"]),
        )
// src/agents/analyst-agent.ts
import { BaseAgent, MessageType } from './base-agent';
import { ChatOpenAI } from '@langchain/openai';
/** Outcome of analysing a single symbol. */
interface AnalystSignal {
  /** Asset the signal refers to. */
  symbol: string;
  direction: 'LONG' | 'SHORT' | 'NEUTRAL';
  /** The analysis prompt asks the model for a value in 0-100. */
  strength: number;
  /** Free-text explanation. */
  reasoning: string;
}
/**
 * Agent that answers ANALYZE requests with LLM-produced signals.
 *
 * Each result is sent to the Orchestrator as a 'SIGNAL' message; this agent
 * never executes trades itself.
 */
export class AnalystAgent extends BaseAgent {
  private llm: ChatOpenAI;
  private watchlist: string[];

  constructor(watchlist: string[]) {
    super('Analyst');
    this.llm = new ChatOpenAI({ modelName: 'gpt-4', temperature: 0 });
    this.watchlist = watchlist;
  }

  /** Answer every queued ANALYZE request, then clear the queue. */
  async process(): Promise<void> {
    for (const message of this.messageQueue) {
      if (message.type === 'REQUEST' && message.payload.action === 'ANALYZE') {
        const signal = await this.analyzeSymbol(message.payload.symbol);
        this.sendMessage('Orchestrator', 'SIGNAL', signal);
      }
    }
    this.messageQueue = [];
  }

  /**
   * Ask the LLM about `symbol` and return a validated signal.
   *
   * Any reply that cannot be parsed or validated yields a NEUTRAL signal with
   * strength 0, so a malformed reply can never trigger a trade.
   */
  private async analyzeSymbol(symbol: string): Promise<AnalystSignal> {
    const response = await this.llm.invoke(`
      Analyze ${symbol} market conditions.
      Respond in JSON: {"direction": "LONG"|"SHORT"|"NEUTRAL", "strength": 0-100, "reasoning": "..."}
    `);
    try {
      return AnalystAgent.parseSignal(symbol, response.content);
    } catch (error) {
      console.error(`Analysis failed for ${symbol}:`, error);
      return { symbol, direction: 'NEUTRAL', strength: 0, reasoning: 'Analysis failed' };
    }
  }

  /**
   * Turn the raw LLM reply into an AnalystSignal.
   *
   * Fields are copied one by one rather than spread, so the reply cannot
   * override `symbol` or add unexpected keys.
   *
   * @throws if the reply is not a JSON object with a known direction and a
   *         finite numeric strength.
   */
  private static parseSignal(symbol: string, content: unknown): AnalystSignal {
    if (typeof content !== 'string') {
      throw new TypeError('LLM response content is not a string');
    }

    // Drop a Markdown code fence (``` or ```json) wrapped around the JSON.
    const text = content
      .trim()
      .replace(/^```[a-zA-Z]*\s*/, '')
      .replace(/```\s*$/, '');

    const data = JSON.parse(text);
    if (typeof data !== 'object' || data === null || Array.isArray(data)) {
      throw new TypeError('LLM response is not a JSON object');
    }

    const direction = String(data.direction).toUpperCase();
    if (direction !== 'LONG' && direction !== 'SHORT' && direction !== 'NEUTRAL') {
      throw new RangeError(`Unknown direction: ${direction}`);
    }

    const rawStrength = Number(data.strength);
    if (!Number.isFinite(rawStrength)) {
      throw new RangeError(`Strength is not a finite number: ${data.strength}`);
    }
    // Downstream code compares and divides strength, so keep it an integer
    // inside the 0-100 range the prompt asks for.
    const strength = Math.min(100, Math.max(0, Math.trunc(rawStrength)));

    return { symbol, direction, strength, reasoning: String(data.reasoning ?? '') };
  }
}
Trader Agent
- Python
- TypeScript
# src/agents/trader_agent.py
from dataclasses import dataclass
from zeroquant import ZeroQuantClient
from .base_agent import BaseAgent, MessageType
@dataclass
class TradeResult:
    """Summary of one executed swap."""

    symbol: str  # symbol from the originating signal
    direction: str  # direction from the originating signal
    size: int  # amount that was passed to the swap as amount_in
    tx_hash: str  # hash read from the transaction receipt
class TraderAgent(BaseAgent):
    """Agent that executes swaps once the Risk agent has approved them.

    A ``PREPARE_TRADE`` request is parked in ``pending_signals`` and forwarded
    to the Risk agent. The matching ``APPROVAL`` triggers the swap and a
    ``TRADE_EXECUTED`` response to the Orchestrator; a signal that cannot be
    turned into a swap produces a ``TRADE_FAILED`` response instead.
    """

    def __init__(self, client: ZeroQuantClient):
        super().__init__("Trader")
        self.client = client
        # signal_id -> PREPARE_TRADE payload waiting for a Risk decision.
        # NOTE(review): vetoes are sent to the Orchestrator only, so vetoed
        # entries are never removed from this dict by the code shown here.
        self.pending_signals: dict[str, dict] = {}

    async def process(self) -> None:
        """Handle every queued message, then clear the queue."""
        for message in self.message_queue:
            if message.type == MessageType.APPROVAL:
                # Risk agent approved, execute
                await self._handle_approval(message.payload["signal_id"])
            elif (
                message.type == MessageType.REQUEST
                and message.payload.get("action") == "PREPARE_TRADE"
            ):
                self._request_approval(message)
        self.message_queue = []

    def _request_approval(self, message) -> None:
        """Park a PREPARE_TRADE payload and ask the Risk agent to approve it."""
        signal_id = f"{message.payload['symbol']}-{message.timestamp}"
        self.pending_signals[signal_id] = message.payload
        self.send_message("Risk", MessageType.REQUEST, {
            "action": "APPROVE_TRADE",
            "signal_id": signal_id,
            "signal": message.payload,
        })

    async def _handle_approval(self, signal_id: str) -> None:
        """Execute the approved signal, or report why it cannot be executed."""
        # pop() instead of get() + del: the entry is gone even if the swap
        # raises, so a repeated APPROVAL can never execute the trade twice.
        signal = self.pending_signals.pop(signal_id, None)
        if not signal:
            return

        reason = self._rejection_reason(signal)
        if reason is not None:
            print(f"Trade not executed: {reason}")
            self.send_message("Orchestrator", MessageType.RESPONSE, {
                "action": "TRADE_FAILED",
                "signal_id": signal_id,
                "reason": reason,
            })
            return

        result = await self._execute_trade(signal)
        self.send_message("Orchestrator", MessageType.RESPONSE, {
            "action": "TRADE_EXECUTED",
            "result": result.__dict__,
        })

    def _rejection_reason(self, signal: dict) -> str | None:
        """Return why *signal* cannot become a swap, or None if it can."""
        symbol = signal.get("symbol")
        if not self._get_token_address(symbol):
            return f"No token address configured for {symbol}"

        direction = signal.get("direction")
        if direction not in ("LONG", "SHORT"):
            return f"Unsupported direction: {direction}"

        size = signal.get("size")
        is_number = isinstance(size, (int, float)) and not isinstance(size, bool)
        if not is_number or not 0 < size <= 100:
            return f"Invalid trade size: {size}"
        return None

    async def _execute_trade(self, signal: dict) -> TradeResult:
        """Swap ``signal["size"]`` percent of the balance and return the result.

        Raises:
            ValueError: no token address is configured for the symbol.
        """
        WETH = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
        token = self._get_token_address(signal["symbol"])
        if not token:
            # An empty address must never reach the swap path.
            raise ValueError(f"No token address configured for {signal['symbol']}")

        balance = await self.client.get_balance()
        size = signal["size"]
        if isinstance(balance, int) and isinstance(size, int):
            # Stay in integer arithmetic: going through a float loses
            # precision once the balance exceeds 2**53 base units.
            trade_size = balance * size // 100
        else:
            trade_size = int(balance * size / 100)

        path = [WETH, token] if signal["direction"] == "LONG" else [token, WETH]
        tx = await self.client.execute_swap(
            amount_in=trade_size,
            path=path,
            slippage_bps=100,
        )
        receipt = await tx.wait()
        return TradeResult(
            symbol=signal["symbol"],
            direction=signal["direction"],
            size=trade_size,
            tx_hash=receipt.hash,
        )

    def _get_token_address(self, symbol: str) -> str:
        """Return the token address for *symbol*, or "" when none is configured."""
        addresses = {"USDC": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"}
        return addresses.get(symbol, "")
// src/agents/trader-agent.ts
import { BaseAgent } from './base-agent';
import { ZeroQuantClient } from '@zeroquant/sdk';
/** Summary of one executed swap. */
interface TradeResult {
  /** Symbol from the originating signal. */
  symbol: string;
  /** Direction from the originating signal. */
  direction: string;
  /** Amount that was passed to the swap as amountIn. */
  size: bigint;
  /** Hash read from the transaction receipt. */
  txHash: string;
}
/**
 * Agent that executes swaps once the Risk agent has approved them.
 *
 * A PREPARE_TRADE request is parked in `pendingSignals` and forwarded to the
 * Risk agent. The matching APPROVAL triggers the swap and a TRADE_EXECUTED
 * response to the Orchestrator; a signal that cannot be turned into a swap
 * produces a TRADE_FAILED response instead.
 */
export class TraderAgent extends BaseAgent {
  private client: ZeroQuantClient;
  // signalId -> PREPARE_TRADE payload waiting for a Risk decision.
  // NOTE(review): vetoes are sent to the Orchestrator only, so vetoed entries
  // are never removed from this map by the code shown here.
  private pendingSignals: Map<string, any> = new Map();

  constructor(client: ZeroQuantClient) {
    super('Trader');
    this.client = client;
  }

  /** Handle every queued message, then clear the queue. */
  async process(): Promise<void> {
    for (const message of this.messageQueue) {
      if (message.type === 'APPROVAL') {
        await this.handleApproval(message.payload.signalId);
      }
      if (message.type === 'REQUEST' && message.payload.action === 'PREPARE_TRADE') {
        const signalId = `${message.payload.symbol}-${Date.now()}`;
        this.pendingSignals.set(signalId, message.payload);
        this.sendMessage('Risk', 'REQUEST', {
          action: 'APPROVE_TRADE',
          signalId,
          signal: message.payload,
        });
      }
    }
    this.messageQueue = [];
  }

  /** Execute the approved signal, or report why it cannot be executed. */
  private async handleApproval(signalId: string): Promise<void> {
    const signal = this.pendingSignals.get(signalId);
    if (!signal) return;
    // Remove the entry before executing: it is gone even if the swap throws,
    // so a repeated APPROVAL can never execute the trade twice.
    this.pendingSignals.delete(signalId);

    const reason = this.rejectionReason(signal);
    if (reason !== null) {
      console.error(`Trade not executed: ${reason}`);
      this.sendMessage('Orchestrator', 'RESPONSE', {
        action: 'TRADE_FAILED',
        signalId,
        reason,
      });
      return;
    }

    const result = await this.executeTrade(signal);
    this.sendMessage('Orchestrator', 'RESPONSE', {
      action: 'TRADE_EXECUTED',
      result,
    });
  }

  /** Return why `signal` cannot become a swap, or null if it can. */
  private rejectionReason(signal: any): string | null {
    if (!process.env.WETH_ADDRESS) {
      return 'WETH_ADDRESS is not set';
    }
    if (!this.getTokenAddress(signal.symbol)) {
      return `No token address configured for ${signal.symbol}`;
    }
    if (signal.direction !== 'LONG' && signal.direction !== 'SHORT') {
      return `Unsupported direction: ${signal.direction}`;
    }
    // BigInt() throws on non-integers, so reject them before sizing the trade.
    if (!Number.isInteger(signal.size) || signal.size <= 0 || signal.size > 100) {
      return `Invalid trade size: ${signal.size}`;
    }
    return null;
  }

  /**
   * Swap `signal.size` percent of the balance and return the result.
   *
   * @throws if WETH_ADDRESS is unset or no token address is configured.
   */
  private async executeTrade(signal: any): Promise<TradeResult> {
    const WETH = process.env.WETH_ADDRESS;
    const token = this.getTokenAddress(signal.symbol);
    // An empty or undefined address must never reach the swap path.
    if (!WETH) {
      throw new Error('WETH_ADDRESS is not set');
    }
    if (!token) {
      throw new Error(`No token address configured for ${signal.symbol}`);
    }

    const balance = await this.client.getBalance();
    const tradeSize = (balance * BigInt(signal.size)) / 100n;
    const path = signal.direction === 'LONG' ? [WETH, token] : [token, WETH];
    const tx = await this.client.executeSwap({
      amountIn: tradeSize,
      path,
      slippageBps: 100,
    });
    const receipt = await tx.wait();
    return {
      symbol: signal.symbol,
      direction: signal.direction,
      size: tradeSize,
      txHash: receipt.hash,
    };
  }

  /** Return the token address for `symbol`, or '' when none is configured. */
  private getTokenAddress(symbol: string): string {
    const addresses: Record<string, string> = {
      'USDC': '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48',
    };
    return addresses[symbol] || '';
  }
}
Risk Agent
- Python
- TypeScript
# src/agents/risk_agent.py
from dataclasses import dataclass
from zeroquant import ZeroQuantClient
from .base_agent import BaseAgent, MessageType
@dataclass
class RiskConfig:
    """Limits applied by the Risk agent when it evaluates a trade."""

    max_position_size_pct: float = 20  # largest allowed trade size, in percent
    max_drawdown_pct: float = 10  # largest tolerated drop from peak equity, in percent
    max_daily_trades: int = 5  # approvals allowed before the daily counter resets
    max_open_positions: int = 3  # not read by the RiskAgent code in this file
class RiskAgent(BaseAgent):
    """Agent that approves or vetoes trades proposed by the Trader.

    Approvals go back to the Trader; vetoes go to the Orchestrator together
    with the reason.
    """

    def __init__(self, client: ZeroQuantClient, config: RiskConfig | None = None):
        super().__init__("Risk")
        self.client = client
        self.config = config or RiskConfig()
        self.daily_trades = 0
        self.peak_equity = 0
        # UTC date the daily counter belongs to; None until the first evaluation.
        self._trade_day = None

    async def process(self) -> None:
        """Decide on every queued APPROVE_TRADE request, then clear the queue."""
        for message in self.message_queue:
            if message.type != MessageType.REQUEST:
                continue
            if message.payload.get("action") != "APPROVE_TRADE":
                continue
            approval = await self._evaluate_trade(message.payload)
            if approval["approved"]:
                self.send_message("Trader", MessageType.APPROVAL, {
                    "signal_id": message.payload["signal_id"],
                })
            else:
                self.send_message("Orchestrator", MessageType.VETO, {
                    "signal_id": message.payload["signal_id"],
                    "reason": approval["reason"],
                })
        self.message_queue = []

    async def _evaluate_trade(self, payload: dict) -> dict:
        """Check the proposed trade against every configured limit.

        Returns:
            ``{"approved": True}``, or ``{"approved": False, "reason": ...}``.
        """
        # Without this the counter is only cleared by an explicit
        # reset_daily() call, and approvals stop for good after the limit.
        self._roll_daily_counter()
        signal = payload["signal"]

        # Check daily trade limit
        if self.daily_trades >= self.config.max_daily_trades:
            return {"approved": False, "reason": "Daily trade limit reached"}

        # Check position size
        size = signal.get("size")
        is_number = isinstance(size, (int, float)) and not isinstance(size, bool)
        if not is_number or size <= 0:
            return {"approved": False, "reason": f"Invalid position size: {size}"}
        if size > self.config.max_position_size_pct:
            return {
                "approved": False,
                "reason": f"Position size {size}% exceeds max {self.config.max_position_size_pct}%",
            }

        # Check drawdown
        current_equity = await self.client.get_balance()
        if self.peak_equity > 0:
            drawdown = (self.peak_equity - current_equity) / self.peak_equity * 100
            if drawdown > self.config.max_drawdown_pct:
                return {
                    "approved": False,
                    "reason": f"Drawdown {drawdown:.1f}% exceeds max {self.config.max_drawdown_pct}%",
                }

        # Update peak equity
        if current_equity > self.peak_equity:
            self.peak_equity = current_equity

        # The counter tracks approvals, not confirmed executions.
        self.daily_trades += 1
        return {"approved": True}

    def _roll_daily_counter(self) -> None:
        """Reset the daily counter when the UTC calendar day has changed."""
        from datetime import datetime, timezone

        today = datetime.now(timezone.utc).date()
        if self._trade_day is not None and self._trade_day != today:
            self.reset_daily()
        self._trade_day = today

    def reset_daily(self) -> None:
        """Clear the daily approval counter."""
        self.daily_trades = 0
// src/agents/risk-agent.ts
import { BaseAgent } from './base-agent';
import { ZeroQuantClient } from '@zeroquant/sdk';
/** Limits applied by the Risk agent when it evaluates a trade. */
interface RiskConfig {
  /** Largest allowed trade size, in percent. */
  maxPositionSizePct: number;
  /** Largest tolerated drop from peak equity, in percent. */
  maxDrawdownPct: number;
  /** Approvals allowed before the daily counter is reset. */
  maxDailyTrades: number;
  /** Not read by the RiskAgent code in this file. */
  maxOpenPositions: number;
}
/**
 * Agent that approves or vetoes trades proposed by the Trader.
 *
 * Approvals go back to the Trader; vetoes go to the Orchestrator together
 * with the reason.
 */
export class RiskAgent extends BaseAgent {
  private client: ZeroQuantClient;
  private config: RiskConfig;
  private dailyTrades = 0;
  private peakEquity = 0n;
  /** UTC day (YYYY-MM-DD) the daily counter belongs to; null until first use. */
  private tradeDay: string | null = null;

  constructor(client: ZeroQuantClient, config: Partial<RiskConfig> = {}) {
    super('Risk');
    this.client = client;
    this.config = {
      maxPositionSizePct: 20,
      maxDrawdownPct: 10,
      maxDailyTrades: 5,
      maxOpenPositions: 3,
      ...config,
    };
  }

  /** Decide on every queued APPROVE_TRADE request, then clear the queue. */
  async process(): Promise<void> {
    for (const message of this.messageQueue) {
      if (message.type === 'REQUEST' && message.payload.action === 'APPROVE_TRADE') {
        const approval = await this.evaluateTrade(message.payload);
        if (approval.approved) {
          this.sendMessage('Trader', 'APPROVAL', {
            signalId: message.payload.signalId,
          });
        } else {
          this.sendMessage('Orchestrator', 'VETO', {
            signalId: message.payload.signalId,
            reason: approval.reason,
          });
        }
      }
    }
    this.messageQueue = [];
  }

  /** Check the proposed trade against every configured limit. */
  private async evaluateTrade(payload: any): Promise<{ approved: boolean; reason?: string }> {
    // Without this the counter is only cleared by an explicit resetDaily()
    // call, and approvals stop for good after the limit.
    this.rollDailyCounter();
    const signal = payload.signal;

    if (this.dailyTrades >= this.config.maxDailyTrades) {
      return { approved: false, reason: 'Daily trade limit reached' };
    }

    const size = signal?.size;
    if (typeof size !== 'number' || !(size > 0)) {
      return { approved: false, reason: `Invalid position size: ${size}` };
    }
    if (size > this.config.maxPositionSizePct) {
      return {
        approved: false,
        reason: `Position size ${size}% exceeds max ${this.config.maxPositionSizePct}%`,
      };
    }

    const currentEquity = await this.client.getBalance();
    if (this.peakEquity > 0n) {
      const drawdown = Number(this.peakEquity - currentEquity) / Number(this.peakEquity) * 100;
      if (drawdown > this.config.maxDrawdownPct) {
        return {
          approved: false,
          reason: `Drawdown ${drawdown.toFixed(1)}% exceeds max ${this.config.maxDrawdownPct}%`,
        };
      }
    }

    if (currentEquity > this.peakEquity) {
      this.peakEquity = currentEquity;
    }

    // The counter tracks approvals, not confirmed executions.
    this.dailyTrades++;
    return { approved: true };
  }

  /** Reset the daily counter when the UTC calendar day has changed. */
  private rollDailyCounter(): void {
    const today = new Date().toISOString().slice(0, 10);
    if (this.tradeDay !== null && this.tradeDay !== today) {
      this.resetDaily();
    }
    this.tradeDay = today;
  }

  /** Clear the daily approval counter. */
  resetDaily(): void {
    this.dailyTrades = 0;
  }
}
Orchestrator
- Python
- TypeScript
# src/agents/orchestrator.py
import asyncio
from .base_agent import BaseAgent, AgentMessage, MessageType
class Orchestrator(BaseAgent):
    """Coordinator that routes messages between agents and drives the loop.

    Registered agents send through the Orchestrator: messages addressed to
    "Orchestrator" land in its own queue, everything else is forwarded to
    the named agent.
    """

    # Upper bound on routing rounds per process() call, so agents that keep
    # answering each other cannot keep process() from returning.
    MAX_ROUNDS = 10

    def __init__(self):
        super().__init__("Orchestrator")
        self.agents: dict[str, BaseAgent] = {}
        self._running = False

    def register_agent(self, agent: BaseAgent) -> None:
        """Add *agent* and route everything it sends through this orchestrator."""
        self.agents[agent.name] = agent
        # Forward messages between agents
        agent.on_send = self._route

    def _route(self, message: AgentMessage) -> None:
        """Deliver *message* to its recipient, or report that it has none."""
        if message.to_agent == "Orchestrator":
            self.receive_message(message)
            return
        target = self.agents.get(message.to_agent)
        if target:
            target.receive_message(message)
        else:
            print(f"Warning: dropping message for unknown agent '{message.to_agent}'")

    async def process(self) -> None:
        """Run routing rounds until every queue is empty.

        A single round moves each message only one hop, so a signal needs
        several rounds to travel Analyst -> Orchestrator -> Trader -> Risk ->
        Trader. Repeating rounds here lets a trade complete in the cycle that
        produced its signal instead of one polling interval per hop.
        """
        for _ in range(self.MAX_ROUNDS):
            await self._run_round()
            if not self._has_pending_messages():
                return
        print(f"Warning: messages still pending after {self.MAX_ROUNDS} rounds")

    async def _run_round(self) -> None:
        """Handle the orchestrator's own queue, then let every agent process."""
        for message in self.message_queue:
            if message.type == MessageType.SIGNAL:
                await self._handle_signal(message.payload)
            elif message.type == MessageType.VETO:
                print(f"Trade vetoed: {message.payload['reason']}")
            elif message.type == MessageType.RESPONSE:
                if message.payload.get("action") == "TRADE_EXECUTED":
                    print(f"Trade executed: {message.payload['result']['tx_hash']}")
        self.message_queue = []

        # Process all agents
        for agent in self.agents.values():
            await agent.process()

    def _has_pending_messages(self) -> bool:
        """Return True while this orchestrator or any agent has queued messages."""
        if self.message_queue:
            return True
        return any(agent.message_queue for agent in self.agents.values())

    async def _handle_signal(self, signal: dict) -> None:
        """Print *signal* and, if it is strong enough, ask the Trader to prepare it."""
        import time

        print(f"\nSignal for {signal['symbol']}:")
        print(f"  Direction: {signal['direction']}")
        print(f"  Strength: {signal['strength']}%")
        print(f"  Reasoning: {signal['reasoning']}")

        if signal["direction"] != "NEUTRAL" and signal["strength"] >= 70:
            trader = self.agents.get("Trader")
            if trader:
                trader.receive_message(AgentMessage(
                    from_agent="Orchestrator",
                    to_agent="Trader",
                    type=MessageType.REQUEST,
                    payload={
                        # Signal fields first, so they can never replace
                        # "action" or "size".
                        **signal,
                        "action": "PREPARE_TRADE",
                        # strength 70-100 maps to 14-20 percent, capped at 20.
                        "size": min(signal["strength"] // 5, 20),
                    },
                    timestamp=time.time(),
                ))

    async def start_analysis(self, symbols: list[str], interval_ms: int) -> None:
        """Request analysis of *symbols* every *interval_ms* ms until stop()."""
        import time

        self._running = True
        while self._running:
            analyst = self.agents.get("Analyst")
            if analyst:
                for symbol in symbols:
                    analyst.receive_message(AgentMessage(
                        from_agent="Orchestrator",
                        to_agent="Analyst",
                        type=MessageType.REQUEST,
                        payload={"action": "ANALYZE", "symbol": symbol},
                        timestamp=time.time(),
                    ))
            await self.process()
            await asyncio.sleep(interval_ms / 1000)

    def stop(self) -> None:
        """Make start_analysis() exit once its current cycle and sleep finish."""
        self._running = False
// src/agents/orchestrator.ts
import { BaseAgent, AgentMessage } from './base-agent';
/**
 * Coordinator that routes messages between agents and drives the loop.
 *
 * Registered agents send through the Orchestrator: messages addressed to
 * 'Orchestrator' land in its own queue, everything else is forwarded to the
 * named agent.
 */
export class Orchestrator extends BaseAgent {
  private agents: Map<string, BaseAgent> = new Map();
  // ReturnType<typeof setInterval> matches whatever handle type the runtime's
  // setInterval returns, replacing the deprecated NodeJS.Timer.
  private analysisInterval: ReturnType<typeof setInterval> | null = null;
  /** True while a tick is running, so a slow tick is never overlapped. */
  private tickInProgress = false;

  constructor() {
    super('Orchestrator');
  }

  /** Add `agent` and route everything it sends through this orchestrator. */
  registerAgent(agent: BaseAgent): void {
    this.agents.set(agent.name, agent);
    agent.on('send', (message: AgentMessage) => {
      if (message.to === 'Orchestrator') {
        this.receiveMessage(message);
        return;
      }
      const target = this.agents.get(message.to);
      if (target) {
        target.receiveMessage(message);
      } else {
        console.warn(`Dropping message for unknown agent '${message.to}'`);
      }
    });
  }

  /** Handle the orchestrator's own queue, then let every agent process. */
  async process(): Promise<void> {
    for (const message of this.messageQueue) {
      if (message.type === 'SIGNAL') {
        await this.handleSignal(message.payload);
      } else if (message.type === 'VETO') {
        console.log(`Trade vetoed: ${message.payload.reason}`);
      } else if (message.type === 'RESPONSE' && message.payload.action === 'TRADE_EXECUTED') {
        console.log(`Trade executed: ${message.payload.result.txHash}`);
      }
    }
    this.messageQueue = [];

    for (const agent of this.agents.values()) {
      await agent.process();
    }
  }

  /** Print `signal` and, if it is strong enough, ask the Trader to prepare it. */
  private async handleSignal(signal: any): Promise<void> {
    console.log(`\nSignal for ${signal.symbol}:`);
    console.log(`  Direction: ${signal.direction}`);
    console.log(`  Strength: ${signal.strength}%`);
    console.log(`  Reasoning: ${signal.reasoning}`);

    if (signal.direction !== 'NEUTRAL' && signal.strength >= 70) {
      const trader = this.agents.get('Trader');
      if (trader) {
        trader.receiveMessage({
          from: 'Orchestrator',
          to: 'Trader',
          type: 'REQUEST',
          payload: {
            // Signal fields first, so they can never replace action or size.
            ...signal,
            action: 'PREPARE_TRADE',
            // strength 70-100 maps to 14-20 percent, capped at 20.
            size: Math.min(Math.floor(signal.strength / 5), 20),
          },
          timestamp: Date.now(),
        });
      }
    }
  }

  /** Request analysis of `symbols` every `intervalMs` ms until stop(). */
  startAnalysis(symbols: string[], intervalMs: number): void {
    // Clear any earlier timer so a second call does not leave two loops running.
    this.stop();
    this.analysisInterval = setInterval(() => {
      void this.runTick(symbols);
    }, intervalMs);
  }

  /** One polling cycle: queue ANALYZE requests, then process all agents. */
  private async runTick(symbols: string[]): Promise<void> {
    // setInterval does not wait for an async callback; skip this tick if the
    // previous one is still running instead of processing the queues twice.
    if (this.tickInProgress) return;
    this.tickInProgress = true;
    try {
      const analyst = this.agents.get('Analyst');
      if (!analyst) return;

      for (const symbol of symbols) {
        analyst.receiveMessage({
          from: 'Orchestrator',
          to: 'Analyst',
          type: 'REQUEST',
          payload: { action: 'ANALYZE', symbol },
          timestamp: Date.now(),
        });
      }
      await this.process();
    } catch (error) {
      // Nothing awaits this promise, so an escaping error would become an
      // unhandled rejection. Log it and stop polling instead.
      console.error('Analysis cycle failed, stopping:', error);
      this.stop();
    } finally {
      this.tickInProgress = false;
    }
  }

  /** Stop the polling timer; safe to call when no timer is running. */
  stop(): void {
    if (this.analysisInterval) {
      clearInterval(this.analysisInterval);
      this.analysisInterval = null;
    }
  }
}
Running the System
- Python
- TypeScript
# src/main.py
import os
import asyncio
from dotenv import load_dotenv
from web3 import Web3
from zeroquant import ZeroQuantClient
from agents.orchestrator import Orchestrator
from agents.analyst_agent import AnalystAgent
from agents.trader_agent import TraderAgent
from agents.risk_agent import RiskAgent, RiskConfig
load_dotenv()


def _require_env(name: str) -> str:
    """Return environment variable *name*, raising if it is unset or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


async def main():
    """Connect the client, wire up the agents and run the analysis loop."""
    # Read all settings up front: a missing variable fails here with a clear
    # message instead of reaching the client as None.
    rpc_url = _require_env("RPC_URL")
    private_key = _require_env("PRIVATE_KEY")
    factory_address = _require_env("FACTORY_ADDRESS")
    permission_manager_address = _require_env("PERMISSION_MANAGER_ADDRESS")
    vault_address = _require_env("VAULT_ADDRESS")

    w3 = Web3(Web3.HTTPProvider(rpc_url))
    client = ZeroQuantClient(
        web3=w3,
        private_key=private_key,
        factory_address=factory_address,
        permission_manager_address=permission_manager_address,
    )
    await client.connect()
    await client.connect_vault(vault_address)

    # Create agents
    orchestrator = Orchestrator()
    analyst = AnalystAgent(["ETH", "BTC", "LINK"])
    trader = TraderAgent(client)
    risk = RiskAgent(client, RiskConfig(
        max_position_size_pct=20,
        max_drawdown_pct=10,
        max_daily_trades=5,
    ))

    # Register agents
    orchestrator.register_agent(analyst)
    orchestrator.register_agent(trader)
    orchestrator.register_agent(risk)

    print("Multi-Agent Trading System Started")
    await orchestrator.start_analysis(["ETH", "BTC"], 60000)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the loop; exit without a traceback.
        print("Multi-Agent Trading System Stopped")
// src/index.ts
import { ethers } from 'ethers';
import { ZeroQuantClient } from '@zeroquant/sdk';
import { Orchestrator } from './agents/orchestrator';
import { AnalystAgent } from './agents/analyst-agent';
import { TraderAgent } from './agents/trader-agent';
import { RiskAgent } from './agents/risk-agent';
import 'dotenv/config';
/** Connect the client, wire up the agents and start the analysis loop. */
async function main() {
  // Read all settings up front: a missing variable fails here with a clear
  // message instead of reaching the client as undefined.
  const requireEnv = (name: string): string => {
    const value = process.env[name];
    if (!value) {
      throw new Error(`Missing required environment variable: ${name}`);
    }
    return value;
  };
  const rpcUrl = requireEnv('RPC_URL');
  const privateKey = requireEnv('PRIVATE_KEY');
  const factoryAddress = requireEnv('FACTORY_ADDRESS');
  const permissionManagerAddress = requireEnv('PERMISSION_MANAGER_ADDRESS');
  const vaultAddress = requireEnv('VAULT_ADDRESS');

  const provider = new ethers.JsonRpcProvider(rpcUrl);
  const signer = new ethers.Wallet(privateKey, provider);
  const client = new ZeroQuantClient(provider, {
    factoryAddress,
    permissionManagerAddress,
  });
  await client.connect(signer);
  await client.connectVault(vaultAddress);

  // Create agents
  const orchestrator = new Orchestrator();
  const analyst = new AnalystAgent(['ETH', 'BTC', 'LINK']);
  const trader = new TraderAgent(client);
  const risk = new RiskAgent(client, {
    maxPositionSizePct: 20,
    maxDrawdownPct: 10,
    maxDailyTrades: 5,
  });

  // Register agents
  orchestrator.registerAgent(analyst);
  orchestrator.registerAgent(trader);
  orchestrator.registerAgent(risk);

  console.log('Multi-Agent Trading System Started');
  orchestrator.startAnalysis(['ETH', 'BTC'], 60000);

  process.on('SIGINT', () => {
    orchestrator.stop();
    process.exit(0);
  });
}

main().catch((error) => {
  console.error(error);
  // Report the startup failure through the exit status as well as the log.
  process.exitCode = 1;
});
Communication Patterns
Request-Response
Orchestrator → Analyst: REQUEST { action: 'ANALYZE', symbol: 'ETH' }
Analyst → Orchestrator: SIGNAL { direction: 'LONG', strength: 85 }
Approval Chain
Trader → Risk: REQUEST { action: 'APPROVE_TRADE', signalId: '...', signal: ... }
Risk → Trader: APPROVAL { signalId: '...' }
-or-
Risk → Orchestrator: VETO { signalId: '...', reason: '...' }
What's Next?
- Local LLM Setup - Run agents privately with Ollama
- AI Trading System - Single-agent production system