Deploying AI Agents to Production

Introduction

AI agents that autonomously execute multi-step tasks are transitioning from experimental prototypes to production systems. Unlike traditional API calls, agents make decisions, use tools, and interact with external systems, which introduces new challenges around reliability, cost, safety, and observability. This article covers the patterns and practices needed to deploy AI agents safely and efficiently.

Agent Orchestration

Production agents typically follow a structured execution loop: think (decide the next action), act (call a tool or return a final answer), and observe (record the result before the next step):

import asyncio
from typing import List, Optional
from dataclasses import dataclass, field


@dataclass
class AgentContext:
    task: str
    max_steps: int = 20
    current_step: int = 0
    history: List[dict] = field(default_factory=list)
    tools_used: List[str] = field(default_factory=list)
    total_cost: float = 0.0


class AgentOrchestrator:
    def __init__(self, model: str = "claude-sonnet-4-20260512"):
        self.model = model
        self.max_retries = 3
        self.cost_per_token = {"input": 0.000003, "output": 0.000015}

    async def run(self, task: str) -> dict:
        ctx = AgentContext(task=task)

        while ctx.current_step < ctx.max_steps:
            ctx.current_step += 1

            try:
                # 1. Think: decide next action
                action = await self.think(ctx)

                # 2. Act: execute tool or respond
                if action["type"] == "tool_call":
                    result = await self.execute_tool(action["tool"], action["args"])
                    ctx.tools_used.append(action["tool"]["name"])

                elif action["type"] == "final_answer":
                    return {
                        "status": "success",
                        "answer": action["content"],
                        "steps": ctx.current_step,
                        "tools_used": ctx.tools_used,
                        "total_cost": ctx.total_cost,
                    }

                else:
                    # Unknown action type: record an error observation so the
                    # model can see it and recover on the next step
                    result = {"status": "error", "message": f"unknown action type: {action['type']}"}

                # 3. Observe: store result
                ctx.history.append({
                    "step": ctx.current_step,
                    "action": action,
                    "observation": result,
                })

                # 4. Cost tracking
                ctx.total_cost += self._calculate_cost(action)

            except ToolError as e:
                # Handle tool failures with retry
                await self.handle_tool_error(ctx, e)

            except Exception as e:
                # Catch-all for unexpected errors
                await self.handle_unexpected_error(ctx, e)

        return {"status": "max_steps_exceeded", "steps": ctx.max_steps}
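
The loop above assumes a _calculate_cost helper on AgentOrchestrator that is not shown. A minimal sketch, assuming the think step attaches the model's reported token usage to the action dict (the input_tokens and output_tokens keys are an assumption, not part of any particular SDK):

    def _calculate_cost(self, action: dict) -> float:
        # Assumes think() recorded token usage on the action (hypothetical field names)
        input_tokens = action.get("input_tokens", 0)
        output_tokens = action.get("output_tokens", 0)
        return (
            input_tokens * self.cost_per_token["input"]
            + output_tokens * self.cost_per_token["output"]
        )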

Error Handling and Retry Logic

Agents must gracefully handle failures across multiple dimensions, such as rate limits, tool timeouts, expired credentials, and tool crashes, each with its own retry policy:

class AgentErrorHandler:
    def __init__(self):
        self.retry_policies = {
            "rate_limit": RetryPolicy(max_retries=5, backoff="exponential"),
            "timeout": RetryPolicy(max_retries=3, backoff="linear"),
            "auth_error": RetryPolicy(max_retries=1, backoff="none"),
            "tool_crash": RetryPolicy(max_retries=2, backoff="constant"),
        }

    async def execute_with_retry(self, tool_call: dict) -> dict:
        policy = self._get_policy(tool_call["tool"]["name"])

        for attempt in range(policy.max_retries):
            try:
                return await self._execute_tool(tool_call)
            except RateLimitError as e:
                wait = self._calculate_backoff(attempt, policy.backoff, e.reset_at)
                await self._log_retry(tool_call, attempt, wait)
                await asyncio.sleep(wait)
            except TimeoutError:
                if attempt == policy.max_retries - 1:
                    return self._graceful_degradation(tool_call)
                await asyncio.sleep(policy.backoff_delay * (attempt + 1))
            except AuthError:
                await self._refresh_credentials(tool_call["tool"]["name"])
                continue

        return {"error": "max_retries_exceeded", "tool": tool_call["tool"]["name"]}

    def _graceful_degradation(self, tool_call: dict) -> dict:
        """Return a safe default when a tool is unavailable."""
        return {
            "status": "unavailable",
            "message": f"{tool_call['tool']['name']} is temporarily unavailable",
            "suggestion": "Try again later or use an alternative approach",
        }
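
The handler above assumes a RetryPolicy container plus error types raised by the tool layer (RateLimitError, AuthError, and the ToolError base used by the orchestrator). A minimal sketch of those pieces; the field names and defaults are assumptions rather than any library's API:

from dataclasses import dataclass
from typing import Optional


class ToolError(Exception):
    """Base class for failures raised by the tool layer."""


class RateLimitError(ToolError):
    def __init__(self, reset_at: Optional[float] = None):
        super().__init__("rate limited")
        self.reset_at = reset_at  # provider-reported time when the limit resets


class AuthError(ToolError):
    """Credentials are missing, expired, or invalid."""


@dataclass
class RetryPolicy:
    max_retries: int
    backoff: str = "exponential"  # "exponential" | "linear" | "constant" | "none"
    backoff_delay: float = 1.0    # base delay in seconds between attempts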

Human-in-the-Loop

Critical agent actions require human approval before execution:

import uuid
from datetime import datetime


class HumanInTheLoop:
    def __init__(self, approval_thresholds: dict):
        self.thresholds = approval_thresholds
        self.pending_approvals = {}

    async def request_approval(
        self, action: dict, context: AgentContext
    ) -> bool:
        # Determine if approval is needed
        if not self._requires_approval(action):
            return True

        approval_id = str(uuid.uuid4())
        self.pending_approvals[approval_id] = {
            "action": action,
            "context": context,
            "status": "pending",
            "created_at": datetime.utcnow(),
        }

        # Notify human reviewer
        await self._notify_reviewer(
            approval_id=approval_id,
            action_description=action["description"],
            risk_level=action.get("risk", "low"),
            current_state=context.history[-3:],
        )

        # Wait for approval (with timeout)
        try:
            approved = await self._wait_for_approval(approval_id, timeout=300)
            self.pending_approvals[approval_id]["status"] = (
                "approved" if approved else "rejected"
            )
            return approved
        except TimeoutError:
            self.pending_approvals[approval_id]["status"] = "timed_out"
            return False

    def _requires_approval(self, action: dict) -> bool:
        return any([
            action.get("risk") in self.thresholds.get("high_risk_actions", []),
            action.get("tool", {}).get("name") in self.thresholds.get("protected_tools", []),
            action.get("amount", 0) > self.thresholds.get("max_amount", 1000),
            action.get("destructive", False),
        ])
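
A hypothetical threshold configuration wiring this up; the keys match _requires_approval above, while the specific tool names and limits are illustrative:

hitl = HumanInTheLoop(approval_thresholds={
    "high_risk_actions": ["high", "critical"],              # risk values that always need review
    "protected_tools": ["stripe_charge", "delete_record"],  # tools that always need review
    "max_amount": 500,                                       # amounts above this need sign-off
})

# Inside the orchestrator loop, before executing a tool call (hypothetical wiring):
# if not await hitl.request_approval(action, ctx):
#     result = {"status": "rejected", "message": "Action was not approved by a reviewer"}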

Monitoring Agent Behavior

Track agent decisions and outcomes with structured telemetry:

class AgentTelemetry:
    def __init__(self):
        self.metrics = MetricsClient()
        self.tracer = Tracer()

    def record_step(self, ctx: AgentContext, action: dict, observation: dict):
        span = self.tracer.start_span("agent_step")

        span.set_attributes({
            "agent.step": ctx.current_step,
            "agent.task_hash": hash(ctx.task),
            "action.type": action["type"],
            "action.tool": action.get("tool", {}).get("name", "none"),
            "action.duration_ms": action.get("duration_ms", 0),
            "observation.status": observation.get("status", "unknown"),
        })

        self.metrics.histogram(
            "agent.step.duration",
            value=action.get("duration_ms", 0),
            tags={
                "tool": action.get("tool", {}).get("name", "none"),
                "status": observation.get("status", "unknown"),
            },
        )

        span.end()

    def detect_anomalies(self, ctx: AgentContext) -> List[str]:
        warnings = []

        # Looping detection: several recent steps cycling through very few tools
        recent_actions = [
            h["action"].get("tool", {}).get("name", "none")
            for h in ctx.history[-10:]
        ]
        if len(recent_actions) >= 5 and len(set(recent_actions)) < 3:
            warnings.append("POSSIBLE_LOOP")

        # Cost anomaly
        if ctx.total_cost > 0.50:
            warnings.append("HIGH_COST")

        # Token usage
        total_tokens = sum(
            h["action"].get("tokens", 0) for h in ctx.history
        )
        if total_tokens > 50000:
            warnings.append("HIGH_TOKEN_USAGE")

        return warnings
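
MetricsClient and Tracer above stand in for whatever metrics and tracing stack is already in place (StatsD, Prometheus, OpenTelemetry, and so on). For local testing, minimal no-op placeholders might look like the following; the class and method names simply mirror how they are used above:

class MetricsClient:
    """No-op stand-in for a real metrics client."""
    def histogram(self, name: str, value: float, tags: dict | None = None):
        pass


class Span:
    """No-op stand-in for a tracing span."""
    def set_attributes(self, attributes: dict):
        pass

    def end(self):
        pass


class Tracer:
    """No-op stand-in for a real tracer."""
    def start_span(self, name: str) -> Span:
        return Span()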

Cost Tracking and Rate Limiting

Cap how much an agent can spend per day and how often it can call each tool:

class AgentCostManager:
    def __init__(self, daily_budget: float = 10.0):
        self.daily_budget = daily_budget
        self.daily_spend = 0.0
        self.token_buckets = {}

    async def check_budget(self, estimated_cost: float) -> bool:
        # Reset daily counter
        if self._is_new_day():
            self.daily_spend = 0.0

        if self.daily_spend + estimated_cost > self.daily_budget:
            return False  # Budget exceeded

        self.daily_spend += estimated_cost
        return True

    async def rate_limit_check(self, tool: str) -> bool:
        # Keep one bucket per tool so consumed tokens persist across calls
        if tool not in self.token_buckets:
            self.token_buckets[tool] = TokenBucket(
                capacity=10,
                refill_rate=1,
                refill_interval=60,
            )
        return self.token_buckets[tool].consume()
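
TokenBucket is not defined above; a minimal sketch, under the assumption that capacity is the burst size and refill_rate tokens are added per refill_interval seconds:

import time


class TokenBucket:
    def __init__(self, capacity: int, refill_rate: float, refill_interval: float):
        self.capacity = capacity
        self.refill_rate = refill_rate          # tokens added per refill interval
        self.refill_interval = refill_interval  # seconds per refill interval
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()

    def consume(self, amount: int = 1) -> bool:
        # Refill proportionally to elapsed time, capped at capacity
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(
            self.capacity,
            self.tokens + (elapsed / self.refill_interval) * self.refill_rate,
        )
        self.last_refill = now

        if self.tokens >= amount:
            self.tokens -= amount
            return True
        return False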

Observability

Log agent decision traces for debugging and audit:

{
  "timestamp": "2026-05-12T10:30:00Z",
  "agent_id": "agent-payment-v3",
  "session_id": "sess_abc123",
  "step": 4,
  "action": {
    "type": "tool_call",
    "tool": "stripe_charge",
    "args": {"amount": 49.99, "currency": "usd"},
    "reasoning": "Customer requested payment for order ord-789"
  },
  "observation": {
    "status": "success",
    "charge_id": "ch_xyz456",
    "duration_ms": 234
  },
  "cost": {
    "input_tokens": 1245,
    "output_tokens": 89,
    "estimated_cost": 0.005
  },
  "warnings": []
}
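
One straightforward way to emit records in this shape is a line of JSON per step through the standard logging module; this sketch assumes the caller passes in the same fields shown above:

import json
import logging
from datetime import datetime, timezone

logger = logging.getLogger("agent.trace")


def log_trace(agent_id: str, session_id: str, step: int,
              action: dict, observation: dict, cost: dict, warnings: list) -> None:
    # One JSON object per line keeps traces easy to grep and ship to a log pipeline
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "agent_id": agent_id,
        "session_id": session_id,
        "step": step,
        "action": action,
        "observation": observation,
        "cost": cost,
        "warnings": warnings,
    }
    logger.info(json.dumps(record))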

Deploy agents incrementally: start with read-only tools, add human-in-the-loop for destructive actions, and only move to fully autonomous mode after extensive monitoring and failure mode analysis.
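
One hypothetical way to make that progression explicit is an autonomy level carried in deployment configuration and checked before every tool call, rather than relying on which tools happen to be registered:

from enum import Enum


class AutonomyLevel(Enum):
    READ_ONLY = "read_only"            # only read-only tools may execute
    HUMAN_APPROVAL = "human_approval"  # destructive actions go through HumanInTheLoop
    AUTONOMOUS = "autonomous"          # no approval gate; relies on monitoring and budgets


async def gate_action(level: AutonomyLevel, action: dict, ctx: AgentContext, hitl: HumanInTheLoop) -> bool:
    # Hypothetical gate evaluated before each tool call
    destructive = action.get("destructive", False)
    if level is AutonomyLevel.READ_ONLY:
        return not destructive
    if level is AutonomyLevel.HUMAN_APPROVAL and destructive:
        return await hitl.request_approval(action, ctx)
    return True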