What about round-robin queues with RabbitMQ?

What about round-robin queues with RabbitMQ?

ยท

4 min read

Recently, I've been diving deep into scalability, queues, and other techniques and tools of the ecosystem.

Motivation

During a System Design interview, I had the idea of implementing multiple queues with RabbitMQ. In a way, the goal was to split the workload across all the queues bound to the broker. That means, if 100 messages are sent, having 5 queues, each queue would have 20 messages: thus round-robin.

It's common knowledge that we can scale up workers easily, we have metrics and cloud services for that. But what about equally distributing the workload among queues AND workers?

โ“
Is that even possible with RabbitMQ?

Take a look at the docs:

RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin.

This way, a consumer will receive the message (A), but the next consumer, will receive the next message (B). This way, the workload is distributed across consumers. Close to what I want, but instead, I want to work with multiple queues.

But what is round robin?

Round-robin is a simple load-balancing algorithm used in various computing and networking contexts. In the context of distributing messages or tasks to multiple consumers or resources, round-robin ensures an equal and sequential distribution of items to each available recipient in a cyclic order.

Implementation

For the first implementation, I wanted to start simple. Implement the round-robin logic within the producer. This is how our implementation should look like:

Simulating the implementation with RabbitMQ simulator.

.

โ—
Important to mention: I'm not taking any consideration regarding the size of the message (payload) that goes into each queue.

Producer

import * as amqp from "amqplib";

const exchangeName = "your_direct_exchange";
const messageCount = 20;

async function main() {
  const connection = await amqp.connect("amqp://localhost:5672");
  const channel = await connection.createChannel();

  // Declare the direct exchange
  await channel.assertExchange(exchangeName, "direct");

  // Producer sends messages to the exchange
  for (let i = 0; i < messageCount; i++) {
    const queue = (i % 5) + 1;
    const routingKey = `queue_${queue}`;
    const message = `Message ${i} to ${routingKey}`;

    channel.publish(exchangeName, routingKey, Buffer.from(message));
    console.log(`Sent: ${message}`);
  }

  await channel.close();
  await connection.close();
}

main().catch(console.error);

The producer acts like any other server that is going to publish messages. The code is simple, it produces 20 messages in total, to one exchange, routing them to each bonded queue (queue_1 to queue_5).

This should be the output of the console.log:

Sent: Message 0 to queue_1
Sent: Message 1 to queue_2
Sent: Message 2 to queue_3
Sent: Message 3 to queue_4
Sent: Message 4 to queue_5
Sent: Message 5 to queue_1
Sent: Message 6 to queue_2
Sent: Message 7 to queue_3
Sent: Message 8 to queue_4
Sent: Message 9 to queue_5
Sent: Message 10 to queue_1
Sent: Message 11 to queue_2
Sent: Message 12 to queue_3
Sent: Message 13 to queue_4
Sent: Message 14 to queue_5
Sent: Message 15 to queue_1
Sent: Message 16 to queue_2
Sent: Message 17 to queue_3
Sent: Message 18 to queue_4
Sent: Message 19 to queue_5

That gives us, 4 messages per queue. Exactly what I want.

Consumer

import * as amqp from "amqplib";

const queueNames = ["queue_1", "queue_2", "queue_3", "queue_4", "queue_5"];
const exchangeName = "your_direct_exchange";

async function main() {
  const connection = await amqp.connect("amqp://localhost");

  for (const queueName of queueNames) {
    const channel = await connection.createChannel();

    // Declare and bind queues to the exchange with routing key
    await channel.assertQueue(queueName);
    channel.bindQueue(queueName, exchangeName, queueName);

    console.log(`Consuming from ${queueName}`);

    // Consumer for each queue
    channel.consume(queueName, (message) => {
      if (message) {
        console.log(`Received: ${message.content.toString()}`);
        // Process the message here
        channel.ack(message);
      }
    });
  }
}

main().catch(console.error);

Nothing fancy either. We're just reading the message in each queue. Not the focus here.

Limitations

Many. To be honest, I don't even know RabbitMQ that much to say how many limitations, however, one limitation is more than clear right now.

  • What if we have more than one producer?

  • Will they need to sync? Or is there any simpler approach to keep them round-robin through the queues?

  • How to keep the state of the last queue used and which queue should be used right now?

Next steps of the adventure

I'll keep digging for a way to achieve this solution. Perhaps using Redis as a shared memory between producers may do the job.

Feel free to contribute

Again, I'm not that experienced with RabbitMQ. Feel free to contribute with your ideas, experiments, articles, any anything that can help me.