Skip to main content

Multi-Agent Coordination

Build sophisticated trading systems where multiple specialized agents collaborate, each focusing on what they do best.

Architecture

┌───────────────────────────────────────────────────────────────┐
│                      Orchestrator Agent                       │
│                   (Coordinates all agents)                    │
└───────────────────────────────┬───────────────────────────────┘

        ┌───────────────────────┼───────────────────────┐
        │                       │                       │
        ▼                       ▼                       ▼
┌───────────────┐       ┌───────────────┐       ┌───────────────┐
│    Analyst    │       │    Trader     │       │     Risk      │
│     Agent     │       │     Agent     │       │     Agent     │
├───────────────┤       ├───────────────┤       ├───────────────┤
│ • Market data │       │ • Execution   │       │ • Position    │
│ • Technicals  │       │ • Timing      │       │   limits      │
│ • Sentiment   │       │ • Slippage    │       │ • Stop-loss   │
│ • Signals     │       │ • Gas optim   │       │ • Drawdown    │
└───────────────┘       └───────────────┘       └───────────────┘


                       ┌─────────────────┐
                       │ ZeroQuant Vault │
                       └─────────────────┘

Agent Roles

| Agent | Role | Responsibility |
|-------|------|----------------|
| Analyst | Market analysis | Produces signals, never executes |
| Trader | Execution | Optimizes entry/exit timing |
| Risk | Portfolio guard | Vetoes trades that violate limits |
| Orchestrator | Coordinator | Makes final decisions |

Implementation

Base Agent Class

# src/agents/base_agent.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Callable
from enum import Enum


class MessageType(Enum):
    """Categories of messages that agents exchange with one another."""

    # An analysis result; the Analyst sends these to the Orchestrator.
    SIGNAL = "SIGNAL"
    # Asks the receiving agent to perform the action named in the payload.
    REQUEST = "REQUEST"
    # Reports the outcome of an action (e.g. an executed trade).
    RESPONSE = "RESPONSE"
    # A proposed trade was rejected.
    VETO = "VETO"
    # A proposed trade may go ahead.
    APPROVAL = "APPROVAL"


@dataclass
class AgentMessage:
    """Envelope for one message passed from one agent to another."""

    # Name of the sending agent.
    from_agent: str
    # Name of the agent the message is addressed to.
    to_agent: str
    # Category of the message; determines how the receiver handles it.
    type: MessageType
    # Message body; its shape depends on the message type.
    payload: Any
    # Creation time as a ``time.time()`` value.
    timestamp: float


class BaseAgent(ABC):
    """Common base for every agent: a name, an inbox and an outbound hook.

    Subclasses implement :meth:`process` to consume ``message_queue``.
    """

    def __init__(self, name: str):
        self.name = name
        # Inbox; ``receive_message`` appends to it.
        self.message_queue: list[AgentMessage] = []
        # Transport hook. While it is unset, outgoing messages are discarded.
        self.on_send: Callable[[AgentMessage], None] | None = None

    def receive_message(self, message: AgentMessage) -> None:
        """Append *message* to this agent's inbox."""
        self.message_queue.append(message)

    def send_message(self, to: str, type: MessageType, payload: Any) -> None:
        """Wrap *payload* in an ``AgentMessage`` and pass it to ``on_send``.

        Args:
            to: Name of the receiving agent.
            type: Category of the message.
            payload: Message body.

        The message is stamped with this agent's name and the current time.
        Nothing is delivered when no ``on_send`` hook has been installed.
        """
        import time

        outgoing = AgentMessage(self.name, to, type, payload, time.time())
        if not self.on_send:
            return
        self.on_send(outgoing)

    @abstractmethod
    async def process(self) -> None:
        """Handle the messages currently waiting in ``message_queue``."""

Analyst Agent

# src/agents/analyst_agent.py
import json
from dataclasses import dataclass
from langchain_openai import ChatOpenAI

from .base_agent import BaseAgent, MessageType


