Reducing memory consumption for pending inval messages

Поиск
Список
Период
Сортировка
От Tom Lane
Тема Reducing memory consumption for pending inval messages
Дата
Msg-id 2380555.1622395376@sss.pgh.pa.us
обсуждение исходный текст
Ответы Re: Reducing memory consumption for pending inval messages  (Tom Lane <tgl@sss.pgh.pa.us>)
Список pgsql-hackers
I got interested in $SUBJECT as a result of the thread at [1].
It turns out that the existing implementation in inval.c is quite
inefficient when a lot of individual commands each register just
a few invalidations --- but a few invalidations per command is
pretty typical.  As an example, consider

DO $do$
  BEGIN
    FOR i IN 1..200000000 LOOP
      execute 'create function foo' || i || '() returns int language sql as $$select 1$$';
      if (i % 100000 = 0) then
        raise notice '% loops done', i;
      end if;
    END LOOP;
  END
$do$;

Each CREATE FUNCTION registers three invalidation events, which
minimally would require 48 bytes ... but the current code actually
eats about 2kB per iteration, because we allocate a pair of new
"chunks" for each command.  The chunks themselves are intended
to hold 32 entries which'd take 512 bytes --- but there's some
overhead, causing aset.c to round up to 1024 bytes.  Ouch.

It gets worse though.  If you wrap the commands in subtransactions:

DO $do$
  BEGIN
    FOR i IN 1..200000000 LOOP
     begin
      execute 'create function foo' || i || '() returns int language sql as $$select 1$$';
      if (i % 100000 = 0) then
        raise notice '% loops done', i;
      end if;
     exception when division_by_zero then null;
     end;
    END LOOP;
  END
$do$;

the space consumption balloons to about 8kB per iteration, because the
chunks are allocated in the per-subtransaction CurTransactionContext,
which is given 8kB right off the bat.  In common cases this'll be the
*only* allocation in that context.

We can do a lot better, by exploiting what we know about the usage
patterns of invalidation requests.  New requests are always added to
the latest sublist, and the only management actions are (1) merge
latest sublist into next-to-latest sublist, or (2) drop latest
sublist, if a subtransaction aborts.  This means we could perfectly
well keep all the requests in a single, densely packed array in
TopTransactionContext, and replace the "list" control structures
with indexes into that array.  The attached patch does that.

I don't see any particular speed differential with this (unsurprising,
since the other actions that an inval event logs and then triggers
will surely swamp inval.c's management overhead).  But the space
consumption decreases gratifyingly.

