Resilient Messaging Queues

How CKI manages data distribution.

CKI infrastructure is composed of a wide variety of services running on different OpenShift clusters. As the information shared by these services is critical for the CKI setup to operate correctly, communication needs to be handled in a reliable way.

To achieve this, the CKI Team uses an elaborate configuration of replicated RabbitMQ servers, queues and exchanges, creating a safe communication channel for all the applications, scripts and microservices running both inside and outside the Red Hat firewall.

Servers setup

Currently, 3 instances of RabbitMQ are running on AWS in a cluster configuration. With this setup, and following the high availability documentation, all queues are replicated on 2 nodes. This means that each queue has a master and a mirror node, which vary between the queues.

Connecting to the cluster

All nodes in the cluster have a public infra.cki-project.org subdomain attached and use TLS encryption. The hosts follow the arr-cki-{prod|staging}-rabbitmq-{aws zone}{a|b|c}.infra.cki-project.org naming scheme. Currently only the us-east-1 zone is being used, so the hosts are the following:

  • arr-cki-prod-rabbitmq-us-east-1a.infra.cki-project.org
  • arr-cki-prod-rabbitmq-us-east-1b.infra.cki-project.org
  • arr-cki-prod-rabbitmq-us-east-1c.infra.cki-project.org

For connection fallback to work correctly, all cluster nodes must be specified when connecting. This feature is supported by pika. cki-lib’s messagequeue.MessageQueue helper provides a simple interface that accepts the list of nodes as a space-separated string. This module can also pick up all the necessary configuration from the RABBITMQ_{HOST,PORT,USER,PASSWORD,CAFILE} environment variables.

As pika always connects to the first host in the list and uses the others only after the first one fails, messagequeue.MessageQueue randomizes the order of the hosts to distribute the load evenly across the servers.
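The host randomization described above can be sketched with the standard library alone. This is a minimal illustration, not the cki-lib implementation: the function name shuffled_hosts and the placeholder host names are hypothetical, and the commented-out pika calls assume TLS on port 5671.

```python
import os
import random


def shuffled_hosts(hosts, rng=random):
    """Split a space-separated host string and return the hosts in random order."""
    nodes = hosts.split()
    # sample() returns a new shuffled list and leaves the input untouched.
    return rng.sample(nodes, k=len(nodes))


# Placeholder hosts, used only if RABBITMQ_HOST is not set (hypothetical names).
hosts = os.environ.get(
    "RABBITMQ_HOST",
    "node-a.example.org node-b.example.org node-c.example.org",
)
ordered = shuffled_hosts(hosts)

# With pika installed, one ConnectionParameters object per host can be passed
# as a list; pika then tries them in order until a connection succeeds:
#   params = [pika.ConnectionParameters(host=h, port=5671) for h in ordered]
#   connection = pika.BlockingConnection(params)
```

Because every client shuffles independently, the first host each client tries is spread roughly evenly over the three nodes.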

Note: Do not hardcode these values. In deployment-all configs, the PUBLIC_RABBITMQ_HOSTS environment variable should be used instead.

Staging deployments

Messages on a queue can only be consumed once, i.e. when a client consumes a message it becomes unavailable for other clients using the same queue.

To enable correct staging deployments, messagequeue.MessageQueue uses the IS_PRODUCTION environment variable flag (via the is_production method) to determine whether to create staging queues instead of the persistent production queues.

These queues are disposable and emulate the behaviour of the production queues without “stealing” the messages from the real consumers.

Staging queues use a UUID4 as name, and have the auto_delete flag enabled, i.e. they will be automatically deleted after the last connection is closed. These temporary queues are bound to the same exchanges, and will start receiving messages after declaration. Messages already present on the production queues will not be available.
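As a rough sketch of the production/staging switch, the helper below picks queue settings from the IS_PRODUCTION flag. The function names, the accepted truthy spellings and the durable=True choice for production queues are assumptions for illustration; the returned keys match the keyword arguments of pika's channel.queue_declare.

```python
import os
import uuid


def is_production(environ=os.environ):
    """Return True if IS_PRODUCTION is set to a truthy value (assumed spellings)."""
    return environ.get("IS_PRODUCTION", "false").strip().lower() in {"1", "true", "yes"}


def queue_declare_kwargs(production_name, environ=os.environ):
    """Return queue_declare keyword arguments for a production or staging queue."""
    if is_production(environ):
        # Persistent, named queue shared by all production consumers.
        return {"queue": production_name, "durable": True, "auto_delete": False}
    # Disposable queue: random UUID4 name, deleted once the last consumer is gone.
    return {"queue": str(uuid.uuid4()), "durable": False, "auto_delete": True}
```

A staging consumer would then bind its temporary queue to the same exchanges as the production queue, so both receive a copy of each new message.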

Retry messages with DLX

It’s not unusual that something goes wrong while a service is processing a message. In this case, there are 3 important things to consider:

  • The message must not be lost
  • The failure must not block the service
  • The message should be retried automatically

To accomplish this, a retry system was created using Dead Letter Exchanges. DLX + Time-To-Live allows us to requeue a rejected message after a certain period of time.

In this architecture, an extra “retry” queue is created for each production queue. Messages waiting for a retry sit there until it is time to reprocess them. Having an extra queue per consumer provides per-application visibility and allows the retry delay to be configured per application.

The message flow is the following:

  • A message arrives at an exchange (e.prod.{name}), and is delivered to a certain queue (q.prod.{name}).
  • The consumer rejects the message by calling nack.
  • The message is routed to the e.retry.in exchange via the DLX configuration of the queue. The original queue name is set as the routing key.
  • The e.retry.in exchange adds the message to the q.retry.{name} queue that has a message TTL configured.
  • After the TTL expires, the message is requeued to the e.retry.out exchange via the DLX configuration of the retry queue. The original queue name is kept as the routing key.
  • Using the routing key, the message is routed back to the original queue.

The following diagram shows this flow:
                     x-dead-letter-exchange = e.retry.in
                     x-dead-letter-routing-key = {name}
+---------------+    +---------------+              +----------+
| e.prod.{name} +--->+ q.prod.{name} +------------->+ consumer |
+---------------+    +--+--------+---+              +----------+
                        |        ^
      +-----------------+        +--------------+
      |                                         | routing-key = {name}
      v                                         |
+-----+------+    +----------------+    +-------+-----+
| e.retry.in +--->+ q.retry.{name} +--->+ e.retry.out |
+------------+    +----------------+    +-------------+
                  x-dead-letter-exchange = e.retry.out
                  x-message-ttl = 60 seconds
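The queue arguments from the diagram can be written down as plain data. The sketch below is illustrative only: retry_topology is a hypothetical helper, and the bindings assume that both retry exchanges route on the bare queue name. The message TTL is left out here on the assumption that it is applied on the server side.

```python
def retry_topology(name):
    """Describe the queues and bindings needed to retry messages for one consumer."""
    return {
        "production_queue": {
            "queue": f"q.prod.{name}",
            "arguments": {
                # Rejected messages are dead-lettered to the shared retry exchange.
                "x-dead-letter-exchange": "e.retry.in",
                "x-dead-letter-routing-key": name,
            },
        },
        "retry_queue": {
            "queue": f"q.retry.{name}",
            # Expired messages are dead-lettered back out; the routing key is kept.
            "arguments": {"x-dead-letter-exchange": "e.retry.out"},
        },
        # (exchange, queue, routing key) triples, assumed direct-style routing.
        "bindings": [
            ("e.retry.in", f"q.retry.{name}", name),
            ("e.retry.out", f"q.prod.{name}", name),
        ],
    }
```

Each "arguments" dictionary could be passed as the arguments parameter of pika's channel.queue_declare when the queue is created.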

Configure TTL

To simplify configuration, the x-message-ttl argument is not set on the retry queues by messagequeue.MessageQueue, but is configured as a policy on the RabbitMQ server.

In order to use the messagequeue.MessageQueue retry feature with DLX on a different server, a policy needs to be added as follows:

name: retry queues
apply_to: queues
pattern: cki\.queue\.retry\..*
tags:
  # x-message-ttl: 10 minutes == 1000ms/s * 60s/1min * 10 minutes
  message-ttl: 600000
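The TTL arithmetic and the policy pattern can be checked with a few lines of Python. This is only a sanity-check sketch: minutes_to_ms and policy_applies are hypothetical helpers, and Python's re module stands in for the regular expression engine used by RabbitMQ, which should behave the same for a pattern this simple.

```python
import re


def minutes_to_ms(minutes):
    """Convert a retry delay in minutes to the milliseconds expected by message-ttl."""
    return int(minutes * 60 * 1000)


# Same pattern as in the policy above.
RETRY_PATTERN = re.compile(r"cki\.queue\.retry\..*")


def policy_applies(queue_name):
    """Return True if the retry policy pattern matches the given queue name."""
    # The pattern is not anchored, so an unanchored search is used here.
    return RETRY_PATTERN.search(queue_name) is not None
```

For example, minutes_to_ms(10) gives the 600000 used in the policy, and policy_applies("cki.queue.retry.webhooks") is true while policy_applies("cki.queue.webhooks") is not.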

Use case: GitLab Webhooks

                                                         Anywhere
                                                       +------------+
internal &                                          +->+ Consumer_1 |
gitlab.com         AWS Lambda           AWS EC2     |  +------------+
+--------+    +------------------+    +----------+  |  +------------+
| GitLab +--->+ webhook receiver +--->+ RabbitMQ +--+->+ Consumer_2 |
+--------+    +------------------+    +----------+  |  +------------+
                                                    .  .            .
                                                    .  .            .
                                                    |  +------------+
                                                    +->+ Consumer_n |
                                                       +------------+

GitLab provides a webhook interface for connecting external services to GitLab events. These events include push actions, merge request interactions, and many other activities that are of interest to other services, such as bots and data collectors.

As webhooks do not provide a reliable way to handle incidents such as infrastructure issues and consumer downtime, a message queue is vital to ensure that no messages are lost.

To accomplish this, a simple webhook-receiver is deployed using AWS Lambda. The core responsibility of this script is to translate the webhooks into RabbitMQ messages and publish them.

As it is a single point of failure, this script needs to be as simple and robust as possible. After a webhook is received, the JSON body is used as the message payload and a routing key is generated containing the domain name of the GitLab instance and the project’s path.
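The translation step could look roughly like the following sketch. The exact routing key format is not specified here, so the dot-joined "{hostname}.{path components}" layout is an assumption, as are the function names; the sketch also assumes the payload carries the project URL in project.web_url.

```python
import json
from urllib.parse import urlsplit


def routing_key(payload):
    """Build a routing key from the GitLab host name and the project path."""
    url = urlsplit(payload["project"]["web_url"])
    path = url.path.strip("/")
    # Assumed format: host name followed by the path components, joined by dots.
    return ".".join([url.hostname, *path.split("/")])


def to_message(raw_body):
    """Return the (routing key, payload) pair to publish for one webhook body."""
    payload = json.loads(raw_body)
    # The JSON body is forwarded unchanged as the message payload.
    return routing_key(payload), raw_body
```

Given a body whose project.web_url is https://gitlab.com/group/project, to_message returns the key gitlab.com.group.project together with the unmodified body.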

kernel-webhooks contains a set of consumers that react to GitLab webhooks for kernel-related projects. Applications subscribe to different messages using routing keys to filter the needed projects and events.
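Assuming the webhooks are published to a topic exchange, filtering by routing key follows the AMQP topic rules: words are separated by dots, "*" matches exactly one word and "#" matches zero or more words. The matcher below is a small reimplementation of those rules for illustration; the example keys use a hypothetical "{hostname}.{project path}" layout.

```python
def topic_matches(pattern, routing_key):
    """Return True if an AMQP topic binding pattern matches the routing key."""

    def match(p, k):
        if not p:
            # Pattern exhausted: match only if the key is exhausted as well.
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may consume zero or more words of the key.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            # '*' consumes exactly one word; a literal must be equal.
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))
```

With these rules, a consumer bound with gitlab.com.group.# would receive messages for every project below that group, while gitlab.com.*.project only matches projects named project that sit directly below a top-level group.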

The kernel-webhooks are not the only consumers of these messages. Other services such as the irc-bot and pipeline-herder also consume webhooks for other reasons.

Last modified August 2, 2021: Restructure hacking part of documentation (7c944b5)