## Introduction
Message brokers are the nervous system of distributed architectures, enabling asynchronous communication between services at scale. Apache Kafka, RabbitMQ, and Apache Pulsar represent three distinct approaches to message processing. Choosing the wrong broker leads to architecture that fights against the tool's strengths. This article provides a technical comparison to guide your decision.
## Architecture and Message Model

### Apache Kafka
Kafka uses a distributed commit log model with partitioned topics:
```yaml
# Kafka topic configuration
topic_config:
  name: orders
  partitions: 12
  replication_factor: 3
  configs:
    cleanup.policy: delete
    retention.ms: 604800000        # 7 days
    retention.bytes: 1073741824    # 1 GB
    compression.type: snappy
    min.insync.replicas: 2
    max.message.bytes: 1048576     # 1 MB

# Producer configuration
producer_config:
  acks: all                  # Wait for all in-sync replicas
  retries: 3
  batch.size: 16384          # 16 KB
  linger.ms: 5               # Wait up to 5 ms to fill a batch
  compression: snappy
  enable.idempotence: true   # No duplicates on broker-side retries
```
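Producers decide which partition a record lands on by hashing the record key. A minimal sketch of that mapping — Kafka's default partitioner uses murmur2; `zlib.crc32` stands in here purely for illustration:

```python
import zlib

def partition_for(key: bytes, num_partitions: int = 12) -> int:
    # Hash the key and map it onto the partition space. Records with the
    # same key always land in the same partition, preserving per-key order.
    return zlib.crc32(key) % num_partitions

p1 = partition_for(b"customer-42")
p2 = partition_for(b"customer-42")
# p1 == p2: per-key ordering holds as long as the partition count is stable
```

Note the flip side: increasing the partition count changes the mapping, which is why keyed topics are usually sized generously up front.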
Kafka consumers track their position via offsets, enabling replay:
```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka-1:9092', 'kafka-2:9092'],
    group_id='order-processor',
    enable_auto_commit=False,       # Manual offset management
    auto_offset_reset='earliest',   # Start from beginning if no committed offset
    max_poll_records=500,
    session_timeout_ms=30000,
)

for message in consumer:
    process_order(message.value)
    # Commit offset only after successful processing
    consumer.commit()
```
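Committing only after processing gives at-least-once delivery: if the consumer crashes between processing and committing, the uncommitted messages are redelivered. A toy single-partition log (the `CommitLog` class is a hypothetical illustration, not a Kafka API) makes the offset mechanics concrete:

```python
class CommitLog:
    """Toy single-partition log with per-group committed offsets."""
    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = {}  # group_id -> next offset to consume

    def poll(self, group_id):
        start = self.committed.get(group_id, 0)
        return list(enumerate(self.messages[start:], start=start))

    def commit(self, group_id, offset):
        # Mark everything up to and including `offset` as processed
        self.committed[group_id] = offset + 1

log = CommitLog(["order-1", "order-2", "order-3"])
first = log.poll("order-processor")     # all three messages
log.commit("order-processor", 1)        # crash before committing offset 2...
replayed = log.poll("order-processor")  # ...so offset 2 is delivered again
```

Resetting `committed` back to 0 is, in miniature, what an offset reset does for full-topic replay.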
### RabbitMQ
RabbitMQ uses a message broker model with exchanges and queues:
```python
import json

import pika

# Connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='rabbitmq-1',
        port=5672,
        credentials=pika.PlainCredentials('user', 'pass'),
        heartbeat=600,
        blocked_connection_timeout=300,
    )
)
channel = connection.channel()

# Declare exchange and queue
channel.exchange_declare(
    exchange='orders',
    exchange_type='topic',
    durable=True,
)
channel.queue_declare(
    queue='order-processing',
    durable=True,
    arguments={
        'x-queue-type': 'quorum',       # Highly available queue
        'x-message-ttl': 86400000,      # 24 hours
        'x-dead-letter-exchange': 'orders-dlx',
    }
)

# Bind queue to exchange with routing key
channel.queue_bind(
    exchange='orders',
    queue='order-processing',
    routing_key='order.created.*',
)

# Publish message with delivery mode 2 for persistence
order_data = {'order_id': 'A-1001', 'region': 'europe'}  # example payload
channel.basic_publish(
    exchange='orders',
    routing_key='order.created.europe',
    body=json.dumps(order_data),
    properties=pika.BasicProperties(
        delivery_mode=2,                # Persistent
        content_type='application/json',
        priority=5,                     # Honored by classic priority queues only
    ),
)
```
### Apache Pulsar
Pulsar separates compute from storage in a two-layer architecture: stateless brokers serve clients, while Apache BookKeeper persists message segments:
```python
import pulsar

# Schema for message payloads
class Order(pulsar.schema.Record):
    order_id = pulsar.schema.String()
    amount = pulsar.schema.Double()

# Pulsar client
client = pulsar.Client(
    'pulsar://pulsar-broker:6650',
    authentication=pulsar.AuthenticationToken('token'),
)

# Producer with schema
producer = client.create_producer(
    'persistent://public/default/orders',
    schema=pulsar.schema.JsonSchema(Order),
    send_timeout_millis=30000,
    batching_enabled=True,
    batching_max_publish_delay_ms=10,
    compression_type=pulsar.CompressionType.LZ4,
)

# Consumer with subscription type
consumer = client.subscribe(
    'persistent://public/default/orders',
    subscription_name='order-processor',
    schema=pulsar.schema.JsonSchema(Order),
    consumer_type=pulsar.ConsumerType.Shared,       # Load balanced
    initial_position=pulsar.InitialPosition.Earliest,
)

# Negative acknowledgement for retry
while True:
    msg = consumer.receive()
    try:
        order = msg.value()
        process_order(order)
        consumer.acknowledge(msg)
    except Exception:
        consumer.negative_acknowledge(msg)  # Redelivered after the nack delay
```
## Performance Comparison
| Metric | Kafka | RabbitMQ | Pulsar |
|---|---|---|---|
| Max throughput (single partition) | ~100 MB/s | ~10 MB/s | ~100 MB/s |
| End-to-end latency (p99) | 5-50 ms | <1 ms | 5-20 ms |
| Max message size (defaults, all configurable) | 1 MB | 128 MB | 5 MB |
| Partition scaling | Add partitions (existing data stays put) | Scale out cluster nodes | Segment-based (no data rebalancing) |
| Storage efficiency | High (zero-copy reads) | Medium | Very high (tiered storage) |
## Message Persistence and Durability
| Feature | Kafka | RabbitMQ | Pulsar |
|---|---|---|---|
| Default persistence | Disk (all messages) | Memory first (persistence opt-in) | Disk (all messages) |
| Replication | Partition replicas | Mirrored / quorum queues | BookKeeper segments |
| Data retention | Time/size based | Queue TTL + DLQ | Time/size + tiered storage |
| Exactly-once | Yes (idempotent producer + transactions) | No (at-least-once by default) | Yes (broker deduplication) |
| Message replay | Yes (offset reset) | No (acknowledged messages are deleted) | Yes (cursor reset) |
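The exactly-once rows boil down to broker-side deduplication: the broker remembers the highest sequence number accepted from each producer and silently drops retried duplicates. A minimal sketch of that idea (the `DedupLog` class is hypothetical, mirroring what Kafka's idempotent producer and Pulsar's deduplication do internally):

```python
class DedupLog:
    def __init__(self):
        self.entries = []
        self.last_seq = {}  # producer_id -> highest sequence number accepted

    def append(self, producer_id: str, seq: int, message: str) -> bool:
        if self.last_seq.get(producer_id, -1) >= seq:
            return False  # duplicate from a producer retry; dropped
        self.last_seq[producer_id] = seq
        self.entries.append(message)
        return True

log = DedupLog()
accepted = log.append("producer-A", 0, "order-1")
retried = log.append("producer-A", 0, "order-1")  # network retry, same seq
```

This is why the guarantee is scoped: it protects against duplicate sends from retries, not against an application calling `publish` twice with fresh sequence numbers.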
## Routing and Message Patterns

### Kafka: Topic-based routing
```python
# Kafka routing via topic naming convention
TOPIC_ORDER_CREATED = "orders.events.created"
TOPIC_ORDER_CANCELLED = "orders.events.cancelled"
TOPIC_ORDER_SHIPPED = "orders.events.shipped"
```
### RabbitMQ: Flexible exchange types
```python
# Direct exchange: route by exact routing key
channel.exchange_declare('direct-orders', 'direct')
channel.queue_bind('europe-queue', 'direct-orders', 'order.europe')
channel.queue_bind('asia-queue', 'direct-orders', 'order.asia')

# Topic exchange: pattern matching
channel.exchange_declare('topic-orders', 'topic')
channel.queue_bind('all-orders', 'topic-orders', 'order.#')
channel.queue_bind('europe-only', 'topic-orders', 'order.europe.*')

# Fanout exchange: broadcast to every bound queue
channel.exchange_declare('broadcast', 'fanout')
channel.queue_bind('audit-log', 'broadcast', '')
channel.queue_bind('metrics', 'broadcast', '')
```
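The topic-exchange binding semantics (`*` matches exactly one dot-separated word, `#` matches zero or more) can be sketched as a small matcher — illustrative only, since the real matching happens inside the broker:

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """AMQP topic matching: '*' = exactly one word, '#' = zero or more words."""
    def match(p, k):
        if not p:
            return not k            # pattern exhausted: key must be too
        if p[0] == '#':
            # '#' consumes zero words, or one word and tries again
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        if p[0] == '*' or p[0] == k[0]:
            return match(p[1:], k[1:])
        return False
    return match(pattern.split('.'), routing_key.split('.'))
```

So the `order.#` binding above receives every order event, while `order.europe.*` only sees keys with exactly one word after `order.europe`.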
### Pulsar: Flexible subscriptions
```python
# Exclusive: only one consumer may attach
consumer = client.subscribe('orders', 'exclusive-sub',
                            consumer_type=pulsar.ConsumerType.Exclusive)

# Shared: round-robin across consumers
consumer = client.subscribe('orders', 'shared-sub',
                            consumer_type=pulsar.ConsumerType.Shared)

# Failover: primary consumer plus standbys
consumer = client.subscribe('orders', 'failover-sub',
                            consumer_type=pulsar.ConsumerType.Failover)

# Key_Shared: load balanced, but ordered delivery per key
consumer = client.subscribe('orders', 'key-shared-sub',
                            consumer_type=pulsar.ConsumerType.KeyShared)
```
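Key_Shared's guarantee — parallelism across keys, ordering within a key — amounts to a stable key-to-consumer mapping. A sketch of the idea (Pulsar actually assigns hash ranges to consumers; `crc32` modulo stands in here for illustration):

```python
import zlib

def consumer_for(key: str, consumers: list) -> str:
    # Stable mapping: every message with this key goes to the same consumer,
    # so per-key order is preserved while keys spread across the group
    return consumers[zlib.crc32(key.encode()) % len(consumers)]

group = ["consumer-1", "consumer-2", "consumer-3"]
assignments = {k: consumer_for(k, group)
               for k in ("cust-1", "cust-2", "cust-3", "cust-4")}
```

As with key-based partitioning, the mapping only holds while the consumer set is stable; consumers joining or leaving reshuffle key ownership.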
## Ecosystem and Operations
| Factor | Kafka | RabbitMQ | Pulsar |
|---|---|---|---|
| Client libraries | Java, Python, Go, .NET, C/C++ | 30+ languages | 10+ languages |
| Monitoring | JMX, Prometheus exporter | Management plugin, Prometheus | Prometheus, Grafana |
| Schema registry | Confluent Schema Registry | Custom | Built-in Schema Registry |
| Tiered storage | Confluent Tiered Storage | No | Built-in (S3, GCS) |
| Multi-tenancy | Topic naming convention | Virtual hosts | Built-in (namespaces, tenants) |
## Migration Scenarios
```python
import asyncio

# Dual-write migration pattern
class DualWriteBroker:
    def __init__(self, source_broker, target_broker):
        self.source = source_broker
        self.target = target_broker

    async def publish(self, topic: str, message: dict):
        # Write to both brokers during migration
        results = await asyncio.gather(
            self.source.publish(topic, message),
            self.target.publish(topic, message),
            return_exceptions=True,
        )
        # A source failure is fatal; a target failure is only logged
        if isinstance(results[0], Exception):
            raise results[0]
        if isinstance(results[1], Exception):
            print(f"Warning: target broker failed: {results[1]}")

    async def switch_to_target(self):
        """Once all consumers are migrated, stop source writes."""
        self.source = NullBroker()  # No-op broker implementation

    async def backfill(self, source_topic: str, target_topic: str):
        """Backfill historical data to the new broker."""
        for message in self.source.consume_from_beginning(source_topic):
            await self.target.publish(target_topic, message)
```
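The dual-write step can be exercised end-to-end with in-memory stand-ins — the `InMemoryBroker` below is a hypothetical test double, not a real client:

```python
import asyncio

class InMemoryBroker:
    def __init__(self):
        self.topics = {}

    async def publish(self, topic, message):
        self.topics.setdefault(topic, []).append(message)

async def dual_publish(source, target, topic, message):
    # Same shape as the pattern above: source failures are fatal,
    # target failures would merely be logged during the migration window
    results = await asyncio.gather(
        source.publish(topic, message),
        target.publish(topic, message),
        return_exceptions=True,
    )
    if isinstance(results[0], Exception):
        raise results[0]

source, target = InMemoryBroker(), InMemoryBroker()
asyncio.run(dual_publish(source, target, "orders", {"order_id": 1}))
# Both brokers now hold the same message stream
```

Testing against doubles like this verifies the failure-handling logic before pointing the migration at live clusters.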
## Decision Guide
Kafka remains the default choice for most event-driven architectures, but RabbitMQ excels where complex routing and low latency are paramount, and Pulsar is the strongest contender for large-scale multi-tenant deployments.