There is one notable new assumption I had to make for this.  At end
of a subtransaction, we have to merge its inval events into the
"PriorCmd" list of the parent subtransaction.  (It has to be the
PriorCmd list, not the CurrentCmd list, because these events have
already been processed locally; we don't want to do that again.)
This means the parent's CurrentCmd list has to be empty at that
instant, else we'd be trying to merge sublists that aren't adjacent
in the array.  As far as I can tell, this is always true: the patch's
check for it doesn't trigger in a check-world run.  And there's an
argument that it must be true for semantic consistency (see comments
in patch).  So if that check ever fails, it probably means there is a
missing CommandCounterIncrement somewhere.  Still, this could use more
review and testing.

BTW, I noted with some amusement that this comment in
xactGetCommittedInvalidationMessages:

     * ... Maintain the order that they
     * would be processed in by AtEOXact_Inval(), to ensure emulated behaviour
     * in redo is as similar as possible to original. We want the same bugs,
     * if any, not new ones.

is making a claim that the existing code there actually does not
satisfy.  In particular it fails to maintain the correct ordering of
catcache vs. relcache events.  The patch fixes that, but I wonder
whether there is anything we need to do in the back branches.  I'm
inclined to think that it doesn't matter beyond the small efficiency
risk inherent in doing (some) relcache flushes before catcache
flushes.  The code already says that the order of events within any
one list isn't supposed to matter.

Anyway, I'll add this to the next CF.

            regards, tom lane

[1] https://www.postgresql.org/message-id/flat/88986113-6b01-452b-89d0-9492b6a79e33%40www.fastmail.com

diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index dcfd9e8389..e4dca1e1d4 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -71,11 +71,6 @@
  *    manipulating the init file is in relcache.c, but we keep track of the
  *    need for it here.
  *
- *    The request lists proper are kept in CurTransactionContext of their
- *    creating (sub)transaction, since they can be forgotten on abort of that
- *    transaction but must be kept till top-level commit otherwise.  For
- *    simplicity we keep the controlling list-of-lists in TopTransactionContext.
- *
  *    Currently, inval messages are sent without regard for the possibility
  *    that the object described by the catalog tuple might be a session-local
  *    object such as a temporary table.  This is because (1) this code has
@@ -120,35 +115,86 @@


 /*
- * To minimize palloc traffic, we keep pending requests in successively-
- * larger chunks (a slightly more sophisticated version of an expansible
- * array).  All request types can be stored as SharedInvalidationMessage
- * records.  The ordering of requests within a list is never significant.
+ * Pending requests are stored as ready-to-send SharedInvalidationMessages.
+ * We keep the messages themselves in arrays in TopTransactionContext
+ * (there are separate arrays for catcache and relcache messages).  Control
+ * information is kept in a chain of TransInvalidationInfo structs, also
+ * allocated in TopTransactionContext.  (We could keep a subtransaction's
+ * TransInvalidationInfo in its CurTransactionContext; but that's more
+ * wasteful not less so, since in very many scenarios it'd be the only
+ * allocation in the subtransaction's CurTransactionContext.)
+ *
+ * We can store the message arrays densely, and yet avoid moving data around
+ * within an array, because within any one subtransaction we need only
+ * distinguish between messages emitted by prior commands and those emitted
+ * by the current command.  Once a command completes and we've done local
+ * processing on its messages, we can fold those into the prior-commands
+ * messages just by changing array indexes in the TransInvalidationInfo
+ * struct.  Similarly, we need distinguish messages of prior subtransactions
+ * from those of the current subtransaction only until the subtransaction
+ * completes, after which we adjust the array indexes in the parent's
+ * TransInvalidationInfo to include the subtransaction's messages.
+ *
+ * The ordering of the individual messages within a command's or
+ * subtransaction's output is not considered significant, although this
+ * implementation happens to preserve the order in which they were queued.
+ * (Previous versions of this code did not preserve it.)
+ *
+ * For notational convenience, control information is kept in two-element
+ * arrays, the first for catcache messages and the second for relcache
+ * messages.
  */
-typedef struct InvalidationChunk
+#define CatCacheMsgs 0
+#define RelCacheMsgs 1
+
+/* Pointers to main arrays in TopTransactionContext */
+typedef struct InvalMessageArray
 {
-    struct InvalidationChunk *next; /* list link */
-    int            nitems;            /* # items currently stored in chunk */
-    int            maxitems;        /* size of allocated array in this chunk */
-    SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
-} InvalidationChunk;
+    SharedInvalidationMessage *msgs;    /* palloc'd array (can be expanded) */
+    int            maxmsgs;        /* current allocated size of array */
+} InvalMessageArray;

-typedef struct InvalidationListHeader
+static InvalMessageArray InvalMessageArrays[2];
+
+/* Control information for one logical group of messages */
+typedef struct InvalidationMsgsGroup
 {
-    InvalidationChunk *cclist;    /* list of chunks holding catcache msgs */
-    InvalidationChunk *rclist;    /* list of chunks holding relcache msgs */
-} InvalidationListHeader;
+    int            firstmsg[2];    /* first index in relevant array */
+    int            nextmsg[2];        /* last+1 index */
+} InvalidationMsgsGroup;
+
+/* Macros to help preserve InvalidationMsgsGroup abstraction */
+#define SetSubGroupToFollow(targetgroup, priorgroup, subgroup) \
+    do { \
+        (targetgroup)->firstmsg[subgroup] = \
+            (targetgroup)->nextmsg[subgroup] = \
+            (priorgroup)->nextmsg[subgroup]; \
+    } while (0)
+
+#define SetGroupToFollow(targetgroup, priorgroup) \
+    do { \
+        SetSubGroupToFollow(targetgroup, priorgroup, CatCacheMsgs); \
+        SetSubGroupToFollow(targetgroup, priorgroup, RelCacheMsgs); \
+    } while (0)
+
+#define NumMessagesInSubGroup(group, subgroup) \
+    ((group)->nextmsg[subgroup] - (group)->firstmsg[subgroup])
+
+#define NumMessagesInGroup(group) \
+    (NumMessagesInSubGroup(group, CatCacheMsgs) + \
+     NumMessagesInSubGroup(group, RelCacheMsgs))
+

 /*----------------
- * Invalidation info is divided into two lists:
+ * Invalidation messages are divided into two groups:
  *    1) events so far in current command, not yet reflected to caches.
  *    2) events in previous commands of current transaction; these have
  *       been reflected to local caches, and must be either broadcast to
  *       other backends or rolled back from local cache when we commit
  *       or abort the transaction.
- * Actually, we need two such lists for each level of nested transaction,
+ * Actually, we need such groups for each level of nested transaction,
  * so that we can discard events from an aborted subtransaction.  When
- * a subtransaction commits, we append its lists to the parent's lists.
+ * a subtransaction commits, we append its events to the parent's groups.
  *
  * The relcache-file-invalidated flag can just be a simple boolean,
  * since we only act on it at transaction commit; we don't care which
@@ -164,11 +210,11 @@ typedef struct TransInvalidationInfo
     /* Subtransaction nesting depth */
     int            my_level;

