
RabbitMQ clustering

Arseni Mourzenko
Founder and lead developer, specializing in developer productivity and code quality
January 19, 2019

I'm working on an application that would rely extensively on RabbitMQ for two things: transmitting messages from sensors to the end user, and transmitting orders from the end user to actuators (such as lamp switches).

Most messages in the first category don't have to be reliable; in other words, if a message is not delivered, it is not very important. For instance, if a temperature probe sends its data once per second in order to display the current temperature on the screen, as well as a chart showing how the temperature changes over time, losing one message is not a big deal and won't be noticeable to the user.

What would be noticeable, on the other hand, is a downtime of several seconds, i.e. if, for, say, ten seconds, there are no messages whatsoever from any of the probes. Such a case would have two consequences: (1) the interface would look unresponsive, and (2) it would trigger a switch indicating that something is wrong with the infrastructure.

To prevent this, I had to find a way to minimize the time between the loss of an instance in the RabbitMQ cluster and the moment when everything is operational again.

Creating a test app

The test application is a Flask app which sends a message to the message queue every second. Another Python application listens to a queue and, when a message is received, informs the Flask app about it. The Flask app then correlates this with the message it sent and records the reception time. This way, one knows how long it takes for a message to reach its destination.
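
For reference, here is a minimal sketch of what the publishing side could look like; the queue name test-queue-1 and the JSON payload are assumptions for illustration, not the actual test code:

import json
import time

import pika

# Publish one message per second, tagged with a counter and the send
# time, so the receiving side can be correlated with this one.
parameters = pika.ConnectionParameters(host="rabbit1")
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue="test-queue-1", auto_delete=True)

counter = 0
while True:
    counter += 1
    body = json.dumps({"counter": counter, "sent": time.time()})
    channel.basic_publish(exchange="", routing_key="test-queue-1", body=body)
    time.sleep(1)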

Usually, the log would look like this:

Counter |     Sent     |   Received   | Delay (ms)
      1 | 15:13:34.273 | 15:13:34.375 |        102
      2 | 15:13:35.273 | 15:13:35.372 |         99
      3 | 15:13:36.273 | 15:13:36.370 |         97
      4 | 15:13:37.273 | 15:13:37.371 |         98
      5 | 15:13:38.274 | 15:13:38.374 |        100
      6 | 15:13:39.273 | 15:13:39.371 |         98
      7 | 15:13:40.273 | 15:13:40.372 |         99
      8 | 15:13:41.273 | 15:13:41.377 |        104

If I terminate the script that listens to the queue once the fifth message is transmitted, the log looks like this:

Counter |     Sent     |   Received   | Delay (ms)
      1 | 15:14:01.685 | 15:14:01.794 |        109
      2 | 15:14:02.685 | 15:14:02.784 |         99
      3 | 15:14:03.685 | 15:14:03.787 |        102
      4 | 15:14:04.685 | 15:14:04.780 |         95
      5 | 15:14:05.686 | 15:14:05.782 |         96
      6 | 15:14:06.685 |              |        ---
      7 | 15:14:07.685 |              |        ---
      8 | 15:14:08.685 |              |        ---

To test a cluster, I have two virtual machines, each hosting one node of the RabbitMQ cluster. Let's call them rabbit1 and rabbit2.

The Flask application sends messages to one of those two nodes, chosen randomly. If the node doesn't respond within a second, the application tries the other one. This means that if both nodes are reachable, the delay for every message will be around 100 ms. If one node is down, the delay will be 100 ms half of the time, and 1,100 ms the other half.
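
A minimal sketch of this failover logic could look like this; the node list and the helper name connect_to_any_node are assumptions for illustration:

import random

import pika

NODES = ["rabbit1", "rabbit2"]

def connect_to_any_node():
    # Pick a node at random; if it doesn't answer within a second, fall
    # back to the other one.
    first = random.choice(NODES)
    for host in [first] + [n for n in NODES if n != first]:
        try:
            parameters = pika.ConnectionParameters(
                host=host,
                socket_timeout=1,
                connection_attempts=1)
            return pika.BlockingConnection(parameters)
        except pika.exceptions.AMQPConnectionError:
            continue
    raise RuntimeError("No RabbitMQ node is reachable.")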

The consumer is connected to rabbit2. If the consumer is notified that the connection was dropped, it reconnects automatically to rabbit1.
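
The consumer side could be sketched as follows; handle_message is a placeholder, and the callback signature assumes the Pika 1.x API:

import pika

def handle_message(channel, method, properties, body):
    # Placeholder: report the reception back to the Flask app.
    print(body)

def consume_with_failover():
    # Start on rabbit2; when the connection drops, fall back to rabbit1.
    for host in ["rabbit2", "rabbit1"]:
        try:
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(host=host, heartbeat=1))
            channel = connection.channel()
            channel.basic_consume(queue="test-queue-1",
                                  on_message_callback=handle_message,
                                  auto_ack=True)
            channel.start_consuming()
        except pika.exceptions.AMQPConnectionError:
            continue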

During the test, I pause the machine hosting rabbit2 and see what happens. As explained in the introduction, I accept a potential loss of message 6 and maybe message 7, and I expect message 8 and the following ones to have a delay of 100 ms or 1,100 ms.

At the end of every test, the virtual machine hosting rabbit2 is brought back, and I run the following in order for the cluster to stop showing the “Network partition detected” error:

rabbitmqctl stop_app; rabbitmqctl reset; rabbitmqctl start_app

Hacking the heartbeat

Logically, I started by adjusting the connection parameters according to Pika's documentation:

parameters = pika.ConnectionParameters(
    [...]
    heartbeat=1,            # Negotiate a one-second heartbeat timeout.
    socket_timeout=1,       # One-second socket timeout.
    connection_attempts=1,  # Try to connect only once.
    retry_delay=0)          # Don't wait before retrying.

To my surprise, this didn't have the effect I was expecting. My expectation was that the heartbeat value would be honored as-is. It appeared that this is not exactly the case:

Counter |     Sent     |   Received   | Delay (ms)
      1 | 15:36:40.690 | 15:36:40.795 |        105
      2 | 15:36:41.690 | 15:36:41.798 |        108
      3 | 15:36:42.690 | 15:36:42.787 |         97
      4 | 15:36:43.689 | 15:36:43.775 |         86
      5 | 15:36:44.689 | 15:36:44.782 |         93
      6 | 15:36:45.690 |              |        ---
      7 | 15:36:46.690 |              |        ---
      8 | 15:36:47.690 |              |        ---
      9 | 15:36:48.690 |              |        ---
     10 | 15:36:49.690 |              |        ---
     11 | 15:36:50.690 |              |        ---
     12 | 15:36:51.690 |              |        ---
     13 | 15:36:52.690 | 15:36:53.830 |       1140
     14 | 15:36:53.690 | 15:36:53.813 |        123
     15 | 15:36:54.690 | 15:36:54.786 |         96
     16 | 15:36:55.690 | 15:36:56.795 |       1105
     17 | 15:36:56.690 | 15:36:57.813 |       1123
     18 | 15:36:57.690 | 15:36:57.790 |        100
     19 | 15:36:58.690 | 15:36:58.783 |         93
     20 | 15:36:59.690 | 15:36:59.789 |         99

In Pika, the heartbeat mechanism follows specific rules. Although the value does adjust the heartbeat, it doesn't mean that Pika will raise an exception one second after the RabbitMQ node goes down. Instead, the mechanism relies on a timer which periodically checks that the connection is still up. Since the timer interval is at least six seconds, there is no way for the consumer to be notified of a connection loss in less than two seconds.

Since I didn't want to write my own heartbeat checker, I preferred to change the check interval of the default one with an ugly hack:

connection = pika.BlockingConnection(parameters)
# Ugly hack: force the heartbeat checker to run every second. This
# relies on a private attribute, so it may break in other Pika versions.
connection._impl.heartbeat._check_interval = 1

This did the job:

Counter |     Sent     |   Received   | Delay (ms)
      1 | 14:32:22.221 | 14:32:22.326 |        105
      2 | 14:32:23.221 | 14:32:23.319 |         98
      3 | 14:32:24.221 | 14:32:24.303 |         82
      4 | 14:32:25.222 | 14:32:25.319 |         97
      5 | 14:32:26.221 | 14:32:26.312 |         91
      6 | 14:32:27.221 |              |        ---
      7 | 14:32:28.221 | 14:32:29.330 |       1109
      8 | 14:32:29.221 | 14:32:29.309 |         88
      9 | 14:32:30.221 | 14:32:30.311 |         90
     10 | 14:32:31.221 | 14:32:31.313 |         92
     11 | 14:32:32.221 | 14:32:32.319 |         98
     12 | 14:32:33.221 | 14:32:34.340 |       1119
     13 | 14:32:34.221 | 14:32:34.313 |         92

Now, I was losing messages for one or two seconds only, which was perfectly acceptable.

Testing durable queues

When I discussed the subject with Luke Bakken, one of the top contributors to Pika, he advised me to use durable queues, so I ran a few tests with durable queues as well.

I started by adding a policy to enable high availability for the queue:

rabbitmqctl set_policy ha-all "^test-queue-" '{"ha-mode":"all"}' -p sandbox

Then, in channel.queue_declare(), instead of auto_delete=True, I used durable=True.
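
A minimal sketch of the change, with the same assumed queue name as above (for the message itself to survive a node failure, it should also be published as persistent, hence delivery_mode=2):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="rabbit1"))
channel = connection.channel()

# Durable queue: survives a broker restart.
channel.queue_declare(queue="test-queue-1", durable=True)

# Persistent message: written to disk rather than kept only in memory.
channel.basic_publish(
    exchange="",
    routing_key="test-queue-1",
    body=b"payload",
    properties=pika.BasicProperties(delivery_mode=2))

Here's the result: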

Counter |     Sent     |   Received   | Delay (ms)
      1 | 14:06:54.492 | 14:06:54.604 |        112
      2 | 14:06:55.493 | 14:06:55.596 |        103
      3 | 14:06:56.492 | 14:06:56.590 |         98
      4 | 14:06:57.492 | 14:06:57.587 |         95
      5 | 14:06:58.492 | 14:06:58.595 |        103
      6 | 14:06:59.492 | 14:07:00.634 |       1142
      7 | 14:07:00.492 | 14:07:00.623 |        131
      8 | 14:07:01.492 | 14:07:02.620 |       1128
      9 | 14:07:02.492 | 14:07:02.591 |         99
     10 | 14:07:03.492 | 14:07:03.586 |         94

The sixth message took a bit longer to be delivered, but wasn't lost. This was the expected behavior, and it works. However, when I removed the _check_interval hack, the results were disappointing:

Counter |     Sent     |   Received   | Delay (ms)
      1 | 13:49:03.669 | 13:49:03.776 |        107
      2 | 13:49:04.668 | 13:49:04.767 |         99
      3 | 13:49:05.669 | 13:49:05.770 |        101
      4 | 13:49:06.668 | 13:49:06.768 |        100
      5 | 13:49:07.669 | 13:49:07.765 |         96
      6 | 13:49:08.668 | 13:49:18.366 |       9698
      7 | 13:49:09.668 | 13:49:18.394 |       8726
      8 | 13:49:10.668 | 13:49:18.376 |       7708
      9 | 13:49:11.669 | 13:49:18.397 |       6728
     10 | 13:49:12.669 | 13:49:18.420 |       5751
     11 | 13:49:13.668 | 13:49:18.408 |       4740
     12 | 13:49:14.669 | 13:49:18.431 |       3762
     13 | 13:49:15.669 | 13:49:18.441 |       2772
     14 | 13:49:16.669 | 13:49:18.468 |       1799
     15 | 13:49:17.668 | 13:49:18.449 |        781
     16 | 13:49:18.669 | 13:49:19.781 |       1112
     17 | 13:49:19.668 | 13:49:19.772 |        104

Nine messages were queued up and delivered all at once (the actual number varied between tests, from six to ten, depending on the moment when the timer triggered). Therefore, the hack is required for both volatile and durable queues.