I'm working with the logical replication support in psycopg2, and have found something surprising... this may be my
error,of course!
My sample program is below. It works wonderfully, but in the case when it starts, it re-receives the last message that
ithandled, even with flushing it.
Example:
postgres@localhost:~/wal2pubsub$ python waltest.py
{"change":[{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[6]},{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[7]}]}
^C
postgres@localhost:~/wal2pubsub$ python waltest.py
{"change":[{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[6]},{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[7]}]}
There was no database activity in that period; it just replayed the same message. Shouldn't it have flushed to the end
ofthe WAL stream and not reprocessed the last message?
--
import psycopg2
from psycopg2.extras import LogicalReplicationConnection, REPLICATION_LOGICAL
conn = psycopg2.connect('dbname=postgres', connection_factory=LogicalReplicationConnection)
cur = conn.cursor()
cur.start_replication(slot_name='test_slot', slot_type=REPLICATION_LOGICAL)
from select import select
from datetime import datetime
def consume(msg):
print(msg.payload)
msg.cursor.send_feedback(flush_lsn=msg.data_start)
try:
cur.consume_stream(consume)
except:
pass
--
-- Christophe Pettus
xof@thebuild.com