-    /* head of current-command event list */
-    InvalidationListHeader CurrentCmdInvalidMsgs;
+    /* Events emitted by current command */
+    InvalidationMsgsGroup CurrentCmdInvalidMsgs;

-    /* head of previous-commands event list */
-    InvalidationListHeader PriorCmdInvalidMsgs;
+    /* Events emitted by previous commands of this (sub)transaction */
+    InvalidationMsgsGroup PriorCmdInvalidMsgs;

     /* init file must be invalidated? */
     bool        RelcacheInitFileInval;
@@ -176,10 +222,6 @@ typedef struct TransInvalidationInfo

 static TransInvalidationInfo *transInvalInfo = NULL;

-static SharedInvalidationMessage *SharedInvalidMessagesArray;
-static int    numSharedInvalidMessagesArray;
-static int    maxSharedInvalidMessagesArray;
-
 /* GUC storage */
 int            debug_invalidate_system_caches_always = 0;

@@ -217,124 +259,118 @@ static struct RELCACHECALLBACK
 static int    relcache_callback_count = 0;

 /* ----------------------------------------------------------------
- *                Invalidation list support functions
- *
- * These three routines encapsulate processing of the "chunked"
- * representation of what is logically just a list of messages.
+ *                Invalidation subgroup support functions
  * ----------------------------------------------------------------
  */

 /*
  * AddInvalidationMessage
- *        Add an invalidation message to a list (of chunks).
+ *        Add an invalidation message to a (sub)group.
+ *
+ * The group must be the last active one, since we assume we can add to the
+ * end of the relevant InvalMessageArray.
  *
- * Note that we do not pay any great attention to maintaining the original
- * ordering of the messages.
+ * subgroup must be CatCacheMsgs or RelCacheMsgs.
  */
 static void
-AddInvalidationMessage(InvalidationChunk **listHdr,
-                       SharedInvalidationMessage *msg)
+AddInvalidationMessage(InvalidationMsgsGroup *group, int subgroup,
+                       const SharedInvalidationMessage *msg)
 {
-    InvalidationChunk *chunk = *listHdr;
+    InvalMessageArray *ima = &InvalMessageArrays[subgroup];
+    int            nextindex = group->nextmsg[subgroup];

-    if (chunk == NULL)
-    {
-        /* First time through; create initial chunk */
-#define FIRSTCHUNKSIZE 32
-        chunk = (InvalidationChunk *)
-            MemoryContextAlloc(CurTransactionContext,
-                               offsetof(InvalidationChunk, msgs) +
-                               FIRSTCHUNKSIZE * sizeof(SharedInvalidationMessage));
-        chunk->nitems = 0;
-        chunk->maxitems = FIRSTCHUNKSIZE;
-        chunk->next = *listHdr;
-        *listHdr = chunk;
-    }
-    else if (chunk->nitems >= chunk->maxitems)
+    if (nextindex >= ima->maxmsgs)
     {
-        /* Need another chunk; double size of last chunk */
-        int            chunksize = 2 * chunk->maxitems;
-
-        chunk = (InvalidationChunk *)
-            MemoryContextAlloc(CurTransactionContext,
-                               offsetof(InvalidationChunk, msgs) +
-                               chunksize * sizeof(SharedInvalidationMessage));
-        chunk->nitems = 0;
-        chunk->maxitems = chunksize;
-        chunk->next = *listHdr;
-        *listHdr = chunk;
+        if (ima->msgs == NULL)
+        {
+            /* Create new storage array in TopTransactionContext */
+            int            reqsize = 32;    /* arbitrary */
+
+            ima->msgs = (SharedInvalidationMessage *)
+                MemoryContextAlloc(TopTransactionContext,
+                                   reqsize * sizeof(SharedInvalidationMessage));
+            ima->maxmsgs = reqsize;
+            Assert(nextindex == 0);
+        }
+        else
+        {
+            /* Enlarge storage array */
+            int            reqsize = 2 * ima->maxmsgs;
+
+            ima->msgs = (SharedInvalidationMessage *)
+                repalloc(ima->msgs,
+                         reqsize * sizeof(SharedInvalidationMessage));
+            ima->maxmsgs = reqsize;
+        }
     }
-    /* Okay, add message to current chunk */
-    chunk->msgs[chunk->nitems] = *msg;
-    chunk->nitems++;
+    /* Okay, add message to current group */
+    ima->msgs[nextindex] = *msg;
+    group->nextmsg[subgroup]++;
 }

 /*
- * Append one list of invalidation message chunks to another, resetting
- * the source chunk-list pointer to NULL.
+ * Append one subgroup of invalidation messages to another, resetting
+ * the source subgroup to empty.
  */
 static void
