AI Workflow Automation: LangChain, Temporal, Event-Driven Agents
Introduction
Production AI systems need more than a single LLM call or a simple chain. They require reliable, long-running workflows that handle failures, retries, human-in-the-loop interventions, and complex state management. This article covers three complementary approaches to AI workflow automation: LangChain for LLM orchestration, Temporal for durable execution, and event-driven architectures for scalable pipelines.
LangChain Workflows
LangChain provides abstractions for building LLM-powered workflows. The example below uses the classic LLMChain/SequentialChain interface, which recent releases deprecate in favor of LCEL, but it illustrates the composition pattern clearly:
from langchain.chains import LLMChain, SequentialChain
from langchain.prompts import PromptTemplate
from langchain.chat_models import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4-20250514")
# Define individual chain steps
extract_prompt = PromptTemplate(
    input_variables=["text"],
    template="Extract key requirements from this text:\n{text}",
)
extract_chain = LLMChain(llm=llm, prompt=extract_prompt, output_key="requirements")

analyze_prompt = PromptTemplate(
    input_variables=["requirements"],
    template="Analyze these requirements and identify potential issues:\n{requirements}",
)
analyze_chain = LLMChain(llm=llm, prompt=analyze_prompt, output_key="analysis")

generate_prompt = PromptTemplate(
    input_variables=["requirements", "analysis"],
    template="Based on requirements and analysis, generate a solution:\nRequirements: {requirements}\nAnalysis: {analysis}",
)
generate_chain = LLMChain(llm=llm, prompt=generate_prompt, output_key="solution")

# Compose into a sequential workflow
workflow = SequentialChain(
    chains=[extract_chain, analyze_chain, generate_chain],
    input_variables=["text"],
    output_variables=["requirements", "analysis", "solution"],
    verbose=True,
)
result = workflow({"text": "Build a REST API for user management with authentication"})
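In current LangChain releases, the same pipeline is written with LCEL, the Runnable composition syntax. Here is a rough equivalent as a sketch, assuming the langchain-core and langchain-anthropic packages are installed:

from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4-20250514")
to_text = StrOutputParser()

extract = (
    PromptTemplate.from_template("Extract key requirements from this text:\n{text}")
    | llm | to_text
)
analyze = (
    PromptTemplate.from_template(
        "Analyze these requirements and identify potential issues:\n{requirements}")
    | llm | to_text
)
generate = (
    PromptTemplate.from_template(
        "Based on requirements and analysis, generate a solution:\n"
        "Requirements: {requirements}\nAnalysis: {analysis}")
    | llm | to_text
)

# Each assign step threads the prior outputs into the next prompt's inputs
pipeline = (
    {"requirements": extract}
    | RunnablePassthrough.assign(analysis=analyze)
    | RunnablePassthrough.assign(solution=generate)
)
result = pipeline.invoke({"text": "Build a REST API for user management with authentication"})
# result holds "requirements", "analysis", and "solution" keys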
Temporal for Durable Execution
Temporal provides reliability guarantees for long-running AI workflows:
import asyncio
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.common import RetryPolicy
# Define activities (individual steps); vector_search, call_llm, and
# request_human_review are application-provided helpers
@activity.defn
async def retrieve_documents(query: str) -> list[str]:
    return await vector_search(query)

@activity.defn
async def generate_answer(context: list[str], question: str) -> str:
    return await call_llm(f"Context: {context}\nQuestion: {question}")

@activity.defn
async def review_output(answer: str) -> str:
    """Human-in-the-loop review with timeout."""
    review = await request_human_review(answer, timeout=3600)
    if review["approved"]:
        return answer
    return f"Needs revision: {review['feedback']}"
# Define workflow
@workflow.defn
class DocumentQAWorkflow:
    @workflow.run
    async def run(self, question: str) -> dict:
        # Step 1: Retrieve documents
        docs = await workflow.execute_activity(
            retrieve_documents,
            question,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )
        # Step 2: Generate answer (multiple arguments are passed via args=[...])
        answer = await workflow.execute_activity(
            generate_answer,
            args=[docs, question],
            start_to_close_timeout=timedelta(minutes=2),
        )
        # Step 3: Human review
        final = await workflow.execute_activity(
            review_output,
            answer,
            start_to_close_timeout=timedelta(hours=2),
        )
        return {"question": question, "answer": final}
# Run the workflow
async def start_workflow():
    client = await Client.connect("localhost:7233")
    handle = await client.start_workflow(
        DocumentQAWorkflow.run,
        "What is our GDPR compliance policy?",
        id="doc-qa-workflow-001",
        task_queue="ai-tasks",
    )
    result = await handle.result()
    print(result)

asyncio.run(start_workflow())
Temporal automatically retries failed activities and persists workflow state across worker restarts; long activity timeouts (or, more idiomatically, signals) let a workflow pause for human-in-the-loop review.
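Note that client.start_workflow only schedules the workflow; nothing executes until a worker process polls the task queue. A minimal worker sketch for the example above, using the same host and queue names:

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker

async def run_worker():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="ai-tasks",
        workflows=[DocumentQAWorkflow],
        activities=[retrieve_documents, generate_answer, review_output],
    )
    # Polls the queue and executes workflow and activity tasks until cancelled
    await worker.run()

asyncio.run(run_worker())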
Event-Driven Architecture
Event-driven workflows react to events with loosely coupled agents:
import asyncio
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable

class EventType(Enum):
    DOCUMENT_UPLOADED = "document.uploaded"
    QUERY_RECEIVED = "query.received"
    ANALYSIS_COMPLETE = "analysis.complete"
    HUMAN_REVIEW_NEEDED = "human.review.needed"
    WORKFLOW_COMPLETE = "workflow.complete"

@dataclass
class Event:
    type: EventType
    data: dict
    source: str
    timestamp: float = field(default_factory=time.time)  # stamped at creation
class EventBus:
    def __init__(self):
        self.subscribers: dict[EventType, list[Callable]] = {}
        self.event_log: list[Event] = []

    def subscribe(self, event_type: EventType, handler: Callable):
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)

    async def emit(self, event: Event):
        self.event_log.append(event)
        handlers = self.subscribers.get(event.type, [])
        # Run handlers concurrently; exceptions come back as results, not raises
        results = await asyncio.gather(
            *[handler(event) for handler in handlers],
            return_exceptions=True,
        )
        return results
# Event-driven document processing pipeline
class DocumentProcessingPipeline:
    def __init__(self, event_bus: EventBus):
        self.bus = event_bus
        self._register_handlers()

    def _register_handlers(self):
        self.bus.subscribe(EventType.DOCUMENT_UPLOADED, self.handle_document)
        self.bus.subscribe(EventType.ANALYSIS_COMPLETE, self.handle_analysis)
        self.bus.subscribe(EventType.QUERY_RECEIVED, self.handle_query)

    async def handle_document(self, event: Event):
        doc = event.data["document"]
        # Extract text, chunk, embed (helpers are application-provided)
        chunks = chunk_document(doc)
        embeddings = embed_chunks(chunks)
        store_embeddings(embeddings)
        await self.bus.emit(Event(
            type=EventType.ANALYSIS_COMPLETE,
            data={"doc_id": doc["id"], "chunks": len(chunks)},
            source="document_processor",
        ))

    async def handle_analysis(self, event: Event):
        # Minimal downstream bookkeeping once a document is indexed
        print(f"Indexed {event.data['chunks']} chunks for {event.data['doc_id']}")

    async def handle_query(self, event: Event):
        query = event.data["query"]
        docs = vector_search(query)
        answer = call_llm(f"Context: {docs}\nQuery: {query}")
        await self.bus.emit(Event(
            type=EventType.WORKFLOW_COMPLETE,
            data={"query": query, "answer": answer},
            source="query_handler",
        ))
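Driving the pipeline then amounts to constructing a bus and emitting events. A hypothetical driver, assuming the placeholder helpers (chunk_document, embed_chunks, store_embeddings, vector_search, call_llm) are implemented:

async def main():
    bus = EventBus()
    DocumentProcessingPipeline(bus)
    # A document upload and a user query, expressed as events
    await bus.emit(Event(
        type=EventType.DOCUMENT_UPLOADED,
        data={"document": {"id": "doc-1", "text": "..."}},
        source="api",
    ))
    await bus.emit(Event(
        type=EventType.QUERY_RECEIVED,
        data={"query": "Summarize doc-1"},
        source="api",
    ))

asyncio.run(main())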
Error Recovery and Retry
All workflows need robust error handling:
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential

# LLMAPIError and ContextWindowError are application-defined exception types
def _is_retryable(exc: BaseException) -> bool:
    # Only transient failures (rate limits) are worth retrying automatically
    return isinstance(exc, LLMAPIError) and exc.is_rate_limit()

class WorkflowExecutor:
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=60),
        retry=retry_if_exception(_is_retryable),
    )
    async def execute_with_retry(self, workflow_fn, *args, **kwargs):
        try:
            return await workflow_fn(*args, **kwargs)
        except ContextWindowError:
            # Halve the context budget and start a fresh retry cycle
            kwargs["max_context"] = kwargs.get("max_context", 4000) // 2
            return await self.execute_with_retry(workflow_fn, *args, **kwargs)
        except LLMAPIError:
            raise  # retried by tenacity only if _is_retryable returns True
        except Exception:
            # Unknown failure: escalate to a human, then propagate
            await self.escalate_to_human(workflow_fn, args, kwargs)
            raise
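In use, any workflow coroutine can be wrapped the same way; answer_question below is a hypothetical coroutine that accepts a max_context keyword:

async def main():
    executor = WorkflowExecutor()
    result = await executor.execute_with_retry(
        answer_question,  # hypothetical workflow coroutine
        "What changed in the v2 API?",
        max_context=8000,
    )
    print(result)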
Conclusion
AI workflow automation requires reliability guarantees beyond simple scripting. LangChain provides the LLM orchestration layer with composable chains. Temporal adds durability with automatic retries, state persistence, and human-in-the-loop capabilities. Event-driven architectures enable loosely coupled, scalable pipelines. For production systems, combine all three: use LangChain for LLM logic within Temporal activities, and wire everything together with an event bus for scalability.
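As a closing sketch rather than a prescribed design: a Temporal activity can run the LCEL pipeline from the first section and publish its output on the event bus from the third, with retries and durability handled by the surrounding workflow. Here pipeline, bus, Event, and EventType refer to the earlier examples:

from temporalio import activity

@activity.defn
async def solve_requirements(text: str) -> dict:
    # LangChain supplies the LLM logic; Temporal supplies retries and durability
    result = await pipeline.ainvoke({"text": text})
    # Publish the result so loosely coupled consumers can react to it
    await bus.emit(Event(
        type=EventType.ANALYSIS_COMPLETE,
        data={"solution": result["solution"]},
        source="temporal_activity",
    ))
    return result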