@dataclass
class AnalystSignal:
    """Result of analysing a single symbol."""

    # Symbol the analysis refers to.
    symbol: str
    # One of "LONG", "SHORT" or "NEUTRAL".
    direction: str
    # Conviction of the signal; the analysis prompt asks for 0-100.
    strength: int
    # Free-text explanation for the signal.
    reasoning: str


class AnalystAgent(BaseAgent):
    """Agent that answers ``ANALYZE`` requests with LLM-generated signals.

    It only produces signals (sent to the Orchestrator); it never executes
    trades itself.
    """

    # The only directions the rest of the system knows how to handle.
    _VALID_DIRECTIONS = ("LONG", "SHORT", "NEUTRAL")

    def __init__(self, watchlist: list[str]):
        """Create the agent.

        Args:
            watchlist: Symbols this analyst is interested in.
        """
        super().__init__("Analyst")
        self.watchlist = watchlist
        self.llm = ChatOpenAI(model="gpt-4", temperature=0)

    async def process(self) -> None:
        """Answer every queued ``ANALYZE`` request with a ``SIGNAL`` message."""
        for message in self.message_queue:
            if message.type != MessageType.REQUEST:
                continue
            if message.payload.get("action") != "ANALYZE":
                continue
            signal = await self.analyze_symbol(message.payload["symbol"])
            # Send a copy so receivers cannot mutate the signal object.
            self.send_message("Orchestrator", MessageType.SIGNAL, dict(vars(signal)))
        self.message_queue = []

    async def analyze_symbol(self, symbol: str) -> AnalystSignal:
        """Ask the LLM for a view on *symbol* and return a validated signal.

        Args:
            symbol: Symbol to analyse.

        Returns:
            The parsed signal, or a NEUTRAL signal with strength 0 when the
            reply is not valid JSON or fails validation.
        """
        # In production, fetch real data
        prompt = (
            "\n"
            f"Analyze {symbol} market conditions.\n"
            "Consider technicals, sentiment, and on-chain metrics.\n"
            "\n"
            "Respond in JSON:\n"
            '{"direction": "LONG" | "SHORT" | "NEUTRAL", "strength": 0-100, "reasoning": "explanation"}\n'
        )
        response = await self.llm.ainvoke(prompt)

        try:
            return self._parse_signal(symbol, response.content)
        except (KeyError, TypeError, ValueError, OverflowError) as e:
            # ValueError also covers json.JSONDecodeError. TypeError covers a
            # non-string reply or a JSON value that is not an object.
            print(f"Analysis failed for {symbol}: {e}")
            return AnalystSignal(
                symbol=symbol,
                direction="NEUTRAL",
                strength=0,
                reasoning="Analysis failed",
            )

    def _parse_signal(self, symbol: str, raw: object) -> AnalystSignal:
        """Turn the raw LLM reply into a signal, rejecting malformed values.

        Raises:
            KeyError: A required field is missing.
            TypeError: The reply is not text, or the JSON is not an object.
            ValueError: Invalid JSON, unknown direction or non-numeric strength.
            OverflowError: Strength is infinite.
        """
        data = json.loads(raw)

        direction = str(data["direction"]).strip().upper()
        if direction not in self._VALID_DIRECTIONS:
            raise ValueError(f"unexpected direction {data['direction']!r}")

        raw_strength = data["strength"]
        # bool is a subclass of int; "true" is not a meaningful strength.
        if isinstance(raw_strength, bool):
            raise ValueError(f"unexpected strength {raw_strength!r}")
        # Downstream code does integer arithmetic on the strength, so coerce
        # numeric strings/floats and keep the value inside the 0-100 range.
        strength = max(0, min(100, int(float(raw_strength))))

        return AnalystSignal(
            symbol=symbol,
            direction=direction,
            strength=strength,
            reasoning=str(data["reasoning"]),
        )

Trader Agent