-AppendInvalidationMessageList(InvalidationChunk **destHdr,
-                              InvalidationChunk **srcHdr)
+AppendInvalidationMessageSubGroup(InvalidationMsgsGroup *dest,
+                                  InvalidationMsgsGroup *src,
+                                  int subgroup)
 {
-    InvalidationChunk *chunk = *srcHdr;
-
-    if (chunk == NULL)
-        return;                    /* nothing to do */
-
-    while (chunk->next != NULL)
-        chunk = chunk->next;
+    /* Messages must be adjacent in main array */
+    Assert(dest->nextmsg[subgroup] == src->firstmsg[subgroup]);

-    chunk->next = *destHdr;
+    /* ... which makes this easy: */
+    dest->nextmsg[subgroup] = src->nextmsg[subgroup];

-    *destHdr = *srcHdr;
-
-    *srcHdr = NULL;
+    /*
+     * This is handy for some callers and irrelevant for others.  But we do it
+     * always, reasoning that it's bad to leave different groups pointing at
+     * the same fragment of the message array.
+     */
+    SetSubGroupToFollow(src, dest, subgroup);
 }

 /*
- * Process a list of invalidation messages.
+ * Process a subgroup of invalidation messages.
  *
  * This is a macro that executes the given code fragment for each message in
- * a message chunk list.  The fragment should refer to the message as *msg.
+ * a message subgroup.  The fragment should refer to the message as *msg.
  */
-#define ProcessMessageList(listHdr, codeFragment) \
+#define ProcessMessageSubGroup(group, subgroup, codeFragment) \
     do { \
-        InvalidationChunk *_chunk; \
-        for (_chunk = (listHdr); _chunk != NULL; _chunk = _chunk->next) \
+        int        _msgindex = (group)->firstmsg[subgroup]; \
+        int        _endmsg = (group)->nextmsg[subgroup]; \
+        for (; _msgindex < _endmsg; _msgindex++) \
         { \
-            int        _cindex; \
-            for (_cindex = 0; _cindex < _chunk->nitems; _cindex++) \
-            { \
-                SharedInvalidationMessage *msg = &_chunk->msgs[_cindex]; \
-                codeFragment; \
-            } \
+            SharedInvalidationMessage *msg = \
+                &InvalMessageArrays[subgroup].msgs[_msgindex]; \
+            codeFragment; \
         } \
     } while (0)

 /*
- * Process a list of invalidation messages group-wise.
+ * Process a subgroup of invalidation messages as an array.
  *
  * As above, but the code fragment can handle an array of messages.
  * The fragment should refer to the messages as msgs[], with n entries.
  */
