Stateful Agents
Learn how to add memory and persistent state to your AI agents, enabling them to learn from past interactions and make smarter decisions over time.
Stateless vs Stateful
Stateless Agent
Each interaction is independent. No memory of previous conversations or decisions.
- Python
- TypeScript
# Every call is fresh - no context
result1 = await agent.ainvoke({"input": "Buy ETH"})
result2 = await agent.ainvoke({"input": "What did I just buy?"})
# Agent doesn't know about the previous buy
// Every call is fresh - no context
const result1 = await agent.invoke({ input: 'Buy ETH' });
const result2 = await agent.invoke({ input: 'What did I just buy?' });
// Agent doesn't know about the previous buy
Stateful Agent
Maintains context across interactions. Remembers previous decisions and their outcomes.
- Python
- TypeScript
# Agent remembers context
result1 = await agent.ainvoke({"input": "Buy ETH"})
result2 = await agent.ainvoke({"input": "What did I just buy?"})
# Agent knows: "You bought ETH in the previous transaction"
// Agent remembers context
const result1 = await agent.invoke({ input: 'Buy ETH' });
const result2 = await agent.invoke({ input: 'What did I just buy?' });
// Agent knows: "You bought ETH in the previous transaction"
Memory Types
1. Conversation Memory
Remember recent messages in the conversation:
- Python
- TypeScript
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
llm = ChatOpenAI(model="gpt-4", temperature=0)
memory = ConversationBufferMemory(
return_messages=True,
memory_key="history",
)
prompt = ChatPromptTemplate.from_messages([
("system", "You are a DeFi assistant."),
MessagesPlaceholder(variable_name="history"),
("human", "{input}"),
])
chain = ConversationChain(
llm=llm,
memory=memory,
prompt=prompt,
)
# Conversations now have context
await chain.acall({"input": "My vault address is 0x123..."})
await chain.acall({"input": "What is my vault address?"})
# Agent remembers: 0x123...
import { BufferMemory } from 'langchain/memory';
import { ConversationChain } from 'langchain/chains';
const memory = new BufferMemory({
returnMessages: true,
memoryKey: 'history',
});
const chain = new ConversationChain({
llm,
memory,
prompt: ChatPromptTemplate.fromMessages([
['system', 'You are a DeFi assistant.'],
['placeholder', '{history}'],
['human', '{input}'],
]),
});
// Conversations now have context
await chain.call({ input: 'My vault address is 0x123...' });
await chain.call({ input: 'What is my vault address?' });
// Agent remembers: 0x123...
2. Summary Memory
Compress long conversations into summaries:
- Python
- TypeScript
from langchain.memory import ConversationSummaryMemory
memory = ConversationSummaryMemory(
llm=llm,
memory_key="summary",
)
# Long conversations get summarized
# "User created vault, executed 3 swaps, current balance is 5 ETH"
import { ConversationSummaryMemory } from 'langchain/memory';
const memory = new ConversationSummaryMemory({
llm,
memoryKey: 'summary',
});
// Long conversations get summarized
// "User created vault, executed 3 swaps, current balance is 5 ETH"
3. Vault Memory (ZeroQuant)
Track vault-specific state:
- Python
- TypeScript
from zeroquant.langchain.memory import VaultMemory
from datetime import datetime
memory = VaultMemory()
# Track vault state
memory.add_vault("0x123...", {
"created_at": datetime.now().timestamp(),
"owner": "0xabc...",
})
# Track operations
memory.add_operation("0x123...", {
"type": "SWAP",
"token_in": "WETH",
"token_out": "USDC",
"amount_in": "1.0",
"timestamp": datetime.now().timestamp(),
"tx_hash": "0x...",
})
# Query history
history = memory.get_operations("0x123...", limit=10)
import { VaultMemory } from '@zeroquant/langchain';
const memory = new VaultMemory();
// Track vault state
memory.addVault('0x123...', {
createdAt: Date.now(),
owner: '0xabc...',
});
// Track operations
memory.addOperation('0x123...', {
type: 'SWAP',
tokenIn: 'WETH',
tokenOut: 'USDC',
amountIn: '1.0',
timestamp: Date.now(),
txHash: '0x...',
});
// Query history
const history = memory.getOperations('0x123...', { limit: 10 });
Building a Stateful Trading Agent
Step 1: Define State Schema
- Python
- TypeScript
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum
class TradeAction(Enum):
BUY = "BUY"
SELL = "SELL"
@dataclass
class Trade:
timestamp: float
action: TradeAction
token: str
amount: str
price: float
pnl: Optional[float] = None
@dataclass
class Position:
amount: int
entry_price: float
current_price: float
unrealized_pnl: float
@dataclass
class AgentState:
# Vault info
vault_address: str
vault_balance: int = 0
# Trading history
trades: list[Trade] = field(default_factory=list)
# Performance metrics
total_trades: int = 0
win_rate: float = 0.0
total_pnl: float = 0.0
# Current positions
positions: dict[str, Position] = field(default_factory=dict)
# Learning
successful_patterns: list[str] = field(default_factory=list)
failed_patterns: list[str] = field(default_factory=list)
interface AgentState {
// Vault info
vaultAddress: string;
vaultBalance: bigint;
// Trading history
trades: Array<{
timestamp: number;
action: 'BUY' | 'SELL';
token: string;
amount: string;
price: number;
pnl?: number;
}>;
// Performance metrics
totalTrades: number;
winRate: number;
totalPnL: number;
// Current positions
positions: Map<string, {
amount: bigint;
entryPrice: number;
currentPrice: number;
unrealizedPnL: number;
}>;
// Learning
successfulPatterns: string[];
failedPatterns: string[];
}
Step 2: Implement State Manager
- Python
- TypeScript
import os
import json
import redis.asyncio as redis
class AgentStateManager:
def __init__(self, vault_address: str):
self.redis = redis.from_url(os.getenv("REDIS_URL"))
self.state = self._initialize_state(vault_address)
def _initialize_state(self, vault_address: str) -> AgentState:
return AgentState(vault_address=vault_address)
async def load(self) -> None:
data = await self.redis.get(f"agent:{self.state.vault_address}")
if data:
parsed = json.loads(data)
self.state = AgentState(
vault_address=parsed["vault_address"],
vault_balance=int(parsed["vault_balance"]),
trades=[Trade(**t) for t in parsed["trades"]],
total_trades=parsed["total_trades"],
win_rate=parsed["win_rate"],
total_pnl=parsed["total_pnl"],
positions={k: Position(**v) for k, v in parsed["positions"].items()},
successful_patterns=parsed["successful_patterns"],
failed_patterns=parsed["failed_patterns"],
)
async def save(self) -> None:
serialized = {
"vault_address": self.state.vault_address,
"vault_balance": str(self.state.vault_balance),
"trades": [
{
"timestamp": t.timestamp,
"action": t.action.value,
"token": t.token,
"amount": t.amount,
"price": t.price,
"pnl": t.pnl,
}
for t in self.state.trades
],
"total_trades": self.state.total_trades,
"win_rate": self.state.win_rate,
"total_pnl": self.state.total_pnl,
"positions": {
k: {
"amount": v.amount,
"entry_price": v.entry_price,
"current_price": v.current_price,
"unrealized_pnl": v.unrealized_pnl,
}
for k, v in self.state.positions.items()
},
"successful_patterns": self.state.successful_patterns,
"failed_patterns": self.state.failed_patterns,
}
await self.redis.set(
f"agent:{self.state.vault_address}",
json.dumps(serialized),
)
def record_trade(self, trade: Trade) -> None:
self.state.trades.append(trade)
self.state.total_trades += 1
# Update win rate
wins = sum(1 for t in self.state.trades if (t.pnl or 0) > 0)
self.state.win_rate = (wins / self.state.total_trades) * 100
# Update total PnL
if trade.pnl:
self.state.total_pnl += trade.pnl
def get_performance_summary(self) -> str:
recent_trades = ", ".join(
f"{t.action.value} {t.token} @ ${t.price}"
for t in self.state.trades[-5:]
)
return f"""Performance Summary:
- Total Trades: {self.state.total_trades}
- Win Rate: {self.state.win_rate:.1f}%
- Total PnL: ${self.state.total_pnl:.2f}
- Recent Trades: {recent_trades}"""
def should_avoid_pattern(self, pattern: str) -> bool:
return pattern in self.state.failed_patterns
def learn_from_trade(self, pattern: str, success: bool) -> None:
if success:
if pattern not in self.state.successful_patterns:
self.state.successful_patterns.append(pattern)
# Remove from failed if it was there
self.state.failed_patterns = [
p for p in self.state.failed_patterns if p != pattern
]
else:
if pattern not in self.state.failed_patterns:
self.state.failed_patterns.append(pattern)
import { Redis } from 'ioredis';
class AgentStateManager {
private redis: Redis;
private state: AgentState;
constructor(vaultAddress: string) {
this.redis = new Redis(process.env.REDIS_URL);
this.state = this.initializeState(vaultAddress);
}
private initializeState(vaultAddress: string): AgentState {
return {
vaultAddress,
vaultBalance: 0n,
trades: [],
totalTrades: 0,
winRate: 0,
totalPnL: 0,
positions: new Map(),
successfulPatterns: [],
failedPatterns: [],
};
}
async load(): Promise<void> {
const data = await this.redis.get(`agent:${this.state.vaultAddress}`);
if (data) {
const parsed = JSON.parse(data);
this.state = {
...parsed,
vaultBalance: BigInt(parsed.vaultBalance),
positions: new Map(Object.entries(parsed.positions)),
};
}
}
async save(): Promise<void> {
const serialized = {
...this.state,
vaultBalance: this.state.vaultBalance.toString(),
positions: Object.fromEntries(this.state.positions),
};
await this.redis.set(
`agent:${this.state.vaultAddress}`,
JSON.stringify(serialized)
);
}
recordTrade(trade: AgentState['trades'][0]): void {
this.state.trades.push(trade);
this.state.totalTrades++;
// Update win rate
const wins = this.state.trades.filter(t => (t.pnl || 0) > 0).length;
this.state.winRate = (wins / this.state.totalTrades) * 100;
// Update total PnL
if (trade.pnl) {
this.state.totalPnL += trade.pnl;
}
}
getPerformanceSummary(): string {
return `
Performance Summary:
- Total Trades: ${this.state.totalTrades}
- Win Rate: ${this.state.winRate.toFixed(1)}%
- Total PnL: $${this.state.totalPnL.toFixed(2)}
- Recent Trades: ${this.state.trades.slice(-5).map(t =>
`${t.action} ${t.token} @ $${t.price}`
).join(', ')}
`.trim();
}
shouldAvoidPattern(pattern: string): boolean {
return this.state.failedPatterns.includes(pattern);
}
learnFromTrade(pattern: string, success: boolean): void {
if (success) {
if (!this.state.successfulPatterns.includes(pattern)) {
this.state.successfulPatterns.push(pattern);
}
// Remove from failed if it was there
this.state.failedPatterns = this.state.failedPatterns.filter(p => p !== pattern);
} else {
if (!this.state.failedPatterns.includes(pattern)) {
this.state.failedPatterns.push(pattern);
}
}
}
}
Step 3: Stateful Decision Engine
- Python
- TypeScript
import json
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
class StatefulDecisionEngine:
def __init__(self, state_manager: AgentStateManager):
self.state_manager = state_manager
self.llm = ChatOpenAI(model="gpt-4", temperature=0)
async def analyze(self, market_data: dict) -> dict:
# Include historical context in the prompt
prompt = ChatPromptTemplate.from_template("""
You are a trading agent with the following history:
{performance_summary}
Patterns to AVOID (previously failed):
{failed_patterns}
Patterns that worked well:
{successful_patterns}
Current Market Data:
{market_data}
Based on your past performance and current market conditions, provide a recommendation.
Consider:
1. Your historical win rate on similar setups
2. Patterns that have failed before
3. Current market conditions
Respond in JSON format:
{{
"action": "BUY" | "SELL" | "HOLD",
"confidence": 0-100,
"reasoning": "explanation including historical context",
"pattern": "description of this setup for learning"
}}
""")
response = await self.llm.ainvoke(
prompt.format(
performance_summary=self.state_manager.get_performance_summary(),
failed_patterns=", ".join(self.state_manager.state.failed_patterns) or "None",
successful_patterns=", ".join(self.state_manager.state.successful_patterns) or "None",
market_data=json.dumps(market_data),
)
)
signal = json.loads(response.content)
# Check if we should avoid this pattern
if self.state_manager.should_avoid_pattern(signal.get("pattern", "")):
signal["action"] = "HOLD"
signal["reasoning"] = f'Avoiding pattern "{signal["pattern"]}" due to previous failures'
signal["confidence"] = 0
return signal
async def record_outcome(self, signal: dict, success: bool) -> None:
self.state_manager.learn_from_trade(signal.get("pattern", "unknown"), success)
await self.state_manager.save()
class StatefulDecisionEngine {
private stateManager: AgentStateManager;
private llm: ChatOpenAI;
constructor(stateManager: AgentStateManager) {
this.stateManager = stateManager;
this.llm = new ChatOpenAI({ modelName: 'gpt-4', temperature: 0 });
}
async analyze(marketData: MarketData): Promise<TradingSignal> {
// Include historical context in the prompt
const prompt = ChatPromptTemplate.fromTemplate(`
You are a trading agent with the following history:
{performance_summary}
Patterns to AVOID (previously failed):
{failed_patterns}
Patterns that worked well:
{successful_patterns}
Current Market Data:
{market_data}
Based on your past performance and current market conditions, provide a recommendation.
Consider:
1. Your historical win rate on similar setups
2. Patterns that have failed before
3. Current market conditions
Respond in JSON format:
{{
"action": "BUY" | "SELL" | "HOLD",
"confidence": 0-100,
"reasoning": "explanation including historical context",
"pattern": "description of this setup for learning"
}}
`);
const response = await this.llm.invoke(
await prompt.format({
performance_summary: this.stateManager.getPerformanceSummary(),
failed_patterns: this.stateManager.state.failedPatterns.join(', ') || 'None',
successful_patterns: this.stateManager.state.successfulPatterns.join(', ') || 'None',
market_data: JSON.stringify(marketData),
})
);
const signal = JSON.parse(response.content as string);
// Check if we should avoid this pattern
if (this.stateManager.shouldAvoidPattern(signal.pattern)) {
signal.action = 'HOLD';
signal.reasoning = `Avoiding pattern "${signal.pattern}" due to previous failures`;
signal.confidence = 0;
}
return signal;
}
async recordOutcome(signal: TradingSignal, success: boolean): Promise<void> {
this.stateManager.learnFromTrade(signal.pattern || 'unknown', success);
await this.stateManager.save();
}
}
Step 4: Complete Stateful Agent
- Python
- TypeScript
import os
import asyncio
from web3 import Web3
from zeroquant import ZeroQuantClient
WETH_ADDRESS = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
class StatefulTradingAgent:
def __init__(self, client: ZeroQuantClient, vault_address: str):
self.client = client
self.state_manager = AgentStateManager(vault_address)
self.decision_engine = StatefulDecisionEngine(self.state_manager)
async def initialize(self) -> None:
await self.state_manager.load()
print("Agent state loaded")
print(self.state_manager.get_performance_summary())
async def execute_trading_cycle(self, market_data: dict) -> None:
# Get decision with historical context
signal = await self.decision_engine.analyze(market_data)
print(f"Decision: {signal['action']} ({signal['confidence']}% confidence)")
print(f"Reasoning: {signal['reasoning']}")
if signal["action"] == "HOLD" or signal["confidence"] < 70:
return
# Execute trade
entry_price = market_data["price"]
success = False
try:
tx = await self._execute_trade(signal, market_data)
await tx.wait()
# Wait for outcome (simplified - in reality you'd monitor the position)
exit_price = await self._wait_for_exit(signal, entry_price)
pnl = self._calculate_pnl(signal["action"], entry_price, exit_price)
success = pnl > 0
# Record the trade
self.state_manager.record_trade(Trade(
timestamp=datetime.now().timestamp(),
action=TradeAction(signal["action"]),
token=market_data["symbol"],
amount="1.0", # Simplified
price=entry_price,
pnl=pnl,
))
except Exception as e:
print(f"Trade failed: {e}")
success = False
# Learn from the outcome
await self.decision_engine.record_outcome(signal, success)
print(f"Trade {'succeeded' if success else 'failed'}")
print(self.state_manager.get_performance_summary())
async def _execute_trade(self, signal: dict, market: dict):
path = (
[WETH_ADDRESS, market["token_address"]]
if signal["action"] == "BUY"
else [market["token_address"], WETH_ADDRESS]
)
return await self.client.execute_swap(
amount_in=Web3.to_wei(0.1, "ether"),
path=path,
slippage_bps=100,
)
def _calculate_pnl(self, action: str, entry: float, exit: float) -> float:
if action == "BUY":
return ((exit - entry) / entry) * 100
else:
return ((entry - exit) / entry) * 100
# Usage
async def main():
w3 = Web3(Web3.HTTPProvider(os.getenv("RPC_URL")))
client = ZeroQuantClient(
web3=w3,
private_key=os.getenv("PRIVATE_KEY"),
factory_address=os.getenv("FACTORY_ADDRESS"),
permission_manager_address=os.getenv("PERMISSION_MANAGER_ADDRESS"),
)
agent = StatefulTradingAgent(client, vault_address="0x123...")
await agent.initialize()
# Run trading cycles
while True:
market_data = await fetch_market_data() # Your market data source
await agent.execute_trading_cycle(market_data)
await asyncio.sleep(60) # Check every minute
if __name__ == "__main__":
asyncio.run(main())
import { ethers } from 'ethers';
import { ZeroQuantClient } from '@zeroquant/sdk';
class StatefulTradingAgent {
private client: ZeroQuantClient;
private stateManager: AgentStateManager;
private decisionEngine: StatefulDecisionEngine;
constructor(client: ZeroQuantClient, vaultAddress: string) {
this.client = client;
this.stateManager = new AgentStateManager(vaultAddress);
this.decisionEngine = new StatefulDecisionEngine(this.stateManager);
}
async initialize(): Promise<void> {
await this.stateManager.load();
console.log('Agent state loaded');
console.log(this.stateManager.getPerformanceSummary());
}
async executeTradingCycle(marketData: MarketData): Promise<void> {
// Get decision with historical context
const signal = await this.decisionEngine.analyze(marketData);
console.log(`Decision: ${signal.action} (${signal.confidence}% confidence)`);
console.log(`Reasoning: ${signal.reasoning}`);
if (signal.action === 'HOLD' || signal.confidence < 70) {
return;
}
// Execute trade
const entryPrice = marketData.price;
let success = false;
try {
const tx = await this.executeTrade(signal, marketData);
await tx.wait();
// Wait for outcome (simplified - in reality you'd monitor the position)
const exitPrice = await this.waitForExit(signal, entryPrice);
const pnl = this.calculatePnL(signal.action, entryPrice, exitPrice);
success = pnl > 0;
// Record the trade
this.stateManager.recordTrade({
timestamp: Date.now(),
action: signal.action,
token: marketData.symbol,
amount: '1.0', // Simplified
price: entryPrice,
pnl,
});
} catch (error) {
console.error('Trade failed:', error);
success = false;
}
// Learn from the outcome
await this.decisionEngine.recordOutcome(signal, success);
console.log(`Trade ${success ? 'succeeded' : 'failed'}`);
console.log(this.stateManager.getPerformanceSummary());
}
private async executeTrade(signal: TradingSignal, market: MarketData) {
return this.client.executeSwap({
amountIn: ethers.parseEther('0.1'),
path: signal.action === 'BUY'
? [WETH_ADDRESS, market.tokenAddress]
: [market.tokenAddress, WETH_ADDRESS],
slippageBps: 100,
});
}
private calculatePnL(action: string, entry: number, exit: number): number {
if (action === 'BUY') {
return ((exit - entry) / entry) * 100;
} else {
return ((entry - exit) / entry) * 100;
}
}
}
Memory Patterns Comparison
| Pattern | Use Case | Pros | Cons |
|---|---|---|---|
| Buffer Memory | Short conversations | Simple, fast | Limited history |
| Summary Memory | Long sessions | Compressed context | Loses details |
| Vector Store | Knowledge retrieval | Semantic search | Complexity |
| Redis/DB | Persistent state | Survives restarts | External dependency |
Best Practices
1. Memory Window Management
Don't let memory grow unbounded:
- Python
- TypeScript
class BoundedMemory:
def __init__(self, max_items: int = 100):
self.max_items = max_items
self.items: list = []
def add(self, item: any) -> None:
self.items.append(item)
if len(self.items) > self.max_items:
# Summarize old items before removing
self._summarize_oldest(10)
self.items = self.items[10:]
def _summarize_oldest(self, count: int) -> None:
# Implementation for summarizing old items
pass
class BoundedMemory {
private maxItems = 100;
private items: any[] = [];
add(item: any): void {
this.items.push(item);
if (this.items.length > this.maxItems) {
// Summarize old items before removing
this.summarizeOldest(10);
this.items = this.items.slice(10);
}
}
}
2. Periodic State Snapshots
Save state regularly:
- Python
- TypeScript
import asyncio
async def periodic_save(state_manager: AgentStateManager):
while True:
await state_manager.save()
print("State checkpoint saved")
await asyncio.sleep(60) # Every minute
# Run alongside your main loop
asyncio.create_task(periodic_save(state_manager))
setInterval(async () => {
await stateManager.save();
console.log('State checkpoint saved');
}, 60000); // Every minute
3. Memory Relevance Scoring
Prioritize relevant memories:
- Python
- TypeScript
def get_relevant_memories(query: str, memories: list[dict]) -> list[dict]:
scored = [
{**m, "relevance": calculate_similarity(query, m["content"])}
for m in memories
]
scored.sort(key=lambda x: x["relevance"], reverse=True)
return scored[:5]
function getRelevantMemories(query: string, memories: Memory[]): Memory[] {
return memories
.map(m => ({
...m,
relevance: calculateSimilarity(query, m.content),
}))
.sort((a, b) => b.relevance - a.relevance)
.slice(0, 5);
}
What's Next?
- AI Trading System - Production system with all components
- Multi-Agent Systems - Coordinate multiple stateful agents
- Local LLM Setup - Private inference for state processing