Message queues are a critical component of many distributed systems, enabling asynchronous communication between different services. While dedicated tools like RabbitMQ or Kafka are often used for this purpose, you can design a lightweight message queue using MySQL for simpler use cases. This blog will demonstrate how to design such a system, ensuring that both producers and consumers can work concurrently without losing or duplicating tasks.


1. Key Features of the MySQL Message Queue

  • Concurrency: Support multiple producers and consumers.
  • Durability: Ensure no tasks are lost.
  • Idempotency: Avoid duplicate processing of tasks.

2. Table Design

We need a single table to represent the message queue. Let’s call it message_queue.

1
2
3
4
5
6
7
CREATE TABLE message_queue (
message_id BIGINT AUTO_INCREMENT PRIMARY KEY, -- Unique ID for each message
payload TEXT NOT NULL, -- The actual message content
status ENUM('PENDING', 'PROCESSING', 'DONE') DEFAULT 'PENDING', -- Task state
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- Message creation timestamp
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP -- Last update timestamp
);

3. Key Operations

a. Adding a Task (Producer)

Producers add tasks to the queue by inserting records into the message_queue table.

1
2
INSERT INTO message_queue (payload)
VALUES ('{"task": "process_file", "file_id": 123}');
b. Fetching a Task (Consumer)

Consumers fetch tasks in the PENDING state and mark them as PROCESSING to prevent other consumers from picking the same task.

1
2
3
4
begin; -- begin a transaction
select message_id into @message_id from message_queue where status = 'PENDING' limit 1 for update skip locked; -- pick a pending task id.
update message_queue set status='PROCESSING' where message_id=@message_id limit 1; -- update it to processing
commit;

Explanation:

  • FOR UPDATE SKIP LOCKED: Prevents other transactions from locking the same rows, enabling concurrent consumers.
  • RETURNING *: Returns the selected row for the consumer to process.
c. Completing a Task (Consumer)

Once a task is successfully processed, the consumer updates its status to DONE.

1
2
3
UPDATE message_queue
SET status = 'DONE', updated_at = CURRENT_TIMESTAMP
WHERE message_id = ?;
d. Retrying Failed Tasks

If a task fails or times out, its status can be reset to PENDING for retrying.

1
2
3
UPDATE message_queue
SET status = 'PENDING', updated_at = CURRENT_TIMESTAMP
WHERE status = 'PROCESSING' AND updated_at < NOW() - INTERVAL 1 HOUR;

4. Ensuring Reliability

  • Transactions: Wrap producer and consumer operations in transactions to ensure atomicity.
  • Indexes: Add an index on status to optimize queries for PENDING tasks.
1
CREATE INDEX idx_status ON message_queue (status);
  • Dead Letter Queue (Optional): Add a mechanism to move permanently failing tasks to a separate table.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CREATE TABLE dead_letter_queue (
message_id BIGINT PRIMARY KEY,
payload TEXT NOT NULL,
failure_reason TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

INSERT INTO dead_letter_queue (message_id, payload, failure_reason)
SELECT message_id, payload, 'Maximum retries exceeded'
FROM message_queue
WHERE status = 'PROCESSING' AND updated_at < NOW() - INTERVAL 1 DAY;

DELETE FROM message_queue WHERE message_id IN (
SELECT message_id FROM dead_letter_queue
);

5. Example Workflow

  1. Producer adds tasks:

    1
    2
    INSERT INTO message_queue (payload)
    VALUES ('{"task": "send_email", "email_id": 456}');
  2. Consumer fetches tasks:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    mysql> begin;
    Query OK, 0 rows affected (0.01 sec)

    mysql> select message_id into @message_id from message_queue where status = 'PENDING' limit 1 for update skip locked;
    Query OK, 1 row affected (0.00 sec)

    mysql> select @message_id;
    +-------------+
    | @message_id |
    +-------------+
    | 1 |
    +-------------+
    1 row in set (0.00 sec)

    mysql> update message_queue set status='PROCESSING' where message_id=@message_id limit 1;
    Query OK, 1 row affected (0.04 sec)
    Rows matched: 1 Changed: 1 Warnings: 0

    mysql> commit;
    Query OK, 0 rows affected (0.01 sec)

  3. Consumer completes tasks:

    1
    2
    3
    UPDATE message_queue
    SET status = 'DONE', updated_at = CURRENT_TIMESTAMP
    WHERE message_id = 1;
  4. Failed tasks are retried:

    1
    2
    3
    UPDATE message_queue
    SET status = 'PENDING', updated_at = CURRENT_TIMESTAMP
    WHERE status = 'PROCESSING' AND updated_at < NOW() - INTERVAL 1 HOUR;

6. Advantages and Limitations

Advantages:

  • Simple to implement using standard SQL.
  • No additional infrastructure required.
  • Supports concurrent producers and consumers.

Limitations:

  • Scalability is limited by MySQL’s performance under high load.
  • Requires careful handling of retries and dead letter queues.

By leveraging MySQL’s transactional guarantees and features like FOR UPDATE SKIP LOCKED, you can build a lightweight and reliable message queue suitable for many small to medium-scale applications. For more complex use cases, consider dedicated messaging systems like RabbitMQ or Kafka.