Portfolio Rebalancing Agent
Build an agent that monitors your portfolio allocation and automatically rebalances to maintain target weights.
Overview
The Portfolio Agent:
- Monitors token balances across your vault
- Compares current allocation to targets
- Executes trades to rebalance when drift exceeds threshold
- Respects gas costs and slippage limits
Installation
- Python
- TypeScript
pip install zeroquant langchain langchain-openai web3 aiohttp python-dotenv
npm install @zeroquant/sdk @zeroquant/mastra @langchain/openai ethers
Architecture
┌──────────────────────────────────────────────┐
│ Portfolio Agent │
├─────────────────┬────────────────────────────┤
│ Monitor │ Rebalancer │
│ ───────── │ ────────── │
│ • Token prices │ • Calculate trades │
│ • Balances │ • Optimize for gas │
│ • Drift calc │ • Execute swaps │
└─────────────────┴────────────────────────────┘
Target Allocation Setup
Define your ideal portfolio:
- Python
- TypeScript
# Target allocation - must sum to 100
targets: dict[str, float] = {
"WETH": 40, # 40% in ETH
"USDC": 30, # 30% in USDC
"WBTC": 20, # 20% in wrapped BTC
"DAI": 10, # 10% in DAI
}
interface TargetAllocation {
[token: string]: number; // Percentage (must sum to 100)
}
const targets: TargetAllocation = {
'WETH': 40, // 40% in ETH
'USDC': 30, // 30% in USDC
'WBTC': 20, // 20% in wrapped BTC
'DAI': 10, // 10% in DAI
};
Complete Implementation
- Python
- TypeScript
# src/portfolio_agent.py
import asyncio
import aiohttp
from datetime import datetime
from dataclasses import dataclass
from typing import Optional
from web3 import Web3
from zeroquant import ZeroQuantClient
@dataclass
class TokenConfig:
symbol: str
address: str
decimals: int
coingecko_id: str
@dataclass
class PortfolioState:
balances: dict[str, int]
prices: dict[str, float]
total_value_usd: float
allocations: dict[str, float]
drift: dict[str, float]
@dataclass
class RebalanceTrade:
token_in: str
token_out: str
amount_in: int
reason: str
@dataclass
class PortfolioAgentConfig:
client: ZeroQuantClient
vault_address: str
tokens: list[TokenConfig]
target_allocation: dict[str, float]
rebalance_threshold_pct: float # Minimum drift to trigger rebalance
min_trade_value_usd: float # Minimum trade size
max_slippage_bps: int
check_interval_ms: int
class PortfolioAgent:
def __init__(self, config: PortfolioAgentConfig):
self.config = config
self.client = config.client
self.is_running = False
# Validate allocations sum to 100
total = sum(config.target_allocation.values())
if abs(total - 100) > 0.01:
raise ValueError(f"Target allocations must sum to 100, got {total}")
async def start(self) -> None:
print("Portfolio Agent started")
print(f"Target allocation: {self.config.target_allocation}")
print(f"Rebalance threshold: {self.config.rebalance_threshold_pct}%")
self.is_running = True
while self.is_running:
await self._run_rebalance_cycle()
await asyncio.sleep(self.config.check_interval_ms / 1000)
def stop(self) -> None:
self.is_running = False
print("Portfolio Agent stopped")
async def _run_rebalance_cycle(self) -> None:
print("\n--- Rebalance Check ---")
print(f"Time: {datetime.now().isoformat()}")
try:
# Get current portfolio state
state = await self._get_portfolio_state()
self._log_portfolio_state(state)
# Check if rebalance needed
trades = self._calculate_rebalance_trades(state)
if not trades:
print("Portfolio is balanced. No trades needed.")
return
print(f"\nRebalance trades needed: {len(trades)}")
for trade in trades:
print(f" {trade.reason}")
# Execute trades
await self._execute_trades(trades)
except Exception as e:
print(f"Rebalance cycle failed: {e}")
async def _get_portfolio_state(self) -> PortfolioState:
balances: dict[str, int] = {}
prices: dict[str, float] = {}
# Fetch balances and prices in parallel
async def fetch_token_data(token: TokenConfig):
balance = await self._get_token_balance(token)
price = await self._get_token_price(token.coingecko_id)
return token.symbol, balance, price
results = await asyncio.gather(*[
fetch_token_data(token) for token in self.config.tokens
])
for symbol, balance, price in results:
balances[symbol] = balance
prices[symbol] = price
# Calculate total value
total_value_usd = 0.0
for token in self.config.tokens:
balance = balances.get(token.symbol, 0)
price = prices.get(token.symbol, 0)
value = (balance / (10 ** token.decimals)) * price
total_value_usd += value
# Calculate current allocations and drift
allocations: dict[str, float] = {}
drift: dict[str, float] = {}
for token in self.config.tokens:
balance = balances.get(token.symbol, 0)
price = prices.get(token.symbol, 0)
value = (balance / (10 ** token.decimals)) * price
current_pct = (value / total_value_usd * 100) if total_value_usd > 0 else 0
target_pct = self.config.target_allocation.get(token.symbol, 0)
allocations[token.symbol] = current_pct
drift[token.symbol] = current_pct - target_pct
return PortfolioState(
balances=balances,
prices=prices,
total_value_usd=total_value_usd,
allocations=allocations,
drift=drift,
)
async def _get_token_balance(self, token: TokenConfig) -> int:
if token.symbol in ("WETH", "ETH"):
return await self.client.get_balance()
# For ERC20 tokens
# Simplified - use actual contract calls in production
return 0
async def _get_token_price(self, coingecko_id: str) -> float:
try:
async with aiohttp.ClientSession() as session:
url = f"https://api.coingecko.com/api/v3/simple/price"
params = {"ids": coingecko_id, "vs_currencies": "usd"}
async with session.get(url, params=params) as response:
data = await response.json()
return data.get(coingecko_id, {}).get("usd", 0)
except Exception:
print(f"Failed to fetch price for {coingecko_id}")
return 0
def _calculate_rebalance_trades(self, state: PortfolioState) -> list[RebalanceTrade]:
trades: list[RebalanceTrade] = []
threshold = self.config.rebalance_threshold_pct
# Find tokens that need selling (over-allocated)
sell_tokens: list[dict] = []
buy_tokens: list[dict] = []
for symbol, drift_value in state.drift.items():
if drift_value > threshold:
value_to_sell = (drift_value / 100) * state.total_value_usd
sell_tokens.append({"symbol": symbol, "drift": drift_value, "value": value_to_sell})
elif drift_value < -threshold:
value_to_buy = (abs(drift_value) / 100) * state.total_value_usd
buy_tokens.append({"symbol": symbol, "drift": drift_value, "value": value_to_buy})
# Sort by absolute drift (largest first)
sell_tokens.sort(key=lambda x: x["drift"], reverse=True)
buy_tokens.sort(key=lambda x: x["drift"])
# Match sells to buys
for sell in sell_tokens:
for buy in buy_tokens:
if buy["value"] <= 0:
continue
trade_value = min(sell["value"], buy["value"])
if trade_value < self.config.min_trade_value_usd:
continue
sell_token = next(t for t in self.config.tokens if t.symbol == sell["symbol"])
buy_token = next(t for t in self.config.tokens if t.symbol == buy["symbol"])
sell_price = state.prices.get(sell["symbol"], 0)
if sell_price > 0:
sell_amount = int((trade_value / sell_price) * (10 ** sell_token.decimals))
trades.append(RebalanceTrade(
token_in=sell_token.address,
token_out=buy_token.address,
amount_in=sell_amount,
reason=f"Sell {sell['symbol']} (+{sell['drift']:.1f}%) -> Buy {buy['symbol']} ({buy['drift']:.1f}%)",
))
sell["value"] -= trade_value
buy["value"] -= trade_value
return trades
async def _execute_trades(self, trades: list[RebalanceTrade]) -> None:
for trade in trades:
print(f"\nExecuting: {trade.reason}")
try:
tx = await self.client.execute_swap(
amount_in=trade.amount_in,
path=[trade.token_in, trade.token_out],
slippage_bps=self.config.max_slippage_bps,
)
print(f"Transaction: {tx.hash}")
await tx.wait()
print("Trade completed")
except Exception as e:
print(f"Trade failed: {e}")
def _log_portfolio_state(self, state: PortfolioState) -> None:
print(f"\nPortfolio Value: ${state.total_value_usd:.2f}")
print("\nCurrent Allocation:")
for token in self.config.tokens:
current = state.allocations.get(token.symbol, 0)
target = self.config.target_allocation.get(token.symbol, 0)
drift_value = state.drift.get(token.symbol, 0)
drift_str = f"+{drift_value:.1f}" if drift_value >= 0 else f"{drift_value:.1f}"
print(f" {token.symbol.ljust(6)} {current:5.1f}% / {target}% ({drift_str}%)")
// src/portfolio-agent.ts
import { ethers } from 'ethers';
import { ZeroQuantClient } from '@zeroquant/sdk';
import axios from 'axios';
interface TargetAllocation {
[token: string]: number;
}
interface TokenConfig {
symbol: string;
address: string;
decimals: number;
coingeckoId: string;
}
interface PortfolioState {
balances: Map<string, bigint>;
prices: Map<string, number>;
totalValueUsd: number;
allocations: Map<string, number>;
drift: Map<string, number>;
}
interface RebalanceTrade {
tokenIn: string;
tokenOut: string;
amountIn: bigint;
reason: string;
}
interface PortfolioAgentConfig {
client: ZeroQuantClient;
vaultAddress: string;
tokens: TokenConfig[];
targetAllocation: TargetAllocation;
rebalanceThresholdPct: number;
minTradeValueUsd: number;
maxSlippageBps: number;
checkIntervalMs: number;
}
export class PortfolioAgent {
private config: PortfolioAgentConfig;
private client: ZeroQuantClient;
private isRunning = false;
constructor(config: PortfolioAgentConfig) {
this.config = config;
this.client = config.client;
const total = Object.values(config.targetAllocation).reduce((a, b) => a + b, 0);
if (Math.abs(total - 100) > 0.01) {
throw new Error(`Target allocations must sum to 100, got ${total}`);
}
}
async start(): Promise<void> {
console.log('Portfolio Agent started');
console.log('Target allocation:', this.config.targetAllocation);
console.log(`Rebalance threshold: ${this.config.rebalanceThresholdPct}%`);
this.isRunning = true;
while (this.isRunning) {
await this.runRebalanceCycle();
await this.sleep(this.config.checkIntervalMs);
}
}
stop(): void {
this.isRunning = false;
console.log('Portfolio Agent stopped');
}
private async runRebalanceCycle(): Promise<void> {
console.log('\n--- Rebalance Check ---');
console.log(`Time: ${new Date().toISOString()}`);
try {
const state = await this.getPortfolioState();
this.logPortfolioState(state);
const trades = this.calculateRebalanceTrades(state);
if (trades.length === 0) {
console.log('Portfolio is balanced. No trades needed.');
return;
}
console.log(`\nRebalance trades needed: ${trades.length}`);
for (const trade of trades) {
console.log(` ${trade.reason}`);
}
await this.executeTrades(trades);
} catch (error) {
console.error('Rebalance cycle failed:', error);
}
}
private async getPortfolioState(): Promise<PortfolioState> {
const balances = new Map<string, bigint>();
const prices = new Map<string, number>();
await Promise.all(
this.config.tokens.map(async (token) => {
const [balance, price] = await Promise.all([
this.getTokenBalance(token),
this.getTokenPrice(token.coingeckoId),
]);
balances.set(token.symbol, balance);
prices.set(token.symbol, price);
})
);
let totalValueUsd = 0;
for (const token of this.config.tokens) {
const balance = balances.get(token.symbol) || 0n;
const price = prices.get(token.symbol) || 0;
const value = Number(ethers.formatUnits(balance, token.decimals)) * price;
totalValueUsd += value;
}
const allocations = new Map<string, number>();
const drift = new Map<string, number>();
for (const token of this.config.tokens) {
const balance = balances.get(token.symbol) || 0n;
const price = prices.get(token.symbol) || 0;
const value = Number(ethers.formatUnits(balance, token.decimals)) * price;
const currentPct = totalValueUsd > 0 ? (value / totalValueUsd) * 100 : 0;
const targetPct = this.config.targetAllocation[token.symbol] || 0;
allocations.set(token.symbol, currentPct);
drift.set(token.symbol, currentPct - targetPct);
}
return { balances, prices, totalValueUsd, allocations, drift };
}
private async getTokenBalance(token: TokenConfig): Promise<bigint> {
if (token.symbol === 'WETH' || token.symbol === 'ETH') {
return await this.client.getBalance();
}
const erc20 = new ethers.Contract(
token.address,
['function balanceOf(address) view returns (uint256)'],
this.client.provider
);
return await erc20.balanceOf(this.config.vaultAddress);
}
private async getTokenPrice(coingeckoId: string): Promise<number> {
try {
const response = await axios.get(
`https://api.coingecko.com/api/v3/simple/price?ids=${coingeckoId}&vs_currencies=usd`
);
return response.data[coingeckoId]?.usd || 0;
} catch {
console.warn(`Failed to fetch price for ${coingeckoId}`);
return 0;
}
}
private calculateRebalanceTrades(state: PortfolioState): RebalanceTrade[] {
const trades: RebalanceTrade[] = [];
const threshold = this.config.rebalanceThresholdPct;
const sellTokens: Array<{ symbol: string; drift: number; valueToSell: number }> = [];
const buyTokens: Array<{ symbol: string; drift: number; valueToBuy: number }> = [];
for (const [symbol, drift] of state.drift) {
if (drift > threshold) {
const valueToSell = (drift / 100) * state.totalValueUsd;
sellTokens.push({ symbol, drift, valueToSell });
} else if (drift < -threshold) {
const valueToBuy = (Math.abs(drift) / 100) * state.totalValueUsd;
buyTokens.push({ symbol, drift, valueToBuy });
}
}
sellTokens.sort((a, b) => b.drift - a.drift);
buyTokens.sort((a, b) => a.drift - b.drift);
for (const sell of sellTokens) {
for (const buy of buyTokens) {
if (buy.valueToBuy <= 0) continue;
const tradeValue = Math.min(sell.valueToSell, buy.valueToBuy);
if (tradeValue < this.config.minTradeValueUsd) continue;
const sellToken = this.config.tokens.find(t => t.symbol === sell.symbol)!;
const buyToken = this.config.tokens.find(t => t.symbol === buy.symbol)!;
const sellPrice = state.prices.get(sell.symbol) || 0;
const sellAmount = ethers.parseUnits(
(tradeValue / sellPrice).toFixed(sellToken.decimals),
sellToken.decimals
);
trades.push({
tokenIn: sellToken.address,
tokenOut: buyToken.address,
amountIn: sellAmount,
reason: `Sell ${sell.symbol} (+${sell.drift.toFixed(1)}%) -> Buy ${buy.symbol} (${buy.drift.toFixed(1)}%)`,
});
sell.valueToSell -= tradeValue;
buy.valueToBuy -= tradeValue;
}
}
return trades;
}
private async executeTrades(trades: RebalanceTrade[]): Promise<void> {
for (const trade of trades) {
console.log(`\nExecuting: ${trade.reason}`);
try {
const tx = await this.client.executeSwap({
amountIn: trade.amountIn,
path: [trade.tokenIn, trade.tokenOut],
slippageBps: this.config.maxSlippageBps,
});
console.log(`Transaction: ${tx.hash}`);
await tx.wait();
console.log('Trade completed');
} catch (error) {
console.error('Trade failed:', error);
}
}
}
private logPortfolioState(state: PortfolioState): void {
console.log(`\nPortfolio Value: $${state.totalValueUsd.toFixed(2)}`);
console.log('\nCurrent Allocation:');
for (const token of this.config.tokens) {
const current = state.allocations.get(token.symbol) || 0;
const target = this.config.targetAllocation[token.symbol] || 0;
const drift = state.drift.get(token.symbol) || 0;
const driftStr = drift >= 0 ? `+${drift.toFixed(1)}` : drift.toFixed(1);
console.log(
` ${token.symbol.padEnd(6)} ${current.toFixed(1).padStart(5)}% / ${target}% (${driftStr}%)`
);
}
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
Usage
- Python
- TypeScript
import os
import asyncio
import signal
from dotenv import load_dotenv
from web3 import Web3
from zeroquant import ZeroQuantClient
from portfolio_agent import PortfolioAgent, PortfolioAgentConfig, TokenConfig
load_dotenv()
async def main():
w3 = Web3(Web3.HTTPProvider(os.getenv("RPC_URL")))
client = ZeroQuantClient(
web3=w3,
private_key=os.getenv("PRIVATE_KEY"),
factory_address=os.getenv("FACTORY_ADDRESS"),
permission_manager_address=os.getenv("PERMISSION_MANAGER_ADDRESS"),
)
await client.connect()
await client.connect_vault(os.getenv("VAULT_ADDRESS"))
agent = PortfolioAgent(PortfolioAgentConfig(
client=client,
vault_address=os.getenv("VAULT_ADDRESS"),
tokens=[
TokenConfig(
symbol="WETH",
address="0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2",
decimals=18,
coingecko_id="ethereum",
),
TokenConfig(
symbol="USDC",
address="0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
decimals=6,
coingecko_id="usd-coin",
),
TokenConfig(
symbol="WBTC",
address="0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599",
decimals=8,
coingecko_id="wrapped-bitcoin",
),
],
target_allocation={
"WETH": 50,
"USDC": 30,
"WBTC": 20,
},
rebalance_threshold_pct=5, # Rebalance when 5% off target
min_trade_value_usd=100, # Minimum $100 trade
max_slippage_bps=100, # 1% max slippage
check_interval_ms=3600000, # Check every hour
))
def shutdown_handler(sig, frame):
agent.stop()
signal.signal(signal.SIGINT, shutdown_handler)
await agent.start()
if __name__ == "__main__":
asyncio.run(main())
import { ethers } from 'ethers';
import { ZeroQuantClient } from '@zeroquant/sdk';
import { PortfolioAgent } from './portfolio-agent';
import 'dotenv/config';
async function main() {
const provider = new ethers.JsonRpcProvider(process.env.RPC_URL);
const signer = new ethers.Wallet(process.env.PRIVATE_KEY!, provider);
const client = new ZeroQuantClient(provider, {
factoryAddress: process.env.FACTORY_ADDRESS!,
permissionManagerAddress: process.env.PERMISSION_MANAGER_ADDRESS!,
});
await client.connect(signer);
await client.connectVault(process.env.VAULT_ADDRESS!);
const agent = new PortfolioAgent({
client,
vaultAddress: process.env.VAULT_ADDRESS!,
tokens: [
{
symbol: 'WETH',
address: '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2',
decimals: 18,
coingeckoId: 'ethereum',
},
{
symbol: 'USDC',
address: '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48',
decimals: 6,
coingeckoId: 'usd-coin',
},
{
symbol: 'WBTC',
address: '0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599',
decimals: 8,
coingeckoId: 'wrapped-bitcoin',
},
],
targetAllocation: {
'WETH': 50,
'USDC': 30,
'WBTC': 20,
},
rebalanceThresholdPct: 5,
minTradeValueUsd: 100,
maxSlippageBps: 100,
checkIntervalMs: 3600000,
});
process.on('SIGINT', () => {
agent.stop();
process.exit(0);
});
await agent.start();
}
main().catch(console.error);
Sample Output
Portfolio Agent started
Target allocation: { WETH: 50, USDC: 30, WBTC: 20 }
Rebalance threshold: 5%
--- Rebalance Check ---
Time: 2024-01-15T10:00:00.000Z
Portfolio Value: $10,000.00
Current Allocation:
WETH 55.0% / 50% (+5.0%)
USDC 25.0% / 30% (-5.0%)
WBTC 20.0% / 20% (+0.0%)
Rebalance trades needed: 1
Sell WETH (+5.0%) -> Buy USDC (-5.0%)
Executing: Sell WETH (+5.0%) -> Buy USDC (-5.0%)
Transaction: 0x...
Trade completed
What's Next?
- Stateful Agents - Add memory for smarter decisions
- Multi-Agent Systems - Coordinate portfolio + trading agents
- Local LLM Setup - Private, cost-free AI decisions