Re: Implementing queue semantics (novice)

Поиск
Список
Период
Сортировка
От Andrew Hammond
Тема Re: Implementing queue semantics (novice)
Дата
Msg-id 41E54DE5.9000505@ca.afilias.info
обсуждение исходный текст
Ответ на Implementing queue semantics (novice)  (KÖPFERL Robert <robert.koepferl@sonorys.at>)
Список pgsql-sql
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

The name for what you're looking to build is a concurrent batch
processing system. Here's a basic one.

- -- adding processes

BEGIN;

INSERT INTO queue (queue_id, processing_pid, processing_start,
~ processing_status, foreign_id)
VALUES (DEFAULT, NULL, NULL,
~ (SELECT queue_status_id FROM queue_status WHERE name = 'pending'),
~ foreign_id);

COMMIT;


- -- removing processes

BEGIN;

SELECT queue_id, foreign_id FROM queue
WHERE processing_status = (SELECT queue_status_id FROM queue_status
~ WHERE name = 'pending')
ORDER BY queue_id LIMIT 1
FOR UPDATE;

UPDATE queue
SET processing_pid = ?,
~ processing_start = now(),
~ processing_status = (SELECT queue_status_id FROM queue_status WHERE
~  name = 'active')
WHERE id = ?;

COMMIT;

- -- client code does whatever it's going to do here

BEGIN;

SELECT 1 FROM queue
WHERE queue_id = ? AND processing_pid = ?
FOR UPDATE;

- -- confirm that it exists

DELETE FROM queue WHERE queue_id = ?

INSERT INTO queue_history (queue_id, processing_pid, processing_start,
~ processing_complete, processing_status, foreign_id)
VALUES (queue_id, processing_pid, processing_start, now(),
~ (SELECT queue_status_id FROM queue_status WHERE name = 'done'),
~ foreign_id);

COMMIT;

- -- a seperate process reaps orphaned entries should processing fail.

BEGIN;
SELECT queue_id, processing_pid FROM queue
WHERE now() - processing_start > 'some reasonable interval'::interval
AND processing_status = (SELECT queue_status_id FROM queue_status WHERE
~ name = 'active' FOR UPDATE;

- -- for each entry, check to see if the PID is still running

UPDATE queue
SET
~ processing_pid = NULL,
~ processing_start = NULL,
~ processing_status = (SELECT id FROM queue_status WHERE name = 'pending')
WHERE id = ?;

COMMIT;

There are more complicated approaches available. If you plan to have
multiple machines processing, you probably want to add a processing_node
entry too.


KÖPFERL Robert wrote:
| Hi,
|
| since I am new to writing stored procedures I'd like to ask first bevore I
| do a mistake.
|
| I want to implement some kind of queue (fifo). There are n users/processes
| that add new records to a table and there are m consumers that take out
| these records and process them.
| It's however possible for a consumer to die or loose connection while
| records must not be unprocessed. They may rather be processed twice.
|
| This seems to me as a rather common problem. But also with
atomicy-holes to
| fall into.
| How is this commonly implemented?
|
|
| I can imagine an 'add' and a 'get' function together with one aditional
| 'processed' timestamp-column?
|
|
|
| Thanks for helping me do the right.
|
| ---------------------------(end of broadcast)---------------------------
| TIP 4: Don't 'kill -9' the postmaster


- --
Andrew Hammond    416-673-4138    ahammond@ca.afilias.info
Database Administrator, Afilias Canada Corp.
CB83 2838 4B67 D40F D086 3568 81FC E7E5 27AF 4A9A
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.2.5 (GNU/Linux)

iD8DBQFB5U3kgfzn5SevSpoRAoesAKCAZkr61I5knCw9tIr8rlO0xri7YACgifrn
N01nXZY8UKmIlTnGkngHKUo=
=UXRk
-----END PGP SIGNATURE-----


В списке pgsql-sql по дате отправления:

Предыдущее
От: KÖPFERL Robert
Дата:
Сообщение: Implementing queue semantics (novice)
Следующее
От: Kaloyan Iliev Iliev
Дата:
Сообщение: Problems with HAVING