Introduction
AI agents that autonomously execute multi-step tasks are transitioning from experimental prototypes to production systems. Unlike traditional API calls, agents make decisions, use tools, and interact with external systems—introducing new challenges around reliability, cost, safety, and observability. This article covers the patterns and practices needed to deploy AI agents safely and efficiently.
Agent Orchestration
Production agents typically follow a structured execution loop:
import asyncio
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional
@dataclass
class AgentContext:
    """Mutable per-run state threaded through the agent execution loop."""

    # The task the agent is working on.
    task: str
    # Hard cap on loop iterations; guards against runaway agents.
    max_steps: int = 20
    # Step counter, advanced at the top of each loop iteration.
    current_step: int = 0
    # One {"step", "action", "observation"} record per completed step.
    history: List[dict] = field(default_factory=list)
    # Tool names invoked so far, in call order (duplicates kept).
    tools_used: List[str] = field(default_factory=list)
    # Accumulated estimated spend (USD) for this run.
    total_cost: float = 0.0
class AgentOrchestrator:
    """Drives the think -> act -> observe loop for a single agent task.

    The loop is bounded by ``AgentContext.max_steps`` and accumulates an
    estimated cost per step so runs can be budgeted and audited.
    """

    def __init__(self, model: str = "claude-sonnet-4-20260512"):
        self.model = model
        self.max_retries = 3
        # Estimated USD per token; consumed by self._calculate_cost.
        self.cost_per_token = {"input": 0.000003, "output": 0.000015}

    async def run(self, task: str) -> dict:
        """Execute ``task`` and return a result dict.

        Returns a "success" dict (answer plus step/tool/cost accounting)
        or a "max_steps_exceeded" dict when the step budget runs out.
        """
        ctx = AgentContext(task=task)
        while ctx.current_step < ctx.max_steps:
            ctx.current_step += 1
            try:
                # 1. Think: decide next action.
                action = await self.think(ctx)

                # 2. Act: execute tool or respond.
                if action["type"] == "tool_call":
                    result = await self.execute_tool(action["tool"], action["args"])
                    ctx.tools_used.append(action["tool"]["name"])
                elif action["type"] == "final_answer":
                    # Fix: previously the final step's cost was never added,
                    # under-reporting total_cost by one model call.
                    ctx.total_cost += self._calculate_cost(action)
                    return {
                        "status": "success",
                        "answer": action["content"],
                        "steps": ctx.current_step,
                        "tools_used": ctx.tools_used,
                        "total_cost": ctx.total_cost,
                    }
                else:
                    # Fix: an unrecognized action type previously left
                    # `result` unbound, raising NameError at the append below.
                    result = {
                        "status": "error",
                        "message": f"unknown action type: {action['type']}",
                    }

                # 3. Observe: store result.
                ctx.history.append({
                    "step": ctx.current_step,
                    "action": action,
                    "observation": result,
                })

                # 4. Cost tracking.
                ctx.total_cost += self._calculate_cost(action)
            except ToolError as e:
                # Tool failures go through the dedicated retry/recovery path.
                await self.handle_tool_error(ctx, e)
            except Exception as e:
                # Catch-all boundary: the loop must survive unexpected errors
                # rather than abort the whole task.
                await self.handle_unexpected_error(ctx, e)
        # Fix: report the same accounting fields as the success path so
        # callers can always read tools_used / total_cost.
        return {
            "status": "max_steps_exceeded",
            "steps": ctx.max_steps,
            "tools_used": ctx.tools_used,
            "total_cost": ctx.total_cost,
        }
