Re: Timeline following for logical slots

Поиск
Список
Период
Сортировка
От Andres Freund
Тема Re: Timeline following for logical slots
Дата
Msg-id 20160315091217.zkf6dtxfr7jhg4pr@alap3.anarazel.de
обсуждение исходный текст
Ответ на Re: Timeline following for logical slots  (Alvaro Herrera <alvherre@2ndquadrant.com>)
Ответы Re: Timeline following for logical slots  (Craig Ringer <craig@2ndquadrant.com>)
Re: Timeline following for logical slots  (Alvaro Herrera <alvherre@2ndquadrant.com>)
Список pgsql-hackers
Hi,


On 2016-03-14 20:10:58 -0300, Alvaro Herrera wrote:
> diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
> index fcb0872..7b60b8f 100644
> --- a/src/backend/access/transam/xlogreader.c
> +++ b/src/backend/access/transam/xlogreader.c
> @@ -10,9 +10,11 @@
>   *
>   * NOTES
>   *        See xlogreader.h for more notes on this facility.
> + *
> + *        This file is compiled as both front-end and backend code, so it
> + *        may not use ereport, server-defined static variables, etc.
>   *-------------------------------------------------------------------------
>   */
> -

Huh?

>  #include "postgres.h"
>  
>  #include "access/transam.h"
> @@ -116,6 +118,11 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
>          return NULL;
>      }
>  
> +#ifndef FRONTEND
> +    /* Will be loaded on first read */
> +    state->timelineHistory = NIL;
> +#endif
> +
>      return state;
>  }
>  
> @@ -135,6 +142,10 @@ XLogReaderFree(XLogReaderState *state)
>      pfree(state->errormsg_buf);
>      if (state->readRecordBuf)
>          pfree(state->readRecordBuf);
> +#ifndef FRONTEND
> +    if (state->timelineHistory)
> +        list_free_deep(state->timelineHistory);
> +#endif

Hm. So we don't support timeline following in frontend code, although
it'd be rather helpful for pg_xlogdump, and possibly for pg_rewind.


>      pfree(state->readBuf);
>      pfree(state);
>  }
> @@ -208,10 +219,11 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
>  
>      if (RecPtr == InvalidXLogRecPtr)
>      {
> +        /* No explicit start point; read the record after the one we just read */
>          RecPtr = state->EndRecPtr;
>  
>          if (state->ReadRecPtr == InvalidXLogRecPtr)
> -            randAccess = true;
> +            randAccess = true;    /* allow readPageTLI to go backwards */

randAccess is doing more than that, so I'm doubtful that comment is an
improvement.


>          /*
>           * RecPtr is pointing to end+1 of the previous WAL record.  If we're
> @@ -223,6 +235,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
>      else
>      {
>          /*
> +         * Caller supplied a position to start at.
> +         *
>           * In this case, the passed-in record pointer should already be
>           * pointing to a valid record starting position.
>           */
> @@ -309,8 +323,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
>          /* XXX: more validation should be done here */
>          if (total_len < SizeOfXLogRecord)
>          {
> -            report_invalid_record(state, "invalid record length at %X/%X",
> -                                  (uint32) (RecPtr >> 32), (uint32) RecPtr);
> +            report_invalid_record(state,
> +                        "invalid record length at %X/%X: wanted %lu, got %u",
> +                                  (uint32) (RecPtr >> 32), (uint32) RecPtr,
> +                                  SizeOfXLogRecord, total_len);
>              goto err;
>          }
>          gotheader = false;
> @@ -466,9 +482,7 @@ err:
>       * Invalidate the xlog page we've cached. We might read from a different
>       * source after failure.
>       */
> -    state->readSegNo = 0;
> -    state->readOff = 0;
> -    state->readLen = 0;
> +    XLogReaderInvalCache(state);

I don't think that "cache" is the right way to describe this.


>  #include <unistd.h>
>  
> -#include "miscadmin.h"
> -

spurious change imo.



>  /*
> - * TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
> - * we currently don't have the infrastructure (elog!) to share it.
> + * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
> + * in timeline 'tli'.
> + *
> + * Will open, and keep open, one WAL segment stored in the static file
> + * descriptor 'sendFile'. This means if XLogRead is used once, there will
> + * always be one descriptor left open until the process ends, but never
> + * more than one.
> + *
> + * XXX This is very similar to pg_xlogdump's XLogDumpXLogRead and to XLogRead
> + * in walsender.c but for small differences (such as lack of elog() in
> + * frontend).  Probably these should be merged at some point.
>   */
>  static void
>  XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
> @@ -648,8 +657,12 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
>      XLogRecPtr    recptr;
>      Size        nbytes;
>  
> +    /*
> +     * Cached state across calls.
> +     */

Could this be a one-line comment?


>      static int    sendFile = -1;
>      static XLogSegNo sendSegNo = 0;
> +    static TimeLineID sendTLI = 0;
>      static uint32 sendOff = 0;
>  
>      p = buf;
> @@ -664,11 +677,12 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
>  
>          startoff = recptr % XLogSegSize;
>  
> -        if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
> +        /* Do we need to open a new xlog segment? */
> +        if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
> +            sendTLI != tli)
>          {

s/open a new/open a different/?  New imo has connotations that we don't
really want here.


>              char        path[MAXPGPATH];
>  
> -            /* Switch to another logfile segment */
>              if (sendFile >= 0)
>                  close(sendFile);

E.g. you could just have moved the above comment.


>          /* Need to seek in the file? */
>          if (sendOff != startoff)
>          {
>              if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
> -            {
> -                char        path[MAXPGPATH];
> -
> -                XLogFilePath(path, tli, sendSegNo);
> -
>                  ereport(ERROR,
>                          (errcode_for_file_access(),
>                    errmsg("could not seek in log segment %s to offset %u: %m",
> -                         path, startoff)));
> -            }
> +                         XLogFileNameP(tli, sendSegNo), startoff)));
>              sendOff = startoff;
>          }

