[HACKERS] Assorted leaks and weirdness in parallel execution

From: Tom Lane
Date:
I complained a couple weeks ago that nodeGatherMerge looked like it
leaked a lot of memory when commanded to rescan.  Attached are three
proposed patches that, in combination, demonstrably result in zero
leakage across repeated rescans.

The first thing I noticed when I started digging into this was that
there was some leakage in TopMemoryContext, which seemed pretty weird.
What it turned out to be was on_dsm_detach callback registration records.
This happens because, although the comments for shm_mq_attach() claim
that shm_mq_detach() will free the shm_mq_handle, it does no such thing,
and it doesn't worry about canceling the on_dsm_detach registration
either.  So over repeated attach/detach cycles, we leak shm_mq_handles
and also callback registrations.  This isn't just a memory leak: it
means that, whenever we finally do detach from the DSM segment, we'll
execute a bunch of shm_mq_detach() calls pointed at long-since-detached-
and-reused shm_mq structs.  That seems incredibly dangerous.  It manages
not to fail ATM because our stylized use of DSM means that a shm_mq
would only ever be re-used as another shm_mq; so the only real effect is
that our last counterparty process, if still attached, would receive N
SetLatch events not just one.  But it's going to crash and burn someday.
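
To make the failure mode concrete, here's a rough sketch of the pre-patch
attach/detach cycle as a rescanning node performs it (the loop and the
nrescans variable are mine, purely for illustration):

    for (int i = 0; i < nrescans; i++)
    {
        /* pallocs a shm_mq_handle and registers an on_dsm_detach callback */
        shm_mq_handle *mqh = shm_mq_attach(mq, seg, NULL);

        /* ... exchange tuples with the counterparty ... */

        /* old API: only notifies the counterparty that we're gone */
        shm_mq_detach(shm_mq_get_queue(mqh));
        /* mqh is never pfree'd; the callback registration is never canceled */
    }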

For extra fun, the error MQs weren't ever explicitly detached from, just
left to rot until on_dsm_detach time, although we did pfree the
shm_mq_handles out from under them.

So the first patch attached cleans this up by making shm_mq_detach
do what it was advertised to, ie fully reverse what shm_mq_attach
does.  That means it needs to take a shm_mq_handle, not a bare shm_mq,
but that actually makes the callers cleaner anyway.  (With this patch,
there are no callers of shm_mq_get_queue(); should we remove that?)
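
With the patch, the intended call pattern is just this (sketch only):

    shm_mq_handle *mqh = shm_mq_attach(mq, seg, NULL);

    /* ... use the queue through mqh ... */

    /* notifies the counterparty, cancels the on_dsm_detach callback,
     * and pfrees the handle */
    shm_mq_detach(mqh);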

The second patch cleans up assorted garden-variety leaks when
rescanning a GatherMerge node, by having it allocate its work
arrays just once and then re-use them across rescans.

The last patch fixes the one remaining leak I saw after applying the
first two patches, namely that execParallel.c leaks the array palloc'd
by ExecParallelSetupTupleQueues --- just the array storage, not any of
the shm_mq_handles it points to.  The given patch just adds a pfree
to ExecParallelFinish, but TBH I find this pretty unsatisfactory.
It seems like a significant modularity violation that execParallel.c
is responsible for creating those shm_mqs but not for cleaning them up.
That cleanup currently happens as a result of DestroyTupleQueueReader
calls done by nodeGather.c or nodeGatherMerge.c.  I'm tempted to
propose that we should move both the creation and the destruction of
the TupleQueueReaders into execParallel.c; the current setup is not
just weird but requires duplicative coding in the Gather nodes.
(That would make it more difficult to do the early reader destruction
that nodeGather currently does, but I am not sure we care about that.)
Another thing that seems like a poor factorization choice is that
DestroyTupleQueueReader is charged with doing shm_mq_detach even though
tqueue.c did not do the shm_mq_attach ... should we rethink that?
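
For illustration, the teardown side of that proposal might look about like
this in ExecParallelFinish (the pei->reader and pei->nreaders fields are
hypothetical; nothing like them exists today):

    if (pei->reader != NULL)
    {
        for (i = 0; i < pei->nreaders; i++)
            DestroyTupleQueueReader(pei->reader[i]);    /* also detaches the shm_mq */
        pfree(pei->reader);
        pei->reader = NULL;
    }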

Comments?

            regards, tom lane

diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 17b1038..ce1b907 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -480,7 +480,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
              */
             any_registrations_failed = true;
             pcxt->worker[i].bgwhandle = NULL;
-            pfree(pcxt->worker[i].error_mqh);
+            shm_mq_detach(pcxt->worker[i].error_mqh);
             pcxt->worker[i].error_mqh = NULL;
         }
     }
@@ -612,7 +612,7 @@ DestroyParallelContext(ParallelContext *pcxt)
             {
                 TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);

-                pfree(pcxt->worker[i].error_mqh);
+                shm_mq_detach(pcxt->worker[i].error_mqh);
                 pcxt->worker[i].error_mqh = NULL;
             }
         }
@@ -861,7 +861,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)

         case 'X':                /* Terminate, indicating clean exit */
             {
-                pfree(pcxt->worker[i].error_mqh);
+                shm_mq_detach(pcxt->worker[i].error_mqh);
                 pcxt->worker[i].error_mqh = NULL;
                 break;
             }
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 4c4fcf5..cb262d8 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -578,7 +578,7 @@ tqueueShutdownReceiver(DestReceiver *self)
 {
     TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;

-    shm_mq_detach(shm_mq_get_queue(tqueue->queue));
+    shm_mq_detach(tqueue->queue);
 }

 /*
@@ -650,7 +650,7 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
 void
 DestroyTupleQueueReader(TupleQueueReader *reader)
 {
-    shm_mq_detach(shm_mq_get_queue(reader->queue));
+    shm_mq_detach(reader->queue);
     if (reader->typmodmap != NULL)
         hash_destroy(reader->typmodmap);
     /* Is it worth trying to free substructure of the remap tree? */
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index 8fbc038..e1a24b6 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -21,7 +21,6 @@
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"

-static shm_mq *pq_mq;
 static shm_mq_handle *pq_mq_handle;
 static bool pq_mq_busy = false;
 static pid_t pq_mq_parallel_master_pid = 0;
@@ -56,7 +55,6 @@ void
 pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
 {
     PqCommMethods = &PqCommMqMethods;
-    pq_mq = shm_mq_get_queue(mqh);
     pq_mq_handle = mqh;
     whereToSendOutput = DestRemote;
     FrontendProtocol = PG_PROTOCOL_LATEST;
@@ -70,7 +68,6 @@ pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
 static void
 pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg)
 {
-    pq_mq = NULL;
     pq_mq_handle = NULL;
     whereToSendOutput = DestNone;
 }
@@ -135,9 +132,8 @@ mq_putmessage(char msgtype, const char *s, size_t len)
      */
     if (pq_mq_busy)
     {
-        if (pq_mq != NULL)
-            shm_mq_detach(pq_mq);
-        pq_mq = NULL;
+        if (pq_mq_handle != NULL)
+            shm_mq_detach(pq_mq_handle);
         pq_mq_handle = NULL;
         return EOF;
     }
@@ -148,7 +144,7 @@ mq_putmessage(char msgtype, const char *s, size_t len)
      * be generated late in the shutdown sequence, after all DSMs have already
      * been detached.
      */
-    if (pq_mq == NULL)
+    if (pq_mq_handle == NULL)
         return 0;

     pq_mq_busy = true;
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index f45a67c..5429711 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -83,7 +83,9 @@ struct shm_mq
  * This structure is a backend-private handle for access to a queue.
  *
  * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
- * a pointer to the dynamic shared memory segment that contains it.
+ * an optional pointer to the dynamic shared memory segment that contains it.
+ * (If mqh_segment is provided, we register an on_dsm_detach callback to
+ * make sure we detach from the queue before detaching from DSM.)
  *
  * If this queue is intended to connect the current process with a background
  * worker that started it, the user can pass a pointer to the worker handle
@@ -139,6 +141,7 @@ struct shm_mq_handle
     MemoryContext mqh_context;
 };

+static void shm_mq_detach_internal(shm_mq *mq);
 static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
                   const void *data, bool nowait, Size *bytes_written);
 static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
@@ -766,6 +769,25 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)

 /*
  * Detach a shared message queue.
+ */
+void
+shm_mq_detach(shm_mq_handle *mqh)
+{
+    /* Notify counterparty that we're outta here. */
+    shm_mq_detach_internal(mqh->mqh_queue);
+
+    /* Cancel on_dsm_detach callback, if any. */
+    if (mqh->mqh_segment)
+        cancel_on_dsm_detach(mqh->mqh_segment,
+                             shm_mq_detach_callback,
+                             PointerGetDatum(mqh->mqh_queue));
+
+    /* Release memory allocated by shm_mq_attach. */
+    pfree(mqh);
+}
+
+/*
+ * Notify counterparty that we're detaching from shared message queue.
  *
  * The purpose of this function is to make sure that the process
  * with which we're communicating doesn't block forever waiting for us to
@@ -773,9 +795,13 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
  * detaches, the receiver can read any messages remaining in the queue;
  * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
  * further attempts to send messages will likewise return SHM_MQ_DETACHED.
+ *
+ * This is separated out from shm_mq_detach() because if the on_dsm_detach
+ * callback fires, we only want to do this much.  We do not try to touch
+ * the local shm_mq_handle, as it may have been pfree'd already.
  */
-void
-shm_mq_detach(shm_mq *mq)
+static void
+shm_mq_detach_internal(shm_mq *mq)
 {
     volatile shm_mq *vmq = mq;
     PGPROC       *victim;
@@ -1193,5 +1219,5 @@ shm_mq_detach_callback(dsm_segment *seg, Datum arg)
 {
     shm_mq       *mq = (shm_mq *) DatumGetPointer(arg);

-    shm_mq_detach(mq);
+    shm_mq_detach_internal(mq);
 }
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index 02a93e0..97f0280 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -63,7 +63,7 @@ extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg,
 extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *);

 /* Break connection. */
-extern void shm_mq_detach(shm_mq *);
+extern void shm_mq_detach(shm_mq_handle *mqh);

 /* Get the shm_mq from handle. */
 extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 67da5ff..0ffe120 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -55,8 +55,10 @@ static int32 heap_compare_slots(Datum a, Datum b, void *arg);
 static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
 static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
                   bool nowait, bool *done);
-static void gather_merge_init(GatherMergeState *gm_state);
 static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
+static void gather_merge_setup(GatherMergeState *gm_state);
+static void gather_merge_init(GatherMergeState *gm_state);
+static void gather_merge_clear_tuples(GatherMergeState *gm_state);
 static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
                       bool nowait);
 static void load_tuple_array(GatherMergeState *gm_state, int reader);
@@ -149,14 +151,17 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
     }

     /*
-     * store the tuple descriptor into gather merge state, so we can use it
-     * later while initializing the gather merge slots.
+     * Store the tuple descriptor into gather merge state, so we can use it
+     * while initializing the gather merge slots.
      */
     if (!ExecContextForcesOids(&gm_state->ps, &hasoid))
         hasoid = false;
     tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
     gm_state->tupDesc = tupDesc;

+    /* Now allocate the workspace for gather merge */
+    gather_merge_setup(gm_state);
+
     return gm_state;
 }

@@ -340,6 +345,9 @@ ExecReScanGatherMerge(GatherMergeState *node)
     /* Make sure any existing workers are gracefully shut down */
     ExecShutdownGatherMergeWorkers(node);

+    /* Free any unused tuples, so we don't leak memory across rescans */
+    gather_merge_clear_tuples(node);
+
     /* Mark node so that shared state will be rebuilt at next call */
     node->initialized = false;
     node->gm_initialized = false;
@@ -370,49 +378,86 @@ ExecReScanGatherMerge(GatherMergeState *node)
 }

 /*
- * Initialize the Gather merge tuple read.
+ * Set up the data structures that we'll need for Gather Merge.
  *
- * Pull at least a single tuple from each worker + leader and set up the heap.
+ * We allocate these once on the basis of gm->num_workers, which is an
+ * upper bound for the number of workers we'll actually have.  During
+ * a rescan, we reset the structures to empty.  This approach simplifies
+ * not leaking memory across rescans.
  */
 static void
-gather_merge_init(GatherMergeState *gm_state)
+gather_merge_setup(GatherMergeState *gm_state)
 {
-    int            nreaders = gm_state->nreaders;
-    bool        nowait = true;
+    GatherMerge *gm = castNode(GatherMerge, gm_state->ps.plan);
+    int            nreaders = gm->num_workers;
     int            i;

     /*
      * Allocate gm_slots for the number of workers + one more slot for leader.
-     * Last slot is always for leader. Leader always calls ExecProcNode() to
-     * read the tuple which will return the TupleTableSlot. Later it will
-     * directly get assigned to gm_slot. So just initialize leader gm_slot
-     * with NULL. For other slots, code below will call
-     * ExecInitExtraTupleSlot() to create a slot for the worker's results.
+     * Last slot is always for leader.  Leader always calls ExecProcNode() to
+     * read the tuple, and then stores it directly into its gm_slots entry.
+     * For other slots, code below will call ExecInitExtraTupleSlot() to
+     * create a slot for the worker's results.
      */
-    gm_state->gm_slots =
-        palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *));
-    gm_state->gm_slots[gm_state->nreaders] = NULL;
-
-    /* Initialize the tuple slot and tuple array for each worker */
-    gm_state->gm_tuple_buffers =
-        (GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) *
-                                        gm_state->nreaders);
-    for (i = 0; i < gm_state->nreaders; i++)
+    gm_state->gm_slots = (TupleTableSlot **)
+        palloc0((nreaders + 1) * sizeof(TupleTableSlot *));
+
+    /* Allocate the tuple slot and tuple array for each worker */
+    gm_state->gm_tuple_buffers = (GMReaderTupleBuffer *)
+        palloc0(nreaders * sizeof(GMReaderTupleBuffer));
+
+    for (i = 0; i < nreaders; i++)
     {
         /* Allocate the tuple array with length MAX_TUPLE_STORE */
         gm_state->gm_tuple_buffers[i].tuple =
             (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);

-        /* Initialize slot for worker */
+        /* Initialize tuple slot for worker */
         gm_state->gm_slots[i] = ExecInitExtraTupleSlot(gm_state->ps.state);
         ExecSetSlotDescriptor(gm_state->gm_slots[i],
                               gm_state->tupDesc);
     }

     /* Allocate the resources for the merge */
-    gm_state->gm_heap = binaryheap_allocate(gm_state->nreaders + 1,
+    gm_state->gm_heap = binaryheap_allocate(nreaders + 1,
                                             heap_compare_slots,
                                             gm_state);
+}
+
+/*
+ * Initialize the Gather Merge.
+ *
+ * Reset data structures to ensure they're empty.  Then pull at least one
+ * tuple from each worker + leader (or set its "done" indicator), and set up
+ * the heap.
+ */
+static void
+gather_merge_init(GatherMergeState *gm_state)
+{
+    int            nreaders = gm_state->nreaders;
+    bool        nowait = true;
+    int            i;
+
+    /* Assert that gather_merge_setup made enough space */
+    Assert(nreaders <= castNode(GatherMerge, gm_state->ps.plan)->num_workers);
+
+    /* Reset leader's tuple slot to empty */
+    gm_state->gm_slots[nreaders] = NULL;
+
+    /* Reset the tuple slot and tuple array for each worker */
+    for (i = 0; i < nreaders; i++)
+    {
+        /* Reset tuple array to empty */
+        gm_state->gm_tuple_buffers[i].nTuples = 0;
+        gm_state->gm_tuple_buffers[i].readCounter = 0;
+        /* Reset done flag to not-done */
+        gm_state->gm_tuple_buffers[i].done = false;
+        /* Ensure output slot is empty */
+        ExecClearTuple(gm_state->gm_slots[i]);
+    }
+
+    /* Reset binary heap to empty */
+    binaryheap_reset(gm_state->gm_heap);

     /*
      * First, try to read a tuple from each worker (including leader) in
@@ -467,23 +512,23 @@ reread:
 }

 /*
- * Clear out the tuple table slots for each gather merge input.
+ * Clear out the tuple table slot, and any unused pending tuples,
+ * for each gather merge input.
  */
 static void
-gather_merge_clear_slots(GatherMergeState *gm_state)
+gather_merge_clear_tuples(GatherMergeState *gm_state)
 {
     int            i;

     for (i = 0; i < gm_state->nreaders; i++)
     {
-        pfree(gm_state->gm_tuple_buffers[i].tuple);
+        GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];
+
+        while (tuple_buffer->readCounter < tuple_buffer->nTuples)
+            heap_freetuple(tuple_buffer->tuple[tuple_buffer->readCounter++]);
+
         ExecClearTuple(gm_state->gm_slots[i]);
     }
-
-    /* Free tuple array as we don't need it any more */
-    pfree(gm_state->gm_tuple_buffers);
-    /* Free the binaryheap, which was created for sort */
-    binaryheap_free(gm_state->gm_heap);
 }

 /*
@@ -526,7 +571,7 @@ gather_merge_getnext(GatherMergeState *gm_state)
     if (binaryheap_empty(gm_state->gm_heap))
     {
         /* All the queues are exhausted, and so is the heap */
-        gather_merge_clear_slots(gm_state);
+        gather_merge_clear_tuples(gm_state);
         return NULL;
     }
     else
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index c713b85..3234900 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -753,11 +753,16 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
     for (i = 0; i < pei->pcxt->nworkers_launched; ++i)
         InstrAccumParallelQuery(&pei->buffer_usage[i]);

-    /* Finally, accumulate instrumentation, if any. */
+    /* Accumulate instrumentation, if any. */
     if (pei->instrumentation)
         ExecParallelRetrieveInstrumentation(pei->planstate,
                                             pei->instrumentation);

+    /* Clean up assorted storage. */
+    if (pei->tqueue)
+        pfree(pei->tqueue);
+    pei->tqueue = NULL;
+
     pei->finished = true;
 }



Re: [HACKERS] Assorted leaks and weirdness in parallel execution

From: Robert Haas
Date:
On Thu, Aug 31, 2017 at 11:09 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> I complained a couple weeks ago that nodeGatherMerge looked like it
> leaked a lot of memory when commanded to rescan.  Attached are three
> proposed patches that, in combination, demonstrably result in zero
> leakage across repeated rescans.

Gosh, thanks for looking into this so deeply.  I apologize for all of
the bugs.  Also, my ego is taking some severe damage here.

> But it's going to crash and burn someday.

Yeah, ouch.

> (With this patch,
> there are no callers of shm_mq_get_queue(); should we remove that?)

May as well.  I can't remember any more why I did shm_mq_detach() that
way; I think there was someplace where I thought that the
shm_mq_handle might not be available.  Maybe I'm misremembering, or
perhaps the situation has changed as that code has evolved.

> The last patch fixes the one remaining leak I saw after applying the
> first two patches, namely that execParallel.c leaks the array palloc'd
> by ExecParallelSetupTupleQueues --- just the array storage, not any of
> the shm_mq_handles it points to.  The given patch just adds a pfree
> to ExecParallelFinish, but TBH I find this pretty unsatisfactory.
> It seems like a significant modularity violation that execParallel.c
> is responsible for creating those shm_mqs but not for cleaning them up.

Yeah, the correct division of labor between execParallel.c and
nodeGather.c was not entirely clear to me, and I don't pretend that I
got that 100% right.

> (That would make it more difficult to do the early reader destruction
> that nodeGather currently does, but I am not sure we care about that.)

I think the only thing that matters here is -- if we know that we're
not going to read any more tuples from a worker that might still be
generating tuples, it's imperative that we shut it down ASAP.
Otherwise, it's just going to keep cranking them out, wasting
resources unnecessarily.  I think this is different than what you're
talking about here, but just wanted to be clear.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: [HACKERS] Assorted leaks and weirdness in parallel execution

From: Tom Lane
Date:
Robert Haas <robertmhaas@gmail.com> writes:
> On Thu, Aug 31, 2017 at 11:09 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>> (With this patch,
>> there are no callers of shm_mq_get_queue(); should we remove that?)

> May as well.  I can't remember any more why I did shm_mq_detach() that
> way; I think there was someplace where I thought that the
> shm_mq_handle might not be available.  Maybe I'm misremembering, or
> perhaps the situation has changed as that code has evolved.

I initially tried to convert the on_dsm_detach callback to take a
pointer to the shm_mq_handle rather than the shm_mq proper.  That
caused regression test crashes in some processes, indicating that
there are situations where we have freed the shm_mq_handle before
the DSM detach happens.  I think it was only during worker process exit.
That's sort of contrary to the advice in shm_mq.c about the desirable
lifespan of a shm_mq_handle, but I didn't feel like trying to fix it.
It seems generally more robust if the on_dsm_detach callback assumes
as little as possible about intra-process state, anyway.

I don't have any strong reason to remove shm_mq_get_queue(), other than
neatnik-ism.  It might save a caller having to remember the shm_mq pointer
separately.  Given the set of API functions, that would only matter if
somebody wanted to set/get the sender/receiver PGPROC pointers later,
but maybe that's a plausible thing to do.

>> It seems like a significant modularity violation that execParallel.c
>> is responsible for creating those shm_mqs but not for cleaning them up.

> Yeah, the correct division of labor between execParallel.c and
> nodeGather.c was not entirely clear to me, and I don't pretend that I
> got that 100% right.

OK, I'll have a go at that.

>> (That would make it more difficult to do the early reader destruction
>> that nodeGather currently does, but I am not sure we care about that.)

> I think the only thing that matters here is -- if we know that we're
> not going to read any more tuples from a worker that might still be
> generating tuples, it's imperative that we shut it down ASAP.
> Otherwise, it's just going to keep cranking them out, wasting
> resources unnecessarily.  I think this is different than what you're
> talking about here, but just wanted to be clear.

Yeah, it is different.  What I'm looking at is that nodeGather does
DestroyTupleQueueReader as soon as it's seen EOF on a given tuple queue.
That can't save any worker cycles.  The reason seems to be that it wants
to collapse its array of TupleQueueReader pointers so only live queues are
in it.  That's reasonable, but I'm inclined to implement it by making the
Gather node keep a separate working array of pointers to only the live
TupleQueueReaders.  The ParallelExecutorInfo would keep the authoritative
array of all TupleQueueReaders that have been created, and destroy them in
ExecParallelFinish.
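
Concretely, when one queue reports EOF the Gather node would just drop that
reader from its working array instead of destroying it, along these lines
(field names are placeholders, not the existing ones):

    /* reader slot i hit EOF; compact the node's working array and leave the
     * TupleQueueReader itself for ExecParallelFinish to destroy */
    gatherstate->nreaders--;
    memmove(&gatherstate->reader[i], &gatherstate->reader[i + 1],
            sizeof(TupleQueueReader *) * (gatherstate->nreaders - i));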

Your point is that we want to shut down the TupleQueueReaders immediately
on rescan, which we do already.  Another possible scenario is to shut them
down once we've reached the passed-down tuple limit (across the whole
Gather, not per-child which is what 3452dc524 implemented).  I don't think
what I'm suggesting would complicate that.
        regards, tom lane



Re: [HACKERS] Assorted leaks and weirdness in parallel execution

From: Robert Haas
Date:
On Thu, Aug 31, 2017 at 2:13 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> Yeah, it is different.  What I'm looking at is that nodeGather does
> DestroyTupleQueueReader as soon as it's seen EOF on a given tuple queue.
> That can't save any worker cycles.  The reason seems to be that it wants
> to collapse its array of TupleQueueReader pointers so only live queues are
> in it.  That's reasonable, but I'm inclined to implement it by making the
> Gather node keep a separate working array of pointers to only the live
> TupleQueueReaders.  The ParallelExecutorInfo would keep the authoritative
> array of all TupleQueueReaders that have been created, and destroy them in
> ExecParallelFinish.

Hmm, that's a thought.

> Your point is that we want to shut down the TupleQueueReaders immediately
> on rescan, which we do already.  Another possible scenario is to shut them
> down once we've reached the passed-down tuple limit (across the whole
> Gather, not per-child which is what 3452dc524 implemented).  I don't think
> what I'm suggesting would complicate that.

Yeah.  I think the way to do that would be to implement what is
mentioned in the comment for ExecShutdownNode: call that function on
the child plan as soon as the LIMIT is filled.

(Hmm, the reference to someday covering FDW in the header of that
comment is obsolete, isn't it?  Another oversight on my part.)

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company