Discussion: Allow substitute allocators for PGresult.


Allow substitute allocators for PGresult.

From:
Kyotaro HORIGUCHI
Date:
Hello. This message is a proposal of a pair of patches that
enables the memory allocator for PGresult in libpq to be
replaced.


The comment at the beginning of pqexpbuffer.c says that libpq
should not rely on palloc(). Besides, Tom Lane said that palloc
should not be visible outside the backend(*1) and I agree with
it.

*1: http://archives.postgresql.org/pgsql-hackers/1999-02/msg00364.php

On the other hand, dblink, dblink-plus (our product!), and
perhaps FDWs that connect to other PostgreSQL servers seem to copy
the query result contained in a PGresult into a tuplestore. I
guess that this is to avoid memory leaks when processing is
terminated halfway.

But it is rather expensive to copy the whole PGresult, and the
cost grows as the received data gets larger. Furthermore,
it requires about twice as much memory as the net size of the
data. And it is pointless to copy and modify libpq or reinvent it
from scratch. So we would be happy to be able to use palloc in
libpq, at least for PGresult, in such cases despite the policy.


For these reasons, I propose to make allocators for PGresult
replaceable.

The modifications are split into two patches.

1. dupEvents() and pqAddTuple() currently get new memory blocks from
   malloc(), but the acquired blocks are eventually linked into the
   PGresult. So I think it is preferable to use pqResultAlloc() or its
   descendants, for consistency with the nature of the place the
   blocks are linked to.

   But there is no PQresultRealloc(), and it would be costly, so
   pqAddTuple() is not modified in this patch.


2. Define three function pointers, PQpgresult_(malloc|realloc|free),
   and replace the calls to malloc/realloc/free in the four functions
   below with these pointers.

     PQmakeEmptyPGresult()  pqResultAlloc()  PQclear()  pqAddTuple()
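To make the mechanism of the second patch concrete, here is a small standalone model of it. It is a sketch, not the patch itself. The hooks demo_malloc, demo_realloc and demo_free are hypothetical stand-ins for PQpgresult_(malloc|realloc|free). DemoResult, demo_make_result(), demo_add_tuple() and demo_clear() are hypothetical miniatures of PQmakeEmptyPGresult(), pqAddTuple() and PQclear(). Installing a counting allocator shows that every block owned by the result goes through the substituted hooks.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical process-wide hooks for result memory, defaulting to the
 * C library, in the spirit of PQpgresult_(malloc|realloc|free). */
void *(*demo_malloc) (size_t size) = malloc;
void *(*demo_realloc) (void *ptr, size_t size) = realloc;
void (*demo_free) (void *ptr) = free;

/* Hypothetical miniature result: a growable array of row pointers. */
typedef struct DemoResult
{
    char      **rows;
    int         nrows;
    int         cap;
} DemoResult;

DemoResult *
demo_make_result(void)
{
    DemoResult *res = demo_malloc(sizeof(DemoResult));

    if (!res)
        return NULL;
    res->rows = NULL;
    res->nrows = 0;
    res->cap = 0;
    return res;
}

/* Append a row pointer, growing the array through the hooks.
 * Returns 1 on success, 0 if the allocator failed. */
int
demo_add_tuple(DemoResult *res, char *row)
{
    assert(res != NULL);
    if (res->nrows >= res->cap)
    {
        int         newcap = res->cap > 0 ? res->cap * 2 : 4;
        char      **newrows;

        if (res->rows == NULL)
            newrows = demo_malloc(newcap * sizeof(char *));
        else
            newrows = demo_realloc(res->rows, newcap * sizeof(char *));
        if (!newrows)
            return 0;
        res->rows = newrows;
        res->cap = newcap;
    }
    res->rows[res->nrows++] = row;
    return 1;
}

/* Free everything the result owns, through the same hooks. */
void
demo_clear(DemoResult *res)
{
    if (!res)
        return;
    if (res->rows)
        demo_free(res->rows);
    demo_free(res);
}

/* Example substitute allocator that counts live blocks. */
int         live_blocks = 0;

void *
counting_malloc(size_t size)
{
    void       *p = malloc(size);

    if (p)
        live_blocks++;
    return p;
}

void *
counting_realloc(void *ptr, size_t size)
{
    void       *p = realloc(ptr, size);

    if (p && !ptr)
        live_blocks++;          /* realloc(NULL, n) acts like malloc(n) */
    return p;
}

void
counting_free(void *ptr)
{
    if (ptr)
        live_blocks--;
    free(ptr);
}
```

Because the hooks are process-wide globals, installing them affects every result created afterwards by any caller in the process.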

These patches make it possible for tools that run in a backend process
and use libpq to handle the PGresult as it is: no copy, no extra memory.

(Of course, anyone who wants to use a custom allocator for PGresult
in standalone tools could do so using this feature.)



Three files are attached to this message.

First, the patch with respect to "1" above.
Second, the patch with respect to "2" above.
Third, a very simple sample program.

I have built and briefly tested on CentOS6, with the sample
program mentioned above and valgrind, but not on Windows.


What do you think about this?

Regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 113aab0..8e32b18 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -49,7 +49,7 @@ static int    static_client_encoding = PG_SQL_ASCII;
 static bool static_std_strings = false;
 
 
-static PGEvent *dupEvents(PGEvent *events, int count);
+static PGEvent *dupEvents(PGresult *res, PGEvent *events, int count);
 static bool PQsendQueryStart(PGconn *conn);
 static int PQsendQueryGuts(PGconn *conn,
                 const char *command,
@@ -186,7 +186,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
         /* copy events last; result must be valid if we need to PQclear */
         if (conn->nEvents > 0)
         {
-            result->events = dupEvents(conn->events, conn->nEvents);
+            result->events = dupEvents(result, conn->events, conn->nEvents);
             if (!result->events)
             {
                 PQclear(result);
@@ -337,7 +337,7 @@ PQcopyResult(const PGresult *src, int flags)
     /* Wants to copy PGEvents? */
     if ((flags & PG_COPYRES_EVENTS) && src->nEvents > 0)
     {
-        dest->events = dupEvents(src->events, src->nEvents);
+        dest->events = dupEvents(dest, src->events, src->nEvents);
         if (!dest->events)
         {
             PQclear(dest);
@@ -374,7 +374,7 @@ PQcopyResult(const PGresult *src, int flags)
  * Also, the resultInitialized flags are all cleared.
  */
 static PGEvent *
-dupEvents(PGEvent *events, int count)
+dupEvents(PGresult *res, PGEvent *events, int count)
 {
     PGEvent    *newEvents;
     int            i;
@@ -382,7 +382,7 @@ dupEvents(PGEvent *events, int count)
     if (!events || count <= 0)
         return NULL;
 
-    newEvents = (PGEvent *) malloc(count * sizeof(PGEvent));
+    newEvents = (PGEvent *) pqResultAlloc(res, count * sizeof(PGEvent), TRUE);
     if (!newEvents)
         return NULL;
 
@@ -392,14 +392,9 @@ dupEvents(PGEvent *events, int count)
         newEvents[i].passThrough = events[i].passThrough;
         newEvents[i].data = NULL;
         newEvents[i].resultInitialized = FALSE;
-        newEvents[i].name = strdup(events[i].name);
+        newEvents[i].name = pqResultStrdup(res, events[i].name);
         if (!newEvents[i].name)
-        {
-            while (--i >= 0)
-                free(newEvents[i].name);
-            free(newEvents);
             return NULL;
-        }
     }
 
     return newEvents;
@@ -661,12 +656,8 @@ PQclear(PGresult *res)
             (void) res->events[i].proc(PGEVT_RESULTDESTROY, &evt,
                                        res->events[i].passThrough);
         }
-        free(res->events[i].name);
     }
 
-    if (res->events)
-        free(res->events);
-
     /* Free all the subsidiary blocks */
     while ((block = res->curBlock) != NULL)
     {
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..3b26c7c 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,6 @@ PQconnectStartParams      157
 PQping                    158
 PQpingParams              159
 PQlibVersion              160
+PQpgresult_malloc         161
+PQpgresult_realloc        162
+PQpgresult_free           163
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 8e32b18..a574848 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -67,6 +67,15 @@ static int PQsendDescribe(PGconn *conn, char desc_type,
                const char *desc_target);
 static int    check_field_number(const PGresult *res, int field_num);
 
+/* ---
+ * malloc/realloc/free for PGresult are replaceable for in-backend use.
+ * Note that the events having the event id PGEVT_RESULTDESTROY won't
+ * fire when you free the memory blocks for PGresult without
+ * PQclear().
+ */
+void *(*PQpgresult_malloc)(size_t size) = malloc;
+void *(*PQpgresult_realloc)(void *ptr, size_t size) = realloc;
+void (*PQpgresult_free)(void *ptr) = free;
 
 /* ----------------
  * Space management for PGresult.
@@ -138,7 +147,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
 {
     PGresult   *result;
 
-    result = (PGresult *) malloc(sizeof(PGresult));
+    result = (PGresult *) PQpgresult_malloc(sizeof(PGresult));
     if (!result)
         return NULL;
 
@@ -536,7 +545,8 @@ pqResultAlloc(PGresult *res, size_t nBytes, bool isBinary)
      */
     if (nBytes >= PGRESULT_SEP_ALLOC_THRESHOLD)
     {
-        block = (PGresult_data *) malloc(nBytes + PGRESULT_BLOCK_OVERHEAD);
+        block =
+            (PGresult_data *) PQpgresult_malloc(nBytes + PGRESULT_BLOCK_OVERHEAD);
         if (!block)
             return NULL;
         space = block->space + PGRESULT_BLOCK_OVERHEAD;
@@ -560,7 +570,7 @@ pqResultAlloc(PGresult *res, size_t nBytes, bool isBinary)
     }
 
     /* Otherwise, start a new block. */
-    block = (PGresult_data *) malloc(PGRESULT_DATA_BLOCKSIZE);
+    block = (PGresult_data *) PQpgresult_malloc(PGRESULT_DATA_BLOCKSIZE);
     if (!block)
         return NULL;
     block->next = res->curBlock;
@@ -662,12 +672,12 @@ PQclear(PGresult *res)
     while ((block = res->curBlock) != NULL)
     {
         res->curBlock = block->next;
-        free(block);
+        PQpgresult_free(block);
     }
 
     /* Free the top-level tuple pointer array */
     if (res->tuples)
-        free(res->tuples);
+        PQpgresult_free(res->tuples);
 
     /* zero out the pointer fields to catch programming errors */
     res->attDescs = NULL;
@@ -679,7 +689,7 @@ PQclear(PGresult *res)
     /* res->curBlock was zeroed out earlier */
 
     /* Free the PGresult structure itself */
-    free(res);
+    PQpgresult_free(res);
 }
 
 /*
@@ -844,10 +854,11 @@ pqAddTuple(PGresult *res, PGresAttValue *tup)
 
         if (res->tuples == NULL)
             newTuples = (PGresAttValue **)
-                malloc(newSize * sizeof(PGresAttValue *));
+                PQpgresult_malloc(newSize * sizeof(PGresAttValue *));
         else
             newTuples = (PGresAttValue **)
-                realloc(res->tuples, newSize * sizeof(PGresAttValue *));
+                PQpgresult_realloc(res->tuples,
+                                   newSize * sizeof(PGresAttValue *));
         if (!newTuples)
             return FALSE;       /* malloc or realloc failed */
         res->tupArrSize = newSize;
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index d13a5b9..c958df1 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -226,6 +226,14 @@ typedef struct pgresAttDesc
 } PGresAttDesc;
 
 /* ----------------
+ * malloc/realloc/free for PGresult are replaceable for in-backend use
+ * ----------------
+ */
+extern void *(*PQpgresult_malloc)(size_t size);
+extern void *(*PQpgresult_realloc)(void *ptr, size_t size);
+extern void (*PQpgresult_free)(void *ptr);
+
+/* ----------------
  * Exported functions of libpq
  * ----------------
  */

Re: Allow substitute allocators for PGresult.

From:
Heikki Linnakangas
Date:
On 11.11.2011 11:18, Kyotaro HORIGUCHI wrote:
> The comment at the beginning of pqexpbuffer.c says that libpq
> should not rely on palloc(). Besides, Tom Lane said that palloc
> should not be visible outside the backend(*1) and I agree with
> it.
>
> *1: http://archives.postgresql.org/pgsql-hackers/1999-02/msg00364.php
>
> On the other hand, dblink, dblink-plus (our product!), and
> perhaps FDWs that connect to other PostgreSQL servers seem to copy
> the query result contained in a PGresult into a tuplestore. I
> guess that this is to avoid memory leaks when processing is
> terminated halfway.
>
> But it is rather expensive to copy the whole PGresult, and the
> cost grows as the received data gets larger. Furthermore,
> it requires about twice as much memory as the net size of the
> data. And it is pointless to copy and modify libpq or reinvent it
> from scratch. So we would be happy to be able to use palloc in
> libpq, at least for PGresult, in such cases despite the policy.
>
>
> For these reasons, I propose to make allocators for PGresult
> replaceable.

You could use the resource owner mechanism to track them. Register a 
callback function with RegisterResourceReleaseCallback(). Whenever a 
PGresult is returned from libpq, add it to e.g. a linked list, kept in 
TopMemoryContext, and also store a reference to CurrentResourceOwner in 
the list element. In the callback function, scan through the list and 
free all the PGresults associated with the resource owner that's being 
released.
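A minimal model of this bookkeeping, written in plain C so it compiles outside the backend. The names track_result(), untrack_result() and release_owner() are hypothetical. The owner pointer stands in for CurrentResourceOwner, and release_owner() plays the part of the callback registered with RegisterResourceReleaseCallback() (which, in the backend, also receives the release phase and commit flags). The clear argument stands in for PQclear(). In a real backend module the list nodes would be allocated in TopMemoryContext rather than with malloc().

```c
#include <assert.h>
#include <stdlib.h>

/* One tracked result and the (opaque) owner it belongs to. */
typedef struct TrackedResult
{
    void       *result;
    const void *owner;
    struct TrackedResult *next;
} TrackedResult;

static TrackedResult *tracked = NULL;

/* Remember a result returned by the library, tagged with its owner.
 * Returns 0 on success, -1 if the list node cannot be allocated. */
int
track_result(void *result, const void *owner)
{
    TrackedResult *node = malloc(sizeof(TrackedResult));

    if (!node)
        return -1;
    node->result = result;
    node->owner = owner;
    node->next = tracked;
    tracked = node;
    return 0;
}

/* Stop tracking a result that the caller is freeing normally. */
void
untrack_result(void *result)
{
    TrackedResult **link = &tracked;

    while (*link)
    {
        if ((*link)->result == result)
        {
            TrackedResult *dead = *link;

            *link = dead->next;
            free(dead);
            return;
        }
        link = &(*link)->next;
    }
}

/* Release callback: free every result still held by owner.
 * Returns how many results were freed. */
int
release_owner(const void *owner, void (*clear) (void *))
{
    TrackedResult **link = &tracked;
    int         nfreed = 0;

    assert(clear != NULL);
    while (*link)
    {
        TrackedResult *node = *link;

        if (node->owner == owner)
        {
            *link = node->next;
            clear(node->result);
            free(node);
            nfreed++;
        }
        else
            link = &node->next;
    }
    return nfreed;
}
```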

-- 
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com


Re: Allow substitute allocators for PGresult.

From:
Tom Lane
Date:
Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:
> On 11.11.2011 11:18, Kyotaro HORIGUCHI wrote:
>> The comment at the beginning of pqexpbuffer.c says that libpq
>> should not rely on palloc(). Besides, Tom Lane said that palloc
>> should not be visible outside the backend(*1) and I agree with
>> it.
>> 
>> *1: http://archives.postgresql.org/pgsql-hackers/1999-02/msg00364.php
>> 
>> On the other hand, dblink, dblink-plus (our product!), and
>> perhaps FDWs that connect to other PostgreSQL servers seem to copy
>> the query result contained in a PGresult into a tuplestore. I
>> guess that this is to avoid memory leaks when processing is
>> terminated halfway.
>> 
>> But it is rather expensive to copy the whole PGresult, and the
>> cost grows as the received data gets larger. Furthermore,
>> it requires about twice as much memory as the net size of the
>> data. And it is pointless to copy and modify libpq or reinvent it
>> from scratch. So we would be happy to be able to use palloc in
>> libpq, at least for PGresult, in such cases despite the policy.
>> 
>> For these reasons, I propose to make allocators for PGresult
>> replaceable.

> You could use the resource owner mechanism to track them.

Heikki's idea is probably superior so far as PG backend usage is
concerned in isolation, but I wonder if there are scenarios where a
client application would like to be able to manage libpq's allocations.
If so, Kyotaro-san's approach would solve more problems than just
dblink's.

However, the bigger picture here is that I think Kyotaro-san's desire to
not have dblink return a tuplestore may be misplaced.  Tuplestores can
spill to disk, while PGresults don't; so the larger the result, the
more important it is to push it into a tuplestore and PQclear it as soon
as possible.

Despite that worry, it'd likely be a good idea to adopt one or the other
of these solutions anyway, because I think there are corner cases where
dblink.c can leak a PGresult --- for instance, what if dblink_res_error
fails due to out-of-memory before reaching PQclear?  And we could get
rid of the awkward and none-too-cheap PG_TRY blocks that it uses to try
to defend against such leaks in other places.

So I'm in favor of making a change along that line, although I'd want
to see more evidence before considering changing dblink to not return
tuplestores.
        regards, tom lane


Re: Allow substitute allocators for PGresult.

From:
Stephen Frost
Date:
* Tom Lane (tgl@sss.pgh.pa.us) wrote:
> Heikki's idea is probably superior so far as PG backend usage is
> concerned in isolation, but I wonder if there are scenarios where a
> client application would like to be able to manage libpq's allocations.

The answer to that is certainly 'yes'.  It was one of the first things
that I complained about when moving from Oracle to PG.  With OCI, you
can bulk load results directly into application-allocated memory areas.

Haven't been following the dblink discussion, so not going to comment
about that piece.
Thanks,
    Stephen

Re: Allow substitute allocators for PGresult.

From:
Tom Lane
Date:
Stephen Frost <sfrost@snowman.net> writes:
> * Tom Lane (tgl@sss.pgh.pa.us) wrote:
>> Heikki's idea is probably superior so far as PG backend usage is
>> concerned in isolation, but I wonder if there are scenarios where a
>> client application would like to be able to manage libpq's allocations.

> The answer to that is certainly 'yes'.  It was one of the first things
> that I complained about when moving from Oracle to PG.  With OCI, you
> can bulk load results directly into application-allocated memory areas.

Well, loading data in a form whereby the application can access it
without going through the PGresult accessor functions would be an
entirely different (and vastly larger) project.  I'm not sure I want
to open that can of worms --- it seems like you could write a huge
amount of code trying to provide every format someone might want,
and still find that there were impedance mismatches for many
applications.

AIUI Kyotaro-san is just suggesting that the app should be able to
provide a substitute malloc function for use in allocating PGresult
space (and not, I think, anything else that libpq allocates internally).
Basically this would allow PGresults to be cleaned up with methods other
than calling PQclear on each one.  It wouldn't affect how you'd interact
with one while you had it.  That seems like pretty much exactly what we
want for preventing memory leaks in the backend; but is it going to be
useful for other apps?
        regards, tom lane


Re: Allow substitute allocators for PGresult.

From:
Robert Haas
Date:
On Sat, Nov 12, 2011 at 12:48 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> AIUI Kyotaro-san is just suggesting that the app should be able to
> provide a substitute malloc function for use in allocating PGresult
> space (and not, I think, anything else that libpq allocates internally).
> Basically this would allow PGresults to be cleaned up with methods other
> than calling PQclear on each one.  It wouldn't affect how you'd interact
> with one while you had it.  That seems like pretty much exactly what we
> want for preventing memory leaks in the backend; but is it going to be
> useful for other apps?

I think it will.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


Re: Allow substitute allocators for PGresult.

From:
Tom Lane
Date:
Kyotaro HORIGUCHI <horiguchi.kyotaro@oss.ntt.co.jp> writes:
> Hello. This message is a proposal of a pair of patches that
> enables the memory allocator for PGresult in libpq to be
> replaced.

Since there seems to be rough consensus that something like this would
be a good idea, I looked more closely at the details of the patch.
I think the design could use some adjustment.

To start with, the patch proposes exposing some global variables that
affect the behavior of libpq process-wide.  This seems like a pretty bad
design, because a single process could contain multiple usages of libpq
with different requirements.  As an example, if dblink.c were to set
these variables inside a backend process, it would break usage of libpq
from PL/Perl via DBI.  Global variables also tend to be a bad idea
whenever you think about multi-threaded applications --- they require
locking facilities, which are not in this patch.

I think it'd be better to consider the PGresult alloc/free functions to
be a property of a PGconn, which you'd set with a function call along the
lines of PQsetResultAllocator(conn, alloc_func, realloc_func, free_func)
after having successfully opened a connection.  Then we just have some
more PGconn fields (and I guess PGresult will need a copy of the
free_func pointer) and no new global variables.
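A sketch of the per-connection variant. PQsetResultAllocator() is only proposed here and is not an existing libpq function, so the model below uses hypothetical DemoConn and DemoResult types and a hypothetical demo_set_result_allocator(). The result copies the free function from its connection when it is created, so it can still be cleared correctly after the connection is gone, and two connections in the same process can use different allocators.

```c
#include <assert.h>
#include <stdlib.h>

typedef void *(*alloc_func) (size_t size);
typedef void *(*realloc_func) (void *ptr, size_t size);
typedef void (*free_func) (void *ptr);

/* Hypothetical connection: the allocator is a per-connection property. */
typedef struct DemoConn
{
    alloc_func  res_alloc;
    realloc_func res_realloc;
    free_func   res_free;
} DemoConn;

/* Hypothetical result: keeps its own copy of the free function. */
typedef struct DemoResult
{
    free_func   res_free;
    char       *data;
} DemoResult;

void
demo_conn_init(DemoConn *conn)
{
    conn->res_alloc = malloc;
    conn->res_realloc = realloc;
    conn->res_free = free;
}

void
demo_set_result_allocator(DemoConn *conn, alloc_func a, realloc_func r,
                          free_func f)
{
    assert(a && r && f);
    conn->res_alloc = a;
    conn->res_realloc = r;
    conn->res_free = f;
}

DemoResult *
demo_make_result(const DemoConn *conn, size_t datalen)
{
    DemoResult *res = conn->res_alloc(sizeof(DemoResult));

    if (!res)
        return NULL;
    res->res_free = conn->res_free;
    res->data = conn->res_alloc(datalen);
    if (!res->data)
    {
        res->res_free(res);
        return NULL;
    }
    return res;
}

/* Needs no connection: uses the free function saved in the result. */
void
demo_clear(DemoResult *res)
{
    if (!res)
        return;
    res->res_free(res->data);
    res->res_free(res);
}

/* Example substitute allocator that counts outstanding blocks. */
int         outstanding = 0;

void *
counted_alloc(size_t size)
{
    void       *p = malloc(size);

    if (p)
        outstanding++;
    return p;
}

void *
counted_realloc(void *ptr, size_t size)
{
    void       *p = realloc(ptr, size);

    if (p && !ptr)
        outstanding++;
    return p;
}

void
counted_free(void *ptr)
{
    if (ptr)
        outstanding--;
    free(ptr);
}
```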

I am also feeling dubious about whether it's a good idea to expect the
functions to have exactly the signature of malloc/free.  They are
essentially callbacks, and in most places where a library provides for
callbacks, it's customary to include a "void *" passthrough argument
in case the callback needs some context information.  I am not sure that
dblink.c would need such a thing, but if we're trying to design a
general-purpose feature, then we probably should have it.  The cost
would be having shim functions inside libpq for the default case, but
it doesn't seem likely that they'd cost enough to notice.
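The passthrough convention might look like the following sketch, in which all names are hypothetical. Each hook receives the caller's void * context. The default case wraps malloc() and free() in tiny shim functions that ignore it, while arena_alloc() uses it to find its state. The arena also illustrates the cleanup style discussed in this thread: individual frees are no-ops and everything is released at once by discarding the arena.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical allocator hooks with a "void *" passthrough argument. */
typedef struct ResultAllocator
{
    void       *(*alloc_fn) (void *arg, size_t size);
    void        (*free_fn) (void *arg, void *ptr);
    void       *arg;            /* handed back to every call */
} ResultAllocator;

/* Shims for the default case: ignore the context, use the C library. */
void *
default_alloc(void *arg, size_t size)
{
    (void) arg;
    return malloc(size);
}

void
default_free(void *arg, void *ptr)
{
    (void) arg;
    free(ptr);
}

/* A context-using allocator: a bump arena whose state arrives via arg.
 * base is assumed to be suitably aligned (e.g. obtained from malloc). */
typedef struct Arena
{
    char       *base;
    size_t      used;
    size_t      size;
} Arena;

void *
arena_alloc(void *arg, size_t size)
{
    Arena      *arena = arg;
    size_t      aligned = (size + 15) & ~(size_t) 15;
    void       *p;

    assert(arena != NULL && arena->used <= arena->size);
    if (aligned < size || arena->size - arena->used < aligned)
        return NULL;            /* overflow, or arena exhausted */
    p = arena->base + arena->used;
    arena->used += aligned;
    return p;
}

void
arena_free(void *arg, void *ptr)
{
    (void) arg;
    (void) ptr;                 /* memory is reclaimed with the arena */
}
```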

The patch lacks any user documentation, which it surely must have if
we are claiming this is a user-visible feature.  And I think it could
use some attention to updating code comments, notably the large block
about PGresult space management near the top of fe-exec.c.

Usually, when writing a feature of this sort, it's a good idea to
implement a prototype use-case to make sure you've not overlooked
anything.  So I'd feel happier about the patch if it came along with
a patch to make dblink.c use it to prevent memory leaks.
        regards, tom lane


Re: Allow substitute allocators for PGresult.

From:
Tom Lane
Date:
Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:
> On 11.11.2011 11:18, Kyotaro HORIGUCHI wrote:
>> For these reasons, I propose to make allocators for PGresult
>> replaceable.

> You could use the resource owner mechanism to track them.

BTW, I just thought of a potentially fatal objection to making PGresult
allocation depend on palloc: libpq is absolutely not prepared to handle
losing control on out-of-memory.  While I'm not certain that its
behavior with malloc is entirely desirable either (it tends to loop in
hopes of getting the memory next time), we cannot just plop in palloc
in place of malloc and imagine that we're not breaking it.
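The control-flow problem can be shown in miniature. In the backend, palloc() does not return NULL on failure; it reports an error that unwinds the stack with longjmp(). The hypothetical build_pair() below follows the malloc convention of cleaning up and returning NULL when an allocation fails. With throwing_alloc(), a stand-in for palloc's error path, the cleanup branch is never reached and the first block leaks. This is a simplification; libpq's real out-of-memory paths are more involved.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdlib.h>

int         live = 0;           /* blocks currently allocated */
int         fail_after = -1;    /* fail after N more allocations; -1 = never */
jmp_buf     error_context;      /* stand-in for the backend's error handler */

/* malloc-style: reports failure by returning NULL. */
void *
returning_alloc(size_t size)
{
    void       *p;

    if (fail_after == 0)
        return NULL;
    if (fail_after > 0)
        fail_after--;
    p = malloc(size);
    if (p)
        live++;
    return p;
}

/* palloc-style: never returns NULL, jumps to the error handler instead. */
void *
throwing_alloc(size_t size)
{
    void       *p = returning_alloc(size);

    if (!p)
        longjmp(error_context, 1);
    return p;
}

void
counted_free(void *p)
{
    if (p)
        live--;
    free(p);
}

/* Library code written for malloc semantics (hypothetical): on failure
 * it frees what it already holds and returns NULL. */
void **
build_pair(void *(*alloc) (size_t))
{
    void      **pair = alloc(2 * sizeof(void *));

    assert(alloc != NULL);
    if (!pair)
        return NULL;
    pair[0] = alloc(16);
    if (!pair[0])
    {
        counted_free(pair);     /* never reached with throwing_alloc */
        return NULL;
    }
    pair[1] = NULL;
    return pair;
}
```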

This makes me think that Heikki's approach is by far the more tenable
one, so far as dblink is concerned.  Perhaps the substitute-malloc idea
is still useful for some other application, but I'm inclined to put that
idea on the back burner until we have a concrete use case for it.
        regards, tom lane


Re: Allow substitute allocators for PGresult.

From:
Matteo Beccati
Date:
On 12/11/2011 07:36, Robert Haas wrote:
> On Sat, Nov 12, 2011 at 12:48 AM, Tom Lane<tgl@sss.pgh.pa.us>  wrote:
>> AIUI Kyotaro-san is just suggesting that the app should be able to
>> provide a substitute malloc function for use in allocating PGresult
>> space (and not, I think, anything else that libpq allocates internally).
>> Basically this would allow PGresults to be cleaned up with methods other
>> than calling PQclear on each one.  It wouldn't affect how you'd interact
>> with one while you had it.  That seems like pretty much exactly what we
>> want for preventing memory leaks in the backend; but is it going to be
>> useful for other apps?
>
> I think it will.

Maybe I'm just talking nonsense, as I have only a little experience hacking 
the pgsql and pdo-pgsql extensions, but it seems to me this could 
easily avoid an extra copy of the data returned by 
PQgetvalue(). To me it seems a pretty nice win.


Cheers
-- 
Matteo Beccati

Development & Consulting - http://www.beccati.com/


Re: Allow substitute allocators for PGresult.

From:
Stephen Frost
Date:
* Tom Lane (tgl@sss.pgh.pa.us) wrote:
> Well, loading data in a form whereby the application can access it
> without going through the PGresult accessor functions would be an
> entirely different (and vastly larger) project.

Looking through the thread, I agree that it's a different thing than
what's being discussed here.

> I'm not sure I want
> to open that can of worms --- it seems like you could write a huge
> amount of code trying to provide every format someone might want,
> and still find that there were impedance mismatches for many
> applications.

The OCI approach is actually very similar to how we handle our
catalogs internally.  Imagine you define a C struct which matches your
table structure, then you allocate 5000 (or however many) of those, and
give the base pointer to the 'getResult' call along with an integer array
of offsets into that structure, one for each column.  There might have
been a few other minor things (like some notion of how to handle NULLs),
but it was pretty straight-forward from the C perspective, imv.
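A rough C rendering of the OCI-style binding described above, as a sketch with hypothetical names (ColumnBinding, bind_rows(), Row). The application describes where each column lands in its own row struct with offsetof(), allocates an array of rows, and a loader copies text-format values into it. Only int and fixed-width text columns are handled, and NULLs are simply stored as 0 or an empty string; a real interface would need a proper NULL indicator.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef enum
{
    COL_INT,
    COL_TEXT
} ColType;

/* Where one result column is stored inside the application's row struct. */
typedef struct ColumnBinding
{
    size_t      offset;         /* offsetof(RowType, field) */
    ColType     type;
    size_t      width;          /* buffer size, COL_TEXT only */
} ColumnBinding;

/* Copy text-format values (row-major, nrows x ncols, NULL pointer = SQL
 * NULL) into an array of application structs starting at base. */
void
bind_rows(const char *const *values, int nrows, int ncols,
          void *base, size_t rowsize, const ColumnBinding *cols)
{
    for (int r = 0; r < nrows; r++)
    {
        char       *row = (char *) base + (size_t) r * rowsize;

        for (int c = 0; c < ncols; c++)
        {
            const char *v = values[r * ncols + c];
            char       *dst = row + cols[c].offset;

            if (cols[c].type == COL_INT)
            {
                int         n = v ? atoi(v) : 0;

                memcpy(dst, &n, sizeof n);
            }
            else
            {
                assert(cols[c].width > 0);
                /* truncates safely to the field width */
                snprintf(dst, cols[c].width, "%s", v ? v : "");
            }
        }
    }
}

/* Example application row type for "SELECT id, name ...". */
typedef struct Row
{
    int         id;
    char        name[8];
} Row;
```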

Trying to provide alternative formats (I'm guessing you were referring
to something like XML..?  Or some complex structure?) would certainly be
a whole different ballgame.
Thanks,
    Stephen

> AIUI Kyotaro-san is just suggesting that the app should be able to
> provide a substitute malloc function for use in allocating PGresult
> space (and not, I think, anything else that libpq allocates internally).
> Basically this would allow PGresults to be cleaned up with methods other
> than calling PQclear on each one.  It wouldn't affect how you'd interact
> with one while you had it.  That seems like pretty much exactly what we
> want for preventing memory leaks in the backend; but is it going to be
> useful for other apps?
>
>             regards, tom lane

Re: Allow substitute allocators for PGresult.

From:
Kyotaro HORIGUCHI
Date:
Hello,

At Fri, 11 Nov 2011 11:29:30 +0200, Heikki Linnakangas wrote
> You could use the resource owner mechanism to track
> them. Register a callback function with
> RegisterResourceReleaseCallback().

Thank you for letting me know about it. I have dug up a message
in pg-hackers referring to the mechanism in a discussion about
postgresql-fdw. I'll put further thought into dblink-plus, taking
it into account.


By the way, memory management for the result in libpq deserves
consideration as a separate issue.

At Sat, 12 Nov 2011 12:29:50 -0500, Tom Lane wrote
> To start with, the patch proposes exposing some global
> variables that affect the behavior of libpq process-wide.  This
> seems like a pretty bad design, because a single process could
> contain multiple usages of libpq

You're right that the design is bad. I designed it to have
minimal impact on libpq by limiting its usage and placing all
responsibility on the users, that is, the developers of the modules
using it. If other applications want to use their own
allocators, there are some points to be considered.

Considering multi-threading, I think it is preferable to make libpq
write the PGresult into memory blocks passed from the application,
as OCI does, instead of having libpq request them itself.

This approach hands the responsibility of memory management to
the user and gives them the capability to avoid memory exhaustion
by their own measures.

On the other hand, this approach could lead to situations where
libpq cannot fit all of the data received from the server into the
memory block it was handed. So the API must be able to return
partial data to the caller.
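The caller-supplied-buffer idea, including partial returns, could be sketched as follows. fill_rows() is hypothetical, not a libpq API. It copies as many complete rows as fit into the application's buffer and returns the index of the first row it could not store, so the caller can drain the buffer and call again from there.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Copy rows[start..nrows-1] into buf as consecutive NUL-terminated
 * strings, whole rows only.  *used receives the number of bytes written.
 * Returns the index of the next row to fetch (nrows when done).  A return
 * value equal to start means the next row does not fit even in an empty
 * buffer, and the caller must supply a larger one. */
int
fill_rows(const char *const *rows, int nrows, int start,
          char *buf, size_t bufsize, size_t *used)
{
    size_t      pos = 0;
    int         i;

    assert(start >= 0 && start <= nrows);
    for (i = start; i < nrows; i++)
    {
        size_t      need = strlen(rows[i]) + 1;

        if (need > bufsize - pos)
            break;              /* buffer full: hand back a partial result */
        memcpy(buf + pos, rows[i], need);
        pos += need;
    }
    *used = pos;
    return i;
}
```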

Going further, if libpq could store the result directly into
user-allocated memory through a tuplestore-like interface,
performance would be better when the final storage is itself a
tuplestore.

I would be happy with passing the result part by part, so I
will consider this as the next issue.


> So I'd feel happier about the patch if it came along with a
> patch to make dblink.c use it to prevent memory leaks.

I take it this refers to my original patch.

Mmm, I heard that dblink copies the data received in the PGresult
into a tuplestore not only to avoid memory leaks but also to reduce
memory usage (once the copy is finished). I think I could show you a
patch that ignores the latter, but it may take me some time, since I
would have to start by understanding dblink and tuplestore in detail...


If I find RegisterResourceReleaseCallback insufficient for our
requirements, I will show it. If not, I will withdraw this patch
from the ongoing CF and propose another patch based on the discussion
above at another time.


Please let me have a little more time.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center


Re: Allow substitute allocators for PGresult.

From:
Kyotaro HORIGUCHI
Date:
Hello, 

me> I'll put further thought into dblink-plus taking it into
me> account.
..
me> Please let me have a little more time.

I asked the developer of dblink-plus about
RegisterResourceReleaseCallback(). He said that the function does
not fit well with the current implementation. In addition,
storing directly into a tuplestore seems to me a better idea
than a palloc'ed PGresult.

So I tried to make libpq/PGresult able to use an alternative
tuple store, hinted to it through the PGconn, and modified dblink
to use the mechanism as the first sample code.

I will show it as a series of patches in the next message.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center


Re: Allow substitute allocators for PGresult.

From:
Kyotaro HORIGUCHI
Date:
Hello, this is the next version of "Allow substitute allocators
for PGresult".

Changing the concept entirely from the previous version, this patch
allows libpq to pass received tuples to an alternative tuple
store.

Design guidelines are shown below.
- No need to modify existing client code of libpq.
- Existing libpq clients run with roughly the same performance, and
  dblink with the modification runs somewhat faster and requires
  less memory.

I roughly measured the run time and memory requirements of
three configurations on CentOS6 in VirtualBox (2GB memory, 4 cores)
running on a Win7 Core i7 host, transferring (30 bytes * 2 cols) *
2000000 tuples (120MB net) within this virtual machine. The
results are below.
                                 xfer time   Peak RSS
Original                      :  6.02s       850MB
libpq patch + Original dblink :  6.11s       850MB
full patch                    :  4.44s       643MB

xfer time here is the mean of five 'real' times measured by
running an SQL script like this after a warm-up run.

=== test.sql
select dblink_connect('c', 'host=localhost port=5432 dbname=test');
select * from dblink('c', 'select a,c from foo limit 2000000') as (a text, b bytea) limit 1;

select dblink_disconnect('c');
===
$ for i in $(seq 1 10); do time psql test -f test.sql; done
===

Peak RSS was measured by reading the Rss of the heap mapping in /proc/[pid]/smaps.
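To reproduce the memory figures, a helper along these lines could read the heap Rss from an smaps-format file. This is a sketch with a hypothetical function name. It returns the current value, so a peak has to be obtained by sampling while the query runs, and it counts only the [heap] mapping. Note that glibc serves large allocations with mmap(), and those appear as separate anonymous mappings that this measurement does not include.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Sum the "Rss:" values (in kB) of the [heap] mapping(s) in an
 * smaps-format stream such as /proc/<pid>/smaps.
 * Returns -1 if fp is NULL. */
long
heap_rss_kb(FILE *fp)
{
    char        line[512];
    char        tok[64];
    int         in_heap = 0;
    long        total = 0;

    if (!fp)
        return -1;
    while (fgets(line, sizeof line, fp))
    {
        size_t      len;
        long        kb;

        if (sscanf(line, "%63s", tok) != 1)
            continue;
        len = strlen(tok);
        assert(len > 0);
        if (tok[len - 1] != ':')
        {
            /* mapping header: "start-end perms offset dev inode [path]" */
            in_heap = strstr(line, "[heap]") != NULL;
        }
        else if (in_heap && sscanf(line, "Rss: %ld kB", &kb) == 1)
            total += kb;
    }
    return total;
}
```

To use it on a backend, open /proc/<pid>/smaps with fopen(), pass the FILE pointer, and fclose() it afterwards.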


The combination of the patched libpq and the original dblink seems
slightly slower (6.11s vs 6.02s, about 1.5%), but that looks within the
margin of error. If even this much slowdown is not acceptable, it could
probably be recovered by restoring the previous static call path, at
the cost of some code redundancy.

On the other hand, the fully patched version is clearly faster (4.44s
vs 6.02s, about 26% less time) and requires less memory (643MB vs
850MB peak RSS, about 24% less). Isn't that nice?

This patch consists of two sub-patches.

The first is a patch for libpq that allows rewiring the tuple storage
mechanism. The default behavior is not changed, so existing libpq
clients should run with it as they are.

The second modifies dblink to store received tuples directly into
a tuplestore using the mechanism above.
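The libpq half of this design can be modeled as a per-connection hook that receives each row instead of having it appended to the result. The following is a standalone sketch with hypothetical names. It is only loosely inspired by the PQregisterTupleAdder()/addTupleFunc code visible in the diff in this message, whose actual signatures differ. In dblink, the param pointer would refer to the tuplestore being filled.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical per-row hook: receives one row's field values (NULL
 * pointer = SQL NULL).  Returns 0 on success, nonzero on error. */
typedef int (*TupleAdder) (void *param, const char *const *values,
                           int nfields);

/* Hypothetical connection carrying the hook and its context pointer. */
typedef struct DemoConn
{
    TupleAdder  add_tuple;      /* NULL: default behavior */
    void       *param;          /* e.g. the tuplestore in dblink */
} DemoConn;

/* Hypothetical result for the default path (row storage elided). */
typedef struct DemoResult
{
    int         ntups;
    size_t      bytes;          /* bytes that would be kept in the result */
} DemoResult;

void
demo_register_tuple_adder(DemoConn *conn, TupleAdder fn, void *param)
{
    conn->add_tuple = fn;
    conn->param = param;
}

/* Called by the simulated protocol reader for each received row. */
int
demo_receive_row(DemoConn *conn, DemoResult *res,
                 const char *const *values, int nfields)
{
    if (conn->add_tuple)
        return conn->add_tuple(conn->param, values, nfields);

    /* default: the row is accumulated in the result object */
    for (int i = 0; i < nfields; i++)
        res->bytes += values[i] ? strlen(values[i]) + 1 : 0;
    res->ntups++;
    return 0;
}

/* Example sink standing in for a tuplestore: consumes rows at once. */
typedef struct RowSink
{
    int         nrows;
    size_t      bytes;
} RowSink;

int
sink_add_tuple(void *param, const char *const *values, int nfields)
{
    RowSink    *sink = param;

    assert(sink != NULL);
    for (int i = 0; i < nfields; i++)
        sink->bytes += values[i] ? strlen(values[i]) + 1 : 0;
    sink->nrows++;
    return 0;
}
```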

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index a360d78..1af8df6 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,7 +160,3 @@ PQconnectStartParams      157
 PQping                    158
 PQpingParams              159
 PQlibVersion              160
-PQregisterTupleAdder      161
-PQgetAsCstring            162
-PQgetAddTupleParam        163
-PQsetAddTupleErrMes       164
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 437be26..50f3f83 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2692,7 +2692,6 @@ makeEmptyPGconn(void)
     conn->allow_ssl_try = true;
     conn->wait_ssl_try = false;
 #endif
-    conn->addTupleFunc = NULL;
 
     /*
      * We try to send at least 8K at a time, which is the usual size of pipe
@@ -5065,10 +5064,3 @@ PQregisterThreadLock(pgthreadlock_t newhandler)
 
     return prev;
 }
-
-void
-PQregisterTupleAdder(PGconn *conn, addTupleFunction func, void *param)
-{
-    conn->addTupleFunc = func;
-    conn->addTupleFuncParam = param;
-}
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index c8ec9bd..113aab0 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -48,6 +48,7 @@ char       *const pgresStatus[] = {
 static int    static_client_encoding = PG_SQL_ASCII;
 static bool static_std_strings = false;
 
+static PGEvent *dupEvents(PGEvent *events, int count);
 static bool PQsendQueryStart(PGconn *conn);
 static int PQsendQueryGuts(PGconn *conn,
@@ -65,9 +66,7 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int PQsendDescribe(PGconn *conn, char desc_type,
                const char *desc_target);
 static int    check_field_number(const PGresult *res, int field_num);
-static void *pqDefaultAddTupleFunc(PGresult *res, AddTupFunc func,
-                                   int id, size_t len);
-static void *pqAddTuple(PGresult *res, PGresAttValue *tup);
+
 /* ----------------
  * Space management for PGresult.
@@ -161,9 +160,6 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
     result->curBlock = NULL;
     result->curOffset = 0;
     result->spaceLeft = 0;
-    result->addTupleFunc = pqDefaultAddTupleFunc;
-    result->addTupleFuncParam = NULL;
-    result->addTupleFuncErrMes = NULL;
 
     if (conn)
     {
@@ -198,12 +194,6 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
             }
             result->nEvents = conn->nEvents;
         }
-
-        if (conn->addTupleFunc)
-        {
-            result->addTupleFunc = conn->addTupleFunc;
-            result->addTupleFuncParam = conn->addTupleFuncParam;
-        }
     }
     else
     {
@@ -497,33 +487,6 @@ PQresultAlloc(PGresult *res, size_t nBytes)
     return pqResultAlloc(res, nBytes, TRUE);
 }
 
-void *
-pqDefaultAddTupleFunc(PGresult *res, AddTupFunc func, int id, size_t len)
-{
-    void *p;
-
-    switch (func)
-    {
-        case ADDTUP_ALLOC_TEXT:
-            return pqResultAlloc(res, len, TRUE);
-
-        case ADDTUP_ALLOC_BINARY:
-            p = pqResultAlloc(res, len, FALSE);
-
-            if (id == -1)
-                res->addTupleFuncParam = p;
-
-            return p;
-
-        case ADDTUP_ADD_TUPLE:
-            return pqAddTuple(res, res->addTupleFuncParam);
-
-        default:
-            /* Ignore */
-            break;
-    }
-    return NULL;
-}
 /*
  * pqResultAlloc -
  *        Allocate subsidiary storage for a PGresult.
@@ -867,9 +830,9 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 /*
  * pqAddTuple
  *      add a row pointer to the PGresult structure, growing it if necessary
 
- *      Returns tup if OK, NULL if not enough memory to add the row.
+ *      Returns TRUE if OK, FALSE if not enough memory to add the row
  */
-static void *
+int
 pqAddTuple(PGresult *res, PGresAttValue *tup)
 {
     if (res->ntups >= res->tupArrSize)
@@ -895,13 +858,13 @@ pqAddTuple(PGresult *res, PGresAttValue *tup)
             newTuples = (PGresAttValue **)
                 realloc(res->tuples, newSize * sizeof(PGresAttValue *));
         if (!newTuples)
 
-            return NULL;        /* malloc or realloc failed */
+            return FALSE;        /* malloc or realloc failed */
         res->tupArrSize = newSize;
         res->tuples = newTuples;
     }
     res->tuples[res->ntups] = tup;
     res->ntups++;
 
-    return tup;
+    return TRUE;
 }
 
 /*
@@ -2859,43 +2822,6 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num)
         return 0;
 }
 
-/* PQgetAsCString
- *    returns the field as C string.
- */
-char *
-PQgetAsCstring(PGresAttValue *attval)
-{
-    return attval->len == NULL_LEN ? NULL : attval->value;
-}
-
-/* PQgetAddTupleParam
- *    Get the pointer to the contextual parameter from PGresult which is
- *    registered to PGconn by PQregisterTupleAdder
- */
-void *
-PQgetAddTupleParam(const PGresult *res)
-{
-    if (!res)
-        return NULL;
-    return res->addTupleFuncParam;
-}
-
-/* PQsetAddTupleErrMes
- *    Set the error message pass back to the caller of addTupleFunc
- *  mes must be a malloc'ed memory block and it is released by the
- *  caller of addTupleFunc if set.
- *  You can replace the previous message by alternative mes, or clear
- *  it with NULL.
- */
-void
-PQsetAddTupleErrMes(PGresult *res, char *mes)
-{
-    /* Free existing message */
-    if (res->addTupleFuncErrMes)
-        free(res->addTupleFuncErrMes);
-    res->addTupleFuncErrMes = mes;
-}
-
 /* PQnparams:
  *    returns the number of input parameters of a prepared statement.
  */
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index c7f74ae..77c4d5a 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -733,10 +733,9 @@ getAnotherTuple(PGconn *conn, bool binary)
     if (conn->curTuple == NULL)
     {
         conn->curTuple = (PGresAttValue *)
 
-            result->addTupleFunc(result, ADDTUP_ALLOC_BINARY, -1,
-                                 nfields * sizeof(PGresAttValue));
+            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
         if (conn->curTuple == NULL)
-            goto addTupleError;
+            goto outOfMemory;
         MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
 
         /*
@@ -758,7 +757,7 @@ getAnotherTuple(PGconn *conn, bool binary)
     {
         bitmap = (char *) malloc(nbytes);
         if (!bitmap)
-            goto addTupleError;
+            goto outOfMemory;
     }
 
     if (pqGetnchar(bitmap, nbytes, conn))
@@ -788,12 +787,9 @@ getAnotherTuple(PGconn *conn, bool binary)
                 vlen = 0;
             if (tup[i].value == NULL)
             {
 
-                AddTupFunc func =
-                    (binary ? ADDTUP_ALLOC_BINARY : ADDTUP_ALLOC_TEXT);
-                tup[i].value =
-                    (char *) result->addTupleFunc(result, func, i, vlen + 1);
+                tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
                 if (tup[i].value == NULL)
-                    goto addTupleError;
+                    goto outOfMemory;
             }
             tup[i].len = vlen;
             /* read in the value */
@@ -816,9 +812,8 @@ getAnotherTuple(PGconn *conn, bool binary)
     }
 
     /* Success!  Store the completed tuple in the result */
 
-    if (!result->addTupleFunc(result, ADDTUP_ADD_TUPLE, 0, 0))
-        goto addTupleError;
-
+    if (!pqAddTuple(result, tup))
+        goto outOfMemory;
     /* and reset for a new message */
     conn->curTuple = NULL;
@@ -826,7 +821,7 @@ getAnotherTuple(PGconn *conn, bool binary)
         free(bitmap);
     return 0;
 
-addTupleError:
+outOfMemory:
     /* Replace partially constructed result with an error result */
 
     /*
@@ -834,21 +829,8 @@ addTupleError:
      * there's not enough memory to concatenate messages...
      */
     pqClearAsyncResult(conn);
-    resetPQExpBuffer(&conn->errorMessage);
-
-    /*
-     * If error message is passed from addTupleFunc, set it into
-     * PGconn, assume out of memory if not.
-     */
-    appendPQExpBufferStr(&conn->errorMessage,
-                         libpq_gettext(result->addTupleFuncErrMes ?
-                                       result->addTupleFuncErrMes :
-                                       "out of memory for query result\n"));
-    if (result->addTupleFuncErrMes)
-    {
-        free(result->addTupleFuncErrMes);
-        result->addTupleFuncErrMes = NULL;
-    }
+    printfPQExpBuffer(&conn->errorMessage,
+                      libpq_gettext("out of memory for query result\n"));
 
     /*
      * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
 
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index d14b57a..45a84d8 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -634,10 +634,9 @@ getAnotherTuple(PGconn *conn, int msgLength)
     if (conn->curTuple == NULL)
     {
         conn->curTuple = (PGresAttValue *)
 
-            result->addTupleFunc(result, ADDTUP_ALLOC_BINARY, -1,
-                                 nfields * sizeof(PGresAttValue));
+            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
         if (conn->curTuple == NULL)
-            goto addTupleError;
+            goto outOfMemory;
         MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
     }
     tup = conn->curTuple;
@@ -674,12 +673,11 @@ getAnotherTuple(PGconn *conn, int msgLength)
             vlen = 0;
         if (tup[i].value == NULL)
         {
 
-            AddTupFunc func = (result->attDescs[i].format != 0 ?
-                               ADDTUP_ALLOC_BINARY : ADDTUP_ALLOC_TEXT);
-            tup[i].value =
-                (char *) result->addTupleFunc(result, func, i, vlen + 1);
+            bool        isbinary = (result->attDescs[i].format != 0);
+
+            tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
             if (tup[i].value == NULL)
-                goto addTupleError;
+                goto outOfMemory;
         }
         tup[i].len = vlen;
         /* read in the value */
@@ -691,36 +689,22 @@ getAnotherTuple(PGconn *conn, int msgLength)
     }
 
     /* Success!  Store the completed tuple in the result */
 
-    if (!result->addTupleFunc(result, ADDTUP_ADD_TUPLE, 0, 0))
-        goto addTupleError;
-
+    if (!pqAddTuple(result, tup))
+        goto outOfMemory;
     /* and reset for a new message */
     conn->curTuple = NULL;
 
     return 0;
 
-addTupleError:
+outOfMemory:
     /*
      * Replace partially constructed result with an error result. First
      * discard the old result to try to win back some memory.
      */
     pqClearAsyncResult(conn);
 
-    resetPQExpBuffer(&conn->errorMessage);
-
-    /*
-     * If error message is passed from addTupleFunc, set it into
-     * PGconn, assume out of memory if not.
-     */
-    appendPQExpBufferStr(&conn->errorMessage,
-                         libpq_gettext(result->addTupleFuncErrMes ?
-                                       result->addTupleFuncErrMes : 
-                                       "out of memory for query result\n"));
-    if (result->addTupleFuncErrMes)
-    {
-        free(result->addTupleFuncErrMes);
-        result->addTupleFuncErrMes = NULL;
-    }
+    printfPQExpBuffer(&conn->errorMessage,
+                      libpq_gettext("out of memory for query result\n"));
     pqSaveErrorResult(conn);
 
     /* Discard the failed message by pretending we read it */
 
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index bdce294..d13a5b9 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -116,16 +116,6 @@ typedef enum
     PQPING_NO_ATTEMPT            /* connection not attempted (bad params) */
 } PGPing;
 
-/* AddTupFunc is one of the parameters of addTupleFunc that decides
- * the function of the addTupleFunction. See addTupleFunction for
- * details */
-typedef enum 
-{
-    ADDTUP_ALLOC_TEXT,          /* Returns non-aligned memory for text value */
-    ADDTUP_ALLOC_BINARY,        /* Returns aligned memory for binary value */
-    ADDTUP_ADD_TUPLE            /* Adds tuple data into tuple storage */
-} AddTupFunc;
-
 /* PGconn encapsulates a connection to the backend.
  * The contents of this struct are not supposed to be known to applications.
  */
 
@@ -235,12 +225,6 @@ typedef struct pgresAttDesc
     int            atttypmod;        /* type-specific modifier info */
 } PGresAttDesc;
 
-typedef struct pgresAttValue
-{
-    int            len;            /* length in bytes of the value */
-    char       *value;            /* actual value, plus terminating zero byte */
-} PGresAttValue;
-
 /* ----------------
  * Exported functions of libpq
  * ----------------
@@ -432,52 +416,6 @@ extern PGPing PQping(const char *conninfo);
 extern PGPing PQpingParams(const char *const * keywords,
              const char *const * values, int expand_dbname);
 
-/*
- * Typedef for tuple storage function.
- *
- * This function pointer is used for tuple storage function in
- * PGresult and PGconn.
- *
- * addTupleFunction is called for four types of function designated by
- * the enum AddTupFunc.
- *
- * id is the identifier for allocated memory block. The caller sets -1
- * for PGresAttValue array, and 0 to number of cols - 1 for each
- * column.
- *
- * ADDTUP_ALLOC_TEXT requests the size bytes memory block for a text
- * value which may not be alingned to the word boundary.
- *
- * ADDTUP_ALLOC_BINARY requests the size bytes memory block for a
- * binary value which is aligned to the word boundary.
- *
- * ADDTUP_ADD_TUPLE requests to add tuple data into storage, and
- * free the memory blocks allocated by this function if necessary.
- * id and size are ignored.
- *
- * This function must return non-NULL value for success and must
- * return NULL for failure and may set error message by
- * PQsetAddTupleErrMes in malloc'ed memory. Assumed by caller as out
- * of memory if the error message is NULL on failure. This function is
- * assumed not to throw any exception.
- */
-    typedef void *(*addTupleFunction)(PGresult *res, AddTupFunc func,
-                                      int id, size_t size);
-
-/*
- * Register alternative tuple storage function to PGconn.
- * 
- * By registering this function, pg_result disables its own tuple
- * storage and calls it to append rows one by one.
- *
- * func is tuple store function. See addTupleFunction.
- * 
- * addTupFuncParam is contextual storage that can be get with
- * PQgetAddTupleParam in func.
- */
-extern void PQregisterTupleAdder(PGconn *conn, addTupleFunction func,
-                                 void *addTupFuncParam);
-
 /* Force the write buffer to be written (or at least try) */
 extern int    PQflush(PGconn *conn);
@@ -516,9 +454,6 @@ extern char *PQcmdTuples(PGresult *res);
 extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num);
 extern int    PQgetlength(const PGresult *res, int tup_num, int field_num);
 extern int    PQgetisnull(const PGresult *res, int tup_num, int field_num);
 
-extern char *PQgetAsCstring(PGresAttValue *attdesc);
-extern void *PQgetAddTupleParam(const PGresult *res);
-extern void    PQsetAddTupleErrMes(PGresult *res, char *mes);
 extern int    PQnparams(const PGresult *res);
 extern Oid    PQparamtype(const PGresult *res, int param_num);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 45e4c93..64dfcb2 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -134,6 +134,12 @@ typedef struct pgresParamDesc
 #define NULL_LEN        (-1)    /* pg_result len for NULL value */
 
+typedef struct pgresAttValue
+{
+    int            len;            /* length in bytes of the value */
+    char       *value;            /* actual value, plus terminating zero byte */
+} PGresAttValue;
+
 /* Typedef for message-field list entries */
 typedef struct pgMessageField
 {
@@ -203,11 +209,6 @@ struct pg_result
     PGresult_data *curBlock;    /* most recently allocated block */
     int            curOffset;        /* start offset of free space in block */
     int            spaceLeft;        /* number of free bytes remaining in block */
 
-
-    addTupleFunction addTupleFunc; /* Tuple storage function. See
-                                    * addTupleFunction for details. */
-    void *addTupleFuncParam;       /* Contextual parameter for addTupleFunc */
-    char *addTupleFuncErrMes;      /* Error message returned from addTupFunc */
 };
 
 /* PGAsyncStatusType defines the state of the query-execution state machine */
 
@@ -442,13 +443,6 @@ struct pg_conn
     /* Buffer for receiving various parts of messages */
     PQExpBufferData workBuffer; /* expansible string */
 
-
-    /* Tuple store function. The two fields below is copied to newly
-     * created PGresult if addTupleFunc is not NULL. Use default
-     * function if addTupleFunc is NULL. */
-    addTupleFunction addTupleFunc; /* Tuple storage function. See
-                                    * addTupleFunction for details. */
-    void *addTupleFuncParam;       /* Contextual parameter for addTupFunc */
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
 
@@ -513,6 +507,7 @@ extern void
 pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 /* This lets gcc check the format string for consistency. */
 __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
 
+extern int    pqAddTuple(PGresult *res, PGresAttValue *tup);
 extern void pqSaveMessageField(PGresult *res, char code,
                    const char *value);
 extern void pqSaveParameterStatus(PGconn *conn, const char *name,
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 62c810a..fb2e10e 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,23 @@ typedef struct remoteConn
     bool        newXactForCursor;        /* Opened a transaction for a cursor */
 } remoteConn;
 
+typedef struct storeInfo
+{
+    Tuplestorestate *tuplestore;
+    int nattrs;
+    AttInMetadata *attinmeta;
+    MemoryContext oldcontext;
+    char *attrvalbuf;
+    void **valbuf;
+    size_t *valbufsize;
+    bool error_occurred;
+    bool nummismatch;
+} storeInfo;
+
 /*
  * Internal declarations
  */
 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
 static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn *rconn);
 
@@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);
 static void validate_pkattnums(Relation rel,
                    int2vector *pkattnums_arg, int32 pknumatts_arg,
                    int **pkattnums, int *pknumatts);
 
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static void *addTuple(PGresult *res, AddTupFunc func, int id, size_t size);
+
 /* Global */
 static remoteConn *pconn = NULL;
@@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
     char       *curname = NULL;
     int            howmany = 0;
     bool        fail = true;    /* default to backward compatible */
 
+    storeInfo   storeinfo;
 
     DBLINK_INIT;
@@ -559,15 +576,30 @@ dblink_fetch(PG_FUNCTION_ARGS)
     appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
 
     /*
 
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterTupleAdder(conn, addTuple, &storeinfo);
+
+    /*
      * Try to execute the query.  Note that since libpq uses malloc, the
      * PGresult will be long-lived even though we are still in a short-lived
      * memory context.
      */
     res = PQexec(conn, buf.data);
 
+    finishStoreInfo(&storeinfo);
+
     if (!res ||
         (PQresultStatus(res) != PGRES_COMMAND_OK &&
          PQresultStatus(res) != PGRES_TUPLES_OK))
     {
+        /* This is only for backward compatibility */
+        if (storeinfo.nummismatch)
+            ereport(ERROR,
+                    (errcode(ERRCODE_DATATYPE_MISMATCH),
+                     errmsg("remote query result rowtype does not match "
+                            "the specified FROM clause rowtype")));
         dblink_res_error(conname, res, "could not fetch from cursor", fail);
         return (Datum) 0;
     }
 
@@ -580,7 +612,6 @@ dblink_fetch(PG_FUNCTION_ARGS)
                  errmsg("cursor \"%s\" does not exist", curname)));
     }
 
-    materializeResult(fcinfo, res);
     return (Datum) 0;
 }
@@ -640,6 +671,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     remoteConn *rconn = NULL;
     bool        fail = true;    /* default to backward compatible */
     bool        freeconn = false;
 
+    storeInfo   storeinfo;
 
     /* check to see if caller supports us returning a tuplestore */
     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
 
@@ -715,164 +747,206 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     rsinfo->setResult = NULL;
     rsinfo->setDesc = NULL;
 
+
+    /*
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec/PQgetResult below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterTupleAdder(conn, addTuple, &storeinfo);
+
     /* synchronous query, or async result retrieval */
     if (!is_async)
         res = PQexec(conn, sql);
     else
-    {
         res = PQgetResult(conn);
-        /* NULL means we're all done with the async results */
-        if (!res)
-            return (Datum) 0;
-    }
-    /* if needed, close the connection to the database and cleanup */
-    if (freeconn)
-        PQfinish(conn);
+    finishStoreInfo(&storeinfo);
-    if (!res ||
-        (PQresultStatus(res) != PGRES_COMMAND_OK &&
-         PQresultStatus(res) != PGRES_TUPLES_OK))
+    /* NULL res from async get means we're all done with the results */
+    if (res || !is_async)
     {
-        dblink_res_error(conname, res, "could not execute query", fail);
-        return (Datum) 0;
+        if (freeconn)
+            PQfinish(conn);
+
+        if (!res ||
+            (PQresultStatus(res) != PGRES_COMMAND_OK &&
+             PQresultStatus(res) != PGRES_TUPLES_OK))
+        {
+            /* This is only for backward compatibility */
+            if (storeinfo.nummismatch)
+            {
+                ereport(ERROR,
+                        (errcode(ERRCODE_DATATYPE_MISMATCH),
+                         errmsg("remote query result rowtype does not match "
+                                "the specified FROM clause rowtype")));
+            }
+            dblink_res_error(conname, res, "could not execute query", fail);
+            return (Datum) 0;
+        }
     }
-    materializeResult(fcinfo, res);
     return (Datum) 0;
 }
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */
 static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
 {
     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-
-    Assert(rsinfo->returnMode == SFRM_Materialize);
-
-    PG_TRY();
+    TupleDesc    tupdesc;
+    int i;
+    
+    switch (get_call_result_type(fcinfo, NULL, &tupdesc))
     {
-        TupleDesc    tupdesc;
-        bool        is_sql_cmd = false;
-        int            ntuples;
-        int            nfields;
+        case TYPEFUNC_COMPOSITE:
+            /* success */
+            break;
+        case TYPEFUNC_RECORD:
+            /* failed to determine actual type of RECORD */
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("function returning record called in context "
+                            "that cannot accept type record")));
+            break;
+        default:
+            /* result type isn't composite */
+            elog(ERROR, "return type must be a row type");
+            break;
+    }
+    
+    sinfo->oldcontext = MemoryContextSwitchTo(
+        rsinfo->econtext->ecxt_per_query_memory);
+
+    /* make sure we have a persistent copy of the tupdesc */
+    tupdesc = CreateTupleDescCopy(tupdesc);
+
+    sinfo->error_occurred = FALSE;
+    sinfo->nummismatch = FALSE;
+    sinfo->nattrs = tupdesc->natts;
+    sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+    sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+    sinfo->valbuf = (void **)malloc(sinfo->nattrs * sizeof(void *));
+    sinfo->valbufsize = (size_t *)malloc(sinfo->nattrs * sizeof(size_t));
+    for (i = 0 ; i < sinfo->nattrs ; i++)
+    {
+        sinfo->valbuf[i] = NULL;
+        sinfo->valbufsize[i] = 0;
+    }
-        if (PQresultStatus(res) == PGRES_COMMAND_OK)
-        {
-            is_sql_cmd = true;
-
-            /*
-             * need a tuple descriptor representing one TEXT column to return
-             * the command status string as our result tuple
-             */
-            tupdesc = CreateTemplateTupleDesc(1, false);
-            TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-                               TEXTOID, -1, 0);
-            ntuples = 1;
-            nfields = 1;
-        }
-        else
-        {
-            Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+    /* Preallocate memory of the same size as a PGresAttValue array for values. */
+    sinfo->attrvalbuf = (char *) malloc(sinfo->nattrs * sizeof(PGresAttValue));
-            is_sql_cmd = false;
+    rsinfo->setResult = sinfo->tuplestore;
+    rsinfo->setDesc = tupdesc;
+}
-            /* get a tuple descriptor for our result type */
-            switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-            {
-                case TYPEFUNC_COMPOSITE:
-                    /* success */
-                    break;
-                case TYPEFUNC_RECORD:
-                    /* failed to determine actual type of RECORD */
-                    ereport(ERROR,
-                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                        errmsg("function returning record called in context "
-                               "that cannot accept type record")));
-                    break;
-                default:
-                    /* result type isn't composite */
-                    elog(ERROR, "return type must be a row type");
-                    break;
-            }
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+    int i;
-            /* make sure we have a persistent copy of the tupdesc */
-            tupdesc = CreateTupleDescCopy(tupdesc);
-            ntuples = PQntuples(res);
-            nfields = PQnfields(res);
+    for (i = 0 ; i < sinfo->nattrs ; i++)
+    {
+        if (sinfo->valbuf[i])
+        {
+            free(sinfo->valbuf[i]);
+            sinfo->valbuf[i] = NULL;
         }
+    }
+    if (sinfo->attrvalbuf)
+        free(sinfo->attrvalbuf);
+    sinfo->attrvalbuf = NULL;
+    MemoryContextSwitchTo(sinfo->oldcontext);
+}
-        /*
-         * check result and tuple descriptor have the same number of columns
-         */
-        if (nfields != tupdesc->natts)
-            ereport(ERROR,
-                    (errcode(ERRCODE_DATATYPE_MISMATCH),
-                     errmsg("remote query result rowtype does not match "
-                            "the specified FROM clause rowtype")));
+static void *
+addTuple(PGresult *res, AddTupFunc  func, int id, size_t size)
+{
+    storeInfo *sinfo = (storeInfo *)PQgetAddTupleParam(res);
+    HeapTuple    tuple;
+    int fields = PQnfields(res);
+    int i;
+    PGresAttValue *attval;
+    char        **cstrs;
-        if (ntuples > 0)
-        {
-            AttInMetadata *attinmeta;
-            Tuplestorestate *tupstore;
-            MemoryContext oldcontext;
-            int            row;
-            char      **values;
-
-            attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-            oldcontext = MemoryContextSwitchTo(
-                                    rsinfo->econtext->ecxt_per_query_memory);
-            tupstore = tuplestore_begin_heap(true, false, work_mem);
-            rsinfo->setResult = tupstore;
-            rsinfo->setDesc = tupdesc;
-            MemoryContextSwitchTo(oldcontext);
+    if (sinfo->error_occurred)
+        return NULL;
-            values = (char **) palloc(nfields * sizeof(char *));
+    switch (func)
+    {
+        case ADDTUP_ALLOC_TEXT:
+        case ADDTUP_ALLOC_BINARY:
+            if (id == -1)
+                return sinfo->attrvalbuf;
+
+            if (id < 0 || id >= sinfo->nattrs)
+                return NULL;
-            /* put all tuples into the tuplestore */
-            for (row = 0; row < ntuples; row++)
+            if (sinfo->valbufsize[id] < size)
             {
-                HeapTuple    tuple;
+                if (sinfo->valbuf[id] == NULL)
+                    sinfo->valbuf[id] = malloc(size);
+                else
+                    sinfo->valbuf[id] = realloc(sinfo->valbuf[id], size);
+                sinfo->valbufsize[id] = size;
+            }
+            return sinfo->valbuf[id];
-                if (!is_sql_cmd)
-                {
-                    int            i;
+        case ADDTUP_ADD_TUPLE:
+            break;   /* Go through */
+        default:
+            /* Ignore */
+            break;
+    }
-                    for (i = 0; i < nfields; i++)
-                    {
-                        if (PQgetisnull(res, row, i))
-                            values[i] = NULL;
-                        else
-                            values[i] = PQgetvalue(res, row, i);
-                    }
-                }
-                else
-                {
-                    values[0] = PQcmdStatus(res);
-                }
+    if (sinfo->nattrs != fields)
+    {
+        sinfo->error_occurred = TRUE;
+        sinfo->nummismatch = TRUE;
+        finishStoreInfo(sinfo);
-                /* build the tuple and put it into the tuplestore. */
-                tuple = BuildTupleFromCStrings(attinmeta, values);
-                tuplestore_puttuple(tupstore, tuple);
-            }
+        PQsetAddTupleErrMes(res,
+                            strdup("function returning record called in "
+                                   "context that cannot accept type record"));
+        return NULL;
+    }
-            /* clean up and return the tuplestore */
-            tuplestore_donestoring(tupstore);
-        }
+    /*
+     * Rewrite PGresAttValue[] to char *[] in place.
+     */
+    Assert(sizeof(char*) <= sizeof(PGresAttValue));
+    attval = (PGresAttValue *)sinfo->attrvalbuf;
+    cstrs   = (char **)sinfo->attrvalbuf;
+    for(i = 0 ; i < fields ; i++)
+        cstrs[i] = PQgetAsCstring(attval++);
-        PQclear(res);
+    PG_TRY();
+    {
+        tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+        tuplestore_puttuple(sinfo->tuplestore, tuple);
     }
     PG_CATCH();
     {
-        /* be sure to release the libpq result */
-        PQclear(res);
-        PG_RE_THROW();
+        /*
+         * Return the error message in the exception to the caller and
+         * cancel the exception.
+         */
+        ErrorData *edata;
+
+        sinfo->error_occurred = TRUE;
+        sinfo->nummismatch = TRUE;
+
+        finishStoreInfo(sinfo);
+
+        edata = CopyErrorData();
+        FlushErrorState();
+
+        PQsetAddTupleErrMes(res, strdup(edata->message));
+        return NULL;
     }
     PG_END_TRY();
+
+    return sinfo->attrvalbuf;
 }
 
 /*

Re: Allow substitute allocators for PGresult.

From
Kyotaro HORIGUCHI
Date:
Ouch! I'm sorry for making a reverse patch for the first modification.

This is an amendment of the message below. The body text is
copied into this message.

http://archives.postgresql.org/message-id/20111201.192419.103527179.horiguchi.kyotaro@oss.ntt.co.jp

=======
Hello, this is the next version of "Allow substitute allocators
for PGresult".

Changing the concept entirely from the previous version, this
patch allows libpq to hand received tuples to an alternative
tuple store.
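To make the hook concrete, here is a minimal standalone model of
the callback protocol, written as a sketch under stated assumptions
rather than as libpq code. The ModelConn/ModelResult types and the
model_* functions are hypothetical stand-ins for PGconn, PGresult,
PQregisterTupleAdder, PQmakeEmptyPGresult and the tail of
getAnotherTuple in the patch; only the ADDTUP_ADD_TUPLE step is
modelled, and the allocation requests are left out.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Standalone model of the proposed hook (hypothetical, not libpq).
 * The enum and callback type mirror the patch's AddTupFunc and
 * addTupleFunction; only ADDTUP_ADD_TUPLE is modelled.
 */
typedef enum
{
    ADDTUP_ALLOC_TEXT,
    ADDTUP_ALLOC_BINARY,
    ADDTUP_ADD_TUPLE
} AddTupFunc;

typedef struct ModelResult ModelResult;
typedef void *(*addTupleFunction) (ModelResult *res, AddTupFunc func,
                                   int id, size_t size);

typedef struct
{
    addTupleFunction addTupleFunc;  /* NULL means "use the default" */
    void       *addTupleFuncParam;  /* caller's context */
} ModelConn;

struct ModelResult
{
    addTupleFunction addTupleFunc;
    void       *addTupleFuncParam;
    int         ntups;              /* rows kept by the default path */
};

/* Default storage: the row stays in the result, as before the patch. */
static void *
default_adder(ModelResult *res, AddTupFunc func, int id, size_t size)
{
    (void) id;
    (void) size;
    if (func != ADDTUP_ADD_TUPLE)
        return NULL;            /* allocation is not modelled here */
    res->ntups++;
    return res;                 /* any non-NULL value means success */
}

/* Counterpart of PQregisterTupleAdder in the patch. */
static void
model_register_adder(ModelConn *conn, addTupleFunction func, void *param)
{
    conn->addTupleFunc = func;
    conn->addTupleFuncParam = param;
}

/* Counterpart of the patched PQmakeEmptyPGresult: inherit a hook if set. */
static ModelResult
model_make_result(const ModelConn *conn)
{
    ModelResult r = {default_adder, NULL, 0};

    if (conn->addTupleFunc)
    {
        r.addTupleFunc = conn->addTupleFunc;
        r.addTupleFuncParam = conn->addTupleFuncParam;
    }
    return r;
}

/* Counterpart of the end of getAnotherTuple: hand each finished row over. */
static int
model_receive_rows(ModelResult *res, int nrows)
{
    for (int i = 0; i < nrows; i++)
        if (!res->addTupleFunc(res, ADDTUP_ADD_TUPLE, 0, 0))
            return 0;           /* the caller would report an error */
    return 1;
}

/* A substitute adder that streams rows elsewhere instead of keeping them. */
static void *
streaming_adder(ModelResult *res, AddTupFunc func, int id, size_t size)
{
    (void) id;
    (void) size;
    if (func != ADDTUP_ADD_TUPLE)
        return NULL;
    (*(int *) res->addTupleFuncParam)++;    /* e.g. a tuplestore insert */
    return res;
}
```

With no adder registered, rows accumulate in the result as before,
so unmodified clients behave the same; once an adder is registered,
rows go to the caller's sink and the result itself stays empty.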

Design guidelines are shown below.
- No need to modify existing client code of libpq.
- Existing libpq clients run with roughly the same performance,
  and the modified dblink runs somewhat faster and requires less
  memory.

I roughly measured the run time and memory requirements of three
configurations on CentOS 6 in VirtualBox (2GB memory, 4 cores)
running on Windows 7 / Core i7, transferring (30 bytes * 2 cols) *
2000000 tuples (120MB net) within this virtual machine. The
results are below.
                                xfer time   Peak RSS
Original                      :   6.02s      850MB
libpq patch + Original dblink :   6.11s      850MB
full patch                    :   4.44s      643MB
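The quoted net size and the relative differences in the table can
be rechecked with simple arithmetic. The helpers below are
illustrative only and just recompute figures already reported
above; they do not add any new measurement.

```c
#include <assert.h>

/* Net payload of the test: bytes per column * columns * rows. */
static long long
net_bytes(long long bytes_per_col, long long ncols, long long nrows)
{
    return bytes_per_col * ncols * nrows;
}

/* Relative change from before to after, in percent (negative = reduction). */
static double
pct_change(double before, double after)
{
    return (after - before) / before * 100.0;
}
```

For the reported numbers this gives 30 * 2 * 2000000 = 120,000,000
bytes (the 120MB net), a transfer time about 26% lower and a peak
RSS about 24% lower for the full patch, and about 1.5% extra time
for the libpq-only patch.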

xfer time here is the mean of five 'real' times, measured by
running an SQL script like this one after a warm-up run.

=== test.sql
select dblink_connect('c', 'host=localhost port=5432 dbname=test');
select * from dblink('c', 'select a,c from foo limit 2000000') as (a text, b bytea) limit 1;

select dblink_disconnect('c');
===
$  for i in $(seq 1 10); do time psql test -f t.sql; done 
===

Peak RSS was measured by reading the heap Rss in /proc/[pid]/smaps.
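A sketch of how the heap Rss figure can be picked out of smaps
text, assuming the usual layout in which each mapping starts with
an address-range header line (with the pathname, here [heap], in
the last column) followed by "Name:   value kB" field lines. The
function name heap_rss_kb is hypothetical, and the message does not
say exactly how the value was sampled.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Return the Rss value (kB) of the [heap] mapping in text laid out like
 * /proc/[pid]/smaps, or -1 if there is none.  Mapping header lines begin
 * with an address range; field lines begin with a "Name:" token.
 */
static long
heap_rss_kb(const char *smaps)
{
    int         in_heap = 0;
    const char *line = smaps;

    while (*line)
    {
        const char *eol = strchr(line, '\n');
        size_t      len = eol ? (size_t) (eol - line) : strlen(line);
        const char *sp = memchr(line, ' ', len);
        size_t      toklen = sp ? (size_t) (sp - line) : len;

        if (toklen > 0 && line[toklen - 1] == ':')
        {
            long        kb;

            /* field line, e.g. "Rss:     12 kB" */
            if (in_heap && toklen == 4 && strncmp(line, "Rss:", 4) == 0 &&
                sscanf(line + 4, "%ld", &kb) == 1)
                return kb;
        }
        else
        {
            /* header line: the pathname column comes last */
            in_heap = len >= 6 && strncmp(line + len - 6, "[heap]", 6) == 0;
        }

        if (!eol)
            break;
        line = eol + 1;
    }
    return -1;
}
```

To obtain a peak value, one would read /proc/[pid]/smaps of the
backend repeatedly while the query runs and keep the largest
result; that sampling loop is an assumption and is not shown.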


The patched libpq with the original dblink seems somewhat slower,
but the difference also seems to be within the margin of error.
If even this slowdown is not acceptable, it could probably be
recovered by restoring the previous static call path, at the cost
of some code redundancy.

On the other hand, the fully patched version is clearly faster
and requires less memory. Isn't it nice?

This patch consists of two sub-patches.

The first is a patch for libpq that allows the tuple storage
mechanism to be rewired. The default behavior is not changed, so
existing libpq clients should run with it as before.

The second modifies dblink to store received tuples directly into
a tuplestore using the mechanism above.
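Much of the memory saving presumably comes from not keeping every
row in a PGresult in addition to the tuplestore. On the allocation
side, the patched dblink's addTuple hands libpq one buffer per
column (valbuf, sized by valbufsize) and reuses it for every row.
The ColBufs helper below is a hypothetical, standalone sketch of
that buffer-reuse idea, not code from the patch. One deliberate
choice: realloc's result goes through a temporary, so a failed
growth leaves the existing buffer intact.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* One reusable scratch buffer per column (hypothetical helper). */
typedef struct
{
    int         ncols;
    char      **buf;            /* NULL until a column is first used */
    size_t     *bufsize;        /* current capacity of each buffer */
} ColBufs;

static int
colbufs_init(ColBufs *cb, int ncols)
{
    cb->ncols = ncols;
    cb->buf = calloc((size_t) ncols, sizeof(char *));
    cb->bufsize = calloc((size_t) ncols, sizeof(size_t));
    if (cb->buf == NULL || cb->bufsize == NULL)
    {
        free(cb->buf);
        free(cb->bufsize);
        return 0;
    }
    return 1;
}

/* Return at least size bytes for column id, growing only when needed. */
static char *
colbufs_get(ColBufs *cb, int id, size_t size)
{
    if (id < 0 || id >= cb->ncols)
        return NULL;
    if (cb->bufsize[id] < size)
    {
        /* realloc(NULL, n) behaves like malloc(n) */
        char       *p = realloc(cb->buf[id], size);

        if (p == NULL)
            return NULL;        /* old buffer is kept and still owned by cb */
        cb->buf[id] = p;
        cb->bufsize[id] = size;
    }
    return cb->buf[id];
}

static void
colbufs_free(ColBufs *cb)
{
    for (int i = 0; i < cb->ncols; i++)
        free(cb->buf[i]);
    free(cb->buf);
    free(cb->bufsize);
}
```

A row whose values fit the current capacities reuses the same
blocks, so memory held by the adder is bounded by the largest value
seen per column rather than by the size of the whole result.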

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center


diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..a360d78 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,7 @@ PQconnectStartParams      157
 PQping                    158
 PQpingParams              159
 PQlibVersion              160
 
+PQregisterTupleAdder      161
+PQgetAsCstring          162
+PQgetAddTupleParam      163
+PQsetAddTupleErrMes      164
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 50f3f83..437be26 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2692,6 +2692,7 @@ makeEmptyPGconn(void)
     conn->allow_ssl_try = true;
     conn->wait_ssl_try = false;
 #endif
+    conn->addTupleFunc = NULL;
 
     /*
      * We try to send at least 8K at a time, which is the usual size of pipe
@@ -5064,3 +5065,10 @@ PQregisterThreadLock(pgthreadlock_t newhandler)
     return prev;
 }
+
+void
+PQregisterTupleAdder(PGconn *conn, addTupleFunction func, void *param)
+{
+    conn->addTupleFunc = func;
+    conn->addTupleFuncParam = param;
+}
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 113aab0..c8ec9bd 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -48,7 +48,6 @@ char       *const pgresStatus[] = {
 static int    static_client_encoding = PG_SQL_ASCII;
 static bool static_std_strings = false;
 
-static PGEvent *dupEvents(PGEvent *events, int count);
 static bool PQsendQueryStart(PGconn *conn);
 static int PQsendQueryGuts(PGconn *conn,
 
@@ -66,7 +65,9 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int PQsendDescribe(PGconn *conn, char desc_type,
                const char *desc_target);
 static int    check_field_number(const PGresult *res, int field_num);
-
+static void *pqDefaultAddTupleFunc(PGresult *res, AddTupFunc func,
+                                   int id, size_t len);
+static void *pqAddTuple(PGresult *res, PGresAttValue *tup);
 
 /* ----------------
  * Space management for PGresult.
@@ -160,6 +161,9 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
     result->curBlock = NULL;
     result->curOffset = 0;
     result->spaceLeft = 0;
+    result->addTupleFunc = pqDefaultAddTupleFunc;
+    result->addTupleFuncParam = NULL;
+    result->addTupleFuncErrMes = NULL;
 
     if (conn)
     {
@@ -194,6 +198,12 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
             }
             result->nEvents = conn->nEvents;
         }
+
+        if (conn->addTupleFunc)
+        {
+            result->addTupleFunc = conn->addTupleFunc;
+            result->addTupleFuncParam = conn->addTupleFuncParam;
+        }
     }
     else
     {
@@ -487,6 +497,33 @@ PQresultAlloc(PGresult *res, size_t nBytes)
     return pqResultAlloc(res, nBytes, TRUE);
 }
+void *
+pqDefaultAddTupleFunc(PGresult *res, AddTupFunc func, int id, size_t len)
+{
+    void *p;
+
+    switch (func)
+    {
+        case ADDTUP_ALLOC_TEXT:
+            return pqResultAlloc(res, len, TRUE);
+
+        case ADDTUP_ALLOC_BINARY:
+            p = pqResultAlloc(res, len, FALSE);
+
+            if (id == -1)
+                res->addTupleFuncParam = p;
+
+            return p;
+
+        case ADDTUP_ADD_TUPLE:
+            return pqAddTuple(res, res->addTupleFuncParam);
+
+        default:
+            /* Ignore */
+            break;
+    }
+    return NULL;
+}
 
 /*
  * pqResultAlloc -
  *        Allocate subsidiary storage for a PGresult.
@@ -830,9 +867,9 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 /*
  * pqAddTuple
  *      add a row pointer to the PGresult structure, growing it if necessary
- *      Returns TRUE if OK, FALSE if not enough memory to add the row
+ *      Returns tup if OK, NULL if not enough memory to add the row.
  */
-int
+static void *
 pqAddTuple(PGresult *res, PGresAttValue *tup)
 {
     if (res->ntups >= res->tupArrSize)
@@ -858,13 +895,13 @@ pqAddTuple(PGresult *res, PGresAttValue *tup)
             newTuples = (PGresAttValue **)
                 realloc(res->tuples, newSize * sizeof(PGresAttValue *));
         if (!newTuples)
-            return FALSE;        /* malloc or realloc failed */
+            return NULL;        /* malloc or realloc failed */
         res->tupArrSize = newSize;
         res->tuples = newTuples;
     }
     res->tuples[res->ntups] = tup;
     res->ntups++;
-    return TRUE;
+    return tup;
 }
 
 /*
@@ -2822,6 +2859,43 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num)
         return 0;
 }
 
+/* PQgetAsCstring
+ *    returns the field as C string.
+ */
+char *
+PQgetAsCstring(PGresAttValue *attval)
+{
+    return attval->len == NULL_LEN ? NULL : attval->value;
+}
+
+/* PQgetAddTupleParam
+ *    Get the pointer to the contextual parameter from PGresult which is
+ *    registered to PGconn by PQregisterTupleAdder
+ */
+void *
+PQgetAddTupleParam(const PGresult *res)
+{
+    if (!res)
+        return NULL;
+    return res->addTupleFuncParam;
+}
+
+/* PQsetAddTupleErrMes
+ *    Set the error message passed back to the caller of addTupleFunc
+ *  mes must be a malloc'ed memory block and it is released by the
+ *  caller of addTupleFunc if set.
+ *  You can replace the previous message by alternative mes, or clear
+ *  it with NULL.
+ */
+void
+PQsetAddTupleErrMes(PGresult *res, char *mes)
+{
+    /* Free existing message */
+    if (res->addTupleFuncErrMes)
+        free(res->addTupleFuncErrMes);
+    res->addTupleFuncErrMes = mes;
+}
+
 /* PQnparams:
  *    returns the number of input parameters of a prepared statement.
  */
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index 77c4d5a..c7f74ae 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -733,9 +733,10 @@ getAnotherTuple(PGconn *conn, bool binary)
     if (conn->curTuple == NULL)
     {
         conn->curTuple = (PGresAttValue *)
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
+            result->addTupleFunc(result, ADDTUP_ALLOC_BINARY, -1,
+                                 nfields * sizeof(PGresAttValue));
         if (conn->curTuple == NULL)
-            goto outOfMemory;
+            goto addTupleError;
         MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
 
         /*
@@ -757,7 +758,7 @@ getAnotherTuple(PGconn *conn, bool binary)
     {
         bitmap = (char *) malloc(nbytes);
         if (!bitmap)
-            goto outOfMemory;
+            goto addTupleError;
     }
 
     if (pqGetnchar(bitmap, nbytes, conn))
@@ -787,9 +788,12 @@ getAnotherTuple(PGconn *conn, bool binary)
                 vlen = 0;
             if (tup[i].value == NULL)
             {
-                tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
+                AddTupFunc func =
+                    (binary ? ADDTUP_ALLOC_BINARY : ADDTUP_ALLOC_TEXT);
+                tup[i].value =
+                    (char *) result->addTupleFunc(result, func, i, vlen + 1);
                 if (tup[i].value == NULL)
-                    goto outOfMemory;
+                    goto addTupleError;
             }
             tup[i].len = vlen;
             /* read in the value */
@@ -812,8 +816,9 @@ getAnotherTuple(PGconn *conn, bool binary)
     }
 
     /* Success!  Store the completed tuple in the result */
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
+    if (!result->addTupleFunc(result, ADDTUP_ADD_TUPLE, 0, 0))
+        goto addTupleError;
+
     /* and reset for a new message */
     conn->curTuple = NULL;
@@ -821,7 +826,7 @@ getAnotherTuple(PGconn *conn, bool binary)
         free(bitmap);
     return 0;
 
-outOfMemory:
+addTupleError:
     /* Replace partially constructed result with an error result */
 
     /*
@@ -829,8 +834,21 @@ outOfMemory:
      * there's not enough memory to concatenate messages...
      */
     pqClearAsyncResult(conn);
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    resetPQExpBuffer(&conn->errorMessage);
+
+    /*
+     * If error message is passed from addTupleFunc, set it into
+     * PGconn, assume out of memory if not.
+     */
+    appendPQExpBufferStr(&conn->errorMessage,
+                         libpq_gettext(result->addTupleFuncErrMes ?
+                                       result->addTupleFuncErrMes :
+                                       "out of memory for query result\n"));
+    if (result->addTupleFuncErrMes)
+    {
+        free(result->addTupleFuncErrMes);
+        result->addTupleFuncErrMes = NULL;
+    }
 
     /*
      * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 45a84d8..d14b57a 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -634,9 +634,10 @@ getAnotherTuple(PGconn *conn, int msgLength)
     if (conn->curTuple == NULL)
     {
         conn->curTuple = (PGresAttValue *)
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
+            result->addTupleFunc(result, ADDTUP_ALLOC_BINARY, -1,
+                                 nfields * sizeof(PGresAttValue));
         if (conn->curTuple == NULL)
-            goto outOfMemory;
+            goto addTupleError;
         MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
     }
     tup = conn->curTuple;
@@ -673,11 +674,12 @@ getAnotherTuple(PGconn *conn, int msgLength)
             vlen = 0;
         if (tup[i].value == NULL)
         {
-            bool        isbinary = (result->attDescs[i].format != 0);
-
-            tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
+            AddTupFunc func = (result->attDescs[i].format != 0 ?
+                               ADDTUP_ALLOC_BINARY : ADDTUP_ALLOC_TEXT);
+            tup[i].value =
+                (char *) result->addTupleFunc(result, func, i, vlen + 1);
             if (tup[i].value == NULL)
-                goto outOfMemory;
+                goto addTupleError;
         }
         tup[i].len = vlen;
         /* read in the value */
@@ -689,22 +691,36 @@ getAnotherTuple(PGconn *conn, int msgLength)
     }
 
     /* Success!  Store the completed tuple in the result */
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
+    if (!result->addTupleFunc(result, ADDTUP_ADD_TUPLE, 0, 0))
+        goto addTupleError;
+
     /* and reset for a new message */
     conn->curTuple = NULL;
 
     return 0;
 
-outOfMemory:
+addTupleError:
 
     /*
      * Replace partially constructed result with an error result. First
      * discard the old result to try to win back some memory.
      */
     pqClearAsyncResult(conn);
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    resetPQExpBuffer(&conn->errorMessage);
+
+    /*
+     * If error message is passed from addTupleFunc, set it into
+     * PGconn, assume out of memory if not.
+     */
+    appendPQExpBufferStr(&conn->errorMessage,
+                         libpq_gettext(result->addTupleFuncErrMes ?
+                                       result->addTupleFuncErrMes : 
+                                       "out of memory for query result\n"));
+    if (result->addTupleFuncErrMes)
+    {
+        free(result->addTupleFuncErrMes);
+        result->addTupleFuncErrMes = NULL;
+    }
     pqSaveErrorResult(conn);
 
     /* Discard the failed message by pretending we read it */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index d13a5b9..bdce294 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -116,6 +116,16 @@ typedef enum
     PQPING_NO_ATTEMPT            /* connection not attempted (bad params) */
 } PGPing;
 
+/* AddTupFunc is one of the parameters of addTupleFunc that decides
+ * the function of the addTupleFunction. See addTupleFunction for
+ * details */
+typedef enum 
+{
+    ADDTUP_ALLOC_TEXT,          /* Returns non-aligned memory for text value */
+    ADDTUP_ALLOC_BINARY,        /* Returns aligned memory for binary value */
+    ADDTUP_ADD_TUPLE            /* Adds tuple data into tuple storage */
+} AddTupFunc;
+
 /* PGconn encapsulates a connection to the backend.
  * The contents of this struct are not supposed to be known to applications.
  */
@@ -225,6 +235,12 @@ typedef struct pgresAttDesc
     int            atttypmod;        /* type-specific modifier info */
 } PGresAttDesc;
 
+typedef struct pgresAttValue
+{
+    int            len;            /* length in bytes of the value */
+    char       *value;            /* actual value, plus terminating zero byte */
+} PGresAttValue;
+
 /* ----------------
  * Exported functions of libpq
  * ----------------
@@ -416,6 +432,52 @@ extern PGPing PQping(const char *conninfo);
 extern PGPing PQpingParams(const char *const * keywords,
              const char *const * values, int expand_dbname);
 
+/*
+ * Typedef for tuple storage function.
+ *
+ * This function pointer is used for tuple storage function in
+ * PGresult and PGconn.
+ *
+ * addTupleFunction is called for three types of request designated by
+ * the enum AddTupFunc.
+ *
+ * id is the identifier for allocated memory block. The caller sets -1
+ * for PGresAttValue array, and 0 to number of cols - 1 for each
+ * column.
+ *
+ * ADDTUP_ALLOC_TEXT requests the size bytes memory block for a text
+ * value which may not be aligned to the word boundary.
+ *
+ * ADDTUP_ALLOC_BINARY requests the size bytes memory block for a
+ * binary value which is aligned to the word boundary.
+ *
+ * ADDTUP_ADD_TUPLE requests to add tuple data into storage, and
+ * free the memory blocks allocated by this function if necessary.
+ * id and size are ignored.
+ *
+ * This function must return a non-NULL value on success and NULL on
+ * failure.  On failure it may set an error message, in malloc'ed
+ * memory, with PQsetAddTupleErrMes; if no message is set, the caller
+ * assumes the failure was out of memory.  This function is assumed
+ * not to throw any exception.
+ */
+    typedef void *(*addTupleFunction)(PGresult *res, AddTupFunc func,
+                                      int id, size_t size);
+
+/*
+ * Register alternative tuple storage function to PGconn.
+ * 
+ * By registering this function, pg_result disables its own tuple
+ * storage and calls it to append rows one by one.
+ *
+ * func is tuple store function. See addTupleFunction.
+ * 
+ * addTupFuncParam is contextual storage that can be retrieved with
+ * PQgetAddTupleParam in func.
+ */
+extern void PQregisterTupleAdder(PGconn *conn, addTupleFunction func,
+                                 void *addTupFuncParam);
+
 /* Force the write buffer to be written (or at least try) */
 extern int    PQflush(PGconn *conn);
@@ -454,6 +516,9 @@ extern char *PQcmdTuples(PGresult *res);
 extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num);
 extern int    PQgetlength(const PGresult *res, int tup_num, int field_num);
 extern int    PQgetisnull(const PGresult *res, int tup_num, int field_num);
+extern char *PQgetAsCstring(PGresAttValue *attdesc);
+extern void *PQgetAddTupleParam(const PGresult *res);
+extern void    PQsetAddTupleErrMes(PGresult *res, char *mes);
 extern int    PQnparams(const PGresult *res);
 extern Oid    PQparamtype(const PGresult *res, int param_num);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 64dfcb2..45e4c93 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -134,12 +134,6 @@ typedef struct pgresParamDesc
 
 #define NULL_LEN        (-1)    /* pg_result len for NULL value */
 
-typedef struct pgresAttValue
-{
-    int            len;            /* length in bytes of the value */
-    char       *value;            /* actual value, plus terminating zero byte */
-} PGresAttValue;
-
 /* Typedef for message-field list entries */
 typedef struct pgMessageField
 {
@@ -209,6 +203,11 @@ struct pg_result
     PGresult_data *curBlock;    /* most recently allocated block */
     int            curOffset;        /* start offset of free space in block */
     int            spaceLeft;        /* number of free bytes remaining in block */
+
+    addTupleFunction addTupleFunc; /* Tuple storage function. See
+                                    * addTupleFunction for details. */
+    void *addTupleFuncParam;       /* Contextual parameter for addTupleFunc */
+    char *addTupleFuncErrMes;      /* Error message returned from addTupFunc */
 };
 
 /* PGAsyncStatusType defines the state of the query-execution state machine */
@@ -443,6 +442,13 @@ struct pg_conn
 
     /* Buffer for receiving various parts of messages */
     PQExpBufferData workBuffer; /* expansible string */
+
+    /* Tuple store function. The two fields below are copied to newly
+     * created PGresult if addTupleFunc is not NULL. Use default
+     * function if addTupleFunc is NULL. */
+    addTupleFunction addTupleFunc; /* Tuple storage function. See
+                                    * addTupleFunction for details. */
+    void *addTupleFuncParam;       /* Contextual parameter for addTupFunc */
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
@@ -507,7 +513,6 @@ extern void
 pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 /* This lets gcc check the format string for consistency. */
 __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
-extern int    pqAddTuple(PGresult *res, PGresAttValue *tup);
 extern void pqSaveMessageField(PGresult *res, char code,
                    const char *value);
 extern void pqSaveParameterStatus(PGconn *conn, const char *name,
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 62c810a..fb2e10e 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,23 @@ typedef struct remoteConn
     bool        newXactForCursor;        /* Opened a transaction for a cursor */
 } remoteConn;
 
+typedef struct storeInfo
+{
+    Tuplestorestate *tuplestore;
+    int nattrs;
+    AttInMetadata *attinmeta;
+    MemoryContext oldcontext;
+    char *attrvalbuf;
+    void **valbuf;
+    size_t *valbufsize;
+    bool error_occurred;
+    bool nummismatch;
+} storeInfo;
+
 /*
  * Internal declarations
  */
 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
 static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn *rconn);
@@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);
 static void validate_pkattnums(Relation rel,
                    int2vector *pkattnums_arg, int32 pknumatts_arg,
                    int **pkattnums, int *pknumatts);
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static void *addTuple(PGresult *res, AddTupFunc func, int id, size_t size);
+
 /* Global */
 static remoteConn *pconn = NULL;
@@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
     char       *curname = NULL;
     int            howmany = 0;
     bool        fail = true;    /* default to backward compatible */
+    storeInfo   storeinfo;
 
     DBLINK_INIT;
@@ -559,15 +576,30 @@ dblink_fetch(PG_FUNCTION_ARGS)
     appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
 
     /*
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterTupleAdder(conn, addTuple, &storeinfo);
+
+    /*
      * Try to execute the query.  Note that since libpq uses malloc, the
      * PGresult will be long-lived even though we are still in a short-lived
      * memory context.
      */
     res = PQexec(conn, buf.data);
+    finishStoreInfo(&storeinfo);
+
     if (!res ||
         (PQresultStatus(res) != PGRES_COMMAND_OK &&
          PQresultStatus(res) != PGRES_TUPLES_OK))
     {
+        /* This is only for backward compatibility */
+        if (storeinfo.nummismatch)
+            ereport(ERROR,
+                    (errcode(ERRCODE_DATATYPE_MISMATCH),
+                     errmsg("remote query result rowtype does not match "
+                            "the specified FROM clause rowtype")));
         dblink_res_error(conname, res, "could not fetch from cursor", fail);
         return (Datum) 0;
     }
@@ -580,7 +612,6 @@ dblink_fetch(PG_FUNCTION_ARGS)
                  errmsg("cursor \"%s\" does not exist", curname)));
     }
 
-    materializeResult(fcinfo, res);
     return (Datum) 0;
 }
@@ -640,6 +671,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     remoteConn *rconn = NULL;
     bool        fail = true;    /* default to backward compatible */
     bool        freeconn = false;
+    storeInfo   storeinfo;
 
     /* check to see if caller supports us returning a tuplestore */
     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -715,164 +747,206 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     rsinfo->setResult = NULL;
     rsinfo->setDesc = NULL;
 
+
+    /*
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec/PQgetResult below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterTupleAdder(conn, addTuple, &storeinfo);
+
     /* synchronous query, or async result retrieval */
     if (!is_async)
         res = PQexec(conn, sql);
     else
-    {
         res = PQgetResult(conn);
-        /* NULL means we're all done with the async results */
-        if (!res)
-            return (Datum) 0;
-    }
-    /* if needed, close the connection to the database and cleanup */
-    if (freeconn)
-        PQfinish(conn);
+    finishStoreInfo(&storeinfo);
-    if (!res ||
-        (PQresultStatus(res) != PGRES_COMMAND_OK &&
-         PQresultStatus(res) != PGRES_TUPLES_OK))
+    /* NULL res from async get means we're all done with the results */
+    if (res || !is_async)
     {
-        dblink_res_error(conname, res, "could not execute query", fail);
-        return (Datum) 0;
+        if (freeconn)
+            PQfinish(conn);
+
+        if (!res ||
+            (PQresultStatus(res) != PGRES_COMMAND_OK &&
+             PQresultStatus(res) != PGRES_TUPLES_OK))
+        {
+            /* This is only for backward compatibility */
+            if (storeinfo.nummismatch)
+            {
+                ereport(ERROR,
+                        (errcode(ERRCODE_DATATYPE_MISMATCH),
+                         errmsg("remote query result rowtype does not match "
+                                "the specified FROM clause rowtype")));
+            }
+            dblink_res_error(conname, res, "could not execute query", fail);
+            return (Datum) 0;
+        }
     }
-    materializeResult(fcinfo, res);
     return (Datum) 0;
 }
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */
 static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
 {
     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-
-    Assert(rsinfo->returnMode == SFRM_Materialize);
-
-    PG_TRY();
+    TupleDesc    tupdesc;
+    int i;
+    
+    switch (get_call_result_type(fcinfo, NULL, &tupdesc))
     {
-        TupleDesc    tupdesc;
-        bool        is_sql_cmd = false;
-        int            ntuples;
-        int            nfields;
+        case TYPEFUNC_COMPOSITE:
+            /* success */
+            break;
+        case TYPEFUNC_RECORD:
+            /* failed to determine actual type of RECORD */
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("function returning record called in context "
+                            "that cannot accept type record")));
+            break;
+        default:
+            /* result type isn't composite */
+            elog(ERROR, "return type must be a row type");
+            break;
+    }
+    
+    sinfo->oldcontext = MemoryContextSwitchTo(
+        rsinfo->econtext->ecxt_per_query_memory);
+
+    /* make sure we have a persistent copy of the tupdesc */
+    tupdesc = CreateTupleDescCopy(tupdesc);
+
+    sinfo->error_occurred = FALSE;
+    sinfo->nummismatch = FALSE;
+    sinfo->nattrs = tupdesc->natts;
+    sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+    sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+    sinfo->valbuf = (void **)malloc(sinfo->nattrs * sizeof(void *));
+    sinfo->valbufsize = (size_t *)malloc(sinfo->nattrs * sizeof(size_t));
+    for (i = 0 ; i < sinfo->nattrs ; i++)
+    {
+        sinfo->valbuf[i] = NULL;
+        sinfo->valbufsize[i] = 0;
+    }
-        if (PQresultStatus(res) == PGRES_COMMAND_OK)
-        {
-            is_sql_cmd = true;
-
-            /*
-             * need a tuple descriptor representing one TEXT column to return
-             * the command status string as our result tuple
-             */
-            tupdesc = CreateTemplateTupleDesc(1, false);
-            TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-                               TEXTOID, -1, 0);
-            ntuples = 1;
-            nfields = 1;
-        }
-        else
-        {
-            Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+    /* Preallocate memory of the same size as the PGresAttValue array for values. */
+    sinfo->attrvalbuf = (char *) malloc(sinfo->nattrs * sizeof(PGresAttValue));
-            is_sql_cmd = false;
+    rsinfo->setResult = sinfo->tuplestore;
+    rsinfo->setDesc = tupdesc;
+}
-            /* get a tuple descriptor for our result type */
-            switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-            {
-                case TYPEFUNC_COMPOSITE:
-                    /* success */
-                    break;
-                case TYPEFUNC_RECORD:
-                    /* failed to determine actual type of RECORD */
-                    ereport(ERROR,
-                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                        errmsg("function returning record called in context "
-                               "that cannot accept type record")));
-                    break;
-                default:
-                    /* result type isn't composite */
-                    elog(ERROR, "return type must be a row type");
-                    break;
-            }
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+    int i;
-            /* make sure we have a persistent copy of the tupdesc */
-            tupdesc = CreateTupleDescCopy(tupdesc);
-            ntuples = PQntuples(res);
-            nfields = PQnfields(res);
+    for (i = 0 ; i < sinfo->nattrs ; i++)
+    {
+        if (sinfo->valbuf[i])
+        {
+            free(sinfo->valbuf[i]);
+            sinfo->valbuf[i] = NULL;
         }
+    }
+    if (sinfo->attrvalbuf)
+        free(sinfo->attrvalbuf);
+    sinfo->attrvalbuf = NULL;
+    MemoryContextSwitchTo(sinfo->oldcontext);
+}
-        /*
-         * check result and tuple descriptor have the same number of columns
-         */
-        if (nfields != tupdesc->natts)
-            ereport(ERROR,
-                    (errcode(ERRCODE_DATATYPE_MISMATCH),
-                     errmsg("remote query result rowtype does not match "
-                            "the specified FROM clause rowtype")));
+static void *
+addTuple(PGresult *res, AddTupFunc  func, int id, size_t size)
+{
+    storeInfo *sinfo = (storeInfo *)PQgetAddTupleParam(res);
+    HeapTuple    tuple;
+    int fields = PQnfields(res);
+    int i;
+    PGresAttValue *attval;
+    char        **cstrs;
-        if (ntuples > 0)
-        {
-            AttInMetadata *attinmeta;
-            Tuplestorestate *tupstore;
-            MemoryContext oldcontext;
-            int            row;
-            char      **values;
-
-            attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-            oldcontext = MemoryContextSwitchTo(
-                                    rsinfo->econtext->ecxt_per_query_memory);
-            tupstore = tuplestore_begin_heap(true, false, work_mem);
-            rsinfo->setResult = tupstore;
-            rsinfo->setDesc = tupdesc;
-            MemoryContextSwitchTo(oldcontext);
+    if (sinfo->error_occurred)
+        return NULL;
-            values = (char **) palloc(nfields * sizeof(char *));
+    switch (func)
+    {
+        case ADDTUP_ALLOC_TEXT:
+        case ADDTUP_ALLOC_BINARY:
+            if (id == -1)
+                return sinfo->attrvalbuf;
+
+            if (id < 0 || id >= sinfo->nattrs)
+                return NULL;
-            /* put all tuples into the tuplestore */
-            for (row = 0; row < ntuples; row++)
+            if (sinfo->valbufsize[id] < size)
             {
-                HeapTuple    tuple;
+                if (sinfo->valbuf[id] == NULL)
+                    sinfo->valbuf[id] = malloc(size);
+                else
+                    sinfo->valbuf[id] = realloc(sinfo->valbuf[id], size);
+                sinfo->valbufsize[id] = size;
+            }
+            return sinfo->valbuf[id];
-                if (!is_sql_cmd)
-                {
-                    int            i;
+        case ADDTUP_ADD_TUPLE:
+            break;   /* Go through */
+        default:
+            /* Ignore */
+            break;
+    }
-                    for (i = 0; i < nfields; i++)
-                    {
-                        if (PQgetisnull(res, row, i))
-                            values[i] = NULL;
-                        else
-                            values[i] = PQgetvalue(res, row, i);
-                    }
-                }
-                else
-                {
-                    values[0] = PQcmdStatus(res);
-                }
+    if (sinfo->nattrs != fields)
+    {
+        sinfo->error_occurred = TRUE;
+        sinfo->nummismatch = TRUE;
+        finishStoreInfo(sinfo);
-                /* build the tuple and put it into the tuplestore. */
-                tuple = BuildTupleFromCStrings(attinmeta, values);
-                tuplestore_puttuple(tupstore, tuple);
-            }
+        PQsetAddTupleErrMes(res,
+                            strdup("function returning record called in "
+                                   "context that cannot accept type record"));
+        return NULL;
+    }
-            /* clean up and return the tuplestore */
-            tuplestore_donestoring(tupstore);
-        }
+    /*
+     * Rewrite PGresAttValue[] to char *[] in place.
+     */
+    Assert(sizeof(char*) <= sizeof(PGresAttValue));
+    attval = (PGresAttValue *)sinfo->attrvalbuf;
+    cstrs   = (char **)sinfo->attrvalbuf;
+    for(i = 0 ; i < fields ; i++)
+        cstrs[i] = PQgetAsCstring(attval++);
-        PQclear(res);
+    PG_TRY();
+    {
+        tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+        tuplestore_puttuple(sinfo->tuplestore, tuple);
     }
     PG_CATCH();
     {
-        /* be sure to release the libpq result */
-        PQclear(res);
-        PG_RE_THROW();
+        /*
+         * Return the error message in the exception to the caller and
+         * cancel the exception.
+         */
+        ErrorData *edata;
+
+        sinfo->error_occurred = TRUE;
+        sinfo->nummismatch = TRUE;
+
+        finishStoreInfo(sinfo);
+
+        edata = CopyErrorData();
+        FlushErrorState();
+
+        PQsetAddTupleErrMes(res, strdup(edata->message));
+        return NULL;
     }
     PG_END_TRY();
+
+    return sinfo->attrvalbuf;
 }
 
 /*
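
The dblink addTuple() above reuses the PGresAttValue array in place as the char * array that BuildTupleFromCStrings() takes. A minimal standalone sketch of why that forward loop is safe, assuming a hypothetical AttValue type that copies the PGresAttValue layout; attvals_to_cstrings and inplace_demo_ok are illustrative names, not part of the patch.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical copy of the PGresAttValue layout from the patch. */
typedef struct
{
    int         len;        /* -1 (NULL_LEN in libpq) marks SQL NULL */
    char       *value;
} AttValue;

/*
 * Reuses an AttValue array as a char * array in the same memory.
 * Element i is read before cstrs[i] is written, and that write ends at byte
 * (i + 1) * sizeof(char *), which is not past the end of attval[i] as long
 * as sizeof(char *) <= sizeof(AttValue); unread elements are never touched.
 */
static char **
attvals_to_cstrings(void *buf, int nfields)
{
    AttValue   *attval = buf;
    char      **cstrs = buf;

    assert(sizeof(char *) <= sizeof(AttValue));
    for (int i = 0; i < nfields; i++)
    {
        char       *v = attval[i].len == -1 ? NULL : attval[i].value;  /* read first */

        cstrs[i] = v;           /* then overwrite */
    }
    return cstrs;
}

/* Converts a three-column heap row in place; returns 1 if every pointer survives. */
int
inplace_demo_ok(void)
{
    static char a[] = "42", c[] = "x";
    AttValue   *row = malloc(3 * sizeof(AttValue));
    char      **cs;
    int         ok;

    if (row == NULL)
        return 0;
    row[0] = (AttValue) {2, a};
    row[1] = (AttValue) {-1, NULL};
    row[2] = (AttValue) {1, c};
    cs = attvals_to_cstrings(row, 3);
    ok = cs[0] == a && cs[1] == NULL && cs[2] == c;
    free(row);
    return ok;
}
```

The same reasoning would not hold for a backward loop, or if the target element type were wider than the source one: then a write could land on an element that has not been read yet.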

Re: Allow substitute allocators for PGresult.

From:
Kyotaro HORIGUCHI
Date:
Hello,
The documentation had slipped my mind.
This is the patch that adds documentation for the custom PGresult storage.
It appears as section '31.19. Alternative result storage'.
regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 252ff8c..dc2acb6 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -7229,6 +7229,325 @@ int PQisthreadsafe(); </sect1>
+ <sect1 id="libpq-alterstorage">
+  <title>Alternative result storage</title>
+
+  <indexterm zone="libpq-alterstorage">
+   <primary>PGresult</primary>
+   <secondary>PGconn</secondary>
+  </indexterm>
+
+  <para>
+   In the standard usage, users get the result of command
+   execution from the <structname>PGresult</structname> acquired
+   with <function>PQgetResult</function>
+   from a <structname>PGconn</structname>. The memory areas for
+   the PGresult are allocated internally with malloc() within calls of
+   command execution functions such as <function>PQexec</function>
+   and <function>PQgetResult</function>. If you have difficulty
+   handling the result records in the form of a PGresult, you can instruct
+   the PGconn to store them in your own storage instead of a PGresult.
+  </para>
+
+  <variablelist>
+   <varlistentry id="libpq-registertupleadder">
+    <term>
+     <function>PQregisterTupleAdder</function>
+     <indexterm>
+      <primary>PQregisterTupleAdder</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       Sets a function that allocates memory for each tuple and its column
+       values, and adds the completed tuple into your storage.
+<synopsis>
+void PQregisterTupleAdder(PGconn *conn,
+                          addTupleFunction func,
+                          void *param);
+</synopsis>
+     </para>
+     
+     <para>
+       <variablelist>
+     <varlistentry>
+       <term><parameter>conn</parameter></term>
+       <listitem>
+         <para>
+           The connection object on which to set the tuple adder
+           function. A PGresult created from this connection calls
+           this function to store the result tuples instead of
+           storing them in its internal storage.
+         </para>
+       </listitem>
+     </varlistentry>
+     <varlistentry>
+       <term><parameter>func</parameter></term>
+       <listitem>
+         <para>
+           Tuple adder function to set. NULL means to use the
+           default storage.
+         </para>
+       </listitem>
+     </varlistentry>
+     <varlistentry>
+       <term><parameter>param</parameter></term>
+       <listitem>
+         <para>
+           A pointer to a contextual parameter for <parameter>func</parameter>,
+           which can be retrieved with <function>PQgetAddTupleParam</function>.
+         </para>
+       </listitem>
+     </varlistentry>
+       </variablelist>
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-addtuplefunction">
+    <term>
+     <type>addTupleFunction</type>
+     <indexterm>
+      <primary>addTupleFunction</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       The type of the callback function that supplies memory blocks for
+       each tuple and its column values, and adds the constructed
+       tuple to your own storage.
+<synopsis>
+typedef enum 
+{
+  ADDTUP_ALLOC_TEXT,
+  ADDTUP_ALLOC_BINARY,
+  ADDTUP_ADD_TUPLE
+} AddTupFunc;
+
+typedef void *(*addTupleFunction)(PGresult *res,
+                                  AddTupFunc func,
+                                  int id,
+                                  size_t size);
+</synopsis>
+     </para>
+
+     <para>
+       This function must return NULL on failure and, if the cause is
+       something other than running out of memory, should set the error
+       message with <function>PQsetAddTupleErrMes</function>. This
+       function must not throw any exception. It is called in the
+       following sequence.
+
+       <itemizedlist spacing="compact">
+     <listitem>
+       <simpara>Call with <parameter>func</parameter>
+       = <firstterm>ADDTUP_ALLOC_BINARY</firstterm>
+       and <parameter>id</parameter> = -1 to request the memory
+       for the tuple, used as an array
+       of <type>PGresAttValue</type>.</simpara>
+     </listitem>
+     <listitem>
+       <simpara>Call with <parameter>func</parameter>
+       = <firstterm>ADDTUP_ALLOC_TEXT</firstterm>
+       or <firstterm>ADDTUP_ALLOC_BINARY</firstterm>
+       and <parameter>id</parameter> set to zero or a positive number
+       to request the memory for each column value in the current
+       tuple.</simpara>
+     </listitem>
+     <listitem>
+       <simpara>Call with <parameter>func</parameter>
+       = <firstterm>ADDTUP_ADD_TUPLE</firstterm> to request that the
+       constructed tuple be stored.</simpara>
+     </listitem>
+       </itemizedlist>
+     </para>
+     <para>
+       Calling <type>addTupleFunction</type>
+       with <parameter>func</parameter> =
+       <firstterm>ADDTUP_ALLOC_TEXT</firstterm> requests a
+       memory block of at least <parameter>size</parameter> bytes,
+       which need not be aligned to a word boundary.
+       <parameter>id</parameter> is zero or a positive number that
+       identifies the use of the requested memory block, that is, the
+       position of the column for which the memory block is used.
+     </para>
+     <para>
+       When <parameter>func</parameter>
+       = <firstterm>ADDTUP_ALLOC_BINARY</firstterm>, this function is
+       asked to return a memory block of at
+       least <parameter>size</parameter> bytes that is aligned to a
+       word boundary.
+       <parameter>id</parameter> identifies the use of the requested
+       memory block: -1 means that it is used as an
+       array of <type>PGresAttValue</type> to store the tuple. Zero or
+       positive numbers have the same meanings as for
+       <firstterm>ADDTUP_ALLOC_TEXT</firstterm>.
+     </para>
+     <para>When <parameter>func</parameter>
+       = <firstterm>ADDTUP_ADD_TUPLE</firstterm>, this function is
+       asked to store the tuple of <type>PGresAttValue</type>
+       constructed by the caller in your storage. The pointer to the
+       tuple structure is not passed, so you should remember the
+       pointer to the memory block returned to the caller for the call with
+       <parameter>func</parameter>
+       = <firstterm>ADDTUP_ALLOC_BINARY</firstterm>
+       and <parameter>id</parameter> = -1. This function must return
+       a non-NULL value on success. If necessary, you must also properly
+       release the memory blocks this function has handed to the caller.
+     </para>
+     <variablelist>
+       <varlistentry>
+     <term><parameter>res</parameter></term>
+     <listitem>
+       <para>
+         A pointer to the <type>PGresult</type> object.
+       </para>
+     </listitem>
+       </varlistentry>
+       <varlistentry>
+     <term><parameter>func</parameter></term>
+     <listitem>
+       <para>
+         An <type>enum</type> value telling the function which operation to perform.
+       </para>
+     </listitem>
+       </varlistentry>
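+       <varlistentry>
+     <term><parameter>id</parameter></term>
+     <listitem>
+       <para>
+         The position of the column for which the memory block is
+         requested, or -1 when the block is used as the array
+         of <type>PGresAttValue</type> for the whole tuple.
+       </para>
+     </listitem>
+       </varlistentry>
+       <varlistentry>
+     <term><parameter>size</parameter></term>
+     <listitem>
+       <para>
+         The minimum size in bytes of the requested memory block.
+       </para>
+     </listitem>
+       </varlistentry>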
+     </variablelist>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqgestasctring">
+    <term>
+     <function>PQgetAsCstring</function>
+     <indexterm>
+      <primary>PQgetAsCstring</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+    Gets the value of the column pointed to
+    by <parameter>attval</parameter> as a
+    zero-terminated C string. Returns NULL if the value is null.
+<synopsis>
+char *PQgetAsCstring(PGresAttValue *attval);
+</synopsis>
+      </para>
+      <para>
+    <variablelist>
+      <varlistentry>
+        <term><parameter>attval</parameter></term>
+        <listitem>
+          <para>
+        A pointer to the <type>PGresAttValue</type> object
+        from which to retrieve the value.
+          </para>
+        </listitem>
+      </varlistentry>
+    </variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqgetaddtupleparam">
+    <term>
+     <function>PQgetAddTupleParam</function>
+     <indexterm>
+      <primary>PQgetAddTupleParam</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+    Gets the pointer passed to <function>PQregisterTupleAdder</function>
+    as <parameter>param</parameter>.
+<synopsis>
+void *PQgetAddTupleParam(PGresult *res);
+</synopsis>
+      </para>
+      <para>
+    <variablelist>
+      <varlistentry>
+        <term><parameter>res</parameter></term>
+        <listitem>
+          <para>
+        A pointer to the <type>PGresult</type> object.
+          </para>
+        </listitem>
+      </varlistentry>
+    </variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqsetaddtupleerrmes">
+    <term>
+     <function>PQsetAddTupleErrMes</function>
+     <indexterm>
+      <primary>PQsetAddTupleErrMes</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+    Sets the message for an error that occurred in <type>addTupleFunction</type>.
+    If no message is set, the error is assumed to be out of
+    memory.
+<synopsis>
+void PQsetAddTupleErrMes(PGresult *res, char *mes);
+</synopsis>
+      </para>
+      <para>
+    <variablelist>
+      <varlistentry>
+        <term><parameter>res</parameter></term>
+        <listitem>
+          <para>
+        A pointer to the <type>PGresult</type> object
+        passed to <type>addTupleFunction</type>.
+          </para>
+        </listitem>
+      </varlistentry>
+      <varlistentry>
+        <term><parameter>mes</parameter></term>
+        <listitem>
+          <para>
+        A pointer to the memory block containing the error
+        message, which must be allocated with malloc(). The
+        memory block will be freed with free() by the caller
+        of <type>addTupleFunction</type>, but only if that function
+        returns NULL.
+          </para>
+          <para>
+        If <parameter>res</parameter> already has a previously set
+        message, that message is freed and replaced by the given one.
+        Pass NULL to cancel the custom message.
+          </para>
+        </listitem>
+      </varlistentry>
+    </variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+ </sect1>
+
+ <sect1 id="libpq-build">  <title>Building <application>libpq</application> Programs</title>

Re: Allow substitute allocators for PGresult.

From
Greg Smith
Date:
On 12/01/2011 05:48 AM, Kyotaro HORIGUCHI wrote:
>                                 xfer time   Peak RSS
> Original                      :  6.02s      850MB
> libpq patch + Original dblink :  6.11s      850MB
> full patch                    :  4.44s      643MB

These look like interesting results.  Currently Tom is listed as the 
reviewer on this patch, based on comments made before the CF really 
started.  And the patch has incorrectly been sitting in "Waiting 
for author" for the last week; oops.  I'm not sure what to do with this 
one now except raise a general call to see if anyone wants to take a 
look at it, now that it seems to be in good enough shape to deliver 
measurable results.

-- 
Greg Smith   2ndQuadrant US    greg@2ndQuadrant.com   Baltimore, MD
PostgreSQL Training, Services, and 24x7 Support  www.2ndQuadrant.us



Re: Allow substitute allocators for PGresult.

From
Tom Lane
Date:
Greg Smith <greg@2ndQuadrant.com> writes:
> On 12/01/2011 05:48 AM, Kyotaro HORIGUCHI wrote:
>>                                 xfer time   Peak RSS
>> Original                      :  6.02s      850MB
>> libpq patch + Original dblink :  6.11s      850MB
>> full patch                    :  4.44s      643MB

> These look like interesting results.  Currently Tom is listed as the 
> reviewer on this patch, based on comments made before the CF really 
> started.  And the patch has incorrectly been sitting in "Waiting 
> for author" for the last week; oops.  I'm not sure what to do with this 
> one now except raise a general call to see if anyone wants to take a 
> look at it, now that it seems to be in good enough shape to deliver 
> measurable results.

I did list myself as reviewer some time ago, but if anyone else wants to
take it I won't be offended ;-)
        regards, tom lane


Re: Allow substitute allocators for PGresult.

From
Robert Haas
Date:
On Thu, Dec 8, 2011 at 5:41 AM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@oss.ntt.co.jp> wrote:
>  This is the patch to add the documentation of PGresult custom
>  storage. It shows in section '31.19. Alternative result
>  storage'.

It would be good to consolidate this into the main patch.

I find the names of the functions added here to be quite confusing and
would suggest renaming them.  I expected PQgetAsCstring to do
something similar to PQgetvalue, but the code is completely different,
and even after reading the documentation I still don't understand what
that function is supposed to be used for.  Why "as cstring"?  What
would the other option be?

I also don't think the "add tuple" terminology is particularly good.
It's not obvious from the name that what you're doing is overriding
the way memory is allocated and results are stored.

Also, what about the problem Tom mentioned here?

http://archives.postgresql.org/message-id/1042.1321123761@sss.pgh.pa.us

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


Re: Allow substitute allocators for PGresult.

From
Kyotaro HORIGUCHI
Date:
Hello, thank you for taking the time to comment.

At Wed, 21 Dec 2011 11:09:59 -0500, Robert Haas <robertmhaas@gmail.com> wrote
> I find the names of the functions added here to be quite
> confusing and would suggest renaming them.  I expected
> PQgetAsCstring to do something similar to PQgetvalue, but the
> code is completely different,

To be honest, I have felt the same perplexity. If the
problem is simply the naming, I could propose another name,
"PQreadAttValue"... but that is not very good either.
But...

> and even after reading the documentation I still don't
> understand what that function is supposed to be used for.  Why
> "as cstring"?  What would the other option be?

Is the problem the poor description, or the raison d'être of the
function itself?

The immediate reason the function exists is as follows.
getAnotherTuple internally stores the field values of the tuples
sent from the server in the form of PGresAttValue. The only route I
have found to store a tuple into a TupleStore is
BuildTupleFromCStrings() followed by tuplestore_puttuple(), which is
what dblink does in materializeResult(). Of course, a C string is the
most natural format in a C program. I also hesitated to modify
execTuples.c, and I wanted to hide the details of PGresAttValue.

Given that the values are passed as PGresAttValue* (for reasons of
performance and to limit the extent of the modification), the "add
tuple" function has to get the values out of the struct. Weighing
encapsulation against convenience, this can be done in two ways: one
with knowledge of the structure, and the other without it.
PQgetAsCstring is the latter approach. (It is, however, inconsistent
with the fact that the definition of PGresAttValue has been moved from
libpq-int.h into libpq-fe.h; with this approach, the details of the
structure should be hidden, as they are for PGresult.)

But it is not obvious that this choice is better than the other
one. If we consider PGresAttValue too simple and stable to be worth
hiding, PQgetAsCstring can be removed and the problem goes away. In
that case PGresAttValue needs documentation.

I would prefer to handle PGresAttValue directly if there is no objection.

> I also don't think the "add tuple" terminology is particularly good.
> It's not obvious from the name that what you're doing is overriding
> the way memory is allocated and results are stored.

The phrase was originally taken from pqAddTuple() in fe-exec.c and
has not been changed since that function was integrated with other
functions.

I propose "tuple storage handler" as the alternative.

- typedef void *(*addTupleFunction)(...);
+ typedef void *(*tupleStorageHandler)(...);

- typedef enum { ADDTUP_*, } AddTupFunc;
+ typedef enum { TSHANDLER_*, } TSHandlerCommand;

- void *PQgetAddTupleParam(...);
+ void *PQgetTupleStorageHandlerContext(...);

- void PQregisterTupleAdder(...);
+ void PQregisterTupleStoreHandler(...);

- addTupleFunction PGresult.addTupleFunc;
+ tupleStorageHandler PGresult.tupleStorageHandlerFunc;

- void *PGresult.addTuleFuncParam;
+ void *PGresult.tupleStorageHandlerContext;

- char *PGresult.addTuleFuncErrMes;
+ char *PGresult.tupleStorageHandlerErrMes;

> Also, what about the problem Tom mentioned here?
>
> http://archives.postgresql.org/message-id/1042.1321123761@sss.pgh.pa.us

The plan of simply replacing malloc calls with something like palloc
has been abandoned because of its narrow scope.

dblink-plus copies the whole PGresult into a TupleStore in order to
avoid leaving orphaned memory on SIGINT. The resource owner
mechanism is applicable to this in principle, but hard to use in
practice, because the current implementation could not accommodate it
without radical modification. In addition, dblink does the same
thing, probably for the same reason as dblink-plus and, as far as I
have heard, for another reason as well.

Whatever the reason, both dblink and dblink-plus do the same
thing, which can make performance lower than expected.

If TupleStore (with a TupleDesc) is preferable to PGresult for
in-backend use, while ordinary (client-side) libpq users can handle
only PGresult, then I think a mechanism like this patch is required to
maintain compatibility. Conversely, if there is no significant reason
to use TupleStore in the backend (which would imply that existing
mechanisms such as resource owners can cheaply protect the backend from
the problems caused by keeping results in PGresult storage), then
PGresult should be used as it is.

I think TupleStore, which is designed for use in the backend, is
preferable for this usage, and I don't want the data to take a detour
through PGresult.


regards,

--
Kyotaro Horiguchi
NTT Open Source Software Center


Speed dblink using alternate libpq tuple storage

From
Greg Smith
Date:
One patch that fell off the truck during a turn in the November
CommitFest was "Allow substitute allocators for PGresult".  Re-reading
the whole thing again, it actually turned into a rather different
submission in the middle, and I know I didn't follow that shift
correctly then.  I'm replying to its thread but have changed the subject
to reflect that change.  From a procedural point of view, I don't feel
right kicking this back to its author on a Friday night when the
deadline for resubmitting it would be Sunday.  Instead I've refreshed
the patch myself and am adding it to the January CommitFest.  The new
patch is a single file; it's easy enough to split out the dblink changes
if someone wants to work with the pieces separately.

After my meta-review I think we should get another reviewer familiar
with using dblink to look at this next.  This is fundamentally a
performance patch now.  Some results and benchmarking code were
submitted along with it; the other issues are moot if those aren't
reproducible.  The secondary goal for a new review here is to provide
another opinion on the naming issues and abstraction concerns raised so far.

To clear out the original line of thinking, this is not a replacement
low-level storage allocator anymore.  The idea of using such a mechanism
to help catch memory leaks has also been dropped.

Instead this adds and documents a new path for libpq callers to more
directly receive tuples, for both improved speed and lower memory
usage.  dblink has been modified to use this new mechanism.
Benchmarking by the author suggests no significant change in libpq speed
when only that change was made, while the modified dblink using the new
mechanism was significantly faster.  It jumped from 332K tuples/sec to
450K, a 35% gain, and had a lower memory footprint too.  Test
methodology and those results are at
http://archives.postgresql.org/pgsql-hackers/2011-12/msg00008.php
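A minimal sketch of a caller-side handler for this new path, following the call sequence described in the patch's documentation: ALLOC_BINARY with id = -1 for the tuple array, ALLOC_TEXT or ALLOC_BINARY per column, then ADD_TUPLE. PQStoreFunc and PGresAttValue mirror the declarations in the patch; PGresult and PQgetStoreHandlerParam are mocks so the example compiles without libpq. MyStore, my_store_handler, feed_row and my_store_free are hypothetical names, and feed_row() only imitates what getAnotherTuple() does for text columns.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Mirrors the enum and struct proposed for libpq-fe.h in this patch. */
typedef enum { PQSF_ALLOC_TEXT, PQSF_ALLOC_BINARY, PQSF_ADD_TUPLE } PQStoreFunc;
typedef struct pgresAttValue { int len; char *value; } PGresAttValue;

/* Mocks so the sketch builds without libpq. */
typedef struct { void *storeHandlerParam; } PGresult;
static void *PQgetStoreHandlerParam(const PGresult *res)
{ return res ? res->storeHandlerParam : NULL; }

/* Hypothetical caller-owned storage, registered as the handler param. */
typedef struct
{
    int             nfields, nrows, cap;
    PGresAttValue  *cur;        /* tuple array from ALLOC_BINARY, id == -1 */
    PGresAttValue **rows;
} MyStore;

static void *
my_store_handler(PGresult *res, PQStoreFunc func, int id, size_t size)
{
    MyStore    *st = PQgetStoreHandlerParam(res);
    void       *p;

    switch (func)
    {
        case PQSF_ALLOC_TEXT:   /* column text: no alignment needed */
            return malloc(size);
        case PQSF_ALLOC_BINARY: /* malloc() memory is suitably aligned */
            p = malloc(size);
            if (id == -1)
                st->cur = p;    /* ADD_TUPLE receives no tuple pointer */
            return p;
        case PQSF_ADD_TUPLE:
            if (st->nrows == st->cap)
            {
                int         newcap = st->cap ? st->cap * 2 : 16;
                PGresAttValue **r = realloc(st->rows, newcap * sizeof(*r));

                if (r == NULL)
                    return NULL;    /* caller reports out of memory */
                st->rows = r;
                st->cap = newcap;
            }
            st->rows[st->nrows++] = st->cur;
            return st->cur;     /* any non-NULL value means success */
    }
    return NULL;
}

/* Imitates getAnotherTuple()'s call sequence for text columns. */
static int
feed_row(PGresult *res, int nfields, const char *const *vals)
{
    PGresAttValue *tup = my_store_handler(res, PQSF_ALLOC_BINARY, -1,
                                          nfields * sizeof(PGresAttValue));

    if (tup == NULL)
        return -1;
    memset(tup, 0, nfields * sizeof(PGresAttValue));
    for (int i = 0; i < nfields; i++)
    {
        size_t      vlen;

        if (vals[i] == NULL)
        {
            tup[i].len = -1;    /* SQL NULL: value stays NULL */
            continue;
        }
        vlen = strlen(vals[i]);
        tup[i].value = my_store_handler(res, PQSF_ALLOC_TEXT, i, vlen + 1);
        if (tup[i].value == NULL)
            goto fail;
        memcpy(tup[i].value, vals[i], vlen + 1);
        tup[i].len = (int) vlen;
    }
    if (my_store_handler(res, PQSF_ADD_TUPLE, 0, 0) != NULL)
        return 0;
fail:
    /* The tuple never reached rows[]; this sketch simply discards it. */
    for (int i = 0; i < nfields; i++)
        free(tup[i].value);
    free(tup);
    return -1;
}

static void
my_store_free(MyStore *st)
{
    for (int r = 0; r < st->nrows; r++)
    {
        for (int i = 0; i < st->nfields; i++)
            free(st->rows[r][i].value);     /* free(NULL) is a no-op */
        free(st->rows[r]);
    }
    free(st->rows);
}
```

To use it, zero a MyStore, set nfields, and make its address reachable from the handler (here via the mock's storeHandlerParam; with the patch, via the param argument of PQregisterStoreHandler). Then call feed_row() once per row and my_store_free() at the end. Rows land directly in the caller's array, which is the point of the patch: no intermediate copy through the PGresult's own tuple array.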

Robert Haas already did a quick code review of this; it, along with
the author's responses mixed in, is at
http://archives.postgresql.org/pgsql-hackers/2011-12/msg01149.php  I see
two areas of contention there:

-There are several naming bits no one is happy with yet.  Robert didn't
like some of them, but neither did Kyotaro.  I don't have an opinion
myself.  Is it the case that some changes to the existing code's
terminology are what's actually needed to make this all better?  Or is
this just fundamentally warty, with nothing to be done about it?
Dunno.

-There is an abstraction wrapper vs. coding convenience trade-off
centering around PGresAttValue.  It sounded to me like it raised the
always-fun question of "where's the right place for the line between
libpq-fe.h and libpq-int.h to be?"

dblink is pretty popular, and this is a big performance win for it.  If
naming and API boundary issues are the worst problems here, this sounds
like something well worth pursuing as part of 9.2's still advancing
performance theme.

--
Greg Smith   2ndQuadrant US    greg@2ndQuadrant.com   Baltimore, MD
PostgreSQL Training, Services, and 24x7 Support www.2ndQuadrant.com


Attachments

Re: Speed dblink using alternate libpq tuple storage

From
Kyotaro HORIGUCHI
Date:
Hello, this is a revised and rebased version of the patch.

a. The old term `Add Tuple Function' has been changed to `Store Handler'. I did not use `storage' simply to keep the
symbols short.

b. I couldn't find a good place to settle PQgetAsCstring() in, so it has been removed, and storeHandler() in dblink.c
touches PGresAttValue directly in this new patch. The definition of PGresAttValue stays in libpq-fe.h and is provided
with a comment.


c. Refined the error handling of dblink.c. I think it preserves the previous behavior for column-count mismatches and
type conversion exceptions.


d. The documentation has been revised.

> It jumped from 332K tuples/sec to 450K, a 35% gain, and had a
> lower memory footprint too.  Test methodology and those results
> are at
> http://archives.postgresql.org/pgsql-hackers/2011-12/msg00008.php

Disappointingly, re-measuring showed that the gain is smaller
than that.

On CentOS 6.2, with the other conditions the same as in the previous
test, overall performance was higher; the libpq patch alone cost 1.8%,
and the gain of the full patch fell to 5.6%. The reduction in memory
usage, however, was unchanged.

Original             : 3.96s  100.0%
w/libpq patch        : 4.03s  101.8%
w/libpq+dblink patch : 3.74s   94.4%
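As a quick check, each relative figure is that run's transfer time divided by the original 3.96s:

$$
\frac{4.03}{3.96} \approx 1.018 \quad (1.8\%\ \text{more time}), \qquad
\frac{3.74}{3.96} \approx 0.944 \quad (5.6\%\ \text{less time})
$$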


The attachments are listed below.

libpq_altstore_20120117.patch - Allow alternative storage for libpq.

dblink_perf_20120117.patch - Modify dblink to use the alternative storage mechanism.
libpq_altstore_doc_20120117.patch - Documentation for libpq_altstore. Appears in "31.19. Alternative result storage"


regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..83525e1 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,6 @@ PQconnectStartParams      157
 PQping                    158
 PQpingParams              159
 PQlibVersion              160
+PQregisterStoreHandler      161
+PQgetStoreHandlerParam      163
+PQsetStoreHandlerErrMes      164
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index d454538..5559f0b 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2692,6 +2692,7 @@ makeEmptyPGconn(void)
     conn->allow_ssl_try = true;
     conn->wait_ssl_try = false;
 #endif
+    conn->storeHandler = NULL;
 
     /*
      * We try to send at least 8K at a time, which is the usual size of pipe
@@ -5076,3 +5077,10 @@ PQregisterThreadLock(pgthreadlock_t newhandler)
     return prev;
 }
+
+void
+PQregisterStoreHandler(PGconn *conn, StoreHandler func, void *param)
+{
+    conn->storeHandler = func;
+    conn->storeHandlerParam = param;
+}
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b743566..96e5974 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -67,6 +67,10 @@ static int PQsendDescribe(PGconn *conn, char desc_type,
                const char *desc_target);
 static int    check_field_number(const PGresult *res, int field_num);
 
+static void *pqDefaultStoreHandler(PGresult *res, PQStoreFunc func,
+                                   int id, size_t len);
+static void *pqAddTuple(PGresult *res, PGresAttValue *tup);
+
 /* ----------------
  * Space management for PGresult.
@@ -160,6 +164,9 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
     result->curBlock = NULL;
     result->curOffset = 0;
     result->spaceLeft = 0;
 
+    result->storeHandler = pqDefaultStoreHandler;
+    result->storeHandlerParam = NULL;
+    result->storeHandlerErrMes = NULL;
     if (conn)
     {
@@ -194,6 +201,12 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
             }
             result->nEvents = conn->nEvents;
         }
 
+
+        if (conn->storeHandler)
+        {
+            result->storeHandler = conn->storeHandler;
+            result->storeHandlerParam = conn->storeHandlerParam;
+        }
     }
     else
     {
@@ -487,6 +500,33 @@ PQresultAlloc(PGresult *res, size_t nBytes)
     return pqResultAlloc(res, nBytes, TRUE);
 }
+void *
+pqDefaultStoreHandler(PGresult *res, PQStoreFunc func, int id, size_t len)
+{
+    void *p;
+
+    switch (func)
+    {
+        case PQSF_ALLOC_TEXT:
+            return pqResultAlloc(res, len, FALSE);
+
+        case PQSF_ALLOC_BINARY:
+            p = pqResultAlloc(res, len, TRUE);
+
+            if (id == -1)
+                res->storeHandlerParam = p;
+
+            return p;
+
+        case PQSF_ADD_TUPLE:
+            return pqAddTuple(res, res->storeHandlerParam);
+
+        default:
+            /* Ignore */
+            break;
+    }
+    return NULL;
+}
 /*
  * pqResultAlloc -
  *        Allocate subsidiary storage for a PGresult.
@@ -830,9 +870,9 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 /*
  * pqAddTuple
  *      add a row pointer to the PGresult structure, growing it if necessary
 
- *      Returns TRUE if OK, FALSE if not enough memory to add the row
+ *      Returns tup if OK, NULL if not enough memory to add the row.
  */
-int
+static void *
 pqAddTuple(PGresult *res, PGresAttValue *tup)
 {
     if (res->ntups >= res->tupArrSize)
@@ -858,13 +898,13 @@ pqAddTuple(PGresult *res, PGresAttValue *tup)
             newTuples = (PGresAttValue **)
                 realloc(res->tuples, newSize * sizeof(PGresAttValue *));
         if (!newTuples)
 
-            return FALSE;        /* malloc or realloc failed */
+            return NULL;        /* malloc or realloc failed */
         res->tupArrSize = newSize;
         res->tuples = newTuples;
     }
     res->tuples[res->ntups] = tup;
     res->ntups++;
 
-    return TRUE;
+    return tup;
 }
 /*
@@ -2822,6 +2862,35 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num)
         return 0;
 }
+/* PQgetStoreHandlerParam
+ *    Get the pointer to the contextual parameter from PGresult which is
+ *    registered to PGconn by PQregisterStoreHandler
+ */
+void *
+PQgetStoreHandlerParam(const PGresult *res)
+{
+    if (!res)
+        return NULL;
+    return res->storeHandlerParam;
+}
+
+/* PQsetStoreHandlerErrMes
+ *    Set the error message passed back to the caller of StoreHandler.
+ *
+ *  mes must be a malloc'ed memory block and it will be released by
+ *  the caller of StoreHandler.  You can replace the previous message
+ *  by alternative mes, or clear it with NULL. The previous one will
+ *  be freed internally.
+ */
+void
+PQsetStoreHandlerErrMes(PGresult *res, char *mes)
+{
+    /* Free existing message */
+    if (res->storeHandlerErrMes)
+        free(res->storeHandlerErrMes);
+    res->storeHandlerErrMes = mes;
+}
+
 /*
  * PQnparams:
  *    returns the number of input parameters of a prepared statement.
  */
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index a7c3899..205502b 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -733,9 +733,10 @@ getAnotherTuple(PGconn *conn, bool binary)
     if (conn->curTuple == NULL)
     {
         conn->curTuple = (PGresAttValue *)
 
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
+            result->storeHandler(result, PQSF_ALLOC_BINARY, -1,
+                                 nfields * sizeof(PGresAttValue));
         if (conn->curTuple == NULL)
-            goto outOfMemory;
+            goto addTupleError;
         MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
         /*
@@ -757,7 +758,7 @@ getAnotherTuple(PGconn *conn, bool binary)
     {
         bitmap = (char *) malloc(nbytes);
         if (!bitmap)
-            goto outOfMemory;
+            goto addTupleError;
     }
     if (pqGetnchar(bitmap, nbytes, conn))
@@ -787,9 +788,12 @@ getAnotherTuple(PGconn *conn, bool binary)
                 vlen = 0;
             if (tup[i].value == NULL)
             {
 
-                tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
+                PQStoreFunc func = 
+                    (binary ? PQSF_ALLOC_BINARY : PQSF_ALLOC_TEXT);
+                tup[i].value =
+                    (char *) result->storeHandler(result, func, i, vlen + 1);
                 if (tup[i].value == NULL)
-                    goto outOfMemory;
+                    goto addTupleError;
             }
             tup[i].len = vlen;
             /* read in the value */
@@ -812,8 +816,9 @@ getAnotherTuple(PGconn *conn, bool binary)
     }
     /* Success!  Store the completed tuple in the result */
 
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
+    if (!result->storeHandler(result, PQSF_ADD_TUPLE, 0, 0))
+        goto addTupleError;
+
     /* and reset for a new message */
     conn->curTuple = NULL;
@@ -821,7 +826,7 @@ getAnotherTuple(PGconn *conn, bool binary)
         free(bitmap);
     return 0;
-outOfMemory:
+addTupleError:
     /* Replace partially constructed result with an error result */
     /*
@@ -829,8 +834,21 @@ outOfMemory:
      * there's not enough memory to concatenate messages...
      */
     pqClearAsyncResult(conn);
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    resetPQExpBuffer(&conn->errorMessage);
+
+    /*
+     * If an error message was passed from the store handler, set it
+     * into PGconn; otherwise assume out of memory.
+     */
+    appendPQExpBufferStr(&conn->errorMessage,
+                         libpq_gettext(result->storeHandlerErrMes ?
+                                       result->storeHandlerErrMes :
+                                       "out of memory for query result\n"));
+    if (result->storeHandlerErrMes)
+    {
+        free(result->storeHandlerErrMes);
+        result->storeHandlerErrMes = NULL;
+    }
     /*
      * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 892dcbc..117c38a 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -634,9 +634,10 @@ getAnotherTuple(PGconn *conn, int msgLength)
     if (conn->curTuple == NULL)
     {
         conn->curTuple = (PGresAttValue *)
 
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
+            result->storeHandler(result, PQSF_ALLOC_BINARY, -1,
+                                 nfields * sizeof(PGresAttValue));
         if (conn->curTuple == NULL)
-            goto outOfMemory;
+            goto addTupleError;
         MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
     }
     tup = conn->curTuple;
@@ -673,11 +674,12 @@ getAnotherTuple(PGconn *conn, int msgLength)
             vlen = 0;
         if (tup[i].value == NULL)
         {
 
-            bool        isbinary = (result->attDescs[i].format != 0);
-
-            tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
+            PQStoreFunc func = (result->attDescs[i].format != 0 ?
+                                PQSF_ALLOC_BINARY : PQSF_ALLOC_TEXT);
+            tup[i].value =
+                (char *) result->storeHandler(result, func, i, vlen + 1);
             if (tup[i].value == NULL)
-                goto outOfMemory;
+                goto addTupleError;
         }
         tup[i].len = vlen;
         /* read in the value */
@@ -689,22 +691,36 @@ getAnotherTuple(PGconn *conn, int msgLength)
     }
     /* Success!  Store the completed tuple in the result */
 
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
+    if (!result->storeHandler(result, PQSF_ADD_TUPLE, 0, 0))
+        goto addTupleError;
+
     /* and reset for a new message */
     conn->curTuple = NULL;
     return 0;
-outOfMemory:
+addTupleError:
     /*
      * Replace partially constructed result with an error result. First
      * discard the old result to try to win back some memory.
      */
     pqClearAsyncResult(conn);
 
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    resetPQExpBuffer(&conn->errorMessage);
+
+    /*
+     * If an error message was passed from the store handler, set it
+     * into PGconn; otherwise assume out of memory.
+     */
+    appendPQExpBufferStr(&conn->errorMessage,
+                         libpq_gettext(result->storeHandlerErrMes ?
+                                       result->storeHandlerErrMes : 
+                                       "out of memory for query result\n"));
+    if (result->storeHandlerErrMes)
+    {
+        free(result->storeHandlerErrMes);
+        result->storeHandlerErrMes = NULL;
+    }
     pqSaveErrorResult(conn);
     /* Discard the failed message by pretending we read it */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index ef26ab9..6d86fa0 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -116,6 +116,16 @@ typedef enum
     PQPING_NO_ATTEMPT            /* connection not attempted (bad params) */
 } PGPing;
+/* PQStoreFunc is the enum type of the storeHandler parameter that
+ * selects the operation to perform. See the typedef StoreHandler for
+ * details */
+typedef enum 
+{
+    PQSF_ALLOC_TEXT,          /* Requested non-aligned memory for text value */
+    PQSF_ALLOC_BINARY,        /* Requested aligned memory for binary value */
+    PQSF_ADD_TUPLE            /* Requested to add tuple data into store */
+} PQStoreFunc;
+
 /* PGconn encapsulates a connection to the backend.
  * The contents of this struct are not supposed to be known to applications.
  */
@@ -149,6 +159,15 @@ typedef struct pgNotify
     struct pgNotify *next;        /* list link */
 } PGnotify;
+/* PGresAttValue represents a value of one tuple field in string form.
+   NULL is represented as len < 0. Otherwise value points to a null
+   terminated C string with the length of len. */
+typedef struct pgresAttValue
+{
+    int            len;            /* length in bytes of the value */
+    char       *value;            /* actual value, plus terminating zero byte */
+} PGresAttValue;
+
 /* Function types for notice-handling callbacks */
 typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
 typedef void (*PQnoticeProcessor) (void *arg, const char *message);
@@ -416,6 +435,52 @@ extern PGPing PQping(const char *conninfo);
 extern PGPing PQpingParams(const char *const * keywords,
              const char *const * values, int expand_dbname);
+/*
+ * Typedef for alternative result store handler.
+ *
+ * This function pointer is used for alternative result store handler
+ * callback in PGresult and PGconn.
+ *
+ * StoreHandler is called for the three operations designated by the
+ * enum PQStoreFunc.
+ *
+ * id identifies the requested memory block. The caller passes -1 for
+ * the PGresAttValue array, and 0 to (number of columns - 1) for each
+ * column value.
+ *
+ * PQSF_ALLOC_TEXT requests a memory block of size bytes for a text
+ * value, which need not be aligned to a word boundary.
+ *
+ * PQSF_ALLOC_BINARY requests a memory block of size bytes for a binary
+ * value, which must be aligned to a word boundary.
+ *
+ * PQSF_ADD_TUPLE requests that the tuple data be added to the result
+ * store, freeing the memory blocks allocated by this function if
+ * necessary. id and size are ignored for this function.
+ *
+ * This function must return a non-NULL value on success and NULL on
+ * failure, in which case it may set an error message with
+ * PQsetStoreHandlerErrMes. If no error message is set, the caller
+ * assumes the failure was out of memory. This function must not
+ * throw any exception.
+ */
+typedef void *(*StoreHandler)(PGresult *res, PQStoreFunc func,
+                              int id, size_t size);
+
+/*
+ * Register an alternative result store function with a PGconn.
+ *
+ * Once this function is registered, pg_result no longer uses its own
+ * result store and instead calls it to append rows one by one.
+ *
+ * func is the tuple store function. See the typedef StoreHandler.
+ *
+ * storeHandlerParam is a context pointer that StoreHandler can
+ * retrieve with PQgetStoreHandlerParam.
+ */
+extern void PQregisterStoreHandler(PGconn *conn, StoreHandler func,
+                                   void *storeHandlerParam);
+
 /* Force the write buffer to be written (or at least try) */
 extern int    PQflush(PGconn *conn);
@@ -454,6 +519,8 @@ extern char *PQcmdTuples(PGresult *res);
 extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num);
 extern int    PQgetlength(const PGresult *res, int tup_num, int field_num);
 extern int    PQgetisnull(const PGresult *res, int tup_num, int field_num);
+extern void *PQgetStoreHandlerParam(const PGresult *res);
+extern void    PQsetStoreHandlerErrMes(PGresult *res, char *mes);
 extern int    PQnparams(const PGresult *res);
 extern Oid    PQparamtype(const PGresult *res, int param_num);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index d967d60..e28e712 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -134,12 +134,6 @@ typedef struct pgresParamDesc
 #define NULL_LEN        (-1)    /* pg_result len for NULL value */
-typedef struct pgresAttValue
-{
-    int            len;            /* length in bytes of the value */
-    char       *value;            /* actual value, plus terminating zero byte */
-} PGresAttValue;
-
 /* Typedef for message-field list entries */
 typedef struct pgMessageField
 {
@@ -209,6 +203,11 @@ struct pg_result
     PGresult_data *curBlock;    /* most recently allocated block */
     int            curOffset;        /* start offset of free space in block */
     int            spaceLeft;        /* number of free bytes remaining in block */
+
+    StoreHandler storeHandler;  /* Result store handler. See
+                                 * StoreHandler for details. */
+    void *storeHandlerParam;    /* Contextual parameter for storeHandler */
+    char *storeHandlerErrMes;   /* Error message from storeHandler */
 };
 /* PGAsyncStatusType defines the state of the query-execution state machine */
@@ -443,6 +442,13 @@ struct pg_conn
     /* Buffer for receiving various parts of messages */
     PQExpBufferData workBuffer; /* expansible string */
+
+    /* Tuple store handler. The two fields below are copied to each
+     * newly created PGresult if storeHandler is not NULL. The default
+     * function is used if it is NULL. */
+    StoreHandler storeHandler;   /* Result store handler. See
+                                  * StoreHandler for details. */
+    void *storeHandlerParam;  /* Contextual parameter for storeHandler */
 };
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
@@ -507,7 +513,6 @@ extern void
 pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 /* This lets gcc check the format string for consistency. */
 __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
-extern int    pqAddTuple(PGresult *res, PGresAttValue *tup);
 extern void pqSaveMessageField(PGresult *res, char code,
                    const char *value);
 extern void pqSaveParameterStatus(PGconn *conn, const char *name,
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 36a8e3e..a8685a9 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,24 @@ typedef struct remoteConn
     bool        newXactForCursor;        /* Opened a transaction for a cursor */
 } remoteConn;
+typedef struct storeInfo
+{
+    Tuplestorestate *tuplestore;
+    int nattrs;
+    AttInMetadata *attinmeta;
+    MemoryContext oldcontext;
+    char *attrvalbuf;
+    void **valbuf;
+    size_t *valbufsize;
+    bool error_occurred;
+    bool nummismatch;
+    ErrorData *edata;
+} storeInfo;
+
 /*
  * Internal declarations
  */
 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
 static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn *rconn);
@@ -90,6 +103,10 @@ static char *escape_param_str(const char *from);
 static void validate_pkattnums(Relation rel,
                    int2vector *pkattnums_arg, int32 pknumatts_arg,
                    int **pkattnums, int *pknumatts);
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static void *storeHandler(PGresult *res, PQStoreFunc func, int id, size_t size);
+
 /* Global */
 static remoteConn *pconn = NULL;
@@ -503,6 +520,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
     char       *curname = NULL;
     int            howmany = 0;
     bool        fail = true;    /* default to backward compatible */
+    storeInfo   storeinfo;
     DBLINK_INIT;
@@ -559,15 +577,36 @@ dblink_fetch(PG_FUNCTION_ARGS)
     appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
     /*
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterStoreHandler(conn, storeHandler, &storeinfo);
+
+    /*
      * Try to execute the query.  Note that since libpq uses malloc, the
      * PGresult will be long-lived even though we are still in a short-lived
      * memory context.
      */
     res = PQexec(conn, buf.data);
+    finishStoreInfo(&storeinfo);
+
     if (!res ||
         (PQresultStatus(res) != PGRES_COMMAND_OK &&
          PQresultStatus(res) != PGRES_TUPLES_OK))
     {
+        /* finishStoreInfo saves the fields referred to below. */
+        if (storeinfo.nummismatch)
+        {
+            /* This is only for backward compatibility */
+            ereport(ERROR,
+                    (errcode(ERRCODE_DATATYPE_MISMATCH),
+                     errmsg("remote query result rowtype does not match "
+                            "the specified FROM clause rowtype")));
+        }
+        else if (storeinfo.edata)
+            ReThrowError(storeinfo.edata);
+
         dblink_res_error(conname, res, "could not fetch from cursor", fail);
         return (Datum) 0;
     }
@@ -579,8 +618,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
                  errmsg("cursor \"%s\" does not exist", curname)));
     }
+    PQclear(res);
-    materializeResult(fcinfo, res);
     return (Datum) 0;
 }
@@ -640,6 +679,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     remoteConn *rconn = NULL;
     bool        fail = true;    /* default to backward compatible */
     bool        freeconn = false;
+    storeInfo   storeinfo;
     /* check to see if caller supports us returning a tuplestore */
     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -715,164 +755,213 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     rsinfo->setResult = NULL;
     rsinfo->setDesc = NULL;
+
+    /*
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec/PQgetResult below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterStoreHandler(conn, storeHandler, &storeinfo);
+
     /* synchronous query, or async result retrieval */
     if (!is_async)
         res = PQexec(conn, sql);
     else
-    {
         res = PQgetResult(conn);
-        /* NULL means we're all done with the async results */
-        if (!res)
-            return (Datum) 0;
-    }
-    /* if needed, close the connection to the database and cleanup */
-    if (freeconn)
-        PQfinish(conn);
+    finishStoreInfo(&storeinfo);
-    if (!res ||
-        (PQresultStatus(res) != PGRES_COMMAND_OK &&
-         PQresultStatus(res) != PGRES_TUPLES_OK))
+    /* NULL res from async get means we're all done with the results */
+    if (res || !is_async)
     {
-        dblink_res_error(conname, res, "could not execute query", fail);
-        return (Datum) 0;
+        if (freeconn)
+            PQfinish(conn);
+
+        if (!res ||
+            (PQresultStatus(res) != PGRES_COMMAND_OK &&
+             PQresultStatus(res) != PGRES_TUPLES_OK))
+        {
+            /* finishStoreInfo saves the fields referred to below. */
+            if (storeinfo.nummismatch)
+            {
+                /* This is only for backward compatibility */
+                ereport(ERROR,
+                        (errcode(ERRCODE_DATATYPE_MISMATCH),
+                         errmsg("remote query result rowtype does not match "
+                                "the specified FROM clause rowtype")));
+            }
+            else if (storeinfo.edata)
+                ReThrowError(storeinfo.edata);
+
+            dblink_res_error(conname, res, "could not execute query", fail);
+            return (Datum) 0;
+        }
     }
+    PQclear(res);
-    materializeResult(fcinfo, res);
     return (Datum) 0;
 }
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */
 static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
 {
     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-
-    Assert(rsinfo->returnMode == SFRM_Materialize);
-
-    PG_TRY();
+    TupleDesc    tupdesc;
+    int i;
+    
+    switch (get_call_result_type(fcinfo, NULL, &tupdesc))
     {
-        TupleDesc    tupdesc;
-        bool        is_sql_cmd = false;
-        int            ntuples;
-        int            nfields;
+        case TYPEFUNC_COMPOSITE:
+            /* success */
+            break;
+        case TYPEFUNC_RECORD:
+            /* failed to determine actual type of RECORD */
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("function returning record called in context "
+                            "that cannot accept type record")));
+            break;
+        default:
+            /* result type isn't composite */
+            elog(ERROR, "return type must be a row type");
+            break;
+    }
+    
+    sinfo->oldcontext = MemoryContextSwitchTo(
+        rsinfo->econtext->ecxt_per_query_memory);
+
+    /* make sure we have a persistent copy of the tupdesc */
+    tupdesc = CreateTupleDescCopy(tupdesc);
+
+    sinfo->error_occurred = FALSE;
+    sinfo->nummismatch = FALSE;
+    sinfo->edata = NULL;
+    sinfo->nattrs = tupdesc->natts;
+    sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+    sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+    sinfo->valbuf = (void **)malloc(sinfo->nattrs * sizeof(void *));
+    sinfo->valbufsize = (size_t *)malloc(sinfo->nattrs * sizeof(size_t));
+    for (i = 0 ; i < sinfo->nattrs ; i++)
+    {
+        sinfo->valbuf[i] = NULL;
+        sinfo->valbufsize[i] = 0;
+    }
-        if (PQresultStatus(res) == PGRES_COMMAND_OK)
-        {
-            is_sql_cmd = true;
-
-            /*
-             * need a tuple descriptor representing one TEXT column to return
-             * the command status string as our result tuple
-             */
-            tupdesc = CreateTemplateTupleDesc(1, false);
-            TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-                               TEXTOID, -1, 0);
-            ntuples = 1;
-            nfields = 1;
-        }
-        else
-        {
-            Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+    /* Preallocate a buffer the size of a PGresAttValue array for the values. */
+    sinfo->attrvalbuf = (char *) malloc(sinfo->nattrs * sizeof(PGresAttValue));
-            is_sql_cmd = false;
+    rsinfo->setResult = sinfo->tuplestore;
+    rsinfo->setDesc = tupdesc;
+}
-            /* get a tuple descriptor for our result type */
-            switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-            {
-                case TYPEFUNC_COMPOSITE:
-                    /* success */
-                    break;
-                case TYPEFUNC_RECORD:
-                    /* failed to determine actual type of RECORD */
-                    ereport(ERROR,
-                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                        errmsg("function returning record called in context "
-                               "that cannot accept type record")));
-                    break;
-                default:
-                    /* result type isn't composite */
-                    elog(ERROR, "return type must be a row type");
-                    break;
-            }
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+    int i;
-            /* make sure we have a persistent copy of the tupdesc */
-            tupdesc = CreateTupleDescCopy(tupdesc);
-            ntuples = PQntuples(res);
-            nfields = PQnfields(res);
+    for (i = 0 ; i < sinfo->nattrs ; i++)
+    {
+        if (sinfo->valbuf[i])
+        {
+            free(sinfo->valbuf[i]);
+            sinfo->valbuf[i] = NULL;
         }
+    }
+    if (sinfo->attrvalbuf)
+        free(sinfo->attrvalbuf);
+    sinfo->attrvalbuf = NULL;
+    MemoryContextSwitchTo(sinfo->oldcontext);
+}
-        /*
-         * check result and tuple descriptor have the same number of columns
-         */
-        if (nfields != tupdesc->natts)
-            ereport(ERROR,
-                    (errcode(ERRCODE_DATATYPE_MISMATCH),
-                     errmsg("remote query result rowtype does not match "
-                            "the specified FROM clause rowtype")));
+static void *
+storeHandler(PGresult *res, PQStoreFunc  func, int id, size_t size)
+{
+    storeInfo *sinfo = (storeInfo *)PQgetStoreHandlerParam(res);
+    HeapTuple    tuple;
+    int fields = PQnfields(res);
+    int i;
+    PGresAttValue *attval;
+    char        **cstrs;
-        if (ntuples > 0)
-        {
-            AttInMetadata *attinmeta;
-            Tuplestorestate *tupstore;
-            MemoryContext oldcontext;
-            int            row;
-            char      **values;
-
-            attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-            oldcontext = MemoryContextSwitchTo(
-                                    rsinfo->econtext->ecxt_per_query_memory);
-            tupstore = tuplestore_begin_heap(true, false, work_mem);
-            rsinfo->setResult = tupstore;
-            rsinfo->setDesc = tupdesc;
-            MemoryContextSwitchTo(oldcontext);
+    if (sinfo->error_occurred)
+        return NULL;
+
+    switch (func)
+    {
+        case PQSF_ALLOC_TEXT:
+        case PQSF_ALLOC_BINARY:
+            if (id == -1)
+                return sinfo->attrvalbuf;
-            values = (char **) palloc(nfields * sizeof(char *));
+            if (id < 0 || id >= sinfo->nattrs)
+                return NULL;
-            /* put all tuples into the tuplestore */
-            for (row = 0; row < ntuples; row++)
+            if (sinfo->valbufsize[id] < size)
             {
-                HeapTuple    tuple;
+                if (sinfo->valbuf[id] == NULL)
+                    sinfo->valbuf[id] = malloc(size);
+                else
+                    sinfo->valbuf[id] = realloc(sinfo->valbuf[id], size);
+                sinfo->valbufsize[id] = size;
+            }
+            return sinfo->valbuf[id];
-                if (!is_sql_cmd)
-                {
-                    int            i;
+        case PQSF_ADD_TUPLE:
+            break;   /* Go through */
+        default:
+            /* Ignore */
+            break;
+    }
-                    for (i = 0; i < nfields; i++)
-                    {
-                        if (PQgetisnull(res, row, i))
-                            values[i] = NULL;
-                        else
-                            values[i] = PQgetvalue(res, row, i);
-                    }
-                }
-                else
-                {
-                    values[0] = PQcmdStatus(res);
-                }
+    if (sinfo->nattrs != fields)
+    {
+        sinfo->error_occurred = TRUE;
+        sinfo->nummismatch = TRUE;
+        finishStoreInfo(sinfo);
-                /* build the tuple and put it into the tuplestore. */
-                tuple = BuildTupleFromCStrings(attinmeta, values);
-                tuplestore_puttuple(tupstore, tuple);
-            }
+        /* This error is reported by the caller (dblink_fetch() or
+         * dblink_record_internal()), so do not set an error message
+         * here. */
+        return NULL;
+    }
-            /* clean up and return the tuplestore */
-            tuplestore_donestoring(tupstore);
-        }
+    /*
+     * Rewrite PGresAttValue[] to char *[] in-place.
+     */
+    Assert(sizeof(char*) <= sizeof(PGresAttValue));
-        PQclear(res);
+    attval = (PGresAttValue *)sinfo->attrvalbuf;
+    cstrs   = (char **)sinfo->attrvalbuf;
+    for(i = 0 ; i < fields ; i++)
+    {
+        if (attval[i].len < 0)
+            cstrs[i] = NULL;
+        else
+            cstrs[i] = attval[i].value;
+    }
+
+    PG_TRY();
+    {
+        tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+        tuplestore_puttuple(sinfo->tuplestore, tuple);
     }
     PG_CATCH();
     {
-        /* be sure to release the libpq result */
-        PQclear(res);
-        PG_RE_THROW();
+        MemoryContext context;
+        /*
+         * Store exception for later ReThrow and cancel the exception.
+         */
+        sinfo->error_occurred = TRUE;
+        context = MemoryContextSwitchTo(sinfo->oldcontext);
+        sinfo->edata = CopyErrorData();
+        MemoryContextSwitchTo(context);
+        FlushErrorState();
+
+        return NULL;
     }
     PG_END_TRY();
+
+    return sinfo->attrvalbuf;
 }
 /*
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 72c9384..8803999 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -7233,6 +7233,293 @@ int PQisthreadsafe();
  </sect1>
+ <sect1 id="libpq-alterstorage">
+  <title>Alternative result storage</title>
+
+  <indexterm zone="libpq-alterstorage">
+   <primary>PGresult</primary>
+   <secondary>PGconn</secondary>
+  </indexterm>
+
+  <para>
+   In standard usage, users get the result of command execution
+   from a <structname>PGresult</structname> acquired
+   with <function>PQgetResult</function>
+   from a <structname>PGconn</structname>. The memory areas for
+   the PGresult are allocated internally with malloc() during calls to
+   command execution functions such as <function>PQexec</function>
+   and <function>PQgetResult</function>. If it is difficult to
+   handle the result records in the form of a PGresult, you can instruct
+   the PGconn to store them in your own storage instead.
+  </para>
+
+  <variablelist>
+   <varlistentry id="libpq-registerstorehandler">
+    <term>
+     <function>PQregisterStoreHandler</function>
+     <indexterm>
+      <primary>PQregisterStoreHandler</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       Sets a callback function that allocates memory for each tuple and
+       its column values, and adds each completed tuple to the alternative
+       result storage.
+<synopsis>
+void PQregisterStoreHandler(PGconn *conn,
+                            StoreHandler func,
+                            void *param);
+</synopsis>
+     </para>
+     
+     <para>
+       <variablelist>
+     <varlistentry>
+       <term><parameter>conn</parameter></term>
+       <listitem>
+         <para>
+           The connection object on which to set the storage handler
+           function. A PGresult created from this connection calls this
+           function to store the result instead of storing it in its
+           internal storage.
+         </para>
+       </listitem>
+     </varlistentry>
+     <varlistentry>
+       <term><parameter>func</parameter></term>
+       <listitem>
+         <para>
+           Storage handler function to set. NULL means to use the
+           default storage.
+         </para>
+       </listitem>
+     </varlistentry>
+     <varlistentry>
+       <term><parameter>param</parameter></term>
+       <listitem>
+         <para>
+           A pointer to a contextual parameter passed
+           to <parameter>func</parameter>. The handler can retrieve this
+           pointer in <type>StoreHandler</type>
+           with <function>PQgetStoreHandlerParam</function>.
+         </para>
+       </listitem>
+     </varlistentry>
+       </variablelist>
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-storehandler">
+    <term>
+     <type>StoreHandler</type>
+     <indexterm>
+      <primary>StoreHandler</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       The type for the storage handler callback function.
+<synopsis>
+typedef enum 
+{
+  PQSF_ALLOC_TEXT,
+  PQSF_ALLOC_BINARY,
+  PQSF_ADD_TUPLE
+} PQStoreFunc;
+
+typedef void *(*StoreHandler)(PGresult *res,
+                              PQStoreFunc func,
+                              int id,
+                              size_t size);
+</synopsis>
+     </para>
+
+     <para>
+       This function must return NULL on failure and, if the cause is
+       something other than running out of memory, should set the error
+       message with <function>PQsetStoreHandlerErrMes</function>. This
+       function must not throw any exception. It is called in the
+       following sequence.
+
+       <itemizedlist spacing="compact">
+     <listitem>
+       <simpara>Call with <parameter>func</parameter>
+       = <firstterm>PQSF_ALLOC_BINARY</firstterm>
+       and <parameter>id</parameter> = -1 to request the memory
+       for a tuple to be used as an array
+       of <type>PGresAttValue</type>. </simpara>
+     </listitem>
+     <listitem>
+       <simpara>Call with <parameter>func</parameter>
+       = <firstterm>PQSF_ALLOC_TEXT</firstterm>
+       or <firstterm>PQSF_ALLOC_BINARY</firstterm>
+       and <parameter>id</parameter> ranging from zero to the number of
+       columns minus one, to request the memory for each column value in
+       the current tuple.</simpara>
+     </listitem>
+     <listitem>
+       <simpara>Call with <parameter>func</parameter>
+       = <firstterm>PQSF_ADD_TUPLE</firstterm> to request the
+       constructed tuple to be stored.</simpara>
+     </listitem>
+       </itemizedlist>
+     </para>
+     <para>
+       Calling <type>StoreHandler</type>
+       with <parameter>func</parameter> =
+       <firstterm>PQSF_ALLOC_TEXT</firstterm> requests a
+       memory block of at least <parameter>size</parameter> bytes,
+       which need not be aligned to a word boundary.
+       <parameter>id</parameter> is zero or a positive number
+       identifying the use of the requested memory block, that is, the
+       position of the column for which the memory block is used.
+     </para>
+     <para>
+       When <parameter>func</parameter>
+       = <firstterm>PQSF_ALLOC_BINARY</firstterm>, this function must
+       return a memory block of at
+       least <parameter>size</parameter> bytes that is aligned to a
+       word boundary.
+       <parameter>id</parameter> identifies the
+       use of the requested memory block. -1 means that it is used as an
+       array of <type>PGresAttValue</type> to store the tuple. Zero or
+       positive numbers have the same meanings as for
+       <firstterm>PQSF_ALLOC_TEXT</firstterm>.
+     </para>
+     <para>When <parameter>func</parameter>
+       = <firstterm>PQSF_ADD_TUPLE</firstterm>, this function must
+       store the array of <type>PGresAttValue</type>
+       constructed by the caller into your storage. The pointer to the
+       tuple is not passed, so you should remember the
+       pointer to the memory block you returned to the caller for
+       <parameter>func</parameter>
+       = <firstterm>PQSF_ALLOC_BINARY</firstterm>
+       with <parameter>id</parameter> = -1. This function must return
+       some non-NULL value on success. If necessary, it must also free
+       the memory blocks it handed to the caller.
+     </para>
+     <variablelist>
+       <varlistentry>
+     <term><parameter>res</parameter></term>
+     <listitem>
+       <para>
+         A pointer to the <type>PGresult</type> object.
+       </para>
+     </listitem>
+       </varlistentry>
+       <varlistentry>
+     <term><parameter>func</parameter></term>
+     <listitem>
+       <para>
+         An <type>enum</type> value telling the handler which operation to perform.
+       </para>
+     </listitem>
+       </varlistentry>
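+       <varlistentry>
+     <term><parameter>id</parameter></term>
+     <listitem>
+       <para>
+         The identifier of the requested memory block: -1 for the
+         array of <type>PGresAttValue</type>, or the column position
+         for a column value. Ignored
+         for <firstterm>PQSF_ADD_TUPLE</firstterm>.
+       </para>
+     </listitem>
+       </varlistentry>
+       <varlistentry>
+     <term><parameter>size</parameter></term>
+     <listitem>
+       <para>
+         The requested size of the memory block in bytes. Ignored
+         for <firstterm>PQSF_ADD_TUPLE</firstterm>.
+       </para>
+     </listitem>
+       </varlistentry>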
+     </variablelist>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqgetstorehandlerparam">
+    <term>
+     <function>PQgetStoreHandlerParam</function>
+     <indexterm>
+      <primary>PQgetStoreHandlerParam</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+    Gets the pointer passed to <function>PQregisterStoreHandler</function>
+    as <parameter>param</parameter>.
+<synopsis>
+void *PQgetStoreHandlerParam(const PGresult *res)
+</synopsis>
+      </para>
+      <para>
+    <variablelist>
+      <varlistentry>
+        <term><parameter>res</parameter></term>
+        <listitem>
+          <para>
+        A pointer to the <type>PGresult</type> object.
+          </para>
+        </listitem>
+      </varlistentry>
+    </variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqsetstorehandlererrmes">
+    <term>
+     <function>PQsetStoreHandlerErrMes</function>
+     <indexterm>
+      <primary>PQsetStoreHandlerErrMes</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+    Sets the message for an error that occurred
+    in <type>StoreHandler</type>.  If no message is set, the
+    caller assumes the error was out of memory.
+<synopsis>
+void PQsetStoreHandlerErrMes(PGresult *res, char *mes)
+</synopsis>
+      </para>
+      <para>
+    <variablelist>
+      <varlistentry>
+        <term><parameter>res</parameter></term>
+        <listitem>
+          <para>
+        A pointer to the <type>PGresult</type> object
+        passed to <type>StoreHandler</type>.
+          </para>
+        </listitem>
+      </varlistentry>
+      <varlistentry>
+        <term><parameter>mes</parameter></term>
+        <listitem>
+          <para>
+        A pointer to the memory block containing the error
+        message, which is allocated
+        by <function>malloc()</function>. The memory block
+        will be freed with <function>free()</function> in the
+        caller of
+        <type>StoreHandler</type> only if it returns NULL.
+          </para>
+          <para>
+        If <parameter>res</parameter> already has a previously set
+        message, that message is freed before the new one is set. Pass
+        NULL to cancel the custom message.
+          </para>
+        </listitem>
+      </varlistentry>
+    </variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+ </sect1>
+
  <sect1 id="libpq-build">
   <title>Building <application>libpq</application> Programs</title>
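The three-phase call sequence documented in the patch (PQSF_ALLOC_BINARY with id -1, one allocation per column, then PQSF_ADD_TUPLE) can be simulated without libpq. This is a minimal sketch under stated assumptions, not code from the patch: PGresAttValue and PQStoreFunc only mirror the patch's declarations, while DemoStore, demo_store_handler(), demo_feed_row(), demo_store_init() and demo_store_free() are hypothetical names. The handler receives its context as a plain argument instead of calling PQgetStoreHandlerParam(). Like the dblink handler, it reuses one buffer per column and turns the PGresAttValue array into a char * array in place; reading each element before overwriting it is safe because sizeof(char *) <= sizeof(PGresAttValue).

```c
#include <stdlib.h>
#include <string.h>

/* Stand-ins mirroring the patch's declarations; everything here is hypothetical. */
typedef struct { int len; char *value; } PGresAttValue;   /* len < 0 means NULL */
typedef enum { PQSF_ALLOC_TEXT, PQSF_ALLOC_BINARY, PQSF_ADD_TUPLE } PQStoreFunc;

typedef struct
{
    int            nattrs;
    PGresAttValue *attvals;     /* block handed out for id == -1 */
    char         **valbuf;      /* one reusable buffer per column */
    size_t        *valbufsize;
    int            nrows;       /* tuples accepted so far */
} DemoStore;

/* Hypothetical handler: takes its context directly, not via PQgetStoreHandlerParam(). */
static void *
demo_store_handler(DemoStore *st, PQStoreFunc func, int id, size_t size)
{
    switch (func)
    {
        case PQSF_ALLOC_TEXT:
        case PQSF_ALLOC_BINARY:
            if (id == -1)
                return st->attvals;     /* preallocated PGresAttValue array */
            if (id < 0 || id >= st->nattrs)
                return NULL;
            if (st->valbufsize[id] < size)
            {
                char *p = realloc(st->valbuf[id], size);    /* realloc(NULL, n) acts as malloc(n) */
                if (p == NULL)
                    return NULL;        /* caller reports out of memory */
                st->valbuf[id] = p;
                st->valbufsize[id] = size;
            }
            return st->valbuf[id];
        case PQSF_ADD_TUPLE:
        {
            /* Rewrite PGresAttValue[] into char *[] in place, element by element. */
            char **cstrs = (char **) st->attvals;
            for (int i = 0; i < st->nattrs; i++)
            {
                PGresAttValue v = st->attvals[i];   /* read before overwriting */
                cstrs[i] = (v.len < 0) ? NULL : v.value;
            }
            st->nrows++;
            return st->attvals;         /* any non-NULL value means success */
        }
    }
    return NULL;
}

/* Hypothetical stand-in for getAnotherTuple(); a NULL entry in row means SQL NULL. */
static int
demo_feed_row(DemoStore *st, const char *const *row)
{
    PGresAttValue *tup = demo_store_handler(st, PQSF_ALLOC_BINARY, -1,
                                            st->nattrs * sizeof(PGresAttValue));
    if (tup == NULL)
        return 0;
    for (int i = 0; i < st->nattrs; i++)
    {
        tup[i].len = -1;                /* start as SQL NULL */
        tup[i].value = NULL;
        if (row[i] == NULL)
            continue;
        size_t vlen = strlen(row[i]);
        char  *buf = demo_store_handler(st, PQSF_ALLOC_TEXT, i, vlen + 1);
        if (buf == NULL)
            return 0;
        memcpy(buf, row[i], vlen + 1);
        tup[i].len = (int) vlen;
        tup[i].value = buf;
    }
    return demo_store_handler(st, PQSF_ADD_TUPLE, 0, 0) != NULL;
}

static int
demo_store_init(DemoStore *st, int nattrs)
{
    memset(st, 0, sizeof(*st));
    st->nattrs = nattrs;
    st->attvals = malloc(nattrs * sizeof(PGresAttValue));
    st->valbuf = calloc(nattrs, sizeof(char *));
    st->valbufsize = calloc(nattrs, sizeof(size_t));
    return st->attvals != NULL && st->valbuf != NULL && st->valbufsize != NULL;
}

static void
demo_store_free(DemoStore *st)
{
    for (int i = 0; st->valbuf != NULL && i < st->nattrs; i++)
        free(st->valbuf[i]);
    free(st->valbuf);
    free(st->valbufsize);
    free(st->attvals);
}
```

Because the per-column buffers are reused, the char * array produced for one tuple is only valid until the next tuple is fed in; dblink can live with that because BuildTupleFromCStrings() converts the values into a new heap tuple right away.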

Re: Speed dblink using alternate libpq tuple storage

From
Marko Kreen
Date:
On Tue, Jan 17, 2012 at 05:53:33PM +0900, Kyotaro HORIGUCHI wrote:
> Hello,  This is revised and rebased version of the patch.
> 
> a. Old term `Add Tuple Function' is changed to 'Store
>    Handler'. The reason why not `storage' is simply length of the
>    symbols.
> 
> b. I couldn't find the place to settle PGgetAsCString() in. It is
>    removed and storeHandler()@dblink.c touches PGresAttValue
>    directly in this new patch. Definition of PGresAttValue stays
>    in lipq-fe.h and provided with comment.
> 
> c. Refine error handling of dblink.c. I think it preserves the
>    previous behavior for column number mismatch and type
>    conversion exception.
> 
> d. Document is revised.

First, my priority is on-the-fly result processing,
not allocation optimization.  And this patch seems to make
it possible: I can process results row by row, without the
need to buffer all of them in a PGresult.  Which is great!

But the current API seems clumsy; I guess it's because the
patch grew from trying to replace the low-level allocator.

I would like to propose a better one-shot API:
   void *(*RowStoreHandler)(PGresult *res, PGresAttValue *columns);

where the PGresAttValue * is allocated once, inside the PGresult,
and the pointers inside it point directly into the network buffer.
Of course this requires replacing the current per-column malloc+copy
pattern with a per-row parse+handle pattern, but I think the resulting
API will be better:

1) Pass-through processing does not need to care about unnecessary
   per-row allocations.

2) Handlers that want a copy of the row (like regular libpq) can
   optimize allocations by having a "global" view of the row
   (e.g. one allocation for row header + data).
 

This also optimizes call patterns: first libpq parses the packet,
then the row handler processes the row, with no unnecessary back-and-forth.


Summary: the current API makes various assumptions about how the row is
processed; let's remove those.

-- 
marko



Re: Speed dblink using alternate libpq tuple storage

From:
"Marc Mamin"
Date:
> >
> > c. Refine error handling of dblink.c. I think it preserves the
> >    previous behavior for column number mismatch and type
> >    conversion exception.

Hello,

I don't know whether this covers the following issue.
I just mention it in case you didn't notice it and would like to
handle this rather cosmetic issue as well.

http://archives.postgresql.org/pgsql-bugs/2011-08/msg00113.php

best regards,

Marc Mamin



Re: Speed dblink using alternate libpq tuple storage

From:
Marko Kreen
Date:
On Sat, Jan 21, 2012 at 1:52 PM, Marc Mamin <M.Mamin@intershop.de> wrote:
>> >
>> > c. Refine error handling of dblink.c. I think it preserves the
>> >    previous behavior for column number mismatch and type
>> >    conversion exception.
>
> Hello,
>
> I don't know whether this covers the following issue.
> I just mention it in case you didn't notice it and would like to
> handle this rather cosmetic issue as well.
>
> http://archives.postgresql.org/pgsql-bugs/2011-08/msg00113.php

It is not relevant to this thread, but it does seem a good idea to implement.
It should be a simple matter of creating a handler that uses dblink_res_error()
to report the notice.

Perhaps you could create and submit the patch yourself?

For reference, here is the full flow in PL/Proxy:

1) PQsetNoticeReceiver:
https://github.com/markokr/plproxy-dev/blob/master/src/execute.c#L422
2) handle_notice:
https://github.com/markokr/plproxy-dev/blob/master/src/execute.c#L370
3) plproxy_remote_error:
https://github.com/markokr/plproxy-dev/blob/master/src/main.c#L82

--
marko


Re: Speed dblink using alternate libpq tuple storage

From:
Kyotaro HORIGUCHI
Date:
Thank you for the comment,

> First, my priority is on-the-fly result processing,
> not allocation optimization.  And this patch seems to make
> that possible: I can process results row-by-row, without the
> need to buffer all of them in PGresult.  Which is great!
> 
> But the current API seems clumsy; I guess it's because the
> patch grew from trying to replace the low-level allocator.
Exactly.

> I would like to propose a better one-shot API:
> 
>     void *(*RowStoreHandler)(PGresult *res, PGresAttValue *columns);
> 
> where the PGresAttValue * is allocated once, inside PGresult,
> and the pointers inside point directly into the network buffer.
Good catch, thank you. The patch carries over too much from the
old implementation. There is no need to copy the data inside
getAnotherTuple to do this, as you say.

> Of course this requires replacing the current per-column malloc+copy
> pattern with a per-row parse+handle pattern, but I think the resulting
> API will be better:
> 
> 1) Pass-through processing does not need to care about unnecessary
>    per-row allocations.
> 
> 2) Handlers that want a copy of the row (like regular libpq) can
>    optimize allocations by having a "global" view of the row
>    (e.g. one allocation for row header + data).
> 
> This also optimizes call patterns: first libpq parses the packet,
> then the row handler processes the row, with no unnecessary back-and-forth.
> 
> 
> Summary: the current API makes various assumptions about how the row is
> processed; let's remove those.
Thank you; I will rewrite the patch to implement it.
regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center


Re: Speed dblink using alternate libpq tuple storage

From:
Kyotaro HORIGUCHI
Date:
Hello, This is a new version of the patch formerly known as
'alternative storage for libpq'.

- Changed the concept from 'Storage handler' to 'Alternative Row Processor'. Symbol names are also changed.

- The callback function is modified following the comments.

- Due to time constraints, I did only a minimal check of this patch. Its purpose is to show the new implementation.

- Performance has not been measured for this patch, for the same reason. I will do that next Monday.

- The meaning of PGresAttValue is changed. The field 'value' now contains a value withOUT a terminating zero. From what I've seen, this change seems to have no effect on any other portion of the PostgreSQL source tree.
 


> > I would like to propose a better one-shot API:
> > 
> >     void *(*RowStoreHandler)(PGresult *res, PGresAttValue *columns);
...
> > 1) Pass-through processing does not need to care about unnecessary
> >    per-row allocations.
> > 
> > 2) Handlers that want a copy of the row (like regular libpq) can
> >    optimize allocations by having a "global" view of the row
> >    (e.g. one allocation for row header + data).

I expect the new implementation to be far better than the
original.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..c47af3a 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,6 @@ PQconnectStartParams      157
 PQping                    158
 PQpingParams              159
 PQlibVersion              160
+PQregisterRowProcessor      161
+PQgetRowProcessorParam      163
+PQsetRowProcessorErrMes      164
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index d454538..93803d5 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2692,6 +2692,7 @@ makeEmptyPGconn(void)
     conn->allow_ssl_try = true;
     conn->wait_ssl_try = false;
 #endif
+    conn->rowProcessor = NULL;
     /*
      * We try to send at least 8K at a time, which is the usual size of pipe
@@ -5076,3 +5077,10 @@ PQregisterThreadLock(pgthreadlock_t newhandler)
     return prev;
 }
+
+void
+PQregisterRowProcessor(PGconn *conn, RowProcessor func, void *param)
+{
+    conn->rowProcessor = func;
+    conn->rowProcessorParam = param;
+}
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b743566..5d78b39 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -66,7 +66,7 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int PQsendDescribe(PGconn *conn, char desc_type,
                const char *desc_target);
 static int    check_field_number(const PGresult *res, int field_num);
 
-
+static void *pqAddTuple(PGresult *res, PGresAttValue *columns);
 /* ----------------
  * Space management for PGresult.
@@ -160,6 +160,9 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
     result->curBlock = NULL;
     result->curOffset = 0;
     result->spaceLeft = 0;
 
+    result->rowProcessor = pqAddTuple;
+    result->rowProcessorParam = NULL;
+    result->rowProcessorErrMes = NULL;
     if (conn)
     {
@@ -194,6 +197,12 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
             }
             result->nEvents = conn->nEvents;
         }
 
+
+        if (conn->rowProcessor)
+        {
+            result->rowProcessor = conn->rowProcessor;
+            result->rowProcessorParam = conn->rowProcessorParam;
+        }
     }
     else
     {
@@ -445,7 +454,7 @@ PQsetvalue(PGresult *res, int tup_num, int field_num, char *value, int len)
         }
         /* add it to the array */
 
-        if (!pqAddTuple(res, tup))
+        if (pqAddTuple(res, tup) == NULL)
             return FALSE;
     }
@@ -701,7 +710,6 @@ pqClearAsyncResult(PGconn *conn)
     if (conn->result)
         PQclear(conn->result);
     conn->result = NULL;
 
-    conn->curTuple = NULL;
 }
 /*
@@ -756,7 +764,6 @@ pqPrepareAsyncResult(PGconn *conn)
      */
     res = conn->result;
     conn->result = NULL;        /* handing over ownership to caller */
 
-    conn->curTuple = NULL;        /* just in case */
     if (!res)
         res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
     else
 
@@ -829,12 +836,17 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 /*
  * pqAddTuple
- *      add a row pointer to the PGresult structure, growing it if necessary
- *      Returns TRUE if OK, FALSE if not enough memory to add the row
+ *      add a row to the PGresult structure, growing it if necessary
+ *      Returns the pointer to the new tuple if OK, NULL if not enough
+ *      memory to add the row.
  */
-int
-pqAddTuple(PGresult *res, PGresAttValue *tup)
+void *
+pqAddTuple(PGresult *res, PGresAttValue *columns)
 {
+    PGresAttValue *tup;
+    int nfields = res->numAttributes;
+    int i;
+
     if (res->ntups >= res->tupArrSize)
     {
         /*
@@ -858,13 +870,39 @@ pqAddTuple(PGresult *res, PGresAttValue *tup)
             newTuples = (PGresAttValue **)
                 realloc(res->tuples, newSize * sizeof(PGresAttValue *));
         if (!newTuples)
 
-            return FALSE;        /* malloc or realloc failed */
+            return NULL;        /* malloc or realloc failed */
         res->tupArrSize = newSize;
         res->tuples = newTuples;
     }
 
+
+    tup = (PGresAttValue *)
+        pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
+    if (tup == NULL) return NULL;
+    memcpy(tup, columns, nfields * sizeof(PGresAttValue));
+
+    for (i = 0 ; i < nfields ; i++)
+    {
+        tup[i].len = columns[i].len;
+        if (tup[i].len == NULL_LEN)
+        {
+            tup[i].value = res->null_field;
+        }
+        else
+        {
+            bool isbinary = (res->attDescs[i].format != 0);
+            tup[i].value =
+                (char *)pqResultAlloc(res, tup[i].len + 1, isbinary);
+            if (tup[i].value == NULL)
+                return NULL;
+            memcpy(tup[i].value, columns[i].value, tup[i].len);
+            /* We have to terminate this ourselves */
+            tup[i].value[tup[i].len] = '\0';
+        }
+    }
+    res->tuples[res->ntups] = tup;
     res->ntups++;
-    return TRUE;
+    return tup;
 }
 /*
@@ -1223,7 +1261,6 @@ PQsendQueryStart(PGconn *conn)
     /* initialize async result-accumulation state */
     conn->result = NULL;
 
-    conn->curTuple = NULL;
     /* ready to send command message */
     return true;
@@ -2822,6 +2859,35 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num)
         return 0;
 }
+/* PQgetRowProcessorParam
+ *    Get the pointer to the contextual parameter from PGresult which is
+ *    registered to PGconn by PQregisterRowProcessor
+ */
+void *
+PQgetRowProcessorParam(const PGresult *res)
+{
+    if (!res)
+        return NULL;
+    return res->rowProcessorParam;
+}
+
+/* PQsetRowProcessorErrMes
+ *    Set the error message passed back to the caller of RowProcessor.
+ *
+ *  mes must be a malloc'ed memory block and it will be released by
+ *  the caller of RowProcessor.  You can replace the previous message
+ *  by alternative mes, or clear it with NULL. The previous one will
+ *  be freed internally.
+ */
+void
+PQsetRowProcessorErrMes(PGresult *res, char *mes)
+{
+    /* Free existing message */
+    if (res->rowProcessorErrMes)
+        free(res->rowProcessorErrMes);
+    res->rowProcessorErrMes = mes;
+}
+
 /*
  * PQnparams:
  *    returns the number of input parameters of a prepared statement.
  */
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index ce0eac3..546534a 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -219,6 +219,25 @@ pqGetnchar(char *s, size_t len, PGconn *conn)
 }
 /*
+ * pqSkipnchar:
+ *    skip len bytes in input buffer.
+ */
+int
+pqSkipnchar(size_t len, PGconn *conn)
+{
+    if (len > (size_t) (conn->inEnd - conn->inCursor))
+        return EOF;
+
+    conn->inCursor += len;
+
+    if (conn->Pfdebug)
+        fprintf(conn->Pfdebug, "From backend (%lu skipped)\n",
+                (unsigned long) len);
+
+    return 0;
+}
+
+/*
  * pqPutnchar:
  *    write exactly len bytes to the current message
  */
@@ -238,6 +257,7 @@ pqPutnchar(const char *s, size_t len, PGconn *conn)
     return 0;
 }
+
 /*
  * pqGetInt
  *    read a 2 or 4 byte integer and convert from network byte order
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index a7c3899..9abbb29 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -715,7 +715,7 @@ getAnotherTuple(PGconn *conn, bool binary)
 {
     PGresult   *result = conn->result;
     int            nfields = result->numAttributes;
 
-    PGresAttValue *tup;
+    PGresAttValue tup[result->numAttributes];
     /* the backend sends us a bitmap of which attributes are null */
     char        std_bitmap[64]; /* used unless it doesn't fit */
 
@@ -729,26 +729,11 @@ getAnotherTuple(PGconn *conn, bool binary)
     result->binary = binary;
-    /* Allocate tuple space if first time for this data message */
-    if (conn->curTuple == NULL)
+    if (binary)
     {
-        conn->curTuple = (PGresAttValue *)
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-        if (conn->curTuple == NULL)
-            goto outOfMemory;
-        MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-
-        /*
-         * If it's binary, fix the column format indicators.  We assume the
-         * backend will consistently send either B or D, not a mix.
-         */
-        if (binary)
-        {
-            for (i = 0; i < nfields; i++)
-                result->attDescs[i].format = 1;
-        }
+        for (i = 0; i < nfields; i++)
+            result->attDescs[i].format = 1;
     }
-    tup = conn->curTuple;
     /* Get the null-value bitmap */
     nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
@@ -757,7 +742,7 @@ getAnotherTuple(PGconn *conn, bool binary)
     {
         bitmap = (char *) malloc(nbytes);
         if (!bitmap)
-            goto outOfMemory;
+            goto rowProcessError;
     }
     if (pqGetnchar(bitmap, nbytes, conn))
@@ -785,19 +770,17 @@ getAnotherTuple(PGconn *conn, bool binary)
                 vlen = vlen - 4;
             if (vlen < 0)
                 vlen = 0;
 
-            if (tup[i].value == NULL)
-            {
-                tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
-                if (tup[i].value == NULL)
-                    goto outOfMemory;
-            }
+
+            /*
+             * Buffer content may be shifted on reloading data. So we must
+             * set the pointer to the value on every scan.
+             */
+            tup[i].value = conn->inBuffer + conn->inCursor;
             tup[i].len = vlen;
-            /* read in the value */
+            /* Skip the value */
             if (vlen > 0)
-                if (pqGetnchar((char *) (tup[i].value), vlen, conn))
+                if (pqSkipnchar(vlen, conn))
                     goto EOFexit;
-            /* we have to terminate this ourselves */
-            tup[i].value[vlen] = '\0';
         }
         /* advance the bitmap stuff */
         bitcnt++;
@@ -812,16 +795,15 @@ getAnotherTuple(PGconn *conn, bool binary)
     }
     /* Success!  Store the completed tuple in the result */
 
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
-    /* and reset for a new message */
-    conn->curTuple = NULL;
+    if (!result->rowProcessor(result, tup))
+        goto rowProcessError;
     if (bitmap != std_bitmap)
         free(bitmap);
     return 0;
-outOfMemory:
+rowProcessError:
+        /* Replace partially constructed result with an error result */
     /*
@@ -829,8 +811,21 @@ outOfMemory:
      * there's not enough memory to concatenate messages...
      */
     pqClearAsyncResult(conn);
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    resetPQExpBuffer(&conn->errorMessage);
+
+    /*
+     * If an error message was passed from the row processor, set it into
+     * PGconn; otherwise assume out of memory.
+     */
+    appendPQExpBufferStr(&conn->errorMessage,
+                         libpq_gettext(result->rowProcessorErrMes ?
+                                       result->rowProcessorErrMes :
+                                       "out of memory for query result\n"));
+    if (result->rowProcessorErrMes)
+    {
+        free(result->rowProcessorErrMes);
+        result->rowProcessorErrMes = NULL;
+    }
     /*
      * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 892dcbc..18342c7 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -625,22 +625,12 @@ getAnotherTuple(PGconn *conn, int msgLength)
 {
     PGresult   *result = conn->result;
     int            nfields = result->numAttributes;
 
-    PGresAttValue *tup;
+    PGresAttValue tup[result->numAttributes];
     int            tupnfields;        /* # fields from tuple */
     int            vlen;            /* length of the current field value */
     int            i;
     /* Allocate tuple space if first time for this data message */
 
-    if (conn->curTuple == NULL)
-    {
-        conn->curTuple = (PGresAttValue *)
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-        if (conn->curTuple == NULL)
-            goto outOfMemory;
-        MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-    }
-    tup = conn->curTuple;
-
     /* Get the field count and make sure it's what we expect */
     if (pqGetInt(&tupnfields, 2, conn))
         return EOF;
@@ -671,40 +661,46 @@ getAnotherTuple(PGconn *conn, int msgLength)
         }
         if (vlen < 0)
             vlen = 0;
-        if (tup[i].value == NULL)
-        {
-            bool        isbinary = (result->attDescs[i].format != 0);
-            tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
-            if (tup[i].value == NULL)
-                goto outOfMemory;
-        }
-        tup[i].len = vlen;
-        /* read in the value */
+        /*
+         * Buffer content may be shifted on reloading data. So we must
+         * set the pointer to the value every scan.
+         */
+        tup[i].value = conn->inBuffer + conn->inCursor;
+        tup[i].len = vlen;
         if (vlen > 0)
-            if (pqGetnchar((char *) (tup[i].value), vlen, conn))
+            if (pqSkipnchar(vlen, conn))
                 return EOF;
-        /* we have to terminate this ourselves */
-        tup[i].value[vlen] = '\0';
     }
     /* Success!  Store the completed tuple in the result */
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
-    /* and reset for a new message */
-    conn->curTuple = NULL;
-
+    if (!result->rowProcessor(result, tup))
+        goto rowProcessError;
+        return 0;
-outOfMemory:
+rowProcessError:
     /*
      * Replace partially constructed result with an error result. First
      * discard the old result to try to win back some memory.
      */
     pqClearAsyncResult(conn);
 
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    resetPQExpBuffer(&conn->errorMessage);
+
+    /*
+     * If an error message was passed from the row processor, set it into
+     * PGconn; otherwise assume out of memory.
+     */
+    appendPQExpBufferStr(&conn->errorMessage,
+                         libpq_gettext(result->rowProcessorErrMes ?
+                                       result->rowProcessorErrMes : 
+                                       "out of memory for query result\n"));
+    if (result->rowProcessorErrMes)
+    {
+        free(result->rowProcessorErrMes);
+        result->rowProcessorErrMes = NULL;
+    }
     pqSaveErrorResult(conn);
     /* Discard the failed message by pretending we read it */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index ef26ab9..0931211 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -149,6 +149,15 @@ typedef struct pgNotify
     struct pgNotify *next;        /* list link */
 } PGnotify;
+/* PGresAttValue represents a value of one tuple field in string form.
+   NULL is represented as len < 0. Otherwise value points to a string
+   of length len, without null termination. */
+typedef struct pgresAttValue
+{
+    int            len;            /* length in bytes of the value */
+    char       *value;            /* actual value, without null termination */
+} PGresAttValue;
+
 /* Function types for notice-handling callbacks */
 typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
 typedef void (*PQnoticeProcessor) (void *arg, const char *message);
 
@@ -416,6 +425,31 @@ extern PGPing PQping(const char *conninfo);
 extern PGPing PQpingParams(const char *const * keywords,
                const char *const * values, int expand_dbname);
 
+/*
+ * Typedef for alternative row processor.
+ *
+ * This function must return a non-NULL value on success and NULL on
+ * failure; on failure it may set an error message with
+ * PQsetRowProcessorErrMes. If it fails without setting a message, the
+ * caller assumes out of memory. This function is assumed not to throw
+ * any exception.
+ */
+typedef void *(*RowProcessor)(PGresult *res, PGresAttValue *columns);
+
+/*
+ * Register an alternative row processor function for PGconn.
+ * 
+ * By registering this function, the PGresult disables its own result
+ * store and calls it for each row, one by one.
+ *
+ * func is the row processor function. See the typedef RowProcessor.
+ * 
+ * rowProcessorParam is the contextual variable that can be retrieved
+ * with PQgetRowProcessorParam in RowProcessor.
+ */
+extern void PQregisterRowProcessor(PGconn *conn, RowProcessor func,
+                                   void *rowProcessorParam);
+
 /* Force the write buffer to be written (or at least try) */
 extern int    PQflush(PGconn *conn);
@@ -454,6 +488,8 @@ extern char *PQcmdTuples(PGresult *res);
 extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num);
 extern int    PQgetlength(const PGresult *res, int tup_num, int field_num);
 extern int    PQgetisnull(const PGresult *res, int tup_num, int field_num);
 
+extern void *PQgetRowProcessorParam(const PGresult *res);
+extern void    PQsetRowProcessorErrMes(PGresult *res, char *mes);
 extern int    PQnparams(const PGresult *res);
 extern Oid    PQparamtype(const PGresult *res, int param_num);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index d967d60..51ac927 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -134,12 +134,6 @@ typedef struct pgresParamDesc
 #define NULL_LEN        (-1)    /* pg_result len for NULL value */
-typedef struct pgresAttValue
-{
-    int            len;            /* length in bytes of the value */
-    char       *value;            /* actual value, plus terminating zero byte */
-} PGresAttValue;
-
 /* Typedef for message-field list entries */
 typedef struct pgMessageField
 {
@@ -209,6 +203,11 @@ struct pg_result
     PGresult_data *curBlock;    /* most recently allocated block */
     int            curOffset;        /* start offset of free space in block */
     int            spaceLeft;        /* number of free bytes remaining in block */
 
+
+    RowProcessor rowProcessor;  /* Result row processor handler. See
+                                 * RowProcessor for details. */
+    void *rowProcessorParam;    /* Contextual parameter for rowProcessor */
+    char *rowProcessorErrMes;   /* Error message from rowProcessor */
 };
 /* PGAsyncStatusType defines the state of the query-execution state machine */
 
@@ -398,7 +397,6 @@ struct pg_conn
     /* Status for asynchronous result construction */
     PGresult   *result;            /* result being constructed */
 
-    PGresAttValue *curTuple;    /* tuple currently being read */
 #ifdef USE_SSL
     bool        allow_ssl_try;    /* Allowed to try SSL negotiation */
 
@@ -443,6 +441,13 @@ struct pg_conn
     /* Buffer for receiving various parts of messages */
     PQExpBufferData workBuffer; /* expansible string */
 
+
+    /* Tuple store handler. The two fields below are copied to a newly
+     * created PGresult if rowProcessor is not NULL. The default
+     * function is used if NULL. */
+    RowProcessor rowProcessor;   /* Result row processor. See
+                                  * RowProcessor for details. */
+    void *rowProcessorParam;     /* Contextual parameter for rowProcessor */
 };
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
 
@@ -507,7 +512,6 @@ extern void
 pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 /* This lets gcc check the format string for consistency. */
 __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
 
-extern int    pqAddTuple(PGresult *res, PGresAttValue *tup);
 extern void pqSaveMessageField(PGresult *res, char code,
                    const char *value);
 extern void pqSaveParameterStatus(PGconn *conn, const char *name,
 
@@ -560,6 +564,7 @@ extern int    pqGets(PQExpBuffer buf, PGconn *conn);
 extern int    pqGets_append(PQExpBuffer buf, PGconn *conn);
 extern int    pqPuts(const char *s, PGconn *conn);
 extern int    pqGetnchar(char *s, size_t len, PGconn *conn);
+extern int    pqSkipnchar(size_t len, PGconn *conn);
 extern int    pqPutnchar(const char *s, size_t len, PGconn *conn);
 extern int    pqGetInt(int *result, size_t bytes, PGconn *conn);
 extern int    pqPutInt(int value, size_t bytes, PGconn *conn);
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 72c9384..9ad3bfd 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -7233,6 +7233,215 @@ int PQisthreadsafe();
  </sect1>
+ <sect1 id="libpq-alterrowprocessor">
+  <title>Alternative row processor</title>
+
+  <indexterm zone="libpq-alterrowprocessor">
+   <primary>PGresult</primary>
+   <secondary>PGconn</secondary>
+  </indexterm>
+
+  <para>
+   In the standard usage, users get the result of command
+   execution from the <structname>PGresult</structname> acquired
+   with <function>PQgetResult</function>
+   from the <structname>PGconn</structname>. The memory areas for
+   the <structname>PGresult</structname> are allocated with malloc()
+   internally within calls of command execution functions such
+   as <function>PQexec</function> and <function>PQgetResult</function>.
+   If it is difficult to handle the result records in the form
+   of <structname>PGresult</structname>, you can instruct
+   <structname>PGconn</structname> to pass every row to your own row
+   processor instead of storing it in <structname>PGresult</structname>.
+  </para>
+
+  <variablelist>
+   <varlistentry id="libpq-registerrowprocessor">
+    <term>
+     <function>PQregisterRowProcessor</function>
+     <indexterm>
+      <primary>PQregisterRowProcessor</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       Sets a callback function to process each row.
+<synopsis>
+void PQregisterRowProcessor(PGconn *conn,
+                            RowProcessor func,
+                            void *param);
+</synopsis>
+     </para>
+     
+     <para>
+       <variablelist>
+     <varlistentry>
+       <term><parameter>conn</parameter></term>
+       <listitem>
+         <para>
+           The connection object on which to set the row processor
+           function. Each PGresult created from this connection calls
+           this function to process each row.
+         </para>
+       </listitem>
+     </varlistentry>
+     <varlistentry>
+       <term><parameter>func</parameter></term>
+       <listitem>
+         <para>
+           Row processor function to set. NULL means to use the
+           default processor.
+         </para>
+       </listitem>
+     </varlistentry>
+     <varlistentry>
+       <term><parameter>param</parameter></term>
+       <listitem>
+         <para>
+           A pointer to a contextual parameter for
+           <parameter>func</parameter>. You can retrieve this pointer
+           within the <type>RowProcessor</type>
+           with <function>PQgetRowProcessorParam</function>.
+         </para>
+       </listitem>
+     </varlistentry>
+       </variablelist>
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-rowprocessor">
+    <term>
+     <type>RowProcessor</type>
+     <indexterm>
+      <primary>RowProcessor</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       The type for the row processor callback function.
+<synopsis>
+void *(*RowProcessor)(PGresult *res,
+                      PGresAttValue *columns);
+</synopsis>
+     </para>
+
+     <para>
+       This function must return a non-NULL value on success and NULL
+       on failure. On failure it should set the error message
+       with <function>PQsetRowProcessorErrMes</function> if the cause
+       is something other than out of memory. This function must not
+       throw any exception.
+     </para>
+     <variablelist>
+       <varlistentry>
+     <term><parameter>res</parameter></term>
+     <listitem>
+       <para>
+         A pointer to the <type>PGresult</type> object.
+       </para>
+     </listitem>
+       </varlistentry>
+       <varlistentry>
+     <term><parameter>columns</parameter></term>
+     <listitem>
+       <para>
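+         The column values of the row to process. Each value points
+         to <structfield>len</structfield> bytes that are not
+         null-terminated; a negative <structfield>len</structfield>
+         denotes NULL.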
+       </para>
+     </listitem>
+       </varlistentry>
+     </variablelist>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqgetrowprocessorparam">
+    <term>
+     <function>PQgetRowProcessorParam</function>
+     <indexterm>
+      <primary>PQgetRowProcessorParam</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+    Get the pointer passed to <function>PQregisterRowProcessor</function>
+    as <parameter>param</parameter>.
+<synopsis>
+void *PQgetRowProcessorParam(const PGresult *res);
+</synopsis>
+      </para>
+      <para>
+    <variablelist>
+      <varlistentry>
+        <term><parameter>res</parameter></term>
+        <listitem>
+          <para>
+        A pointer to the <type>PGresult</type> object.
+          </para>
+        </listitem>
+      </varlistentry>
+    </variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqsetrowprocessorerrmes">
+    <term>
+     <function>PQsetRowProcessorErrMes</function>
+     <indexterm>
+      <primary>PQsetRowProcessorErrMes</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+    Set the message for an error that occurred
+    in <type>RowProcessor</type>.  If no message is set, the
+    caller assumes the error to be out of memory.
+<synopsis>
+void PQsetRowProcessorErrMes(PGresult *res, char *mes)
+</synopsis>
+      </para>
+      <para>
+    <variablelist>
+      <varlistentry>
+        <term><parameter>res</parameter></term>
+        <listitem>
+          <para>
+        A pointer to the <type>PGresult</type> object
+        passed to <type>RowProcessor</type>.
+          </para>
+        </listitem>
+      </varlistentry>
+      <varlistentry>
+        <term><parameter>mes</parameter></term>
+        <listitem>
+          <para>
+        A pointer to the memory block containing the error message,
+        which is allocated by <function>malloc()</function>.  The
+        memory block will be freed with <function>free()</function> in
+        the caller of <type>RowProcessor</type> only if it returns NULL.
+          </para>
+          <para>
+        If <parameter>res</parameter> already has a message previously
+        set, it is freed and then the given message is set. Pass NULL
+        to cancel the custom message.
+          </para>
+        </listitem>
+      </varlistentry>
+    </variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+ </sect1>
+
+ <sect1 id="libpq-build">
  <title>Building <application>libpq</application> Programs</title>
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 36a8e3e..195ad21 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,23 @@ typedef struct remoteConn
     bool        newXactForCursor;        /* Opened a transaction for a cursor */
 } remoteConn;
 
+typedef struct storeInfo
+{
+    Tuplestorestate *tuplestore;
+    int nattrs;
+    MemoryContext oldcontext;
+    AttInMetadata *attinmeta;
+    char** valbuf;
+    int *valbuflen;
+    bool error_occurred;
+    bool nummismatch;
+    ErrorData *edata;
+} storeInfo;
+
 /*
  * Internal declarations
  */
 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
 static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn *rconn);
 
@@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);
 static void validate_pkattnums(Relation rel,
                    int2vector *pkattnums_arg, int32 pknumatts_arg,
                    int **pkattnums, int *pknumatts);
 
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static void *storeHandler(PGresult *res, PGresAttValue *columns);
+
 /* Global */
 static remoteConn *pconn = NULL;
@@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
     char       *curname = NULL;
     int         howmany = 0;
     bool        fail = true;    /* default to backward compatible */
 
+    storeInfo   storeinfo;
     DBLINK_INIT;
@@ -559,15 +576,36 @@ dblink_fetch(PG_FUNCTION_ARGS)
     appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);

     /*
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterRowProcessor(conn, storeHandler, &storeinfo);
+
+    /*
      * Try to execute the query.  Note that since libpq uses malloc, the
      * PGresult will be long-lived even though we are still in a short-lived
      * memory context.
      */
     res = PQexec(conn, buf.data);
 
+    finishStoreInfo(&storeinfo);
     if (!res ||
         (PQresultStatus(res) != PGRES_COMMAND_OK &&
          PQresultStatus(res) != PGRES_TUPLES_OK))
     {
+        /* finishStoreInfo saves the fields referred to below. */
+        if (storeinfo.nummismatch)
+        {
+            /* This is only for backward compatibility */
+            ereport(ERROR,
+                    (errcode(ERRCODE_DATATYPE_MISMATCH),
+                     errmsg("remote query result rowtype does not match "
+                            "the specified FROM clause rowtype")));
+        }
+        else if (storeinfo.edata)
+            ReThrowError(storeinfo.edata);
+
         dblink_res_error(conname, res, "could not fetch from cursor", fail);
         return (Datum) 0;
     }
@@ -579,8 +617,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
                  errmsg("cursor \"%s\" does not exist", curname)));
     }
 
+    PQclear(res);
-    materializeResult(fcinfo, res);
     return (Datum) 0;
 }
@@ -640,6 +678,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     remoteConn *rconn = NULL;
     bool        fail = true;    /* default to backward compatible */
     bool        freeconn = false;
 
+    storeInfo   storeinfo;
     /* check to see if caller supports us returning a tuplestore */
     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
 
@@ -715,164 +754,205 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     rsinfo->setResult = NULL;
     rsinfo->setDesc = NULL;
 
+
+    /*
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec/PQgetResult below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterRowProcessor(conn, storeHandler, &storeinfo);
+
     /* synchronous query, or async result retrieval */
     if (!is_async)
         res = PQexec(conn, sql);
     else
-    {
         res = PQgetResult(conn);
-        /* NULL means we're all done with the async results */
-        if (!res)
-            return (Datum) 0;
-    }
-    /* if needed, close the connection to the database and cleanup */
-    if (freeconn)
-        PQfinish(conn);
+    finishStoreInfo(&storeinfo);
-    if (!res ||
-        (PQresultStatus(res) != PGRES_COMMAND_OK &&
-         PQresultStatus(res) != PGRES_TUPLES_OK))
+    /* NULL res from async get means we're all done with the results */
+    if (res || !is_async)
     {
-        dblink_res_error(conname, res, "could not execute query", fail);
-        return (Datum) 0;
+        if (freeconn)
+            PQfinish(conn);
+
+        if (!res ||
+            (PQresultStatus(res) != PGRES_COMMAND_OK &&
+             PQresultStatus(res) != PGRES_TUPLES_OK))
+        {
+            /* finishStoreInfo saves the fields referred to below. */
+            if (storeinfo.nummismatch)
+            {
+                /* This is only for backward compatibility */
+                ereport(ERROR,
+                        (errcode(ERRCODE_DATATYPE_MISMATCH),
+                         errmsg("remote query result rowtype does not match "
+                                "the specified FROM clause rowtype")));
+            }
+            else if (storeinfo.edata)
+                ReThrowError(storeinfo.edata);
+
+            dblink_res_error(conname, res, "could not execute query", fail);
+            return (Datum) 0;
+        }
     }
+    PQclear(res);
-    materializeResult(fcinfo, res);
     return (Datum) 0;
 }
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */
 static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
 {
     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-
-    Assert(rsinfo->returnMode == SFRM_Materialize);
-
-    PG_TRY();
+    TupleDesc    tupdesc;
+    int i;
+    
+    switch (get_call_result_type(fcinfo, NULL, &tupdesc))
     {
-        TupleDesc    tupdesc;
-        bool        is_sql_cmd = false;
-        int            ntuples;
-        int            nfields;
+        case TYPEFUNC_COMPOSITE:
+            /* success */
+            break;
+        case TYPEFUNC_RECORD:
+            /* failed to determine actual type of RECORD */
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("function returning record called in context "
+                            "that cannot accept type record")));
+            break;
+        default:
+            /* result type isn't composite */
+            elog(ERROR, "return type must be a row type");
+            break;
+    }
+    
+    sinfo->oldcontext = MemoryContextSwitchTo(
+        rsinfo->econtext->ecxt_per_query_memory);
+
+    /* make sure we have a persistent copy of the tupdesc */
+    tupdesc = CreateTupleDescCopy(tupdesc);
+
+    sinfo->error_occurred = FALSE;
+    sinfo->nummismatch = FALSE;
+    sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+    sinfo->edata = NULL;
+    sinfo->nattrs = tupdesc->natts;
+    sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+
+    /* Preallocate per-column slots for the C-string value buffers. */
+    sinfo->valbuf = (char **) malloc(sinfo->nattrs * sizeof(char*));
+    sinfo->valbuflen = (int *)malloc(sinfo->nattrs * sizeof(int));
+    for (i = 0 ; i < sinfo->nattrs ; i++)
+    {
+        sinfo->valbuf[i] = NULL;
+        sinfo->valbuflen[i] = -1;
+    }
-        if (PQresultStatus(res) == PGRES_COMMAND_OK)
-        {
-            is_sql_cmd = true;
-
-            /*
-             * need a tuple descriptor representing one TEXT column to return
-             * the command status string as our result tuple
-             */
-            tupdesc = CreateTemplateTupleDesc(1, false);
-            TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-                               TEXTOID, -1, 0);
-            ntuples = 1;
-            nfields = 1;
-        }
-        else
-        {
-            Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+    rsinfo->setResult = sinfo->tuplestore;
+    rsinfo->setDesc = tupdesc;
+}
-            is_sql_cmd = false;
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+    int i;
-            /* get a tuple descriptor for our result type */
-            switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+    if (sinfo->valbuf)
+    {
+        for (i = 0 ; i < sinfo->nattrs ; i++)
+        {
+            if (sinfo->valbuf[i])
             {
-                case TYPEFUNC_COMPOSITE:
-                    /* success */
-                    break;
-                case TYPEFUNC_RECORD:
-                    /* failed to determine actual type of RECORD */
-                    ereport(ERROR,
-                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                        errmsg("function returning record called in context "
-                               "that cannot accept type record")));
-                    break;
-                default:
-                    /* result type isn't composite */
-                    elog(ERROR, "return type must be a row type");
-                    break;
+                free(sinfo->valbuf[i]);
+                sinfo->valbuf[i] = NULL;
             }
-
-            /* make sure we have a persistent copy of the tupdesc */
-            tupdesc = CreateTupleDescCopy(tupdesc);
-            ntuples = PQntuples(res);
-            nfields = PQnfields(res);
         }
+        free(sinfo->valbuf);
+        sinfo->valbuf = NULL;
+    }
-        /*
-         * check result and tuple descriptor have the same number of columns
-         */
-        if (nfields != tupdesc->natts)
-            ereport(ERROR,
-                    (errcode(ERRCODE_DATATYPE_MISMATCH),
-                     errmsg("remote query result rowtype does not match "
-                            "the specified FROM clause rowtype")));
-
-        if (ntuples > 0)
-        {
-            AttInMetadata *attinmeta;
-            Tuplestorestate *tupstore;
-            MemoryContext oldcontext;
-            int            row;
-            char      **values;
-
-            attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-            oldcontext = MemoryContextSwitchTo(
-                                    rsinfo->econtext->ecxt_per_query_memory);
-            tupstore = tuplestore_begin_heap(true, false, work_mem);
-            rsinfo->setResult = tupstore;
-            rsinfo->setDesc = tupdesc;
-            MemoryContextSwitchTo(oldcontext);
+    if (sinfo->valbuflen)
+    {
+        free(sinfo->valbuflen);
+        sinfo->valbuflen = NULL;
+    }
+    MemoryContextSwitchTo(sinfo->oldcontext);
+}
-            values = (char **) palloc(nfields * sizeof(char *));
+static void *
+storeHandler(PGresult *res, PGresAttValue *columns)
+{
+    storeInfo *sinfo = (storeInfo *)PQgetRowProcessorParam(res);
+    HeapTuple  tuple;
+    int        fields = PQnfields(res);
+    int        i;
+    char      *cstrs[PQnfields(res)];
-            /* put all tuples into the tuplestore */
-            for (row = 0; row < ntuples; row++)
-            {
-                HeapTuple    tuple;
+    if (sinfo->error_occurred)
+        return NULL;
-                if (!is_sql_cmd)
-                {
-                    int            i;
+    if (sinfo->nattrs != fields)
+    {
+        sinfo->error_occurred = TRUE;
+        sinfo->nummismatch = TRUE;
+        finishStoreInfo(sinfo);
-                    for (i = 0; i < nfields; i++)
-                    {
-                        if (PQgetisnull(res, row, i))
-                            values[i] = NULL;
-                        else
-                            values[i] = PQgetvalue(res, row, i);
-                    }
-                }
-                else
-                {
-                    values[0] = PQcmdStatus(res);
-                }
+        /* This error will be processed in
+         * dblink_record_internal(). So do not set error message
+         * here. */
+        return NULL;
+    }
-                /* build the tuple and put it into the tuplestore. */
-                tuple = BuildTupleFromCStrings(attinmeta, values);
-                tuplestore_puttuple(tupstore, tuple);
+    /*
+     * Value input functions assume that the value string is
+     * zero-terminated, so make the values so.
+     */
+    for(i = 0 ; i < fields ; i++)
+    {
+        int len = columns[i].len;
+        if (len < 0)
+            cstrs[i] = NULL;
+        else
+        {
+            if (sinfo->valbuf[i] == NULL)
+            {
+                sinfo->valbuf[i] = (char *)malloc(len + 1);
+                sinfo->valbuflen[i] = len + 1;
+            }
+            else if (sinfo->valbuflen[i] < len + 1)
+            {
+                sinfo->valbuf[i] = (char *)realloc(sinfo->valbuf[i], len + 1);
+                sinfo->valbuflen[i] = len + 1;
             }
-            /* clean up and return the tuplestore */
-            tuplestore_donestoring(tupstore);
+            cstrs[i] = sinfo->valbuf[i];
+            memcpy(cstrs[i], columns[i].value, len);
+            cstrs[i][len] = '\0';
         }
+    }
-        PQclear(res);
+    PG_TRY();
+    {
+        tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+        tuplestore_puttuple(sinfo->tuplestore, tuple);
     }
     PG_CATCH();
     {
-        /* be sure to release the libpq result */
-        PQclear(res);
-        PG_RE_THROW();
+        MemoryContext context;
+        /*
+         * Store exception for later ReThrow and cancel the exception.
+         */
+        sinfo->error_occurred = TRUE;
+        context = MemoryContextSwitchTo(sinfo->oldcontext);
+        sinfo->edata = CopyErrorData();
+        MemoryContextSwitchTo(context);
+        FlushErrorState();
+
+        return NULL;
     }
     PG_END_TRY();
+
+    return columns;
 }
 /*
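storeHandler() above never lets an ERROR propagate through libpq: it catches the error inside the callback, keeps a copy in storeinfo.edata, returns failure, and dblink re-throws only after PQexec() has returned. The same control flow can be sketched in plain C, with setjmp()/longjmp() standing in for PG_TRY()/elog(ERROR). This is a minimal illustration under that assumption; `throw_error()`, `row_callback()` and `library_fetch()` are hypothetical names, not backend or libpq functions.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for elog(ERROR): jump to the innermost handler. */
static jmp_buf *current_handler = NULL;
static char     error_text[128];

static void throw_error(const char *msg)
{
    assert(current_handler != NULL);
    snprintf(error_text, sizeof error_text, "%s", msg);
    longjmp(*current_handler, 1);
}

typedef struct StoreState
{
    int  stored;            /* values successfully stored */
    int  error_occurred;    /* plays the role of storeinfo.error_occurred */
    char saved[128];        /* plays the role of storeinfo.edata */
} StoreState;

/* Storing a value may "throw", e.g. on invalid input syntax. */
static void store_value(StoreState *st, const char *v)
{
    if (strcmp(v, "bad") == 0)
        throw_error("invalid input syntax");
    st->stored++;
}

/* Row callback: the jump never escapes into the (hypothetical) library. */
static int row_callback(StoreState *st, const char *v)
{
    jmp_buf  local;
    jmp_buf *outer = current_handler;

    if (st->error_occurred)
        return 0;
    current_handler = &local;
    if (setjmp(local) == 0)
    {
        store_value(st, v);
        current_handler = outer;
        return 1;
    }
    /* Caught: restore the outer handler, keep a copy, report failure. */
    current_handler = outer;
    st->error_occurred = 1;
    snprintf(st->saved, sizeof st->saved, "%s", error_text);
    return 0;
}

/* Hypothetical library loop: stops cleanly when the callback fails. */
static int library_fetch(StoreState *st, const char *const *rows, int nrows)
{
    for (int i = 0; i < nrows; i++)
        if (!row_callback(st, rows[i]))
            return 0;
    return 1;
}
```

After library_fetch() returns 0, the caller inspects `error_occurred` and re-raises `saved`, which corresponds to the `ReThrowError(storeinfo.edata)` calls in the dblink patch. The point of the design is that the library's own state is never abandoned mid-row by a non-local jump.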

Re: Speed dblink using alternate libpq tuple storage

From
Merlin Moncure
Date:
On Fri, Jan 27, 2012 at 2:57 AM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@oss.ntt.co.jp> wrote:
> Hello, This is a new version of the patch formerly known as
> 'alternative storage for libpq'.

I took a quick look at the patch and the docs.  Looks good and agree
with rationale and implementation.   I see you covered the pqsetvalue
case which is nice.  I expect libpq C api clients coded for
performance will immediately gravitate to this api.

> - The meaning of PGresAttValue is changed. The field 'value' now
>  contains a value withOUT terminating zero. This change seems to
>  have no effect on any other portion within the whole source
>  tree of postgresql from what I've seen.

This is a minor point of concern.  This function was exposed to
support libpqtypes (which your stuff complements very nicely by the
way) and I quickly confirmed removal of the null terminator didn't
cause any problems there.  I doubt anyone else is inspecting the
structure directly (also searched the archives and didn't find
anything).

This needs to be advertised very loudly in the docs -- I understand
why this was done but it's a pretty big change in the way the api
works.
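Because the row-processor values no longer carry a terminating zero, a client must bound every access by the length. A minimal sketch, assuming a hypothetical `RawValue` struct with `len`/`value` members like the ones discussed in this thread (not libpq's actual declaration): the `%.*s` precision makes `snprintf()` read at most `len` bytes, so the missing terminator is never needed.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical mirror of a length-counted column value (not libpq's struct). */
typedef struct RawValue
{
    int         len;    /* byte count, or -1 for SQL NULL */
    const char *value;  /* NOT NUL-terminated */
} RawValue;

/*
 * Render a raw value as a C string in out.  The "%.*s" precision makes
 * snprintf read at most len bytes, so no terminator is needed in value.
 * Returns snprintf's result: the length it wanted to write.
 */
static int format_raw_value(char *out, size_t outlen, const RawValue *v)
{
    assert(out != NULL && outlen > 0 && v != NULL);
    if (v->len < 0)
        return snprintf(out, outlen, "(null)");
    return snprintf(out, outlen, "%.*s", v->len, v->value);
}
```

Code that instead calls `strlen()` or `printf("%s")` on such a value would read past the end of the column, which is why the change needs to be documented prominently.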

merlin


Re: Speed dblink using alternate libpq tuple storage

From
Marko Kreen
Date:
On Fri, Jan 27, 2012 at 05:57:01PM +0900, Kyotaro HORIGUCHI wrote:
> Hello, This is a new version of the patch formerly known as
> 'alternative storage for libpq'.
> 
> - Changed the concept to 'Alternative Row Processor' from
>   'Storage handler'. Symbol names are also changed.
> 
> - The callback function is modified following the comments.
> 
> - Due to time restrictions, I did only a minimal check of this
>   patch. The purpose of this patch is to show the new implementation.
> 
> - Performance is not measured for this patch for the same
>   reason. I will do that next Monday.
> 
> - The meaning of PGresAttValue is changed. The field 'value' now
>   contains a value withOUT terminating zero. This change seems to
>   have no effect on any other portion within the whole source
>   tree of postgresql from what I've seen.


I think we have the general structure in place.  Good.

Minor notes:

= rowhandler api =

* It returns bool, so void* is wrong.  Instead, libpq style is to use int, with 1=OK, 0=Failure.  That seems to have been the old pqAddTuple() convention too.

* Drop PQgetRowProcessorParam(), instead give param as argument.

* PQsetRowProcessorErrMes() should strdup() the message.  That gets rid of allocator requirements in the API.  This also makes it safe to pass static strings there.  If strdup() fails, fall back to the generic out-of-memory message.

* Create a new struct to replace PGresAttValue for rowhandler usage.  The RowHandler API is pretty unique and self-contained, so it should have its own struct.  The main reason is that this allows it to be documented properly; otherwise the minor details get lost, as they differ from libpq-internal usage.  This also allows the two structs to be improved separately. (PGresRawValue?)

* Stop storing null_value into ->value.  It's a libpq-internal detail.  Instead, ->value should always point into the buffer where the value info is located, even for NULL.  This makes it safe to simply subtract pointers to get a row size estimate.  pqAddTuple() seems to do the null_value logic already, so there is no need to do it in the rowhandler API.

= libpq =

Currently it's confusing whether rowProcessor can be NULL, and what
should be done if so.  I think it's better to fix usage so that
it is always set.

* PQregisterRowProcessor() should set the default handler if func==NULL.
* Never set rowProcessor directly, always via PQregisterRowProcessor()
* Drop all if(rowProcessor) checks.
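The requested API shape (an int 1/0 return, the context parameter passed as an argument, an error message that is copied, and a handler slot that is never NULL) can be sketched in isolation. Everything below (`Conn`, `RowValue`, `register_row_processor()` and the rest) is hypothetical and only mirrors the proposal; it is not libpq code. A malloc()+memcpy() pair stands in for strdup(), which C99 does not guarantee.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef struct RowValue
{
    int         len;    /* -1 means SQL NULL */
    const char *value;  /* points into the receive buffer, not NUL-terminated */
} RowValue;

/* Proposed callback shape: 1 = OK, 0 = failure; param passed explicitly. */
typedef int (*RowProcessorFn) (void *param, int nfields, const RowValue *cols);

typedef struct Conn
{
    RowProcessorFn proc;      /* never NULL after conn_init() */
    void          *param;
    char          *errmsg;    /* private copy owned by Conn, or NULL */
    long           rows_seen; /* state used by the default handler */
} Conn;

/* Default handler: this sketch only counts rows. */
static int default_row_processor(void *param, int nfields, const RowValue *cols)
{
    (void) nfields;
    (void) cols;
    (*(long *) param)++;
    return 1;
}

static void register_row_processor(Conn *conn, RowProcessorFn fn, void *param)
{
    assert(conn != NULL);
    if (fn == NULL)
    {
        /* NULL restores the default, so no caller ever tests for NULL */
        conn->proc = default_row_processor;
        conn->param = &conn->rows_seen;
    }
    else
    {
        conn->proc = fn;
        conn->param = param;
    }
}

/* Copy the message, so static strings are fine; NULL clears it. */
static void set_row_processor_errmsg(Conn *conn, const char *msg)
{
    free(conn->errmsg);
    conn->errmsg = NULL;
    if (msg != NULL)
    {
        size_t n = strlen(msg) + 1;
        char  *copy = malloc(n);

        if (copy != NULL)       /* on failure the generic message is used */
            conn->errmsg = memcpy(copy, msg, n);
    }
}

static void conn_init(Conn *conn)
{
    conn->errmsg = NULL;
    conn->rows_seen = 0;
    register_row_processor(conn, NULL, NULL);
}

/* Hand one row to the handler; returns NULL on success or an error text. */
static const char *deliver_row(Conn *conn, int nfields, const RowValue *cols)
{
    if (conn->proc(conn->param, nfields, cols))
    {
        set_row_processor_errmsg(conn, NULL);   /* drop any stale message */
        return NULL;
    }
    return conn->errmsg ? conn->errmsg : "out of memory for query result";
}

/* Example custom handler: rejects rows that contain a NULL. */
static int reject_nulls(void *param, int nfields, const RowValue *cols)
{
    for (int i = 0; i < nfields; i++)
        if (cols[i].len < 0)
        {
            set_row_processor_errmsg((Conn *) param, "NULL not allowed");
            return 0;
        }
    return 1;
}
```

Because the slot is always populated, `deliver_row()` calls through `conn->proc` unconditionally, which is the "drop all if(rowProcessor) checks" point above.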

= dblink =

* There are malloc failure checks missing in initStoreInfo() & storeHandler().
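One way to add the missing checks is to let each column's conversion buffer only grow, and to put realloc()'s result into a temporary first. Assigning it straight back, as in `sinfo->valbuf[i] = realloc(sinfo->valbuf[i], len + 1)` in storeHandler(), loses the old block when realloc() fails. A minimal sketch with hypothetical names (`ColumnBuffers`, `colbuf_*`), not code from the patch:

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef struct ColumnBuffers
{
    int     ncols;
    char  **buf;    /* one reusable buffer per column, NULL until first use */
    size_t *size;   /* allocated size of each buffer */
} ColumnBuffers;

/* Returns 0 on success, -1 if allocation fails (nothing is leaked). */
static int colbuf_init(ColumnBuffers *cb, int ncols)
{
    size_t n = ncols > 0 ? (size_t) ncols : 1;

    assert(ncols >= 0);
    cb->ncols = ncols;
    cb->buf = calloc(n, sizeof(char *));
    cb->size = calloc(n, sizeof(size_t));
    if (cb->buf == NULL || cb->size == NULL)
    {
        free(cb->buf);
        free(cb->size);
        cb->buf = NULL;
        cb->size = NULL;
        return -1;
    }
    return 0;
}

/*
 * Copy a length-counted value into column col's buffer and NUL-terminate
 * it, growing the buffer only when needed.  Returns NULL on allocation
 * failure, leaving the old buffer intact.
 */
static const char *colbuf_cstring(ColumnBuffers *cb, int col,
                                  const char *value, size_t len)
{
    assert(col >= 0 && col < cb->ncols);
    if (cb->size[col] < len + 1)
    {
        char *p = realloc(cb->buf[col], len + 1);  /* realloc(NULL, n) acts like malloc(n) */

        if (p == NULL)
            return NULL;
        cb->buf[col] = p;
        cb->size[col] = len + 1;
    }
    memcpy(cb->buf[col], value, len);
    cb->buf[col][len] = '\0';
    return cb->buf[col];
}

static void colbuf_free(ColumnBuffers *cb)
{
    if (cb->buf != NULL)
        for (int i = 0; i < cb->ncols; i++)
            free(cb->buf[i]);
    free(cb->buf);
    free(cb->size);
    cb->buf = NULL;
    cb->size = NULL;
}
```

A caller that gets NULL back can record an out-of-memory error and return failure from the row handler, just as storeHandler() already does for a column-count mismatch.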


-- 
marko


PS.  You did not hear it from me, but most raw values are actually
nul-terminated in the protocol.  Think big-endian.  And those which
are not, you can make so, as the data is not touched anymore.
You cannot do it for the last value, as the next byte may not be allocated.
But you could memmove() it to a lower address so you can null-terminate it.

I'm not suggesting it for the official patch, but it would be fun to know
if such a hack is benchmarkable, and benchmarkable on a realistic load.
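The trick in the PS can be sketched on a bare buffer. Assume a column area laid out as in a protocol 3 DataRow: for each column, a big-endian int32 length (-1 for NULL) followed by that many bytes. The terminator for each value can overwrite the first byte of the following length word once that word has been decoded. Only the last value is slid one byte lower, into its own already-decoded length word, because the byte after it may not belong to the row. `terminate_in_place()` is a hypothetical helper, not benchmarked and not part of any patch here.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Decode a big-endian int32, as the wire protocol stores lengths. */
static int32_t read_be32(const unsigned char *p)
{
    return (int32_t) (((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
                      ((uint32_t) p[2] << 8) | (uint32_t) p[3]);
}

/*
 * buf holds nfields entries of [int32 len][len bytes] (len == -1 for NULL).
 * On success vals[i] points to a NUL-terminated value inside buf itself
 * (NULL for SQL NULL) and 0 is returned; -1 means buf is too short.
 * buf is modified in place.
 */
static int terminate_in_place(unsigned char *buf, size_t buflen,
                              int nfields, char **vals)
{
    size_t         off = 0;
    unsigned char *pending = NULL;  /* byte right after the previous value */
    int            last = -1;       /* index of the most recent non-NULL value */

    assert(buf != NULL && vals != NULL);
    for (int i = 0; i < nfields; i++)
    {
        int32_t len;

        if (buflen - off < 4)
            return -1;
        len = read_be32(buf + off);     /* decode the length word first... */
        if (pending != NULL)
        {
            *pending = '\0';            /* ...then reuse its first byte */
            pending = NULL;
        }
        off += 4;
        if (len < 0)
        {
            vals[i] = NULL;
            continue;
        }
        if ((size_t) len > buflen - off)
            return -1;
        vals[i] = (char *) (buf + off);
        off += (size_t) len;
        pending = buf + off;
        last = i;
    }

    if (pending != NULL)
    {
        /* The byte after the final value may not be ours: shift down by one. */
        char  *src = vals[last];
        size_t len = (size_t) ((char *) pending - src);
        char  *dst = src - 1;

        memmove(dst, src, len);
        dst[len] = '\0';
        vals[last] = dst;
    }
    return 0;
}
```

Only one memmove() per row is needed, for the last non-NULL value; every other value is terminated with a single store.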



Re: Speed dblink using alternate libpq tuple storage

From
Marko Kreen
Date:
On Fri, Jan 27, 2012 at 09:35:04AM -0600, Merlin Moncure wrote:
> On Fri, Jan 27, 2012 at 2:57 AM, Kyotaro HORIGUCHI
> > - The meaning of PGresAttValue is changed. The field 'value' now
> >  contains a value withOUT terminating zero. This change seems to
> >  have no effect on any other portion within the whole source
> >  tree of postgresql from what I've seen.
> 
> This is a minor point of concern.  This function was exposed to
> support libpqtypes (which your stuff complements very nicely by the
> way) and I quickly confirmed removal of the null terminator didn't
> cause any problems there.  I doubt anyone else is inspecting the
> structure directly (also searched the archives and didn't find
> anything).
> 
> This needs to be advertised very loudly in the docs -- I understand
> why this was done but it's a pretty big change in the way the api
> works.

Note that the non-NUL-terminated PGresAttValue is only used for row
handler.  So no existing usage is affected.

But I agree that using the same struct in different situations is confusing,
hence the request for a separate struct for row handler usage.

-- 
marko



Re: Speed dblink using alternate libpq tuple storage

From
Kyotaro HORIGUCHI
Date:
Thank you for the comments; this is a revised version of the patch.

The performance gain is larger than expected. The measurement script
now runs the query via dblink ten times for stable measurements, so
the figures are about ten times larger than the previous ones.

                       sec    % of original
Original             : 31.5      100.0%
RowProcessor patch   : 31.3       99.4%
dblink patch         : 24.6       78.1%

The RowProcessor patch alone causes no loss (or a very small gain), and
the full patch gives a 22% gain on the benchmark(*1).


The modifications are listed below.


- PGresAttValue is no longer used for this mechanism; PGrowValue has been added instead. PGresAttValue has been moved back into libpq-int.h.

- pqAddTuple() is restored to the original, and a new function, pqAddRow(), is used as the default RowProcessor. (The previous pqAddTuple implementation had buggily mixed the two usages of PGresAttValue.)

- PQgetRowProcessorParam has been dropped. The context parameter is now passed as one of the parameters of RowProcessor().

- RowProcessor() returns int (as bool; is that the libpq convention?) instead of void *. (Actually, void * had already become useless as of the previous patch.)

- PQsetRowProcessorErrMes() is changed to do strdup internally.

- The callers of RowProcessor() no longer set null_field into PGrowValue.value. Plus, the PGrowValue[] which RowProcessor() receives has nfields + 1 elements, so that a rough estimate of the row size can be made by cols[nfields].value - cols[0].value - something. The something here is 4 * nfields for protocol 3 and 4 * (non-null fields) for protocol 2. I fear that this applies only to textual transfer usage...

- PQregisterRowProcessor() sets the default handler when given NULL. (pg_conn|pg_result).rowProcessor cannot be NULL for its lifetime.

- initStoreInfo() and storeHandler() have been given malloc error handling.
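The pointer-subtraction estimate described above can be made concrete for a protocol 3 style column area, assuming each `value` points just past its column's length word, NULLs included. Between the first value pointer and the end-of-row sentinel lie all value bytes plus the length words of columns 1 through nfields - 1. Under that assumption the exact payload is the span minus 4 * (nfields - 1), one length word less than the rougher 4 * nfields. `RowVal` and `scan_row()` are hypothetical stand-ins for PGrowValue and the parsing loop, not the patch's code.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct RowVal
{
    const unsigned char *value;  /* just past this column's length word */
    int                  len;    /* -1 for NULL */
} RowVal;

static int32_t read_be32(const unsigned char *p)
{
    return (int32_t) (((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
                      ((uint32_t) p[2] << 8) | (uint32_t) p[3]);
}

/* Fill vals[0..nfields]; vals[nfields].value marks the end of the row.
 * Returns 0, or -1 if buf is too short. */
static int scan_row(const unsigned char *buf, size_t buflen, int nfields,
                    RowVal *vals)
{
    size_t off = 0;

    for (int i = 0; i < nfields; i++)
    {
        if (buflen - off < 4)
            return -1;
        vals[i].len = read_be32(buf + off);
        off += 4;
        vals[i].value = buf + off;      /* set even for NULL, as proposed */
        if (vals[i].len > 0)
        {
            if ((size_t) vals[i].len > buflen - off)
                return -1;
            off += (size_t) vals[i].len;
        }
    }
    vals[nfields].value = buf + off;    /* sentinel, like rowval[nfields] */
    vals[nfields].len = -1;
    return 0;
}

/* Exact value bytes: the span minus the nfields - 1 inner length words. */
static ptrdiff_t row_payload_bytes(const RowVal *vals, int nfields)
{
    assert(nfields > 0);
    return (vals[nfields].value - vals[0].value) - 4 * (ptrdiff_t) (nfields - 1);
}
```

For protocol 2, where NULL columns have no length word, the subtracted term would instead count the non-NULL columns after the first, which matches the "4 * (# of non-nulls)" comment in getAnotherTuple() up to the same one-word difference.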


And more..

- getAnotherTuple()@fe-protocol2.c has not been tested thoroughly.

- Because every column value in the test data has the same size, the realloc path in dblink is never executed... More testing should be done.

regards,

=====
(*1) The benchmark is done as follows,

==test.sql
select dblink_connect('c', 'host=localhost dbname=test');
select * from dblink('c', 'select a,c from foo limit 2000000') as (a text, b bytea) limit 1;
...(repeat 9 times more)
select dblink_disconnect('c');
==

$ for i in $(seq 1 10); do time psql test -f t.sql; done

The environment is CentOS 6.2 on VirtualBox on a Core i7 965 3.2GHz:
  # of processors  1
  Allocated mem    2GB

The test DB schema is:
 Column | Type  | Modifiers
--------+-------+-----------
 a      | text  |
 b      | text  |
 c      | bytea |
Indexes:
    "foo_a_bt" btree (a)
    "foo_c_bt" btree (c)

test=# select count(*),
              min(length(a)) as a_min, max(length(a)) as a_max,
              min(length(c)) as c_min, max(length(c)) as c_max from foo;

  count  | a_min | a_max | c_min | c_max
---------+-------+-------+-------+-------
 2000000 |    29 |    29 |    29 |    29
(1 row)

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..5ed083c 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,5 @@ PQconnectStartParams      157
 PQping                    158
 PQpingParams              159
 PQlibVersion              160
 
+PQregisterRowProcessor      161
+PQsetRowProcessorErrMes      162
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index d454538..4fe2f41 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2692,6 +2692,8 @@ makeEmptyPGconn(void)
     conn->allow_ssl_try = true;
     conn->wait_ssl_try = false;
 #endif
+    conn->rowProcessor = pqAddRow;
+    conn->rowProcessorParam = NULL;
     /*
      * We try to send at least 8K at a time, which is the usual size of pipe
@@ -5076,3 +5078,10 @@ PQregisterThreadLock(pgthreadlock_t newhandler)
     return prev;
 }
+
+void
+PQregisterRowProcessor(PGconn *conn, RowProcessor func, void *param)
+{
+    conn->rowProcessor = (func ? func : pqAddRow);
+    conn->rowProcessorParam = param;
+}
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b743566..82914fd 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -66,6 +66,7 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int PQsendDescribe(PGconn *conn, char desc_type,
                const char *desc_target);
 static int    check_field_number(const PGresult *res, int field_num);
 
+static int    pqAddTuple(PGresult *res, PGresAttValue *tup);
 /* ----------------
@@ -160,6 +161,9 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
     result->curBlock = NULL;
     result->curOffset = 0;
     result->spaceLeft = 0;
 
+    result->rowProcessor = pqAddRow;
+    result->rowProcessorParam = NULL;
+    result->rowProcessorErrMes = NULL;
     if (conn)
     {
@@ -194,6 +198,10 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
             }
             result->nEvents = conn->nEvents;
         }
 
+
+        /* copy row processor settings */
+        result->rowProcessor = conn->rowProcessor;
+        result->rowProcessorParam = conn->rowProcessorParam;
     }
     else
     {
@@ -701,7 +709,6 @@ pqClearAsyncResult(PGconn *conn)
     if (conn->result)
         PQclear(conn->result);
     conn->result = NULL;
 
-    conn->curTuple = NULL;
 }
 /*
@@ -756,7 +763,6 @@ pqPrepareAsyncResult(PGconn *conn)
      */
     res = conn->result;
     conn->result = NULL;        /* handing over ownership to caller */
 
-    conn->curTuple = NULL;        /* just in case */
     if (!res)
         res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
     else
 
@@ -828,9 +834,52 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 }
 /*
+ * pqAddRow
+ *      add a row to the PGresult structure, growing it if necessary
+ *      Returns TRUE if OK, FALSE if not enough memory to add the row.
+ */
+int
+pqAddRow(PGresult *res, void *param, PGrowValue *columns)
+{
+    PGresAttValue *tup;
+    int nfields = res->numAttributes;
+    int i;
+
+    tup = (PGresAttValue *)
+        pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
+    if (tup == NULL) return FALSE;
+
+    memcpy(tup, columns, nfields * sizeof(PGresAttValue));
+
+    for (i = 0 ; i < nfields ; i++)
+    {
+        tup[i].len = columns[i].len;
+        if (tup[i].len == NULL_LEN)
+        {
+            tup[i].value = res->null_field;
+        }
+        else
+        {
+            bool isbinary = (res->attDescs[i].format != 0);
+            tup[i].value =
+                (char *)pqResultAlloc(res, tup[i].len + 1, isbinary);
+            if (tup[i].value == NULL)
+                return FALSE;
+
+            memcpy(tup[i].value, columns[i].value, tup[i].len);
+            /* We have to terminate this ourselves */
+            tup[i].value[tup[i].len] = '\0';
+        }
+    }
+
+    return pqAddTuple(res, tup);
+}
+
+
 /*
  * pqAddTuple
- *      add a row pointer to the PGresult structure, growing it if necessary
- *      Returns TRUE if OK, FALSE if not enough memory to add the row
+ *      add a row POINTER to the PGresult structure, growing it if
+ *      necessary.  Returns TRUE if OK, FALSE if not enough memory to add
+ *      the row.
  */
 int
 pqAddTuple(PGresult *res, PGresAttValue *tup)
@@ -1223,7 +1272,6 @@ PQsendQueryStart(PGconn *conn)
     /* initialize async result-accumulation state */
     conn->result = NULL;
 
-    conn->curTuple = NULL;
     /* ready to send command message */
     return true;
@@ -2822,6 +2870,30 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num)
         return 0;
 }
+/* PQsetRowProcessorErrMes
+ *    Set the error message passed back to the caller of RowProcessor.
+ *
+ *  You can replace the previous message with an alternative mes, or
+ *  clear it with NULL.
+ */
+void
+PQsetRowProcessorErrMes(PGresult *res, char *mes)
+{
+    /* Free existing message */
+    if (res->rowProcessorErrMes)
+        free(res->rowProcessorErrMes);
+
+    /*
+     * Set the duped message if mes is not NULL. Failure of strdup
+     * will be handled as 'Out of memory' by the caller of the
+     * RowProcessor.
+     */
+    if (mes)
+        res->rowProcessorErrMes = strdup(mes);
+    else
+        res->rowProcessorErrMes = NULL;
+}
+
 /* PQnparams:
  *    returns the number of input parameters of a prepared statement.
  */
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index ce0eac3..d11cb3c 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -219,6 +219,25 @@ pqGetnchar(char *s, size_t len, PGconn *conn)
 }
 /*
+ * pqSkipnchar:
+ *    skip len bytes in input buffer.
+ */
+int
+pqSkipnchar(size_t len, PGconn *conn)
+{
+    if (len > (size_t) (conn->inEnd - conn->inCursor))
+        return EOF;
+
+    conn->inCursor += len;
+
+    if (conn->Pfdebug)
+        fprintf(conn->Pfdebug, "From backend (%lu skipped)\n",
+                (unsigned long) len);
+
+    return 0;
+}
+
+
 /*
  * pqPutnchar:
  *    write exactly len bytes to the current message
  */
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index a7c3899..496c42e 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -715,7 +715,7 @@ getAnotherTuple(PGconn *conn, bool binary)
 {
     PGresult   *result = conn->result;
     int         nfields = result->numAttributes;
 
-    PGresAttValue *tup;
+    PGrowValue  rowval[result->numAttributes + 1];
     /* the backend sends us a bitmap of which attributes are null */
     char        std_bitmap[64]; /* used unless it doesn't fit */
 
@@ -729,26 +729,11 @@ getAnotherTuple(PGconn *conn, bool binary)
     result->binary = binary;
-    /* Allocate tuple space if first time for this data message */
-    if (conn->curTuple == NULL)
+    if (binary)
     {
-        conn->curTuple = (PGresAttValue *)
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-        if (conn->curTuple == NULL)
-            goto outOfMemory;
-        MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-
-        /*
-         * If it's binary, fix the column format indicators.  We assume the
-         * backend will consistently send either B or D, not a mix.
-         */
-        if (binary)
-        {
-            for (i = 0; i < nfields; i++)
-                result->attDescs[i].format = 1;
-        }
+        for (i = 0; i < nfields; i++)
+            result->attDescs[i].format = 1;
     }
-    tup = conn->curTuple;
     /* Get the null-value bitmap */
     nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
@@ -757,7 +742,7 @@ getAnotherTuple(PGconn *conn, bool binary)
     {
         bitmap = (char *) malloc(nbytes);
         if (!bitmap)
-            goto outOfMemory;
+            goto rowProcessError;
     }
     if (pqGetnchar(bitmap, nbytes, conn))
@@ -771,34 +756,31 @@ getAnotherTuple(PGconn *conn, bool binary)
     for (i = 0; i < nfields; i++)
     {
         if (!(bmap & 0200))
 
-        {
-            /* if the field value is absent, make it a null string */
-            tup[i].value = result->null_field;
-            tup[i].len = NULL_LEN;
-        }
+            vlen = NULL_LEN;
+        else if (pqGetInt(&vlen, 4, conn))
+                goto EOFexit;
         else
         {
-            /* get the value length (the first four bytes are for length) */
-            if (pqGetInt(&vlen, 4, conn))
-                goto EOFexit;
             if (!binary)
                 vlen = vlen - 4;
             if (vlen < 0)
                 vlen = 0;
 
-            if (tup[i].value == NULL)
-            {
-                tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
-                if (tup[i].value == NULL)
-                    goto outOfMemory;
-            }
-            tup[i].len = vlen;
-            /* read in the value */
-            if (vlen > 0)
-                if (pqGetnchar((char *) (tup[i].value), vlen, conn))
-                    goto EOFexit;
-            /* we have to terminate this ourselves */
-            tup[i].value[vlen] = '\0';
         }
+
+        /*
+         * Buffer content may be shifted on reloading additional
+         * data. So we must set all pointers on every scan.
+         *
+         * rowval[i].value always points to the current input position
+         * (just after the length field, if any), even if the value length
+         * is zero or the value is NULL, so it is always safe to access.
+         */
+        rowval[i].value = conn->inBuffer + conn->inCursor;
+        rowval[i].len = vlen;
+        /* Skip the value */
+        if (vlen > 0 && pqSkipnchar(vlen, conn))
+            goto EOFexit;
+        /* advance the bitmap stuff */        bitcnt++;        if (bitcnt == BITS_PER_BYTE)
@@ -811,17 +793,33 @@ getAnotherTuple(PGconn *conn, bool binary)            bmap <<= 1;    }
-    /* Success!  Store the completed tuple in the result */
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
-    /* and reset for a new message */
-    conn->curTuple = NULL;
+    /*
+     * Set rowval[nfields] so that it is safe to access. The length of
+     * the buffer needed to store the row can be estimated by
+     *
+     *    rowval[nfields].value - rowval[0].value - 4 * (# of non-nulls).
+     */
+    rowval[nfields].value = conn->inBuffer + conn->inCursor;
+    rowval[nfields].len = NULL_LEN;
+
+    /* Success!  Pass the completed row values to rowProcessor */
+    if (!result->rowProcessor(result, result->rowProcessorParam, rowval))
+        goto rowProcessError;
+
+    /* Free garbage message. */
+    if (result->rowProcessorErrMes)
+    {
+        free(result->rowProcessorErrMes);
+        result->rowProcessorErrMes = NULL;
+    }    if (bitmap != std_bitmap)        free(bitmap);
+    return 0;
-outOfMemory:
+rowProcessError:
+        /* Replace partially constructed result with an error result */    /*
@@ -829,8 +827,21 @@ outOfMemory:     * there's not enough memory to concatenate messages...     */
pqClearAsyncResult(conn);
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    resetPQExpBuffer(&conn->errorMessage);
+
+    /*
+     * If the RowProcessor supplied an error message, store it in
+     * PGconn; otherwise assume out of memory.
+     */
+    appendPQExpBufferStr(&conn->errorMessage,
+                         libpq_gettext(result->rowProcessorErrMes ?
+                                       result->rowProcessorErrMes :
+                                       "out of memory for query result\n"));
+    if (result->rowProcessorErrMes)
+    {
+        free(result->rowProcessorErrMes);
+        result->rowProcessorErrMes = NULL;
+    }    /*     * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 892dcbc..b7c6118 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -625,22 +625,12 @@ getAnotherTuple(PGconn *conn, int msgLength){    PGresult   *result = conn->result;    int
   nfields = result->numAttributes;
 
-    PGresAttValue *tup;
+    PGrowValue  rowval[result->numAttributes + 1];    int            tupnfields;        /* # fields from tuple */
int           vlen;            /* length of the current field value */    int            i;    /* Allocate tuple space
if first time for this data message */
 
-    if (conn->curTuple == NULL)
-    {
-        conn->curTuple = (PGresAttValue *)
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-        if (conn->curTuple == NULL)
-            goto outOfMemory;
-        MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-    }
-    tup = conn->curTuple;
-    /* Get the field count and make sure it's what we expect */    if (pqGetInt(&tupnfields, 2, conn))        return
EOF;
@@ -663,48 +653,70 @@ getAnotherTuple(PGconn *conn, int msgLength)        if (pqGetInt(&vlen, 4, conn))
return EOF;        if (vlen == -1)
 
-        {
-            /* null field */
-            tup[i].value = result->null_field;
-            tup[i].len = NULL_LEN;
-            continue;
-        }
-        if (vlen < 0)
+            vlen = NULL_LEN;
+        else if (vlen < 0)            vlen = 0;
-        if (tup[i].value == NULL)
-        {
-            bool        isbinary = (result->attDescs[i].format != 0);
+        
+        /*
+         * The buffer content may be shifted when additional data is
+         * loaded, so all pointers must be set again on every scan.
+         *
+         * rowval[i].value always points just past the length field,
+         * even if the value is empty or NULL, so that it is always
+         * safe to access.
+         */
+        rowval[i].value = conn->inBuffer + conn->inCursor;
+        rowval[i].len = vlen;
-            tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
-            if (tup[i].value == NULL)
-                goto outOfMemory;
-        }
-        tup[i].len = vlen;
-        /* read in the value */
-        if (vlen > 0)
-            if (pqGetnchar((char *) (tup[i].value), vlen, conn))
-                return EOF;
-        /* we have to terminate this ourselves */
-        tup[i].value[vlen] = '\0';
+        /* Skip to the next length field */
+        if (vlen > 0 && pqSkipnchar(vlen, conn))
+            return EOF;    }
-    /* Success!  Store the completed tuple in the result */
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
-    /* and reset for a new message */
-    conn->curTuple = NULL;
+    /*
+     * Set rowval[nfields] so that it is safe to access. The length of
+     * the buffer needed to store the row can be estimated by
+     *
+     *    rowval[nfields].value - rowval[0].value - 4 * nfields.
+     */
+    rowval[nfields].value = conn->inBuffer + conn->inCursor;
+    rowval[nfields].len = NULL_LEN;
+
+    /* Success!  Pass the completed row values to rowProcessor */
+    if (!result->rowProcessor(result, result->rowProcessorParam, rowval))
+        goto rowProcessError;
+    
+    /* Free garbage error message. */
+    if (result->rowProcessorErrMes)
+    {
+        free(result->rowProcessorErrMes);
+        result->rowProcessorErrMes = NULL;
+    }    return 0;
-outOfMemory:
+rowProcessError:    /*     * Replace partially constructed result with an error result. First     * discard the old
result to try to win back some memory.     */    pqClearAsyncResult(conn);
 
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    resetPQExpBuffer(&conn->errorMessage);
+
+    /*
+     * If the RowProcessor supplied an error message, store it in
+     * PGconn; otherwise assume out of memory.
+     */
+    appendPQExpBufferStr(&conn->errorMessage,
+                         libpq_gettext(result->rowProcessorErrMes ?
+                                       result->rowProcessorErrMes : 
+                                       "out of memory for query result\n"));
+    if (result->rowProcessorErrMes)
+    {
+        free(result->rowProcessorErrMes);
+        result->rowProcessorErrMes = NULL;
+    }    pqSaveErrorResult(conn);    /* Discard the failed message by pretending we read it */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index ef26ab9..27ef007 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -149,6 +149,16 @@ typedef struct pgNotify    struct pgNotify *next;        /* list link */} PGnotify;
+/* PGrowValue represents the value of one tuple field in string form,
+   as passed to a RowProcessor. A NULL is represented by len < 0.
+   Otherwise value points to a string of len bytes that is not
+   null-terminated. */
+typedef struct pgRowValue
+{
+    int            len;            /* length in bytes of the value */
+    char       *value;            /* actual value, without null termination */
+} PGrowValue;
+/* Function types for notice-handling callbacks */typedef void (*PQnoticeReceiver) (void *arg, const PGresult
*res);typedef void (*PQnoticeProcessor) (void *arg, const char *message);
 
@@ -416,6 +426,32 @@ extern PGPing PQping(const char *conninfo);extern PGPing PQpingParams(const char *const *
keywords,            const char *const * values, int expand_dbname);
 
+/*
+ * Typedef for an alternative row processor.
+ *
+ * This function must return 1 for success and 0 for failure. On
+ * failure it may set an error message with PQsetRowProcessorErrMes();
+ * if no message is set, the caller assumes the failure was out of
+ * memory. This function must not throw any exception.
+ */
+    typedef int (*RowProcessor)(PGresult *res, void *param,
+                                PGrowValue *columns);
+    
+/*
+ * Register an alternative result store function with PGconn.
+ *
+ * When this function is registered, pg_result disables its own result
+ * store and calls it for each row, one by one.
+ *
+ * func is the row processor function. See the typedef RowProcessor.
+ *
+ * rowProcessorParam is the contextual variable that is passed to
+ * RowProcessor.
+ */
+extern void PQregisterRowProcessor(PGconn *conn, RowProcessor func,
+                                   void *rowProcessorParam);
+/* Force the write buffer to be written (or at least try) */extern int    PQflush(PGconn *conn);
@@ -454,6 +490,7 @@ extern char *PQcmdTuples(PGresult *res);extern char *PQgetvalue(const PGresult *res, int tup_num,
int field_num);extern int    PQgetlength(const PGresult *res, int tup_num, int field_num);extern int
PQgetisnull(const PGresult *res, int tup_num, int field_num);
 
+extern void    PQsetRowProcessorErrMes(PGresult *res, char *mes);extern int    PQnparams(const PGresult *res);extern
Oid   PQparamtype(const PGresult *res, int param_num);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index d967d60..06d8b26 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -209,6 +209,11 @@ struct pg_result    PGresult_data *curBlock;    /* most recently allocated block */    int
  curOffset;        /* start offset of free space in block */    int            spaceLeft;        /* number of free
bytes remaining in block */
 
+
+    RowProcessor rowProcessor;  /* Result row processor handler. See
+                                 * RowProcessor for details. */
+    void *rowProcessorParam;    /* Contextual parameter for rowProcessor */
+    char *rowProcessorErrMes;   /* Error message from rowProcessor */};/* PGAsyncStatusType defines the state of the
query-execution state machine */
 
@@ -398,7 +403,6 @@ struct pg_conn    /* Status for asynchronous result construction */    PGresult   *result;
 /* result being constructed */
 
-    PGresAttValue *curTuple;    /* tuple currently being read */#ifdef USE_SSL    bool        allow_ssl_try;    /*
Allowed to try SSL negotiation */
 
@@ -443,6 +447,13 @@ struct pg_conn    /* Buffer for receiving various parts of messages */    PQExpBufferData
workBuffer;/* expansible string */
 
+
+    /* Tuple store handler. The two fields below are copied to each newly
+     * created PGresult if rowProcessor is not NULL; the default
+     * function is used if it is NULL. */
+    RowProcessor rowProcessor;   /* Result row processor. See
+                                  * RowProcessor for details. */
+    void *rowProcessorParam;     /* Contextual parameter for rowProcessor */};/* PGcancel stores all data necessary to
cancel a connection. A copy of this
 
@@ -507,7 +518,7 @@ extern void pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)/* This lets gcc check
the format string for consistency. */__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
 
-extern int    pqAddTuple(PGresult *res, PGresAttValue *tup);
+extern int    pqAddRow(PGresult *res, void *param, PGrowValue *columns);extern void pqSaveMessageField(PGresult *res,
char code,                   const char *value);extern void pqSaveParameterStatus(PGconn *conn, const char *name,
 
@@ -560,6 +571,7 @@ extern int    pqGets(PQExpBuffer buf, PGconn *conn);extern int    pqGets_append(PQExpBuffer buf,
PGconn *conn);extern int    pqPuts(const char *s, PGconn *conn);extern int    pqGetnchar(char *s, size_t len, PGconn
*conn);
+extern int    pqSkipnchar(size_t len, PGconn *conn);extern int    pqPutnchar(const char *s, size_t len, PGconn
*conn);extern int    pqGetInt(int *result, size_t bytes, PGconn *conn);extern int    pqPutInt(int value, size_t bytes,
PGconn *conn);
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 72c9384..5417df1 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -7233,6 +7233,199 @@ int PQisthreadsafe(); </sect1>
+ <sect1 id="libpq-alterrowprocessor">
+  <title>Alternative row processor</title>
+
+  <indexterm zone="libpq-alterrowprocessor">
+   <primary>PGresult</primary>
+   <secondary>PGconn</secondary>
+  </indexterm>
+
+  <para>
+   In standard usage, the result of command execution is obtained as
+   a <structname>PGresult</structname> acquired
+   with <function>PQgetResult</function>
+   from a <structname>PGconn</structname>. The memory for
+   the PGresult is allocated with malloc() internally, within calls to
+   command execution functions such as <function>PQexec</function>
+   and <function>PQgetResult</function>. If it is difficult to
+   handle the result records in the form of a PGresult, you can instruct
+   the PGconn to pass every row to your own row processor instead of
+   storing it in the PGresult.
+  </para>
+
+  <variablelist>
+   <varlistentry id="libpq-registerrowprocessor">
+    <term>
+     <function>PQregisterRowProcessor</function>
+     <indexterm>
+      <primary>PQregisterRowProcessor</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       Sets a callback function to process each row.
+<synopsis>
+void PQregisterRowProcessor(PGconn *conn,
+                            RowProcessor func,
+                            void *param);
+</synopsis>
+     </para>
+     
+     <para>
+       <variablelist>
+     <varlistentry>
+       <term><parameter>conn</parameter></term>
+       <listitem>
+         <para>
+           The connection object on which to set the row processor
+           function. Each PGresult created from this connection calls
+           this function to process each row.
+         </para>
+       </listitem>
+     </varlistentry>
+     <varlistentry>
+       <term><parameter>func</parameter></term>
+       <listitem>
+         <para>
+           Row processor function to set. NULL means to use the
+           default processor.
+         </para>
+       </listitem>
+     </varlistentry>
+     <varlistentry>
+       <term><parameter>param</parameter></term>
+       <listitem>
+         <para>
+           A pointer to a contextual parameter passed
+           to <parameter>func</parameter>.
+         </para>
+       </listitem>
+     </varlistentry>
+       </variablelist>
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-rowprocessor">
+    <term>
+     <type>RowProcessor</type>
+     <indexterm>
+      <primary>RowProcessor</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       The type for the row processor callback function.
+<synopsis>
+typedef int (*RowProcessor)(PGresult   *res,
+                            void       *param,
+                            PGrowValue *columns);
+
+typedef struct
+{
+    int         len;            /* length in bytes of the value */
+    char       *value;          /* actual value, without null termination */
+} PGrowValue;
+
+</synopsis>
+     </para>
+
+     <para>
+       This function must return TRUE (1) for success, and FALSE (0) for
+       failure. On failure this function should set the error message
+       with <function>PQsetRowProcessorErrMes</function> if the cause
+       is something other than running out of memory. This function must
+       not throw any exception.
+     </para>
+     <variablelist>
+       <varlistentry>
+
+     <term><parameter>res</parameter></term>
+     <listitem>
+       <para>
+         A pointer to the <type>PGresult</type> object.
+       </para>
+     </listitem>
+       </varlistentry>
+       <varlistentry>
+
+     <term><parameter>param</parameter></term>
+     <listitem>
+       <para>
+         A pointer to the contextual parameter registered
+         with <function>PQregisterRowProcessor</function>.
+       </para>
+     </listitem>
+       </varlistentry>
+       <varlistentry>
+
+     <term><parameter>columns</parameter></term>
+     <listitem>
+       <para>
+         Column values of the row to process.
+       </para>
+     </listitem>
+       </varlistentry>
+     </variablelist>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqsetrowprocessorerrmes">
+    <term>
+     <function>PQsetRowProcessorErrMes</function>
+     <indexterm>
+      <primary>PQsetRowProcessorErrMes</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+    Sets the message for an error that occurred
+    in <type>RowProcessor</type>.  If this message is not set, the
+    caller assumes the error was out of memory.
+<synopsis>
+void PQsetRowProcessorErrMes(PGresult *res, char *mes)
+</synopsis>
+      </para>
+      <para>
+    <variablelist>
+      <varlistentry>
+        <term><parameter>res</parameter></term>
+        <listitem>
+          <para>
+        A pointer to the <type>PGresult</type> object
+        passed to <type>RowProcessor</type>.
+          </para>
+        </listitem>
+      </varlistentry>
+      <varlistentry>
+        <term><parameter>mes</parameter></term>
+        <listitem>
+          <para>
+        Error message. This is copied internally, so the caller does
+        not need to keep it alive after the call.
+          </para>
+          <para>
+        If <parameter>res</parameter> already has a previously set
+        message, it is overwritten. Pass NULL to cancel the custom
+        message.
+          </para>
+        </listitem>
+      </varlistentry>
+    </variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+ </sect1>
+
+ <sect1 id="libpq-build">  <title>Building <application>libpq</application> Programs</title>
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 36a8e3e..e6edcd5 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,23 @@ typedef struct remoteConn    bool        newXactForCursor;        /* Opened a transaction for a
cursor */} remoteConn;
 
+typedef struct storeInfo
+{
+    Tuplestorestate *tuplestore;
+    int nattrs;
+    MemoryContext oldcontext;
+    AttInMetadata *attinmeta;
+    char** valbuf;
+    int *valbuflen;
+    bool error_occurred;
+    bool nummismatch;
+    ErrorData *edata;
+} storeInfo;
+/* * Internal declarations */static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);static remoteConn *getConnectionByName(const
char *name);static HTAB *createConnHash(void);static void createNewConnection(const char *name, remoteConn *rconn);
 
@@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);static void validate_pkattnums(Relation rel,
          int2vector *pkattnums_arg, int32 pknumatts_arg,                   int **pkattnums, int *pknumatts);
 
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static int storeHandler(PGresult *res, void *param, PGrowValue *columns);
+/* Global */static remoteConn *pconn = NULL;
@@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS)    char       *curname = NULL;    int            howmany = 0;
bool       fail = true;    /* default to backward compatible */
 
+    storeInfo   storeinfo;    DBLINK_INIT;
@@ -559,15 +576,36 @@ dblink_fetch(PG_FUNCTION_ARGS)    appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
/*
 
+     * The result is stored in storeinfo.tuplestore instead of
+     * res->result returned by PQexec below.
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterRowProcessor(conn, storeHandler, &storeinfo);
+
+    /*     * Try to execute the query.  Note that since libpq uses malloc, the     * PGresult will be long-lived even
though we are still in a short-lived     * memory context.     */    res = PQexec(conn, buf.data);
 
+    finishStoreInfo(&storeinfo);
+    if (!res ||        (PQresultStatus(res) != PGRES_COMMAND_OK &&         PQresultStatus(res) != PGRES_TUPLES_OK))
{
+        /* finishStoreInfo saves the fields referred to below. */
+        if (storeinfo.nummismatch)
+        {
+            /* This is only for backward compatibility */
+            ereport(ERROR,
+                    (errcode(ERRCODE_DATATYPE_MISMATCH),
+                     errmsg("remote query result rowtype does not match "
+                            "the specified FROM clause rowtype")));
+        }
+        else if (storeinfo.edata)
+            ReThrowError(storeinfo.edata);
+        dblink_res_error(conname, res, "could not fetch from cursor", fail);        return (Datum) 0;    }
@@ -579,8 +617,8 @@ dblink_fetch(PG_FUNCTION_ARGS)                (errcode(ERRCODE_INVALID_CURSOR_NAME),
errmsg("cursor \"%s\" does not exist", curname)));    }
 
+    PQclear(res);
-    materializeResult(fcinfo, res);    return (Datum) 0;}
@@ -640,6 +678,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)    remoteConn *rconn = NULL;    bool
      fail = true;    /* default to backward compatible */    bool        freeconn = false;
 
+    storeInfo   storeinfo;    /* check to see if caller supports us returning a tuplestore */    if (rsinfo == NULL ||
!IsA(rsinfo, ReturnSetInfo))
 
@@ -715,164 +754,214 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)    rsinfo->setResult = NULL;
rsinfo->setDesc = NULL;
 
+
+    /*
+     * The result is stored in storeinfo.tuplestore instead of
+     * res->result returned by PQexec/PQgetResult below.
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterRowProcessor(conn, storeHandler, &storeinfo);
+    /* synchronous query, or async result retrieval */    if (!is_async)        res = PQexec(conn, sql);    else
-    {        res = PQgetResult(conn);
-        /* NULL means we're all done with the async results */
-        if (!res)
-            return (Datum) 0;
-    }
-    /* if needed, close the connection to the database and cleanup */
-    if (freeconn)
-        PQfinish(conn);
+    finishStoreInfo(&storeinfo);
-    if (!res ||
-        (PQresultStatus(res) != PGRES_COMMAND_OK &&
-         PQresultStatus(res) != PGRES_TUPLES_OK))
+    /* NULL res from async get means we're all done with the results */
+    if (res || !is_async)    {
-        dblink_res_error(conname, res, "could not execute query", fail);
-        return (Datum) 0;
+        if (freeconn)
+            PQfinish(conn);
+
+        if (!res ||
+            (PQresultStatus(res) != PGRES_COMMAND_OK &&
+             PQresultStatus(res) != PGRES_TUPLES_OK))
+        {
+            /* finishStoreInfo saves the fields referred to below. */
+            if (storeinfo.nummismatch)
+            {
+                /* This is only for backward compatibility */
+                ereport(ERROR,
+                        (errcode(ERRCODE_DATATYPE_MISMATCH),
+                         errmsg("remote query result rowtype does not match "
+                                "the specified FROM clause rowtype")));
+            }
+            else if (storeinfo.edata)
+                ReThrowError(storeinfo.edata);
+
+            dblink_res_error(conname, res, "could not execute query", fail);
+            return (Datum) 0;
+        }    }
+    PQclear(res);
-    materializeResult(fcinfo, res);    return (Datum) 0;}
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo){    ReturnSetInfo *rsinfo = (ReturnSetInfo *)
fcinfo->resultinfo;
+    TupleDesc    tupdesc;
+    int i;
+    
+    switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+    {
+        case TYPEFUNC_COMPOSITE:
+            /* success */
+            break;
+        case TYPEFUNC_RECORD:
+            /* failed to determine actual type of RECORD */
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("function returning record called in context "
+                            "that cannot accept type record")));
+            break;
+        default:
+            /* result type isn't composite */
+            elog(ERROR, "return type must be a row type");
+            break;
+    }
+    
+    sinfo->oldcontext = MemoryContextSwitchTo(
+        rsinfo->econtext->ecxt_per_query_memory);
+
+    /* make sure we have a persistent copy of the tupdesc */
+    tupdesc = CreateTupleDescCopy(tupdesc);
+
+    sinfo->error_occurred = FALSE;
+    sinfo->nummismatch = FALSE;
+    sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+    sinfo->edata = NULL;
+    sinfo->nattrs = tupdesc->natts;
+    sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+
+    /* Preallocate the arrays that track per-column C-string buffers. */
+    sinfo->valbuf = (char **) malloc(sinfo->nattrs * sizeof(char*));
+    sinfo->valbuflen = (int *)malloc(sinfo->nattrs * sizeof(int));
+    if (sinfo->valbuf == NULL || sinfo->valbuflen == NULL)
+        ereport(ERROR,
+                (errcode(ERRCODE_OUT_OF_MEMORY),
+                 errmsg("out of memory")));
-    Assert(rsinfo->returnMode == SFRM_Materialize);
-
-    PG_TRY();
+    for (i = 0 ; i < sinfo->nattrs ; i++)    {
-        TupleDesc    tupdesc;
-        bool        is_sql_cmd = false;
-        int            ntuples;
-        int            nfields;
+        sinfo->valbuf[i] = NULL;
+        sinfo->valbuflen[i] = -1;
+    }
-        if (PQresultStatus(res) == PGRES_COMMAND_OK)
-        {
-            is_sql_cmd = true;
-
-            /*
-             * need a tuple descriptor representing one TEXT column to return
-             * the command status string as our result tuple
-             */
-            tupdesc = CreateTemplateTupleDesc(1, false);
-            TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-                               TEXTOID, -1, 0);
-            ntuples = 1;
-            nfields = 1;
-        }
-        else
-        {
-            Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+    rsinfo->setResult = sinfo->tuplestore;
+    rsinfo->setDesc = tupdesc;
+}
-            is_sql_cmd = false;
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+    int i;
-            /* get a tuple descriptor for our result type */
-            switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+    if (sinfo->valbuf)
+    {
+        for (i = 0 ; i < sinfo->nattrs ; i++)
+        {
+            if (sinfo->valbuf[i])            {
-                case TYPEFUNC_COMPOSITE:
-                    /* success */
-                    break;
-                case TYPEFUNC_RECORD:
-                    /* failed to determine actual type of RECORD */
-                    ereport(ERROR,
-                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                        errmsg("function returning record called in context "
-                               "that cannot accept type record")));
-                    break;
-                default:
-                    /* result type isn't composite */
-                    elog(ERROR, "return type must be a row type");
-                    break;
+                free(sinfo->valbuf[i]);
+                sinfo->valbuf[i] = NULL;            }
-
-            /* make sure we have a persistent copy of the tupdesc */
-            tupdesc = CreateTupleDescCopy(tupdesc);
-            ntuples = PQntuples(res);
-            nfields = PQnfields(res);        }
+        free(sinfo->valbuf);
+        sinfo->valbuf = NULL;
+    }
-        /*
-         * check result and tuple descriptor have the same number of columns
-         */
-        if (nfields != tupdesc->natts)
-            ereport(ERROR,
-                    (errcode(ERRCODE_DATATYPE_MISMATCH),
-                     errmsg("remote query result rowtype does not match "
-                            "the specified FROM clause rowtype")));
-
-        if (ntuples > 0)
-        {
-            AttInMetadata *attinmeta;
-            Tuplestorestate *tupstore;
-            MemoryContext oldcontext;
-            int            row;
-            char      **values;
-
-            attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-            oldcontext = MemoryContextSwitchTo(
-                                    rsinfo->econtext->ecxt_per_query_memory);
-            tupstore = tuplestore_begin_heap(true, false, work_mem);
-            rsinfo->setResult = tupstore;
-            rsinfo->setDesc = tupdesc;
-            MemoryContextSwitchTo(oldcontext);
-
-            values = (char **) palloc(nfields * sizeof(char *));
+    if (sinfo->valbuflen)
+    {
+        free(sinfo->valbuflen);
+        sinfo->valbuflen = NULL;
+    }
+    MemoryContextSwitchTo(sinfo->oldcontext);
+}
-            /* put all tuples into the tuplestore */
-            for (row = 0; row < ntuples; row++)
-            {
-                HeapTuple    tuple;
+static int
+storeHandler(PGresult *res, void *param, PGrowValue *columns)
+{
+    storeInfo *sinfo = (storeInfo *)param;
+    HeapTuple  tuple;
+    int        fields = PQnfields(res);
+    int        i;
+    char      *cstrs[PQnfields(res)];
-                if (!is_sql_cmd)
-                {
-                    int            i;
+    if (sinfo->error_occurred)
+        return FALSE;
-                    for (i = 0; i < nfields; i++)
-                    {
-                        if (PQgetisnull(res, row, i))
-                            values[i] = NULL;
-                        else
-                            values[i] = PQgetvalue(res, row, i);
-                    }
-                }
-                else
-                {
-                    values[0] = PQcmdStatus(res);
-                }
+    if (sinfo->nattrs != fields)
+    {
+        sinfo->error_occurred = TRUE;
+        sinfo->nummismatch = TRUE;
+        finishStoreInfo(sinfo);
+
+        /* This error is reported by the caller
+         * (dblink_record_internal() or dblink_fetch()), so do not
+         * set an error message here. */
+        return FALSE;
+    }
-                /* build the tuple and put it into the tuplestore. */
-                tuple = BuildTupleFromCStrings(attinmeta, values);
-                tuplestore_puttuple(tupstore, tuple);
+    /*
+     * Value input functions assume that the input string is
+     * null-terminated, so copy each value and terminate it.
+     */
+    for(i = 0 ; i < fields ; i++)
+    {
+        int len = columns[i].len;
+        if (len < 0)
+            cstrs[i] = NULL;
+        else
+        {
+            if (sinfo->valbuf[i] == NULL)
+            {
+                sinfo->valbuf[i] = (char *)malloc(len + 1);
+                sinfo->valbuflen[i] = len + 1;
+            }
+            else if (sinfo->valbuflen[i] < len + 1)
+            {
+                sinfo->valbuf[i] = (char *)realloc(sinfo->valbuf[i], len + 1);
+                sinfo->valbuflen[i] = len + 1;            }
-            /* clean up and return the tuplestore */
-            tuplestore_donestoring(tupstore);
+            if (sinfo->valbuf[i] == NULL)
+                ereport(ERROR,
+                        (errcode(ERRCODE_OUT_OF_MEMORY),
+                         errmsg("out of memory")));
+
+            cstrs[i] = sinfo->valbuf[i];
+            memcpy(cstrs[i], columns[i].value, len);
+            cstrs[i][len] = '\0';        }
+    }
-        PQclear(res);
+    PG_TRY();
+    {
+        tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+        tuplestore_puttuple(sinfo->tuplestore, tuple);    }    PG_CATCH();    {
-        /* be sure to release the libpq result */
-        PQclear(res);
-        PG_RE_THROW();
+        MemoryContext context;
+        /*
+         * Store exception for later ReThrow and cancel the exception.
+         */
+        sinfo->error_occurred = TRUE;
+        context = MemoryContextSwitchTo(sinfo->oldcontext);
+        sinfo->edata = CopyErrorData();
+        MemoryContextSwitchTo(context);
+        FlushErrorState();
+        return FALSE;
+    }
+    PG_END_TRY();
+
+    return TRUE;
+}
+/*
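The loop in the patch copies each column value into a per-column buffer that is kept across rows and grown only when a longer value arrives, so that the value input functions receive NUL-terminated strings. A minimal standalone sketch of that pattern follows, assuming plain malloc()/realloc() and hypothetical names (ColumnValue, ValueBuffers, terminate_columns, free_column_buffers) that are not part of libpq or dblink; like the patch, it treats a negative length as SQL NULL.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for a column value as the patch sees it: the bytes
 * are not NUL-terminated, and len < 0 marks SQL NULL. */
typedef struct
{
    int         len;
    const char *value;
} ColumnValue;

/* Hypothetical per-column buffers kept across rows (cf. sinfo->valbuf). */
typedef struct
{
    int     nfields;
    char  **valbuf;     /* valbuf[i] starts out NULL */
    int    *valbuflen;  /* allocated size of valbuf[i], starts out 0 */
} ValueBuffers;

/* Copy each non-NULL value into its buffer and NUL-terminate it.
 * cstrs[i] is set to NULL for SQL NULLs. Returns 0 on success, -1 if
 * memory runs out (previously allocated buffers stay valid). */
static int
terminate_columns(ValueBuffers *vb, const ColumnValue *columns, char **cstrs)
{
    int     i;

    for (i = 0; i < vb->nfields; i++)
    {
        int     len = columns[i].len;

        if (len < 0)
        {
            cstrs[i] = NULL;
            continue;
        }
        if (vb->valbuflen[i] < len + 1)
        {
            /* realloc(NULL, n) acts like malloc(n), so one call covers both
             * the first allocation and later growth. */
            char   *newbuf = realloc(vb->valbuf[i], (size_t) len + 1);

            if (newbuf == NULL)
                return -1;
            vb->valbuf[i] = newbuf;
            vb->valbuflen[i] = len + 1;
        }
        memcpy(vb->valbuf[i], columns[i].value, (size_t) len);
        vb->valbuf[i][len] = '\0';
        cstrs[i] = vb->valbuf[i];
    }
    return 0;
}

/* Release all buffers, e.g. once the result has been fully consumed. */
static void
free_column_buffers(ValueBuffers *vb)
{
    int     i;

    for (i = 0; i < vb->nfields; i++)
    {
        free(vb->valbuf[i]);
        vb->valbuf[i] = NULL;
        vb->valbuflen[i] = 0;
    }
}
```

Assigning the result of realloc() to a temporary keeps the old buffer reachable (and freeable) if the call fails; inside the backend the failure would then be reported with ereport(ERROR), as the patch does.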
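The PG_TRY()/PG_CATCH() block in the patch turns an error raised while storing a tuple into a FALSE return, keeping a copy of the error data so that dblink_record_internal() can rethrow it later, presumably so that the error does not unwind through libpq's own stack frames. PG_TRY() is itself built on sigsetjmp(), so the idea can be sketched in standalone C with setjmp()/longjmp(). This is an analog under stated assumptions, not the backend macros: throw_error(), store_row(), row_callback() and CallbackState are hypothetical stand-ins for ereport(ERROR), the tuplestore call, the row processor, and the patch's sinfo.

```c
#include <setjmp.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical analog of the patch's sinfo->error_occurred / sinfo->edata. */
typedef struct
{
    int     error_occurred;
    char    errmsg[128];
} CallbackState;

static jmp_buf *current_handler = NULL; /* innermost protected region */
static char     pending_msg[128];       /* message of the error in flight */

/* Stand-in for ereport(ERROR): record the message and unwind to the
 * innermost handler. Only call this inside a protected region. */
static void
throw_error(const char *msg)
{
    snprintf(pending_msg, sizeof pending_msg, "%s", msg);
    longjmp(*current_handler, 1);
}

/* Stand-in for BuildTupleFromCStrings() + tuplestore_puttuple():
 * "fails" on a NULL value. */
static void
store_row(const char *value)
{
    if (value == NULL)
        throw_error("could not store row");
}

/* Row callback: an error never leaves this function as a longjmp; it is
 * recorded in st and reported as a 0 return, for the caller to re-raise. */
static int
row_callback(CallbackState *st, const char *value)
{
    jmp_buf     handler;
    jmp_buf    *saved = current_handler;   /* not modified after setjmp */

    if (setjmp(handler) == 0)
    {
        current_handler = &handler;
        store_row(value);
        current_handler = saved;
        return 1;
    }
    /* "PG_CATCH": restore the outer handler and keep a copy of the error. */
    current_handler = saved;
    st->error_occurred = 1;
    memcpy(st->errmsg, pending_msg, sizeof st->errmsg);
    return 0;
}
```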

Re: Speed dblink using alternate libpq tuple storage

From
Marko Kreen
Date:
On Mon, Jan 30, 2012 at 06:06:57PM +0900, Kyotaro HORIGUCHI wrote:
> The performance gain is larger than expected. The measurement script
> now runs the query via dblink ten times to stabilize the measurement,
> so the figures are about ten times longer than the previous ones.
> 
>                        sec    % to Original
> Original             : 31.5     100.0%
> RowProcessor patch   : 31.3      99.4%
> dblink patch         : 24.6      78.1%
> 
> The RowProcessor patch alone causes no loss, or a very small gain, and
> the full patch gives us a 22% gain on the benchmark(*1).
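For reference, the percentages in the quoted table follow directly from the timings (lower time is better, so the 22% "gain" is the reduction in run time):

```latex
$$
\frac{31.3}{31.5} \approx 0.994 = 99.4\%, \qquad
\frac{24.6}{31.5} \approx 0.781 = 78.1\%, \qquad
100\% - 78.1\% = 21.9\% \approx 22\%
$$
```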

Excellent!

> - The callers of RowProcessor() no longer set null_field to
>   PGrowValue.value. Plus, the PGrowValue[] which RowProcessor()
>   receives has nfields + 1 elements, so that a rough size estimate
>   can be made by cols->value[nfields].value - cols->value[0].value -
>   something.  The something here is 4 * nfields for protocol 3 and
>   4 * (non-null fields) for protocol 2. I fear that this applies
>   only to textual transfer usage...

An exact estimate is not important here.  And the (nfields + 1)-element
array feels like a bit too much magic, considering that most users
probably do not need it.  Without it, the logic would be:
total = last.value - first.value + ((last.len > 0) ? last.len : 0)

which isn't too complex.  So I think we can remove it.
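The suggested formula can be written as a small function. This is a sketch under assumptions: RowValue is a hypothetical stand-in mirroring the patch's PGrowValue fields len and value (len < 0 for NULL), and all value pointers are assumed to point, in column order, into one contiguous protocol-3 DataRow buffer, with ->value valid even for NULL columns. Under those assumptions the span also counts the 4-byte length words of every column after the first, so it overestimates the payload by 4 * (nfields - 1), which is acceptable for a rough estimate.

```c
#include <stddef.h>

/* Hypothetical stand-in mirroring the patch's PGrowValue. */
typedef struct
{
    int         len;    /* < 0 means SQL NULL */
    const char *value;  /* points into the receive buffer */
} RowValue;

/* Rough byte size of one row: from the first value up to the end of the
 * last one, i.e. last.value - first.value + max(last.len, 0). */
static size_t
estimate_row_bytes(const RowValue *cols, int nfields)
{
    const RowValue *first;
    const RowValue *last;

    if (nfields <= 0)
        return 0;
    first = &cols[0];
    last = &cols[nfields - 1];
    return (size_t) (last->value - first->value)
        + (last->len > 0 ? (size_t) last->len : 0);
}
```

For a two-column row "42" and "abc", with each value preceded by its 4-byte length, the function returns 9: 5 payload bytes plus the one length word between the values.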


= Problems =

* Remove the dubious memcpy() in pqAddRow()

* I think the dynamic arrays in getAnotherTuple() are not portable enough;
  please allocate the array properly.  I guess in PQsetResultAttrs()?
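If the dynamic arrays in question are per-row arrays sized by nfields (an assumption; getAnotherTuple() from the patch is not shown here), one portable alternative is to allocate the array once, when the number of fields is known, and reuse it for every row. Variable-length arrays are a C99 feature that C89 compilers lack and C11 makes optional, and PostgreSQL code of this era was expected to build as C89. The sketch uses a hypothetical RowState with a stand-in RowValue type and plain realloc(); in libpq the allocation would more likely hang off the result or connection state, as suggested in the review.

```c
#include <stdlib.h>

/* Hypothetical stand-in mirroring the patch's PGrowValue. */
typedef struct
{
    int         len;
    const char *value;
} RowValue;

/* Hypothetical per-result state: one row array reused for every row. */
typedef struct
{
    RowValue   *rowbuf;
    int         rowbufsize;   /* number of allocated elements */
} RowState;

/* Ensure rowbuf holds at least nfields elements. Intended to be called once
 * the result's attributes are known, instead of declaring
 * "RowValue rowbuf[nfields]" in the per-row function. Returns 0 on success,
 * -1 on out of memory (rowbuf is then left unchanged). */
static int
ensure_row_buffer(RowState *st, int nfields)
{
    RowValue   *newbuf;

    if (nfields <= st->rowbufsize)
        return 0;
    newbuf = realloc(st->rowbuf, (size_t) nfields * sizeof(RowValue));
    if (newbuf == NULL)
        return -1;
    st->rowbuf = newbuf;
    st->rowbufsize = nfields;
    return 0;
}

/* Free the row array, e.g. when the result is cleared. */
static void
free_row_buffer(RowState *st)
{
    free(st->rowbuf);
    st->rowbuf = NULL;
    st->rowbufsize = 0;
}
```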
 


= Minor notes =

These can be argued either way; if you don't like some
suggestion, you can drop it.

* Move PQregisterRowProcessor() into fe-exec.c, then we can make pqAddRow static.

* Should PQclear() free the RowProcessor error message?  It seems it should
  never get out of getAnotherTuple(), but that's not certain.  Perhaps it
  would be clearer to free it here too.

* Remove this part of the comment in getAnotherTuple():

      * Buffer content may be shifted on reloading additional
      * data. So we must set all pointers on every scan.

  It's confusing why it needs to point that out, as nobody is expecting it.

* PGrowValue documentation should mention that the ->value pointer is always valid.

* dblink: Perhaps some of those malloc() calls could be replaced with palloc()
  or even StringInfo, which already does the realloc dance?  I'm not familiar
  with dblink and the various struct lifetimes there, so I don't know whether
  that actually makes sense or not.


It seems this patch will be ReadyForCommitter soon...

-- 
marko