Re: Proposal: Generic WAL logical messages

From: Andres Freund
Subject: Re: Proposal: Generic WAL logical messages
Date:
Msg-id: 20160322114701.GE3790@awork2.anarazel.de
In reply to: Re: Proposal: Generic WAL logical messages (Petr Jelinek <petr@2ndquadrant.com>)
Replies: Re: Proposal: Generic WAL logical messages (Petr Jelinek <petr@2ndquadrant.com>)
List: pgsql-hackers
On 2016-03-21 18:10:55 +0100, Petr Jelinek wrote:
> Just noticed there is a missing symlink in pg_xlogdump.

>  create mode 100644 src/backend/access/rmgrdesc/logicalmsgdesc.c
>  create mode 120000 src/bin/pg_xlogdump/logicalmsgdesc.c

Uh, src/bin/pg_xlogdump/logicalmsgdesc.c shouldn't be there. The symlink
is supposed to be automatically created by the Makefile.

Were you perhaps confused because it showed up in git status? If so,
that's probably because it isn't in
src/bin/pg_xlogdump/.gitignore. Perhaps we should change that file to
ignore *desc.c?

> +      <row>
> +       <entry id="pg-logical-emit-message-text">
> +        <indexterm>
> +         <primary>pg_logical_emit_message</primary>
> +        </indexterm>
> +        <literal><function>pg_logical_emit_message(<parameter>transactional</parameter> <type>bool</type>, <parameter>prefix</parameter><type>text</type>, <parameter>content</parameter> <type>text</type>)</function></literal>
> +       </entry>
> +       <entry>
> +        void
> +       </entry>
> +       <entry>
> +        Write text logical decoding message. This can be used to pass generic
> +        messages to logical decoding plugins through WAL. The parameter
> +        <parameter>transactional</parameter> specifies if the message should
> +        be part of current transaction or if it should be written immediately
> +        and decoded as soon as the logical decoding reads the record. The
> +        <parameter>prefix</parameter> is textual prefix used by the logical
> +        decoding plugins to easily recognize interesting messages for them.
> +        The <parameter>content</parameter> is the text of the message.
> +       </entry>
> +      </row>

s/write/emit/?
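To make the two delivery modes in the quoted documentation concrete, here is a toy model in C. All names are hypothetical and none of this is PostgreSQL code: transactional messages wait with their transaction and reach the plugin only if it commits, while non-transactional ones reach it as soon as decoding reads the record. Dropping transactional messages on abort is an assumption that follows from them being "part of current transaction".

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define MAX_QUEUED 8

/* Toy transaction: holds its transactional messages until commit/abort. */
typedef struct ToyTxn
{
    const char *queued[MAX_QUEUED];   /* transactional messages, WAL order */
    int         nqueued;
} ToyTxn;

/* Messages the toy "output plugin" has received, in delivery order. */
static const char *delivered[2 * MAX_QUEUED];
static int  ndelivered = 0;

static void
deliver(const char *msg)
{
    delivered[ndelivered++] = msg;
}

/* Called when decoding reads a message record. */
void
toy_decode_message(ToyTxn *txn, bool transactional, const char *msg)
{
    if (transactional)
    {
        assert(txn->nqueued < MAX_QUEUED);
        txn->queued[txn->nqueued++] = msg;   /* wait for the commit record */
    }
    else
        deliver(msg);                        /* decoded immediately */
}

/* Called when decoding reads the commit (committed) or abort record. */
void
toy_decode_end(ToyTxn *txn, bool committed)
{
    if (committed)
        for (int i = 0; i < txn->nqueued; i++)
            deliver(txn->queued[i]);
    txn->nqueued = 0;   /* delivered above, or dropped on abort */
}
```

Note that a non-transactional message emitted inside a still-open transaction overtakes that transaction's transactional messages.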

> +
> +    <sect3 id="logicaldecoding-output-plugin-message">
> +     <title>Generic Message Callback</title>
> +
> +     <para>
> +      The optional <function>message_cb</function> callback is called whenever
> +      a logical decoding message has been decoded.
> +<programlisting>
> +typedef void (*LogicalDecodeMessageCB) (
> +    struct LogicalDecodingContext *,
> +    ReorderBufferTXN *txn,
> +    XLogRecPtr message_lsn,
> +    const char *prefix,
> +    Size message_size,
> +    const char *message
> +);

I see you removed the transactional parameter. I'm doubtful that's a
good idea: it seems like it'd be rather helpful to pass the
transaction for a non-transactional message that's emitted while an xid
is assigned?
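A sketch of what the callback could look like if the transactional flag were kept, as suggested above. XLogRecPtr, Size and ReorderBufferTXN are minimal stand-ins for the PostgreSQL types, and the "myext" callback is a hypothetical plugin. It filters on the prefix, as the documentation describes, and still sees the xid of a non-transactional message emitted inside a transaction.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Stand-ins for PostgreSQL types; only what the sketch needs. */
typedef uint64_t XLogRecPtr;
typedef size_t Size;
typedef struct ReorderBufferTXN { uint32_t xid; } ReorderBufferTXN;
struct LogicalDecodingContext;

/*
 * Hypothetical signature keeping the transactional flag. txn may be
 * non-NULL even when transactional is false, if an xid was assigned
 * when the message was emitted.
 */
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr message_lsn,
                                        bool transactional,
                                        const char *prefix,
                                        Size message_size,
                                        const char *message);

/* What the example plugin remembers about the last accepted message. */
static int  accepted = 0;
static bool last_transactional = false;
static uint32_t last_xid = 0;

/* Example plugin callback: only reacts to messages with prefix "myext". */
static void
myext_message_cb(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                 XLogRecPtr message_lsn, bool transactional,
                 const char *prefix, Size message_size, const char *message)
{
    (void) ctx;
    (void) message_lsn;
    (void) message_size;
    (void) message;

    if (strcmp(prefix, "myext") != 0)
        return;                 /* another plugin's message: ignore it */

    accepted++;
    last_transactional = transactional;
    last_xid = (txn != NULL) ? txn->xid : 0;
}

LogicalDecodeMessageCB message_cb = myext_message_cb;
```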


> +/*
> + * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
> + */
> +static void
> +DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
> +{
> +    SnapBuild  *builder = ctx->snapshot_builder;
> +    XLogReaderState *r = buf->record;
> +    uint8        info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
> +    xl_logical_message *message;
> +
> +    if (info != XLOG_LOGICAL_MESSAGE)
> +        elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
> +
> +    message = (xl_logical_message *) XLogRecGetData(r);
> +
> +    if (message->transactional)
> +    {
> +        if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), buf->origptr))
> +            return;
> +
> +        ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
> +                                  buf->endptr,
> +                                  message->message, /* first part of message is prefix */
> +                                  message->message_size,
> +                                  message->message + message->prefix_size);
> +    }
> +    else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT &&
> +             !SnapBuildXactNeedsSkip(builder, buf->origptr))
> +    {
> +        volatile Snapshot    snapshot_now;
> +        ReorderBuffer       *rb = ctx->reorder;
> +
> +        /* setup snapshot to allow catalog access */
> +        snapshot_now = SnapBuildGetOrBuildSnapshot(builder, XLogRecGetXid(r));
> +        SetupHistoricSnapshot(snapshot_now, NULL);
> +        rb->message(rb, NULL, buf->origptr, message->message,
> +                    message->message_size,
> +                    message->message + message->prefix_size);
> +        TeardownHistoricSnapshot(false);
> +    }
> +}

A number of things:
1) The SnapBuildProcessChange() check needs to happen at the top level, not just for transactional messages: we can't necessarily build a snapshot yet.
2) I'm inclined to move even the non-transactional handling into the reorder buffer code.
3) This lacks error handling; we surely don't want to error out while still having the historic snapshot set up.
4) Without 3), the volatile is bogus.
5) It misses a ReorderBufferProcessXid() call.
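One way to arrange the control flow asked for in points 1, 3 and 4, as a standalone toy model. toy_error() and the setjmp()/longjmp() pair stand in for ereport(ERROR) and PG_TRY()/PG_CATCH() (which PostgreSQL builds on sigsetjmp()), can_process stands in for SnapBuildProcessChange(), and every name is hypothetical. Real code would re-throw with PG_RE_THROW() after cleaning up instead of returning a status.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-in for the exception stack used by PG_TRY()/ereport(). */
static jmp_buf *toy_exception_stack = NULL;

/* Stand-in for ereport(ERROR): jump to the innermost handler. */
static void
toy_error(void)
{
    longjmp(*toy_exception_stack, 1);
}

/* Stand-ins for SetupHistoricSnapshot()/TeardownHistoricSnapshot(). */
static bool historic_snapshot_active = false;
static void setup_historic_snapshot(void) { historic_snapshot_active = true; }
static void teardown_historic_snapshot(void) { historic_snapshot_active = false; }

typedef enum
{
    MSG_SKIPPED,                /* snapshot builder not far enough yet */
    MSG_QUEUED,                 /* transactional: goes to the reorder buffer */
    MSG_DELIVERED,              /* non-transactional: callback completed */
    MSG_FAILED                  /* callback raised an error */
} ToyResult;

typedef void (*ToyMessageCB) (void);

ToyResult
toy_decode_message(bool can_process, bool transactional, ToyMessageCB cb)
{
    jmp_buf     local_jmp;
    jmp_buf    *save_stack = toy_exception_stack;
    /*
     * Changed after setjmp() and read after longjmp(): without volatile its
     * value would be indeterminate on the error path (point 4).
     */
    volatile bool snapshot_set_up = false;

    /* Point 1: checked for every message, not only transactional ones. */
    if (!can_process)
        return MSG_SKIPPED;

    if (transactional)
        return MSG_QUEUED;

    if (setjmp(local_jmp) == 0)
    {
        toy_exception_stack = &local_jmp;
        setup_historic_snapshot();
        snapshot_set_up = true;
        cb();
        teardown_historic_snapshot();
        toy_exception_stack = save_stack;
        return MSG_DELIVERED;
    }

    /* Point 3, the PG_CATCH() part: never leave the snapshot set up. */
    toy_exception_stack = save_stack;
    if (snapshot_set_up)
        teardown_historic_snapshot();
    return MSG_FAILED;
}

/* Two example callbacks for exercising the sketch. */
void
ok_cb(void)
{
    assert(historic_snapshot_active);
}

void
failing_cb(void)
{
    toy_error();
}
```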

> + * Every message carries prefix to avoid conflicts between different decoding
> + * plugins. The prefix has to be registered before the message using that
> + * prefix can be written to XLOG. The prefix can be registered exactly once to
> + * avoid situation where multiple third party extensions try to use same
> + * prefix.

Outdated afaics?


> @@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
>                  change->data.tp.oldtuple = NULL;
>              }
>              break;
> +        case REORDER_BUFFER_CHANGE_MESSAGE:
> +            if (change->data.msg.prefix != NULL)
> +                pfree(change->data.msg.prefix);
> +            change->data.msg.prefix = NULL;
> +            if (change->data.msg.message != NULL)
> +                pfree(change->data.msg.message);
> +            change->data.msg.message = NULL;
> +            break;

Hm, this will have some overhead, but I guess the messages won't be
super frequent, and usually not very large.

> +/*
> + * Queue message into a transaction so it can be processed upon commit.
> + */
> +void
> +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
> +                          const char *prefix, Size msg_sz, const char *msg)
> +{
> +    ReorderBufferChange *change;
> +
> +    Assert(xid != InvalidTransactionId);
> +
> +    change = ReorderBufferGetChange(rb);
> +    change->action = REORDER_BUFFER_CHANGE_MESSAGE;
> +    change->data.msg.prefix = pstrdup(prefix);
> +    change->data.msg.message_size = msg_sz;
> +    change->data.msg.message = palloc(msg_sz);
> +    memcpy(change->data.msg.message, msg, msg_sz);
> +
> +    ReorderBufferQueueChange(rb, xid, lsn, change);
> +}

I'm not sure right now if there's any guarantee that the current memory
context is meaningful here? IIRC other long-lived allocations explicitly
use a context?
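A toy model of the memory-context concern. palloc() allocates in whatever CurrentMemoryContext happens to be, so code creating data that must outlive the current record typically switches to a known long-lived context first (MemoryContextSwitchTo()-style) and switches back afterwards. The types and functions below are simplified, hypothetical stand-ins, not the PostgreSQL API.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Toy memory context: a fixed bump allocator. */
typedef struct ToyContext
{
    char    buf[1024];
    size_t  used;
} ToyContext;

static ToyContext *CurrentContext = NULL;

/* Like MemoryContextSwitchTo(): returns the previous context. */
static ToyContext *
toy_context_switch_to(ToyContext *ctx)
{
    ToyContext *old = CurrentContext;

    CurrentContext = ctx;
    return old;
}

/* Like palloc(): allocates in whatever context is current. */
static void *
toy_palloc(size_t size)
{
    ToyContext *ctx = CurrentContext;
    void       *p;

    assert(ctx->used + size <= sizeof(ctx->buf));
    p = ctx->buf + ctx->used;
    ctx->used += size;
    return p;
}

typedef struct ToyMessageChange
{
    char   *prefix;
    size_t  message_size;
    char   *message;
} ToyMessageChange;

typedef struct ToyReorderBuffer
{
    ToyContext *context;        /* lives as long as the reorder buffer */
} ToyReorderBuffer;

/* Copy prefix and payload into rb->context, whatever the caller's context. */
void
toy_queue_message(ToyReorderBuffer *rb, ToyMessageChange *change,
                  const char *prefix, size_t msg_sz, const char *msg)
{
    ToyContext *oldcontext = toy_context_switch_to(rb->context);
    size_t      prefix_size = strlen(prefix) + 1;

    change->prefix = toy_palloc(prefix_size);
    memcpy(change->prefix, prefix, prefix_size);
    change->message_size = msg_sz;
    change->message = toy_palloc(msg_sz);
    memcpy(change->message, msg, msg_sz);

    toy_context_switch_to(oldcontext);   /* restore the caller's context */
}
```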

> +        case REORDER_BUFFER_CHANGE_MESSAGE:
> +            {
> +                char       *data;
> +                size_t        prefix_size = strlen(change->data.msg.prefix) + 1;
> +
> +                sz += prefix_size + change->data.msg.message_size;
> +                ReorderBufferSerializeReserve(rb, sz);
> +
> +                data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
> +                memcpy(data, change->data.msg.prefix,
> +                       prefix_size);
> +                memcpy(data + prefix_size, change->data.msg.message,
> +                       change->data.msg.message_size);
> +                break;
> +            }

Can you please include the sizes of the blocks explicitly, rather than
relying on 0 termination?
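A sketch of a serialization format that stores each block's size explicitly: a size_t length before the prefix and another before the message, copied with memcpy() so no aligned access is assumed. The function names are hypothetical; in the patch the bytes would go into rb->outbuf after the ReorderBufferDiskChange header. The prefix still carries its NUL so it can be used as a C string once restored, but reading it back never depends on that.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Layout: [size_t prefix_size][prefix][size_t message_size][message] */
size_t
msg_serialized_size(size_t prefix_size, size_t message_size)
{
    return sizeof(size_t) + prefix_size + sizeof(size_t) + message_size;
}

/* Write both blocks into buf; returns the number of bytes written. */
size_t
msg_serialize(char *buf, const char *prefix, size_t prefix_size,
              const char *message, size_t message_size)
{
    char   *data = buf;

    memcpy(data, &prefix_size, sizeof(size_t));
    data += sizeof(size_t);
    memcpy(data, prefix, prefix_size);
    data += prefix_size;

    memcpy(data, &message_size, sizeof(size_t));
    data += sizeof(size_t);
    memcpy(data, message, message_size);
    data += message_size;

    return (size_t) (data - buf);
}

/*
 * Read both blocks back. Returned pointers point into buf; the sizes come
 * from the buffer itself, never from scanning for a NUL.
 */
size_t
msg_restore(const char *buf, const char **prefix, size_t *prefix_size,
            const char **message, size_t *message_size)
{
    const char *data = buf;

    memcpy(prefix_size, data, sizeof(size_t));
    data += sizeof(size_t);
    *prefix = data;
    data += *prefix_size;

    memcpy(message_size, data, sizeof(size_t));
    data += sizeof(size_t);
    *message = data;
    data += *message_size;

    return (size_t) (data - buf);
}
```

Because the message length is stored, payloads containing embedded NUL bytes survive the round trip.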


> @@ -45,3 +45,4 @@ PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_start
>  PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
>  PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
>  PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
> +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL)

Did you consider doing this via the standby rmgr instead?

> +typedef struct xl_logical_message
> +{
> +    bool        transactional;                    /* is message transactional? */
> +    size_t        prefix_size;                    /* length of prefix */
> +    size_t        message_size;                    /* size of the message */
> +    char        message[FLEXIBLE_ARRAY_MEMBER];    /* message including the null
> +                                                 * terminated prefx of length
> +                                                 * prefix_size */
> +} xl_logical_message;
>

"prefx".
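For reference, a standalone sketch of how a record with this layout can be built and read back. It assumes prefix_size counts the terminating NUL, which is consistent with the decoding code reading the content at message->message + message->prefix_size. It uses a C99 flexible array member where PostgreSQL uses FLEXIBLE_ARRAY_MEMBER, and build_logical_message() is a hypothetical helper, not the patch's WAL-insertion code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* The quoted struct, with the comment typo fixed. */
typedef struct xl_logical_message
{
    bool    transactional;      /* is message transactional? */
    size_t  prefix_size;        /* length of prefix, including the NUL */
    size_t  message_size;       /* size of the message (without prefix) */
    char    message[];          /* NUL-terminated prefix, then the message */
} xl_logical_message;

#define SizeOfLogicalMessage (offsetof(xl_logical_message, message))

/* Build one record in a single malloc'd buffer; caller frees it. */
xl_logical_message *
build_logical_message(bool transactional, const char *prefix,
                      const char *content, size_t content_size)
{
    size_t      prefix_size = strlen(prefix) + 1;
    xl_logical_message *rec;

    rec = malloc(SizeOfLogicalMessage + prefix_size + content_size);
    if (rec == NULL)
        return NULL;
    rec->transactional = transactional;
    rec->prefix_size = prefix_size;
    rec->message_size = content_size;
    memcpy(rec->message, prefix, prefix_size);
    memcpy(rec->message + prefix_size, content, content_size);
    return rec;
}
```

Reading it back mirrors DecodeLogicalMsgOp(): the prefix is rec->message and the content starts at rec->message + rec->prefix_size.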

Greetings,

Andres Freund


