Distributed Tracing Across Async Boundaries: SQS, Kafka, and EventBridge

Your HTTP services are fully traced. Every REST call shows up in Jaeger or Tempo as a clean waterfall, latency broken down to the microsecond. Then a message hits SQS, and the trace just… ends. The consumer processes it 400 milliseconds later, starts a brand new root span, and you have no idea if those two things are even related.

This is the async boundary problem. It bites every team that moves beyond synchronous request-response, and most of them don’t notice until they’re trying to debug a production incident by correlating timestamps in two completely unrelated traces.

The fix isn’t complicated, but it requires you to be deliberate. Context propagation doesn’t happen automatically across message queues — you have to carry the trace header yourself, stuffed into whatever the broker gives you to work with.

This article walks through doing that properly for SQS, Kafka, and EventBridge, using OpenTelemetry as the instrumentation layer. No vendor lock-in, no X-Ray-specific hacks that only work inside AWS.


Why HTTP tracing doesn’t help you here

When you make an HTTP call, OpenTelemetry auto-instrumentation injects traceparent and tracestate into the request headers before the bytes leave your process. The receiving service extracts those headers, creates a child span, and the trace is connected.

Message brokers break this model in two ways. First, there’s a time gap — the producer and consumer don’t share a network call. Second, most SDKs don’t automatically inject trace context into message payloads. You have to do it yourself.

The standard you want is W3C Trace Context. Two headers: traceparent carries the version, trace ID, parent span ID, and flags. tracestate carries vendor-specific key-value pairs. If you use these, any OpenTelemetry-compatible backend will stitch the spans together correctly regardless of how many brokers or queues the event crosses.
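Here is a minimal stdlib parser for a traceparent value, handy when you're eyeballing raw message attributes or headers during debugging. It is a sketch of the four-field layout defined for version 00: a 2-hex version, a 32-hex trace ID, a 16-hex parent span ID, and 2-hex flags, with bit 0 of the flags meaning "sampled". The names `parse_traceparent` and `TraceParent` are hypothetical, and the parser ignores tracestate entirely. In real services, let the OTel propagator do this work.

```python
import re
from typing import NamedTuple, Optional

# version-traceid-parentid-flags, all lowercase hex (W3C Trace Context, version 00 layout)
_TRACEPARENT_RE = re.compile(r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


class TraceParent(NamedTuple):
    version: str
    trace_id: str
    parent_id: str
    flags: int

    @property
    def sampled(self) -> bool:
        # Bit 0 of trace-flags carries the producer's sampling decision
        return bool(self.flags & 0x01)


def parse_traceparent(header: str) -> Optional[TraceParent]:
    """Return the parsed fields, or None if the header is malformed."""
    match = _TRACEPARENT_RE.match(header.strip())
    if match is None:
        return None
    version, trace_id, parent_id, flags = match.groups()
    # Version ff is forbidden; all-zero trace or parent IDs are invalid
    if version == "ff" or trace_id == "0" * 32 or parent_id == "0" * 16:
        return None
    return TraceParent(version, trace_id, parent_id, int(flags, 16))


tp = parse_traceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
print(tp.trace_id, tp.parent_id, tp.sampled)
# prints: 4bf92f3577b34da6a3ce929d0e0e4736 00f067aa0ba902b7 True
```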


Setup: OpenTelemetry SDK baseline

Before touching SQS or Kafka, get the OTel SDK wired up. I’ll use Python for the examples since it covers the most ground without being verbose, but the same concepts apply to Go, Java, and Node.js.

pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp-proto-grpc
# tracing.py — bootstrap once at process startup
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource

def init_tracer(service_name: str) -> trace.Tracer:
    resource = Resource(attributes={"service.name": service_name})
    provider = TracerProvider(resource=resource)
    exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317")
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)
    return trace.get_tracer(service_name)

For context propagation itself:

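pip install opentelemetry-propagator-b3  # only if you need B3 compatibility
# W3C Trace Context and Baggage propagators ship with opentelemetry-api and are the
# default global propagators (OTEL_PROPAGATORS=tracecontext,baggage); no extra package needed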

SQS: Message Attributes are your carrier

SQS gives you up to 10 message attributes per message, each typed as String, Binary, or Number. Trace context fits cleanly as a String attribute. AWS X-Ray uses the AWSTraceHeader system attribute instead; ignore it if you're going OpenTelemetry-native, because mixing both causes confusion.
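Trace entries count against that 10-attribute limit. With OTel's default propagators (tracecontext and baggage), inject can write up to three keys: traceparent, plus tracestate and baggage when the context carries them. If your producers already send several business attributes, a merge step that fails loudly beats having SQS reject the send. Below is a sketch. `merge_trace_attributes` is a hypothetical helper, and the carrier is assumed to be a plain dict of strings filled by `propagate.inject` with its default setter.

```python
from typing import Dict

SQS_MAX_MESSAGE_ATTRIBUTES = 10  # per-message limit for SQS message attributes


def merge_trace_attributes(
    business_attrs: Dict[str, dict], trace_carrier: Dict[str, str]
) -> Dict[str, dict]:
    """Merge trace headers (plain str values) into SQS MessageAttributes format.

    Raises ValueError up front instead of letting SQS reject the message later.
    """
    merged = dict(business_attrs)
    for key, value in trace_carrier.items():
        if key in merged:
            raise ValueError(f"attribute {key!r} is already set by the application")
        merged[key] = {"DataType": "String", "StringValue": value}
    if len(merged) > SQS_MAX_MESSAGE_ATTRIBUTES:
        raise ValueError(
            f"{len(merged)} message attributes exceeds the SQS limit of "
            f"{SQS_MAX_MESSAGE_ATTRIBUTES}; trim business attributes or tracing keys"
        )
    return merged
```

In a producer, you would call `propagate.inject(trace_carrier)` on an empty dict and pass `merge_trace_attributes(existing_attrs, trace_carrier)` as `MessageAttributes`.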

Producer side

import boto3
import json
from opentelemetry import trace, propagate
from opentelemetry.propagators.textmap import DefaultSetter
from tracing import init_tracer  # the bootstrap module above

tracer = init_tracer("order-service")
sqs = boto3.client("sqs", region_name="eu-west-1")

class SQSAttributeSetter(DefaultSetter):
    """Wraps OTel's setter interface to build SQS MessageAttributes format."""
    def set(self, carrier: dict, key: str, value: str):
        carrier[key] = {"DataType": "String", "StringValue": value}

def publish_order_created(queue_url: str, order: dict):
    with tracer.start_as_current_span("order.publish", kind=trace.SpanKind.PRODUCER) as span:
        span.set_attribute("messaging.system", "aws_sqs")
        span.set_attribute("messaging.destination", queue_url)
        span.set_attribute("order.id", order["id"])

        # Inject W3C traceparent/tracestate into the attributes dict
        message_attributes = {}
        propagate.inject(message_attributes, setter=SQSAttributeSetter())

        sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(order),
            MessageAttributes=message_attributes,
        )

The propagate.inject call writes traceparent into message_attributes through your custom setter, plus tracestate and baggage when the current context carries them. The result looks like:

{
  "traceparent": {
    "DataType": "String",
    "StringValue": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
  }
}

Consumer side

from opentelemetry.propagators.textmap import DefaultGetter

class SQSAttributeGetter(DefaultGetter):
    def get(self, carrier: dict, key: str):
        attr = carrier.get(key)
        if attr and "StringValue" in attr:
            return [attr["StringValue"]]
        return []

    def keys(self, carrier: dict):
        return list(carrier.keys())

def process_sqs_messages(queue_url: str):
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MessageAttributeNames=["All"],  # critical — defaults to none
        MaxNumberOfMessages=10,
    )

    for message in response.get("Messages", []):
        attributes = message.get("MessageAttributes", {})

        # Extract the producer's span context; the span started below becomes its child
        ctx = propagate.extract(attributes, getter=SQSAttributeGetter())

        with tracer.start_as_current_span(
            "order.process",
            context=ctx,
            kind=trace.SpanKind.CONSUMER,
        ) as span:
            span.set_attribute("messaging.system", "aws_sqs")
            span.set_attribute("messaging.message_id", message["MessageId"])
            # ... actual processing ...

Gotcha #1: If you forget MessageAttributeNames=["All"] in receive_message, SQS returns zero attributes. The trace will extract nothing, create a new root span, and you’ll be staring at disconnected traces wondering what went wrong. This is the single most common mistake.
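Because this failure is silent, it's worth making it loud. The sketch below warns when a received message carries no traceparent attribute. That catches both a missing MessageAttributeNames and a producer that never injected. The helper names and the logger name are illustrative only and are not part of any SDK.

```python
import logging
from typing import Any, Dict

logger = logging.getLogger("tracing.sqs")  # hypothetical logger name


def has_trace_context(message: Dict[str, Any]) -> bool:
    """True if an SQS message (as returned by receive_message) carries traceparent."""
    attributes = message.get("MessageAttributes") or {}
    traceparent = attributes.get("traceparent") or {}
    return bool(traceparent.get("StringValue"))


def warn_if_untraced(message: Dict[str, Any]) -> None:
    if not has_trace_context(message):
        # Usually means MessageAttributeNames was omitted, or the producer never injected
        logger.warning(
            "SQS message %s has no traceparent; the consumer span will start a new trace",
            message.get("MessageId", "<unknown>"),
        )
```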

Gotcha #2: SQS FIFO queues support the same attribute mechanism. If you use message group IDs, make sure your consumer doesn't hand messages from one group to parallel workers. The traces will still connect, but the span timestamps will show messages processed out of publish order, and that's easy to misread as a tracing problem when it's really an ordering bug in the consumer.


Kafka: Headers are the right tool

Kafka has had message headers since version 0.11. They're key-value pairs with string keys and byte-array values, which is all an OTel carrier needs. Don't stuff trace context into the message key or the value JSON; headers exist for metadata.

Producer side

import json

from confluent_kafka import Producer
from opentelemetry import trace, propagate
from opentelemetry.propagators.textmap import DefaultSetter
from tracing import init_tracer  # the bootstrap module above

class KafkaHeaderSetter(DefaultSetter):
    def set(self, carrier: list, key: str, value: str):
        # Kafka headers are a list of (key, value) tuples, value as bytes
        carrier.append((key, value.encode("utf-8")))

producer = Producer({"bootstrap.servers": "kafka:9092"})
tracer = init_tracer("payment-service")

def publish_payment_event(topic: str, payload: dict):
    with tracer.start_as_current_span("payment.publish", kind=trace.SpanKind.PRODUCER) as span:
        span.set_attribute("messaging.system", "kafka")
        span.set_attribute("messaging.destination", topic)

        headers = []
        propagate.inject(headers, setter=KafkaHeaderSetter())

        producer.produce(
            topic=topic,
            value=json.dumps(payload).encode("utf-8"),
            headers=headers,
        )
        producer.flush()  # blocks until delivery; fine for a demo, batch and poll() in production

Consumer side

from confluent_kafka import Consumer
from opentelemetry.propagators.textmap import DefaultGetter

class KafkaHeaderGetter(DefaultGetter):
    def get(self, carrier: list, key: str):
        # carrier is a list of (header_key, header_value_bytes) tuples
        for k, v in carrier:
            if k == key and v is not None:  # Kafka allows headers with a null value
                return [v.decode("utf-8")]
        return []

    def keys(self, carrier: list):
        return [k for k, _ in carrier]

consumer = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "payment-processor",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["payments"])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue

    headers = msg.headers() or []
    ctx = propagate.extract(headers, getter=KafkaHeaderGetter())

    with tracer.start_as_current_span(
        "payment.process",
        context=ctx,
        kind=trace.SpanKind.CONSUMER,
    ) as span:
        span.set_attribute("messaging.system", "kafka")
        span.set_attribute("messaging.kafka.topic", msg.topic())
        span.set_attribute("messaging.kafka.partition", msg.partition())
        span.set_attribute("messaging.kafka.offset", msg.offset())
        # ... process payment ...

Gotcha #3: Kafka's msg.headers() returns None when there are no headers, not an empty list, so always write msg.headers() or []. One TypeError ('NoneType' object is not iterable) in production at 2 AM is enough to teach this lesson.

Gotcha #4: If you're using kafka-python instead of confluent-kafka, the header format is the same, a list of (key_str, value_bytes) tuples, so the getter above works unchanged. What differs is access: confluent-kafka exposes headers through the msg.headers() method, while kafka-python's ConsumerRecord has a headers attribute. Don't copy consumer code between the two libraries and expect it to run as-is.
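If a codebase has to handle both clients (during a migration, say), one option is a small duck-typed normalizer that runs before the getter. It accepts either a confluent-kafka Message, where headers() is a method that may return None, or a kafka-python ConsumerRecord, where headers is a plain attribute. It also drops headers whose value is None. This is a sketch, and `normalized_headers` is a hypothetical helper, not an API from either library.

```python
from typing import Any, List, Tuple

Header = Tuple[str, bytes]


def normalized_headers(msg: Any) -> List[Header]:
    """Return [(key, value_bytes), ...] for a confluent-kafka or kafka-python message."""
    raw = getattr(msg, "headers", None)
    if callable(raw):  # confluent-kafka: Message.headers() is a method
        raw = raw()
    # Treat a missing header list as empty, and skip null-valued headers
    return [(k, v) for k, v in (raw or []) if v is not None]
```

The result can be passed straight to `propagate.extract(normalized_headers(msg), getter=KafkaHeaderGetter())` regardless of which client produced `msg`.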

Gotcha #5: Log compaction (and plain retention) can remove messages before a lagging consumer reads them. Those messages never get a consumer span, so their traces end at the publish span. This is expected behavior, not a propagation bug. Document it for your team so nobody spends an hour wondering why some older traces stop at the producer.


EventBridge: The awkward one

EventBridge doesn’t have message attributes or headers. Every event has a fixed structure: source, detail-type, detail, and a handful of AWS-managed fields. The detail field is a free-form JSON object. That’s where trace context has to live.

This is a compromise. You’re mixing application payload with infrastructure metadata. There’s no clean separation. Accept it and standardize on a key name — I use _otel to make it obvious this isn’t business data.

Producer side

import boto3
import json
from opentelemetry import trace, propagate
from tracing import init_tracer  # the bootstrap module above

events = boto3.client("events", region_name="eu-west-1")
tracer = init_tracer("inventory-service")

def publish_stock_updated(item_id: str, quantity: int):
    with tracer.start_as_current_span("stock.publish", kind=trace.SpanKind.PRODUCER) as span:
        span.set_attribute("messaging.system", "aws_eventbridge")

        # Collect propagation headers into a plain dict
        trace_carrier = {}
        propagate.inject(trace_carrier)

        detail = {
            "item_id": item_id,
            "quantity": quantity,
            "_otel": trace_carrier,  # embed trace context in detail
        }

        events.put_events(Entries=[{
            "Source": "com.mycompany.inventory",
            "DetailType": "StockUpdated",
            "Detail": json.dumps(detail),
            "EventBusName": "myapp-bus",
        }])

Consumer side (Lambda)

import json
from opentelemetry import trace, propagate
from tracing import init_tracer  # the bootstrap module above

tracer = init_tracer("warehouse-service")

def handler(event, context):
    detail = event.get("detail", {})
    trace_carrier = detail.pop("_otel", {})  # extract and strip from payload

    ctx = propagate.extract(trace_carrier)

    try:
        with tracer.start_as_current_span(
            "stock.process",
            context=ctx,
            kind=trace.SpanKind.CONSUMER,
        ) as span:
            span.set_attribute("messaging.system", "aws_eventbridge")
            span.set_attribute("faas.trigger", "pubsub")
            # ... process detail ...
    finally:
        # BatchSpanProcessor exports in the background; flush before Lambda freezes the sandbox
        trace.get_tracer_provider().force_flush()

Gotcha #6: EventBridge has a 256 KB event size limit. A bare traceparent is 55 characters, so the _otel block costs well under 100 bytes; tracestate and baggage add more. That's not a problem in practice, but large baggage can push you closer to the limit. Keep baggage minimal.
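If you'd rather measure than estimate, compare the serialized detail with and without the `_otel` block. This is a stdlib sketch, and `otel_overhead_bytes` is a hypothetical helper. It only measures the detail JSON, not the full entry size AWS computes, which also counts Source, DetailType and other fields.

```python
import json


def otel_overhead_bytes(detail: dict, key: str = "_otel") -> int:
    """UTF-8 bytes that the embedded trace block adds to the serialized detail."""
    with_ctx = json.dumps(detail).encode("utf-8")
    without_ctx = json.dumps({k: v for k, v in detail.items() if k != key}).encode("utf-8")
    return len(with_ctx) - len(without_ctx)


detail = {
    "item_id": "sku-123",
    "quantity": 7,
    "_otel": {"traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"},
}
print(otel_overhead_bytes(detail))  # prints 85: only a traceparent, no tracestate or baggage
```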

Gotcha #7: Input transformers rebuild the event from a template, whether they sit on a rule target or inside an EventBridge Pipe. Any field of detail that isn't in your template is dropped, including _otel. Check your transformer templates if traces stop connecting at a rule or pipe boundary.

Gotcha #8: AWS X-Ray has a native EventBridge integration that propagates its own trace header. If you run X-Ray and OTel side by side, every event ends up carrying two independent trace contexts, X-Ray's header and your _otel block, and they won't agree on the parent. Pick one and stick with it across all services. Half-migrated setups are worse than no tracing.


Production patterns worth adopting

Correlation ID as a span attribute. Always emit a business-level correlation ID (order ID, session ID, request ID) as a span attribute alongside the OTel trace context. When a non-technical person files a bug report, they’ll have an order ID, not a trace ID. Having both lets you find the trace from either direction.

span.set_attribute("order.id", order_id)
span.set_attribute("correlation.id", correlation_id)

Sampling decisions must be made at the producer. If your producer samples at 10% and injects a non-sampled traceparent, the consumer must respect that decision and not create a new sampled span. OpenTelemetry handles this correctly as long as you use propagate.extract and keep a parent-based sampler, which is the SDK default (ParentBased with an ALWAYS_ON root). Extract restores the sampled flag from traceparent, and the parent-based sampler then creates a non-recording span on the consumer. Don't override this with your own sampling logic on the consumer side.

Dead Letter Queue tracing. When a message lands in a DLQ after retries, the trace context is still in the message attributes. Instrument your DLQ processor the same way as any consumer. You’ll get a trace that shows the original publish, all the failed attempts, and the final DLQ processing — invaluable for debugging retry exhaustion bugs.

Span naming conventions. Use <domain>.publish and <domain>.process as the pattern: order.publish, order.process. Not send_sqs_message or lambda_handler. Your traces will read like a business workflow, not an infrastructure diagram.

Baggage for fan-out pipelines. If one event fans out to multiple consumers who each spawn their own downstream events, use OTel Baggage to carry a root workflow ID. Baggage travels with the propagated context but isn't copied onto spans automatically, so read it in each consumer and set it as a span attribute. Once every span carries it, querying "all spans involved in processing workflow X" is trivial, even when parts of the pipeline start new traces.

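from opentelemetry import context
from opentelemetry.baggage import set_baggage

ctx = set_baggage("workflow.id", workflow_id)
token = context.attach(ctx)  # make the baggage part of the current context
try:
    # propagate.inject (inside publish_order_created) now also writes a "baggage" entry
    publish_order_created(queue_url, order)
finally:
    context.detach(token)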
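On the wire, baggage travels as its own `baggage` entry in the carrier, so on SQS it's one more message attribute. Per the W3C Baggage spec, the entry is a comma-separated list of key=value pairs with percent-encoded values. Below is a minimal stdlib parser that's useful when you're inspecting raw carriers. It drops per-entry properties (anything after `;`) and ignores the spec's size limits. The function name is hypothetical.

```python
from typing import Dict
from urllib.parse import unquote


def parse_baggage(header: str) -> Dict[str, str]:
    """Parse 'k1=v1,k2=v2;prop' into {'k1': 'v1', 'k2': 'v2'} (properties are dropped)."""
    entries: Dict[str, str] = {}
    for member in header.split(","):
        key_value = member.split(";", 1)[0].strip()  # drop optional properties
        if "=" not in key_value:
            continue  # skip empty or malformed list-members
        key, _, value = key_value.partition("=")
        entries[key.strip()] = unquote(value.strip())
    return entries
```

In application code you don't need this. After `ctx = propagate.extract(...)` on the consumer, `get_baggage("workflow.id", context=ctx)` from `opentelemetry.baggage` returns the value, which you can then set as a span attribute.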

Testing your propagation

The fastest way to validate that propagation works: write an integration test that publishes a message with a known trace ID, consumes it, and asserts that the consumer span has the correct trace_id and parent_span_id.

from opentelemetry import trace
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

def test_sqs_trace_propagation():
    # The global provider can only be set once per process (init_tracer already did it),
    # so attach an in-memory exporter to the existing SDK provider instead of replacing it.
    exporter = InMemorySpanExporter()
    trace.get_tracer_provider().add_span_processor(SimpleSpanProcessor(exporter))

    tracer = trace.get_tracer("test")
    with tracer.start_as_current_span("producer-root"):
        publish_order_created(TEST_QUEUE_URL, {"id": "test-123"})

    # receive_from_queue and process_sqs_messages_single are test helpers you write yourself:
    # one wraps receive_message(MessageAttributeNames=["All"]), the other runs the
    # per-message consumer logic from process_sqs_messages.
    messages = receive_from_queue(TEST_QUEUE_URL)
    process_sqs_messages_single(messages[0])

    spans = exporter.get_finished_spans()
    producer_span = next(s for s in spans if s.name == "order.publish")
    consumer_span = next(s for s in spans if s.name == "order.process")

    assert consumer_span.context.trace_id == producer_span.context.trace_id
    assert consumer_span.parent.span_id == producer_span.context.span_id

Run this against LocalStack for SQS or a local Kafka broker. If the trace IDs don’t match, you have a propagation bug. Find it now, not in production.


Async boundaries don’t have to be observability black holes. The W3C Trace Context standard exists precisely for this problem, and OpenTelemetry’s propagation API is designed to work with any carrier — whether that’s HTTP headers, SQS attributes, Kafka headers, or a JSON field stuffed into an EventBridge detail payload.

The work is mechanical. Write the setter, write the getter, inject on produce, extract on consume. Do it once per broker type, wrap it in a utility module, and every service in your org gets connected traces for free.
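One way to structure that utility module is to let `propagate.inject` and `propagate.extract` work on a plain dict of strings, which OTel's default setter and getter already handle, and translate only at the broker edge. This is a sketch. The function names are hypothetical, and the formats match the SQS, Kafka and EventBridge examples above.

```python
import json
from typing import Dict, List, Optional, Tuple

Carrier = Dict[str, str]  # what propagate.inject(carrier) fills using the default setter


def to_sqs(carrier: Carrier) -> Dict[str, dict]:
    return {k: {"DataType": "String", "StringValue": v} for k, v in carrier.items()}


def from_sqs(attributes: Dict[str, dict]) -> Carrier:
    return {k: a["StringValue"] for k, a in attributes.items() if "StringValue" in a}


def to_kafka(carrier: Carrier) -> List[Tuple[str, bytes]]:
    return [(k, v.encode("utf-8")) for k, v in carrier.items()]


def from_kafka(headers: Optional[List[Tuple[str, Optional[bytes]]]]) -> Carrier:
    # headers may be None (confluent-kafka); a repeated key keeps its last value
    return {k: v.decode("utf-8") for k, v in (headers or []) if v is not None}


def to_eventbridge_detail(detail: dict, carrier: Carrier, key: str = "_otel") -> str:
    """Serialize detail with the trace block embedded, ready for put_events' Detail."""
    return json.dumps({**detail, key: carrier})


def from_eventbridge_detail(detail: dict, key: str = "_otel") -> Tuple[dict, Carrier]:
    """Return (business payload without the trace block, carrier); input isn't mutated."""
    business = {k: v for k, v in detail.items() if k != key}
    return business, dict(detail.get(key) or {})
```

A producer then does `carrier = {}; propagate.inject(carrier)` and passes `to_sqs(carrier)` as `MessageAttributes`. A consumer does `ctx = propagate.extract(from_sqs(message.get("MessageAttributes", {})))`. No custom getter or setter classes are needed.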

The teams that skip this are the ones correlating timestamps by hand during incidents. Don’t be those teams.
