Syed Jafer K

Its all about Trade-Offs

Learning Notes #16 – Prefetch Count | RabbitMQ

Today, i learnt about prefetch count and how it saves consumer from overwhelming of messages. Messages in RabbitMQ are pushed from the broker to the consumers. The RabbitMQ default prefetch setting gives clients an unlimited buffer, meaning that RabbitMQ, by default, sends as many messages as it can to any consumer that appears ready to accept them. It is, therefore, possible to have more than one message “in-flight” on a channel at any given moment.

In this blog, i have curated notes on how prefetch works ,

What is Prefetch in RabbitMQ?

Prefetch is a mechanism that defines how many messages can be delivered to a consumer at a time before the consumer sends an acknowledgment back to the broker. This ensures that the consumer does not get overwhelmed with too many unprocessed messages, which could lead to high memory usage and potential performance issues.

By default, RabbitMQ does not limit the number of unacknowledged messages a consumer can receive. While this default behavior works for certain scenarios, it can create bottlenecks in systems where consumers process messages at varying speeds.

Why is Prefetch Important?

The prefetch setting directly impacts how efficiently messages are processed in RabbitMQ. Here are some reasons why configuring prefetch is critical:

  1. Avoid Overloading Consumers: Setting an appropriate prefetch value prevents consumers from being overloaded with too many messages, which could degrade performance or even crash the consumer application.
  2. Fair Message Distribution: Prefetch helps distribute messages fairly among consumers in a competing-consumer scenario. Without it, faster consumers might hog more messages, leaving slower ones underutilized.
  3. Improved Throughput: Properly tuning prefetch can balance the load between consumers and ensure optimal throughput for the system.
  4. Memory Efficiency: Limiting the number of unacknowledged messages reduces the risk of excessive memory usage on both the consumer and the broker.

How Prefetch Works

Prefetch can be set at two levels – Channel Level , Consumer Level

Channel Level

Channel Level: Applies to all consumers on the channel. Setting the prefetch value at this level ensures that the total number of unacknowledged messages across all consumers sharing the same channel does not exceed the specified limit.

The channel prefetch value defines the maximum number of unacknowledged deliveries that are permitted on a channel. Setting a limit on this buffer caps the number of received messages before the broker waits for an acknowledgment.

Because a single channel may consume from multiple queues, coordination between them is required to ensure that they don’t pass the limit. This can be a slow process especially when consuming across a cluster, and it is not the recommended approach.

    
    import pika
    
    # Establish a connection and a channel
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    # Set channel-level prefetch
    channel.basic_qos(prefetch_count=5, global=True)
    
    # Declare and consume from multiple queues
    channel.queue_declare(queue='queue1')
    channel.queue_declare(queue='queue2')
    
    def callback(ch, method, properties, body):
        print(f"Received: {body}")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_consume(queue='queue1', on_message_callback=callback)
    channel.basic_consume(queue='queue2', on_message_callback=callback)
    
    print("Waiting for messages...")
    channel.start_consuming()
    

    Consumer Level

    Applies to individual consumers. This provides granular control over message delivery to each consumer. The best practice is to set a consumer prefetch by setting a limit on the number of unacked messages at the client.

    
    import pika
    
    # Establish a connection and a channel
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    # Declare a queue
    channel.queue_declare(queue='task_queue', durable=True)
    
    # Set consumer-level prefetch
    channel.basic_qos(prefetch_count=1, global=False)
    
    # Define a callback function for message processing
    def callback(ch, method, properties, body):
        print(f"Received {body}")
        # Simulate message processing
        time.sleep(1)
        print("Done")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    # Consume messages
    channel.basic_consume(queue='task_queue', on_message_callback=callback)
    
    print("Waiting for messages. To exit press CTRL+C")
    channel.start_consuming()
    

    The key difference in this example is the global=False parameter, which ensures the prefetch count applies only to the individual consumer.

    Best Practices for Setting Prefetch

    1. Analyze Consumer Speed: Determine the average time it takes for a consumer to process a message and set the prefetch value accordingly.
    2. Monitor System Metrics: Use RabbitMQ’s management interface or monitoring tools to track unacknowledged messages and adjust the prefetch setting as needed.
    3. Test and Iterate: Experiment with different prefetch values to find the optimal configuration for your workload.
    4. Combine with Acknowledgments: Always use manual acknowledgments (basic_ack) to ensure that messages are processed successfully before the consumer receives new ones.

    References

    1. Rabbitmq Prefetch – https://www.rabbitmq.com/docs/consumer-prefetch
    2. https://training.cloudamqp.com/course/37