-#define ProcessMessageListMulti(listHdr, codeFragment) \
+#define ProcessMessageSubGroupMulti(group, subgroup, codeFragment) \
     do { \
-        InvalidationChunk *_chunk; \
-        for (_chunk = (listHdr); _chunk != NULL; _chunk = _chunk->next) \
-        { \
-            SharedInvalidationMessage *msgs = _chunk->msgs; \
-            int        n = _chunk->nitems; \
+        int        n = NumMessagesInSubGroup(group, subgroup); \
+        if (n > 0) { \
+            SharedInvalidationMessage *msgs = \
+                &InvalMessageArrays[subgroup].msgs[(group)->firstmsg[subgroup]]; \
             codeFragment; \
         } \
     } while (0)


 /* ----------------------------------------------------------------
- *                Invalidation set support functions
+ *                Invalidation group support functions
  *
  * These routines understand about the division of a logical invalidation
- * list into separate physical lists for catcache and relcache entries.
+ * group into separate physical arrays for catcache and relcache entries.
  * ----------------------------------------------------------------
  */

@@ -342,7 +378,7 @@ AppendInvalidationMessageList(InvalidationChunk **destHdr,
  * Add a catcache inval entry
  */
 static void
-AddCatcacheInvalidationMessage(InvalidationListHeader *hdr,
+AddCatcacheInvalidationMessage(InvalidationMsgsGroup *group,
                                int id, uint32 hashValue, Oid dbId)
 {
     SharedInvalidationMessage msg;
@@ -363,14 +399,14 @@ AddCatcacheInvalidationMessage(InvalidationListHeader *hdr,
      */
     VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));

-    AddInvalidationMessage(&hdr->cclist, &msg);
+    AddInvalidationMessage(group, CatCacheMsgs, &msg);
 }

 /*
  * Add a whole-catalog inval entry
  */
 static void
-AddCatalogInvalidationMessage(InvalidationListHeader *hdr,
+AddCatalogInvalidationMessage(InvalidationMsgsGroup *group,
                               Oid dbId, Oid catId)
 {
     SharedInvalidationMessage msg;
@@ -381,14 +417,14 @@ AddCatalogInvalidationMessage(InvalidationListHeader *hdr,
     /* check AddCatcacheInvalidationMessage() for an explanation */
     VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));

-    AddInvalidationMessage(&hdr->cclist, &msg);
+    AddInvalidationMessage(group, CatCacheMsgs, &msg);
 }

 /*
  * Add a relcache inval entry
  */
 static void
-AddRelcacheInvalidationMessage(InvalidationListHeader *hdr,
+AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
                                Oid dbId, Oid relId)
 {
     SharedInvalidationMessage msg;
@@ -398,11 +434,11 @@ AddRelcacheInvalidationMessage(InvalidationListHeader *hdr,
      * it will never change. InvalidOid for relId means all relations so we
      * don't need to add individual ones when it is present.
      */
-    ProcessMessageList(hdr->rclist,
-                       if (msg->rc.id == SHAREDINVALRELCACHE_ID &&
-                           (msg->rc.relId == relId ||
-                            msg->rc.relId == InvalidOid))
-                       return);
+    ProcessMessageSubGroup(group, RelCacheMsgs,
+                           if (msg->rc.id == SHAREDINVALRELCACHE_ID &&
+                               (msg->rc.relId == relId ||
+                                msg->rc.relId == InvalidOid))
+                           return);

     /* OK, add the item */
     msg.rc.id = SHAREDINVALRELCACHE_ID;
@@ -411,24 +447,26 @@ AddRelcacheInvalidationMessage(InvalidationListHeader *hdr,
     /* check AddCatcacheInvalidationMessage() for an explanation */
     VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));

-    AddInvalidationMessage(&hdr->rclist, &msg);
+    AddInvalidationMessage(group, RelCacheMsgs, &msg);
 }

 /*
  * Add a snapshot inval entry
+ *
+ * We put these into the relcache subgroup for simplicity.
  */
 static void
-AddSnapshotInvalidationMessage(InvalidationListHeader *hdr,
+AddSnapshotInvalidationMessage(InvalidationMsgsGroup *group,
                                Oid dbId, Oid relId)
 {
     SharedInvalidationMessage msg;

     /* Don't add a duplicate item */
     /* We assume dbId need not be checked because it will never change */
-    ProcessMessageList(hdr->rclist,
-                       if (msg->sn.id == SHAREDINVALSNAPSHOT_ID &&
-                           msg->sn.relId == relId)
-                       return);
+    ProcessMessageSubGroup(group, RelCacheMsgs,
+                           if (msg->sn.id == SHAREDINVALSNAPSHOT_ID &&
+                               msg->sn.relId == relId)
+                           return);

     /* OK, add the item */
     msg.sn.id = SHAREDINVALSNAPSHOT_ID;
@@ -437,33 +475,33 @@ AddSnapshotInvalidationMessage(InvalidationListHeader *hdr,
     /* check AddCatcacheInvalidationMessage() for an explanation */
     VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));

-    AddInvalidationMessage(&hdr->rclist, &msg);
+    AddInvalidationMessage(group, RelCacheMsgs, &msg);
 }

 /*
- * Append one list of invalidation messages to another, resetting
- * the source list to empty.
+ * Append one group of invalidation messages to another, resetting
+ * the source group to empty.
  */
 static void
-AppendInvalidationMessages(InvalidationListHeader *dest,
-                           InvalidationListHeader *src)
+AppendInvalidationMessages(InvalidationMsgsGroup *dest,
+                           InvalidationMsgsGroup *src)
 {
-    AppendInvalidationMessageList(&dest->cclist, &src->cclist);
-    AppendInvalidationMessageList(&dest->rclist, &src->rclist);
+    AppendInvalidationMessageSubGroup(dest, src, CatCacheMsgs);
+    AppendInvalidationMessageSubGroup(dest, src, RelCacheMsgs);
 }

 /*
- * Execute the given function for all the messages in an invalidation list.
- * The list is not altered.
+ * Execute the given function for all the messages in an invalidation group.
+ * The group is not altered.
  *
  * catcache entries are processed first, for reasons mentioned above.
  */
 static void
-ProcessInvalidationMessages(InvalidationListHeader *hdr,
+ProcessInvalidationMessages(InvalidationMsgsGroup *group,
                             void (*func) (SharedInvalidationMessage *msg))
 {
-    ProcessMessageList(hdr->cclist, func(msg));
-    ProcessMessageList(hdr->rclist, func(msg));
+    ProcessMessageSubGroup(group, CatCacheMsgs, func(msg));
+    ProcessMessageSubGroup(group, RelCacheMsgs, func(msg));
 }

 /*
@@ -471,11 +509,11 @@ ProcessInvalidationMessages(InvalidationListHeader *hdr,
  * rather than just one at a time.
  */
 static void
-ProcessInvalidationMessagesMulti(InvalidationListHeader *hdr,
+ProcessInvalidationMessagesMulti(InvalidationMsgsGroup *group,
                                  void (*func) (const SharedInvalidationMessage *msgs, int n))
 {
-    ProcessMessageListMulti(hdr->cclist, func(msgs, n));
-    ProcessMessageListMulti(hdr->rclist, func(msgs, n));
+    ProcessMessageSubGroupMulti(group, CatCacheMsgs, func(msgs, n));
+    ProcessMessageSubGroupMulti(group, RelCacheMsgs, func(msgs, n));
 }

 /* ----------------------------------------------------------------
@@ -730,7 +768,7 @@ AcceptInvalidationMessages(void)

 /*
  * PrepareInvalidationState
- *        Initialize inval lists for the current (sub)transaction.
+ *        Initialize inval data for the current (sub)transaction.
  */
 static void
 PrepareInvalidationState(void)
@@ -747,12 +785,45 @@ PrepareInvalidationState(void)
     myInfo->parent = transInvalInfo;
     myInfo->my_level = GetCurrentTransactionNestLevel();

-    /*
-     * If there's any previous entry, this one should be for a deeper nesting
-     * level.
-     */
-    Assert(transInvalInfo == NULL ||
-           myInfo->my_level > transInvalInfo->my_level);
+    /* Now, do we have a previous stack entry? */
+    if (transInvalInfo != NULL)
+    {
+        /* Yes; this one should be for a deeper nesting level. */
+        Assert(myInfo->my_level > transInvalInfo->my_level);
+
+        /*
+         * The parent (sub)transaction must not have any current (i.e.,
+         * not-yet-locally-processed) messages.  If it did, we'd have a
+         * semantic problem: the new subtransaction presumably ought not be
+         * able to see those events yet, but since the CommandCounter is
+         * linear, that can't work once the subtransaction advances the
+         * counter.  This is a convenient place to check for that, as well as
+         * being important to keep management of the message arrays simple.
+         */
+        if (NumMessagesInGroup(&transInvalInfo->CurrentCmdInvalidMsgs) != 0)
+            elog(ERROR, "cannot start a subtransaction when there are unprocessed inval messages");
+
+        /*
+         * MemoryContextAllocZero set firstmsg = nextmsg = 0 in each group,
+         * which is fine for the first (sub)transaction, but otherwise we need
+         * to update them to follow whatever is already in the arrays.
+         */
+        SetGroupToFollow(&myInfo->PriorCmdInvalidMsgs,
+                         &transInvalInfo->CurrentCmdInvalidMsgs);
+        SetGroupToFollow(&myInfo->CurrentCmdInvalidMsgs,
+                         &myInfo->PriorCmdInvalidMsgs);
+    }
+    else
+    {
+        /*
+         * Here, we need only clear any array pointers left over from a prior
+         * transaction.
+         */
+        InvalMessageArrays[CatCacheMsgs].msgs = NULL;
+        InvalMessageArrays[CatCacheMsgs].maxmsgs = 0;
+        InvalMessageArrays[RelCacheMsgs].msgs = NULL;
+        InvalMessageArrays[RelCacheMsgs].maxmsgs = 0;
+    }

     transInvalInfo = myInfo;
 }
@@ -776,48 +847,8 @@ PostPrepare_Inval(void)
 }

 /*
- * Collect invalidation messages into SharedInvalidMessagesArray array.
- */
-static void
-MakeSharedInvalidMessagesArray(const SharedInvalidationMessage *msgs, int n)
-{
-    /*
-     * Initialise array first time through in each commit
-     */
-    if (SharedInvalidMessagesArray == NULL)
-    {
-        maxSharedInvalidMessagesArray = FIRSTCHUNKSIZE;
-        numSharedInvalidMessagesArray = 0;
-
-        /*
-         * Although this is being palloc'd we don't actually free it directly.
-         * We're so close to EOXact that we now we're going to lose it anyhow.
-         */
-        SharedInvalidMessagesArray = palloc(maxSharedInvalidMessagesArray
-                                            * sizeof(SharedInvalidationMessage));
-    }
-
-    if ((numSharedInvalidMessagesArray + n) > maxSharedInvalidMessagesArray)
-    {
-        while ((numSharedInvalidMessagesArray + n) > maxSharedInvalidMessagesArray)
-            maxSharedInvalidMessagesArray *= 2;
-
-        SharedInvalidMessagesArray = repalloc(SharedInvalidMessagesArray,
-                                              maxSharedInvalidMessagesArray
-                                              * sizeof(SharedInvalidationMessage));
-    }
-
-    /*
-     * Append the next chunk onto the array
-     */
-    memcpy(SharedInvalidMessagesArray + numSharedInvalidMessagesArray,
-           msgs, n * sizeof(SharedInvalidationMessage));
-    numSharedInvalidMessagesArray += n;
-}
-
-/*
- * xactGetCommittedInvalidationMessages() is executed by
- * RecordTransactionCommit() to add invalidation messages onto the
+ * xactGetCommittedInvalidationMessages() is called by
+ * RecordTransactionCommit() to collect invalidation messages to add to the
  * commit record. This applies only to commit message types, never to
  * abort records. Must always run before AtEOXact_Inval(), since that
  * removes the data we need to see.
@@ -832,7 +863,9 @@ int
 xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
                                      bool *RelcacheInitFileInval)
 {
-    MemoryContext oldcontext;
+    SharedInvalidationMessage *msgarray;
+    int            nummsgs;
+    int            nmsgs;

     /* Quick exit if we haven't done anything with invalidation messages. */
     if (transInvalInfo == NULL)
@@ -853,27 +886,48 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
     *RelcacheInitFileInval = transInvalInfo->RelcacheInitFileInval;

     /*
-     * Walk through TransInvalidationInfo to collect all the messages into a
-     * single contiguous array of invalidation messages. It must be contiguous
-     * so we can copy directly into WAL message. Maintain the order that they
-     * would be processed in by AtEOXact_Inval(), to ensure emulated behaviour
-     * in redo is as similar as possible to original. We want the same bugs,
-     * if any, not new ones.
+     * Collect all the pending messages into a single contiguous array of
+     * invalidation messages, to simplify what needs to happen while building
+     * the commit WAL message.  Maintain the order that they would be
+     * processed in by AtEOXact_Inval(), to ensure emulated behaviour in redo
+     * is as similar as possible to original.  We want the same bugs, if any,
+     * not new ones.
      */
-    oldcontext = MemoryContextSwitchTo(CurTransactionContext);
-
-    ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
-                                     MakeSharedInvalidMessagesArray);
-    ProcessInvalidationMessagesMulti(&transInvalInfo->PriorCmdInvalidMsgs,
-                                     MakeSharedInvalidMessagesArray);
-    MemoryContextSwitchTo(oldcontext);
-
-    Assert(!(numSharedInvalidMessagesArray > 0 &&
-             SharedInvalidMessagesArray == NULL));
-
-    *msgs = SharedInvalidMessagesArray;
-
-    return numSharedInvalidMessagesArray;
+    nummsgs = NumMessagesInGroup(&transInvalInfo->PriorCmdInvalidMsgs) +
+        NumMessagesInGroup(&transInvalInfo->CurrentCmdInvalidMsgs);
+
+    *msgs = msgarray = (SharedInvalidationMessage *)
+        MemoryContextAlloc(CurTransactionContext,
+                           nummsgs * sizeof(SharedInvalidationMessage));
+
+    nmsgs = 0;
+    ProcessMessageSubGroupMulti(&transInvalInfo->PriorCmdInvalidMsgs,
+                                CatCacheMsgs,
+                                (memcpy(msgarray + nmsgs,
+                                        msgs,
+                                        n * sizeof(SharedInvalidationMessage)),
+                                 nmsgs += n));
+    ProcessMessageSubGroupMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
+                                CatCacheMsgs,
+                                (memcpy(msgarray + nmsgs,
+                                        msgs,
+                                        n * sizeof(SharedInvalidationMessage)),
+                                 nmsgs += n));
+    ProcessMessageSubGroupMulti(&transInvalInfo->PriorCmdInvalidMsgs,
+                                RelCacheMsgs,
+                                (memcpy(msgarray + nmsgs,
+                                        msgs,
+                                        n * sizeof(SharedInvalidationMessage)),
+                                 nmsgs += n));
+    ProcessMessageSubGroupMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
+                                RelCacheMsgs,
+                                (memcpy(msgarray + nmsgs,
+                                        msgs,
+                                        n * sizeof(SharedInvalidationMessage)),
+                                 nmsgs += n));
+    Assert(nmsgs == nummsgs);
+
+    return nmsgs;
 }

 /*
@@ -942,7 +996,7 @@ ProcessCommittedInvalidationMessages(SharedInvalidationMessage *msgs,
  * about CurrentCmdInvalidMsgs too, since those changes haven't touched
  * the caches yet.
  *
- * In any case, reset the various lists to empty.  We need not physically
+ * In any case, reset our state to empty.  We need not physically
  * free memory here, since TopTransactionContext is about to be emptied
  * anyway.
  *
@@ -986,8 +1040,6 @@ AtEOXact_Inval(bool isCommit)

     /* Need not free anything explicitly */
     transInvalInfo = NULL;
