Skip to main content

Portfolio Rebalancing Agent

Build an agent that monitors your portfolio allocation and automatically rebalances to maintain target weights.

Overview

The Portfolio Agent:

  • Monitors token balances across your vault
  • Compares current allocation to targets
  • Executes trades to rebalance when drift exceeds threshold
  • Respects gas costs and slippage limits

Installation

pip install zeroquant langchain langchain-openai web3 aiohttp python-dotenv

Architecture

┌──────────────────────────────────────────────┐
│               Portfolio Agent                │
├─────────────────┬────────────────────────────┤
│     Monitor     │         Rebalancer         │
│    ─────────    │         ──────────         │
│ • Token prices  │ • Calculate trades         │
│ • Balances      │ • Optimize for gas         │
│ • Drift calc    │ • Execute swaps            │
└─────────────────┴────────────────────────────┘

Target Allocation Setup

Define your ideal portfolio:

# Desired portfolio weights, expressed in percent of total value.
# The four weights below add up to exactly 100.
targets: dict[str, float] = {
    "WETH": 40,  # wrapped ETH
    "USDC": 30,  # USD Coin
    "WBTC": 20,  # wrapped BTC
    "DAI": 10,  # DAI
}

Complete Implementation

# src/portfolio_agent.py
import asyncio
import aiohttp
from datetime import datetime
from dataclasses import dataclass
from typing import Optional
from web3 import Web3

from zeroquant import ZeroQuantClient


@dataclass
class TokenConfig:
    """Static description of one token the agent tracks."""

    # Ticker used as the key into balances, prices and the target allocation.
    symbol: str
    # Contract address; used as a hop in the swap path when trading.
    address: str
    # Number of decimals used to convert raw integer balances to whole tokens.
    decimals: int
    # Identifier passed to the CoinGecko price endpoint for this token.
    coingecko_id: str


@dataclass
class PortfolioState:
    """Snapshot of the portfolio taken at the start of a rebalance cycle."""

    # Raw integer balance per token symbol (not yet scaled by decimals).
    balances: dict[str, int]
    # USD price per token symbol.
    prices: dict[str, float]
    # Sum of balance * price over all tracked tokens, in USD.
    total_value_usd: float
    # Current share of total value per symbol, in percent.
    allocations: dict[str, float]
    # Current percent minus target percent per symbol; positive = over-allocated.
    drift: dict[str, float]


@dataclass
class RebalanceTrade:
    """One swap the agent intends to submit to move the portfolio toward target."""

    # Address of the token being sold.
    token_in: str
    # Address of the token being bought.
    token_out: str
    # Amount of token_in to sell, in raw integer units (scaled by its decimals).
    amount_in: int
    # Human-readable explanation that is printed before execution.
    reason: str


@dataclass
class PortfolioAgentConfig:
    """Everything a PortfolioAgent needs to monitor and rebalance one vault."""

    # Connected client used to read balances and execute swaps.
    client: ZeroQuantClient
    # Address of the vault being managed.
    # NOTE(review): the agent code shown here never reads this field - confirm intent.
    vault_address: str
    # Tokens to track; every symbol in target_allocation should appear here.
    tokens: list[TokenConfig]
    # Target weight per token symbol, in percent; must sum to 100.
    target_allocation: dict[str, float]
    rebalance_threshold_pct: float  # Minimum drift to trigger rebalance
    min_trade_value_usd: float  # Minimum trade size
    # Slippage limit forwarded to each swap, in basis points.
    max_slippage_bps: int
    # Delay between rebalance checks, in milliseconds.
    check_interval_ms: int


