AI Workflow Automation: LangChain, Temporal, Event-Driven Agents


Introduction





Production AI systems need more than a single LLM call or a simple chain. They require reliable, long-running workflows that handle failures, retries, human-in-the-loop interventions, and complex state management. This article covers three complementary approaches to AI workflow automation: LangChain for LLM orchestration, Temporal for durable execution, and event-driven architectures for scalable pipelines.





LangChain Workflows





LangChain provides abstractions for building LLM-powered workflows:






from langchain.chains import LLMChain, SequentialChain


from langchain.prompts import PromptTemplate


from langchain.chat_models import ChatAnthropic


from langchain.memory import ConversationBufferMemory




# One shared Claude model instance drives every step of the chain.
llm = ChatAnthropic(model="claude-sonnet-4-20260512")

# Step 1: pull the key requirements out of the raw input text.
extract_prompt = PromptTemplate(
    template="Extract key requirements from this text:\n{text}",
    input_variables=["text"],
)
extract_chain = LLMChain(llm=llm, prompt=extract_prompt, output_key="requirements")

# Step 2: critique the extracted requirements for gaps and risks.
analyze_prompt = PromptTemplate(
    template="Analyze these requirements and identify potential issues:\n{requirements}",
    input_variables=["requirements"],
)
analyze_chain = LLMChain(llm=llm, prompt=analyze_prompt, output_key="analysis")

# Step 3: draft a solution grounded in both previous outputs.
generate_prompt = PromptTemplate(
    template="Based on requirements and analysis, generate a solution:\nRequirements: {requirements}\nAnalysis: {analysis}",
    input_variables=["requirements", "analysis"],
)
generate_chain = LLMChain(llm=llm, prompt=generate_prompt, output_key="solution")

# Wire the three steps together: each chain's output_key becomes an input
# variable for the next prompt, and all intermediate keys are surfaced.
workflow = SequentialChain(
    chains=[extract_chain, analyze_chain, generate_chain],
    input_variables=["text"],
    output_variables=["requirements", "analysis", "solution"],
    verbose=True,
)

result = workflow({"text": "Build a REST API for user management with authentication"})







Temporal for Durable Execution





Temporal provides reliability guarantees for long-running AI workflows:






import asyncio
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.common import RetryPolicy
from temporalio.worker import Worker




# Define activities (individual steps)


@activity.defn
async def retrieve_documents(query: str) -> list[str]:
    """Retrieval activity: vector-search the document store for *query*."""
    # vector_search is assumed to be provided elsewhere in the project.
    hits = await vector_search(query)
    return hits




@activity.defn
async def generate_answer(context: list[str], question: str) -> str:
    """Generation activity: answer *question* grounded in the retrieved *context*."""
    prompt = f"Context: {context}\nQuestion: {question}"
    return await call_llm(prompt)




@activity.defn
async def review_output(answer: str) -> str:
    """Human-in-the-loop review: return the answer if approved, else the feedback."""
    # Blocks for up to an hour waiting on a human decision.
    verdict = await request_human_review(answer, timeout=3600)
    if not verdict["approved"]:
        return f"Needs revision: {verdict['feedback']}"
    return answer




# Define workflow


@workflow.defn
class DocumentQAWorkflow:
    """Durable document-QA workflow: retrieve -> generate -> human review.

    Temporal persists state between steps, so the workflow survives worker
    restarts and can pause for hours on the human-review activity.
    """

    @workflow.run
    async def run(self, question: str) -> dict:
        """Answer *question* against the document store; returns {"question", "answer"}."""
        # Step 1: Retrieve documents, retried up to 3 times on failure.
        # FIX: retry_policy must be a temporalio.common.RetryPolicy instance,
        # not a plain dict, and timedelta must actually be imported.
        docs = await workflow.execute_activity(
            retrieve_documents,
            question,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )

        # Step 2: Generate answer.
        # FIX: multiple activity arguments must be passed via the args=
        # keyword; passing [docs, question] positionally would hand the
        # two-parameter activity a single list argument.
        answer = await workflow.execute_activity(
            generate_answer,
            args=[docs, question],
            start_to_close_timeout=timedelta(minutes=2),
        )

        # Step 3: Human review — generous timeout to cover human latency.
        final = await workflow.execute_activity(
            review_output,
            answer,
            start_to_close_timeout=timedelta(hours=2),
        )

        return {"question": question, "answer": final}




# Run the workflow


async def start_workflow():
    """Connect to a local Temporal server, run the QA workflow, print its result."""
    temporal = await Client.connect("localhost:7233")
    # A fixed workflow id makes the run idempotent: re-submitting with the
    # same id attaches to the existing execution instead of starting anew.
    handle = await temporal.start_workflow(
        DocumentQAWorkflow.run,
        "What is our GDPR compliance policy?",
        id="doc-qa-workflow-001",
        task_queue="ai-tasks",
    )
    print(await handle.result())







Temporal automatically retries failed activities, persists workflow state, and enables human-in-the-loop pauses.





Event-Driven Architecture





Event-driven workflows react to events with loosely coupled agents:






import asyncio


from enum import Enum




class EventType(Enum):
    """Lifecycle events of the document pipeline; values are dotted topic strings."""
    DOCUMENT_UPLOADED = "document.uploaded"
    QUERY_RECEIVED = "query.received"
    ANALYSIS_COMPLETE = "analysis.complete"
    HUMAN_REVIEW_NEEDED = "human.review.needed"
    WORKFLOW_COMPLETE = "workflow.complete"




