Re: Make COPY format extendable: Extract COPY TO format implementations

From Sutou Kouhei
Subject Re: Make COPY format extendable: Extract COPY TO format implementations
Date
Msg-id 20240124.144936.67229716500876806.kou@clear-code.com
In reply to Re: Make COPY format extendable: Extract COPY TO format implementations  (Sutou Kouhei <kou@clear-code.com>)
Responses Re: Make COPY format extendable: Extract COPY TO format implementations  (Michael Paquier <michael@paquier.xyz>)
Re: Make COPY format extendable: Extract COPY TO format implementations  (Sutou Kouhei <kou@clear-code.com>)
List pgsql-hackers
Hi,

I've implemented the custom COPY format feature based on the
current design discussion. See the attached patches for
details.

I also implemented a PoC COPY format handler for Apache
Arrow with this implementation and it worked.
https://github.com/kou/pg-copy-arrow

The patches implement not only the custom COPY TO format
feature but also the custom COPY FROM format feature.

0001-0004 are for COPY TO and 0005-0008 are for COPY FROM.

For COPY TO:

0001: This adds CopyToRoutine and uses it for the
text/csv/binary formats. No implementation change. This just
moves code.

0002: This adds support for adding a custom COPY TO format by
"CREATE FUNCTION ${FORMAT_NAME}". This uses the same
approach provided by Sawada-san[1] but doesn't introduce a
wrapper CopyRoutine struct for Copy{To,From}Routine, because
I noticed that a wrapper CopyRoutine struct is unnecessary.
A COPY handler can just return CopyToRoutine or
CopyFromRoutine because both of them have a NodeTag. We can
distinguish the returned struct by its NodeTag.

[1] https://www.postgresql.org/message-id/CAD21AoCunywHird3GaPzWe6s9JG1wzxj3Cr6vGN36DDheGjOjA@mail.gmail.com

0003: This exports CopyToStateData. No implementation change
except CopyDest enum values. I changed the COPY_ prefix to
COPY_DEST_ to avoid name conflicts with CopySource enum
values. This just moves code.

0004: This adds CopyToState::opaque and exports
CopySendEndOfRow(). CopySendEndOfRow() is renamed to
CopyToStateFlush().

For COPY FROM:

0005: Same as 0001 but for COPY FROM. This adds
CopyFromRoutine and uses it for the text/csv/binary formats.
No implementation change. This just moves code.

0006: Same as 0002 but for COPY FROM. This adds support for
adding a custom COPY FROM format by "CREATE FUNCTION
${FORMAT_NAME}".

0007: Same as 0003 but for COPY FROM. This exports
CopyFromStateData. No implementation change except
CopySource enum values. I changed the COPY_ prefix to
COPY_SOURCE_ to align with the CopyDest changes in 0003.
This just moves code.

0008: Same as 0004 but for COPY FROM. This adds
CopyFromState::opaque and exports
CopyReadBinaryData(). CopyReadBinaryData() is renamed to
CopyFromStateRead().


Thanks,
-- 
kou

In <20240115.152702.2011620917962812379.kou@clear-code.com>
  "Re: Make COPY format extendable: Extract COPY TO format implementations" on Mon, 15 Jan 2024 15:27:02 +0900 (JST),
  Sutou Kouhei <kou@clear-code.com> wrote:

> Hi,
> 
> If there are no more comments for the current design, I'll
> start implementing this feature with the following
> approaches for "Discussing" items:
> 
>> 3.1 Should we use one function(internal) for COPY TO/FROM
>>     handlers or two function(internal)s (one is for COPY TO
>>     handler and another is for COPY FROM handler)?
>>     [4]
> 
> I'll choose "one function(internal) for COPY TO/FROM handlers".
> 
>> 3.4 Should we export Copy{To,From}State? Or should we just
>>     provide getters/setters to access Copy{To,From}State
>>     internals?
>>     [10]
> 
> I'll export Copy{To,From}State.
> 
> 
> Thanks,
> -- 
> kou
> 
> In <20240112.144615.157925223373344229.kou@clear-code.com>
>   "Re: Make COPY format extendable: Extract COPY TO format implementations" on Fri, 12 Jan 2024 14:46:15 +0900 (JST),
>   Sutou Kouhei <kou@clear-code.com> wrote:
> 
>> Hi,
>> 
>> Here is the current summary of this discussion to make
>> COPY format extendable. It's for reaching consensus and
>> starting to implement the feature. (I'll start implementing
>> the feature once we reach consensus.) If you have any
>> opinions, please share them.
>> 
>> Confirmed:
>> 
>> 1.1 Making COPY format extendable will not reduce performance.
>>     [1]
>> 
>> Decisions:
>> 
>> 2.1 Use separate handlers for COPY TO and COPY FROM because
>>     our COPY TO implementation (copyto.c) and COPY FROM
>>     implementation (copyfrom.c) are separate.
>>     [2]
>> 
>> 2.2 Don't use system catalog for COPY TO/FROM handlers. We can
>>     just use a function(internal) that returns a handler instead.
>>     [3]
>> 
>> 2.3 The implementation must include documentation.
>>     [5]
>> 
>> 2.4 The implementation must include tests.
>>     [6]
>> 
>> 2.5 The implementation should consist of small patches
>>     for ease of review.
>>     [6]
>> 
>> 2.7 Copy{To,From}State must have an opaque space for
>>     handlers.
>>     [8]
>> 
>> 2.8 Export CopySendData() and CopySendEndOfRow() for COPY TO
>>     handlers.
>>     [8]
>> 
>> 2.9 Make "format" in PqMsg_CopyOutResponse message
>>     extendable.
>>     [9]
>> 
>> 2.10 Make makeNode() call avoidable in function(internal)
>>      that returns COPY TO/FROM handler.
>>      [9]
>> 
>> 2.11 Custom COPY TO/FROM handlers must be able to parse
>>      their options.
>>      [11]
>> 
>> Discussing:
>> 
>> 3.1 Should we use one function(internal) for COPY TO/FROM
>>     handlers or two function(internal)s (one is for COPY TO
>>     handler and another is for COPY FROM handler)?
>>     [4]
>> 
>> 3.2 If we use separate function(internal)s for COPY TO/FROM
>>     handlers, we need to define a naming rule. For example,
>>     <method_name>_to(internal) for COPY TO handler and
>>     <method_name>_from(internal) for COPY FROM handler.
>>     [4]
>> 
>> 3.3 Should we use a prefix or suffix for function(internal)
>>     names to avoid name conflicts with other handlers such as
>>     tablesample handlers?
>>     [7]
>> 
>> 3.4 Should we export Copy{To,From}State? Or should we just
>>     provide getters/setters to access Copy{To,From}State
>>     internals?
>>     [10]
>> 
>> 
>> [1] https://www.postgresql.org/message-id/flat/20231204.153548.2126325458835528809.kou%40clear-code.com
>> [2] https://www.postgresql.org/message-id/flat/ZXEUIy6wl4jHy6Nm%40paquier.xyz
>> [3] https://www.postgresql.org/message-id/flat/CAD21AoAhcZkAp_WDJ4sSv_%2Bg2iCGjfyMFgeu7MxjnjX_FutZAg%40mail.gmail.com
>> [4] https://www.postgresql.org/message-id/flat/CAD21AoDkoGL6yJ_HjNOg9cU%3DaAdW8uQ3rSQOeRS0SX85LPPNwQ%40mail.gmail.com
>> [5] https://www.postgresql.org/message-id/flat/TY3PR01MB9889C9234CD220A3A7075F0DF589A%40TY3PR01MB9889.jpnprd01.prod.outlook.com
>> [6] https://www.postgresql.org/message-id/flat/ZXbiPNriHHyUrcTF%40paquier.xyz
>> [7] https://www.postgresql.org/message-id/flat/20231214.184414.2179134502876898942.kou%40clear-code.com
>> [8] https://www.postgresql.org/message-id/flat/20231221.183504.1240642084042888377.kou%40clear-code.com
>> [9] https://www.postgresql.org/message-id/flat/ZYTfqGppMc9e_w2k%40paquier.xyz
>> [10] https://www.postgresql.org/message-id/flat/CAD21AoD%3DUapH4Wh06G6H5XAzPJ0iJg9YcW8r7E2UEJkZ8QsosA%40mail.gmail.com
>> [11] https://www.postgresql.org/message-id/flat/20240110.152023.1920937326588672387.kou%40clear-code.com
>> 
>> 
>> Thanks,
>> -- 
>> kou
>> 
>> 
> 
>
From 3444b523aa356417f4cb3ec0c78894de65684889 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Mon, 4 Dec 2023 12:32:54 +0900
Subject: [PATCH v6 1/8] Extract COPY TO format implementations

This is a part of making COPY format extendable. See also these past
discussions:
* New Copy Formats - avro/orc/parquet:
  https://www.postgresql.org/message-id/flat/20180210151304.fonjztsynewldfba%40gmail.com
* Make COPY extendable in order to support Parquet and other formats:
  https://www.postgresql.org/message-id/flat/CAJ7c6TM6Bz1c3F04Cy6%2BSzuWfKmr0kU8c_3Stnvh_8BR0D6k8Q%40mail.gmail.com

This doesn't change the current behavior. This just introduces
CopyToRoutine, which just has function pointers for a format
implementation like TupleTableSlotOps, and uses it for the existing
"text", "csv" and "binary" format implementations.

Note that CopyToRoutine can't be used from extensions yet because
CopySend*() aren't exported yet. Extensions can't send formatted data
to a destination without CopySend*(). They will be exported by
subsequent patches.

Here is a benchmark result with/without this change because there was
a discussion that we should care about performance regression:

https://www.postgresql.org/message-id/3741749.1655952719%40sss.pgh.pa.us

> I think that step 1 ought to be to convert the existing formats into
> plug-ins, and demonstrate that there's no significant loss of
> performance.

You can see that there is no significant loss of performance:

Data: Random 32 bit integers:

    CREATE TABLE data (int32 integer);
    INSERT INTO data
      SELECT random() * 10000
        FROM generate_series(1, ${n_records});

The number of records: 100K, 1M and 10M

100K without this change:

    format,elapsed time (ms)
    text,11.002
    csv,11.696
    binary,11.352

100K with this change:

    format,elapsed time (ms)
    text,11.562
    csv,11.889
    binary,10.825

1M without this change:

    format,elapsed time (ms)
    text,108.359
    csv,114.233
    binary,111.251

1M with this change:

    format,elapsed time (ms)
    text,111.269
    csv,116.277
    binary,104.765

10M without this change:

    format,elapsed time (ms)
    text,1090.763
    csv,1136.103
    binary,1137.141

10M with this change:

    format,elapsed time (ms)
    text,1082.654
    csv,1196.991
    binary,1069.697
---
 contrib/file_fdw/file_fdw.c     |   2 +-
 src/backend/commands/copy.c     |  43 +++-
 src/backend/commands/copyfrom.c |   2 +-
 src/backend/commands/copyto.c   | 428 ++++++++++++++++++++------------
 src/include/commands/copy.h     |   7 +-
 src/include/commands/copyapi.h  |  59 +++++
 6 files changed, 376 insertions(+), 165 deletions(-)
 create mode 100644 src/include/commands/copyapi.h

diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index 249d82d3a0..9e4e819858 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -329,7 +329,7 @@ file_fdw_validator(PG_FUNCTION_ARGS)
     /*
      * Now apply the core COPY code's validation logic for more checks.
      */
-    ProcessCopyOptions(NULL, NULL, true, other_options);
+    ProcessCopyOptions(NULL, NULL, true, NULL, other_options);
 
     /*
      * Either filename or program option is required for file_fdw foreign
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index cc0786c6f4..5f3697a5f9 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -442,6 +442,9 @@ defGetCopyOnErrorChoice(DefElem *def, ParseState *pstate, bool is_from)
  * a list of options.  In that usage, 'opts_out' can be passed as NULL and
  * the collected data is just leaked until CurrentMemoryContext is reset.
  *
+ * 'cstate' is CopyToState* for !is_from, CopyFromState* for is_from. 'cstate'
+ * may be NULL. For example, file_fdw uses NULL.
+ *
  * Note that additional checking, such as whether column names listed in FORCE
  * QUOTE actually exist, has to be applied later.  This just checks for
  * self-consistency of the options list.
@@ -450,6 +453,7 @@ void
 ProcessCopyOptions(ParseState *pstate,
                    CopyFormatOptions *opts_out,
                    bool is_from,
+                   void *cstate,
                    List *options)
 {
     bool        format_specified = false;
@@ -464,7 +468,13 @@ ProcessCopyOptions(ParseState *pstate,
 
     opts_out->file_encoding = -1;
 
-    /* Extract options from the statement node tree */
+    /* Text is the default format. */
+    opts_out->to_routine = &CopyToRoutineText;
+
+    /*
+     * Extract only the "format" option to detect target routine as the first
+     * step
+     */
     foreach(option, options)
     {
         DefElem    *defel = lfirst_node(DefElem, option);
@@ -479,15 +489,29 @@ ProcessCopyOptions(ParseState *pstate,
             if (strcmp(fmt, "text") == 0)
                  /* default format */ ;
             else if (strcmp(fmt, "csv") == 0)
+            {
                 opts_out->csv_mode = true;
+                opts_out->to_routine = &CopyToRoutineCSV;
+            }
             else if (strcmp(fmt, "binary") == 0)
+            {
                 opts_out->binary = true;
+                opts_out->to_routine = &CopyToRoutineBinary;
+            }
             else
                 ereport(ERROR,
                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                          errmsg("COPY format \"%s\" not recognized", fmt),
                          parser_errposition(pstate, defel->location)));
         }
+    }
+    /* Extract options except "format" from the statement node tree */
+    foreach(option, options)
+    {
+        DefElem    *defel = lfirst_node(DefElem, option);
+
+        if (strcmp(defel->defname, "format") == 0)
+            continue;
         else if (strcmp(defel->defname, "freeze") == 0)
         {
             if (freeze_specified)
@@ -616,11 +640,18 @@ ProcessCopyOptions(ParseState *pstate,
             opts_out->on_error = defGetCopyOnErrorChoice(defel, pstate, is_from);
         }
         else
-            ereport(ERROR,
-                    (errcode(ERRCODE_SYNTAX_ERROR),
-                     errmsg("option \"%s\" not recognized",
-                            defel->defname),
-                     parser_errposition(pstate, defel->location)));
+        {
+            bool        processed = false;
+
+            if (!is_from)
+                processed = opts_out->to_routine->CopyToProcessOption(cstate, defel);
+            if (!processed)
+                ereport(ERROR,
+                        (errcode(ERRCODE_SYNTAX_ERROR),
+                         errmsg("option \"%s\" not recognized",
+                                defel->defname),
+                         parser_errposition(pstate, defel->location)));
+        }
     }
 
     /*
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 173a736ad5..05b3d13236 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1411,7 +1411,7 @@ BeginCopyFrom(ParseState *pstate,
     oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
     /* Extract options from the statement node tree */
-    ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
+    ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , cstate, options);
 
     /* Process the target relation */
     cstate->rel = rel;
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index d3dc3fc854..6547b7c654 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -131,6 +131,275 @@ static void CopySendEndOfRow(CopyToState cstate);
 static void CopySendInt32(CopyToState cstate, int32 val);
 static void CopySendInt16(CopyToState cstate, int16 val);
 
+/*
+ * CopyToRoutine implementations.
+ */
+
+/*
+ * CopyToRoutine implementation for "text" and "csv". CopyToText*()
+ * refer cstate->opts.csv_mode and change their behavior. We can split this
+ * implementation and stop referring cstate->opts.csv_mode later.
+ */
+
+/* All "text" and "csv" options are parsed in ProcessCopyOptions(). We may
+ * move the code to here later. */
+static bool
+CopyToTextProcessOption(CopyToState cstate, DefElem *defel)
+{
+    return false;
+}
+
+static int16
+CopyToTextGetFormat(CopyToState cstate)
+{
+    return 0;
+}
+
+static void
+CopyToTextSendEndOfRow(CopyToState cstate)
+{
+    switch (cstate->copy_dest)
+    {
+        case COPY_FILE:
+            /* Default line termination depends on platform */
+#ifndef WIN32
+            CopySendChar(cstate, '\n');
+#else
+            CopySendString(cstate, "\r\n");
+#endif
+            break;
+        case COPY_FRONTEND:
+            /* The FE/BE protocol uses \n as newline for all platforms */
+            CopySendChar(cstate, '\n');
+            break;
+        default:
+            break;
+    }
+    CopySendEndOfRow(cstate);
+}
+
+static void
+CopyToTextStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    int            num_phys_attrs;
+    ListCell   *cur;
+
+    num_phys_attrs = tupDesc->natts;
+    /* Get info about the columns we need to process. */
+    cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        Oid            out_func_oid;
+        bool        isvarlena;
+        Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+        getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+    }
+
+    /*
+     * For non-binary copy, we need to convert null_print to file encoding,
+     * because it will be sent directly with CopySendString.
+     */
+    if (cstate->need_transcoding)
+        cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+                                                          cstate->opts.null_print_len,
+                                                          cstate->file_encoding);
+
+    /* if a header has been requested send the line */
+    if (cstate->opts.header_line)
+    {
+        bool        hdr_delim = false;
+
+        foreach(cur, cstate->attnumlist)
+        {
+            int            attnum = lfirst_int(cur);
+            char       *colname;
+
+            if (hdr_delim)
+                CopySendChar(cstate, cstate->opts.delim[0]);
+            hdr_delim = true;
+
+            colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
+
+            if (cstate->opts.csv_mode)
+                CopyAttributeOutCSV(cstate, colname, false,
+                                    list_length(cstate->attnumlist) == 1);
+            else
+                CopyAttributeOutText(cstate, colname);
+        }
+
+        CopyToTextSendEndOfRow(cstate);
+    }
+}
+
+static void
+CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    bool        need_delim = false;
+    FmgrInfo   *out_functions = cstate->out_functions;
+    ListCell   *cur;
+
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        Datum        value = slot->tts_values[attnum - 1];
+        bool        isnull = slot->tts_isnull[attnum - 1];
+
+        if (need_delim)
+            CopySendChar(cstate, cstate->opts.delim[0]);
+        need_delim = true;
+
+        if (isnull)
+        {
+            CopySendString(cstate, cstate->opts.null_print_client);
+        }
+        else
+        {
+            char       *string;
+
+            string = OutputFunctionCall(&out_functions[attnum - 1], value);
+            if (cstate->opts.csv_mode)
+                CopyAttributeOutCSV(cstate, string,
+                                    cstate->opts.force_quote_flags[attnum - 1],
+                                    list_length(cstate->attnumlist) == 1);
+            else
+                CopyAttributeOutText(cstate, string);
+        }
+    }
+
+    CopyToTextSendEndOfRow(cstate);
+}
+
+static void
+CopyToTextEnd(CopyToState cstate)
+{
+}
+
+/*
+ * CopyToRoutine implementation for "binary".
+ */
+
+/* All "binary" options are parsed in ProcessCopyOptions(). We may move the
+ * code to here later. */
+static bool
+CopyToBinaryProcessOption(CopyToState cstate, DefElem *defel)
+{
+    return false;
+}
+
+static int16
+CopyToBinaryGetFormat(CopyToState cstate)
+{
+    return 1;
+}
+
+static void
+CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    int            num_phys_attrs;
+    ListCell   *cur;
+
+    num_phys_attrs = tupDesc->natts;
+    /* Get info about the columns we need to process. */
+    cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        Oid            out_func_oid;
+        bool        isvarlena;
+        Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+        getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+    }
+
+    {
+        /* Generate header for a binary copy */
+        int32        tmp;
+
+        /* Signature */
+        CopySendData(cstate, BinarySignature, 11);
+        /* Flags field */
+        tmp = 0;
+        CopySendInt32(cstate, tmp);
+        /* No header extension */
+        tmp = 0;
+        CopySendInt32(cstate, tmp);
+    }
+}
+
+static void
+CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    FmgrInfo   *out_functions = cstate->out_functions;
+    ListCell   *cur;
+
+    /* Binary per-tuple header */
+    CopySendInt16(cstate, list_length(cstate->attnumlist));
+
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        Datum        value = slot->tts_values[attnum - 1];
+        bool        isnull = slot->tts_isnull[attnum - 1];
+
+        if (isnull)
+        {
+            CopySendInt32(cstate, -1);
+        }
+        else
+        {
+            bytea       *outputbytes;
+
+            outputbytes = SendFunctionCall(&out_functions[attnum - 1], value);
+            CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+            CopySendData(cstate, VARDATA(outputbytes),
+                         VARSIZE(outputbytes) - VARHDRSZ);
+        }
+    }
+
+    CopySendEndOfRow(cstate);
+}
+
+static void
+CopyToBinaryEnd(CopyToState cstate)
+{
+    /* Generate trailer for a binary copy */
+    CopySendInt16(cstate, -1);
+    /* Need to flush out the trailer */
+    CopySendEndOfRow(cstate);
+}
+
+CopyToRoutine CopyToRoutineText = {
+    .CopyToProcessOption = CopyToTextProcessOption,
+    .CopyToGetFormat = CopyToTextGetFormat,
+    .CopyToStart = CopyToTextStart,
+    .CopyToOneRow = CopyToTextOneRow,
+    .CopyToEnd = CopyToTextEnd,
+};
+
+/*
+ * We can use the same CopyToRoutine for both of "text" and "csv" because
+ * CopyToText*() refer cstate->opts.csv_mode and change their behavior. We can
+ * split the implementations and stop referring cstate->opts.csv_mode later.
+ */
+CopyToRoutine CopyToRoutineCSV = {
+    .CopyToProcessOption = CopyToTextProcessOption,
+    .CopyToGetFormat = CopyToTextGetFormat,
+    .CopyToStart = CopyToTextStart,
+    .CopyToOneRow = CopyToTextOneRow,
+    .CopyToEnd = CopyToTextEnd,
+};
+
+CopyToRoutine CopyToRoutineBinary = {
+    .CopyToProcessOption = CopyToBinaryProcessOption,
+    .CopyToGetFormat = CopyToBinaryGetFormat,
+    .CopyToStart = CopyToBinaryStart,
+    .CopyToOneRow = CopyToBinaryOneRow,
+    .CopyToEnd = CopyToBinaryEnd,
+};
 
 /*
  * Send copy start/stop messages for frontend copies.  These have changed
@@ -141,7 +410,7 @@ SendCopyBegin(CopyToState cstate)
 {
     StringInfoData buf;
     int            natts = list_length(cstate->attnumlist);
-    int16        format = (cstate->opts.binary ? 1 : 0);
+    int16        format = cstate->opts.to_routine->CopyToGetFormat(cstate);
     int            i;
 
     pq_beginmessage(&buf, PqMsg_CopyOutResponse);
@@ -198,16 +467,6 @@ CopySendEndOfRow(CopyToState cstate)
     switch (cstate->copy_dest)
     {
         case COPY_FILE:
-            if (!cstate->opts.binary)
-            {
-                /* Default line termination depends on platform */
-#ifndef WIN32
-                CopySendChar(cstate, '\n');
-#else
-                CopySendString(cstate, "\r\n");
-#endif
-            }
-
             if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
                        cstate->copy_file) != 1 ||
                 ferror(cstate->copy_file))
@@ -242,10 +501,6 @@ CopySendEndOfRow(CopyToState cstate)
             }
             break;
         case COPY_FRONTEND:
-            /* The FE/BE protocol uses \n as newline for all platforms */
-            if (!cstate->opts.binary)
-                CopySendChar(cstate, '\n');
-
             /* Dump the accumulated row as one CopyData message */
             (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
             break;
@@ -431,7 +686,7 @@ BeginCopyTo(ParseState *pstate,
     oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
     /* Extract options from the statement node tree */
-    ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
+    ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , cstate, options);
 
     /* Process the source/target relation or query */
     if (rel)
@@ -748,8 +1003,6 @@ DoCopyTo(CopyToState cstate)
     bool        pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
     bool        fe_copy = (pipe && whereToSendOutput == DestRemote);
     TupleDesc    tupDesc;
-    int            num_phys_attrs;
-    ListCell   *cur;
     uint64        processed;
 
     if (fe_copy)
@@ -759,32 +1012,11 @@ DoCopyTo(CopyToState cstate)
         tupDesc = RelationGetDescr(cstate->rel);
     else
         tupDesc = cstate->queryDesc->tupDesc;
-    num_phys_attrs = tupDesc->natts;
     cstate->opts.null_print_client = cstate->opts.null_print;    /* default */
 
     /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
     cstate->fe_msgbuf = makeStringInfo();
 
-    /* Get info about the columns we need to process. */
-    cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
-    foreach(cur, cstate->attnumlist)
-    {
-        int            attnum = lfirst_int(cur);
-        Oid            out_func_oid;
-        bool        isvarlena;
-        Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
-
-        if (cstate->opts.binary)
-            getTypeBinaryOutputInfo(attr->atttypid,
-                                    &out_func_oid,
-                                    &isvarlena);
-        else
-            getTypeOutputInfo(attr->atttypid,
-                              &out_func_oid,
-                              &isvarlena);
-        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
-    }
-
     /*
      * Create a temporary memory context that we can reset once per row to
      * recover palloc'd memory.  This avoids any problems with leaks inside
@@ -795,57 +1027,7 @@ DoCopyTo(CopyToState cstate)
                                                "COPY TO",
                                                ALLOCSET_DEFAULT_SIZES);
 
-    if (cstate->opts.binary)
-    {
-        /* Generate header for a binary copy */
-        int32        tmp;
-
-        /* Signature */
-        CopySendData(cstate, BinarySignature, 11);
-        /* Flags field */
-        tmp = 0;
-        CopySendInt32(cstate, tmp);
-        /* No header extension */
-        tmp = 0;
-        CopySendInt32(cstate, tmp);
-    }
-    else
-    {
-        /*
-         * For non-binary copy, we need to convert null_print to file
-         * encoding, because it will be sent directly with CopySendString.
-         */
-        if (cstate->need_transcoding)
-            cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
-                                                              cstate->opts.null_print_len,
-                                                              cstate->file_encoding);
-
-        /* if a header has been requested send the line */
-        if (cstate->opts.header_line)
-        {
-            bool        hdr_delim = false;
-
-            foreach(cur, cstate->attnumlist)
-            {
-                int            attnum = lfirst_int(cur);
-                char       *colname;
-
-                if (hdr_delim)
-                    CopySendChar(cstate, cstate->opts.delim[0]);
-                hdr_delim = true;
-
-                colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
-
-                if (cstate->opts.csv_mode)
-                    CopyAttributeOutCSV(cstate, colname, false,
-                                        list_length(cstate->attnumlist) == 1);
-                else
-                    CopyAttributeOutText(cstate, colname);
-            }
-
-            CopySendEndOfRow(cstate);
-        }
-    }
+    cstate->opts.to_routine->CopyToStart(cstate, tupDesc);
 
     if (cstate->rel)
     {
@@ -884,13 +1066,7 @@ DoCopyTo(CopyToState cstate)
         processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
     }
 
-    if (cstate->opts.binary)
-    {
-        /* Generate trailer for a binary copy */
-        CopySendInt16(cstate, -1);
-        /* Need to flush out the trailer */
-        CopySendEndOfRow(cstate);
-    }
+    cstate->opts.to_routine->CopyToEnd(cstate);
 
     MemoryContextDelete(cstate->rowcontext);
 
@@ -906,71 +1082,15 @@ DoCopyTo(CopyToState cstate)
 static void
 CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
 {
-    bool        need_delim = false;
-    FmgrInfo   *out_functions = cstate->out_functions;
     MemoryContext oldcontext;
-    ListCell   *cur;
-    char       *string;
 
     MemoryContextReset(cstate->rowcontext);
     oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
 
-    if (cstate->opts.binary)
-    {
-        /* Binary per-tuple header */
-        CopySendInt16(cstate, list_length(cstate->attnumlist));
-    }
-
     /* Make sure the tuple is fully deconstructed */
     slot_getallattrs(slot);
 
-    foreach(cur, cstate->attnumlist)
-    {
-        int            attnum = lfirst_int(cur);
-        Datum        value = slot->tts_values[attnum - 1];
-        bool        isnull = slot->tts_isnull[attnum - 1];
-
-        if (!cstate->opts.binary)
-        {
-            if (need_delim)
-                CopySendChar(cstate, cstate->opts.delim[0]);
-            need_delim = true;
-        }
-
-        if (isnull)
-        {
-            if (!cstate->opts.binary)
-                CopySendString(cstate, cstate->opts.null_print_client);
-            else
-                CopySendInt32(cstate, -1);
-        }
-        else
-        {
-            if (!cstate->opts.binary)
-            {
-                string = OutputFunctionCall(&out_functions[attnum - 1],
-                                            value);
-                if (cstate->opts.csv_mode)
-                    CopyAttributeOutCSV(cstate, string,
-                                        cstate->opts.force_quote_flags[attnum - 1],
-                                        list_length(cstate->attnumlist) == 1);
-                else
-                    CopyAttributeOutText(cstate, string);
-            }
-            else
-            {
-                bytea       *outputbytes;
-
-                outputbytes = SendFunctionCall(&out_functions[attnum - 1],
-                                               value);
-                CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
-                CopySendData(cstate, VARDATA(outputbytes),
-                             VARSIZE(outputbytes) - VARHDRSZ);
-            }
-        }
-    }
-
-    CopySendEndOfRow(cstate);
+    cstate->opts.to_routine->CopyToOneRow(cstate, slot);
 
     MemoryContextSwitchTo(oldcontext);
 }
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index b3da3cb0be..34bea880ca 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -14,6 +14,7 @@
 #ifndef COPY_H
 #define COPY_H
 
+#include "commands/copyapi.h"
 #include "nodes/execnodes.h"
 #include "nodes/parsenodes.h"
 #include "parser/parse_node.h"
@@ -74,11 +75,11 @@ typedef struct CopyFormatOptions
     bool        convert_selectively;    /* do selective binary conversion? */
     CopyOnErrorChoice on_error; /* what to do when error happened */
     List       *convert_select; /* list of column names (can be NIL) */
+    CopyToRoutine *to_routine;    /* callback routines for COPY TO */
 } CopyFormatOptions;
 
-/* These are private in commands/copy[from|to].c */
+/* This is private in commands/copyfrom.c */
 typedef struct CopyFromStateData *CopyFromState;
-typedef struct CopyToStateData *CopyToState;
 
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
 typedef void (*copy_data_dest_cb) (void *data, int len);
@@ -87,7 +88,7 @@ extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
                    int stmt_location, int stmt_len,
                    uint64 *processed);
 
-extern void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, List *options);
+extern void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, void *cstate, List *options);
 extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause,
                                    const char *filename,
                                    bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options);
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
new file mode 100644
index 0000000000..eb68f2fb7b
--- /dev/null
+++ b/src/include/commands/copyapi.h
@@ -0,0 +1,59 @@
+/*-------------------------------------------------------------------------
+ *
+ * copyapi.h
+ *      API for COPY TO/FROM handlers
+ *
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/commands/copyapi.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef COPYAPI_H
+#define COPYAPI_H
+
+#include "executor/tuptable.h"
+#include "nodes/parsenodes.h"
+
+/* This is private in commands/copyto.c */
+typedef struct CopyToStateData *CopyToState;
+
+typedef bool (*CopyToProcessOption_function) (CopyToState cstate, DefElem *defel);
+typedef int16 (*CopyToGetFormat_function) (CopyToState cstate);
+typedef void (*CopyToStart_function) (CopyToState cstate, TupleDesc tupDesc);
+typedef void (*CopyToOneRow_function) (CopyToState cstate, TupleTableSlot *slot);
+typedef void (*CopyToEnd_function) (CopyToState cstate);
+
+/* Routines for a COPY TO format implementation. */
+typedef struct CopyToRoutine
+{
+    /*
+     * Called for processing one COPY TO option. This will return false when
+     * the given option is invalid.
+     */
+    CopyToProcessOption_function CopyToProcessOption;
+
+    /*
+     * Called when COPY TO is started. This will return a format as int16
+     * value. It's used for the CopyOutResponse message.
+     */
+    CopyToGetFormat_function CopyToGetFormat;
+
+    /* Called when COPY TO is started. This will send a header. */
+    CopyToStart_function CopyToStart;
+
+    /* Copy one row for COPY TO. */
+    CopyToOneRow_function CopyToOneRow;
+
+    /* Called when COPY TO is ended. This will send a trailer. */
+    CopyToEnd_function CopyToEnd;
+} CopyToRoutine;
+
+/* Built-in CopyToRoutine for "text", "csv" and "binary". */
+extern CopyToRoutine CopyToRoutineText;
+extern CopyToRoutine CopyToRoutineCSV;
+extern CopyToRoutine CopyToRoutineBinary;
+
+#endif                            /* COPYAPI_H */
-- 
2.43.0
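
The change in patch 1 replaces the per-row "if binary / else if csv" branching with a single call through cstate->opts.to_routine. A minimal standalone sketch of that dispatch pattern follows. It does not use PostgreSQL headers: DemoCopyState, DemoCopyToRoutine and the demo_* functions are hypothetical stand-ins, and the row is simplified to an array of ints.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

typedef struct DemoCopyState DemoCopyState;

/* Hypothetical, reduced counterpart of CopyToRoutine: one callback only. */
typedef struct DemoCopyToRoutine
{
    void        (*one_row) (DemoCopyState *state, const int *values, int nvalues);
} DemoCopyToRoutine;

/* Hypothetical counterpart of CopyToStateData. */
struct DemoCopyState
{
    const DemoCopyToRoutine *routine;   /* chosen once from the format name */
    char        out[128];               /* the formatted row */
};

/* Shared helper: write the values into state->out, separated by sep. */
static void
demo_join(DemoCopyState *state, const int *values, int nvalues, const char *sep)
{
    size_t      used = 0;

    state->out[0] = '\0';
    for (int i = 0; i < nvalues && used < sizeof(state->out); i++)
    {
        int         n = snprintf(state->out + used, sizeof(state->out) - used,
                                 "%s%d", i > 0 ? sep : "", values[i]);

        if (n < 0)
            break;
        used += (size_t) n;
    }
}

static void
demo_text_one_row(DemoCopyState *state, const int *values, int nvalues)
{
    demo_join(state, values, nvalues, "\t");
}

static void
demo_csv_one_row(DemoCopyState *state, const int *values, int nvalues)
{
    demo_join(state, values, nvalues, ",");
}

static const DemoCopyToRoutine DemoRoutineText = {.one_row = demo_text_one_row};
static const DemoCopyToRoutine DemoRoutineCSV = {.one_row = demo_csv_one_row};

/* Resolve the format name once; NULL means "not recognized". */
static const DemoCopyToRoutine *
demo_choose_routine(const char *format)
{
    if (strcmp(format, "text") == 0)
        return &DemoRoutineText;
    if (strcmp(format, "csv") == 0)
        return &DemoRoutineCSV;
    return NULL;
}

/* Per-row path: no format checks, only a call through the routine table. */
static void
demo_copy_one_row(DemoCopyState *state, const int *values, int nvalues)
{
    assert(state->routine != NULL);
    state->routine->one_row(state, values, nvalues);
}
```

A caller would set state.routine = demo_choose_routine("csv") once and then call demo_copy_one_row() for each row. The format decision moves out of the per-row path, which is the property that lets an extension plug in its own routine.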

From bd5848739465618fd31839b8ed34ad1cb95f6359 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Tue, 23 Jan 2024 13:58:38 +0900
Subject: [PATCH v6 2/8] Add support for adding custom COPY TO format

This uses the handler approach, like tablesample. The approach creates
an internal function that returns an internal struct. In this case,
a COPY TO handler returns a CopyToRoutine.

We will add support for custom COPY FROM formats later. We'll use the
same handler for COPY TO and COPY FROM. PostgreSQL calls a COPY
TO/FROM handler with an "is_from" argument. It's true for COPY FROM and
false for COPY TO:

    copy_handler(true) returns CopyFromRoutine (doesn't exist yet)
    copy_handler(false) returns CopyToRoutine

We discussed introducing a wrapper struct for it:

    typedef struct CopyRoutine
    {
        NodeTag type;
        /* either CopyToRoutine or CopyFromRoutine */
        Node *routine;
    } CopyRoutine;

    copy_handler(true) returns CopyRoutine with CopyFromRoutine
    copy_handler(false) returns CopyRoutine with CopyToRoutine

See also:
https://www.postgresql.org/message-id/flat/CAD21AoCunywHird3GaPzWe6s9JG1wzxj3Cr6vGN36DDheGjOjA%40mail.gmail.com

But I noticed that we don't need the wrapper struct. We can just
return CopyToRoutine or CopyFromRoutine, because we can distinguish the
returned struct by checking its NodeTag. So I don't use the wrapper
struct approach.
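
To illustrate why the tag alone is enough, here is a minimal standalone sketch. It assumes only that each routine struct starts with a tag member, as CopyToRoutine does with NodeTag in this patch. DemoNodeTag, DEMO_IS_A and the demo_* names are hypothetical stand-ins for NodeTag, IsA() and a real handler; they are not PostgreSQL APIs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-in for NodeTag (the real tag values are generated by PostgreSQL). */
typedef enum DemoNodeTag
{
    T_DemoInvalid = 0,
    T_DemoCopyToRoutine,
    T_DemoCopyFromRoutine
} DemoNodeTag;

typedef struct DemoCopyToRoutine
{
    DemoNodeTag type;           /* must be the first member */
    const char *name;
} DemoCopyToRoutine;

typedef struct DemoCopyFromRoutine
{
    DemoNodeTag type;           /* must be the first member */
    const char *name;
} DemoCopyFromRoutine;

/* Simplified IsA(): read the tag through the common first member. */
#define DEMO_IS_A(ptr, tag) \
    ((ptr) != NULL && *(const DemoNodeTag *) (ptr) == (tag))

static const DemoCopyToRoutine demo_to_routine = {
    .type = T_DemoCopyToRoutine,
    .name = "demo to",
};

static const DemoCopyFromRoutine demo_from_routine = {
    .type = T_DemoCopyFromRoutine,
    .name = "demo from",
};

/* Hypothetical handler: one entry point, the result depends on is_from. */
static const void *
demo_copy_handler(bool is_from)
{
    if (is_from)
        return &demo_from_routine;
    return &demo_to_routine;
}

/* Caller side for COPY TO: accept the result only if the tag matches. */
static const DemoCopyToRoutine *
demo_get_copy_to_routine(const void *result)
{
    if (!DEMO_IS_A(result, T_DemoCopyToRoutine))
        return NULL;
    return (const DemoCopyToRoutine *) result;
}
```

The caller never needs a wrapper to learn which struct it received: a wrong or missing result is rejected by the tag check, in the same way the patch reports an error when the handler does not return a CopyToRoutine.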
---
 src/backend/commands/copy.c                   | 84 ++++++++++++++-----
 src/backend/nodes/Makefile                    |  1 +
 src/backend/nodes/gen_node_support.pl         |  2 +
 src/backend/utils/adt/pseudotypes.c           |  1 +
 src/include/catalog/pg_proc.dat               |  6 ++
 src/include/catalog/pg_type.dat               |  6 ++
 src/include/commands/copyapi.h                |  2 +
 src/include/nodes/meson.build                 |  1 +
 src/test/modules/Makefile                     |  1 +
 src/test/modules/meson.build                  |  1 +
 src/test/modules/test_copy_format/.gitignore  |  4 +
 src/test/modules/test_copy_format/Makefile    | 23 +++++
 .../expected/test_copy_format.out             | 17 ++++
 src/test/modules/test_copy_format/meson.build | 33 ++++++++
 .../test_copy_format/sql/test_copy_format.sql |  8 ++
 .../test_copy_format--1.0.sql                 |  8 ++
 .../test_copy_format/test_copy_format.c       | 77 +++++++++++++++++
 .../test_copy_format/test_copy_format.control |  4 +
 18 files changed, 260 insertions(+), 19 deletions(-)
 mode change 100644 => 100755 src/backend/nodes/gen_node_support.pl
 create mode 100644 src/test/modules/test_copy_format/.gitignore
 create mode 100644 src/test/modules/test_copy_format/Makefile
 create mode 100644 src/test/modules/test_copy_format/expected/test_copy_format.out
 create mode 100644 src/test/modules/test_copy_format/meson.build
 create mode 100644 src/test/modules/test_copy_format/sql/test_copy_format.sql
 create mode 100644 src/test/modules/test_copy_format/test_copy_format--1.0.sql
 create mode 100644 src/test/modules/test_copy_format/test_copy_format.c
 create mode 100644 src/test/modules/test_copy_format/test_copy_format.control

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 5f3697a5f9..6f0db0ae7c 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -32,6 +32,7 @@
 #include "parser/parse_coerce.h"
 #include "parser/parse_collate.h"
 #include "parser/parse_expr.h"
+#include "parser/parse_func.h"
 #include "parser/parse_relation.h"
 #include "rewrite/rewriteHandler.h"
 #include "utils/acl.h"
@@ -430,6 +431,69 @@ defGetCopyOnErrorChoice(DefElem *def, ParseState *pstate, bool is_from)
     return COPY_ON_ERROR_STOP;    /* keep compiler quiet */
 }
 
+/*
+ * Process the "format" option.
+ *
+ * This function checks whether the option value is a built-in format such as
+ * "text" and "csv" or not. If the option value isn't a built-in format, this
+ * function finds a COPY format handler that returns a CopyToRoutine. If no
+ * COPY format handler is found, this function reports an error.
+ */
+static void
+ProcessCopyOptionCustomFormat(ParseState *pstate,
+                              CopyFormatOptions *opts_out,
+                              bool is_from,
+                              DefElem *defel)
+{
+    char       *format;
+    Oid            funcargtypes[1];
+    Oid            handlerOid = InvalidOid;
+    Datum        datum;
+    void       *routine;
+
+    format = defGetString(defel);
+
+    /* built-in formats */
+    if (strcmp(format, "text") == 0)
+         /* default format */ return;
+    else if (strcmp(format, "csv") == 0)
+    {
+        opts_out->csv_mode = true;
+        opts_out->to_routine = &CopyToRoutineCSV;
+        return;
+    }
+    else if (strcmp(format, "binary") == 0)
+    {
+        opts_out->binary = true;
+        opts_out->to_routine = &CopyToRoutineBinary;
+        return;
+    }
+
+    /* custom format */
+    if (!is_from)
+    {
+        funcargtypes[0] = INTERNALOID;
+        handlerOid = LookupFuncName(list_make1(makeString(format)), 1,
+                                    funcargtypes, true);
+    }
+    if (!OidIsValid(handlerOid))
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("COPY format \"%s\" not recognized", format),
+                 parser_errposition(pstate, defel->location)));
+
+    datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from));
+    routine = DatumGetPointer(datum);
+    if (routine == NULL || !IsA(routine, CopyToRoutine))
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("COPY handler function %s(%u) did not return a CopyToRoutine struct",
+                        format, handlerOid),
+                 parser_errposition(pstate, defel->location)));
+
+    opts_out->to_routine = routine;
+}
+
 /*
  * Process the statement option list for COPY.
  *
@@ -481,28 +545,10 @@ ProcessCopyOptions(ParseState *pstate,
 
         if (strcmp(defel->defname, "format") == 0)
         {
-            char       *fmt = defGetString(defel);
-
             if (format_specified)
                 errorConflictingDefElem(defel, pstate);
             format_specified = true;
-            if (strcmp(fmt, "text") == 0)
-                 /* default format */ ;
-            else if (strcmp(fmt, "csv") == 0)
-            {
-                opts_out->csv_mode = true;
-                opts_out->to_routine = &CopyToRoutineCSV;
-            }
-            else if (strcmp(fmt, "binary") == 0)
-            {
-                opts_out->binary = true;
-                opts_out->to_routine = &CopyToRoutineBinary;
-            }
-            else
-                ereport(ERROR,
-                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                         errmsg("COPY format \"%s\" not recognized", fmt),
-                         parser_errposition(pstate, defel->location)));
+            ProcessCopyOptionCustomFormat(pstate, opts_out, is_from, defel);
         }
     }
     /* Extract options except "format" from the statement node tree */
diff --git a/src/backend/nodes/Makefile b/src/backend/nodes/Makefile
index 66bbad8e6e..173ee11811 100644
--- a/src/backend/nodes/Makefile
+++ b/src/backend/nodes/Makefile
@@ -49,6 +49,7 @@ node_headers = \
     access/sdir.h \
     access/tableam.h \
     access/tsmapi.h \
+    commands/copyapi.h \
     commands/event_trigger.h \
     commands/trigger.h \
     executor/tuptable.h \
diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl
old mode 100644
new mode 100755
index 2f0a59bc87..bd397f45ac
--- a/src/backend/nodes/gen_node_support.pl
+++ b/src/backend/nodes/gen_node_support.pl
@@ -61,6 +61,7 @@ my @all_input_files = qw(
   access/sdir.h
   access/tableam.h
   access/tsmapi.h
+  commands/copyapi.h
   commands/event_trigger.h
   commands/trigger.h
   executor/tuptable.h
@@ -85,6 +86,7 @@ my @nodetag_only_files = qw(
   access/sdir.h
   access/tableam.h
   access/tsmapi.h
+  commands/copyapi.h
   commands/event_trigger.h
   commands/trigger.h
   executor/tuptable.h
diff --git a/src/backend/utils/adt/pseudotypes.c b/src/backend/utils/adt/pseudotypes.c
index a3a991f634..d308780c43 100644
--- a/src/backend/utils/adt/pseudotypes.c
+++ b/src/backend/utils/adt/pseudotypes.c
@@ -373,6 +373,7 @@ PSEUDOTYPE_DUMMY_IO_FUNCS(fdw_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(table_am_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(index_am_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(tsm_handler);
+PSEUDOTYPE_DUMMY_IO_FUNCS(copy_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(internal);
 PSEUDOTYPE_DUMMY_IO_FUNCS(anyelement);
 PSEUDOTYPE_DUMMY_IO_FUNCS(anynonarray);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index ad74e07dbb..4772bdc0e4 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -7617,6 +7617,12 @@
 { oid => '3312', descr => 'I/O',
   proname => 'tsm_handler_out', prorettype => 'cstring',
   proargtypes => 'tsm_handler', prosrc => 'tsm_handler_out' },
+{ oid => '8753', descr => 'I/O',
+  proname => 'copy_handler_in', proisstrict => 'f', prorettype => 'copy_handler',
+  proargtypes => 'cstring', prosrc => 'copy_handler_in' },
+{ oid => '8754', descr => 'I/O',
+  proname => 'copy_handler_out', prorettype => 'cstring',
+  proargtypes => 'copy_handler', prosrc => 'copy_handler_out' },
 { oid => '267', descr => 'I/O',
   proname => 'table_am_handler_in', proisstrict => 'f',
   prorettype => 'table_am_handler', proargtypes => 'cstring',
diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat
index d29194da31..2040d5da83 100644
--- a/src/include/catalog/pg_type.dat
+++ b/src/include/catalog/pg_type.dat
@@ -632,6 +632,12 @@
   typcategory => 'P', typinput => 'tsm_handler_in',
   typoutput => 'tsm_handler_out', typreceive => '-', typsend => '-',
   typalign => 'i' },
+{ oid => '8752',
+  descr => 'pseudo-type for the result of a copy to/from method function',
+  typname => 'copy_handler', typlen => '4', typbyval => 't', typtype => 'p',
+  typcategory => 'P', typinput => 'copy_handler_in',
+  typoutput => 'copy_handler_out', typreceive => '-', typsend => '-',
+  typalign => 'i' },
 { oid => '269',
   typname => 'table_am_handler',
   descr => 'pseudo-type for the result of a table AM handler function',
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index eb68f2fb7b..9c25e1c415 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -29,6 +29,8 @@ typedef void (*CopyToEnd_function) (CopyToState cstate);
 /* Routines for a COPY TO format implementation. */
 typedef struct CopyToRoutine
 {
+    NodeTag        type;
+
     /*
      * Called for processing one COPY TO option. This will return false when
      * the given option is invalid.
diff --git a/src/include/nodes/meson.build b/src/include/nodes/meson.build
index b665e55b65..103df1a787 100644
--- a/src/include/nodes/meson.build
+++ b/src/include/nodes/meson.build
@@ -11,6 +11,7 @@ node_support_input_i = [
   'access/sdir.h',
   'access/tableam.h',
   'access/tsmapi.h',
+  'commands/copyapi.h',
   'commands/event_trigger.h',
   'commands/trigger.h',
   'executor/tuptable.h',
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index e32c8925f6..9d57b868d5 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -15,6 +15,7 @@ SUBDIRS = \
           spgist_name_ops \
           test_bloomfilter \
           test_copy_callbacks \
+          test_copy_format \
           test_custom_rmgrs \
           test_ddl_deparse \
           test_dsa \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 397e0906e6..d76f2a6003 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -13,6 +13,7 @@ subdir('spgist_name_ops')
 subdir('ssl_passphrase_callback')
 subdir('test_bloomfilter')
 subdir('test_copy_callbacks')
+subdir('test_copy_format')
 subdir('test_custom_rmgrs')
 subdir('test_ddl_deparse')
 subdir('test_dsa')
diff --git a/src/test/modules/test_copy_format/.gitignore b/src/test/modules/test_copy_format/.gitignore
new file mode 100644
index 0000000000..5dcb3ff972
--- /dev/null
+++ b/src/test/modules/test_copy_format/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_copy_format/Makefile b/src/test/modules/test_copy_format/Makefile
new file mode 100644
index 0000000000..8497f91624
--- /dev/null
+++ b/src/test/modules/test_copy_format/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/test_copy_format/Makefile
+
+MODULE_big = test_copy_format
+OBJS = \
+    $(WIN32RES) \
+    test_copy_format.o
+PGFILEDESC = "test_copy_format - test custom COPY FORMAT"
+
+EXTENSION = test_copy_format
+DATA = test_copy_format--1.0.sql
+
+REGRESS = test_copy_format
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_copy_format
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out b/src/test/modules/test_copy_format/expected/test_copy_format.out
new file mode 100644
index 0000000000..3a24ae7b97
--- /dev/null
+++ b/src/test/modules/test_copy_format/expected/test_copy_format.out
@@ -0,0 +1,17 @@
+CREATE EXTENSION test_copy_format;
+CREATE TABLE public.test (a INT, b INT, c INT);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+COPY public.test TO stdout WITH (
+    option_before 'before',
+    format 'test_copy_format',
+    option_after 'after'
+);
+NOTICE:  test_copy_format: is_from=false
+NOTICE:  CopyToProcessOption: "option_before"="before"
+NOTICE:  CopyToProcessOption: "option_after"="after"
+NOTICE:  CopyToGetFormat
+NOTICE:  CopyToStart: natts=3
+NOTICE:  CopyToOneRow: tts_nvalid=3
+NOTICE:  CopyToOneRow: tts_nvalid=3
+NOTICE:  CopyToOneRow: tts_nvalid=3
+NOTICE:  CopyToEnd
diff --git a/src/test/modules/test_copy_format/meson.build b/src/test/modules/test_copy_format/meson.build
new file mode 100644
index 0000000000..4cefe7b709
--- /dev/null
+++ b/src/test/modules/test_copy_format/meson.build
@@ -0,0 +1,33 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+test_copy_format_sources = files(
+  'test_copy_format.c',
+)
+
+if host_system == 'windows'
+  test_copy_format_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'test_copy_format',
+    '--FILEDESC', 'test_copy_format - test custom COPY FORMAT',])
+endif
+
+test_copy_format = shared_module('test_copy_format',
+  test_copy_format_sources,
+  kwargs: pg_test_mod_args,
+)
+test_install_libs += test_copy_format
+
+test_install_data += files(
+  'test_copy_format.control',
+  'test_copy_format--1.0.sql',
+)
+
+tests += {
+  'name': 'test_copy_format',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'regress': {
+    'sql': [
+      'test_copy_format',
+    ],
+  },
+}
diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql b/src/test/modules/test_copy_format/sql/test_copy_format.sql
new file mode 100644
index 0000000000..0eb7ed2e11
--- /dev/null
+++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql
@@ -0,0 +1,8 @@
+CREATE EXTENSION test_copy_format;
+CREATE TABLE public.test (a INT, b INT, c INT);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+COPY public.test TO stdout WITH (
+    option_before 'before',
+    format 'test_copy_format',
+    option_after 'after'
+);
diff --git a/src/test/modules/test_copy_format/test_copy_format--1.0.sql b/src/test/modules/test_copy_format/test_copy_format--1.0.sql
new file mode 100644
index 0000000000..d24ea03ce9
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format--1.0.sql
@@ -0,0 +1,8 @@
+/* src/test/modules/test_copy_format/test_copy_format--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_copy_format" to load this file. \quit
+
+CREATE FUNCTION test_copy_format(internal)
+    RETURNS copy_handler
+    AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c
new file mode 100644
index 0000000000..a2219afcde
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format.c
@@ -0,0 +1,77 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_copy_format.c
+ *        Code for testing custom COPY format.
+ *
+ * Portions Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *        src/test/modules/test_copy_format/test_copy_format.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "commands/copy.h"
+#include "commands/defrem.h"
+
+PG_MODULE_MAGIC;
+
+static bool
+CopyToProcessOption(CopyToState cstate, DefElem *defel)
+{
+    ereport(NOTICE,
+            (errmsg("CopyToProcessOption: \"%s\"=\"%s\"",
+                    defel->defname, defGetString(defel))));
+    return true;
+}
+
+static int16
+CopyToGetFormat(CopyToState cstate)
+{
+    ereport(NOTICE, (errmsg("CopyToGetFormat")));
+    return 0;
+}
+
+static void
+CopyToStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    ereport(NOTICE, (errmsg("CopyToStart: natts=%d", tupDesc->natts)));
+}
+
+static void
+CopyToOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    ereport(NOTICE, (errmsg("CopyToOneRow: tts_nvalid=%u", slot->tts_nvalid)));
+}
+
+static void
+CopyToEnd(CopyToState cstate)
+{
+    ereport(NOTICE, (errmsg("CopyToEnd")));
+}
+
+static const CopyToRoutine CopyToRoutineTestCopyFormat = {
+    .type = T_CopyToRoutine,
+    .CopyToProcessOption = CopyToProcessOption,
+    .CopyToGetFormat = CopyToGetFormat,
+    .CopyToStart = CopyToStart,
+    .CopyToOneRow = CopyToOneRow,
+    .CopyToEnd = CopyToEnd,
+};
+
+PG_FUNCTION_INFO_V1(test_copy_format);
+Datum
+test_copy_format(PG_FUNCTION_ARGS)
+{
+    bool        is_from = PG_GETARG_BOOL(0);
+
+    ereport(NOTICE,
+            (errmsg("test_copy_format: is_from=%s", is_from ? "true" : "false")));
+
+    if (is_from)
+        elog(ERROR, "COPY FROM isn't supported yet");
+
+    PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat);
+}
diff --git a/src/test/modules/test_copy_format/test_copy_format.control b/src/test/modules/test_copy_format/test_copy_format.control
new file mode 100644
index 0000000000..f05a636235
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format.control
@@ -0,0 +1,4 @@
+comment = 'Test code for custom COPY format'
+default_version = '1.0'
+module_pathname = '$libdir/test_copy_format'
+relocatable = true
-- 
2.43.0
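
The NOTICE lines in the expected test output of patch 2 show the order in which the callbacks run: every non-format option first (including one written before "format"), then CopyToGetFormat, CopyToStart, one CopyToOneRow per row, and CopyToEnd. The following standalone sketch reproduces only that ordering. It is not the backend code: DemoCopyState, DemoCopyToRoutine and demo_copy_to() are hypothetical stand-ins with simplified signatures.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for CopyToState: it only records callback events. */
typedef struct DemoCopyState
{
    char        log[256];
} DemoCopyState;

/* Hypothetical, simplified counterpart of CopyToRoutine. */
typedef struct DemoCopyToRoutine
{
    int         (*process_option) (DemoCopyState *state, const char *name);
    int         (*get_format) (DemoCopyState *state);
    void        (*start) (DemoCopyState *state, int natts);
    void        (*one_row) (DemoCopyState *state, int rownum);
    void        (*end) (DemoCopyState *state);
} DemoCopyToRoutine;

/* Append one event name to the log, separated by a space. */
static void
demo_log(DemoCopyState *state, const char *event)
{
    size_t      used = strlen(state->log);

    assert(used < sizeof(state->log));
    snprintf(state->log + used, sizeof(state->log) - used,
             "%s%s", used > 0 ? " " : "", event);
}

static int
demo_process_option(DemoCopyState *state, const char *name)
{
    (void) name;
    demo_log(state, "option");
    return 1;                   /* nonzero: the option is accepted */
}

static int
demo_get_format(DemoCopyState *state)
{
    demo_log(state, "format");
    return 0;
}

static void
demo_start(DemoCopyState *state, int natts)
{
    (void) natts;
    demo_log(state, "start");
}

static void
demo_one_row(DemoCopyState *state, int rownum)
{
    (void) rownum;
    demo_log(state, "row");
}

static void
demo_end(DemoCopyState *state)
{
    demo_log(state, "end");
}

static const DemoCopyToRoutine DemoRoutineLogging = {
    .process_option = demo_process_option,
    .get_format = demo_get_format,
    .start = demo_start,
    .one_row = demo_one_row,
    .end = demo_end,
};

/* Driver: options, then format, start, each row, end. Returns -1 on a bad option. */
static int
demo_copy_to(const DemoCopyToRoutine *routine, DemoCopyState *state,
             const char *const *options, int noptions, int natts, int nrows)
{
    state->log[0] = '\0';
    for (int i = 0; i < noptions; i++)
        if (!routine->process_option(state, options[i]))
            return -1;
    (void) routine->get_format(state);
    routine->start(state, natts);
    for (int row = 0; row < nrows; row++)
        routine->one_row(state, row);
    routine->end(state);
    return 0;
}
```

With two options and three rows, as in the regression test, the recorded sequence is "option option format start row row row end". The options array is assumed to exclude "format" itself, since the routine must already be known before any option can be passed to it.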

From 6480ebddfabed628c79a7c25f42f87b44d76f74f Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Tue, 23 Jan 2024 14:54:10 +0900
Subject: [PATCH v6 3/8] Export CopyToStateData

It's for custom COPY TO format handlers implemented as extensions.

This just moves code. It doesn't change code except for the CopyDest
enum values. CopyDest enum values such as COPY_FILE conflict with the
CopySource enum values defined in copyfrom_internal.h, so a COPY_DEST_
prefix is used instead of the COPY_ prefix. For example, COPY_FILE is
renamed to COPY_DEST_FILE.

Note that this change isn't enough to implement a custom COPY TO
format handler as an extension. We'll do the following in a subsequent
commit:

1. Add an opaque space for custom COPY TO format handler
2. Export CopySendEndOfRow() to flush buffer
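
The enum rename described above follows from C's rules: enumerators share one namespace per scope, so two enums visible in the same translation unit cannot both define COPY_FILE. A minimal standalone sketch follows, with hypothetical DemoCopySource and DemoCopyDest enums. The assumption that CopySource uses the same three COPY_ names is mine (the message only names COPY_FILE), and the empty terminator for the callback case is also an assumption.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for CopySource; the enumerator names are assumed. */
typedef enum DemoCopySource
{
    COPY_FILE,
    COPY_FRONTEND,
    COPY_CALLBACK
} DemoCopySource;

/*
 * Stand-in for CopyDest after the rename.  Reusing COPY_FILE here would be
 * rejected by the compiler as a redeclaration of an enumerator, because
 * both enums are visible in the same scope.
 */
typedef enum DemoCopyDest
{
    COPY_DEST_FILE,
    COPY_DEST_FRONTEND,
    COPY_DEST_CALLBACK
} DemoCopyDest;

/* Row terminator per destination, modeled on CopyToTextSendEndOfRow(). */
static const char *
demo_dest_row_terminator(DemoCopyDest dest)
{
    switch (dest)
    {
        case COPY_DEST_FILE:
            /* Default line termination depends on platform */
#ifndef WIN32
            return "\n";
#else
            return "\r\n";
#endif
        case COPY_DEST_FRONTEND:
            /* The FE/BE protocol uses \n on all platforms */
            return "\n";
        case COPY_DEST_CALLBACK:
            return "";          /* assumption: the callback adds nothing */
    }
    assert(!"unknown DemoCopyDest value");
    return NULL;
}
```

The two enums still share the same underlying values (both start at 0). Only the names need to differ once both definitions are reachable from a single header.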
---
 src/backend/commands/copyto.c  |  74 +++-----------------
 src/include/commands/copy.h    |  59 ----------------
 src/include/commands/copyapi.h | 120 ++++++++++++++++++++++++++++++++-
 3 files changed, 127 insertions(+), 126 deletions(-)

diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index 6547b7c654..cfc74ee7b1 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -43,64 +43,6 @@
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 
-/*
- * Represents the different dest cases we need to worry about at
- * the bottom level
- */
-typedef enum CopyDest
-{
-    COPY_FILE,                    /* to file (or a piped program) */
-    COPY_FRONTEND,                /* to frontend */
-    COPY_CALLBACK,                /* to callback function */
-} CopyDest;
-
-/*
- * This struct contains all the state variables used throughout a COPY TO
- * operation.
- *
- * Multi-byte encodings: all supported client-side encodings encode multi-byte
- * characters by having the first byte's high bit set. Subsequent bytes of the
- * character can have the high bit not set. When scanning data in such an
- * encoding to look for a match to a single-byte (ie ASCII) character, we must
- * use the full pg_encoding_mblen() machinery to skip over multibyte
- * characters, else we might find a false match to a trailing byte. In
- * supported server encodings, there is no possibility of a false match, and
- * it's faster to make useless comparisons to trailing bytes than it is to
- * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
- * when we have to do it the hard way.
- */
-typedef struct CopyToStateData
-{
-    /* low-level state data */
-    CopyDest    copy_dest;        /* type of copy source/destination */
-    FILE       *copy_file;        /* used if copy_dest == COPY_FILE */
-    StringInfo    fe_msgbuf;        /* used for all dests during COPY TO */
-
-    int            file_encoding;    /* file or remote side's character encoding */
-    bool        need_transcoding;    /* file encoding diff from server? */
-    bool        encoding_embeds_ascii;    /* ASCII can be non-first byte? */
-
-    /* parameters from the COPY command */
-    Relation    rel;            /* relation to copy to */
-    QueryDesc  *queryDesc;        /* executable query to copy from */
-    List       *attnumlist;        /* integer list of attnums to copy */
-    char       *filename;        /* filename, or NULL for STDOUT */
-    bool        is_program;        /* is 'filename' a program to popen? */
-    copy_data_dest_cb data_dest_cb; /* function for writing data */
-
-    CopyFormatOptions opts;
-    Node       *whereClause;    /* WHERE condition (or NULL) */
-
-    /*
-     * Working state
-     */
-    MemoryContext copycontext;    /* per-copy execution context */
-
-    FmgrInfo   *out_functions;    /* lookup info for output functions */
-    MemoryContext rowcontext;    /* per-row evaluation context */
-    uint64        bytes_processed;    /* number of bytes processed so far */
-} CopyToStateData;
-
 /* DestReceiver for COPY (query) TO */
 typedef struct
 {
@@ -160,7 +102,7 @@ CopyToTextSendEndOfRow(CopyToState cstate)
 {
     switch (cstate->copy_dest)
     {
-        case COPY_FILE:
+        case COPY_DEST_FILE:
             /* Default line termination depends on platform */
 #ifndef WIN32
             CopySendChar(cstate, '\n');
@@ -168,7 +110,7 @@ CopyToTextSendEndOfRow(CopyToState cstate)
             CopySendString(cstate, "\r\n");
 #endif
             break;
-        case COPY_FRONTEND:
+        case COPY_DEST_FRONTEND:
             /* The FE/BE protocol uses \n as newline for all platforms */
             CopySendChar(cstate, '\n');
             break;
@@ -419,7 +361,7 @@ SendCopyBegin(CopyToState cstate)
     for (i = 0; i < natts; i++)
         pq_sendint16(&buf, format); /* per-column formats */
     pq_endmessage(&buf);
-    cstate->copy_dest = COPY_FRONTEND;
+    cstate->copy_dest = COPY_DEST_FRONTEND;
 }
 
 static void
@@ -466,7 +408,7 @@ CopySendEndOfRow(CopyToState cstate)
 
     switch (cstate->copy_dest)
     {
-        case COPY_FILE:
+        case COPY_DEST_FILE:
             if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
                        cstate->copy_file) != 1 ||
                 ferror(cstate->copy_file))
@@ -500,11 +442,11 @@ CopySendEndOfRow(CopyToState cstate)
                              errmsg("could not write to COPY file: %m")));
             }
             break;
-        case COPY_FRONTEND:
+        case COPY_DEST_FRONTEND:
             /* Dump the accumulated row as one CopyData message */
             (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
             break;
-        case COPY_CALLBACK:
+        case COPY_DEST_CALLBACK:
             cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
             break;
     }
@@ -877,12 +819,12 @@ BeginCopyTo(ParseState *pstate,
     /* See Multibyte encoding comment above */
     cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
 
-    cstate->copy_dest = COPY_FILE;    /* default */
+    cstate->copy_dest = COPY_DEST_FILE; /* default */
 
     if (data_dest_cb)
     {
         progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
-        cstate->copy_dest = COPY_CALLBACK;
+        cstate->copy_dest = COPY_DEST_CALLBACK;
         cstate->data_dest_cb = data_dest_cb;
     }
     else if (pipe)
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 34bea880ca..b3f4682f95 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -20,69 +20,10 @@
 #include "parser/parse_node.h"
 #include "tcop/dest.h"
 
-/*
- * Represents whether a header line should be present, and whether it must
- * match the actual names (which implies "true").
- */
-typedef enum CopyHeaderChoice
-{
-    COPY_HEADER_FALSE = 0,
-    COPY_HEADER_TRUE,
-    COPY_HEADER_MATCH,
-} CopyHeaderChoice;
-
-/*
- * Represents where to save input processing errors.  More values to be added
- * in the future.
- */
-typedef enum CopyOnErrorChoice
-{
-    COPY_ON_ERROR_STOP = 0,        /* immediately throw errors, default */
-    COPY_ON_ERROR_IGNORE,        /* ignore errors */
-} CopyOnErrorChoice;
-
-/*
- * A struct to hold COPY options, in a parsed form. All of these are related
- * to formatting, except for 'freeze', which doesn't really belong here, but
- * it's expedient to parse it along with all the other options.
- */
-typedef struct CopyFormatOptions
-{
-    /* parameters from the COPY command */
-    int            file_encoding;    /* file or remote side's character encoding,
-                                 * -1 if not specified */
-    bool        binary;            /* binary format? */
-    bool        freeze;            /* freeze rows on loading? */
-    bool        csv_mode;        /* Comma Separated Value format? */
-    CopyHeaderChoice header_line;    /* header line? */
-    char       *null_print;        /* NULL marker string (server encoding!) */
-    int            null_print_len; /* length of same */
-    char       *null_print_client;    /* same converted to file encoding */
-    char       *default_print;    /* DEFAULT marker string */
-    int            default_print_len;    /* length of same */
-    char       *delim;            /* column delimiter (must be 1 byte) */
-    char       *quote;            /* CSV quote char (must be 1 byte) */
-    char       *escape;            /* CSV escape char (must be 1 byte) */
-    List       *force_quote;    /* list of column names */
-    bool        force_quote_all;    /* FORCE_QUOTE *? */
-    bool       *force_quote_flags;    /* per-column CSV FQ flags */
-    List       *force_notnull;    /* list of column names */
-    bool        force_notnull_all;    /* FORCE_NOT_NULL *? */
-    bool       *force_notnull_flags;    /* per-column CSV FNN flags */
-    List       *force_null;        /* list of column names */
-    bool        force_null_all; /* FORCE_NULL *? */
-    bool       *force_null_flags;    /* per-column CSV FN flags */
-    bool        convert_selectively;    /* do selective binary conversion? */
-    CopyOnErrorChoice on_error; /* what to do when error happened */
-    List       *convert_select; /* list of column names (can be NIL) */
-    CopyToRoutine *to_routine;    /* callback routines for COPY TO */
-} CopyFormatOptions;
-
 /* This is private in commands/copyfrom.c */
 typedef struct CopyFromStateData *CopyFromState;
 
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
-typedef void (*copy_data_dest_cb) (void *data, int len);
 
 extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
                    int stmt_location, int stmt_len,
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index 9c25e1c415..a869d78d72 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -14,10 +14,10 @@
 #ifndef COPYAPI_H
 #define COPYAPI_H
 
+#include "executor/execdesc.h"
 #include "executor/tuptable.h"
 #include "nodes/parsenodes.h"
 
-/* This is private in commands/copyto.c */
 typedef struct CopyToStateData *CopyToState;
 
 typedef bool (*CopyToProcessOption_function) (CopyToState cstate, DefElem *defel);
@@ -58,4 +58,122 @@ extern CopyToRoutine CopyToRoutineText;
 extern CopyToRoutine CopyToRoutineCSV;
 extern CopyToRoutine CopyToRoutineBinary;
 
+/*
+ * Represents whether a header line should be present, and whether it must
+ * match the actual names (which implies "true").
+ */
+typedef enum CopyHeaderChoice
+{
+    COPY_HEADER_FALSE = 0,
+    COPY_HEADER_TRUE,
+    COPY_HEADER_MATCH,
+} CopyHeaderChoice;
+
+/*
+ * Represents where to save input processing errors.  More values to be added
+ * in the future.
+ */
+typedef enum CopyOnErrorChoice
+{
+    COPY_ON_ERROR_STOP = 0,        /* immediately throw errors, default */
+    COPY_ON_ERROR_IGNORE,        /* ignore errors */
+} CopyOnErrorChoice;
+
+/*
+ * A struct to hold COPY options, in a parsed form. All of these are related
+ * to formatting, except for 'freeze', which doesn't really belong here, but
+ * it's expedient to parse it along with all the other options.
+ */
+typedef struct CopyFormatOptions
+{
+    /* parameters from the COPY command */
+    int            file_encoding;    /* file or remote side's character encoding,
+                                 * -1 if not specified */
+    bool        binary;            /* binary format? */
+    bool        freeze;            /* freeze rows on loading? */
+    bool        csv_mode;        /* Comma Separated Value format? */
+    CopyHeaderChoice header_line;    /* header line? */
+    char       *null_print;        /* NULL marker string (server encoding!) */
+    int            null_print_len; /* length of same */
+    char       *null_print_client;    /* same converted to file encoding */
+    char       *default_print;    /* DEFAULT marker string */
+    int            default_print_len;    /* length of same */
+    char       *delim;            /* column delimiter (must be 1 byte) */
+    char       *quote;            /* CSV quote char (must be 1 byte) */
+    char       *escape;            /* CSV escape char (must be 1 byte) */
+    List       *force_quote;    /* list of column names */
+    bool        force_quote_all;    /* FORCE_QUOTE *? */
+    bool       *force_quote_flags;    /* per-column CSV FQ flags */
+    List       *force_notnull;    /* list of column names */
+    bool        force_notnull_all;    /* FORCE_NOT_NULL *? */
+    bool       *force_notnull_flags;    /* per-column CSV FNN flags */
+    List       *force_null;        /* list of column names */
+    bool        force_null_all; /* FORCE_NULL *? */
+    bool       *force_null_flags;    /* per-column CSV FN flags */
+    bool        convert_selectively;    /* do selective binary conversion? */
+    CopyOnErrorChoice on_error; /* what to do when error happened */
+    List       *convert_select; /* list of column names (can be NIL) */
+    CopyToRoutine *to_routine;    /* callback routines for COPY TO */
+} CopyFormatOptions;
+
+/*
+ * Represents the different dest cases we need to worry about at
+ * the bottom level
+ */
+typedef enum CopyDest
+{
+    COPY_DEST_FILE,                /* to file (or a piped program) */
+    COPY_DEST_FRONTEND,            /* to frontend */
+    COPY_DEST_CALLBACK,            /* to callback function */
+} CopyDest;
+
+typedef void (*copy_data_dest_cb) (void *data, int len);
+
+/*
+ * This struct contains all the state variables used throughout a COPY TO
+ * operation.
+ *
+ * Multi-byte encodings: all supported client-side encodings encode multi-byte
+ * characters by having the first byte's high bit set. Subsequent bytes of the
+ * character can have the high bit not set. When scanning data in such an
+ * encoding to look for a match to a single-byte (ie ASCII) character, we must
+ * use the full pg_encoding_mblen() machinery to skip over multibyte
+ * characters, else we might find a false match to a trailing byte. In
+ * supported server encodings, there is no possibility of a false match, and
+ * it's faster to make useless comparisons to trailing bytes than it is to
+ * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
+ * when we have to do it the hard way.
+ */
+typedef struct CopyToStateData
+{
+    /* low-level state data */
+    CopyDest    copy_dest;        /* type of copy source/destination */
+    FILE       *copy_file;        /* used if copy_dest == COPY_DEST_FILE */
+    StringInfo    fe_msgbuf;        /* used for all dests during COPY TO */
+
+    int            file_encoding;    /* file or remote side's character encoding */
+    bool        need_transcoding;    /* file encoding diff from server? */
+    bool        encoding_embeds_ascii;    /* ASCII can be non-first byte? */
+
+    /* parameters from the COPY command */
+    Relation    rel;            /* relation to copy to */
+    QueryDesc  *queryDesc;        /* executable query to copy from */
+    List       *attnumlist;        /* integer list of attnums to copy */
+    char       *filename;        /* filename, or NULL for STDOUT */
+    bool        is_program;        /* is 'filename' a program to popen? */
+    copy_data_dest_cb data_dest_cb; /* function for writing data */
+
+    CopyFormatOptions opts;
+    Node       *whereClause;    /* WHERE condition (or NULL) */
+
+    /*
+     * Working state
+     */
+    MemoryContext copycontext;    /* per-copy execution context */
+
+    FmgrInfo   *out_functions;    /* lookup info for output functions */
+    MemoryContext rowcontext;    /* per-row evaluation context */
+    uint64        bytes_processed;    /* number of bytes processed so far */
+} CopyToStateData;
+
 #endif                            /* COPYAPI_H */
-- 
2.43.0

From 636e28b6478a8295469f832fb816a835e9cf24f6 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Tue, 23 Jan 2024 15:12:43 +0900
Subject: [PATCH v6 4/8] Add support for implementing custom COPY TO format as
 extension

* Add CopyToStateData::opaque, which a custom COPY TO format
  implementation can use to keep its private data
* Export CopySendEndOfRow() so that extensions can flush the data in
  CopyToStateData::fe_msgbuf
* Rename CopySendEndOfRow() to CopyToStateFlush() because it is a
  method of CopyToState and is used for flushing. The end-of-row
  related code was moved to CopyToTextSendEndOfRow().
---
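Illustrative sketch (not part of this patch): the two additions above, a
private `opaque` pointer and an exported flush function, can be pictured
with a small standalone C mock. Everything named `Mock*`, `mock_*`,
`RowCounter` and `counter_*` is hypothetical and only stands in for
CopyToStateData, CopyToStateFlush() and a custom format's callbacks; it
does not use the real PostgreSQL headers.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Mock stand-in for CopyToStateData; NOT the real PostgreSQL struct. */
typedef struct MockCopyToState
{
    char        buf[256];   /* plays the role of fe_msgbuf */
    size_t      buf_len;
    char        out[1024];  /* plays the role of the COPY destination */
    size_t      out_len;
    void       *opaque;     /* private space for the format implementation */
} MockCopyToState;

/* Mock of CopyToStateFlush(): move the buffered bytes to the destination. */
static void
mock_flush(MockCopyToState *cstate)
{
    assert(cstate->out_len + cstate->buf_len <= sizeof(cstate->out));
    memcpy(cstate->out + cstate->out_len, cstate->buf, cstate->buf_len);
    cstate->out_len += cstate->buf_len;
    cstate->buf_len = 0;
}

/* Hypothetical per-format private state kept behind "opaque". */
typedef struct RowCounter
{
    int         nrows;
} RowCounter;

/* Start callback: allocate the private state and hang it on opaque. */
static void
counter_start(MockCopyToState *cstate)
{
    RowCounter *rc = calloc(1, sizeof(RowCounter));

    if (rc == NULL)
        abort();
    cstate->opaque = rc;
}

/* One-row callback: buffer the row, update private state, then flush. */
static void
counter_one_row(MockCopyToState *cstate, const char *row)
{
    RowCounter *rc = cstate->opaque;
    size_t      len = strlen(row);

    assert(cstate->buf_len + len + 1 <= sizeof(cstate->buf));
    memcpy(cstate->buf + cstate->buf_len, row, len);
    cstate->buf_len += len;
    cstate->buf[cstate->buf_len++] = '\n';
    rc->nrows++;
    mock_flush(cstate);
}

/* End callback: release the private state and report the row count. */
static int
counter_end(MockCopyToState *cstate)
{
    RowCounter *rc = cstate->opaque;
    int         n = rc->nrows;

    free(rc);
    cstate->opaque = NULL;
    return n;
}
```

The point of the design is that the core code never looks inside
`opaque`; only the format implementation that set it casts it back.
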
 src/backend/commands/copyto.c  | 15 +++++++--------
 src/include/commands/copyapi.h |  5 +++++
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index cfc74ee7b1..b5d8678394 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -69,7 +69,6 @@ static void SendCopyEnd(CopyToState cstate);
 static void CopySendData(CopyToState cstate, const void *databuf, int datasize);
 static void CopySendString(CopyToState cstate, const char *str);
 static void CopySendChar(CopyToState cstate, char c);
-static void CopySendEndOfRow(CopyToState cstate);
 static void CopySendInt32(CopyToState cstate, int32 val);
 static void CopySendInt16(CopyToState cstate, int16 val);
 
@@ -117,7 +116,7 @@ CopyToTextSendEndOfRow(CopyToState cstate)
         default:
             break;
     }
-    CopySendEndOfRow(cstate);
+    CopyToStateFlush(cstate);
 }
 
 static void
@@ -302,7 +301,7 @@ CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
         }
     }
 
-    CopySendEndOfRow(cstate);
+    CopyToStateFlush(cstate);
 }
 
 static void
@@ -311,7 +310,7 @@ CopyToBinaryEnd(CopyToState cstate)
     /* Generate trailer for a binary copy */
     CopySendInt16(cstate, -1);
     /* Need to flush out the trailer */
-    CopySendEndOfRow(cstate);
+    CopyToStateFlush(cstate);
 }
 
 CopyToRoutine CopyToRoutineText = {
@@ -377,8 +376,8 @@ SendCopyEnd(CopyToState cstate)
  * CopySendData sends output data to the destination (file or frontend)
  * CopySendString does the same for null-terminated strings
  * CopySendChar does the same for single characters
- * CopySendEndOfRow does the appropriate thing at end of each data row
- *    (data is not actually flushed except by CopySendEndOfRow)
+ * CopyToStateFlush flushes the buffered data
+ *    (data is not actually flushed except by CopyToStateFlush)
  *
  * NB: no data conversion is applied by these functions
  *----------
@@ -401,8 +400,8 @@ CopySendChar(CopyToState cstate, char c)
     appendStringInfoCharMacro(cstate->fe_msgbuf, c);
 }
 
-static void
-CopySendEndOfRow(CopyToState cstate)
+void
+CopyToStateFlush(CopyToState cstate)
 {
     StringInfo    fe_msgbuf = cstate->fe_msgbuf;
 
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index a869d78d72..ffad433a21 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -174,6 +174,11 @@ typedef struct CopyToStateData
     FmgrInfo   *out_functions;    /* lookup info for output functions */
     MemoryContext rowcontext;    /* per-row evaluation context */
     uint64        bytes_processed;    /* number of bytes processed so far */
+
+    /* For custom format implementation */
+    void       *opaque;            /* private space */
 } CopyToStateData;
 
+extern void CopyToStateFlush(CopyToState cstate);
+
 #endif                            /* COPYAPI_H */
-- 
2.43.0

From 53b120ef11a10563fb9f12ad40042adf039bd18c Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Tue, 23 Jan 2024 17:21:23 +0900
Subject: [PATCH v6 5/8] Extract COPY FROM format implementations

This doesn't change the current behavior. It just introduces
CopyFromRoutine, which only holds the function pointers of a format
implementation (like TupleTableSlotOps), and uses it for the existing
"text", "csv" and "binary" format implementations.

Note that extensions can't use CopyFromRoutine yet because
CopyRead*() aren't exported yet. Extensions can't read data from a
source without CopyRead*(). They will be exported by subsequent
patches.
---
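Illustrative sketch (not part of this patch): the idea of a routine
struct that only holds function pointers, selected once by format name
and then called through in the generic code, can be shown with a
standalone C mock. All `Mock*` and `mock_*` names, the toy "text" (one
decimal integer per line) and "binary" (one byte per row) formats, and
`select_routine()` are assumptions made for this example; the real
callbacks take different arguments.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

typedef struct MockFromState MockFromState;

/* Mock stand-in for CopyFromRoutine: a table of callbacks per format. */
typedef struct MockFromRoutine
{
    short       (*GetFormat) (MockFromState *cstate);
    void        (*Start) (MockFromState *cstate);
    bool        (*OneRow) (MockFromState *cstate, int *value);
    void        (*End) (MockFromState *cstate);
} MockFromRoutine;

struct MockFromState
{
    const MockFromRoutine *routine;
    const char *input;      /* unread input bytes */
    size_t      remaining;  /* number of unread bytes */
    bool        started;
};

static short text_format(MockFromState *cstate) { (void) cstate; return 0; }
static short binary_format(MockFromState *cstate) { (void) cstate; return 1; }
static void common_start(MockFromState *cstate) { cstate->started = true; }
static void common_end(MockFromState *cstate) { cstate->started = false; }

/* Toy "text" row: decimal digits terminated by '\n' or end of input. */
static bool
text_one_row(MockFromState *cstate, int *value)
{
    int         v = 0;

    if (cstate->remaining == 0)
        return false;
    while (cstate->remaining > 0 && *cstate->input != '\n')
    {
        v = v * 10 + (*cstate->input - '0');
        cstate->input++;
        cstate->remaining--;
    }
    if (cstate->remaining > 0)  /* skip the newline */
    {
        cstate->input++;
        cstate->remaining--;
    }
    *value = v;
    return true;
}

/* Toy "binary" row: a single raw byte. */
static bool
binary_one_row(MockFromState *cstate, int *value)
{
    if (cstate->remaining == 0)
        return false;
    *value = (unsigned char) *cstate->input;
    cstate->input++;
    cstate->remaining--;
    return true;
}

static const MockFromRoutine mock_text = {
    text_format, common_start, text_one_row, common_end
};
static const MockFromRoutine mock_binary = {
    binary_format, common_start, binary_one_row, common_end
};

/* Pick the callback table once, by format name. */
static const MockFromRoutine *
select_routine(const char *format)
{
    if (strcmp(format, "text") == 0)
        return &mock_text;
    if (strcmp(format, "binary") == 0)
        return &mock_binary;
    return NULL;
}

/* Generic driver: no per-format branches, only calls through the table. */
static int
mock_copy_from(const char *format, const char *data, size_t len, int *sum)
{
    MockFromState cstate = {select_routine(format), data, len, false};
    int         nrows = 0;
    int         v;

    *sum = 0;
    if (cstate.routine == NULL)
        return -1;
    cstate.routine->Start(&cstate);
    while (cstate.routine->OneRow(&cstate, &v))
    {
        *sum += v;
        nrows++;
    }
    cstate.routine->End(&cstate);
    return nrows;
}
```

Once the table is chosen, the driver contains no `if (binary)` style
branches, which is the same shape of change this patch makes to
BeginCopyFrom(), NextCopyFrom() and EndCopyFrom().
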
 src/backend/commands/copy.c              |   3 +
 src/backend/commands/copyfrom.c          | 216 ++++++++++----
 src/backend/commands/copyfromparse.c     | 346 ++++++++++++-----------
 src/include/commands/copy.h              |   3 -
 src/include/commands/copyapi.h           |  44 +++
 src/include/commands/copyfrom_internal.h |   4 +
 6 files changed, 401 insertions(+), 215 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 6f0db0ae7c..ec6dfff8ab 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -459,12 +459,14 @@ ProcessCopyOptionCustomFormat(ParseState *pstate,
     else if (strcmp(format, "csv") == 0)
     {
         opts_out->csv_mode = true;
+        opts_out->from_routine = &CopyFromRoutineCSV;
         opts_out->to_routine = &CopyToRoutineCSV;
         return;
     }
     else if (strcmp(format, "binary") == 0)
     {
         opts_out->binary = true;
+        opts_out->from_routine = &CopyFromRoutineBinary;
         opts_out->to_routine = &CopyToRoutineBinary;
         return;
     }
@@ -533,6 +535,7 @@ ProcessCopyOptions(ParseState *pstate,
     opts_out->file_encoding = -1;
 
     /* Text is the default format. */
+    opts_out->from_routine = &CopyFromRoutineText;
     opts_out->to_routine = &CopyToRoutineText;
 
     /*
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 05b3d13236..de85e4e9f1 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -108,6 +108,170 @@ static char *limit_printout_length(const char *str);
 
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+
+/*
+ * CopyFromRoutine implementations.
+ */
+
+/*
+ * CopyFromRoutine implementation for "text" and "csv". CopyFromText*()
+ * refer to cstate->opts.csv_mode and change their behavior. We can split this
+ * implementation and stop referring to cstate->opts.csv_mode later.
+ */
+
+/* All "text" and "csv" options are parsed in ProcessCopyOptions(). We may
+ * move the code to here later. */
+static bool
+CopyFromTextProcessOption(CopyFromState cstate, DefElem *defel)
+{
+    return false;
+}
+
+static int16
+CopyFromTextGetFormat(CopyFromState cstate)
+{
+    return 0;
+}
+
+static void
+CopyFromTextStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    AttrNumber    num_phys_attrs = tupDesc->natts;
+    AttrNumber    attr_count;
+
+    /*
+     * If encoding conversion is needed, we need another buffer to hold the
+     * converted input data.  Otherwise, we can just point input_buf to the
+     * same buffer as raw_buf.
+     */
+    if (cstate->need_transcoding)
+    {
+        cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
+        cstate->input_buf_index = cstate->input_buf_len = 0;
+    }
+    else
+        cstate->input_buf = cstate->raw_buf;
+    cstate->input_reached_eof = false;
+
+    initStringInfo(&cstate->line_buf);
+
+    /*
+     * Pick up the required catalog information for each attribute in the
+     * relation, including the input function and the element type (to pass
+     * to the input function).
+     */
+    cstate->in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+    cstate->typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
+    for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
+    {
+        Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
+        Oid            in_func_oid;
+
+        /* We don't need info for dropped attributes */
+        if (att->attisdropped)
+            continue;
+
+        /* Fetch the input function and typioparam info */
+        getTypeInputInfo(att->atttypid,
+                         &in_func_oid, &cstate->typioparams[attnum - 1]);
+        fmgr_info(in_func_oid, &cstate->in_functions[attnum - 1]);
+    }
+
+    /* create workspace for CopyReadAttributes results */
+    attr_count = list_length(cstate->attnumlist);
+    cstate->max_fields = attr_count;
+    cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
+}
+
+static void
+CopyFromTextEnd(CopyFromState cstate)
+{
+}
+
+/*
+ * CopyFromRoutine implementation for "binary".
+ */
+
+/* All "binary" options are parsed in ProcessCopyOptions(). We may move the
+ * code to here later. */
+static bool
+CopyFromBinaryProcessOption(CopyFromState cstate, DefElem *defel)
+{
+    return false;
+}
+
+static int16
+CopyFromBinaryGetFormat(CopyFromState cstate)
+{
+    return 1;
+}
+
+static void
+CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    AttrNumber    num_phys_attrs = tupDesc->natts;
+
+    /*
+     * Pick up the required catalog information for each attribute in the
+     * relation, including the input function and the element type (to pass
+     * to the input function).
+     */
+    cstate->in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+    cstate->typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
+    for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
+    {
+        Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
+        Oid            in_func_oid;
+
+        /* We don't need info for dropped attributes */
+        if (att->attisdropped)
+            continue;
+
+        /* Fetch the input function and typioparam info */
+        getTypeBinaryInputInfo(att->atttypid,
+                               &in_func_oid, &cstate->typioparams[attnum - 1]);
+        fmgr_info(in_func_oid, &cstate->in_functions[attnum - 1]);
+    }
+
+    /* Read and verify binary header */
+    ReceiveCopyBinaryHeader(cstate);
+}
+
+static void
+CopyFromBinaryEnd(CopyFromState cstate)
+{
+}
+
+CopyFromRoutine CopyFromRoutineText = {
+    .CopyFromProcessOption = CopyFromTextProcessOption,
+    .CopyFromGetFormat = CopyFromTextGetFormat,
+    .CopyFromStart = CopyFromTextStart,
+    .CopyFromOneRow = CopyFromTextOneRow,
+    .CopyFromEnd = CopyFromTextEnd,
+};
+
+/*
+ * We can use the same CopyFromRoutine for both "text" and "csv" because
+ * CopyFromText*() refer to cstate->opts.csv_mode and change their behavior. We
+ * can split the implementations and stop referring to it later.
+ */
+CopyFromRoutine CopyFromRoutineCSV = {
+    .CopyFromProcessOption = CopyFromTextProcessOption,
+    .CopyFromGetFormat = CopyFromTextGetFormat,
+    .CopyFromStart = CopyFromTextStart,
+    .CopyFromOneRow = CopyFromTextOneRow,
+    .CopyFromEnd = CopyFromTextEnd,
+};
+
+CopyFromRoutine CopyFromRoutineBinary = {
+    .CopyFromProcessOption = CopyFromBinaryProcessOption,
+    .CopyFromGetFormat = CopyFromBinaryGetFormat,
+    .CopyFromStart = CopyFromBinaryStart,
+    .CopyFromOneRow = CopyFromBinaryOneRow,
+    .CopyFromEnd = CopyFromBinaryEnd,
+};
+
+
 /*
  * error context callback for COPY FROM
  *
@@ -1379,9 +1543,6 @@ BeginCopyFrom(ParseState *pstate,
     TupleDesc    tupDesc;
     AttrNumber    num_phys_attrs,
                 num_defaults;
-    FmgrInfo   *in_functions;
-    Oid           *typioparams;
-    Oid            in_func_oid;
     int           *defmap;
     ExprState **defexprs;
     MemoryContext oldcontext;
@@ -1566,25 +1727,6 @@ BeginCopyFrom(ParseState *pstate,
     cstate->raw_buf_index = cstate->raw_buf_len = 0;
     cstate->raw_reached_eof = false;
 
-    if (!cstate->opts.binary)
-    {
-        /*
-         * If encoding conversion is needed, we need another buffer to hold
-         * the converted input data.  Otherwise, we can just point input_buf
-         * to the same buffer as raw_buf.
-         */
-        if (cstate->need_transcoding)
-        {
-            cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
-            cstate->input_buf_index = cstate->input_buf_len = 0;
-        }
-        else
-            cstate->input_buf = cstate->raw_buf;
-        cstate->input_reached_eof = false;
-
-        initStringInfo(&cstate->line_buf);
-    }
-
     initStringInfo(&cstate->attribute_buf);
 
     /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
@@ -1603,8 +1745,6 @@ BeginCopyFrom(ParseState *pstate,
      * the input function), and info about defaults and constraints. (Which
      * input function we use depends on text/binary format choice.)
      */
-    in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
-    typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
     defmap = (int *) palloc(num_phys_attrs * sizeof(int));
     defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
 
@@ -1616,15 +1756,6 @@ BeginCopyFrom(ParseState *pstate,
         if (att->attisdropped)
             continue;
 
-        /* Fetch the input function and typioparam info */
-        if (cstate->opts.binary)
-            getTypeBinaryInputInfo(att->atttypid,
-                                   &in_func_oid, &typioparams[attnum - 1]);
-        else
-            getTypeInputInfo(att->atttypid,
-                             &in_func_oid, &typioparams[attnum - 1]);
-        fmgr_info(in_func_oid, &in_functions[attnum - 1]);
-
         /* Get default info if available */
         defexprs[attnum - 1] = NULL;
 
@@ -1684,8 +1815,6 @@ BeginCopyFrom(ParseState *pstate,
     cstate->bytes_processed = 0;
 
     /* We keep those variables in cstate. */
-    cstate->in_functions = in_functions;
-    cstate->typioparams = typioparams;
     cstate->defmap = defmap;
     cstate->defexprs = defexprs;
     cstate->volatile_defexprs = volatile_defexprs;
@@ -1758,20 +1887,7 @@ BeginCopyFrom(ParseState *pstate,
 
     pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
 
-    if (cstate->opts.binary)
-    {
-        /* Read and verify binary header */
-        ReceiveCopyBinaryHeader(cstate);
-    }
-
-    /* create workspace for CopyReadAttributes results */
-    if (!cstate->opts.binary)
-    {
-        AttrNumber    attr_count = list_length(cstate->attnumlist);
-
-        cstate->max_fields = attr_count;
-        cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
-    }
+    cstate->opts.from_routine->CopyFromStart(cstate, tupDesc);
 
     MemoryContextSwitchTo(oldcontext);
 
@@ -1784,6 +1900,8 @@ BeginCopyFrom(ParseState *pstate,
 void
 EndCopyFrom(CopyFromState cstate)
 {
+    cstate->opts.from_routine->CopyFromEnd(cstate);
+
     /* No COPY FROM related resources except memory. */
     if (cstate->is_program)
     {
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 7cacd0b752..49632f75e4 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -172,7 +172,7 @@ ReceiveCopyBegin(CopyFromState cstate)
 {
     StringInfoData buf;
     int            natts = list_length(cstate->attnumlist);
-    int16        format = (cstate->opts.binary ? 1 : 0);
+    int16        format = cstate->opts.from_routine->CopyFromGetFormat(cstate);
     int            i;
 
     pq_beginmessage(&buf, PqMsg_CopyInResponse);
@@ -840,6 +840,185 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
     return true;
 }
 
+bool
+CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+    TupleDesc    tupDesc;
+    AttrNumber    attr_count;
+    FmgrInfo   *in_functions = cstate->in_functions;
+    Oid           *typioparams = cstate->typioparams;
+    ExprState **defexprs = cstate->defexprs;
+    char      **field_strings;
+    ListCell   *cur;
+    int            fldct;
+    int            fieldno;
+    char       *string;
+
+    tupDesc = RelationGetDescr(cstate->rel);
+    attr_count = list_length(cstate->attnumlist);
+
+    /* read raw fields in the next line */
+    if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
+        return false;
+
+    /* check for overflowing fields */
+    if (attr_count > 0 && fldct > attr_count)
+        ereport(ERROR,
+                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                 errmsg("extra data after last expected column")));
+
+    fieldno = 0;
+
+    /* Loop to read the user attributes on the line. */
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        int            m = attnum - 1;
+        Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+        if (fieldno >= fldct)
+            ereport(ERROR,
+                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                     errmsg("missing data for column \"%s\"",
+                            NameStr(att->attname))));
+        string = field_strings[fieldno++];
+
+        if (cstate->convert_select_flags &&
+            !cstate->convert_select_flags[m])
+        {
+            /* ignore input field, leaving column as NULL */
+            continue;
+        }
+
+        if (cstate->opts.csv_mode)
+        {
+            if (string == NULL &&
+                cstate->opts.force_notnull_flags[m])
+            {
+                /*
+                 * FORCE_NOT_NULL option is set and column is NULL - convert
+                 * it to the NULL string.
+                 */
+                string = cstate->opts.null_print;
+            }
+            else if (string != NULL && cstate->opts.force_null_flags[m]
+                     && strcmp(string, cstate->opts.null_print) == 0)
+            {
+                /*
+                 * FORCE_NULL option is set and column matches the NULL
+                 * string. It must have been quoted, or otherwise the string
+                 * would already have been set to NULL. Convert it to NULL as
+                 * specified.
+                 */
+                string = NULL;
+            }
+        }
+
+        cstate->cur_attname = NameStr(att->attname);
+        cstate->cur_attval = string;
+
+        if (string != NULL)
+            nulls[m] = false;
+
+        if (cstate->defaults[m])
+        {
+            /*
+             * The caller must supply econtext and have switched into the
+             * per-tuple memory context in it.
+             */
+            Assert(econtext != NULL);
+            Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
+
+            values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
+        }
+
+        /*
+         * If ON_ERROR is specified with IGNORE, skip rows with soft errors
+         */
+        else if (!InputFunctionCallSafe(&in_functions[m],
+                                        string,
+                                        typioparams[m],
+                                        att->atttypmod,
+                                        (Node *) cstate->escontext,
+                                        &values[m]))
+        {
+            cstate->num_errors++;
+            return true;
+        }
+
+        cstate->cur_attname = NULL;
+        cstate->cur_attval = NULL;
+    }
+
+    Assert(fieldno == attr_count);
+
+    return true;
+}
+
+bool
+CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+    TupleDesc    tupDesc;
+    AttrNumber    attr_count;
+    FmgrInfo   *in_functions = cstate->in_functions;
+    Oid           *typioparams = cstate->typioparams;
+    int16        fld_count;
+    ListCell   *cur;
+
+    tupDesc = RelationGetDescr(cstate->rel);
+    attr_count = list_length(cstate->attnumlist);
+
+    cstate->cur_lineno++;
+
+    if (!CopyGetInt16(cstate, &fld_count))
+    {
+        /* EOF detected (end of file, or protocol-level EOF) */
+        return false;
+    }
+
+    if (fld_count == -1)
+    {
+        /*
+         * Received EOF marker.  Wait for the protocol-level EOF, and complain
+         * if it doesn't come immediately.  In COPY FROM STDIN, this ensures
+         * that we correctly handle CopyFail, if client chooses to send that
+         * now.  When copying from file, we could ignore the rest of the file
+         * like in text mode, but we choose to be consistent with the COPY
+         * FROM STDIN case.
+         */
+        char        dummy;
+
+        if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
+            ereport(ERROR,
+                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                     errmsg("received copy data after EOF marker")));
+        return false;
+    }
+
+    if (fld_count != attr_count)
+        ereport(ERROR,
+                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                 errmsg("row field count is %d, expected %d",
+                        (int) fld_count, attr_count)));
+
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        int            m = attnum - 1;
+        Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+        cstate->cur_attname = NameStr(att->attname);
+        values[m] = CopyReadBinaryAttribute(cstate,
+                                            &in_functions[m],
+                                            typioparams[m],
+                                            att->atttypmod,
+                                            &nulls[m]);
+        cstate->cur_attname = NULL;
+    }
+
+    return true;
+}
+
 /*
  * Read next tuple from file for COPY FROM. Return false if no more tuples.
  *
@@ -857,181 +1036,22 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 {
     TupleDesc    tupDesc;
     AttrNumber    num_phys_attrs,
-                attr_count,
                 num_defaults = cstate->num_defaults;
-    FmgrInfo   *in_functions = cstate->in_functions;
-    Oid           *typioparams = cstate->typioparams;
     int            i;
     int           *defmap = cstate->defmap;
     ExprState **defexprs = cstate->defexprs;
 
     tupDesc = RelationGetDescr(cstate->rel);
     num_phys_attrs = tupDesc->natts;
-    attr_count = list_length(cstate->attnumlist);
 
     /* Initialize all values for row to NULL */
     MemSet(values, 0, num_phys_attrs * sizeof(Datum));
     MemSet(nulls, true, num_phys_attrs * sizeof(bool));
     MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
 
-    if (!cstate->opts.binary)
-    {
-        char      **field_strings;
-        ListCell   *cur;
-        int            fldct;
-        int            fieldno;
-        char       *string;
-
-        /* read raw fields in the next line */
-        if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
-            return false;
-
-        /* check for overflowing fields */
-        if (attr_count > 0 && fldct > attr_count)
-            ereport(ERROR,
-                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                     errmsg("extra data after last expected column")));
-
-        fieldno = 0;
-
-        /* Loop to read the user attributes on the line. */
-        foreach(cur, cstate->attnumlist)
-        {
-            int            attnum = lfirst_int(cur);
-            int            m = attnum - 1;
-            Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-            if (fieldno >= fldct)
-                ereport(ERROR,
-                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         errmsg("missing data for column \"%s\"",
-                                NameStr(att->attname))));
-            string = field_strings[fieldno++];
-
-            if (cstate->convert_select_flags &&
-                !cstate->convert_select_flags[m])
-            {
-                /* ignore input field, leaving column as NULL */
-                continue;
-            }
-
-            if (cstate->opts.csv_mode)
-            {
-                if (string == NULL &&
-                    cstate->opts.force_notnull_flags[m])
-                {
-                    /*
-                     * FORCE_NOT_NULL option is set and column is NULL -
-                     * convert it to the NULL string.
-                     */
-                    string = cstate->opts.null_print;
-                }
-                else if (string != NULL && cstate->opts.force_null_flags[m]
-                         && strcmp(string, cstate->opts.null_print) == 0)
-                {
-                    /*
-                     * FORCE_NULL option is set and column matches the NULL
-                     * string. It must have been quoted, or otherwise the
-                     * string would already have been set to NULL. Convert it
-                     * to NULL as specified.
-                     */
-                    string = NULL;
-                }
-            }
-
-            cstate->cur_attname = NameStr(att->attname);
-            cstate->cur_attval = string;
-
-            if (string != NULL)
-                nulls[m] = false;
-
-            if (cstate->defaults[m])
-            {
-                /*
-                 * The caller must supply econtext and have switched into the
-                 * per-tuple memory context in it.
-                 */
-                Assert(econtext != NULL);
-                Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
-
-                values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
-            }
-
-            /*
-             * If ON_ERROR is specified with IGNORE, skip rows with soft
-             * errors
-             */
-            else if (!InputFunctionCallSafe(&in_functions[m],
-                                            string,
-                                            typioparams[m],
-                                            att->atttypmod,
-                                            (Node *) cstate->escontext,
-                                            &values[m]))
-            {
-                cstate->num_errors++;
-                return true;
-            }
-
-            cstate->cur_attname = NULL;
-            cstate->cur_attval = NULL;
-        }
-
-        Assert(fieldno == attr_count);
-    }
-    else
-    {
-        /* binary */
-        int16        fld_count;
-        ListCell   *cur;
-
-        cstate->cur_lineno++;
-
-        if (!CopyGetInt16(cstate, &fld_count))
-        {
-            /* EOF detected (end of file, or protocol-level EOF) */
-            return false;
-        }
-
-        if (fld_count == -1)
-        {
-            /*
-             * Received EOF marker.  Wait for the protocol-level EOF, and
-             * complain if it doesn't come immediately.  In COPY FROM STDIN,
-             * this ensures that we correctly handle CopyFail, if client
-             * chooses to send that now.  When copying from file, we could
-             * ignore the rest of the file like in text mode, but we choose to
-             * be consistent with the COPY FROM STDIN case.
-             */
-            char        dummy;
-
-            if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
-                ereport(ERROR,
-                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         errmsg("received copy data after EOF marker")));
-            return false;
-        }
-
-        if (fld_count != attr_count)
-            ereport(ERROR,
-                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                     errmsg("row field count is %d, expected %d",
-                            (int) fld_count, attr_count)));
-
-        foreach(cur, cstate->attnumlist)
-        {
-            int            attnum = lfirst_int(cur);
-            int            m = attnum - 1;
-            Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-            cstate->cur_attname = NameStr(att->attname);
-            values[m] = CopyReadBinaryAttribute(cstate,
-                                                &in_functions[m],
-                                                typioparams[m],
-                                                att->atttypmod,
-                                                &nulls[m]);
-            cstate->cur_attname = NULL;
-        }
-    }
+    if (!cstate->opts.from_routine->CopyFromOneRow(cstate, econtext, values,
+                                                   nulls))
+        return false;
 
     /*
      * Now compute and insert any defaults available for the columns not
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index b3f4682f95..df29d42555 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -20,9 +20,6 @@
 #include "parser/parse_node.h"
 #include "tcop/dest.h"
 
-/* This is private in commands/copyfrom.c */
-typedef struct CopyFromStateData *CopyFromState;
-
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
 
 extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index ffad433a21..323e4705d2 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -18,6 +18,49 @@
 #include "executor/tuptable.h"
 #include "nodes/parsenodes.h"
 
+/* This is private in commands/copyfrom.c */
+typedef struct CopyFromStateData *CopyFromState;
+
+typedef bool (*CopyFromProcessOption_function) (CopyFromState cstate, DefElem *defel);
+typedef int16 (*CopyFromGetFormat_function) (CopyFromState cstate);
+typedef void (*CopyFromStart_function) (CopyFromState cstate, TupleDesc tupDesc);
+typedef bool (*CopyFromOneRow_function) (CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls);
+typedef void (*CopyFromEnd_function) (CopyFromState cstate);
+
+/* Routines for a COPY FROM format implementation. */
+typedef struct CopyFromRoutine
+{
+    /*
+     * Called for processing one COPY FROM option. This will return false when
+     * the given option is invalid.
+     */
+    CopyFromProcessOption_function CopyFromProcessOption;
+
+    /*
+     * Called when COPY FROM is started. This will return a format as int16
+     * value. It's used for the CopyInResponse message.
+     */
+    CopyFromGetFormat_function CopyFromGetFormat;
+
+    /*
+     * Called when COPY FROM is started. This will initialize something and
+     * receive a header.
+     */
+    CopyFromStart_function CopyFromStart;
+
+    /* Copy one row. It returns false if no more tuples. */
+    CopyFromOneRow_function CopyFromOneRow;
+
+    /* Called when COPY FROM is ended. This will finalize something. */
+    CopyFromEnd_function CopyFromEnd;
+}            CopyFromRoutine;
+
+/* Built-in CopyFromRoutine for "text", "csv" and "binary". */
+extern CopyFromRoutine CopyFromRoutineText;
+extern CopyFromRoutine CopyFromRoutineCSV;
+extern CopyFromRoutine CopyFromRoutineBinary;
+
+
 typedef struct CopyToStateData *CopyToState;
 
 typedef bool (*CopyToProcessOption_function) (CopyToState cstate, DefElem *defel);
@@ -113,6 +156,7 @@ typedef struct CopyFormatOptions
     bool        convert_selectively;    /* do selective binary conversion? */
     CopyOnErrorChoice on_error; /* what to do when error happened */
     List       *convert_select; /* list of column names (can be NIL) */
+    CopyFromRoutine *from_routine;    /* callback routines for COPY FROM */
     CopyToRoutine *to_routine;    /* callback routines for COPY TO */
 } CopyFormatOptions;
 
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc78..921c1513f7 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -183,4 +183,8 @@ typedef struct CopyFromStateData
 extern void ReceiveCopyBegin(CopyFromState cstate);
 extern void ReceiveCopyBinaryHeader(CopyFromState cstate);
 
+extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls);
+extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls);
+
+
 #endif                            /* COPYFROM_INTERNAL_H */
-- 
2.43.0

From 8e75d2c93f6ce9d67664d53233d0b71c9f10613a Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Wed, 24 Jan 2024 11:07:14 +0900
Subject: [PATCH v6 6/8] Add support for adding custom COPY FROM format

We use the same approach as for the custom COPY TO format. A custom
COPY format handler can now return either COPY TO format routines or
COPY FROM format routines, depending on the "is_from" argument:

    copy_handler(true) returns CopyFromRoutine
    copy_handler(false) returns CopyToRoutine
---
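Note (not part of the commit message): the dispatch described above can
be sketched in isolation. This is a minimal illustration, not the patch
code. It assumes simplified stand-ins for NodeTag and the two routine
structs, and every demo_* name is hypothetical. It only shows how one
handler entry point can serve both directions and how the caller can
tell the returned structs apart by their leading tag.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Simplified stand-in for PostgreSQL's NodeTag (assumption, not the real enum). */
typedef enum NodeTag
{
    T_Invalid = 0,
    T_CopyToRoutine,
    T_CopyFromRoutine
} NodeTag;

/* Stand-in routine structs: only the leading tag is modeled here. */
typedef struct CopyToRoutine
{
    NodeTag     type;           /* must be the first member */
} CopyToRoutine;

typedef struct CopyFromRoutine
{
    NodeTag     type;           /* must be the first member */
} CopyFromRoutine;

static const CopyToRoutine demo_to_routine = {.type = T_CopyToRoutine};
static const CopyFromRoutine demo_from_routine = {.type = T_CopyFromRoutine};

/* Hypothetical handler: is_from selects which routine table is returned. */
const void *
demo_copy_handler(bool is_from)
{
    if (is_from)
        return &demo_from_routine;
    return &demo_to_routine;
}

/* Read the tag through the common first member; NULL maps to T_Invalid. */
NodeTag
demo_node_tag(const void *routine)
{
    return routine ? *(const NodeTag *) routine : T_Invalid;
}

/* True if the returned struct carries the tag expected for this direction. */
bool
demo_routine_is_valid(const void *routine, bool is_from)
{
    NodeTag     expected = is_from ? T_CopyFromRoutine : T_CopyToRoutine;

    return demo_node_tag(routine) == expected;
}

/* Fetch the COPY FROM routine table after checking its tag. */
const CopyFromRoutine *
demo_get_from_routine(void)
{
    const void *routine = demo_copy_handler(true);

    assert(demo_routine_is_valid(routine, true));
    return routine;
}
```

In the patch itself the tag check is done with IsA() and a mismatch is
reported with ereport(ERROR); the sketch returns a bool so that it has
no dependency on the server headers.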
 src/backend/commands/copy.c                   | 53 +++++++++++++------
 src/include/commands/copyapi.h                |  2 +
 .../expected/test_copy_format.out             | 12 +++++
 .../test_copy_format/sql/test_copy_format.sql |  6 +++
 .../test_copy_format/test_copy_format.c       | 50 +++++++++++++++--
 5 files changed, 105 insertions(+), 18 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index ec6dfff8ab..479f36868c 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -472,12 +472,9 @@ ProcessCopyOptionCustomFormat(ParseState *pstate,
     }
 
     /* custom format */
-    if (!is_from)
-    {
-        funcargtypes[0] = INTERNALOID;
-        handlerOid = LookupFuncName(list_make1(makeString(format)), 1,
-                                    funcargtypes, true);
-    }
+    funcargtypes[0] = INTERNALOID;
+    handlerOid = LookupFuncName(list_make1(makeString(format)), 1,
+                                funcargtypes, true);
     if (!OidIsValid(handlerOid))
         ereport(ERROR,
                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -486,14 +483,36 @@ ProcessCopyOptionCustomFormat(ParseState *pstate,
 
     datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from));
     routine = DatumGetPointer(datum);
-    if (routine == NULL || !IsA(routine, CopyToRoutine))
-        ereport(ERROR,
-                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                 errmsg("COPY handler function %s(%u) did not return a CopyToRoutine struct",
-                        format, handlerOid),
-                 parser_errposition(pstate, defel->location)));
-
-    opts_out->to_routine = routine;
+    if (is_from)
+    {
+        if (routine == NULL || !IsA(routine, CopyFromRoutine))
+            ereport(
+                    ERROR,
+                    (errcode(
+                             ERRCODE_INVALID_PARAMETER_VALUE),
+                     errmsg("COPY handler function "
+                            "%s(%u) did not return a "
+                            "CopyFromRoutine struct",
+                            format, handlerOid),
+                     parser_errposition(
+                                        pstate, defel->location)));
+        opts_out->from_routine = routine;
+    }
+    else
+    {
+        if (routine == NULL || !IsA(routine, CopyToRoutine))
+            ereport(
+                    ERROR,
+                    (errcode(
+                             ERRCODE_INVALID_PARAMETER_VALUE),
+                     errmsg("COPY handler function "
+                            "%s(%u) did not return a "
+                            "CopyToRoutine struct",
+                            format, handlerOid),
+                     parser_errposition(
+                                        pstate, defel->location)));
+        opts_out->to_routine = routine;
+    }
 }
 
 /*
@@ -692,7 +711,11 @@ ProcessCopyOptions(ParseState *pstate,
         {
             bool        processed = false;
 
-            if (!is_from)
+            if (is_from)
+                processed =
+                    opts_out->from_routine->CopyFromProcessOption(
+                                                                  cstate, defel);
+            else
                 processed = opts_out->to_routine->CopyToProcessOption(cstate, defel);
             if (!processed)
                 ereport(ERROR,
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index 323e4705d2..ef1bb201c2 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -30,6 +30,8 @@ typedef void (*CopyFromEnd_function) (CopyFromState cstate);
 /* Routines for a COPY FROM format implementation. */
 typedef struct CopyFromRoutine
 {
+    NodeTag        type;
+
     /*
      * Called for processing one COPY FROM option. This will return false when
      * the given option is invalid.
diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out b/src/test/modules/test_copy_format/expected/test_copy_format.out
index 3a24ae7b97..6af69f0eb7 100644
--- a/src/test/modules/test_copy_format/expected/test_copy_format.out
+++ b/src/test/modules/test_copy_format/expected/test_copy_format.out
@@ -1,6 +1,18 @@
 CREATE EXTENSION test_copy_format;
 CREATE TABLE public.test (a INT, b INT, c INT);
 INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+COPY public.test FROM stdin WITH (
+    option_before 'before',
+    format 'test_copy_format',
+    option_after 'after'
+);
+NOTICE:  test_copy_format: is_from=true
+NOTICE:  CopyFromProcessOption: "option_before"="before"
+NOTICE:  CopyFromProcessOption: "option_after"="after"
+NOTICE:  CopyFromGetFormat
+NOTICE:  CopyFromStart: natts=3
+NOTICE:  CopyFromOneRow
+NOTICE:  CopyFromEnd
 COPY public.test TO stdout WITH (
     option_before 'before',
     format 'test_copy_format',
diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql b/src/test/modules/test_copy_format/sql/test_copy_format.sql
index 0eb7ed2e11..94d3c789a0 100644
--- a/src/test/modules/test_copy_format/sql/test_copy_format.sql
+++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql
@@ -1,6 +1,12 @@
 CREATE EXTENSION test_copy_format;
 CREATE TABLE public.test (a INT, b INT, c INT);
 INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+COPY public.test FROM stdin WITH (
+    option_before 'before',
+    format 'test_copy_format',
+    option_after 'after'
+);
+\.
 COPY public.test TO stdout WITH (
     option_before 'before',
     format 'test_copy_format',
diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c
index a2219afcde..5e1b40e881 100644
--- a/src/test/modules/test_copy_format/test_copy_format.c
+++ b/src/test/modules/test_copy_format/test_copy_format.c
@@ -18,6 +18,50 @@
 
 PG_MODULE_MAGIC;
 
+static bool
+CopyFromProcessOption(CopyFromState cstate, DefElem *defel)
+{
+    ereport(NOTICE,
+            (errmsg("CopyFromProcessOption: \"%s\"=\"%s\"",
+                    defel->defname, defGetString(defel))));
+    return true;
+}
+
+static int16
+CopyFromGetFormat(CopyFromState cstate)
+{
+    ereport(NOTICE, (errmsg("CopyFromGetFormat")));
+    return 0;
+}
+
+static void
+CopyFromStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    ereport(NOTICE, (errmsg("CopyFromStart: natts=%d", tupDesc->natts)));
+}
+
+static bool
+CopyFromOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+    ereport(NOTICE, (errmsg("CopyFromOneRow")));
+    return false;
+}
+
+static void
+CopyFromEnd(CopyFromState cstate)
+{
+    ereport(NOTICE, (errmsg("CopyFromEnd")));
+}
+
+static const CopyFromRoutine CopyFromRoutineTestCopyFormat = {
+    .type = T_CopyFromRoutine,
+    .CopyFromProcessOption = CopyFromProcessOption,
+    .CopyFromGetFormat = CopyFromGetFormat,
+    .CopyFromStart = CopyFromStart,
+    .CopyFromOneRow = CopyFromOneRow,
+    .CopyFromEnd = CopyFromEnd,
+};
+
 static bool
 CopyToProcessOption(CopyToState cstate, DefElem *defel)
 {
@@ -71,7 +115,7 @@ test_copy_format(PG_FUNCTION_ARGS)
             (errmsg("test_copy_format: is_from=%s", is_from ? "true" : "false")));
 
     if (is_from)
-        elog(ERROR, "COPY FROM isn't supported yet");
-
-    PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat);
+        PG_RETURN_POINTER(&CopyFromRoutineTestCopyFormat);
+    else
+        PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat);
 }
-- 
2.43.0

From dc3c21e725849d1d0c163677d08d527a8bf3bc37 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Wed, 24 Jan 2024 14:16:29 +0900
Subject: [PATCH v6 7/8] Export CopyFromStateData

It's for custom COPY FROM format handlers implemented as extensions.

This just moves code. Nothing changes except the CopySource enum
values: I changed the COPY_ prefix to COPY_SOURCE_ (for example,
COPY_FILE became COPY_SOURCE_FILE). The rename isn't required, but it
matches what I did for the CopyDest enum values.

Note that this change alone isn't enough to implement a custom COPY
FROM format handler as an extension. We'll do the following in a
subsequent commit:

1. Add an opaque space for custom COPY FROM format handler
2. Export CopyReadBinaryData() to read the next data
---
 src/backend/commands/copyfrom.c          |   4 +-
 src/backend/commands/copyfromparse.c     |  10 +-
 src/include/commands/copy.h              |   2 -
 src/include/commands/copyapi.h           | 156 ++++++++++++++++++++++-
 src/include/commands/copyfrom_internal.h | 150 ----------------------
 5 files changed, 162 insertions(+), 160 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index de85e4e9f1..b5f1771ac2 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1705,7 +1705,7 @@ BeginCopyFrom(ParseState *pstate,
                             pg_encoding_to_char(GetDatabaseEncoding()))));
     }
 
-    cstate->copy_src = COPY_FILE;    /* default */
+    cstate->copy_src = COPY_SOURCE_FILE;    /* default */
 
     cstate->whereClause = whereClause;
 
@@ -1824,7 +1824,7 @@ BeginCopyFrom(ParseState *pstate,
     if (data_source_cb)
     {
         progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
-        cstate->copy_src = COPY_CALLBACK;
+        cstate->copy_src = COPY_SOURCE_CALLBACK;
         cstate->data_source_cb = data_source_cb;
     }
     else if (pipe)
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 49632f75e4..a78a790060 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -181,7 +181,7 @@ ReceiveCopyBegin(CopyFromState cstate)
     for (i = 0; i < natts; i++)
         pq_sendint16(&buf, format); /* per-column formats */
     pq_endmessage(&buf);
-    cstate->copy_src = COPY_FRONTEND;
+    cstate->copy_src = COPY_SOURCE_FRONTEND;
     cstate->fe_msgbuf = makeStringInfo();
     /* We *must* flush here to ensure FE knows it can send. */
     pq_flush();
@@ -249,7 +249,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
 
     switch (cstate->copy_src)
     {
-        case COPY_FILE:
+        case COPY_SOURCE_FILE:
             bytesread = fread(databuf, 1, maxread, cstate->copy_file);
             if (ferror(cstate->copy_file))
                 ereport(ERROR,
@@ -258,7 +258,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
             if (bytesread == 0)
                 cstate->raw_reached_eof = true;
             break;
-        case COPY_FRONTEND:
+        case COPY_SOURCE_FRONTEND:
             while (maxread > 0 && bytesread < minread && !cstate->raw_reached_eof)
             {
                 int            avail;
@@ -341,7 +341,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
                 bytesread += avail;
             }
             break;
-        case COPY_CALLBACK:
+        case COPY_SOURCE_CALLBACK:
             bytesread = cstate->data_source_cb(databuf, minread, maxread);
             break;
     }
@@ -1099,7 +1099,7 @@ CopyReadLine(CopyFromState cstate)
          * after \. up to the protocol end of copy data.  (XXX maybe better
          * not to treat \. as special?)
          */
-        if (cstate->copy_src == COPY_FRONTEND)
+        if (cstate->copy_src == COPY_SOURCE_FRONTEND)
         {
             int            inbytes;
 
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index df29d42555..cd41d32074 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -20,8 +20,6 @@
 #include "parser/parse_node.h"
 #include "tcop/dest.h"
 
-typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
-
 extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
                    int stmt_location, int stmt_len,
                    uint64 *processed);
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index ef1bb201c2..b7e8f627bf 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -14,11 +14,12 @@
 #ifndef COPYAPI_H
 #define COPYAPI_H
 
+#include "commands/trigger.h"
 #include "executor/execdesc.h"
 #include "executor/tuptable.h"
+#include "nodes/miscnodes.h"
 #include "nodes/parsenodes.h"
 
-/* This is private in commands/copyfrom.c */
 typedef struct CopyFromStateData *CopyFromState;
 
 typedef bool (*CopyFromProcessOption_function) (CopyFromState cstate, DefElem *defel);
@@ -162,6 +163,159 @@ typedef struct CopyFormatOptions
     CopyToRoutine *to_routine;    /* callback routines for COPY TO */
 } CopyFormatOptions;
 
+
+/*
+ * Represents the different source cases we need to worry about at
+ * the bottom level
+ */
+typedef enum CopySource
+{
+    COPY_SOURCE_FILE,            /* from file (or a piped program) */
+    COPY_SOURCE_FRONTEND,        /* from frontend */
+    COPY_SOURCE_CALLBACK,        /* from callback function */
+} CopySource;
+
+/*
+ *    Represents the end-of-line terminator type of the input
+ */
+typedef enum EolType
+{
+    EOL_UNKNOWN,
+    EOL_NL,
+    EOL_CR,
+    EOL_CRNL,
+} EolType;
+
+typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+
+/*
+ * This struct contains all the state variables used throughout a COPY FROM
+ * operation.
+ */
+typedef struct CopyFromStateData
+{
+    /* low-level state data */
+    CopySource    copy_src;        /* type of copy source */
+    FILE       *copy_file;        /* used if copy_src == COPY_SOURCE_FILE */
+    StringInfo    fe_msgbuf;        /* used if copy_src == COPY_SOURCE_FRONTEND */
+
+    EolType        eol_type;        /* EOL type of input */
+    int            file_encoding;    /* file or remote side's character encoding */
+    bool        need_transcoding;    /* file encoding diff from server? */
+    Oid            conversion_proc;    /* encoding conversion function */
+
+    /* parameters from the COPY command */
+    Relation    rel;            /* relation to copy from */
+    List       *attnumlist;        /* integer list of attnums to copy */
+    char       *filename;        /* filename, or NULL for STDIN */
+    bool        is_program;        /* is 'filename' a program to popen? */
+    copy_data_source_cb data_source_cb; /* function for reading data */
+
+    CopyFormatOptions opts;
+    bool       *convert_select_flags;    /* per-column CSV/TEXT CS flags */
+    Node       *whereClause;    /* WHERE condition (or NULL) */
+
+    /* these are just for error messages, see CopyFromErrorCallback */
+    const char *cur_relname;    /* table name for error messages */
+    uint64        cur_lineno;        /* line number for error messages */
+    const char *cur_attname;    /* current att for error messages */
+    const char *cur_attval;        /* current att value for error messages */
+    bool        relname_only;    /* don't output line number, att, etc. */
+
+    /*
+     * Working state
+     */
+    MemoryContext copycontext;    /* per-copy execution context */
+
+    AttrNumber    num_defaults;    /* count of att that are missing and have
+                                 * default value */
+    FmgrInfo   *in_functions;    /* array of input functions for each attrs */
+    Oid           *typioparams;    /* array of element types for in_functions */
+    ErrorSaveContext *escontext;    /* soft error trapper during in_functions
+                                     * execution */
+    uint64        num_errors;        /* total number of rows which contained soft
+                                 * errors */
+    int           *defmap;            /* array of default att numbers related to
+                                 * missing att */
+    ExprState **defexprs;        /* array of default att expressions for all
+                                 * att */
+    bool       *defaults;        /* if DEFAULT marker was found for
+                                 * corresponding att */
+    bool        volatile_defexprs;    /* is any of defexprs volatile? */
+    List       *range_table;    /* single element list of RangeTblEntry */
+    List       *rteperminfos;    /* single element list of RTEPermissionInfo */
+    ExprState  *qualexpr;
+
+    TransitionCaptureState *transition_capture;
+
+    /*
+     * These variables are used to reduce overhead in COPY FROM.
+     *
+     * attribute_buf holds the separated, de-escaped text for each field of
+     * the current line.  The CopyReadAttributes functions return arrays of
+     * pointers into this buffer.  We avoid palloc/pfree overhead by re-using
+     * the buffer on each cycle.
+     *
+     * In binary COPY FROM, attribute_buf holds the binary data for the
+     * current field, but the usage is otherwise similar.
+     */
+    StringInfoData attribute_buf;
+
+    /* field raw data pointers found by COPY FROM */
+
+    int            max_fields;
+    char      **raw_fields;
+
+    /*
+     * Similarly, line_buf holds the whole input line being processed. The
+     * input cycle is first to read the whole line into line_buf, and then
+     * extract the individual attribute fields into attribute_buf.  line_buf
+     * is preserved unmodified so that we can display it in error messages if
+     * appropriate.  (In binary mode, line_buf is not used.)
+     */
+    StringInfoData line_buf;
+    bool        line_buf_valid; /* contains the row being processed? */
+
+    /*
+     * input_buf holds input data, already converted to database encoding.
+     *
+     * In text mode, CopyReadLine parses this data sufficiently to locate line
+     * boundaries, then transfers the data to line_buf. We guarantee that
+     * there is a \0 at input_buf[input_buf_len] at all times.  (In binary
+     * mode, input_buf is not used.)
+     *
+     * If encoding conversion is not required, input_buf is not a separate
+     * buffer but points directly to raw_buf.  In that case, input_buf_len
+     * tracks the number of bytes that have been verified as valid in the
+     * database encoding, and raw_buf_len is the total number of bytes stored
+     * in the buffer.
+     */
+#define INPUT_BUF_SIZE 65536    /* we palloc INPUT_BUF_SIZE+1 bytes */
+    char       *input_buf;
+    int            input_buf_index;    /* next byte to process */
+    int            input_buf_len;    /* total # of bytes stored */
+    bool        input_reached_eof;    /* true if we reached EOF */
+    bool        input_reached_error;    /* true if a conversion error happened */
+    /* Shorthand for number of unconsumed bytes available in input_buf */
+#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index)
+
+    /*
+     * raw_buf holds raw input data read from the data source (file or client
+     * connection), not yet converted to the database encoding.  Like with
+     * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len].
+     */
+#define RAW_BUF_SIZE 65536        /* we palloc RAW_BUF_SIZE+1 bytes */
+    char       *raw_buf;
+    int            raw_buf_index;    /* next byte to process */
+    int            raw_buf_len;    /* total # of bytes stored */
+    bool        raw_reached_eof;    /* true if we reached EOF */
+
+    /* Shorthand for number of unconsumed bytes available in raw_buf */
+#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
+
+    uint64        bytes_processed;    /* number of bytes processed so far */
+} CopyFromStateData;
+
 /*
  * Represents the different dest cases we need to worry about at
  * the bottom level
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index 921c1513f7..f8f6120255 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -18,28 +18,6 @@
 #include "commands/trigger.h"
 #include "nodes/miscnodes.h"
 
-/*
- * Represents the different source cases we need to worry about at
- * the bottom level
- */
-typedef enum CopySource
-{
-    COPY_FILE,                    /* from file (or a piped program) */
-    COPY_FRONTEND,                /* from frontend */
-    COPY_CALLBACK,                /* from callback function */
-} CopySource;
-
-/*
- *    Represents the end-of-line terminator type of the input
- */
-typedef enum EolType
-{
-    EOL_UNKNOWN,
-    EOL_NL,
-    EOL_CR,
-    EOL_CRNL,
-} EolType;
-
 /*
  * Represents the insert method to be used during COPY FROM.
  */
@@ -52,134 +30,6 @@ typedef enum CopyInsertMethod
                                  * ExecForeignBatchInsert only if valid */
 } CopyInsertMethod;
 
-/*
- * This struct contains all the state variables used throughout a COPY FROM
- * operation.
- */
-typedef struct CopyFromStateData
-{
-    /* low-level state data */
-    CopySource    copy_src;        /* type of copy source */
-    FILE       *copy_file;        /* used if copy_src == COPY_FILE */
-    StringInfo    fe_msgbuf;        /* used if copy_src == COPY_FRONTEND */
-
-    EolType        eol_type;        /* EOL type of input */
-    int            file_encoding;    /* file or remote side's character encoding */
-    bool        need_transcoding;    /* file encoding diff from server? */
-    Oid            conversion_proc;    /* encoding conversion function */
-
-    /* parameters from the COPY command */
-    Relation    rel;            /* relation to copy from */
-    List       *attnumlist;        /* integer list of attnums to copy */
-    char       *filename;        /* filename, or NULL for STDIN */
-    bool        is_program;        /* is 'filename' a program to popen? */
-    copy_data_source_cb data_source_cb; /* function for reading data */
-
-    CopyFormatOptions opts;
-    bool       *convert_select_flags;    /* per-column CSV/TEXT CS flags */
-    Node       *whereClause;    /* WHERE condition (or NULL) */
-
-    /* these are just for error messages, see CopyFromErrorCallback */
-    const char *cur_relname;    /* table name for error messages */
-    uint64        cur_lineno;        /* line number for error messages */
-    const char *cur_attname;    /* current att for error messages */
-    const char *cur_attval;        /* current att value for error messages */
-    bool        relname_only;    /* don't output line number, att, etc. */
-
-    /*
-     * Working state
-     */
-    MemoryContext copycontext;    /* per-copy execution context */
-
-    AttrNumber    num_defaults;    /* count of att that are missing and have
-                                 * default value */
-    FmgrInfo   *in_functions;    /* array of input functions for each attrs */
-    Oid           *typioparams;    /* array of element types for in_functions */
-    ErrorSaveContext *escontext;    /* soft error trapper during in_functions
-                                     * execution */
-    uint64        num_errors;        /* total number of rows which contained soft
-                                 * errors */
-    int           *defmap;            /* array of default att numbers related to
-                                 * missing att */
-    ExprState **defexprs;        /* array of default att expressions for all
-                                 * att */
-    bool       *defaults;        /* if DEFAULT marker was found for
-                                 * corresponding att */
-    bool        volatile_defexprs;    /* is any of defexprs volatile? */
-    List       *range_table;    /* single element list of RangeTblEntry */
-    List       *rteperminfos;    /* single element list of RTEPermissionInfo */
-    ExprState  *qualexpr;
-
-    TransitionCaptureState *transition_capture;
-
-    /*
-     * These variables are used to reduce overhead in COPY FROM.
-     *
-     * attribute_buf holds the separated, de-escaped text for each field of
-     * the current line.  The CopyReadAttributes functions return arrays of
-     * pointers into this buffer.  We avoid palloc/pfree overhead by re-using
-     * the buffer on each cycle.
-     *
-     * In binary COPY FROM, attribute_buf holds the binary data for the
-     * current field, but the usage is otherwise similar.
-     */
-    StringInfoData attribute_buf;
-
-    /* field raw data pointers found by COPY FROM */
-
-    int            max_fields;
-    char      **raw_fields;
-
-    /*
-     * Similarly, line_buf holds the whole input line being processed. The
-     * input cycle is first to read the whole line into line_buf, and then
-     * extract the individual attribute fields into attribute_buf.  line_buf
-     * is preserved unmodified so that we can display it in error messages if
-     * appropriate.  (In binary mode, line_buf is not used.)
-     */
-    StringInfoData line_buf;
-    bool        line_buf_valid; /* contains the row being processed? */
-
-    /*
-     * input_buf holds input data, already converted to database encoding.
-     *
-     * In text mode, CopyReadLine parses this data sufficiently to locate line
-     * boundaries, then transfers the data to line_buf. We guarantee that
-     * there is a \0 at input_buf[input_buf_len] at all times.  (In binary
-     * mode, input_buf is not used.)
-     *
-     * If encoding conversion is not required, input_buf is not a separate
-     * buffer but points directly to raw_buf.  In that case, input_buf_len
-     * tracks the number of bytes that have been verified as valid in the
-     * database encoding, and raw_buf_len is the total number of bytes stored
-     * in the buffer.
-     */
-#define INPUT_BUF_SIZE 65536    /* we palloc INPUT_BUF_SIZE+1 bytes */
-    char       *input_buf;
-    int            input_buf_index;    /* next byte to process */
-    int            input_buf_len;    /* total # of bytes stored */
-    bool        input_reached_eof;    /* true if we reached EOF */
-    bool        input_reached_error;    /* true if a conversion error happened */
-    /* Shorthand for number of unconsumed bytes available in input_buf */
-#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index)
-
-    /*
-     * raw_buf holds raw input data read from the data source (file or client
-     * connection), not yet converted to the database encoding.  Like with
-     * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len].
-     */
-#define RAW_BUF_SIZE 65536        /* we palloc RAW_BUF_SIZE+1 bytes */
-    char       *raw_buf;
-    int            raw_buf_index;    /* next byte to process */
-    int            raw_buf_len;    /* total # of bytes stored */
-    bool        raw_reached_eof;    /* true if we reached EOF */
-
-    /* Shorthand for number of unconsumed bytes available in raw_buf */
-#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
-
-    uint64        bytes_processed;    /* number of bytes processed so far */
-} CopyFromStateData;
-
 extern void ReceiveCopyBegin(CopyFromState cstate);
 extern void ReceiveCopyBinaryHeader(CopyFromState cstate);
 
-- 
2.43.0

From f3cebe8b095f25c5c9bbeb5915c3a4233c45796c Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Wed, 24 Jan 2024 14:19:08 +0900
Subject: [PATCH v6 8/8] Add support for implementing custom COPY FROM format
 as extension

* Add CopyFromStateData::opaque that can be used to keep data for
  a custom COPY FROM format implementation
* Export CopyReadBinaryData() so that extensions can read the next
  chunk of input data
* Rename CopyReadBinaryData() to CopyFromStateRead() because it's a
  method for CopyFromState and "BinaryData" is redundant.
---
 src/backend/commands/copyfromparse.c | 21 ++++++++++-----------
 src/include/commands/copyapi.h       |  5 +++++
 2 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index a78a790060..f8a194635d 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -165,7 +165,6 @@ static int    CopyGetData(CopyFromState cstate, void *databuf,
 static inline bool CopyGetInt32(CopyFromState cstate, int32 *val);
 static inline bool CopyGetInt16(CopyFromState cstate, int16 *val);
 static void CopyLoadInputBuf(CopyFromState cstate);
-static int    CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes);
 
 void
 ReceiveCopyBegin(CopyFromState cstate)
@@ -194,7 +193,7 @@ ReceiveCopyBinaryHeader(CopyFromState cstate)
     int32        tmp;
 
     /* Signature */
-    if (CopyReadBinaryData(cstate, readSig, 11) != 11 ||
+    if (CopyFromStateRead(cstate, readSig, 11) != 11 ||
         memcmp(readSig, BinarySignature, 11) != 0)
         ereport(ERROR,
                 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
@@ -222,7 +221,7 @@ ReceiveCopyBinaryHeader(CopyFromState cstate)
     /* Skip extension header, if present */
     while (tmp-- > 0)
     {
-        if (CopyReadBinaryData(cstate, readSig, 1) != 1)
+        if (CopyFromStateRead(cstate, readSig, 1) != 1)
             ereport(ERROR,
                     (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                      errmsg("invalid COPY file header (wrong length)")));
@@ -364,7 +363,7 @@ CopyGetInt32(CopyFromState cstate, int32 *val)
 {
     uint32        buf;
 
-    if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf))
+    if (CopyFromStateRead(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf))
     {
         *val = 0;                /* suppress compiler warning */
         return false;
@@ -381,7 +380,7 @@ CopyGetInt16(CopyFromState cstate, int16 *val)
 {
     uint16        buf;
 
-    if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf))
+    if (CopyFromStateRead(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf))
     {
         *val = 0;                /* suppress compiler warning */
         return false;
@@ -692,14 +691,14 @@ CopyLoadInputBuf(CopyFromState cstate)
 }
 
 /*
- * CopyReadBinaryData
+ * CopyFromStateRead
  *
  * Reads up to 'nbytes' bytes from cstate->copy_file via cstate->raw_buf
  * and writes them to 'dest'.  Returns the number of bytes read (which
  * would be less than 'nbytes' only if we reach EOF).
  */
-static int
-CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
+int
+CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes)
 {
     int            copied_bytes = 0;
 
@@ -988,7 +987,7 @@ CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
          */
         char        dummy;
 
-        if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
+        if (CopyFromStateRead(cstate, &dummy, 1) > 0)
             ereport(ERROR,
                     (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                      errmsg("received copy data after EOF marker")));
@@ -1997,8 +1996,8 @@ CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo,
     resetStringInfo(&cstate->attribute_buf);
 
     enlargeStringInfo(&cstate->attribute_buf, fld_size);
-    if (CopyReadBinaryData(cstate, cstate->attribute_buf.data,
-                           fld_size) != fld_size)
+    if (CopyFromStateRead(cstate, cstate->attribute_buf.data,
+                          fld_size) != fld_size)
         ereport(ERROR,
                 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                  errmsg("unexpected EOF in COPY data")));
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index b7e8f627bf..22accc83ab 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -314,8 +314,13 @@ typedef struct CopyFromStateData
 #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
 
     uint64        bytes_processed;    /* number of bytes processed so far */
+
+    /* For custom format implementation */
+    void       *opaque;            /* private space */
 } CopyFromStateData;
 
+extern int    CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes);
+
 /*
  * Represents the different dest cases we need to worry about at
  * the bottom level
-- 
2.43.0
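
To illustrate how a custom COPY FROM format might combine the two pieces
exported by 0008 (the opaque pointer and CopyFromStateRead()), here is a
minimal standalone sketch. It does not build against PostgreSQL:
MockCopyFromState and MockCopyFromStateRead() are hypothetical stand-ins
that only mimic the contract documented in the patch (read up to nbytes,
return fewer only at EOF). MyFormatState, MyFormatReadRecord() and the
length-prefixed record layout are likewise assumptions made up for this
example, not part of the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for CopyFromStateData: only what this sketch needs. */
typedef struct MockCopyFromState
{
    const char *src;            /* pretend data source (file or client) */
    size_t      src_len;        /* total bytes available in src */
    size_t      src_pos;        /* next byte to hand out */
    void       *opaque;         /* private space for the format implementation */
} MockCopyFromState;

/*
 * Mimics the documented contract of CopyFromStateRead(): copies up to
 * 'nbytes' bytes into 'dest' and returns the number of bytes copied,
 * which is less than 'nbytes' only when the source is exhausted.
 */
static int
MockCopyFromStateRead(MockCopyFromState *cstate, char *dest, int nbytes)
{
    size_t      avail = cstate->src_len - cstate->src_pos;
    size_t      n;

    assert(nbytes >= 0);
    n = (size_t) nbytes < avail ? (size_t) nbytes : avail;
    memcpy(dest, cstate->src + cstate->src_pos, n);
    cstate->src_pos += n;
    return (int) n;
}

/* Hypothetical per-format private state, reached through cstate->opaque. */
typedef struct MyFormatState
{
    uint64_t    rows_read;      /* complete records seen so far */
} MyFormatState;

/*
 * Reads one record of an invented format: a 1-byte length followed by
 * that many payload bytes.  Returns the payload length, -1 on clean EOF
 * (no more records), or -2 if the record is truncated or too large.
 */
static int
MyFormatReadRecord(MockCopyFromState *cstate, char *payload, int payload_cap)
{
    MyFormatState *state = (MyFormatState *) cstate->opaque;
    unsigned char len;

    if (MockCopyFromStateRead(cstate, (char *) &len, 1) != 1)
        return -1;              /* EOF before a new record started */
    if ((int) len > payload_cap)
        return -2;              /* caller's buffer is too small */
    if (MockCopyFromStateRead(cstate, payload, len) != (int) len)
        return -2;              /* short read: EOF inside a record */

    state->rows_read++;
    return len;
}
```

In a real handler, the private state would presumably be allocated when
the COPY FROM starts and stored in cstate->opaque, and the reads would go
through the exported CopyFromStateRead() instead of the mock above.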

