Your payment service processed the same order twice. Your inventory system decremented stock by 2 instead of 1. Your notification service sent the same welcome email three times to a new user. All of these happened because someone wrote a message consumer that assumed the broker would deliver each message exactly once — and the broker laughed.
Exactly-once delivery is the most dangerous assumption in distributed systems. It’s not that brokers advertise it falsely — Kafka’s transactions and RabbitMQ quorum queues get close, under specific conditions, with the right configuration. The problem is that "exactly-once processing" is a different beast entirely, and most teams conflate the two. The broker delivers once; your consumer crashes mid-handler; the message gets redelivered; you’ve now processed it twice. The broker kept its promise. You just didn’t handle it.
This article is about building and testing consumers that survive the real world — where duplicate messages are routine, not exceptional.
The Three Guarantees, Honestly Explained
Before writing a single test, you need to internalize what each guarantee actually means at the application layer.
At-most-once: fire and forget. The broker sends the message, you acknowledge before processing, and if your handler crashes, the message is gone. You get no duplicates, but you drop data. Fine for metrics aggregation. Catastrophic for financial transactions.
At-least-once: the practical default. Acknowledge after successful processing. If the consumer crashes after processing but before acking, the broker redelivers. You might process the message twice, but you never lose it. Most systems run here, whether they intend to or not.
Exactly-once: requires coordinated transactions between your broker, your consumer, and your downstream side effects — all within a single atomic boundary. Kafka Streams can genuinely achieve this if your entire pipeline lives inside Kafka. The moment you write to a database or call an HTTP API inside your consumer, you’ve broken the transaction boundary and fallen back to at-least-once semantics with extra ceremony.
The honest take: design your consumers for at-least-once, test for it aggressively, and treat exactly-once as an optimization for specific pipeline-internal scenarios — not as a design principle.
What You’re Actually Testing
Most teams test the happy path: message arrives, gets processed, side effect happens, done. That’s not enough. The test matrix for a message consumer should cover:
- Duplicate delivery: same message, same payload, delivered twice
- Out-of-order delivery: messages arrive in a different sequence than they were published
- Partial failure: consumer processes halfway through, then throws
- Slow consumers: processing takes longer than the broker’s visibility timeout
- Poison messages: malformed payloads that crash the handler
- Broker disconnection mid-flight: consumer loses the connection after receiving but before acking
Most of these aren’t exotic edge cases. On any reasonably loaded system with a consumer deployment or a network hiccup, they happen daily.
Building a Testable Consumer
The biggest architectural mistake is writing consumers that mix message handling with business logic in one blob. You can’t test the idempotency logic without spinning up a broker, and you can’t test the broker interaction without triggering business logic. Separate them.
Here’s a pattern that works. The consumer is a thin adapter; the handler contains the logic; the handler is tested independently.
# consumer.py — thin adapter, minimal logic
import json
import logging
from typing import Callable
log = logging.getLogger(__name__)
class MessageConsumer:
def __init__(self, broker_client, handler: Callable):
self.client = broker_client
self.handler = handler
def run(self):
for message in self.client.poll():
try:
payload = json.loads(message.body)
self.handler(payload)
self.client.ack(message)
except Exception as e:
log.error("handler failed, nacking: %s", e)
self.client.nack(message, requeue=True)
# order_handler.py — business logic, fully testable without a broker
class OrderHandler:
def __init__(self, db, payment_gateway):
self.db = db
self.payment_gateway = payment_gateway
def __call__(self, payload: dict):
order_id = payload["order_id"]
# Idempotency check — the most important line in this file
if self.db.order_exists(order_id):
log.info("duplicate order %s, skipping", order_id)
return
self.db.create_order(order_id, payload)
self.payment_gateway.charge(order_id, payload["amount"])
self.db.mark_order_processed(order_id)
The idempotency check on order_id is doing the heavy lifting. If the same message arrives twice, the second invocation is a no-op. This is the core of surviving at-least-once delivery.
Testing Idempotency Directly
With the handler decoupled from the broker, you can write clear, fast unit tests that prove idempotency without any infrastructure.
# test_order_handler.py
import pytest
from unittest.mock import MagicMock, call
from order_handler import OrderHandler
@pytest.fixture
def db():
mock = MagicMock()
mock.order_exists.return_value = False # fresh by default
return mock
@pytest.fixture
def payment_gateway():
return MagicMock()
@pytest.fixture
def handler(db, payment_gateway):
return OrderHandler(db, payment_gateway)
def test_processes_new_order(handler, db, payment_gateway):
payload = {"order_id": "ord-123", "amount": 4999}
handler(payload)
db.create_order.assert_called_once_with("ord-123", payload)
payment_gateway.charge.assert_called_once_with("ord-123", 4999)
def test_skips_duplicate_order(handler, db, payment_gateway):
db.order_exists.return_value = True # already processed
payload = {"order_id": "ord-123", "amount": 4999}
handler(payload)
# No side effects should fire on a duplicate
db.create_order.assert_not_called()
payment_gateway.charge.assert_not_called()
def test_idempotent_under_repeated_calls(handler, db, payment_gateway):
payload = {"order_id": "ord-456", "amount": 1999}
# First call processes normally
handler(payload)
assert db.create_order.call_count == 1
# Simulate second delivery — now db.order_exists returns True
db.order_exists.return_value = True
handler(payload)
# Still only one create, one charge
assert db.create_order.call_count == 1
assert payment_gateway.charge.call_count == 1
This is fast, deterministic, and catches the most common class of bugs. Run these in CI on every commit. They don’t need Docker, they don’t need Kafka running, and they execute in milliseconds.
Integration Tests: Injecting Real Duplicates
Unit tests give you confidence the logic is right. Integration tests prove the whole consumer pipeline handles duplicate delivery without exploding. Use Testcontainers to spin up a real broker instance in your test suite.
# test_consumer_integration.py
import pytest
import json
from testcontainers.rabbitmq import RabbitMqContainer
from your_project.consumer import MessageConsumer
from your_project.order_handler import OrderHandler
@pytest.fixture(scope="module")
def rabbitmq():
with RabbitMqContainer("rabbitmq:3.12-management") as container:
yield container
def test_duplicate_message_processed_once(rabbitmq, real_db):
handler = OrderHandler(db=real_db, payment_gateway=FakePaymentGateway())
consumer = MessageConsumer(
broker_client=connect_to(rabbitmq),
handler=handler
)
payload = {"order_id": "ord-789", "amount": 2500}
message_body = json.dumps(payload)
# Publish the same message twice — simulating broker redelivery
publish(rabbitmq, queue="orders", body=message_body)
publish(rabbitmq, queue="orders", body=message_body)
consumer.process_pending(max_messages=2)
# Only one order should exist in the database
assert real_db.count_orders("ord-789") == 1
The key here is publishing the same payload twice before your consumer processes anything. This is a realistic simulation of what happens when a consumer crashes after processing but before acking, and the broker redelivers on reconnect.
Gotcha: The Window Between Processing and Acking
Here’s a failure mode that even experienced engineers miss. Your handler succeeds, your database write commits, but the process crashes before it can ack the message. The broker redelivers. Your idempotency check catches it. All good, right?
Not if your database write and your downstream call aren’t atomic.
# Dangerous pattern
def __call__(self, payload):
order_id = payload["order_id"]
if self.db.order_exists(order_id):
return
self.db.create_order(order_id, payload) # succeeds
self.payment_gateway.charge(order_id, ...) # crashes here
self.db.mark_order_processed(order_id) # never reached
On redelivery, order_exists returns False (order was created but never marked processed), so you charge the customer again. The fix is a single status field that acts as your idempotency gate, and you only flip it after all side effects complete.
# Safer pattern — check the terminal state, not intermediate
def __call__(self, payload):
order_id = payload["order_id"]
# Only skip if fully processed, not partially
if self.db.get_order_status(order_id) == "COMPLETED":
return
with self.db.transaction():
order = self.db.upsert_order(order_id, payload, status="PROCESSING")
self.payment_gateway.charge(order_id, payload["amount"])
self.db.update_order_status(order_id, "COMPLETED")
If the charge succeeds but the status update fails, you retry the whole thing — but your payment gateway should also be idempotent (most are, with an idempotency key). Pass order_id as the idempotency key to the payment gateway so even charging twice is a no-op at the gateway level.
Gotcha: Visibility Timeout Is a Trap
SQS users hit this one constantly. Your message visibility timeout is 30 seconds. Your consumer takes 45 seconds to process a large batch. SQS makes the message visible again, another consumer instance picks it up, and you’re processing it concurrently with the original.
Your idempotency check won’t save you here because both processes pass the check at the same time before either has committed.
The fix is a combination of:
- Heartbeating: extend the visibility timeout while you’re still working. Most broker clients support this.
- Optimistic locking: use a database constraint so only one insert wins.
-- The constraint is your real idempotency guarantee
CREATE TABLE processed_orders (
order_id VARCHAR(255) PRIMARY KEY,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
def __call__(self, payload):
order_id = payload["order_id"]
try:
# This fails with IntegrityError if another worker got here first
self.db.execute(
"INSERT INTO processed_orders (order_id) VALUES (%s)",
[order_id]
)
except IntegrityError:
log.info("concurrent duplicate detected for %s, skipping", order_id)
return
# Safe to proceed — we hold the lock via unique constraint
self._do_the_work(payload)
The INSERT acts as a distributed mutex. Only one process wins, the other safely skips. This is more reliable than a SELECT followed by an INSERT, which has a TOCTOU race.
Chaos Testing: Prove It Survives a Crash
Unit and integration tests cover the logic. Chaos tests cover the runtime. You need to verify that your consumer, running for real, survives crashes at arbitrary points without causing duplicate side effects.
A simple but effective approach: inject a crash after the first processing attempt, assert the second processing attempt is a no-op.
# test_crash_recovery.py
import threading
import time
def test_handler_survives_crash_and_redelivery(rabbitmq, real_db):
crash_after_first = {"should_crash": True}
def crashable_handler(payload):
order_id = payload["order_id"]
if real_db.get_order_status(order_id) == "COMPLETED":
return # idempotency gate
real_db.upsert_order(order_id, payload)
real_payment_gateway.charge(order_id, payload["amount"])
if crash_after_first["should_crash"]:
crash_after_first["should_crash"] = False
raise RuntimeError("simulated crash before ack") # broker will redeliver
real_db.update_order_status(order_id, "COMPLETED")
payload = {"order_id": "ord-chaos-001", "amount": 999}
publish(rabbitmq, queue="orders", body=json.dumps(payload))
# First attempt: crashes before ack, message redelivers
try:
consume_one(rabbitmq, handler=crashable_handler)
except RuntimeError:
pass
# Second attempt: should complete, idempotency prevents double-charge
consume_one(rabbitmq, handler=crashable_handler)
assert real_db.get_order_status("ord-chaos-001") == "COMPLETED"
assert real_payment_gateway.charge_count("ord-chaos-001") == 1
This is the test most teams never write, and it’s the one that would have caught the most production incidents.
Gotcha: Message Ordering and Idempotency Are Separate Problems
Idempotency handles duplicates. It doesn’t handle out-of-order delivery. If you receive ORDER_CREATED after ORDER_CANCELLED, your idempotency logic might correctly process ORDER_CANCELLED once — but then also process ORDER_CREATED once, leaving the order in an active state when it should be cancelled.
This requires an optimistic version check, not just a duplicate check.
def __call__(self, payload):
order_id = payload["order_id"]
event_version = payload["version"]
current = self.db.get_order(order_id)
if current and current.version >= event_version:
log.info("stale event v%d for order %s (current v%d), skipping",
event_version, order_id, current.version)
return
self.db.apply_event(order_id, payload, version=event_version)
Publishers need to include a monotonically increasing version on every event. If your publisher doesn’t do this today, add it. Everything downstream becomes easier.
Production Checklist
Before you ship a consumer to production, run through this:
Idempotency: does your handler have a unique, stable identifier to deduplicate on? Is the deduplication check and the side effect within the same transaction boundary?
Dead letter queue: what happens to a message your handler can never process successfully? You need a DLQ and alerts on it. Without one, poison messages sit at the head of the queue and block everything.
Visibility timeout tuning: is your max processing time well under the visibility timeout, or are you heartbeating? A consumer that takes 120 seconds on a queue with a 30-second timeout will generate cascading duplicates under load.
Monitoring: track your redelivery rate. A healthy consumer on a stable system should have near-zero redeliveries. A spike in redeliveries tells you something is wrong before you get duplicate-processing complaints.
Idempotency key storage: how long do you keep processed message IDs? A 24-hour window catches most broker redeliveries; a 7-day window catches edge cases after extended outages. Don’t keep them forever — that’s unbounded storage growth.
The Mental Model to Take Away
Think of your message consumer as a function that might be called any number of times with the same input. Your job is to make sure calling it twice produces the same observable outcome as calling it once. That’s the entire contract.
The broker’s delivery guarantee is about whether messages arrive. Your idempotency guarantee is about what happens when they do. These are independent problems. Solve yours regardless of what the broker promises, and you’ll never ship a double-charge bug again.