-    SharedInvalidMessagesArray = NULL;
-    numSharedInvalidMessagesArray = 0;
 }

 /*
@@ -1043,10 +1095,21 @@ AtEOSubXact_Inval(bool isCommit)
             return;
         }

-        /* Pass up my inval messages to parent */
+        /*
+         * Pass up my inval messages to parent.  Notice that we stick them in
+         * PriorCmdInvalidMsgs, not CurrentCmdInvalidMsgs, since they've
+         * already been locally processed.  (This would trigger the Assert in
+         * AppendInvalidationMessageSubGroup if the parent's
+         * CurrentCmdInvalidMsgs isn't empty; but we already checked that in
+         * PrepareInvalidationState.)
+         */
         AppendInvalidationMessages(&myInfo->parent->PriorCmdInvalidMsgs,
                                    &myInfo->PriorCmdInvalidMsgs);

+        /* Must readjust parent's CurrentCmdInvalidMsgs indexes now */
+        SetGroupToFollow(&myInfo->parent->CurrentCmdInvalidMsgs,
+                         &myInfo->parent->PriorCmdInvalidMsgs);
+
         /* Pending relcache inval becomes parent's problem too */
         if (myInfo->RelcacheInitFileInval)
             myInfo->parent->RelcacheInitFileInval = true;