# src/agents/trader_agent.py
from dataclasses import dataclass
from zeroquant import ZeroQuantClient

from .base_agent import BaseAgent, MessageType


@dataclass
class TradeResult:
    """Summary of a trade that has been executed."""

    # Symbol that was traded.
    symbol: str
    # Direction taken from the originating signal.
    direction: str
    # Amount that was passed to the swap as its input amount.
    size: int
    # Hash taken from the transaction receipt.
    tx_hash: str


class TraderAgent(BaseAgent):
    """Agent that executes trades, but only after the Risk agent approves them.

    Flow: a ``PREPARE_TRADE`` request is stored and forwarded to the Risk agent
    for approval; a later ``APPROVAL`` message triggers the actual swap.
    """

    def __init__(self, client: ZeroQuantClient):
        """Create the agent.

        Args:
            client: Client used to read the balance and submit swaps.
        """
        super().__init__("Trader")
        self.client = client
        # Signals waiting for a risk decision, keyed by signal id.
        # NOTE: vetoes are sent to the Orchestrator rather than to this agent,
        # so entries for vetoed signals are never removed here.
        self.pending_signals: dict[str, dict] = {}

    async def process(self) -> None:
        """Handle queued approvals and trade-preparation requests."""
        for message in self.message_queue:
            if message.type == MessageType.APPROVAL:
                # Risk agent approved, execute
                await self._handle_approval(message.payload["signal_id"])
            elif message.type == MessageType.REQUEST:
                if message.payload.get("action") == "PREPARE_TRADE":
                    self._request_approval(message)

        self.message_queue = []

    def _request_approval(self, message) -> None:
        """Store the proposed trade and ask the Risk agent to approve it."""
        signal_id = f"{message.payload['symbol']}-{message.timestamp}"
        self.pending_signals[signal_id] = message.payload
        self.send_message("Risk", MessageType.REQUEST, {
            "action": "APPROVE_TRADE",
            "signal_id": signal_id,
            "signal": message.payload,
        })

    async def _handle_approval(self, signal_id: str) -> None:
        """Execute the approved trade and report the outcome to the Orchestrator."""
        # Remove the entry before executing so that a failed execution cannot
        # leave a stale signal behind that a repeated approval would re-run.
        signal = self.pending_signals.pop(signal_id, None)
        if not signal:
            return

        try:
            result = await self._execute_trade(signal)
        except ValueError as e:
            # An untradeable signal must not take the whole agent loop down.
            print(f"Trade skipped for {signal.get('symbol')}: {e}")
            self.send_message("Orchestrator", MessageType.RESPONSE, {
                "action": "TRADE_FAILED",
                "signal_id": signal_id,
                "reason": str(e),
            })
            return

        self.send_message("Orchestrator", MessageType.RESPONSE, {
            "action": "TRADE_EXECUTED",
            "result": result.__dict__,
        })

    async def _execute_trade(self, signal: dict) -> TradeResult:
        """Swap between WETH and the signal's token and wait for the receipt.

        Args:
            signal: Trade description with ``symbol``, ``direction``
                ("LONG" or "SHORT") and ``size`` (percent of the balance).

        Returns:
            The executed trade, including the transaction hash.

        Raises:
            ValueError: Unknown symbol, unsupported direction, or a trade
                size that rounds down to zero.
        """
        # WETH token address; one side of every swap path.
        WETH = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"

        direction = signal["direction"]
        if direction not in ("LONG", "SHORT"):
            # Previously anything other than "LONG" was treated as a sell.
            raise ValueError(f"unsupported direction {direction!r}")

        token = self._get_token_address(signal["symbol"])
        if not token:
            # An empty address must never be placed in a swap path.
            raise ValueError(f"no token address configured for {signal['symbol']!r}")

        balance = await self.client.get_balance()
        # NOTE(review): true division goes through float; if the balance is a
        # large integer (e.g. wei) this may lose precision - confirm the type.
        trade_size = int(balance * signal["size"] / 100)
        if trade_size <= 0:
            raise ValueError("trade size is zero")

        # LONG buys the token with WETH; SHORT sells the token for WETH.
        # NOTE(review): amount_in is derived from the same balance for both
        # directions - confirm it is denominated in the swap's input token.
        path = [WETH, token] if direction == "LONG" else [token, WETH]

        tx = await self.client.execute_swap(
            amount_in=trade_size,
            path=path,
            slippage_bps=100,  # 100 basis points = 1%
        )
        receipt = await tx.wait()

        return TradeResult(
            symbol=signal["symbol"],
            direction=direction,
            size=trade_size,
            tx_hash=receipt.hash,
        )

    def _get_token_address(self, symbol: str) -> str:
        """Return the token address for *symbol*, or ``""`` when it is unknown."""
        addresses = {"USDC": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"}
        return addresses.get(symbol, "")

Risk Agent

# src/agents/risk_agent.py
from dataclasses import dataclass
from zeroquant import ZeroQuantClient

from .base_agent import BaseAgent, MessageType


@dataclass
class RiskConfig:
    """Limits the Risk agent applies when evaluating a trade."""

    # Largest allowed trade size, as a percentage.
    max_position_size_pct: float = 20
    # Largest tolerated drop from peak equity, as a percentage.
    max_drawdown_pct: float = 10
    # Number of trades that may be approved before the daily counter is reset.
    max_daily_trades: int = 5
    # Maximum number of simultaneously open positions.
    max_open_positions: int = 3


class RiskAgent(BaseAgent):
    """Gatekeeper that approves or vetoes proposed trades against a ``RiskConfig``."""

    def __init__(self, client: ZeroQuantClient, config: RiskConfig | None = None):
        """Create the agent.

        Args:
            client: Client used to read the current balance.
            config: Risk limits; a default ``RiskConfig`` is used when omitted.
        """
        super().__init__("Risk")
        self.client = client
        self.config = config or RiskConfig()
        # Trades approved since the last ``reset_daily`` call.
        self.daily_trades = 0
        # Highest balance seen so far; 0 until the first trade passes the limit checks.
        self.peak_equity = 0

    async def process(self) -> None:
        """Evaluate every queued ``APPROVE_TRADE`` request.

        Approvals are sent to the Trader; vetoes (with a reason) are sent to
        the Orchestrator.
        """
        for msg in self.message_queue:
            is_approval_request = (
                msg.type == MessageType.REQUEST
                and msg.payload.get("action") == "APPROVE_TRADE"
            )
            if not is_approval_request:
                continue

            verdict = await self._evaluate_trade(msg.payload)

            if not verdict["approved"]:
                self.send_message("Orchestrator", MessageType.VETO, {
                    "signal_id": msg.payload["signal_id"],
                    "reason": verdict["reason"],
                })
                continue

            self.send_message("Trader", MessageType.APPROVAL, {
                "signal_id": msg.payload["signal_id"],
            })

        self.message_queue = []

    async def _evaluate_trade(self, payload: dict) -> dict:
        """Check a proposed trade against the configured limits.

        Args:
            payload: Request payload; its ``signal`` entry must carry ``size``.

        Returns:
            ``{"approved": True}`` or ``{"approved": False, "reason": ...}``.
        """

        def veto(reason: str) -> dict:
            return {"approved": False, "reason": reason}

        signal = payload["signal"]
        limits = self.config

        # Daily trade limit.
        if self.daily_trades >= limits.max_daily_trades:
            return veto("Daily trade limit reached")

        # Position size.
        if signal["size"] > limits.max_position_size_pct:
            return veto(
                f"Position size {signal['size']}% exceeds max {limits.max_position_size_pct}%"
            )

        # Drawdown relative to the highest balance seen so far.
        current_equity = await self.client.get_balance()
        peak = self.peak_equity
        if peak > 0:
            drawdown = (peak - current_equity) / peak * 100
            if drawdown > limits.max_drawdown_pct:
                return veto(
                    f"Drawdown {drawdown:.1f}% exceeds max {limits.max_drawdown_pct}%"
                )

        # Record a new peak, then count the approved trade.
        self.peak_equity = max(self.peak_equity, current_equity)
        self.daily_trades += 1
        return {"approved": True}

    def reset_daily(self) -> None:
        """Reset the approved-trades counter to zero."""
        self.daily_trades = 0

Orchestrator

# src/agents/orchestrator.py
import asyncio
from .base_agent import BaseAgent, AgentMessage, MessageType


class Orchestrator(BaseAgent):
    """Central coordinator.

    Routes messages between registered agents, turns strong analyst signals
    into trade requests, and drives the periodic analysis loop.
    """

    # Upper bound on routing passes per ``process()`` call, so agents that keep
    # messaging each other cannot stall the analysis loop.
    MAX_ROUNDS = 10

    def __init__(self):
        super().__init__("Orchestrator")
        # Registered agents, keyed by agent name.
        self.agents: dict[str, BaseAgent] = {}
        self._running = False

    def register_agent(self, agent: BaseAgent) -> None:
        """Register *agent* and install a hook that routes its outgoing messages."""
        self.agents[agent.name] = agent

        # Forward messages between agents
        def route(message: AgentMessage) -> None:
            if message.to_agent == "Orchestrator":
                self.receive_message(message)
                return

            target = self.agents.get(message.to_agent)
            if target is None:
                # Make routing mistakes visible instead of dropping silently.
                print(
                    f"Dropping message from {message.from_agent} "
                    f"to unknown agent {message.to_agent!r}"
                )
                return
            target.receive_message(message)

        agent.on_send = route

    async def process(self) -> None:
        """Run routing passes until no agent has messages left to handle.

        One trade needs several hops (Analyst -> Orchestrator -> Trader ->
        Risk -> Trader -> Orchestrator). With a single pass per call each hop
        had to wait for the next polling interval, so a trade could execute
        several intervals after the signal that triggered it. Passes are
        capped at ``MAX_ROUNDS``; anything still queued is picked up by the
        next call.
        """
        for _ in range(self.MAX_ROUNDS):
            await self._process_round()
            if not self._has_pending_messages():
                break

    def _has_pending_messages(self) -> bool:
        """Return True when this orchestrator or any agent has queued messages."""
        if self.message_queue:
            return True
        return any(agent.message_queue for agent in self.agents.values())

    async def _process_round(self) -> None:
        """Handle the orchestrator's own inbox, then let every agent process once."""
        for message in self.message_queue:
            if message.type == MessageType.SIGNAL:
                await self._handle_signal(message.payload)

            elif message.type == MessageType.VETO:
                print(f"Trade vetoed: {message.payload['reason']}")

            elif message.type == MessageType.RESPONSE:
                if message.payload.get("action") == "TRADE_EXECUTED":
                    print(f"Trade executed: {message.payload['result']['tx_hash']}")

        self.message_queue = []

        # Process all agents
        for agent in self.agents.values():
            await agent.process()

    async def _handle_signal(self, signal: dict) -> None:
        """Log *signal* and, when it is strong enough, ask the Trader to prepare a trade.

        Only non-NEUTRAL signals with strength >= 70 are forwarded.
        """
        import time

        print(f"\nSignal for {signal['symbol']}:")
        print(f" Direction: {signal['direction']}")
        print(f" Strength: {signal['strength']}%")
        print(f" Reasoning: {signal['reasoning']}")

        if signal["direction"] == "NEUTRAL" or signal["strength"] < 70:
            return

        trader = self.agents.get("Trader")
        if not trader:
            return

        trader.receive_message(AgentMessage(
            from_agent="Orchestrator",
            to_agent="Trader",
            type=MessageType.REQUEST,
            payload={
                "action": "PREPARE_TRADE",
                **signal,
                # Size scales with strength (strength // 5) and is capped at 20.
                "size": min(signal["strength"] // 5, 20),
            },
            timestamp=time.time(),
        ))

    async def start_analysis(self, symbols: list[str], interval_ms: int) -> None:
        """Request analysis of *symbols* every *interval_ms* milliseconds until ``stop()``.

        Args:
            symbols: Symbols to send to the Analyst on every cycle.
            interval_ms: Pause between cycles, in milliseconds.
        """
        import time

        self._running = True

        while self._running:
            analyst = self.agents.get("Analyst")
            if analyst:
                for symbol in symbols:
                    analyst.receive_message(AgentMessage(
                        from_agent="Orchestrator",
                        to_agent="Analyst",
                        type=MessageType.REQUEST,
                        payload={"action": "ANALYZE", "symbol": symbol},
                        timestamp=time.time(),
                    ))

            await self.process()
            await asyncio.sleep(interval_ms / 1000)

    def stop(self) -> None:
        """Ask the analysis loop to exit after its current cycle."""
        self._running = False

Running the System

# src/main.py
import os
import asyncio
from dotenv import load_dotenv
from web3 import Web3

from zeroquant import ZeroQuantClient
from agents.orchestrator import Orchestrator
from agents.analyst_agent import AnalystAgent
from agents.trader_agent import TraderAgent
from agents.risk_agent import RiskAgent, RiskConfig

load_dotenv()


# Environment variables that must be set before the system can start.
_REQUIRED_ENV = (
    "RPC_URL",
    "PRIVATE_KEY",
    "FACTORY_ADDRESS",
    "PERMISSION_MANAGER_ADDRESS",
    "VAULT_ADDRESS",
)


def _load_settings() -> dict:
    """Read the required environment variables.

    Returns:
        Mapping of variable name to its value.

    Raises:
        RuntimeError: One or more variables are unset or empty; the message
            lists all of them.
    """
    settings = {name: os.getenv(name) for name in _REQUIRED_ENV}
    missing = [name for name, value in settings.items() if not value]
    if missing:
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing)
        )
    return settings