# FIX: the snippet used @dataclass without importing it (NameError at runtime).
from dataclasses import dataclass
from typing import Optional


@dataclass
class Event:
    """A single message carried on the EventBus."""
    type: EventType  # which lifecycle event occurred
    data: dict  # payload; schema varies per event type (see emitters)
    source: str  # name of the component that emitted the event
    # FIX: annotated Optional[float] — the default is None, which is not a
    # float. Epoch seconds when stamped; None when the emitter did not set it.
    timestamp: Optional[float] = None




class EventBus:
    """Minimal in-process pub/sub bus with an append-only event log."""

    def __init__(self):
        self.subscribers: dict[EventType, list[callable]] = {}
        self.event_log: list[Event] = []

    def subscribe(self, event_type: EventType, handler: callable):
        """Register *handler* to be awaited whenever *event_type* is emitted."""
        self.subscribers.setdefault(event_type, []).append(handler)

    async def emit(self, event: Event):
        """Log *event* and fan it out to every subscriber concurrently.

        Handler exceptions are captured in the result list rather than
        raised, so one failing subscriber cannot starve the others.
        """
        self.event_log.append(event)
        matching = self.subscribers.get(event.type, [])
        pending = [handler(event) for handler in matching]
        return await asyncio.gather(*pending, return_exceptions=True)




# Event-driven document processing pipeline


class DocumentProcessingPipeline:
    """Wires document and query handlers onto an EventBus.

    Assumes helpers chunk_document / embed_chunks / store_embeddings /
    vector_search / call_llm are provided elsewhere — TODO confirm against
    the surrounding project.
    """

    def __init__(self, event_bus: EventBus):
        self.bus = event_bus
        self._register_handlers()

    def _register_handlers(self):
        # Map each event type to its handler method on this instance.
        self.bus.subscribe(EventType.DOCUMENT_UPLOADED, self.handle_document)
        self.bus.subscribe(EventType.ANALYSIS_COMPLETE, self.handle_analysis)
        self.bus.subscribe(EventType.QUERY_RECEIVED, self.handle_query)

    async def handle_document(self, event: Event):
        """Chunk, embed, and store an uploaded document, then signal completion."""
        doc = event.data["document"]
        # Extract text, chunk, embed
        chunks = chunk_document(doc)
        embeddings = embed_chunks(chunks)
        store_embeddings(embeddings)

        await self.bus.emit(Event(
            type=EventType.ANALYSIS_COMPLETE,
            data={"doc_id": doc["id"], "chunks": len(chunks)},
            source="document_processor",
        ))

    async def handle_analysis(self, event: Event):
        """React to ANALYSIS_COMPLETE events.

        FIX: _register_handlers subscribed self.handle_analysis, but the
        method was never defined, so constructing the pipeline raised
        AttributeError. This minimal hook keeps registration working; the
        event is already recorded in the bus's event_log, so nothing more
        is required here — extend with indexing/notification logic as needed.
        """
        return None

    async def handle_query(self, event: Event):
        """Answer a query via retrieval-augmented generation, then emit completion."""
        query = event.data["query"]
        # NOTE(review): called synchronously here but awaited in the Temporal
        # example above — confirm whether vector_search/call_llm are async.
        docs = vector_search(query)
        answer = call_llm(f"Context: {docs}\nQuery: {query}")

        await self.bus.emit(Event(
            type=EventType.WORKFLOW_COMPLETE,
            data={"query": query, "answer": answer},
            source="query_handler",
        ))







Error Recovery and Retry





All workflows need robust error handling:






from tenacity import retry, stop_after_attempt, wait_exponential




class WorkflowExecutor:
    """Runs workflow callables with layered error recovery.

    Transient failures are retried with exponential backoff (tenacity);
    context-window overflows shrink the budget and retry; anything else is
    escalated to a human before propagating.
    """

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=60),
    )
    async def execute_with_retry(self, workflow_fn, *args, **kwargs):
        """Execute *workflow_fn(*args, **kwargs)*, applying the recovery policy.

        Raises whatever the workflow raises once recovery is exhausted.
        """
        try:
            return await workflow_fn(*args, **kwargs)
        except LLMAPIError:
            # FIX: the original branched on e.is_rate_limit() but re-raised
            # on both paths, making the branch dead code. All API errors are
            # simply re-raised so the @retry decorator backs off and retries.
            raise
        except ContextWindowError:
            # Halve the context budget and try again with a smaller prompt.
            current = kwargs.get("max_context", 4000)
            if current <= 1:
                # FIX: once the budget bottoms out (0 // 2 == 0) the original
                # recursed forever; give up instead of looping.
                raise
            kwargs["max_context"] = current // 2
            return await self.execute_with_retry(workflow_fn, *args, **kwargs)
        except Exception:
            # Unknown failure: flag it for human attention, then propagate.
            # NOTE(review): escalate_to_human is assumed to be defined on a
            # subclass or mixin — confirm.
            await self.escalate_to_human(workflow_fn, args, kwargs)
            raise







Conclusion





AI workflow automation requires reliability guarantees beyond simple scripting. LangChain provides the LLM orchestration layer with composable chains. Temporal adds durability with automatic retries, state persistence, and human-in-the-loop capabilities. Event-driven architectures enable loosely coupled, scalable pipelines. For production systems, combine all three: use LangChain for LLM logic within Temporal activities, and wire everything together with an event bus for scalability.