Python's asyncio has matured into the standard way to write concurrent I/O-bound code, but the learning curve remains steep. The introduction of Task Groups in Python 3.11 and improved exception handling make async Python more ergonomic than ever. This guide covers the patterns that work — and the ones that bite you — with production-ready examples.

Core Concepts

ConceptWhat It IsWhen to Use
CoroutineA function defined with async def — can be suspended and resumedAny I/O-bound operation: HTTP requests, DB queries, file I/O
TaskA coroutine wrapped and scheduled to run on the event loopRun multiple coroutines concurrently
Event LoopThe scheduler that runs async code (one thread, cooperative multitasking)You rarely touch this directly — asyncio.run() handles it
AwaitableAnything you can await: coroutine, Task, FutureUse await to pause until the result is ready

Task Groups (Python 3.11+): The Modern Way

Best for: Running multiple async tasks concurrently with proper error handling. Replaces asyncio.gather() with structured concurrency — if one task fails, all sibling tasks are cancelled.

import asyncio
import aiohttp

async def fetch_url(url: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()

async def main():
    urls = ['https://api1.example.com', 'https://api2.example.com']
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_url(url)) for url in urls]
    # All tasks completed (or exception propagated) after exiting TaskGroup
    return [t.result() for t in tasks]

results = asyncio.run(main())

asyncio.gather vs as_completed

FunctionBehaviorUse Case
asyncio.gather()Run all tasks, return results in order. If one raises, it propagates. Prefer TaskGroup in 3.11+.When you need all results and order matters
asyncio.as_completed()Yield results as each task finishes (fastest first)When you want to process results as they arrive
asyncio.wait()Low-level: wait for FIRST_COMPLETED or ALL_COMPLETEDTimeouts, race conditions, advanced patterns

Common Pitfalls and Solutions

1. Running Blocking Code in Async Functions

Problem: Calling a sync function (e.g., time.sleep, requests.get, heavy CPU work) inside an async function blocks the entire event loop. Solution: Use asyncio.to_thread() for I/O-bound blocking code, or run_in_executor() for CPU-bound work.

# Bad: blocks the event loop
result = requests.get('https://api.example.com')

# Good: offload to a thread
result = await asyncio.to_thread(requests.get, 'https://api.example.com')

2. Unhandled Exceptions in Background Tasks

Problem: If a Task raises an exception and you never await it, the exception is silently lost until garbage collection. Solution: Always use TaskGroup — it ensures exceptions are propagated. For fire-and-forget tasks, add a done callback that logs exceptions.

3. Creating Too Many Concurrent Connections

Solution: Use asyncio.Semaphore to limit concurrency:

sem = asyncio.Semaphore(20)  # Max 20 concurrent requests
async def rate_limited_fetch(url):
    async with sem:
        return await fetch_url(url)

Bottom line: Use TaskGroup for structured concurrency — it replaces the error-prone gather() pattern. Keep async code purely async (offload blocking code to threads). Always limit concurrency with Semaphore when making external requests. See also: Node.js Streams Guide and Error Handling Best Practices.