class PortfolioAgent:
    """Periodically compare a vault's allocation with its targets and rebalance.

    Each cycle fetches balances and USD prices, computes the per-token drift
    (current percent minus target percent), pairs over-allocated tokens with
    under-allocated ones and submits the resulting swaps through the client.
    """

    # Longest single sleep while waiting for the next cycle. Sleeping in short
    # slices lets stop() take effect within about this many seconds instead of
    # only after the whole check interval has elapsed.
    _STOP_POLL_SECONDS = 1.0

    def __init__(self, config: PortfolioAgentConfig):
        """Store the configuration and validate the target allocation.

        Raises:
            ValueError: if the target weights do not sum to 100 (within 0.01)
                or reference a symbol that has no TokenConfig.
        """
        self.config = config
        self.client = config.client
        self.is_running = False

        # Validate allocations sum to 100
        total = sum(config.target_allocation.values())
        if abs(total - 100) > 0.01:
            raise ValueError(f"Target allocations must sum to 100, got {total}")

        # A target for an untracked token can never be filled; the tracked
        # tokens would then look permanently over-allocated.
        known_symbols = {token.symbol for token in config.tokens}
        unknown = sorted(set(config.target_allocation) - known_symbols)
        if unknown:
            raise ValueError(
                f"Target allocation references unconfigured tokens: {', '.join(unknown)}"
            )

    async def start(self) -> None:
        """Run rebalance cycles until stop() is called."""
        print("Portfolio Agent started")
        print(f"Target allocation: {self.config.target_allocation}")
        print(f"Rebalance threshold: {self.config.rebalance_threshold_pct}%")

        self.is_running = True

        while self.is_running:
            await self._run_rebalance_cycle()
            await self._wait_for_next_cycle()

    def stop(self) -> None:
        """Ask the run loop to exit; it returns shortly after the current step."""
        self.is_running = False
        print("Portfolio Agent stopped")

    async def _wait_for_next_cycle(self) -> None:
        """Sleep for the check interval, waking early once stop() was called."""
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self.config.check_interval_ms / 1000

        while True:
            remaining = deadline - loop.time()
            # Always yield at least once, even for a zero or negative interval.
            await asyncio.sleep(max(0.0, min(remaining, self._STOP_POLL_SECONDS)))
            if not self.is_running or loop.time() >= deadline:
                return

    async def _run_rebalance_cycle(self) -> None:
        """Run one check: snapshot the portfolio, plan trades, execute them.

        Any exception is reported and swallowed so that a single bad cycle
        does not terminate the agent.
        """
        print("\n--- Rebalance Check ---")
        print(f"Time: {datetime.now().isoformat()}")

        try:
            # Get current portfolio state
            state = await self._get_portfolio_state()
            self._log_portfolio_state(state)

            # Check if rebalance needed
            trades = self._calculate_rebalance_trades(state)

            if not trades:
                print("Portfolio is balanced. No trades needed.")
                return

            print(f"\nRebalance trades needed: {len(trades)}")
            for trade in trades:
                print(f" {trade.reason}")

            # Execute trades
            await self._execute_trades(trades)

        except Exception as e:
            # Top-level boundary of a cycle: report and let the next cycle retry.
            print(f"Rebalance cycle failed: {e}")

    async def _get_portfolio_state(self) -> PortfolioState:
        """Fetch balances and prices and derive value, allocation and drift.

        Raises:
            RuntimeError: if any tracked token has no positive price. Valuing
                such a token at zero would make it look fully under-allocated
                and trigger large, wrong trades, so the cycle is aborted.
        """

        # Tokens are fetched concurrently; within a token, balance then price.
        async def fetch_token_data(token: TokenConfig):
            balance = await self._get_token_balance(token)
            price = await self._get_token_price(token.coingecko_id)
            return token.symbol, balance, price

        results = await asyncio.gather(
            *(fetch_token_data(token) for token in self.config.tokens)
        )

        balances: dict[str, int] = {}
        prices: dict[str, float] = {}
        for symbol, balance, price in results:
            balances[symbol] = balance
            prices[symbol] = price

        unpriced = [symbol for symbol, price in prices.items() if not price or price <= 0]
        if unpriced:
            raise RuntimeError(
                f"No valid price for {', '.join(unpriced)}; skipping rebalance"
            )

        # USD value per token, computed once and reused for total and shares.
        values: dict[str, float] = {}
        for token in self.config.tokens:
            whole_units = balances.get(token.symbol, 0) / (10 ** token.decimals)
            values[token.symbol] = whole_units * prices.get(token.symbol, 0)

        total_value_usd = sum(values.values(), 0.0)

        # Calculate current allocations and drift
        allocations: dict[str, float] = {}
        drift: dict[str, float] = {}
        for symbol, value in values.items():
            current_pct = (value / total_value_usd * 100) if total_value_usd > 0 else 0
            target_pct = self.config.target_allocation.get(symbol, 0)
            allocations[symbol] = current_pct
            drift[symbol] = current_pct - target_pct

        return PortfolioState(
            balances=balances,
            prices=prices,
            total_value_usd=total_value_usd,
            allocations=allocations,
            drift=drift,
        )

    async def _get_token_balance(self, token: TokenConfig) -> int:
        """Return the raw balance held for ``token``."""
        if token.symbol in ("WETH", "ETH"):
            # NOTE(review): presumably the vault's native balance - confirm
            # that it is the right figure for a WETH position.
            return await self.client.get_balance()

        # For ERC20 tokens
        # Simplified - use actual contract calls in production
        # NOTE(review): while this returns 0, every ERC-20 position is valued
        # at zero and the agent will keep selling WETH to "refill" it. Replace
        # with a real balance query before running against a funded vault.
        return 0

    async def _get_token_price(self, coingecko_id: str) -> float:
        """Return the USD price for ``coingecko_id``, or 0.0 if unavailable."""
        url = "https://api.coingecko.com/api/v3/simple/price"
        params = {"ids": coingecko_id, "vs_currencies": "usd"}
        # Bound the request so one stalled call cannot block a whole cycle.
        timeout = aiohttp.ClientTimeout(total=10)

        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url, params=params) as response:
                    # Treat HTTP errors (e.g. rate limiting) as a failed lookup.
                    response.raise_for_status()
                    data = await response.json()
            return float(data.get(coingecko_id, {}).get("usd", 0) or 0)
        except (
            aiohttp.ClientError,
            asyncio.TimeoutError,
            ValueError,
            TypeError,
            AttributeError,
        ) as exc:
            print(f"Failed to fetch price for {coingecko_id}: {exc}")
            return 0.0

    def _calculate_rebalance_trades(self, state: PortfolioState) -> list[RebalanceTrade]:
        """Pair over-allocated tokens with under-allocated ones.

        Tokens whose drift is strictly beyond the threshold are collected, the
        largest drifts are matched first, and each pairing below
        ``min_trade_value_usd`` is skipped.
        """
        threshold = self.config.rebalance_threshold_pct
        min_trade_value = self.config.min_trade_value_usd

        # First occurrence wins, matching a first-match linear search.
        tokens_by_symbol: dict[str, TokenConfig] = {}
        for token in self.config.tokens:
            tokens_by_symbol.setdefault(token.symbol, token)

        # Find tokens that need selling (over-allocated) or buying (under-allocated)
        sell_tokens: list[dict] = []
        buy_tokens: list[dict] = []

        for symbol, drift_value in state.drift.items():
            if drift_value > threshold:
                value_to_sell = (drift_value / 100) * state.total_value_usd
                sell_tokens.append({"symbol": symbol, "drift": drift_value, "value": value_to_sell})
            elif drift_value < -threshold:
                value_to_buy = (abs(drift_value) / 100) * state.total_value_usd
                buy_tokens.append({"symbol": symbol, "drift": drift_value, "value": value_to_buy})

        # Sort by absolute drift (largest first)
        sell_tokens.sort(key=lambda x: x["drift"], reverse=True)
        buy_tokens.sort(key=lambda x: x["drift"])

        trades: list[RebalanceTrade] = []

        # Match sells to buys
        for sell in sell_tokens:
            sell_price = state.prices.get(sell["symbol"], 0)
            if sell_price <= 0:
                # Without a price the sell amount cannot be sized.
                continue

            sell_token = tokens_by_symbol[sell["symbol"]]

            for buy in buy_tokens:
                if sell["value"] <= 0:
                    # This sell leg is fully allocated; move on to the next one.
                    break
                if buy["value"] <= 0:
                    continue

                trade_value = min(sell["value"], buy["value"])
                if trade_value < min_trade_value:
                    continue

                sell_amount = int((trade_value / sell_price) * (10 ** sell_token.decimals))
                if sell_amount <= 0:
                    # Rounds to nothing in raw units; not worth a transaction.
                    continue

                buy_token = tokens_by_symbol[buy["symbol"]]
                trades.append(RebalanceTrade(
                    token_in=sell_token.address,
                    token_out=buy_token.address,
                    amount_in=sell_amount,
                    reason=f"Sell {sell['symbol']} (+{sell['drift']:.1f}%) -> Buy {buy['symbol']} ({buy['drift']:.1f}%)",
                ))

                sell["value"] -= trade_value
                buy["value"] -= trade_value

        return trades

    async def _execute_trades(self, trades: list[RebalanceTrade]) -> None:
        """Submit each trade in order; a failed trade does not stop the rest."""
        for trade in trades:
            print(f"\nExecuting: {trade.reason}")

            try:
                tx = await self.client.execute_swap(
                    amount_in=trade.amount_in,
                    path=[trade.token_in, trade.token_out],
                    slippage_bps=self.config.max_slippage_bps,
                )

                print(f"Transaction: {tx.hash}")
                await tx.wait()
                print("Trade completed")
            except Exception as e:
                # Best effort: report and continue with the remaining trades.
                print(f"Trade failed: {e}")

    def _log_portfolio_state(self, state: PortfolioState) -> None:
        """Print total value and, per token, current / target share and drift."""
        print(f"\nPortfolio Value: ${state.total_value_usd:.2f}")
        print("\nCurrent Allocation:")

        for token in self.config.tokens:
            current = state.allocations.get(token.symbol, 0)
            target = self.config.target_allocation.get(token.symbol, 0)
            drift_value = state.drift.get(token.symbol, 0)
            # The "+" flag always emits a sign and avoids "+-0.0" for negative zero.
            drift_str = f"{drift_value:+.1f}"

            print(f" {token.symbol.ljust(6)} {current:5.1f}% / {target}% ({drift_str}%)")

