cki.cki_tools.amqp_bridge

AMQP Bridge

A single service that acts as a bridge between external AMQP 0.91 and AMQP 1.0 queues and a single RabbitMQ exchange.

                      RabbitMQ
                     +----------------------------------------+
+-----------+        |  +----------+          +------------+  |
| AMQP091_1 +-----+---->+ Exchange +----+---->+ Consumer_1 |  |
+-----------+     |  |  +----------+    |     +------------+  |
+-----------+     |  |                  |     +------------+  |
| AMQP091_2 +-----+  |                  +---->+ Consumer_2 |  |
+-----------+     |  |                        +------------+  |
+-----------+     |  |                        :            :  |
| AMQP091_3 +-----+  |                        +------------+  |
+-----------+     |  |                        | Consumer_n |  |
:           :     :  |                        +------------+  |
+-----------+     |  |                                        |
| AMQP10_n  +-----+  |                                        |
+-----------+        |                                        |
                     +----------------------------------------+

A separate process must be spawned for each server to be mirrored. Each process subscribes to a given list of topics and forwards the received messages to the configured RabbitMQ exchange.

Configuration

Environment variable        Description
CKI_DEPLOYMENT_ENVIRONMENT  Deployment environment (production/staging)
RABBITMQ_HOST               AMQP host
RABBITMQ_PORT               AMQP port; TLS is used for port 443
RABBITMQ_USER               AMQP user
RABBITMQ_PASSWORD           AMQP password
RABBITMQ_CAFILE             AMQP CA file path
RABBITMQ_CERTFILE           AMQP certificate + private key file path
RABBITMQ_PUBLISH_EXCHANGE   AMQP exchange for the forwarded messages
RABBITMQ_KEEPALIVE_S        Time in seconds to keep the AMQP connection alive between messages
SENTRY_SDN                  Sentry DSN
AMQP_BRIDGE_CONFIG          YAML object containing the AMQP configuration
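
As a rough illustration of how the RabbitMQ variables above might be collected, here is a minimal sketch. It is not the bridge's actual code: `RabbitMQSettings` and `load_rabbitmq_settings` are hypothetical names, and treating only the host and port as mandatory is an assumption. The only rule taken from the table is that TLS is used when the port is 443.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass
class RabbitMQSettings:
    """Hypothetical container for the RABBITMQ_* variables."""
    host: str
    port: int
    user: Optional[str]
    password: Optional[str]
    cafile: Optional[str]
    certfile: Optional[str]
    publish_exchange: Optional[str]
    keepalive_s: Optional[int]
    use_tls: bool


def load_rabbitmq_settings(env: Mapping[str, str] = os.environ) -> RabbitMQSettings:
    """Read the RABBITMQ_* variables from a mapping (the process environment by default)."""
    # Assumption: host and port are required, everything else is optional.
    port = int(env["RABBITMQ_PORT"])
    keepalive = env.get("RABBITMQ_KEEPALIVE_S")
    return RabbitMQSettings(
        host=env["RABBITMQ_HOST"],
        port=port,
        user=env.get("RABBITMQ_USER"),
        password=env.get("RABBITMQ_PASSWORD"),
        cafile=env.get("RABBITMQ_CAFILE"),
        certfile=env.get("RABBITMQ_CERTFILE"),
        publish_exchange=env.get("RABBITMQ_PUBLISH_EXCHANGE"),
        keepalive_s=int(keepalive) if keepalive is not None else None,
        # From the table: TLS is used for port 443.
        use_tls=(port == 443),
    )
```

Passing a plain dictionary instead of `os.environ` makes the sketch easy to try without touching the real environment.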

CKI_DEPLOYMENT_ENVIRONMENT

On staging deployments (CKI_DEPLOYMENT_ENVIRONMENT != production), the behaviour changes in the following ways:

  • On AMQP 0.91, queue_name is not used and a volatile queue with a UUID name is created instead.
  • On AMQP 1.0, queues (queue://) in receiver_urls are translated into topics (topic://).
  • Messages are not forwarded to the destination but printed instead.
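
The three staging rules above can be sketched as follows. This is an illustration under assumptions, not the bridge's implementation: all function names are hypothetical, the `publish` callback stands in for whatever actually sends to RabbitMQ, and the queue-to-topic translation is assumed to be a plain prefix replacement.

```python
import os
import uuid
from typing import Callable


def is_production() -> bool:
    """True only when CKI_DEPLOYMENT_ENVIRONMENT is exactly 'production'."""
    return os.environ.get("CKI_DEPLOYMENT_ENVIRONMENT") == "production"


def effective_queue_name(configured_name: str) -> str:
    """AMQP 0.91: use the configured queue in production, a throwaway UUID name otherwise."""
    return configured_name if is_production() else str(uuid.uuid4())


def effective_address(address: str) -> str:
    """AMQP 1.0: outside production, rewrite a queue:// address into a topic:// one.

    Assumption: only the scheme prefix changes.
    """
    if not is_production() and address.startswith("queue://"):
        return "topic://" + address[len("queue://"):]
    return address


def forward(routing_key: str, body: str, publish: Callable[[str, str], None]) -> None:
    """Publish in production; outside production only print the message."""
    if is_production():
        publish(routing_key, body)
    else:
        print(f"{routing_key}: {body}")
```

With the variable unset, `is_production()` returns False, so the sketch falls back to the staging behaviour by default.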

AMQP_BRIDGE_CONFIG

AMQP 1.0 Configuration

Field           Description
name            Name for the bridge. Will be on the forwarded message headers.
protocol        amqp10 for AMQP 1.0
message_topics  List of topics/queues to subscribe to.
receiver_urls   List of server URLs to connect to.
cert_path       Path to the user authentication certificate.
{
  "name": "foobar",
  "protocol": "amqp10",
  "message_topics": [
    "queue://Consumer.consumer-name.foo.VirtualTopic.foo.>",
    "queue://Consumer.consumer-name.foo.VirtualTopic.bar.>"
  ],
  "receiver_urls": [
    "amqps://messaging-broker01.foobar.com:5671",
    "amqps://messaging-broker02.foobar.com:5671"
  ],
  "cert_path": "/path/to/cert.pem"
}

AMQP 0.91 Configuration

Field         Description
name          Name for the bridge. Will be on the forwarded message headers.
protocol      amqp091 for AMQP 0.91
routing_keys  List of routing keys to subscribe to.
host          Host of the AMQP 0.91 server. Can be a space-separated list of hosts.
port          Port of the AMQP 0.91 server.
cafile        Path of the CA certificate of the AMQP 0.91 server.
certfile      Path of the client certificate used to authenticate to the AMQP 0.91 server.
virtual_host  Virtual host of the AMQP 0.91 server.
exchange      Name of the exchange to bind.
queue_name    Name of the queue to use.
{
  "name": "foobar",
  "protocol": "amqp091",
  "routing_keys": [
    "some.routing.key",
    "other.key.#"
  ],
  "host": "host.org",
  "port": 5671,
  "cafile": "/path/to/ca.pem",
  "certfile": "/path/to/cert.pem",
  "virtual_host": "/virtual_host_name",
  "exchange": "exchange.name",
  "queue_name": "queue.name"
}
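
A loader for AMQP_BRIDGE_CONFIG could look roughly like the sketch below. It is only an illustration: `load_bridge_config` and `hosts_of` are hypothetical helpers, the field names follow the JSON examples, and requiring every listed field is an assumption. The sketch parses with the standard `json` module, which is enough for the examples shown here because JSON is also valid YAML; a configuration written in other YAML syntax would need a YAML parser instead.

```python
import json

# Assumption: every field shown in the examples is required for its protocol.
REQUIRED_FIELDS = {
    "amqp10": {"name", "protocol", "message_topics", "receiver_urls", "cert_path"},
    "amqp091": {"name", "protocol", "routing_keys", "host", "port", "cafile",
                "certfile", "virtual_host", "exchange", "queue_name"},
}


def load_bridge_config(raw: str) -> dict:
    """Parse the configuration text and check the fields for its protocol."""
    config = json.loads(raw)
    protocol = config.get("protocol")
    if protocol not in REQUIRED_FIELDS:
        raise ValueError(f"unknown protocol: {protocol!r}")
    missing = REQUIRED_FIELDS[protocol] - config.keys()
    if missing:
        raise ValueError(f"missing fields for {protocol}: {sorted(missing)}")
    return config


def hosts_of(config: dict) -> list:
    """AMQP 0.91: split the space-separated 'host' value into individual hosts."""
    return config["host"].split()
```

In a deployment the text would typically come from the environment, for example `load_bridge_config(os.environ["AMQP_BRIDGE_CONFIG"])`.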

Resulting messages

Forwarded messages keep the body of the original message, but their routing key and headers are rewritten as described below.

Routing key

Messages are sent to the exchange with the original routing key prefixed by the bridge name: {name}.{routing_key}

For a bridge named foo, and a message received with the topic tests.finished, the resulting routing key will be foo.tests.finished.
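
The rule is small enough to express in one line. The helper name `bridged_routing_key` below is hypothetical and only illustrates the {name}.{routing_key} format.

```python
def bridged_routing_key(bridge_name: str, routing_key: str) -> str:
    """Prefix the original routing key with the bridge name."""
    return f"{bridge_name}.{routing_key}"


print(bridged_routing_key("foo", "tests.finished"))  # foo.tests.finished
```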

Headers

AMQP 1.0 Headers

Field                         Value
message-type                  amqp-bridge
message-amqp-bridge-name      Name of the bridge.
message-amqp-bridge-protocol  amqp10
<original-headers>            Properties (headers) of the original message.

AMQP 0.91 Headers

Field                         Value
message-type                  amqp-bridge
message-amqp-bridge-name      Name of the bridge.
message-amqp-bridge-protocol  amqp091
message-amqp091-topic         Topic (routing key) of the original message.
<original-headers>            Headers of the original message.
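
The header layout for both protocols can be sketched as below. `bridged_headers` is a hypothetical helper, not the bridge's API, and letting the bridge fields take precedence over identically named original headers is an assumption that the tables do not settle.

```python
from typing import Mapping, Optional


def bridged_headers(bridge_name: str, protocol: str,
                    original_headers: Mapping[str, object],
                    original_topic: Optional[str] = None) -> dict:
    """Build the headers of a forwarded message for 'amqp10' or 'amqp091'."""
    # Start from the original headers, then add the bridge fields.
    # Assumption: bridge fields win if a name collides.
    headers = dict(original_headers)
    headers["message-type"] = "amqp-bridge"
    headers["message-amqp-bridge-name"] = bridge_name
    headers["message-amqp-bridge-protocol"] = protocol
    if protocol == "amqp091":
        # Only AMQP 0.91 messages carry the original topic (routing key).
        headers["message-amqp091-topic"] = original_topic
    return headers
```

For example, `bridged_headers("foo", "amqp091", {"x": 1}, "tests.finished")` yields the original `x` header plus the four bridge fields listed for AMQP 0.91.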