cki_tools.amqp_bridge

AMQP Bridge

Single service to act as a bridge between external AMQP 0.91, AMQP 1.0, SQS queues and Kafka topics and a single RabbitMQ exchange.

                          RabbitMQ
                        +----------------------------------------+
+--------------+        |  +----------+          +------------+  |
| AMQP091      +-----+---->+ Exchange +----+---->+ Consumer_1 |  |
+--------------+     |  |  +----------+    |     +------------+  |
+--------------+     |  |                  |     +------------+  |
| AMQP10       +-----+  |                  +---->+ Consumer_2 |  |
+--------------+     |  |                        +------------+  |
+--------------+     |  |                        :            :  |
| SQS          +-----+  |                        +------------+  |
+--------------+     |  |                        | Consumer_n |  |
+--------------+     |  +----------------------------------------+
| Kafka(rx,tx) +<----+
+--------------+

A different process needs to be spawned for each server to mirror. Each one will subscribe to a given list of topics and forward them to the configured RabbitMQ exchange.

In case of Kafka, the bridge can be bidirectional, with one process for each direction. Kafka TX consumes from RabbitMQ and produces to Kafka, while Kafka RX consumes from Kafka and produces to RabbitMQ.

Usage

usage: python3 -m cki_tools.amqp_bridge [-h]
           [--from-datagrepper] [--start START] [--end END]

options:
  -h, --help          show this help message and exit

Retrieve messages from datagrepper:
  --from-datagrepper  Retrieve messages from datagrepper instead of listening to AMQP
  --start START       Start time for datagrepper messages in ISO format
  --end END           End time for datagrepper messages in ISO format

When called without any arguments, the service will start listening to the configured AMQP servers and forward messages to the RabbitMQ exchange.

When called with --from-datagrepper, the service will retrieve messages from datagrepper instead of listening to AMQP. The --start and --end options specify the time range for the retrieved messages.

In case of Kafka, the service can bridge messages from RabbitMQ to Kafka but also from Kafka to RabbitMQ. This is controlled by the protocol configuration parameter that, in case of Kafka, can be kafka_rx or kafka_tx.

Configuration

Environment variable Description
CKI_DEPLOYMENT_ENVIRONMENT Define the deployment environment (production/staging)
RABBITMQ_HOST AMQP host
RABBITMQ_PORT AMQP port, TLS is used for port 443
RABBITMQ_USER AMQP user
RABBITMQ_PASSWORD AMQP password
RABBITMQ_CAFILE AMQP CA file path
RABBITMQ_CERTFILE AMQP certificate + private key file path
RABBITMQ_PUBLISH_EXCHANGE AMQP exchange for the forwarded messages
RABBITMQ_KEEPALIVE_S Time to keep AMQP connection alive between messages
SENTRY_SDN Sentry SDN
AMQP_BRIDGE_CONFIG YAML object containing the AMQP configuration
AWS_ACCESS_KEY_ID AWS access key ID (for SQS authentication)
AWS_SECRET_ACCESS_KEY AWS secret access key (for SQS authentication)
AWS_DEFAULT_REGION AWS region for SQS (e.g., us-east-1)
AWS_ENDPOINT_URL AWS endpoint URL for SQS (optional, for LocalStack)
KAFKA_HOSTS Kafka broker addresses, comma-separated (host:port)
KAFKA_USER Kafka SASL username. If unset, authentication disabled
KAFKA_PASSWORD Kafka SASL password
KAFKA_SECURITY_PROTOCOL Kafka security protocol (default: SASL_SSL)
KAFKA_SASL_MECHANISM Kafka SASL mechanism (default: SCRAM-SHA-512)
KAFKA_TOPICS Space-separated list of Kafka topics (kafka_rx protocol)
WEBHOOK_RECEIVER_EXCHANGE RabbitMQ exchange to consume from (kafka_tx protocol)
ROUTING_KEYS Space-separated RabbitMQ routing key patterns (kafka_tx protocol)
QUEUE_NAME RabbitMQ queue name (kafka_tx)
DRY_RUN If true, Kafka RX/TX log or print payloads only (no Rabbit/Kafka forward)

CKI_DEPLOYMENT_ENVIRONMENT