@@ -1514,31 +1577,24 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
 /*
  * LogLogicalInvalidations
  *
- * Emit WAL for invalidations.  This is currently only used for logging
- * invalidations at the command end or at commit time if any invalidations
- * are pending.
+ * Emit WAL for invalidations caused by the current command.
+ *
+ * This is currently only used for logging invalidations at the command end
+ * or at commit time if any invalidations are pending.
  */
 void
-LogLogicalInvalidations()
+LogLogicalInvalidations(void)
 {
     xl_xact_invals xlrec;
-    SharedInvalidationMessage *invalMessages;
-    int            nmsgs = 0;
+    InvalidationMsgsGroup *group;
+    int            nmsgs;

     /* Quick exit if we haven't done anything with invalidation messages. */
     if (transInvalInfo == NULL)
         return;

-    ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
-                                     MakeSharedInvalidMessagesArray);
-
-    Assert(!(numSharedInvalidMessagesArray > 0 &&
-             SharedInvalidMessagesArray == NULL));
-
-    invalMessages = SharedInvalidMessagesArray;
-    nmsgs = numSharedInvalidMessagesArray;
-    SharedInvalidMessagesArray = NULL;
-    numSharedInvalidMessagesArray = 0;
+    group = &transInvalInfo->CurrentCmdInvalidMsgs;
+    nmsgs = NumMessagesInGroup(group);

     if (nmsgs > 0)
     {
@@ -1549,10 +1605,12 @@ LogLogicalInvalidations()
         /* perform insertion */
         XLogBeginInsert();
         XLogRegisterData((char *) (&xlrec), MinSizeOfXactInvals);
-        XLogRegisterData((char *) invalMessages,
-                         nmsgs * sizeof(SharedInvalidationMessage));
+        ProcessMessageSubGroupMulti(group, CatCacheMsgs,
+                                    XLogRegisterData((char *) msgs,
+                                                     n * sizeof(SharedInvalidationMessage)));
+        ProcessMessageSubGroupMulti(group, RelCacheMsgs,
+                                    XLogRegisterData((char *) msgs,
+                                                     n * sizeof(SharedInvalidationMessage)));
         XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS);
-
-        pfree(invalMessages);
     }
 }

В списке pgsql-hackers по дате отправления:

Предыдущее
От: Omar Kilani
Дата:
Сообщение: Re: Clear empty space in a page.
Следующее
От: Tomas Vondra
Дата:
Сообщение: Re: Fdw batch insert error out when set batch_size > 65535