cki_tools.amqp_bridge
Single service to act as a bridge between external AMQP 0.91, AMQP 1.0, SQS queues and Kafka topics and a single RabbitMQ exchange.
RabbitMQ
+----------------------------------------+
+--------------+ | +----------+ +------------+ |
| AMQP091 +-----+---->+ Exchange +----+---->+ Consumer_1 | |
+--------------+ | | +----------+ | +------------+ |
+--------------+ | | | +------------+ |
| AMQP10 +-----+ | +---->+ Consumer_2 | |
+--------------+ | | +------------+ |
+--------------+ | | : : |
| SQS +-----+ | +------------+ |
+--------------+ | | | Consumer_n | |
+--------------+ | +----------------------------------------+
| Kafka(rx,tx) +<----+
+--------------+
A different process needs to be spawned for each server to mirror. Each one will subscribe to a given list of topics and forward them to the configured RabbitMQ exchange.
In case of Kafka, the bridge can be bidirectional, with one process for each direction. Kafka TX consumes from RabbitMQ and produces to Kafka, while Kafka RX consumes from Kafka and produces to RabbitMQ.
Usage
usage: python3 -m cki_tools.amqp_bridge [-h]
[--from-datagrepper] [--start START] [--end END]
options:
-h, --help show this help message and exit
Retrieve messages from datagrepper:
--from-datagrepper Retrieve messages from datagrepper instead of listening to AMQP
--start START Start time for datagrepper messages in ISO format
--end END End time for datagrepper messages in ISO format
When called without any arguments, the service will start listening to the configured AMQP servers and forward messages to the RabbitMQ exchange.
When called with --from-datagrepper, the service will retrieve
messages from datagrepper instead of listening to AMQP. The --start
and --end options specify the time range for the retrieved messages.
In case of Kafka, the service can bridge messages from RabbitMQ to Kafka
but also from Kafka to RabbitMQ. This is controlled by the protocol
configuration parameter that, in case of Kafka, can be kafka_rx or kafka_tx.
Configuration
| Environment variable | Description |
|---|---|
CKI_DEPLOYMENT_ENVIRONMENT |
Define the deployment environment (production/staging) |
RABBITMQ_HOST |
AMQP host |
RABBITMQ_PORT |
AMQP port, TLS is used for port 443 |
RABBITMQ_USER |
AMQP user |
RABBITMQ_PASSWORD |
AMQP password |
RABBITMQ_CAFILE |
AMQP CA file path |
RABBITMQ_CERTFILE |
AMQP certificate + private key file path |
RABBITMQ_PUBLISH_EXCHANGE |
AMQP exchange for the forwarded messages |
RABBITMQ_KEEPALIVE_S |
Time to keep AMQP connection alive between messages |
SENTRY_SDN |
Sentry SDN |
AMQP_BRIDGE_CONFIG |
YAML object containing the AMQP configuration |
AWS_ACCESS_KEY_ID |
AWS access key ID (for SQS authentication) |
AWS_SECRET_ACCESS_KEY |
AWS secret access key (for SQS authentication) |
AWS_DEFAULT_REGION |
AWS region for SQS (e.g., us-east-1) |
AWS_ENDPOINT_URL |
AWS endpoint URL for SQS (optional, for LocalStack) |
KAFKA_HOSTS |
Kafka broker addresses, comma-separated (host:port) |
KAFKA_USER |
Kafka SASL username. If unset, authentication disabled |
KAFKA_PASSWORD |
Kafka SASL password |
KAFKA_SECURITY_PROTOCOL |
Kafka security protocol (default: SASL_SSL) |
KAFKA_SASL_MECHANISM |
Kafka SASL mechanism (default: SCRAM-SHA-512) |
KAFKA_TOPICS |
Space-separated list of Kafka topics (kafka_rx protocol) |
WEBHOOK_RECEIVER_EXCHANGE |
RabbitMQ exchange to consume from (kafka_tx protocol) |
ROUTING_KEYS |
Space-separated RabbitMQ routing key patterns (kafka_tx protocol) |
QUEUE_NAME |
RabbitMQ queue name (kafka_tx) |
DRY_RUN |
If true, Kafka RX/TX log or print payloads only (no Rabbit/Kafka forward) |
CKI_DEPLOYMENT_ENVIRONMENT
On staging developments (CKI_DEPLOYMENT_ENVIRONMENT != production), the
behaviour changes the following way:
- On AMQP 0.91,
queue_nameis not used and a volatile queue with an uuid name is created instead. - On AMQP 1.0, queues (
queue://) inreceiver_urlsare translated into topics (topic://). - Messages are not forwarded to the destination but printed instead.
AMQP_BRIDGE_CONFIG
AMQP 1.0 Configuration
| Field | Description |
|---|---|
name |
Name for the bridge. Will be on the forwarded message headers. |
protocol |
amqp10 for AMQP 1.0 |
datagrepper_url |
URL of datagrepper to recover messages. |
message_topics |
List of topics/queues to subscribe to. |
receiver_urls |
List of server URLs to connect to. |
cert_path |
Path to the user authentication certificate. |
name: foobar
protocol: amqp10
message_topics:
- queue://Consumer.consumer-name.foo.VirtualTopic.foo.>
- queue://Consumer.consumer-name.foo.VirtualTopic.bar.>
receiver_urls:
- amqps://messaging-broker01.foobar.com:5671
- amqps://messaging-broker02.foobar.com:5671
cert_path: /path/to/cert.pem
AMQP 0.91 Configuration
| Field | Description |
|---|---|
name |
Name for the bridge. Will be on the forwarded message headers. |
protocol |
amqp091 for AMQP 0.91 |
routing_keys |
List of routing keys to subscribe to. |
host |
URL of the AMQP 0.91 server. Can be a list of space-separated strings. |
port |
Port of the AMQP 0.91 server. |
cafile |
Path of the CA certificate of the AMQP 0.91 server. |
cert |
Path of the client certificate of the AMQP 0.91 server. |
virtual_host |
Virtual host of the AMQP 0.91 server. |
exchange |
Name of the exchange to bind. |
queue_name |
Name of the queue to use. |
queue_type |
Override type of the queue to use. |
mirror |
Do not modify message headers or routing keys. |
name: foobar
protocol: amqp091
routing_keys:
- some.routing.key
- other.key.#
host: host.org
port: 5671
cafile: /path/to/ca.pem
certfile: /path/to/cert.pem
virtual_host: /virtual_host_name
exchange: exchange.name
queue_name: queue.name
SQS Configuration
| Field | Description |
|---|---|
name |
Name for the bridge. Will be on the forwarded message headers. |
protocol |
sqs for SQS |
queue_url |
Full SQS queue URL |
visibility_timeout |
Duration (in seconds) that received messages are hidden from subsequent retrieve requests (default: 10 * 60) |
wait_time_seconds |
Long polling wait time (default: 20) |
The SQS receiver operates in mirror mode, passing through original message attributes without adding bridge headers.
On message processing failures, messages will be re-queued after the
visibility_timeout period. The default timeout of 10 minutes matches the CKI
RabbitMQ delay queues.
name: foobar
protocol: sqs
queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue
visibility_timeout: 600
wait_time_seconds: 20
Kafka Configuration
Routing keys
Traffic is split into two RabbitMQ namespaces so that consumers who want to
receive messages from Kafka can subscribe to kafka.rx.<topic>, while
producers who want to send messages to Kafka can use kafka.tx.<topic>.
RX Configuration (Kafka -> RabbitMQ)
Receives messages from Kafka topics and forwards them to the RabbitMQ exchange
configured by RABBITMQ_PUBLISH_EXCHANGE. Outgoing routing keys use the
kafka.rx. namespace (see Routing keys).
| Field | Description |
|---|---|
name |
Bridge name; appears on forwarded message headers. |
protocol |
kafka_rx |
consumer_config |
Merged into the confluent-kafka consumer config (e.g. auto.offset.reset, enable.auto.commit). |
name: vezdur
protocol: kafka_rx
consumer_config:
group.id: amqp-bridge-vezdur
auto.offset.reset: earliest
enable.auto.commit: false
TX Configuration (RabbitMQ -> Kafka)
Binds a queue to WEBHOOK_RECEIVER_EXCHANGE using ROUTING_KEYS, consumes
messages, and produces to Kafka. Only routing keys matching kafka.tx.<topic>
are accepted; the Kafka topic name is everything after that prefix (for example
kafka.tx.cki.results → topic cki.results). See
Routing keys.
RabbitMQ wildcard patterns (#, *) in ROUTING_KEYS only affect which messages
reach the queue; the concrete routing key on each message must still follow
kafka.tx.* for the bridge to derive the Kafka topic.
| Field | Description |
|---|---|
name |
Bridge name (logging / operations; not added to Kafka payloads). |
protocol |
kafka_tx |
producer_config |
Merged into the confluent-kafka producer config |
name: vezdur
protocol: kafka_tx
producer_config:
message.send.max.retries: 3
Resulting messages
The messages after being forwarded will have the same body as the original ones, but the headers and routing key will have the following format.
Routing key
For AMQP 0.91 and AMQP 1.0, messages are published with the bridge name
prepended: {name}.{routing_key}. For example, bridge foo and original topic
tests.finished become foo.tests.finished.
SQS forwards using the message-topic attribute as the routing key (no bridge
name prefix).
Kafka RX (kafka_rx) publishes with routing key kafka.rx.<kafka_topic> (no
bridge name prefix). <kafka_topic> is the Kafka topic the record came from.
Headers
AMQP 1.0 Headers
| Field | Value |
|---|---|
message-type |
amqp-bridge |
message-amqp-bridge-name |
Name of the bridge. |
message-amqp-bridge-protocol |
amqp10 |
<original-headers> |
Properties (headers) of the original message. |
AMQP 0.91 Headers
| Field | Value |
|---|---|
message-type |
amqp-bridge |
message-amqp-bridge-name |
Name of the bridge. |
message-amqp-bridge-protocol |
amqp091 |
message-amqp091-topic |
Topic (routing key) of the original message. |
<original-headers> |
Headers of the original message. |
Kafka Headers (Kafka -> RabbitMQ)
Kafka message headers are passed through filtered_headers (CKI / x- prefixes
dropped like other bridges). These headers are always set:
| Field | Value |
|---|---|
message-type |
amqp-bridge |
message-amqp-bridge-name |
Name of the bridge. |
message-amqp-bridge-protocol |
kafka_rx |
message-kafka-topic |
Original Kafka topic. |
Kafka Headers (RabbitMQ -> Kafka)
Messages bridged from Kafka TX (RabbitMQ -> Kafka) do not add bridge headers; the original message body is forwarded as-is to the Kafka topic.