Attached is the complete patch against HEAD to prevent hashtable bucket
splits during hash_seq_search. Any comments before I start
back-porting? I suppose we had better patch this all the way back,
even though AtPrepare_Locks() is the only known trouble spot.
regards, tom lane
*** src/backend/access/transam/xact.c.orig Tue Apr 3 12:34:35 2007
--- src/backend/access/transam/xact.c Wed Apr 25 20:32:00 2007
***************
*** 1631,1636 ****
--- 1631,1637 ----
/* smgrcommit already done */
AtEOXact_Files();
AtEOXact_ComboCid();
+ AtEOXact_HashTables(true);
pgstat_clear_snapshot();
pgstat_count_xact_commit();
pgstat_report_txn_timestamp(0);
***************
*** 1849,1854 ****
--- 1850,1856 ----
/* smgrcommit already done */
AtEOXact_Files();
AtEOXact_ComboCid();
+ AtEOXact_HashTables(true);
pgstat_clear_snapshot();
CurrentResourceOwner = NULL;
***************
*** 2003,2008 ****
--- 2005,2011 ----
smgrabort();
AtEOXact_Files();
AtEOXact_ComboCid();
+ AtEOXact_HashTables(false);
pgstat_clear_snapshot();
pgstat_count_xact_rollback();
pgstat_report_txn_timestamp(0);
***************
*** 3716,3721 ****
--- 3719,3725 ----
s->parent->subTransactionId);
AtEOSubXact_Files(true, s->subTransactionId,
s->parent->subTransactionId);
+ AtEOSubXact_HashTables(true, s->nestingLevel);
/*
* We need to restore the upper transaction's read-only state, in case the
***************
*** 3827,3832 ****
--- 3831,3837 ----
s->parent->subTransactionId);
AtEOSubXact_Files(false, s->subTransactionId,
s->parent->subTransactionId);
+ AtEOSubXact_HashTables(false, s->nestingLevel);
}
/*
*** src/backend/commands/prepare.c.orig Mon Apr 16 14:21:07 2007
--- src/backend/commands/prepare.c Thu Apr 26 15:00:40 2007
***************
*** 21,27 ****
#include "catalog/pg_type.h"
#include "commands/explain.h"
#include "commands/prepare.h"
! #include "funcapi.h"
#include "parser/analyze.h"
#include "parser/parse_coerce.h"
#include "parser/parse_expr.h"
--- 21,27 ----
#include "catalog/pg_type.h"
#include "commands/explain.h"
#include "commands/prepare.h"
! #include "miscadmin.h"
#include "parser/analyze.h"
#include "parser/parse_coerce.h"
#include "parser/parse_expr.h"
***************
*** 743,834 ****
Datum
pg_prepared_statement(PG_FUNCTION_ARGS)
{
! FuncCallContext *funcctx;
! HASH_SEQ_STATUS *hash_seq;
! PreparedStatement *prep_stmt;
! /* stuff done only on the first call of the function */
! if (SRF_IS_FIRSTCALL())
! {
! TupleDesc tupdesc;
! MemoryContext oldcontext;
! /* create a function context for cross-call persistence */
! funcctx = SRF_FIRSTCALL_INIT();
! /*
! * switch to memory context appropriate for multiple function calls
! */
! oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
! /* allocate memory for user context */
! if (prepared_queries)
{
! hash_seq = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
! hash_seq_init(hash_seq, prepared_queries);
! funcctx->user_fctx = (void *) hash_seq;
! }
! else
! funcctx->user_fctx = NULL;
! /*
! * build tupdesc for result tuples. This must match the definition of
! * the pg_prepared_statements view in system_views.sql
! */
! tupdesc = CreateTemplateTupleDesc(5, false);
! TupleDescInitEntry(tupdesc, (AttrNumber) 1, "name",
! TEXTOID, -1, 0);
! TupleDescInitEntry(tupdesc, (AttrNumber) 2, "statement",
! TEXTOID, -1, 0);
! TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepare_time",
! TIMESTAMPTZOID, -1, 0);
! TupleDescInitEntry(tupdesc, (AttrNumber) 4, "parameter_types",
! REGTYPEARRAYOID, -1, 0);
! TupleDescInitEntry(tupdesc, (AttrNumber) 5, "from_sql",
! BOOLOID, -1, 0);
!
! funcctx->tuple_desc = BlessTupleDesc(tupdesc);
! MemoryContextSwitchTo(oldcontext);
! }
!
! /* stuff done on every call of the function */
! funcctx = SRF_PERCALL_SETUP();
! hash_seq = (HASH_SEQ_STATUS *) funcctx->user_fctx;
!
! /* if the hash table is uninitialized, we're done */
! if (hash_seq == NULL)
! SRF_RETURN_DONE(funcctx);
!
! prep_stmt = hash_seq_search(hash_seq);
! if (prep_stmt)
! {
! Datum result;
! HeapTuple tuple;
! Datum values[5];
! bool nulls[5];
! MemSet(nulls, 0, sizeof(nulls));
! values[0] = DirectFunctionCall1(textin,
CStringGetDatum(prep_stmt->stmt_name));
! if (prep_stmt->plansource->query_string == NULL)
! nulls[1] = true;
! else
! values[1] = DirectFunctionCall1(textin,
CStringGetDatum(prep_stmt->plansource->query_string));
! values[2] = TimestampTzGetDatum(prep_stmt->prepare_time);
! values[3] = build_regtype_array(prep_stmt->plansource->param_types,
! prep_stmt->plansource->num_params);
! values[4] = BoolGetDatum(prep_stmt->from_sql);
!
! tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
! result = HeapTupleGetDatum(tuple);
! SRF_RETURN_NEXT(funcctx, result);
}
! SRF_RETURN_DONE(funcctx);
}
/*
--- 743,841 ----
Datum
pg_prepared_statement(PG_FUNCTION_ARGS)
{
! ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
! TupleDesc tupdesc;
! Tuplestorestate *tupstore;
! MemoryContext per_query_ctx;
! MemoryContext oldcontext;
! /* check to see if caller supports us returning a tuplestore */
! if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! errmsg("set-valued function called in context that cannot accept a set")));
! if (!(rsinfo->allowedModes & SFRM_Materialize))
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! errmsg("materialize mode required, but it is not " \
! "allowed in this context")));
!
! /* need to build tuplestore in query context */
! per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
! oldcontext = MemoryContextSwitchTo(per_query_ctx);
! /*
! * build tupdesc for result tuples. This must match the definition of
! * the pg_prepared_statements view in system_views.sql
! */
! tupdesc = CreateTemplateTupleDesc(5, false);
! TupleDescInitEntry(tupdesc, (AttrNumber) 1, "name",
! TEXTOID, -1, 0);
! TupleDescInitEntry(tupdesc, (AttrNumber) 2, "statement",
! TEXTOID, -1, 0);
! TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepare_time",
! TIMESTAMPTZOID, -1, 0);
! TupleDescInitEntry(tupdesc, (AttrNumber) 4, "parameter_types",
! REGTYPEARRAYOID, -1, 0);
! TupleDescInitEntry(tupdesc, (AttrNumber) 5, "from_sql",
! BOOLOID, -1, 0);
! /*
! * We put all the tuples into a tuplestore in one scan of the hashtable.
! * This avoids any issue of the hashtable possibly changing between calls.
! */
! tupstore = tuplestore_begin_heap(true, false, work_mem);
!
! /* hash table might be uninitialized */
! if (prepared_queries)
! {
! HASH_SEQ_STATUS hash_seq;
! PreparedStatement *prep_stmt;
! hash_seq_init(&hash_seq, prepared_queries);
! while ((prep_stmt = hash_seq_search(&hash_seq)) != NULL)
{
! HeapTuple tuple;
! Datum values[5];
! bool nulls[5];
! /* generate junk in short-term context */
! MemoryContextSwitchTo(oldcontext);
! MemSet(nulls, 0, sizeof(nulls));
! values[0] = DirectFunctionCall1(textin,
CStringGetDatum(prep_stmt->stmt_name));
! if (prep_stmt->plansource->query_string == NULL)
! nulls[1] = true;
! else
! values[1] = DirectFunctionCall1(textin,
CStringGetDatum(prep_stmt->plansource->query_string));
! values[2] = TimestampTzGetDatum(prep_stmt->prepare_time);
! values[3] = build_regtype_array(prep_stmt->plansource->param_types,
! prep_stmt->plansource->num_params);
! values[4] = BoolGetDatum(prep_stmt->from_sql);
!
! tuple = heap_form_tuple(tupdesc, values, nulls);
!
! /* switch to appropriate context while storing the tuple */
! MemoryContextSwitchTo(per_query_ctx);
! tuplestore_puttuple(tupstore, tuple);
! }
}
! /* clean up and return the tuplestore */
! tuplestore_donestoring(tupstore);
!
! MemoryContextSwitchTo(oldcontext);
!
! rsinfo->returnMode = SFRM_Materialize;
! rsinfo->setResult = tupstore;
! rsinfo->setDesc = tupdesc;
!
! return (Datum) 0;
}
/*
*** src/backend/executor/nodeSubplan.c.orig Mon Feb 26 20:11:25 2007
--- src/backend/executor/nodeSubplan.c Wed Apr 25 19:22:50 2007
***************
*** 569,575 ****
TupleHashIterator hashiter;
TupleHashEntry entry;
! ResetTupleHashIterator(hashtable, &hashiter);
while ((entry = ScanTupleHashTable(&hashiter)) != NULL)
{
ExecStoreMinimalTuple(entry->firstTuple, hashtable->tableslot, false);
--- 569,575 ----
TupleHashIterator hashiter;
TupleHashEntry entry;
! InitTupleHashIterator(hashtable, &hashiter);
while ((entry = ScanTupleHashTable(&hashiter)) != NULL)
{
ExecStoreMinimalTuple(entry->firstTuple, hashtable->tableslot, false);
***************
*** 577,584 ****
--- 577,588 ----
numCols, keyColIdx,
hashtable->cur_eq_funcs,
hashtable->tempcxt))
+ {
+ TermTupleHashIterator(&hashiter);
return true;
+ }
}
+ /* No TermTupleHashIterator call needed here: the scan ran to completion */
return false;
}
*** src/backend/nodes/tidbitmap.c.orig Fri Jan 5 18:01:58 2007
--- src/backend/nodes/tidbitmap.c Wed Apr 25 19:38:05 2007
***************
*** 907,913 ****
tbm_mark_page_lossy(tbm, page->blockno);
if (tbm->nentries <= tbm->maxentries)
! return; /* we have done enough */
/*
* Note: tbm_mark_page_lossy may have inserted a lossy chunk into the
--- 907,917 ----
tbm_mark_page_lossy(tbm, page->blockno);
if (tbm->nentries <= tbm->maxentries)
! {
! /* we have done enough */
! hash_seq_term(&status);
! break;
! }
/*
* Note: tbm_mark_page_lossy may have inserted a lossy chunk into the
*** src/backend/utils/hash/dynahash.c.orig Fri Jan 5 18:02:22 2007
--- src/backend/utils/hash/dynahash.c Thu Apr 26 14:29:55 2007
***************
*** 63,68 ****
--- 63,69 ----
#include "postgres.h"
+ #include "access/xact.h"
#include "storage/shmem.h"
#include "storage/spin.h"
#include "utils/dynahash.h"
***************
*** 160,165 ****
--- 161,169 ----
char *tabname; /* table name (for error messages) */
bool isshared; /* true if table is in shared memory */
+ /* freezing a shared table isn't allowed, so we can keep state here */
+ bool frozen; /* true = no more inserts allowed */
+
/* We keep local copies of these fixed values to reduce contention */
Size keysize; /* hash key length in bytes */
long ssize; /* segment size --- must be power of 2 */
***************
*** 195,200 ****
--- 199,207 ----
static int choose_nelem_alloc(Size entrysize);
static bool init_htab(HTAB *hashp, long nelem);
static void hash_corrupted(HTAB *hashp);
+ static void register_seq_scan(HTAB *hashp);
+ static void deregister_seq_scan(HTAB *hashp);
+ static bool has_seq_scans(HTAB *hashp);
/*
***************
*** 356,361 ****
--- 363,370 ----
errmsg("out of memory")));
}
+ hashp->frozen = false;
+
hdefault(hashp);
hctl = hashp->hctl;
***************
*** 898,903 ****
--- 907,916 ----
if (currBucket != NULL)
return (void *) ELEMENTKEY(currBucket);
+ /* disallow inserts if frozen */
+ if (hashp->frozen)
+ elog(ERROR, "cannot insert into a frozen hashtable");
+
currBucket = get_hash_entry(hashp);
if (currBucket == NULL)
{
***************
*** 925,934 ****
/* caller is expected to fill the data field on return */
! /* Check if it is time to split a bucket */
! /* Can't split if running in partitioned mode */
if (!IS_PARTITIONED(hctl) &&
! hctl->nentries / (long) (hctl->max_bucket + 1) >= hctl->ffactor)
{
/*
* NOTE: failure to expand table is not a fatal error, it just
--- 938,952 ----
/* caller is expected to fill the data field on return */
! /*
! * Check if it is time to split a bucket. Can't split if running
! * in partitioned mode, nor if table is the subject of any active
! * hash_seq_search scans. The odd-looking order of these tests is meant
! * to check the cheaper conditions first.
! */
if (!IS_PARTITIONED(hctl) &&
! hctl->nentries / (long) (hctl->max_bucket + 1) >= hctl->ffactor &&
! !has_seq_scans(hashp))
{
/*
* NOTE: failure to expand table is not a fatal error, it just
***************
*** 1001,1018 ****
}
/*
! * hash_seq_init/_search
* Sequentially search through hash table and return
* all the elements one by one, return NULL when no more.
*
* NOTE: caller may delete the returned element before continuing the scan.
* However, deleting any other element while the scan is in progress is
* UNDEFINED (it might be the one that curIndex is pointing at!). Also,
* if elements are added to the table while the scan is in progress, it is
* unspecified whether they will be visited by the scan or not.
*
* NOTE: to use this with a partitioned hashtable, caller had better hold
* at least shared lock on all partitions of the table throughout the scan!
*/
void
hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
--- 1019,1048 ----
}
/*
! * hash_seq_init/_search/_term
* Sequentially search through hash table and return
* all the elements one by one, return NULL when no more.
*
+ * hash_seq_term should be called if and only if the scan is abandoned before
+ * completion; if hash_seq_search returns NULL then it has already done the
+ * end-of-scan cleanup.
+ *
* NOTE: caller may delete the returned element before continuing the scan.
* However, deleting any other element while the scan is in progress is
* UNDEFINED (it might be the one that curIndex is pointing at!). Also,
* if elements are added to the table while the scan is in progress, it is
* unspecified whether they will be visited by the scan or not.
*
+ * NOTE: it is possible to use hash_seq_init/hash_seq_search without any
+ * worry about hash_seq_term cleanup, if the hashtable is first locked against
+ * further insertions by calling hash_freeze. This is used by nodeAgg.c,
+ * wherein it is inconvenient to track whether a scan is still open, and
+ * there's no possibility of further insertions after readout has begun.
+ *
* NOTE: to use this with a partitioned hashtable, caller had better hold
* at least shared lock on all partitions of the table throughout the scan!
+ * We can cope with insertions or deletions by our own backend, but *not*
+ * with concurrent insertions or deletions by another.
*/
void
hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
***************
*** 1020,1025 ****
--- 1050,1057 ----
status->hashp = hashp;
status->curBucket = 0;
status->curEntry = NULL;
+ if (!hashp->frozen)
+ register_seq_scan(hashp);
}
void *
***************
*** 1054,1060 ****
--- 1086,1095 ----
max_bucket = hctl->max_bucket;
if (curBucket > max_bucket)
+ {
+ hash_seq_term(status);
return NULL; /* search is done */
+ }
/*
* first find the right segment in the table directory.
***************
*** 1076,1081 ****
--- 1111,1117 ----
if (++curBucket > max_bucket)
{
status->curBucket = curBucket;
+ hash_seq_term(status);
return NULL; /* search is done */
}
if (++segment_ndx >= ssize)
***************
*** 1094,1099 ****
--- 1130,1165 ----
return (void *) ELEMENTKEY(curElem);
}
+ void
+ hash_seq_term(HASH_SEQ_STATUS *status)
+ { /* end-of-scan cleanup: drop this scan's registration, if it has one */
+ if (!status->hashp->frozen) /* scans of a frozen table are never registered */
+ deregister_seq_scan(status->hashp);
+ }
+
+ /*
+ * hash_freeze
+ * Freeze a non-shared hashtable against future insertions
+ * (deletions are still allowed); a shared table is rejected
+ *
+ * The reason for doing this is that by preventing any more bucket splits,
+ * we no longer need to worry about registering hash_seq_search scans,
+ * and thus the caller need not be careful about ensuring hash_seq_term
+ * gets called at the right times.
+ *
+ * Multiple calls to hash_freeze() are allowed, but you can't freeze a table
+ * with active scans (hash_seq_term skips deregistration once frozen is set).
+ */
+ void
+ hash_freeze(HTAB *hashp)
+ {
+ if (hashp->isshared)
+ elog(ERROR, "cannot freeze shared hashtable");
+ if (!hashp->frozen && has_seq_scans(hashp)) /* re-freezing skips this check */
+ elog(ERROR, "cannot freeze hashtable with active scans");
+ hashp->frozen = true;
+ }
+
/********************************* UTILITIES ************************/
***************
*** 1323,1326 ****
--- 1389,1525 ----
for (i = 0, limit = 1; limit < num; i++, limit <<= 1)
;
return i;
+ }
+
+
+ /************************* SEQ SCAN TRACKING ************************/
+
+ /*
+ * We track active hash_seq_search scans here. The need for this mechanism
+ * comes from the fact that a scan will get confused if a bucket split occurs
+ * while it's in progress: it might visit entries twice, or even miss some
+ * entirely (if it's partway through the same bucket that splits). Hence
+ * we want to inhibit bucket splits if there are any active scans on the
+ * table being inserted into. This is a fairly rare case in current usage,
+ * so just postponing the split until the next insertion seems sufficient.
+ *
+ * Given present usages of the function, only a few scans are likely to be
+ * open concurrently; so a finite-size stack of open scans seems sufficient,
+ * and we don't worry that linear search is too slow. Note that we do
+ * allow multiple scans of the same hashtable to be open concurrently.
+ *
+ * This mechanism can support concurrent scan and insertion in a shared
+ * hashtable if it's the same backend doing both. It would fail otherwise,
+ * but locking reasons seem to preclude any such scenario anyway, so we don't
+ * worry.
+ *
+ * This arrangement is reasonably robust if a transient hashtable is deleted
+ * without notifying us. The absolute worst case is we might inhibit splits
+ * in another table created later at exactly the same address. We will give
+ * a warning at transaction end for reference leaks, so any bugs leading to
+ * lack of notification should be easy to catch.
+ */
+
+ #define MAX_SEQ_SCANS 100
+
+ static HTAB *seq_scan_tables[MAX_SEQ_SCANS]; /* tables being scanned */
+ static int seq_scan_level[MAX_SEQ_SCANS]; /* subtransaction nest level */
+ static int num_seq_scans = 0;
+
+
+ /* Record one more active hash_seq_search scan on hashp (duplicates allowed) */
+ static void
+ register_seq_scan(HTAB *hashp)
+ {
+ if (num_seq_scans >= MAX_SEQ_SCANS) /* fixed-size stack is full */
+ elog(ERROR, "too many active hash_seq_search scans");
+ seq_scan_tables[num_seq_scans] = hashp;
+ seq_scan_level[num_seq_scans] = GetCurrentTransactionNestLevel(); /* for subxact cleanup */
+ num_seq_scans++;
+ }
+
+ /* Deregister one active scan of hashp; elog(ERROR) if none is registered */
+ static void
+ deregister_seq_scan(HTAB *hashp)
+ {
+ int i;
+
+ /* Search backward since it's most likely at the stack top */
+ for (i = num_seq_scans - 1; i >= 0; i--)
+ {
+ if (seq_scan_tables[i] == hashp)
+ {
+ seq_scan_tables[i] = seq_scan_tables[num_seq_scans - 1]; /* fill hole with last entry */
+ seq_scan_level[i] = seq_scan_level[num_seq_scans - 1];
+ num_seq_scans--;
+ return; /* remove only one entry per call */
+ }
+ }
+ elog(ERROR, "no hash_seq_search scan for hash table \"%s\"",
+ hashp->tabname);
+ }
+
+ /* Return true if at least one registered scan is on hashp (linear search) */
+ static bool
+ has_seq_scans(HTAB *hashp)
+ {
+ int i;
+
+ for (i = 0; i < num_seq_scans; i++)
+ {
+ if (seq_scan_tables[i] == hashp)
+ return true;
+ }
+ return false;
+ }
+
+ /* At transaction end, forget all open scans; at commit, warn about each one first */
+ void
+ AtEOXact_HashTables(bool isCommit)
+ {
+ /*
+ * During abort cleanup, open scans are expected; just silently clean 'em
+ * out. An open scan at commit means someone forgot a hash_seq_term()
+ * call, so complain.
+ *
+ * Note: it's tempting to try to print the tabname here, but refrain for
+ * fear of touching deallocated memory. This isn't a user-facing message
+ * anyway, so it needn't be pretty; we print only the table's address.
+ */
+ if (isCommit)
+ {
+ int i;
+
+ for (i = 0; i < num_seq_scans; i++)
+ {
+ elog(WARNING, "leaked hash_seq_search scan for hash table %p",
+ seq_scan_tables[i]);
+ }
+ }
+ num_seq_scans = 0; /* empty the stack in both the commit and abort cases */
+ }
+
+ /* At subtransaction end, drop scans registered at level nestDepth or deeper */
+ void
+ AtEOSubXact_HashTables(bool isCommit, int nestDepth)
+ {
+ int i;
+
+ /*
+ * Search backward so that the swap-with-last removal below only ever
+ * moves an entry we have already examined. We must check all entries,
+ * not only those at the end of the array, since removal reorders them.
+ */
+ for (i = num_seq_scans - 1; i >= 0; i--)
+ {
+ if (seq_scan_level[i] >= nestDepth)
+ {
+ if (isCommit) /* a scan still open at commit was leaked */
+ elog(WARNING, "leaked hash_seq_search scan for hash table %p",
+ seq_scan_tables[i]);
+ seq_scan_tables[i] = seq_scan_tables[num_seq_scans - 1];
+ seq_scan_level[i] = seq_scan_level[num_seq_scans - 1];
+ num_seq_scans--;
+ }
+ }
}
*** src/backend/utils/mmgr/portalmem.c.orig Thu Apr 12 14:21:37 2007
--- src/backend/utils/mmgr/portalmem.c Thu Apr 26 15:27:57 2007
***************
*** 22,28 ****
#include "access/xact.h"
#include "catalog/pg_type.h"
#include "commands/portalcmds.h"
- #include "funcapi.h"
#include "miscadmin.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
--- 22,27 ----
***************
*** 621,627 ****
/* Zap all non-holdable portals */
PortalDrop(portal, true);
! /* Restart the iteration */
hash_seq_init(&status, PortalHashTable);
}
}
--- 620,628 ----
/* Zap all non-holdable portals */
PortalDrop(portal, true);
! /* Restart the iteration in case that led to other drops */
! /* XXX is this really necessary? */
! hash_seq_term(&status);
hash_seq_init(&status, PortalHashTable);
}
}
***************
*** 858,936 ****
Datum
pg_cursor(PG_FUNCTION_ARGS)
{
! FuncCallContext *funcctx;
! HASH_SEQ_STATUS *hash_seq;
PortalHashEnt *hentry;
! /* stuff done only on the first call of the function */
! if (SRF_IS_FIRSTCALL())
! {
! MemoryContext oldcontext;
! TupleDesc tupdesc;
!
! /* create a function context for cross-call persistence */
! funcctx = SRF_FIRSTCALL_INIT();
!
! /*
! * switch to memory context appropriate for multiple function calls
! */
! oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
!
! if (PortalHashTable)
! {
! hash_seq = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
! hash_seq_init(hash_seq, PortalHashTable);
! funcctx->user_fctx = (void *) hash_seq;
! }
! else
! funcctx->user_fctx = NULL;
!
! /*
! * build tupdesc for result tuples. This must match the definition of
! * the pg_cursors view in system_views.sql
! */
! tupdesc = CreateTemplateTupleDesc(6, false);
! TupleDescInitEntry(tupdesc, (AttrNumber) 1, "name",
! TEXTOID, -1, 0);
! TupleDescInitEntry(tupdesc, (AttrNumber) 2, "statement",
! TEXTOID, -1, 0);
! TupleDescInitEntry(tupdesc, (AttrNumber) 3, "is_holdable",
! BOOLOID, -1, 0);
! TupleDescInitEntry(tupdesc, (AttrNumber) 4, "is_binary",
! BOOLOID, -1, 0);
! TupleDescInitEntry(tupdesc, (AttrNumber) 5, "is_scrollable",
! BOOLOID, -1, 0);
! TupleDescInitEntry(tupdesc, (AttrNumber) 6, "creation_time",
! TIMESTAMPTZOID, -1, 0);
!
! funcctx->tuple_desc = BlessTupleDesc(tupdesc);
! MemoryContextSwitchTo(oldcontext);
! }
! /* stuff done on every call of the function */
! funcctx = SRF_PERCALL_SETUP();
! hash_seq = (HASH_SEQ_STATUS *) funcctx->user_fctx;
!
! /* if the hash table is uninitialized, we're done */
! if (hash_seq == NULL)
! SRF_RETURN_DONE(funcctx);
! /* loop until we find a visible portal or hit the end of the list */
! while ((hentry = hash_seq_search(hash_seq)) != NULL)
! {
! if (hentry->portal->visible)
! break;
! }
! if (hentry)
{
! Portal portal;
! Datum result;
HeapTuple tuple;
Datum values[6];
bool nulls[6];
! portal = hentry->portal;
MemSet(nulls, 0, sizeof(nulls));
values[0] = DirectFunctionCall1(textin, CStringGetDatum(portal->name));
--- 859,926 ----
Datum
pg_cursor(PG_FUNCTION_ARGS)
{
! ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
! TupleDesc tupdesc;
! Tuplestorestate *tupstore;
! MemoryContext per_query_ctx;
! MemoryContext oldcontext;
! HASH_SEQ_STATUS hash_seq;
PortalHashEnt *hentry;
! /* check to see if caller supports us returning a tuplestore */
! if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! errmsg("set-valued function called in context that cannot accept a set")));
! if (!(rsinfo->allowedModes & SFRM_Materialize))
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! errmsg("materialize mode required, but it is not " \
! "allowed in this context")));
!
! /* need to build tuplestore in query context */
! per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
! oldcontext = MemoryContextSwitchTo(per_query_ctx);
! /*
! * build tupdesc for result tuples. This must match the definition of
! * the pg_cursors view in system_views.sql
! */
! tupdesc = CreateTemplateTupleDesc(6, false);
! TupleDescInitEntry(tupdesc, (AttrNumber) 1, "name",
! TEXTOID, -1, 0);
! TupleDescInitEntry(tupdesc, (AttrNumber) 2, "statement",
! TEXTOID, -1, 0);
! TupleDescInitEntry(tupdesc, (AttrNumber) 3, "is_holdable",
! BOOLOID, -1, 0);
! TupleDescInitEntry(tupdesc, (AttrNumber) 4, "is_binary",
! BOOLOID, -1, 0);
! TupleDescInitEntry(tupdesc, (AttrNumber) 5, "is_scrollable",
! BOOLOID, -1, 0);
! TupleDescInitEntry(tupdesc, (AttrNumber) 6, "creation_time",
! TIMESTAMPTZOID, -1, 0);
! /*
! * We put all the tuples into a tuplestore in one scan of the hashtable.
! * This avoids any issue of the hashtable possibly changing between calls.
! */
! tupstore = tuplestore_begin_heap(true, false, work_mem);
! hash_seq_init(&hash_seq, PortalHashTable);
! while ((hentry = hash_seq_search(&hash_seq)) != NULL)
{
! Portal portal = hentry->portal;
HeapTuple tuple;
Datum values[6];
bool nulls[6];
! /* report only "visible" entries */
! if (!portal->visible)
! continue;
!
! /* generate junk in short-term context */
! MemoryContextSwitchTo(oldcontext);
!
MemSet(nulls, 0, sizeof(nulls));
values[0] = DirectFunctionCall1(textin, CStringGetDatum(portal->name));
***************
*** 944,953 ****
values[4] = BoolGetDatum(portal->cursorOptions & CURSOR_OPT_SCROLL);
values[5] = TimestampTzGetDatum(portal->creation_time);
! tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
! result = HeapTupleGetDatum(tuple);
! SRF_RETURN_NEXT(funcctx, result);
}
! SRF_RETURN_DONE(funcctx);
}
--- 934,954 ----
values[4] = BoolGetDatum(portal->cursorOptions & CURSOR_OPT_SCROLL);
values[5] = TimestampTzGetDatum(portal->creation_time);
! tuple = heap_form_tuple(tupdesc, values, nulls);
!
! /* switch to appropriate context while storing the tuple */
! MemoryContextSwitchTo(per_query_ctx);
! tuplestore_puttuple(tupstore, tuple);
}
! /* clean up and return the tuplestore */
! tuplestore_donestoring(tupstore);
!
! MemoryContextSwitchTo(oldcontext);
!
! rsinfo->returnMode = SFRM_Materialize;
! rsinfo->setResult = tupstore;
! rsinfo->setDesc = tupdesc;
!
! return (Datum) 0;
}
*** src/include/nodes/execnodes.h.orig Tue Mar 27 19:21:12 2007
--- src/include/nodes/execnodes.h Wed Apr 25 19:21:40 2007
***************
*** 408,415 ****
typedef HASH_SEQ_STATUS TupleHashIterator;
! #define ResetTupleHashIterator(htable, iter) \
hash_seq_init(iter, (htable)->hashtab)
#define ScanTupleHashTable(iter) \
((TupleHashEntry) hash_seq_search(iter))
--- 408,427 ----
typedef HASH_SEQ_STATUS TupleHashIterator;
! /*
! * Use InitTupleHashIterator/TermTupleHashIterator for a read/write scan.
! * Use ResetTupleHashIterator if the table can be frozen (in this case no
! * explicit scan termination is needed).
! */
! #define InitTupleHashIterator(htable, iter) \
hash_seq_init(iter, (htable)->hashtab)
+ #define TermTupleHashIterator(iter) \
+ hash_seq_term(iter)
+ #define ResetTupleHashIterator(htable, iter) \
+ do { \
+ hash_freeze((htable)->hashtab); \
+ hash_seq_init(iter, (htable)->hashtab); \
+ } while (0)
#define ScanTupleHashTable(iter) \
((TupleHashEntry) hash_seq_search(iter))
*** src/include/utils/hsearch.h.orig Fri Jan 5 18:02:59 2007
--- src/include/utils/hsearch.h Wed Apr 25 20:29:18 2007
***************
*** 130,138 ****
--- 130,142 ----
extern long hash_get_num_entries(HTAB *hashp);
extern void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp);
extern void *hash_seq_search(HASH_SEQ_STATUS *status);
+ extern void hash_seq_term(HASH_SEQ_STATUS *status);
+ extern void hash_freeze(HTAB *hashp);
extern Size hash_estimate_size(long num_entries, Size entrysize);
extern long hash_select_dirsize(long num_entries);
extern Size hash_get_shared_size(HASHCTL *info, int flags);
+ extern void AtEOXact_HashTables(bool isCommit);
+ extern void AtEOSubXact_HashTables(bool isCommit, int nestDepth);
/*
* prototypes for functions in hashfn.c