Usage

import os
import asyncio
import signal
from dotenv import load_dotenv
from web3 import Web3

from zeroquant import ZeroQuantClient
from portfolio_agent import PortfolioAgent, PortfolioAgentConfig, TokenConfig

# Runs at import time, before main() reads its settings via os.getenv.
# NOTE(review): python-dotenv presumably loads a local .env file into the
# process environment here - confirm the file location it searches.
load_dotenv()


async def main():
    """Build the client and agent from environment settings and run the agent.

    Raises:
        RuntimeError: if any required environment variable is unset or empty.
            Failing here avoids passing ``None`` as an RPC URL, key or address
            and getting an obscure error later.
    """
    required = (
        "RPC_URL",
        "PRIVATE_KEY",
        "FACTORY_ADDRESS",
        "PERMISSION_MANAGER_ADDRESS",
        "VAULT_ADDRESS",
    )
    env = {name: os.getenv(name) for name in required}
    missing = [name for name, value in env.items() if not value]
    if missing:
        raise RuntimeError(
            f"Missing required environment variables: {', '.join(missing)}"
        )

    w3 = Web3(Web3.HTTPProvider(env["RPC_URL"]))

    client = ZeroQuantClient(
        web3=w3,
        private_key=env["PRIVATE_KEY"],
        factory_address=env["FACTORY_ADDRESS"],
        permission_manager_address=env["PERMISSION_MANAGER_ADDRESS"],
    )
    await client.connect()
    await client.connect_vault(env["VAULT_ADDRESS"])

    agent = PortfolioAgent(PortfolioAgentConfig(
        client=client,
        vault_address=env["VAULT_ADDRESS"],
        tokens=[
            TokenConfig(
                symbol="WETH",
                address="0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2",
                decimals=18,
                coingecko_id="ethereum",
            ),
            TokenConfig(
                symbol="USDC",
                address="0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
                decimals=6,
                coingecko_id="usd-coin",
            ),
            TokenConfig(
                symbol="WBTC",
                address="0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599",
                decimals=8,
                coingecko_id="wrapped-bitcoin",
            ),
        ],
        target_allocation={
            "WETH": 50,
            "USDC": 30,
            "WBTC": 20,
        },
        rebalance_threshold_pct=5,  # Rebalance when drift exceeds 5 percentage points
        min_trade_value_usd=100,  # Minimum $100 trade
        max_slippage_bps=100,  # 1% max slippage
        check_interval_ms=3600000,  # Check every hour
    ))

    def shutdown_handler(sig, frame):
        # Only flips the agent's running flag; the run loop exits on its own.
        agent.stop()

    signal.signal(signal.SIGINT, shutdown_handler)

    await agent.start()


# Start the event loop only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

Sample Output

Portfolio Agent started
Target allocation: {'WETH': 50, 'USDC': 30, 'WBTC': 20}
Rebalance threshold: 5%

--- Rebalance Check ---
Time: 2024-01-15T10:00:00.123456

Portfolio Value: $10000.00

Current Allocation:
 WETH    56.0% / 50% (+6.0%)
 USDC    24.0% / 30% (-6.0%)
 WBTC    20.0% / 20% (+0.0%)

Rebalance trades needed: 1
 Sell WETH (+6.0%) -> Buy USDC (-6.0%)

Executing: Sell WETH (+6.0%) -> Buy USDC (-6.0%)
Transaction: 0x...
Trade completed

What's Next?