RabbitMQ clustering

Arseni Mourzenko
Founder and lead developer
January 20, 2019
Tags: rabbitmq, mqs, clustering, high-availability

I'm working on an application which would rely extensively on RabbitMQ for two things: transmitting messages from sensors to the end user, and transmitting orders from the end user to the actuators (such as lamp switches).

For the first category of messages, most don't have to be reliable. In other words, if a message is not delivered, it is not very important. For instance, if a temperature probe sends its data once per second in order to display the current temperature on the screen, as well as a chart showing how the temperature changes over time, losing one message won't be a big deal, and won't be noticeable by the user.

What would be noticeable, on the other hand, is a downtime of several seconds, i.e. if for, say, ten seconds, there are no messages whatsoever from any of the probes. Such a case would have two consequences: (1) the interface would look unresponsive, and (2) it would trigger a switch which indicates that something is wrong with the infrastructure.

To prevent this, I had to find a way to minimize the period of time between the loss of an instance in a RabbitMQ cluster and the moment when everything is operational again.

Creating a test app

The test application is a Flask app which sends a message to the MQS every second. Another Python application listens to a queue and, when a message is received, informs the Flask app about it. The Flask app then correlates this with the message it sent, and records the reception time. This way, one knows how long it takes for a message to reach its destination.

Usually, the log would look like this:

Counter |     Sent     |   Received   | Delay (ms)
      1 | 15:13:34.273 | 15:13:34.375 |        102
      2 | 15:13:35.273 | 15:13:35.372 |         99
      3 | 15:13:36.273 | 15:13:36.370 |         97
      4 | 15:13:37.273 | 15:13:37.371 |         98
      5 | 15:13:38.274 | 15:13:38.374 |        100
      6 | 15:13:39.273 | 15:13:39.371 |         98
      7 | 15:13:40.273 | 15:13:40.372 |         99
      8 | 15:13:41.273 | 15:13:41.377 |        104

If I terminate the script which listens to the queue once the fifth message is transmitted, it looks like this:

Counter |     Sent     |   Received   | Delay (ms)
      1 | 15:14:01.685 | 15:14:01.794 |        109
      2 | 15:14:02.685 | 15:14:02.784 |         99
      3 | 15:14:03.685 | 15:14:03.787 |        102
      4 | 15:14:04.685 | 15:14:04.780 |         95
      5 | 15:14:05.686 | 15:14:05.782 |         96
      6 | 15:14:06.685 |              |        ---
      7 | 15:14:07.685 |              |        ---
      8 | 15:14:08.685 |              |        ---
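
The bookkeeping behind these logs can be sketched roughly as follows. This is a minimal illustration, not the actual test application: the `DelayLog` class and its method names are hypothetical, and it assumes that every message carries a counter which the consumer reports back.

```python
from datetime import datetime


class DelayLog:
    """Correlates sent and received messages by counter and formats the log."""

    def __init__(self):
        self.sent = {}      # counter -> time the message was sent
        self.received = {}  # counter -> time the consumer reported it

    def record_sent(self, counter, when):
        self.sent[counter] = when

    def record_received(self, counter, when):
        # Ignore reports for messages this process never sent.
        if counter in self.sent:
            self.received[counter] = when

    def delay_ms(self, counter):
        """Returns the delay in milliseconds, or None if not received yet."""
        if counter not in self.received:
            return None
        delta = self.received[counter] - self.sent[counter]
        return round(delta.total_seconds() * 1000)

    def row(self, counter):
        """Formats one line of the log, with '---' for a missing message."""
        def fmt(t):
            return t.strftime("%H:%M:%S.%f")[:-3]  # keep milliseconds only

        delay = self.delay_ms(counter)
        received = fmt(self.received[counter]) if delay is not None else " " * 12
        shown = "---" if delay is None else str(delay)
        return f"{counter:>7} | {fmt(self.sent[counter])} | {received} | {shown:>10}"
```

The sender would call `record_sent(counter, datetime.now())` right before publishing, and the endpoint notified by the consumer would call `record_received` with the same counter.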

To test a cluster, I have two virtual machines, each hosting one node of the RabbitMQ cluster. Let's call them rabbit1 and rabbit2.

The Flask application sends each message to one of those two nodes, chosen randomly. If the node doesn't respond within a second, it tries the other one. This means that if both nodes are reachable, the delay for every message will be around 100 ms. If one node is down, the delay will be around 100 ms half of the time, and around 1,100 ms the other half.
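
The sender's failover logic can be sketched like this. It is only an illustration under stated assumptions: `publish_with_failover` is a hypothetical helper, and `publish(node, body, timeout)` stands for whatever function actually talks to RabbitMQ and is assumed to raise `TimeoutError` when the node does not answer in time.

```python
import random

NODES = ["rabbit1", "rabbit2"]


def publish_with_failover(publish, body, nodes=NODES, timeout=1.0, rng=random):
    """Tries the nodes in random order and returns the one that accepted body.

    `publish(node, body, timeout)` is a hypothetical callable that raises
    TimeoutError when the node does not respond within `timeout` seconds.
    """
    candidates = list(nodes)
    rng.shuffle(candidates)
    for node in candidates:
        try:
            publish(node, body, timeout)
            return node
        except TimeoutError:
            continue  # this node is unreachable, try the other one
    raise ConnectionError("no RabbitMQ node accepted the message")
```

When the node picked first is down, the one-second timeout is spent before the second attempt, which is where the extra 1,000 ms in the delay comes from.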

The consumer is connected to rabbit2. If the consumer is notified that the connection was dropped, it reconnects automatically to rabbit1.
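
A consumer with this behavior could look roughly like the sketch below. It is not the actual test script: the function names are hypothetical, the `basic_consume` call assumes the Pika 1.x signature, and the loop simply alternates between the two hosts whenever the connection is lost.

```python
import itertools


def host_cycle(preferred, hosts):
    """Returns an endless iterator over hosts, starting with the preferred one."""
    start = hosts.index(preferred)
    ordered = hosts[start:] + hosts[:start]
    return itertools.cycle(ordered)


def consume_with_failover(queue, on_message, hosts=("rabbit1", "rabbit2"),
                          preferred="rabbit2"):
    """Consumes from the preferred host, switching hosts on connection loss."""
    # Imported here so that host_cycle stays usable without Pika installed.
    import pika
    import pika.exceptions

    for host in host_cycle(preferred, list(hosts)):
        try:
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(host=host, heartbeat=1))
            channel = connection.channel()
            channel.queue_declare(queue=queue, auto_delete=True)
            # Assumes Pika 1.x; older versions take the callback first.
            channel.basic_consume(queue=queue, on_message_callback=on_message,
                                  auto_ack=True)
            channel.start_consuming()  # blocks until the connection drops
        except pika.exceptions.AMQPConnectionError:
            continue  # connection lost or refused: move on to the next host
```

Here `on_message` is a callback taking `(channel, method, properties, body)`; in the test setup it would notify the Flask app.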

During the test, I pause the machine hosting rabbit2 and see what happens. As explained in the introduction, I accept a potential loss of message 6 and maybe message 7, and I expect message 8 and the following ones to have a delay of 100 ms or 1,100 ms.

At the end of every test, the virtual machine hosting rabbit2 is brought back, and I run rabbitmqctl stop_app; rabbitmqctl reset; rabbitmqctl start_app in order for the cluster to stop showing the “Network partition detected” error.

Hacking the heartbeat

Naturally, I started by adjusting the connection parameters according to the documentation of Pika:

parameters = pika.ConnectionParameters(
    [...]
    heartbeat=1,
    socket_timeout=1,
    connection_attempts=1,
    retry_delay=0)

To my surprise, this didn't have the effect I was expecting. I expected heartbeat to be honored as an absolute value. It appeared that this is not exactly the case:

Counter |     Sent     |   Received   | Delay (ms)
      1 | 15:36:40.690 | 15:36:40.795 |        105
      2 | 15:36:41.690 | 15:36:41.798 |        108
      3 | 15:36:42.690 | 15:36:42.787 |         97
      4 | 15:36:43.689 | 15:36:43.775 |         86
      5 | 15:36:44.689 | 15:36:44.782 |         93
      6 | 15:36:45.690 |              |        ---
      7 | 15:36:46.690 |              |        ---
      8 | 15:36:47.690 |              |        ---
      9 | 15:36:48.690 |              |        ---
     10 | 15:36:49.690 |              |        ---
     11 | 15:36:50.690 |              |        ---
     12 | 15:36:51.690 |              |        ---
     13 | 15:36:52.690 | 15:36:53.830 |       1140
     14 | 15:36:53.690 | 15:36:53.813 |        123
     15 | 15:36:54.690 | 15:36:54.786 |         96
     16 | 15:36:55.690 | 15:36:56.795 |       1105
     17 | 15:36:56.690 | 15:36:57.813 |       1123
     18 | 15:36:57.690 | 15:36:57.790 |        100
     19 | 15:36:58.690 | 15:36:58.783 |         93
     20 | 15:36:59.690 | 15:36:59.789 |         99

In Pika, the heartbeat mechanism follows specific rules. Although the value does adjust the heartbeat, it doesn't mean that Pika would raise an exception one second after the RabbitMQ node goes down. Instead, it's based on a timer which checks periodically that the connection is still up. Since the timer duration is at least six seconds, there is no way for the consumer to be notified of the connection loss in less than six seconds.
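
To see why a periodic timer puts a floor on the detection time, here is a deliberately simplified model. It is not Pika's actual implementation: it assumes that checks run at fixed intervals, that a check reports a dead connection only when no data arrived since the previous check, and that data flows continuously until the node fails. The function name is hypothetical.

```python
import math


def detection_delay(failure_time, check_interval):
    """Seconds between a node failure and the check that notices it.

    Simplified model: checks run at check_interval, 2 * check_interval, ...
    and a check reports a dead connection only if no data arrived since the
    previous check. Data is assumed to flow continuously until failure_time.
    """
    # The check right after the failure still sees data from before it,
    # so the failure is only reported one full interval later.
    first_silent_check = math.ceil(failure_time / check_interval) + 1
    return first_silent_check * check_interval - failure_time
```

In this model the delay is never shorter than one check interval. Taking an arbitrary failure 4.8 seconds after the timer started, a six-second interval gives a delay of about 7.2 seconds, while a one-second interval gives about 1.2 seconds.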

Since I didn't want to write my own heartbeat checker, I preferred to change the interval of the default one with an ugly hack:

connection = pika.BlockingConnection(parameters)
connection._impl.heartbeat._check_interval = 1  # Ugly hack.

This did the job:

Counter |     Sent     |   Received   | Delay (ms)
      1 | 14:32:22.221 | 14:32:22.326 |        105
      2 | 14:32:23.221 | 14:32:23.319 |         98
      3 | 14:32:24.221 | 14:32:24.303 |         82
      4 | 14:32:25.222 | 14:32:25.319 |         97
      5 | 14:32:26.221 | 14:32:26.312 |         91
      6 | 14:32:27.221 |              |        ---
      7 | 14:32:28.221 | 14:32:29.330 |       1109
      8 | 14:32:29.221 | 14:32:29.309 |         88
      9 | 14:32:30.221 | 14:32:30.311 |         90
     10 | 14:32:31.221 | 14:32:31.313 |         92
     11 | 14:32:32.221 | 14:32:32.319 |         98
     12 | 14:32:33.221 | 14:32:34.340 |       1119
     13 | 14:32:34.221 | 14:32:34.313 |         92

Now, I was losing messages for one or two seconds only, which was perfectly acceptable.
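
Because the hack relies on private attributes, it may silently stop working with another version of Pika. A slightly more defensive variant, given here as a sketch, checks that the attributes exist before touching them. The helper name is hypothetical; the attribute names are the ones used in the hack above.

```python
def set_heartbeat_check_interval(connection, seconds):
    """Overrides the private heartbeat check interval if it is reachable.

    Returns True when the attribute was found and changed, False otherwise.
    """
    impl = getattr(connection, "_impl", None)
    checker = getattr(impl, "heartbeat", None)
    if checker is None or not hasattr(checker, "_check_interval"):
        return False  # different Pika version or heartbeats disabled
    checker._check_interval = seconds  # same private attribute as the hack
    return True
```

The caller can then log a warning when the function returns False instead of discovering the regression through lost messages.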

Testing durable queues

When I discussed the subject with Luke Bakken, one of the top contributors to Pika, he advised me to use durable queues, so I did a few tests with durable queues as well.

I started by adding a policy to enable high availability for the queue:

rabbitmqctl set_policy ha-all "^test-queue-" '{"ha-mode":"all"}' -p sandbox
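
The policy only applies to queues whose names match the pattern, so it is worth checking that the queue name used by the test actually matches. The sketch below uses Python's `re` module as an approximation of the broker's own matching; the helper name and the sample queue names are hypothetical.

```python
import re

HA_POLICY_PATTERN = r"^test-queue-"  # pattern from the set_policy command


def policy_applies(queue_name, pattern=HA_POLICY_PATTERN):
    """Returns True if a queue with this name would be matched by the policy."""
    return re.search(pattern, queue_name) is not None
```

For instance, a queue named `test-queue-1` matches, while `my-test-queue-1` does not because of the `^` anchor.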

Then, in channel.queue_declare(), instead of auto_delete=True, I used durable=True. Here's the result:

Counter |     Sent     |   Received   | Delay (ms)
      1 | 14:06:54.492 | 14:06:54.604 |        112
      2 | 14:06:55.493 | 14:06:55.596 |        103
      3 | 14:06:56.492 | 14:06:56.590 |         98
      4 | 14:06:57.492 | 14:06:57.587 |         95
      5 | 14:06:58.492 | 14:06:58.595 |        103
      6 | 14:06:59.492 | 14:07:00.634 |       1142
      7 | 14:07:00.492 | 14:07:00.623 |        131
      8 | 14:07:01.492 | 14:07:02.620 |       1128
      9 | 14:07:02.492 | 14:07:02.591 |         99
     10 | 14:07:03.492 | 14:07:03.586 |         94

The sixth message took a bit more time to be delivered, but wasn't lost. This was the expected behavior, and it works. When I tried to remove the _check_interval hack, the results were disappointing:

Counter |     Sent     |   Received   | Delay (ms)
      1 | 13:49:03.669 | 13:49:03.776 |        107
      2 | 13:49:04.668 | 13:49:04.767 |         99
      3 | 13:49:05.669 | 13:49:05.770 |        101
      4 | 13:49:06.668 | 13:49:06.768 |        100
      5 | 13:49:07.669 | 13:49:07.765 |         96
      6 | 13:49:08.668 | 13:49:18.366 |       9698
      7 | 13:49:09.668 | 13:49:18.394 |       8726
      8 | 13:49:10.668 | 13:49:18.376 |       7708
      9 | 13:49:11.669 | 13:49:18.397 |       6728
     10 | 13:49:12.669 | 13:49:18.420 |       5751
     11 | 13:49:13.668 | 13:49:18.408 |       4740
     12 | 13:49:14.669 | 13:49:18.431 |       3762
     13 | 13:49:15.669 | 13:49:18.441 |       2772
     14 | 13:49:16.669 | 13:49:18.468 |       1799
     15 | 13:49:17.668 | 13:49:18.449 |        781
     16 | 13:49:18.669 | 13:49:19.781 |       1112
     17 | 13:49:19.668 | 13:49:19.772 |        104

Nine messages (in other runs it was six, eight, or ten, depending on the moment when the timer triggered) were queued up and delivered all at once. Therefore, the hack is required for both volatile and durable queues.
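
For reference, the two queue declarations compared in this section differ in a single argument. A minimal sketch, where `declare_test_queue` is a hypothetical helper and `channel` is assumed to be an open Pika channel:

```python
def declare_test_queue(channel, name, durable):
    """Declares the queue either as durable or as volatile (auto-delete)."""
    if durable:
        # Variant used in the durable queue tests.
        return channel.queue_declare(queue=name, durable=True)
    # Variant used in the earlier tests.
    return channel.queue_declare(queue=name, auto_delete=True)
```

In both cases the heartbeat check interval still has to be shortened for the consumer to notice the failure quickly.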