On staging developments (CKI_DEPLOYMENT_ENVIRONMENT != production), the behaviour changes the following way:

  • On AMQP 0.91, queue_name is not used and a volatile queue with an uuid name is created instead.
  • On AMQP 1.0, queues (queue://) in receiver_urls are translated into topics (topic://).
  • Messages are not forwarded to the destination but printed instead.

AMQP_BRIDGE_CONFIG

AMQP 1.0 Configuration

Field Description
name Name for the bridge. Will be on the forwarded message headers.
protocol amqp10 for AMQP 1.0
datagrepper_url URL of datagrepper to recover messages.
message_topics List of topics/queues to subscribe to.
receiver_urls List of server URLs to connect to.
cert_path Path to the user authentication certificate.
name: foobar
protocol: amqp10
message_topics:
  - queue://Consumer.consumer-name.foo.VirtualTopic.foo.>
  - queue://Consumer.consumer-name.foo.VirtualTopic.bar.>
receiver_urls:
  - amqps://messaging-broker01.foobar.com:5671
  - amqps://messaging-broker02.foobar.com:5671
cert_path: /path/to/cert.pem

AMQP 0.91 Configuration

Field Description
name Name for the bridge. Will be on the forwarded message headers.
protocol amqp091 for AMQP 0.91
routing_keys List of routing keys to subscribe to.
host URL of the AMQP 0.91 server. Can be a list of space-separated strings.
port Port of the AMQP 0.91 server.
cafile Path of the CA certificate of the AMQP 0.91 server.
cert Path of the client certificate of the AMQP 0.91 server.
virtual_host Virtual host of the AMQP 0.91 server.
exchange Name of the exchange to bind.
queue_name Name of the queue to use.
queue_type Override type of the queue to use.
mirror Do not modify message headers or routing keys.
name: foobar
protocol: amqp091
routing_keys:
  - some.routing.key
  - other.key.#
host: host.org
port: 5671
cafile: /path/to/ca.pem
certfile: /path/to/cert.pem
virtual_host: /virtual_host_name
exchange: exchange.name
queue_name: queue.name

SQS Configuration

Field Description
name Name for the bridge. Will be on the forwarded message headers.
protocol sqs for SQS
queue_url Full SQS queue URL
visibility_timeout Duration (in seconds) that received messages are hidden from subsequent retrieve requests (default: 10 * 60)
wait_time_seconds Long polling wait time (default: 20)

The SQS receiver operates in mirror mode, passing through original message attributes without adding bridge headers.

On message processing failures, messages will be re-queued after the visibility_timeout period. The default timeout of 10 minutes matches the CKI RabbitMQ delay queues.

name: foobar
protocol: sqs
queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue
visibility_timeout: 600
wait_time_seconds: 20

Kafka Configuration

Routing keys

Traffic is split into two RabbitMQ namespaces so that consumers who want to receive messages from Kafka can subscribe to kafka.rx.<topic>, while producers who want to send messages to Kafka can use kafka.tx.<topic>.

RX Configuration (Kafka -> RabbitMQ)

Receives messages from Kafka topics and forwards them to the RabbitMQ exchange configured by RABBITMQ_PUBLISH_EXCHANGE. Outgoing routing keys use the kafka.rx. namespace (see Routing keys).

Field Description
name Bridge name; appears on forwarded message headers.
protocol kafka_rx
consumer_config Merged into the confluent-kafka consumer config (e.g. auto.offset.reset, enable.auto.commit).
name: vezdur
protocol: kafka_rx
consumer_config:
  group.id: amqp-bridge-vezdur
  auto.offset.reset: earliest
  enable.auto.commit: false
TX Configuration (RabbitMQ -> Kafka)

Binds a queue to WEBHOOK_RECEIVER_EXCHANGE using ROUTING_KEYS, consumes messages, and produces to Kafka. Only routing keys matching kafka.tx.<topic> are accepted; the Kafka topic name is everything after that prefix (for example kafka.tx.cki.results → topic cki.results). See Routing keys.

RabbitMQ wildcard patterns (#, *) in ROUTING_KEYS only affect which messages reach the queue; the concrete routing key on each message must still follow kafka.tx.* for the bridge to derive the Kafka topic.

Field Description
name Bridge name (logging / operations; not added to Kafka payloads).
protocol kafka_tx
producer_config Merged into the confluent-kafka producer config
name: vezdur
protocol: kafka_tx
producer_config:
  message.send.max.retries: 3

Resulting messages

The messages after being forwarded will have the same body as the original ones, but the headers and routing key will have the following format.

Routing key

For AMQP 0.91 and AMQP 1.0, messages are published with the bridge name prepended: {name}.{routing_key}. For example, bridge foo and original topic tests.finished become foo.tests.finished.

SQS forwards using the message-topic attribute as the routing key (no bridge name prefix).

Kafka RX (kafka_rx) publishes with routing key kafka.rx.<kafka_topic> (no bridge name prefix). <kafka_topic> is the Kafka topic the record came from.

Headers

AMQP 1.0 Headers

Field Value
message-type amqp-bridge
message-amqp-bridge-name Name of the bridge.
message-amqp-bridge-protocol amqp10
<original-headers> Properties (headers) of the original message.

AMQP 0.91 Headers

Field Value
message-type amqp-bridge
message-amqp-bridge-name Name of the bridge.
message-amqp-bridge-protocol amqp091
message-amqp091-topic Topic (routing key) of the original message.
<original-headers> Headers of the original message.

Kafka Headers (Kafka -> RabbitMQ)

Kafka message headers are passed through filtered_headers (CKI / x- prefixes dropped like other bridges). These headers are always set:

Field Value
message-type amqp-bridge
message-amqp-bridge-name Name of the bridge.
message-amqp-bridge-protocol kafka_rx
message-kafka-topic Original Kafka topic.

Kafka Headers (RabbitMQ -> Kafka)

Messages bridged from Kafka TX (RabbitMQ -> Kafka) do not add bridge headers; the original message body is forwarded as-is to the Kafka topic.