Not a serious issue, more a general remark: I'm doubtful that going for
palloc in error situations (as XLogFileNameP() does) is good practice.
The string will be allocated in the current memory context, without
access to the emergency error reserves.


I'm also getting the feeling that the patch is bordering on doing some
relatively random cleanups mixed in with architectural changes. Makes
things a bit harder to review.


> +static void
> +XLogReadDetermineTimeline(XLogReaderState *state)
> +{
> +    /* Read the history on first time through */
> +    if (state->timelineHistory == NIL)
> +        state->timelineHistory = readTimeLineHistory(ThisTimeLineID);
> +
> +    /*
> +     * Are we reading the record immediately following the one we read last
> +     * time?  If not, then don't use the cached timeline info.
> +     */
> +    if (state->currRecPtr != state->EndRecPtr)
> +    {
> +        state->currTLI = 0;
> +        state->currTLIValidUntil = InvalidXLogRecPtr;
> +    }


Hm. So we grow essentially a second version of the last end position and
the randAccess stuff in XLogReadRecord().


> +    if (state->currTLI == 0)
> +    {
> +        /*
> +         * Something changed; work out what timeline this record is on. We
> +         * might read it from the segment on this TLI or, if the segment is
> +         * also contained by newer timelines, the copy from a newer TLI.
> +         */
> +        state->currTLI = tliOfPointInHistory(state->currRecPtr,
> +                                             state->timelineHistory);
> +
> +        /*
> +         * Look for the most recent timeline that's on the same xlog segment
> +         * as this record, since that's the only one we can assume is still
> +         * readable.
> +         */
> +        while (state->currTLI != ThisTimeLineID &&
> +               state->currTLIValidUntil == InvalidXLogRecPtr)
> +        {
> +            XLogRecPtr    tliSwitch;
> +            TimeLineID    nextTLI;
> +
> +            tliSwitch = tliSwitchPoint(state->currTLI, state->timelineHistory,
> +                                       &nextTLI);
> +
> +            /* round ValidUntil down to start of seg containing the switch */
> +            state->currTLIValidUntil =
> +                ((tliSwitch / XLogSegSize) * XLogSegSize);
> +
> +            if (state->currRecPtr >= state->currTLIValidUntil)
> +            {
> +                /*
> +                 * The new currTLI ends on this WAL segment so check the next
> +                 * TLI to see if it's the last one on the segment.
> +                 *
> +                 * If that's the current TLI we'll stop searching.

I don't really understand how we stop searching here.

> +                 */
> +                state->currTLI = nextTLI;
> +                state->currTLIValidUntil = InvalidXLogRecPtr;
> +            }
> +        }
> +}


XLogReadDetermineTimeline() doesn't sit quite right with me, I do wonder
whether there's not a simpler way to write this.


> +/*
> + * XLogPageReadCB callback for reading local xlog files
>   *
>   * Public because it would likely be very helpful for someone writing another
>   * output method outside walsender, e.g. in a bgworker.
>   *
> - * TODO: The walsender has it's own version of this, but it relies on the
> + * TODO: The walsender has its own version of this, but it relies on the
>   * walsender's latch being set whenever WAL is flushed. No such infrastructure
>   * exists for normal backends, so we have to do a check/sleep/repeat style of
>   * loop for now.
> @@ -754,46 +897,88 @@ int
>  read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
>      int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
>  {
> -    XLogRecPtr    flushptr,
> +    XLogRecPtr    read_upto,
>                  loc;
>      int            count;
>  
>      loc = targetPagePtr + reqLen;
> +
> +    /* Make sure enough xlog is available... */
>      while (1)
>      {
>          /*
> -         * TODO: we're going to have to do something more intelligent about
> -         * timelines on standbys. Use readTimeLineHistory() and
> -         * tliOfPointInHistory() to get the proper LSN? For now we'll catch
> -         * that case earlier, but the code and TODO is left in here for when
> -         * that changes.
> +         * Check which timeline to get the record from.
> +         *
> +         * We have to do it each time through the loop because if we're in
> +         * recovery as a cascading standby, the current timeline might've
> +         * become historical.
>           */
> -        if (!RecoveryInProgress())
> +        XLogReadDetermineTimeline(state);
> +
> +        if (state->currTLI == ThisTimeLineID)
>          {
> -            *pageTLI = ThisTimeLineID;
> -            flushptr = GetFlushRecPtr();
> +            /*
> +             * We're reading from the current timeline so we might have to
> +             * wait for the desired record to be generated (or, for a standby,
> +             * received & replayed)
> +             */
> +            if (!RecoveryInProgress())
> +            {
> +                *pageTLI = ThisTimeLineID;
> +                read_upto = GetFlushRecPtr();
> +            }
> +            else
> +                read_upto = GetXLogReplayRecPtr(pageTLI);
> +
> +            if (loc <= read_upto)
> +                break;
> +
> +            CHECK_FOR_INTERRUPTS();
> +            pg_usleep(1000L);
>          }
>          else
> -            flushptr = GetXLogReplayRecPtr(pageTLI);
> +        {
> +            /*
> +             * We're on a historical timeline, so limit reading to the switch
> +             * point where we moved to the next timeline.
> +             */
> +            read_upto = state->currTLIValidUntil;

Hm. Is it ok to not check GetFlushRecPtr/GetXLogReplayRecPtr() here? If
so, how come?

> -        if (loc <= flushptr)
> +            /*
> +             * Setting pageTLI to our wanted record's TLI is slightly wrong;
> +             * the page might begin on an older timeline if it contains a
> +             * timeline switch, since its xlog segment will have been copied
> +             * from the prior timeline. This is pretty harmless though, as
> +             * nothing cares so long as the timeline doesn't go backwards.  We
> +             * should read the page header instead; FIXME someday.
> +             */
> +            *pageTLI = state->currTLI;
> +
> +            /* No need to wait on a historical timeline */
>              break;
> -
> -        CHECK_FOR_INTERRUPTS();
> -        pg_usleep(1000L);
> +        }
>      }
>  
> -    /* more than one block available */
> -    if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
> +    if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
> +    {
> +        /*
> +         * more than one block available; read only that block, have caller
> +         * come back if they need more.
> +         */
>          count = XLOG_BLCKSZ;
> -    /* not enough data there */
> -    else if (targetPagePtr + reqLen > flushptr)
> +    }
> +    else if (targetPagePtr + reqLen > read_upto)
> +    {
> +        /* not enough data there */
>          return -1;
> -    /* part of the page available */
> +    }
>      else
> -        count = flushptr - targetPagePtr;
> +    {
> +        /* enough bytes available to satisfy the request */
> +        count = read_upto - targetPagePtr;
> +    }
>  
> -    XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
> +    XLogRead(cur_page, *pageTLI, targetPagePtr, count);

When are we reading less than a page? That should afaik never be required.


> +        /*
> +         * We start reading xlog from the restart lsn, even though in
> +         * CreateDecodingContext we set the snapshot builder up using the
> +         * slot's candidate_restart_lsn. This means we might read xlog we
> +         * don't actually decode rows from, but the snapshot builder might
> +         * need it to get to a consistent point. The point we start returning
> +         * data to *users* at is the candidate restart lsn from the decoding
> +         * context.
> +         */

Uh? Where are we using candidate_restart_lsn that way? I seriously doubt
it is - candidate_restart_lsn is about a potential future restart_lsn,
which we can set once we get reception confirmation from the client.


> @@ -299,6 +312,18 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
>              CHECK_FOR_INTERRUPTS();
>          }
>  
> +        /* Make sure timeline lookups use the start of the next record */
> +        startptr = ctx->reader->EndRecPtr;