Error Handling and Retry Logic
Agents must gracefully handle failures across multiple dimensions:
class AgentErrorHandler:
    """Applies per-failure-category retry policies to tool execution."""

    def __init__(self):
        # One policy per failure category; looked up via _get_policy.
        self.retry_policies = {
            "rate_limit": RetryPolicy(max_retries=5, backoff="exponential"),
            "timeout": RetryPolicy(max_retries=3, backoff="linear"),
            "auth_error": RetryPolicy(max_retries=1, backoff="none"),
            "tool_crash": RetryPolicy(max_retries=2, backoff="constant"),
        }

    async def execute_with_retry(self, tool_call: dict) -> dict:
        """Run the tool, retrying transient failures per its policy."""
        tool_name = tool_call["tool"]["name"]
        policy = self._get_policy(tool_name)
        attempt = 0
        while attempt < policy.max_retries:
            try:
                return await self._execute_tool(tool_call)
            except RateLimitError as err:
                # Back off until the provider's reset time, then retry.
                delay = self._calculate_backoff(attempt, policy.backoff, err.reset_at)
                await self._log_retry(tool_call, attempt, delay)
                await asyncio.sleep(delay)
            except TimeoutError:
                if attempt == policy.max_retries - 1:
                    # Out of attempts: degrade instead of failing hard.
                    return self._graceful_degradation(tool_call)
                await asyncio.sleep(policy.backoff_delay * (attempt + 1))
            except AuthError:
                # Credentials may simply be stale; refresh and retry.
                await self._refresh_credentials(tool_name)
            attempt += 1
        return {"error": "max_retries_exceeded", "tool": tool_name}

    def _graceful_degradation(self, tool_call: dict) -> dict:
        """Produce a harmless fallback result for a tool that cannot run."""
        name = tool_call["tool"]["name"]
        return {
            "status": "unavailable",
            "message": f"{name} is temporarily unavailable",
            "suggestion": "Try again later or use an alternative approach",
        }
Human-in-the-Loop
Critical agent actions require human approval before execution:
class HumanInTheLoop:
    """Gates high-risk agent actions behind human approval."""

    def __init__(self, approval_thresholds: dict):
        # Recognized threshold keys: "high_risk_actions", "protected_tools",
        # "max_amount" (see _requires_approval).
        self.thresholds = approval_thresholds
        # approval_id -> {"action", "context", "status", "created_at"}.
        self.pending_approvals = {}

    async def request_approval(
        self, action: dict, context: "AgentContext"
    ) -> bool:
        """Return True if ``action`` may proceed.

        Low-risk actions pass immediately; otherwise a reviewer is
        notified and we block (up to 300 s) for their decision. Timeout
        counts as rejection.
        """
        # Fast path: no approval needed.
        if not self._requires_approval(action):
            return True
        approval_id = str(uuid.uuid4())
        self.pending_approvals[approval_id] = {
            "action": action,
            "context": context,
            "status": "pending",
            # Fix: datetime.utcnow() is deprecated and returns a naive
            # datetime; use an aware UTC timestamp instead.
            "created_at": datetime.now(timezone.utc),
        }
        # Notify human reviewer with the last few steps for context.
        await self._notify_reviewer(
            approval_id=approval_id,
            action_description=action["description"],
            risk_level=action.get("risk", "low"),
            current_state=context.history[-3:],
        )
        # Wait for approval (with timeout).
        try:
            approved = await self._wait_for_approval(approval_id, timeout=300)
            self.pending_approvals[approval_id]["status"] = (
                "approved" if approved else "rejected"
            )
            return approved
        except TimeoutError:
            # Fail closed: no answer within the window means "no".
            self.pending_approvals[approval_id]["status"] = "timed_out"
            return False

    def _requires_approval(self, action: dict) -> bool:
        """True when any configured threshold matches ``action``."""
        # Fix: action["tool"] raised KeyError for tool-less actions
        # (e.g. final answers); use .get with a default, as the rest of
        # the codebase does.
        return any([
            action.get("risk") in self.thresholds.get("high_risk_actions", []),
            action.get("tool", {}).get("name") in self.thresholds.get("protected_tools", []),
            action.get("amount", 0) > self.thresholds.get("max_amount", 1000),
            action.get("destructive", False),
        ])