async def main():
    """Wire up the client and agents, then run the analysis loop.

    Raises:
        RuntimeError: A required environment variable is missing.
    """
    # Fail early with a clear message instead of passing None to the client.
    env = _load_settings()

    w3 = Web3(Web3.HTTPProvider(env["RPC_URL"]))

    client = ZeroQuantClient(
        web3=w3,
        private_key=env["PRIVATE_KEY"],
        factory_address=env["FACTORY_ADDRESS"],
        permission_manager_address=env["PERMISSION_MANAGER_ADDRESS"],
    )
    await client.connect()
    await client.connect_vault(env["VAULT_ADDRESS"])

    # Create agents
    orchestrator = Orchestrator()
    analyst = AnalystAgent(["ETH", "BTC", "LINK"])
    trader = TraderAgent(client)
    risk = RiskAgent(client, RiskConfig(
        max_position_size_pct=20,
        max_drawdown_pct=10,
        max_daily_trades=5,
    ))

    # Register agents
    orchestrator.register_agent(analyst)
    orchestrator.register_agent(trader)
    orchestrator.register_agent(risk)

    print("Multi-Agent Trading System Started")
    # Analyse ETH and BTC once every 60 seconds.
    await orchestrator.start_analysis(["ETH", "BTC"], 60000)


if __name__ == "__main__":
    asyncio.run(main())

Communication Patterns

Request-Response

Orchestrator → Analyst: REQUEST { action: 'ANALYZE', symbol: 'ETH' }
Analyst → Orchestrator: SIGNAL { direction: 'LONG', strength: 85 }

Approval Chain

Trader → Risk: REQUEST { action: 'APPROVE_TRADE', signal_id: '...', signal: ... }
Risk → Trader: APPROVAL { signal_id: '...' }
-or-
Risk → Orchestrator: VETO { signal_id: '...', reason: '...' }

What's Next?