What Are Lazy Queues?
- Lazy Queues are designed to store messages primarily on disk rather than in memory.
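- They are optimized for use cases involving large message backlogs where minimizing memory usage is critical.
- Since RabbitMQ 3.12, classic queues behave this way by default and the x-queue-mode argument is ignored, so setting lazy mode explicitly matters mainly on older versions.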
Key Characteristics
- Disk-Based Storage – Messages are stored on disk immediately upon arrival, rather than being held in memory.
- Low Memory Usage – Only minimal metadata for messages is kept in memory.
- Scalability – Can handle millions of messages without consuming significant memory.
- Message Retrieval – Retrieving messages is slower because messages are fetched from disk.
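- Durability – Messages are written to disk, which reduces the risk of data loss during RabbitMQ restarts. Surviving a restart still requires a durable queue and persistent messages, as in the example below.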
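To make the memory argument concrete, here is a rough back-of-the-envelope sketch. The function name and the per-message metadata figure (`metadata_bytes`) are illustrative assumptions, not numbers published by RabbitMQ; real overhead varies by version and message properties.

```python
def backlog_memory_mib(message_count, avg_body_bytes, metadata_bytes, bodies_in_memory):
    """Rough RAM estimate (in MiB) for a queue backlog.

    metadata_bytes is a hypothetical per-message overhead, not a RabbitMQ constant.
    """
    per_message = metadata_bytes + (avg_body_bytes if bodies_in_memory else 0)
    return message_count * per_message / (1024 * 1024)


# One million 4 KiB messages, assuming 100 bytes of in-memory metadata each
in_memory = backlog_memory_mib(1_000_000, 4096, 100, bodies_in_memory=True)
on_disk = backlog_memory_mib(1_000_000, 4096, 100, bodies_in_memory=False)
print(f"bodies in RAM: {in_memory:.0f} MiB, bodies on disk: {on_disk:.0f} MiB")
```

Under these assumed numbers the gap is roughly two orders of magnitude, which is the effect the characteristics above describe.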
Trade-offs
- Latency: Fetching messages from disk is slower than retrieving them from memory.
- Throughput: Not suitable for high-throughput, low-latency applications.
Choose Lazy Queues if
- You need to handle very large backlogs of messages.
- Memory is a constraint in your system.
- Latency and throughput are less critical.
Implementation
Pre-requisites
1. Install and run RabbitMQ on your local machine, for example with Docker:
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management
2. Install the pika library:
pip install pika
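Before running the scripts, it can help to confirm that the broker port is actually open. The following is a minimal sketch using only the standard library; the helper name `broker_reachable` is hypothetical, and it only checks that a TCP connection succeeds, not that the AMQP handshake works.

```python
import socket


def broker_reachable(host="localhost", port=5672, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures
        return False


print("RabbitMQ port open:", broker_reachable())
```

If this prints False, check that the Docker container is running and that port 5672 is published.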
Producer (producer.py)
This script sends a persistent message to a Lazy Queue.
import pika
# RabbitMQ connection parameters for localhost
connection_params = pika.ConnectionParameters(host="localhost")
# Connect to RabbitMQ
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
# Custom Exchange and Routing Key
exchange_name = "custom_exchange"
routing_key = "custom_routing_key"
queue_name = "lazy_queue_example"
# Declare the custom exchange
channel.exchange_declare(
    exchange=exchange_name,
    exchange_type="direct",  # Direct exchange routes messages based on the routing key
    durable=True
)
# Declare a Lazy Queue
channel.queue_declare(
    queue=queue_name,
    durable=True,
    arguments={"x-queue-mode": "lazy"}  # Configure the queue as lazy
)
# Bind the queue to the custom exchange with the routing key
channel.queue_bind(
    exchange=exchange_name,
    queue=queue_name,
    routing_key=routing_key
)
# Publish a message
message = "Hello from the Producer via Custom Exchange!"
channel.basic_publish(
    exchange=exchange_name,
    routing_key=routing_key,
    body=message,
    properties=pika.BasicProperties(delivery_mode=2)  # Persistent message
)
print(f"Message sent to Lazy Queue via Exchange: {message}")
# Close the connection
connection.close()
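A single message does not show much about backlog behavior. As a sketch, the helper below publishes many messages in a loop so a backlog can build up while no consumer is running. The names `publish_backlog` and `main` are hypothetical, the message count is arbitrary, and `main` assumes producer.py has already been run once so the exchange, queue, and binding exist.

```python
def publish_backlog(channel, exchange, routing_key, count, properties=None):
    """Publish `count` numbered messages and return how many were sent.

    `channel` is expected to behave like a pika BlockingChannel.
    """
    for i in range(count):
        channel.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=f"backlog message {i}".encode(),
            properties=properties,
        )
    return count


def main():
    # Requires pika and a running broker; reuses the names from producer.py
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    persistent = pika.BasicProperties(delivery_mode=2)  # Persistent messages
    sent = publish_backlog(channel, "custom_exchange", "custom_routing_key", 10_000, persistent)
    print(f"Published {sent} messages")
    connection.close()
```

Call `main()` with the broker running, then watch the queue depth in the management UI on port 15672.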
Consumer (consumer.py)
import pika
# RabbitMQ connection parameters for localhost
connection_params = pika.ConnectionParameters(host="localhost")
# Connect to RabbitMQ
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
# Custom Exchange and Routing Key
exchange_name = "custom_exchange"
routing_key = "custom_routing_key"
queue_name = "lazy_queue_example"
# Declare the custom exchange
channel.exchange_declare(
    exchange=exchange_name,
    exchange_type="direct",  # Direct exchange routes messages based on the routing key
    durable=True
)
# Declare the Lazy Queue
channel.queue_declare(
    queue=queue_name,
    durable=True,
    arguments={"x-queue-mode": "lazy"}  # Configure the queue as lazy
)
# Bind the queue to the custom exchange with the routing key
channel.queue_bind(
    exchange=exchange_name,
    queue=queue_name,
    routing_key=routing_key
)
# Callback function to process messages
def callback(ch, method, properties, body):
    print(f"Received message: {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # Acknowledge the message
# Start consuming messages
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
print("Waiting for messages. To exit, press CTRL+C")
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()  # Cancel the consumer cleanly before closing
    print("Stopped consuming.")
# Close the connection
connection.close()
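To see how large the backlog is before or while consuming, one option is a passive queue declare, which inspects an existing queue without creating or changing it. This is a small sketch assuming pika's BlockingChannel, where the declare reply exposes `message_count`; the helper name `backlog_size` is hypothetical.

```python
def backlog_size(channel, queue_name):
    """Return the number of messages ready for delivery in an existing queue.

    A passive declare only looks the queue up; the broker closes the channel
    with an error if the queue does not exist.
    """
    result = channel.queue_declare(queue=queue_name, passive=True)
    return result.method.message_count
```

With the consumer's channel, this could be called as `backlog_size(channel, "lazy_queue_example")` before `start_consuming()`. The count covers ready messages only, not messages already delivered and awaiting acknowledgement.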
Explanation
- Producer
  - Defines a custom exchange (custom_exchange) of type direct.
  - Declares a Lazy Queue (lazy_queue_example).
  - Binds the queue to the exchange using a routing key (custom_routing_key).
  - Publishes a persistent message via the custom exchange and routing key.
- Consumer
  - Declares the same exchange and Lazy Queue to ensure they exist.
  - Consumes messages routed to the queue through the custom exchange and routing key.
- Custom Exchange and Binding
  - The direct exchange type routes messages based on an exact match of the routing key.
  - Binding ensures the queue receives messages published to the exchange with the specified key.
- Lazy Queue Behavior
  - Messages are stored directly on disk to minimize memory usage.
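The exact-match routing described under Custom Exchange and Binding can be illustrated with a toy in-memory model. This is not RabbitMQ code; the class `DirectExchangeModel` is a hypothetical stand-in that only mimics how a direct exchange picks queues for a routing key.

```python
from collections import defaultdict


class DirectExchangeModel:
    """Toy model of direct-exchange routing (illustration only)."""

    def __init__(self):
        self._bindings = defaultdict(set)  # routing key -> set of queue names

    def bind(self, queue, routing_key):
        self._bindings[routing_key].add(queue)

    def route(self, routing_key):
        # Exact match only; a key with no binding routes to no queue
        return sorted(self._bindings.get(routing_key, ()))


exchange = DirectExchangeModel()
exchange.bind("lazy_queue_example", "custom_routing_key")
print(exchange.route("custom_routing_key"))  # ['lazy_queue_example']
print(exchange.route("custom_routing"))      # []
```

A message published with a key that differs by even one character matches no binding, which is why the producer and consumer use the same routing key.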