Monitoring Agent Behavior
Track agent decisions and outcomes with structured telemetry:
class AgentTelemetry:
    """Structured telemetry for agent runs: per-step spans/metrics plus
    heuristic anomaly detection (loops, cost, token usage)."""

    def __init__(self):
        self.metrics = MetricsClient()
        self.tracer = Tracer()

    def record_step(self, ctx: "AgentContext", action: dict, observation: dict):
        """Emit one trace span and one duration metric for a completed step."""
        span = self.tracer.start_span("agent_step")
        span.set_attributes({
            "agent.step": ctx.current_step,
            # NOTE(review): hash() of a str is salted per process, so this
            # value is only stable within one process run — confirm that is
            # acceptable for cross-run correlation.
            "agent.task_hash": hash(ctx.task),
            "action.type": action["type"],
            "action.tool": action.get("tool", {}).get("name", "none"),
            "action.duration_ms": action.get("duration_ms", 0),
            "observation.status": observation.get("status", "unknown"),
        })
        self.metrics.histogram(
            "agent.step.duration",
            value=action.get("duration_ms", 0),
            tags={
                "tool": action.get("tool", {}).get("name", "none"),
                "status": observation.get("status", "unknown"),
            },
        )
        span.end()

    def detect_anomalies(self, ctx: "AgentContext") -> List[str]:
        """Return warning codes for suspicious patterns in ``ctx.history``."""
        warnings = []
        # Looping detection: few distinct tools across many recent steps.
        # Fix: history entries without a "tool" key (e.g. final answers)
        # previously raised KeyError; use the same .get chain record_step
        # uses.
        recent_actions = [
            h["action"].get("tool", {}).get("name", "none")
            for h in ctx.history[-10:]
        ]
        if len(set(recent_actions)) < 3 and len(recent_actions) >= 5:
            warnings.append("POSSIBLE_LOOP")
        # Cost anomaly: flag runs that exceed $0.50.
        if ctx.total_cost > 0.50:
            warnings.append("HIGH_COST")
        # Token usage over the whole run.
        total_tokens = sum(
            h["action"].get("tokens", 0) for h in ctx.history
        )
        if total_tokens > 50000:
            warnings.append("HIGH_TOKEN_USAGE")
        return warnings
Cost Tracking and Rate Limiting
class AgentCostManager:
    """Enforces a daily spend budget and per-tool rate limits."""

    def __init__(self, daily_budget: float = 10.0):
        self.daily_budget = daily_budget
        self.daily_spend = 0.0
        # tool name -> TokenBucket, created lazily on first use.
        self.token_buckets = {}

    async def check_budget(self, estimated_cost: float) -> bool:
        """Reserve ``estimated_cost`` against today's budget.

        Returns False (reserving nothing) when the spend would exceed
        the daily budget; otherwise records the spend and returns True.
        """
        # Reset the counter on the first call of a new day.
        # NOTE(review): _is_new_day is defined elsewhere — verify it also
        # records the current date, or the reset will fire repeatedly.
        if self._is_new_day():
            self.daily_spend = 0.0
        if self.daily_spend + estimated_cost > self.daily_budget:
            return False  # Budget exceeded
        self.daily_spend += estimated_cost
        return True

    async def rate_limit_check(self, tool: str) -> bool:
        """Consume one rate-limit token for ``tool``; False when exhausted.

        Fix: the bucket was previously rebuilt on every call and never
        stored back into self.token_buckets, so consumption was never
        remembered and the rate limit could never trigger.
        """
        if tool not in self.token_buckets:
            self.token_buckets[tool] = TokenBucket(
                capacity=10,
                refill_rate=1,
                refill_interval=60,
            )
        return self.token_buckets[tool].consume()
Observability
Log agent decision traces for debugging and audit:
{
"timestamp": "2026-05-12T10:30:00Z",
"agent_id": "agent-payment-v3",
"session_id": "sess_abc123",
"step": 4,
"action": {
"type": "tool_call",
"tool": "stripe_charge",
"args": {"amount": 49.99, "currency": "usd"},
"reasoning": "Customer requested payment for order ord-789"
},
"observation": {
"status": "success",
"charge_id": "ch_xyz456",
"duration_ms": 234
},
"cost": {
"input_tokens": 1245,
"output_tokens": 89,
"estimated_cost": 0.005
},
"warnings": []
}
Deploy agents incrementally: start with read-only tools, add human-in-the-loop for destructive actions, and only move to fully autonomous mode after extensive monitoring and failure mode analysis.