Parottasalna

AI, Backend Engineering & Architecture Guides

Learning Notes #51 – Postgres as a Queue using SKIP LOCKED

Yesterday, i came across a blog from inferable.ai https://www.inferable.ai/blog/posts/postgres-skip-locked, which walkthrough about using postgres as a queue. In this blog, i jot down notes on using postgres as a queue for future references.

PostgreSQL is a robust relational database that can be used for more than just storing structured data. With the SKIP LOCKED feature introduced in PostgreSQL 9.5, you can efficiently turn a PostgreSQL table into a job queue for distributed processing.

Why Use PostgreSQL as a Queue?

Using PostgreSQL as a queue can be advantageous because,

  • Familiarity: If you’re already using PostgreSQL, there’s no need for an additional message broker.
  • Durability: PostgreSQL ensures ACID compliance, offering reliability for your job processing.
  • Simplicity: No need to manage another component like RabbitMQ or Kafka

Implementing a Queue with SKIP LOCKED

1. Create a Queue Table

To start, you need a table to store the jobs,

CREATE TABLE job_queue (
    id SERIAL PRIMARY KEY,
    job_data JSONB NOT NULL,
    status TEXT DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

This table has the following columns,

  • id: A unique identifier for each job.
  • job_data: The data or payload for the job.
  • status: Tracks the job’s state (‘pending’, ‘in_progress’, or ‘completed’).
  • created_at: Timestamp of job creation.

2. Insert Jobs into the Queue

Adding jobs is straightforward,

INSERT INTO job_queue (job_data)
VALUES ('{"task": "send_email", "email": "user@example.com"}');

3. Fetch Jobs for Processing with SKIP LOCKED

Workers will fetch jobs from the queue using SELECT ... FOR UPDATE SKIP LOCKED to avoid contention,

WITH next_job AS (
    SELECT id, job_data
    FROM job_queue
    WHERE status = 'pending'
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
UPDATE job_queue
SET status = 'in_progress'
FROM next_job
WHERE job_queue.id = next_job.id
RETURNING job_queue.id, job_queue.job_data;

Key Points:

  • FOR UPDATE locks the selected row to prevent other workers from picking it up.
  • SKIP LOCKED ensures locked rows are skipped, enabling concurrent workers to operate without waiting.
  • LIMIT 1 processes one job at a time per worker.

4. Mark Jobs as Completed

Once a worker finishes processing a job, it should update the job’s status,

UPDATE job_queue
SET status = 'completed'
WHERE id = $1; -- Replace $1 with the job ID

5. Delete Old or Processed Jobs

To keep the table clean, you can periodically remove completed jobs,

DELETE FROM job_queue
WHERE status = 'completed' AND created_at < NOW() - INTERVAL '30 days';

Example Worker Implementation

Here’s an example of a worker implemented in Python using psycopg2

import psycopg2
from psycopg2.extras import RealDictCursor

connection = psycopg2.connect("dbname=yourdb user=youruser")

while True:
    with connection.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(
            """
            WITH next_job AS (
                SELECT id, job_data
                FROM job_queue
                WHERE status = 'pending'
                FOR UPDATE SKIP LOCKED
                LIMIT 1
            )
            UPDATE job_queue
            SET status = 'in_progress'
            FROM next_job
            WHERE job_queue.id = next_job.id
            RETURNING job_queue.id, job_queue.job_data;
            """
        )

        job = cursor.fetchone()
        if job:
            print(f"Processing job {job['id']}: {job['job_data']}")

            # Simulate job processing
            cursor.execute("UPDATE job_queue SET status = 'completed' WHERE id = %s", (job['id'],))

        else:
            print("No jobs available. Sleeping...")
            time.sleep(5)

    connection.commit()

Considerations

  1. Transaction Isolation: Use the REPEATABLE READ or SERIALIZABLE isolation level cautiously to avoid unnecessary locks.
  2. Row Locking: SKIP LOCKED only skips rows locked by other transactions, not those locked within the same transaction.
  3. Performance: Regularly archive or delete old jobs to prevent the table from growing indefinitely. Consider indexing the status column to improve query performance.
  4. Fault Tolerance: Ensure that workers handle crashes or timeouts gracefully. Use a timeout mechanism to revert jobs stuck in the ‘in_progress’ state.
  5. Scaling: Distribute workers across multiple nodes to handle a higher job throughput.
  6. The SKIP LOCKED clause only applies to row-level locks – the required ROW SHARE table-level lock is still taken normally.
  7. Using SKIP LOCKED provides an inconsistent view of the data by design. This is why it’s perfect for queue-like tables where we want to distribute work, but not suitable for general purpose work where consistency is required.

Discover more from Parottasalna

Subscribe now to keep reading and get access to the full archive.

Continue reading