Yesterday, I came across a blog post from inferable.ai, https://www.inferable.ai/blog/posts/postgres-skip-locked, which walks through using Postgres as a queue. In this post, I jot down notes on using Postgres as a queue for future reference.
PostgreSQL is a robust relational database that can be used for more than just storing structured data. With the SKIP LOCKED feature introduced in PostgreSQL 9.5, you can efficiently turn a PostgreSQL table into a job queue for distributed processing.
Why Use PostgreSQL as a Queue?
Using PostgreSQL as a queue can be advantageous because:
- Familiarity: If you’re already using PostgreSQL, there’s no need for an additional message broker.
- Durability: PostgreSQL ensures ACID compliance, offering reliability for your job processing.
- Simplicity: No need to manage another component like RabbitMQ or Kafka.
Implementing a Queue with SKIP LOCKED
1. Create a Queue Table
To start, you need a table to store the jobs:
CREATE TABLE job_queue (
id SERIAL PRIMARY KEY,
job_data JSONB NOT NULL,
status TEXT DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
This table has the following columns:
- id: A unique identifier for each job.
- job_data: The data or payload for the job.
- status: Tracks the job's state ('pending', 'in_progress', or 'completed').
- created_at: Timestamp of job creation.
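As a small optional optimization (not part of the original walkthrough), a partial index on pending jobs keeps the fetch query fast even as completed rows pile up:

```sql
-- Hypothetical optimization: index only the rows workers actually scan.
CREATE INDEX idx_job_queue_pending
ON job_queue (id)
WHERE status = 'pending';
```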
2. Insert Jobs into the Queue
Adding jobs is straightforward:
INSERT INTO job_queue (job_data)
VALUES ('{"task": "send_email", "email": "user@example.com"}');
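Since enqueueing is just an INSERT, several jobs can be added in one statement (these payloads are made up for illustration):

```sql
INSERT INTO job_queue (job_data) VALUES
    ('{"task": "send_email", "email": "a@example.com"}'),
    ('{"task": "send_email", "email": "b@example.com"}');
```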
3. Fetch Jobs for Processing with SKIP LOCKED
Workers will fetch jobs from the queue using SELECT ... FOR UPDATE SKIP LOCKED to avoid contention:
WITH next_job AS (
    SELECT id, job_data
    FROM job_queue
    WHERE status = 'pending'
    ORDER BY id
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE job_queue
SET status = 'in_progress'
FROM next_job
WHERE job_queue.id = next_job.id
RETURNING job_queue.id, job_queue.job_data;
Key Points:
- FOR UPDATE locks the selected row to prevent other workers from picking it up.
- SKIP LOCKED ensures locked rows are skipped, enabling concurrent workers to operate without waiting.
- LIMIT 1 processes one job at a time per worker.
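The claim-one-job-atomically behaviour these clauses give you can be sketched with a plain in-memory analogy (no database involved; the single lock below stands in for Postgres row locks, and the names are invented for illustration):

```python
import threading

# A toy "table" of 100 pending jobs.
jobs = [{"id": i, "status": "pending"} for i in range(100)]
claims = []  # (worker_id, job_id) pairs, recorded for verification
table_lock = threading.Lock()

def fetch_next_job():
    """Mimic SELECT ... LIMIT 1 FOR UPDATE SKIP LOCKED: atomically claim
    the first pending row; rows already claimed are simply skipped."""
    with table_lock:
        for job in jobs:
            if job["status"] == "pending":
                job["status"] = "in_progress"
                return job
    return None

def worker(worker_id):
    while True:
        job = fetch_next_job()
        if job is None:
            break  # queue drained
        claims.append((worker_id, job["id"]))
        job["status"] = "completed"

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Despite 4 concurrent workers, every job is claimed exactly once.
print(len(claims), len({job_id for _, job_id in claims}))
```

The point of the sketch is that "skip" means a busy row looks invisible to other workers, so nobody blocks waiting for it.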
4. Mark Jobs as Completed
Once a worker finishes processing a job, it should update the job's status:
UPDATE job_queue
SET status = 'completed'
WHERE id = $1; -- Replace $1 with the job ID
5. Delete Old or Processed Jobs
To keep the table clean, you can periodically remove completed jobs:
DELETE FROM job_queue
WHERE status = 'completed' AND created_at < NOW() - INTERVAL '30 days';
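If you prefer to keep history, the same cleanup can archive rows instead of deleting them outright; the archive table below is an assumption added for illustration:

```sql
-- Hypothetical archive table with the same shape as job_queue.
CREATE TABLE job_queue_archive (LIKE job_queue INCLUDING ALL);

WITH moved AS (
    DELETE FROM job_queue
    WHERE status = 'completed'
      AND created_at < NOW() - INTERVAL '30 days'
    RETURNING *
)
INSERT INTO job_queue_archive SELECT * FROM moved;
```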
Example Worker Implementation
Here’s an example of a worker implemented in Python using psycopg2:
import time

import psycopg2
from psycopg2.extras import RealDictCursor

connection = psycopg2.connect("dbname=yourdb user=youruser")

while True:
    with connection.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(
            """
            WITH next_job AS (
                SELECT id, job_data
                FROM job_queue
                WHERE status = 'pending'
                ORDER BY id
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            )
            UPDATE job_queue
            SET status = 'in_progress'
            FROM next_job
            WHERE job_queue.id = next_job.id
            RETURNING job_queue.id, job_queue.job_data;
            """
        )
        job = cursor.fetchone()
        if job:
            print(f"Processing job {job['id']}: {job['job_data']}")
            # Simulate job processing
            cursor.execute(
                "UPDATE job_queue SET status = 'completed' WHERE id = %s",
                (job["id"],),
            )
        else:
            print("No jobs available. Sleeping...")
            time.sleep(5)
    connection.commit()
Considerations
- Transaction Isolation: Use the REPEATABLE READ or SERIALIZABLE isolation levels cautiously to avoid unnecessary locks.
- Row Locking: SKIP LOCKED only skips rows locked by other transactions, not those locked within the same transaction.
- Performance: Regularly archive or delete old jobs to prevent the table from growing indefinitely. Consider indexing the status column to improve query performance.
- Fault Tolerance: Ensure that workers handle crashes or timeouts gracefully. Use a timeout mechanism to revert jobs stuck in the 'in_progress' state.
- Scaling: Distribute workers across multiple nodes to handle a higher job throughput.
- The SKIP LOCKED clause only applies to row-level locks; the required ROW SHARE table-level lock is still taken normally.
- Using SKIP LOCKED provides an inconsistent view of the data by design. This is why it's perfect for queue-like tables where we want to distribute work, but not suitable for general-purpose work where consistency is required.
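The timeout mechanism mentioned under fault tolerance could look like the following sketch. It assumes an extra updated_at column (not in the original schema) that workers set when they claim a job:

```sql
-- Hypothetical recovery query; assumes an added updated_at column.
UPDATE job_queue
SET status = 'pending'
WHERE status = 'in_progress'
  AND updated_at < NOW() - INTERVAL '10 minutes';
```

Run periodically, this returns jobs abandoned by crashed workers to the pool so another worker can pick them up.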
