RabbitMQ clustering
I'm working on an application which would rely extensively on RabbitMQ for two things: transmitting messages from sensors to the end user, and transmitting the orders from the end user to the actuators (such as lamp switches).
Most messages in the first category don't have to be delivered reliably. In other words, if a message is not delivered, it is not very important. For instance, if a temperature probe sends its data once per second in order to display the current temperature on the screen, as well as a chart showing how the temperature changes over time, losing one message won't be a big deal, and won't be noticeable by the user.
What would be noticeable, on the other hand, is a downtime of several seconds, i.e. if for, say, ten seconds, there are no messages whatsoever from any of the probes. Such a case would have two consequences: (1) the interface would look unresponsive, and (2) it would trigger a switch indicating that something is wrong with the infrastructure.
To prevent this, I had to find a way to minimize the period of time between the loss of a node in the RabbitMQ cluster and the moment when everything is operational again.
Creating a test app
The test application is a Flask app which sends a message to the MQS every second. Another Python application listens to a queue and, when a message is received, informs the Flask app about it. The Flask app then correlates this with the message it sent, and records the reception time. This way, one knows how long it takes for a message to reach its destination.
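The correlation logic on the Flask side can be sketched as follows. The MessageLog class and its method names are my own illustration, not the actual test application's code:

```python
from datetime import datetime

class MessageLog:
    """Correlates sent messages with reception reports by counter value."""

    def __init__(self):
        # counter -> {"sent": datetime, "received": datetime or None}
        self.entries = {}

    def record_sent(self, counter):
        self.entries[counter] = {"sent": datetime.now(), "received": None}

    def record_received(self, counter):
        # Called when the listener reports the message back to the Flask app.
        if counter in self.entries:
            self.entries[counter]["received"] = datetime.now()

    def delay_ms(self, counter):
        entry = self.entries.get(counter)
        if entry is None or entry["received"] is None:
            return None  # lost message, rendered as "---" in the log
        return int((entry["received"] - entry["sent"]).total_seconds() * 1000)
```

A message whose reception was never reported simply keeps a `None` reception time, which is what produces the `---` entries in the tables below.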
Usually, the log would look like this:
Counter | Sent | Received | Delay (ms)
1 | 15:13:34.273 | 15:13:34.375 | 102
2 | 15:13:35.273 | 15:13:35.372 | 99
3 | 15:13:36.273 | 15:13:36.370 | 97
4 | 15:13:37.273 | 15:13:37.371 | 98
5 | 15:13:38.274 | 15:13:38.374 | 100
6 | 15:13:39.273 | 15:13:39.371 | 98
7 | 15:13:40.273 | 15:13:40.372 | 99
8 | 15:13:41.273 | 15:13:41.377 | 104
If I terminate the script listening to the queue after the fifth message is transmitted, the log looks like this:
Counter | Sent | Received | Delay (ms)
1 | 15:14:01.685 | 15:14:01.794 | 109
2 | 15:14:02.685 | 15:14:02.784 | 99
3 | 15:14:03.685 | 15:14:03.787 | 102
4 | 15:14:04.685 | 15:14:04.780 | 95
5 | 15:14:05.686 | 15:14:05.782 | 96
6 | 15:14:06.685 | | ---
7 | 15:14:07.685 | | ---
8 | 15:14:08.685 | | ---
To test a cluster, I have two virtual machines, each hosting one node of the RabbitMQ cluster. Let's call them rabbit1 and rabbit2.
The Flask application sends messages to either of those two nodes, chosen randomly. If the node doesn't respond within a second, the application tries the other one. This means that if both nodes are reachable, the delay for every message will be around 100 ms. If one node is down, the delay will be 100 ms half of the time, and 1,100 ms the other half.
The consumer is connected to rabbit2. If the consumer is notified that the connection was dropped, it reconnects automatically to rabbit1.
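The publisher's node-selection logic can be sketched as below. The `connect` argument is my own injection point, not part of Pika; with Pika it would be something like `lambda host: pika.BlockingConnection(pika.ConnectionParameters(host=host, socket_timeout=1))`, so that an unreachable node makes the call raise after roughly one second:

```python
import random

def connect_with_failover(nodes, connect):
    """Pick a node at random; if connecting to it fails, try the others.

    `connect` is a callable taking a host name and returning a connection;
    it is expected to raise on timeout or refusal (hypothetical injection
    point, so the failover logic can be shown without a live broker).
    """
    candidates = list(nodes)
    random.shuffle(candidates)  # spread the load between rabbit1 and rabbit2
    last_error = None
    for host in candidates:
        try:
            return connect(host)
        except Exception as exc:
            last_error = exc  # node unreachable: fall back to the next one
    raise last_error  # every node failed
```

With one node down, half of the calls hit the dead node first and pay the one-second timeout before falling back, which is exactly the 100 ms / 1,100 ms split described above.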
During the test, I pause the machine hosting rabbit2 and see what happens. As explained in the introduction, I accept a potential loss of message 6 and maybe message 7, and I expect message 8 and the following ones to have a delay of 100 ms or 1,100 ms.
At the end of every test, the virtual machine hosting rabbit2 is brought back, and I run rabbitmqctl stop_app; rabbitmqctl reset; rabbitmqctl start_app so that the cluster stops showing the “Network partition detected” error.
Hacking the heartbeat
Logically enough, I started by adjusting the connection parameters according to Pika's documentation:
parameters = pika.ConnectionParameters(
    [...]
    heartbeat=1,
    socket_timeout=1,
    connection_attempts=1,
    retry_delay=0)
To my surprise, this didn't have the effect I was expecting. My expectation was that the heartbeat value would be honored as an absolute timeout. It appears that this is not exactly the case:
Counter | Sent | Received | Delay (ms)
1 | 15:36:40.690 | 15:36:40.795 | 105
2 | 15:36:41.690 | 15:36:41.798 | 108
3 | 15:36:42.690 | 15:36:42.787 | 97
4 | 15:36:43.689 | 15:36:43.775 | 86
5 | 15:36:44.689 | 15:36:44.782 | 93
6 | 15:36:45.690 | | ---
7 | 15:36:46.690 | | ---
8 | 15:36:47.690 | | ---
9 | 15:36:48.690 | | ---
10 | 15:36:49.690 | | ---
11 | 15:36:50.690 | | ---
12 | 15:36:51.690 | | ---
13 | 15:36:52.690 | 15:36:53.830 | 1140
14 | 15:36:53.690 | 15:36:53.813 | 123
15 | 15:36:54.690 | 15:36:54.786 | 96
16 | 15:36:55.690 | 15:36:56.795 | 1105
17 | 15:36:56.690 | 15:36:57.813 | 1123
18 | 15:36:57.690 | 15:36:57.790 | 100
19 | 15:36:58.690 | 15:36:58.783 | 93
20 | 15:36:59.690 | 15:36:59.789 | 99
In Pika, the heartbeat mechanism follows its own rules. Although the value does adjust the heartbeat interval, that doesn't mean Pika will raise an exception one second after the RabbitMQ node goes down. Instead, detection relies on a timer which periodically checks that the connection is still up. Since the timer duration is at least six seconds, there is no way for the consumer to be notified of a connection loss in less than two seconds.
Since I didn't want to write my own heartbeat checker, I preferred to change the check interval of the default one with an ugly hack:
connection = pika.BlockingConnection(parameters)
connection._impl.heartbeat._check_interval = 1 # Ugly hack.
This did the job:
Counter | Sent | Received | Delay (ms)
1 | 14:32:22.221 | 14:32:22.326 | 105
2 | 14:32:23.221 | 14:32:23.319 | 98
3 | 14:32:24.221 | 14:32:24.303 | 82
4 | 14:32:25.222 | 14:32:25.319 | 97
5 | 14:32:26.221 | 14:32:26.312 | 91
6 | 14:32:27.221 | | ---
7 | 14:32:28.221 | 14:32:29.330 | 1109
8 | 14:32:29.221 | 14:32:29.309 | 88
9 | 14:32:30.221 | 14:32:30.311 | 90
10 | 14:32:31.221 | 14:32:31.313 | 92
11 | 14:32:32.221 | 14:32:32.319 | 98
12 | 14:32:33.221 | 14:32:34.340 | 1119
13 | 14:32:34.221 | 14:32:34.313 | 92
Now, I was losing messages for one or two seconds only, which was perfectly acceptable.
Testing durable queues
When I discussed the subject with Luke Bakken, one of the top contributors to Pika, he advised me to use durable queues, so I ran a few tests with durable queues as well.
I started by adding a policy to enable high availability for the queue:
rabbitmqctl set_policy ha-all "^test-queue-" '{"ha-mode":"all"}' -p sandbox
Then, in channel.queue_declare(), I used durable=True instead of auto_delete=True. Here's the result:
Counter | Sent | Received | Delay (ms)
1 | 14:06:54.492 | 14:06:54.604 | 112
2 | 14:06:55.493 | 14:06:55.596 | 103
3 | 14:06:56.492 | 14:06:56.590 | 98
4 | 14:06:57.492 | 14:06:57.587 | 95
5 | 14:06:58.492 | 14:06:58.595 | 103
6 | 14:06:59.492 | 14:07:00.634 | 1142
7 | 14:07:00.492 | 14:07:00.623 | 131
8 | 14:07:01.492 | 14:07:02.620 | 1128
9 | 14:07:02.492 | 14:07:02.591 | 99
10 | 14:07:03.492 | 14:07:03.586 | 94
The sixth message took a bit longer to be delivered, but wasn't lost. This was the expected behavior, and it works. However, when I removed the _check_interval hack, the results were disappointing:
Counter | Sent | Received | Delay (ms)
1 | 13:49:03.669 | 13:49:03.776 | 107
2 | 13:49:04.668 | 13:49:04.767 | 99
3 | 13:49:05.669 | 13:49:05.770 | 101
4 | 13:49:06.668 | 13:49:06.768 | 100
5 | 13:49:07.669 | 13:49:07.765 | 96
6 | 13:49:08.668 | 13:49:18.366 | 9698
7 | 13:49:09.668 | 13:49:18.394 | 8726
8 | 13:49:10.668 | 13:49:18.376 | 7708
9 | 13:49:11.669 | 13:49:18.397 | 6728
10 | 13:49:12.669 | 13:49:18.420 | 5751
11 | 13:49:13.668 | 13:49:18.408 | 4740
12 | 13:49:14.669 | 13:49:18.431 | 3762
13 | 13:49:15.669 | 13:49:18.441 | 2772
14 | 13:49:16.669 | 13:49:18.468 | 1799
15 | 13:49:17.668 | 13:49:18.449 | 781
16 | 13:49:18.669 | 13:49:19.781 | 1112
17 | 13:49:19.668 | 13:49:19.772 | 104
Nine messages (the exact number varied between six and ten from one test to another, depending on when the timer triggered) were queued up and then delivered all at once. The hack is therefore required for both volatile and durable queues.