Huh? startptr isn't used after this, so I'm not sure what this even
means?

> +        /*
> +         * The XLogReader will read a page past the valid end of WAL because
> +         * it doesn't know about timelines. When we switch timelines and ask
> +         * it for the first page on the new timeline it will think it has it
> +         * cached, but it'll have the old partial page and say it can't find
> +         * the next record. So flush the cache.
> +         */
> +        XLogReaderInvalCache(ctx->reader);
> +

ditto.


> diff --git a/src/test/modules/decoding_failover/decoding_failover.c b/src/test/modules/decoding_failover/decoding_failover.c
> new file mode 100644
> index 0000000..669e6c4
> --- /dev/null
> +++ b/src/test/modules/decoding_failover/decoding_failover.c

> +
> +/*
> + * Create a new logical slot, with invalid LSN and xid, directly. This does not
> + * use the snapshot builder or logical decoding machinery. It's only intended
> + * for creating a slot on a replica that mirrors the state of a slot on an
> + * upstream master.
> + *
> + * You should immediately decoding_failover_advance_logical_slot(...) it
> + * after creation.
> + */

Uh. I doubt we want this, even if it's formally located in
src/test/modules.  These comments make it appear to be intended for more
than just testing, and I have serious doubts about the validity of the
concept as is.


This seems to need some more polishing.


Greetings,

Andres Freund



В списке pgsql-hackers по дате отправления:

Предыдущее
От: Oleg Bartunov
Дата:
Сообщение: Re: [PATCH] we have added support for box type in SP-GiST index
Следующее
От: Ashutosh Bapat
Дата:
Сообщение: Re: Re: [COMMITTERS] pgsql: Only try to push down foreign joins if the user mapping OIDs mat