Re: Make COPY format extendable: Extract COPY TO format implementations

Поиск
Список
Период
Сортировка
От Sutou Kouhei
Тема Re: Make COPY format extendable: Extract COPY TO format implementations
Дата
Msg-id 20240129.184523.703743711014180020.kou@clear-code.com
обсуждение исходный текст
Ответ на Re: Make COPY format extendable: Extract COPY TO format implementations  (Junwang Zhao <zhjwpku@gmail.com>)
Ответы Re: Make COPY format extendable: Extract COPY TO format implementations  (Masahiko Sawada <sawada.mshk@gmail.com>)
Список pgsql-hackers
Hi,

In <CAEG8a3Jnmbjw82OiSvRK3v9XN2zSshsB8ew1mZCQDAkKq6r9YQ@mail.gmail.com>
  "Re: Make COPY format extendable: Extract COPY TO format implementations" on Mon, 29 Jan 2024 11:37:07 +0800,
  Junwang Zhao <zhjwpku@gmail.com> wrote:

>> > > Does it make sense to pass only non-builtin options to the custom
>> > > format callback after parsing and evaluating the builtin options? That
>> > > is, we parse and evaluate only the builtin options and populate
>> > > opts_out first, then pass each rest option to the custom format
>> > > handler callback. The callback can refer to the builtin option values.
>>
>> What I imagined is that while parsing the all specified options, we
>> evaluate builtin options and we add non-builtin options to another
>> list. Then when parsing a non-builtin option, we check if this option
>> already exists in the list. If there is, we raise the "option %s not
>> recognized" error.". Once we complete checking all options, we pass
>> each option in the list to the callback.

I implemented this idea and the following ideas:

1. Add init callback for initialization
2. Change GetFormat() to FillCopyXXXResponse()
   because JSON format always use 1 column
3. FROM only: Eliminate more cstate->opts.csv_mode branches
   (This is for performance.)

See the attached v9 patch set for details. Changes since v7:

0001:

* Move CopyToProcessOption() calls to the end of
  ProcessCopyOptions() for easy to option validation
* Add CopyToState::CopyToInit() and call it in
  ProcessCopyOptionFormatTo()
* Change CopyToState::CopyToGetFormat() to
  CopyToState::CopyToFillCopyOutResponse() and use it in
  SendCopyBegin()

0002:

* Move CopyFromProcessOption() calls to the end of
  ProcessCopyOptions() for easy to option validation
* Add CopyFromState::CopyFromInit() and call it in
  ProcessCopyOptionFormatFrom()
* Change CopyFromState::CopyFromGetFormat() to
  CopyFromState::CopyFromFillCopyOutResponse() and use it in
  ReceiveCopyBegin()
* Rename NextCopyFromRawFields() to
  NextCopyFromRawFieldsInternal() and pass the read
  attributes callback explicitly to eliminate more
  cstate->opts.csv_mode branches


Thanks,
-- 
kou
From c136833f4a385574474b246a381014abeb631377 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Fri, 26 Jan 2024 16:46:51 +0900
Subject: [PATCH v9 1/2] Extract COPY TO format implementations

This is a part of making COPY format extendable. See also these past
discussions:
* New Copy Formats - avro/orc/parquet:
  https://www.postgresql.org/message-id/flat/20180210151304.fonjztsynewldfba%40gmail.com
* Make COPY extendable in order to support Parquet and other formats:
  https://www.postgresql.org/message-id/flat/CAJ7c6TM6Bz1c3F04Cy6%2BSzuWfKmr0kU8c_3Stnvh_8BR0D6k8Q%40mail.gmail.com

This doesn't change the current behavior. This just introduces
CopyToRoutine, which just has function pointers of format
implementation like TupleTableSlotOps, and use it for existing "text",
"csv" and "binary" format implementations.

Note that CopyToRoutine can't be used from extensions yet because
CopySend*() aren't exported yet. Extensions can't send formatted data
to a destination without CopySend*(). They will be exported by
subsequent patches.

Here is a benchmark result with/without this change because there was
a discussion that we should care about performance regression:

https://www.postgresql.org/message-id/3741749.1655952719%40sss.pgh.pa.us

> I think that step 1 ought to be to convert the existing formats into
> plug-ins, and demonstrate that there's no significant loss of
> performance.

You can see that there is no significant loss of performance:

Data: Random 32 bit integers:

    CREATE TABLE data (int32 integer);
    SELECT setseed(0.29);
    INSERT INTO data
      SELECT random() * 10000
        FROM generate_series(1, ${n_records});

The number of records: 100K, 1M and 10M

100K without this change:

    format,elapsed time (ms)
    text,10.561
    csv,10.868
    binary,10.287

100K with this change:

    format,elapsed time (ms)
    text,9.962
    csv,10.453
    binary,9.473

1M without this change:

    format,elapsed time (ms)
    text,103.265
    csv,109.789
    binary,104.078

1M with this change:

    format,elapsed time (ms)
    text,98.612
    csv,101.908
    binary,94.456

10M without this change:

    format,elapsed time (ms)
    text,1060.614
    csv,1065.272
    binary,1025.875

10M with this change:

    format,elapsed time (ms)
    text,1020.050
    csv,1031.279
    binary,954.792
---
 contrib/file_fdw/file_fdw.c     |   2 +-
 src/backend/commands/copy.c     |  82 ++++-
 src/backend/commands/copyfrom.c |   2 +-
 src/backend/commands/copyto.c   | 587 +++++++++++++++++++++++---------
 src/include/commands/copy.h     |   8 +-
 src/include/commands/copyapi.h  |  62 ++++
 6 files changed, 560 insertions(+), 183 deletions(-)
 create mode 100644 src/include/commands/copyapi.h

diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index 249d82d3a0..9e4e819858 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -329,7 +329,7 @@ file_fdw_validator(PG_FUNCTION_ARGS)
     /*
      * Now apply the core COPY code's validation logic for more checks.
      */
-    ProcessCopyOptions(NULL, NULL, true, other_options);
+    ProcessCopyOptions(NULL, NULL, true, NULL, other_options);
 
     /*
      * Either filename or program option is required for file_fdw foreign
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index cc0786c6f4..dd0fe7f0bb 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -442,6 +442,9 @@ defGetCopyOnErrorChoice(DefElem *def, ParseState *pstate, bool is_from)
  * a list of options.  In that usage, 'opts_out' can be passed as NULL and
  * the collected data is just leaked until CurrentMemoryContext is reset.
  *
+ * 'cstate' is CopyToState* for !is_from, CopyFromState* for is_from. 'cstate'
+ * may be NULL. For example, file_fdw uses NULL.
+ *
  * Note that additional checking, such as whether column names listed in FORCE
  * QUOTE actually exist, has to be applied later.  This just checks for
  * self-consistency of the options list.
@@ -450,6 +453,7 @@ void
 ProcessCopyOptions(ParseState *pstate,
                    CopyFormatOptions *opts_out,
                    bool is_from,
+                   void *cstate,
                    List *options)
 {
     bool        format_specified = false;
@@ -457,6 +461,7 @@ ProcessCopyOptions(ParseState *pstate,
     bool        header_specified = false;
     bool        on_error_specified = false;
     ListCell   *option;
+    List       *unknown_options = NIL;
 
     /* Support external use for option sanity checking */
     if (opts_out == NULL)
@@ -464,30 +469,58 @@ ProcessCopyOptions(ParseState *pstate,
 
     opts_out->file_encoding = -1;
 
-    /* Extract options from the statement node tree */
+    /*
+     * Extract only the "format" option to detect target routine as the first
+     * step
+     */
     foreach(option, options)
     {
         DefElem    *defel = lfirst_node(DefElem, option);
 
         if (strcmp(defel->defname, "format") == 0)
         {
-            char       *fmt = defGetString(defel);
-
             if (format_specified)
                 errorConflictingDefElem(defel, pstate);
             format_specified = true;
-            if (strcmp(fmt, "text") == 0)
-                 /* default format */ ;
-            else if (strcmp(fmt, "csv") == 0)
-                opts_out->csv_mode = true;
-            else if (strcmp(fmt, "binary") == 0)
-                opts_out->binary = true;
+
+            if (is_from)
+            {
+                char       *fmt = defGetString(defel);
+
+                if (strcmp(fmt, "text") == 0)
+                     /* default format */ ;
+                else if (strcmp(fmt, "csv") == 0)
+                {
+                    opts_out->csv_mode = true;
+                }
+                else if (strcmp(fmt, "binary") == 0)
+                {
+                    opts_out->binary = true;
+                }
+                else
+                    ereport(ERROR,
+                            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                             errmsg("COPY format \"%s\" not recognized", fmt),
+                             parser_errposition(pstate, defel->location)));
+            }
             else
-                ereport(ERROR,
-                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                         errmsg("COPY format \"%s\" not recognized", fmt),
-                         parser_errposition(pstate, defel->location)));
+                ProcessCopyOptionFormatTo(pstate, opts_out, cstate, defel);
         }
+    }
+    if (!format_specified)
+        /* Set the default format. */
+        ProcessCopyOptionFormatTo(pstate, opts_out, cstate, NULL);
+
+    /*
+     * Extract options except "format" from the statement node tree. Unknown
+     * options are processed later.
+     */
+    foreach(option, options)
+    {
+        DefElem    *defel = lfirst_node(DefElem, option);
+
+        if (strcmp(defel->defname, "format") == 0)
+            continue;
         else if (strcmp(defel->defname, "freeze") == 0)
         {
             if (freeze_specified)
@@ -616,11 +649,7 @@ ProcessCopyOptions(ParseState *pstate,
             opts_out->on_error = defGetCopyOnErrorChoice(defel, pstate, is_from);
         }
         else
-            ereport(ERROR,
-                    (errcode(ERRCODE_SYNTAX_ERROR),
-                     errmsg("option \"%s\" not recognized",
-                            defel->defname),
-                     parser_errposition(pstate, defel->location)));
+            unknown_options = lappend(unknown_options, defel);
     }
 
     /*
@@ -821,6 +850,23 @@ ProcessCopyOptions(ParseState *pstate,
                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                      errmsg("NULL specification and DEFAULT specification cannot be the same")));
     }
+
+    /* Process not built-in options. */
+    foreach(option, unknown_options)
+    {
+        DefElem    *defel = lfirst_node(DefElem, option);
+        bool        processed = false;
+
+        if (!is_from)
+            processed = opts_out->to_routine->CopyToProcessOption(cstate, defel);
+        if (!processed)
+            ereport(ERROR,
+                    (errcode(ERRCODE_SYNTAX_ERROR),
+                     errmsg("option \"%s\" not recognized",
+                            defel->defname),
+                     parser_errposition(pstate, defel->location)));
+    }
+    list_free(unknown_options);
 }
 
 /*
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 1fe70b9133..fb3d4d9296 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1416,7 +1416,7 @@ BeginCopyFrom(ParseState *pstate,
     oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
     /* Extract options from the statement node tree */
-    ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
+    ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , cstate, options);
 
     /* Process the target relation */
     cstate->rel = rel;
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index d3dc3fc854..4fb41f04fc 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -24,6 +24,7 @@
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "commands/copy.h"
+#include "commands/defrem.h"
 #include "commands/progress.h"
 #include "executor/execdesc.h"
 #include "executor/executor.h"
@@ -131,6 +132,427 @@ static void CopySendEndOfRow(CopyToState cstate);
 static void CopySendInt32(CopyToState cstate, int32 val);
 static void CopySendInt16(CopyToState cstate, int16 val);
 
+/*
+ * CopyToRoutine implementations.
+ */
+
+/*
+ * CopyToRoutine implementation for "text" and "csv". CopyToTextBased*() are
+ * shared by both of "text" and "csv". CopyToText*() are only for "text" and
+ * CopyToCSV*() are only for "csv".
+ *
+ * We can use the same functions for all callbacks by referring
+ * cstate->opts.csv_mode but splitting callbacks to eliminate "if
+ * (cstate->opts.csv_mode)" branches from all callbacks has performance
+ * merit when many tuples are copied. So we use separated callbacks for "text"
+ * and "csv".
+ */
+
+static void
+CopyToTextBasedInit(CopyToState cstate)
+{
+}
+
+/*
+ * All "text" and "csv" options are parsed in ProcessCopyOptions(). We may
+ * move the code to here later.
+ */
+static bool
+CopyToTextBasedProcessOption(CopyToState cstate, DefElem *defel)
+{
+    return false;
+}
+
+static void
+CopyToTextBasedFillCopyOutResponse(CopyToState cstate, StringInfoData *buf)
+{
+    int16        format = 0;
+    int            natts = list_length(cstate->attnumlist);
+    int            i;
+
+    pq_sendbyte(buf, format);    /* overall format */
+    pq_sendint16(buf, natts);
+    for (i = 0; i < natts; i++)
+        pq_sendint16(buf, format);    /* per-column formats */
+}
+
+static void
+CopyToTextBasedSendEndOfRow(CopyToState cstate)
+{
+    switch (cstate->copy_dest)
+    {
+        case COPY_FILE:
+            /* Default line termination depends on platform */
+#ifndef WIN32
+            CopySendChar(cstate, '\n');
+#else
+            CopySendString(cstate, "\r\n");
+#endif
+            break;
+        case COPY_FRONTEND:
+            /* The FE/BE protocol uses \n as newline for all platforms */
+            CopySendChar(cstate, '\n');
+            break;
+        default:
+            break;
+    }
+    CopySendEndOfRow(cstate);
+}
+
+typedef void (*CopyAttributeOutHeaderFunction) (CopyToState cstate, char *string);
+
+/*
+ * We can use CopyAttributeOutText() directly but define this for consistency
+ * with CopyAttributeOutCSVHeader(). "static inline" will prevent performance
+ * penalty by this wrapping.
+ */
+static inline void
+CopyAttributeOutTextHeader(CopyToState cstate, char *string)
+{
+    CopyAttributeOutText(cstate, string);
+}
+
+static inline void
+CopyAttributeOutCSVHeader(CopyToState cstate, char *string)
+{
+    CopyAttributeOutCSV(cstate, string, false,
+                        list_length(cstate->attnumlist) == 1);
+}
+
+/*
+ * We don't use this function as a callback directly. We define
+ * CopyToTextStart() and CopyToCSVStart() and use them instead. It's for
+ * eliminating a "if (cstate->opts.csv_mode)" branch. This callback is called
+ * only once per COPY TO. So this optimization may be meaningless but done for
+ * consistency with CopyToTextBasedOneRow().
+ *
+ * This must initialize cstate->out_functions for CopyToTextBasedOneRow().
+ */
+static inline void
+CopyToTextBasedStart(CopyToState cstate, TupleDesc tupDesc, CopyAttributeOutHeaderFunction out)
+{
+    int            num_phys_attrs;
+    ListCell   *cur;
+
+    num_phys_attrs = tupDesc->natts;
+    /* Get info about the columns we need to process. */
+    cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        Oid            out_func_oid;
+        bool        isvarlena;
+        Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+        getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+    }
+
+    /*
+     * For non-binary copy, we need to convert null_print to file encoding,
+     * because it will be sent directly with CopySendString.
+     */
+    if (cstate->need_transcoding)
+        cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+                                                          cstate->opts.null_print_len,
+                                                          cstate->file_encoding);
+
+    /* if a header has been requested send the line */
+    if (cstate->opts.header_line)
+    {
+        bool        hdr_delim = false;
+
+        foreach(cur, cstate->attnumlist)
+        {
+            int            attnum = lfirst_int(cur);
+            char       *colname;
+
+            if (hdr_delim)
+                CopySendChar(cstate, cstate->opts.delim[0]);
+            hdr_delim = true;
+
+            colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
+
+            out(cstate, colname);
+        }
+
+        CopyToTextBasedSendEndOfRow(cstate);
+    }
+}
+
+static void
+CopyToTextStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    CopyToTextBasedStart(cstate, tupDesc, CopyAttributeOutTextHeader);
+}
+
+static void
+CopyToCSVStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    CopyToTextBasedStart(cstate, tupDesc, CopyAttributeOutCSVHeader);
+}
+
+typedef void (*CopyAttributeOutValueFunction) (CopyToState cstate, char *string, int attnum);
+
+static inline void
+CopyAttributeOutTextValue(CopyToState cstate, char *string, int attnum)
+{
+    CopyAttributeOutText(cstate, string);
+}
+
+static inline void
+CopyAttributeOutCSVValue(CopyToState cstate, char *string, int attnum)
+{
+    CopyAttributeOutCSV(cstate, string,
+                        cstate->opts.force_quote_flags[attnum - 1],
+                        list_length(cstate->attnumlist) == 1);
+}
+
+/*
+ * We don't use this function as a callback directly. We define
+ * CopyToTextOneRow() and CopyToCSVOneRow() and use them instead. It's for
+ * eliminating a "if (cstate->opts.csv_mode)" branch. This callback is called
+ * per tuple. So this optimization will be valuable when many tuples are
+ * copied.
+ *
+ * cstate->out_functions must be initialized in CopyToTextBasedStart().
+ */
+static void
+CopyToTextBasedOneRow(CopyToState cstate, TupleTableSlot *slot, CopyAttributeOutValueFunction out)
+{
+    bool        need_delim = false;
+    FmgrInfo   *out_functions = cstate->out_functions;
+    ListCell   *cur;
+
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        Datum        value = slot->tts_values[attnum - 1];
+        bool        isnull = slot->tts_isnull[attnum - 1];
+
+        if (need_delim)
+            CopySendChar(cstate, cstate->opts.delim[0]);
+        need_delim = true;
+
+        if (isnull)
+        {
+            CopySendString(cstate, cstate->opts.null_print_client);
+        }
+        else
+        {
+            char       *string;
+
+            string = OutputFunctionCall(&out_functions[attnum - 1], value);
+            out(cstate, string, attnum);
+        }
+    }
+
+    CopyToTextBasedSendEndOfRow(cstate);
+}
+
+static void
+CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    CopyToTextBasedOneRow(cstate, slot, CopyAttributeOutTextValue);
+}
+
+static void
+CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    CopyToTextBasedOneRow(cstate, slot, CopyAttributeOutCSVValue);
+}
+
+static void
+CopyToTextBasedEnd(CopyToState cstate)
+{
+}
+
+/*
+ * CopyToRoutine implementation for "binary".
+ */
+
+static void
+CopyToBinaryInit(CopyToState cstate)
+{
+}
+
+/*
+ * All "binary" options are parsed in ProcessCopyOptions(). We may move the
+ * code to here later.
+ */
+static bool
+CopyToBinaryProcessOption(CopyToState cstate, DefElem *defel)
+{
+    return false;
+}
+
+static void
+CopyToBinaryFillCopyOutResponse(CopyToState cstate, StringInfoData *buf)
+{
+    int16        format = 1;
+    int            natts = list_length(cstate->attnumlist);
+    int            i;
+
+    pq_sendbyte(buf, format);    /* overall format */
+    pq_sendint16(buf, natts);
+    for (i = 0; i < natts; i++)
+        pq_sendint16(buf, format);    /* per-column formats */
+}
+
+/*
+ * This must initialize cstate->out_functions for CopyToBinaryOneRow().
+ */
+static void
+CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    int            num_phys_attrs;
+    ListCell   *cur;
+
+    num_phys_attrs = tupDesc->natts;
+    /* Get info about the columns we need to process. */
+    cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        Oid            out_func_oid;
+        bool        isvarlena;
+        Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+        getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+    }
+
+    {
+        /* Generate header for a binary copy */
+        int32        tmp;
+
+        /* Signature */
+        CopySendData(cstate, BinarySignature, 11);
+        /* Flags field */
+        tmp = 0;
+        CopySendInt32(cstate, tmp);
+        /* No header extension */
+        tmp = 0;
+        CopySendInt32(cstate, tmp);
+    }
+}
+
+/*
+ * cstate->out_functions must be initialized in CopyToBinaryStart().
+ */
+static void
+CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    FmgrInfo   *out_functions = cstate->out_functions;
+    ListCell   *cur;
+
+    /* Binary per-tuple header */
+    CopySendInt16(cstate, list_length(cstate->attnumlist));
+
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        Datum        value = slot->tts_values[attnum - 1];
+        bool        isnull = slot->tts_isnull[attnum - 1];
+
+        if (isnull)
+        {
+            CopySendInt32(cstate, -1);
+        }
+        else
+        {
+            bytea       *outputbytes;
+
+            outputbytes = SendFunctionCall(&out_functions[attnum - 1], value);
+            CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+            CopySendData(cstate, VARDATA(outputbytes),
+                         VARSIZE(outputbytes) - VARHDRSZ);
+        }
+    }
+
+    CopySendEndOfRow(cstate);
+}
+
+static void
+CopyToBinaryEnd(CopyToState cstate)
+{
+    /* Generate trailer for a binary copy */
+    CopySendInt16(cstate, -1);
+    /* Need to flush out the trailer */
+    CopySendEndOfRow(cstate);
+}
+
+/*
+ * CopyToTextBased*() are shared with "csv". CopyToText*() are only for "text".
+ */
+static const CopyToRoutine CopyToRoutineText = {
+    .CopyToInit = CopyToTextBasedInit,
+    .CopyToProcessOption = CopyToTextBasedProcessOption,
+    .CopyToFillCopyOutResponse = CopyToTextBasedFillCopyOutResponse,
+    .CopyToStart = CopyToTextStart,
+    .CopyToOneRow = CopyToTextOneRow,
+    .CopyToEnd = CopyToTextBasedEnd,
+};
+
+/*
+ * CopyToTextBased*() are shared with "text". CopyToCSV*() are only for "csv".
+ */
+static const CopyToRoutine CopyToRoutineCSV = {
+    .CopyToInit = CopyToTextBasedInit,
+    .CopyToProcessOption = CopyToTextBasedProcessOption,
+    .CopyToFillCopyOutResponse = CopyToTextBasedFillCopyOutResponse,
+    .CopyToStart = CopyToCSVStart,
+    .CopyToOneRow = CopyToCSVOneRow,
+    .CopyToEnd = CopyToTextBasedEnd,
+};
+
+static const CopyToRoutine CopyToRoutineBinary = {
+    .CopyToInit = CopyToBinaryInit,
+    .CopyToProcessOption = CopyToBinaryProcessOption,
+    .CopyToFillCopyOutResponse = CopyToBinaryFillCopyOutResponse,
+    .CopyToStart = CopyToBinaryStart,
+    .CopyToOneRow = CopyToBinaryOneRow,
+    .CopyToEnd = CopyToBinaryEnd,
+};
+
+/*
+ * Process the "format" option for COPY TO.
+ *
+ * If defel is NULL, the default format "text" is used.
+ */
+void
+ProcessCopyOptionFormatTo(ParseState *pstate,
+                          CopyFormatOptions *opts_out,
+                          CopyToState cstate,
+                          DefElem *defel)
+{
+    char       *format;
+
+    if (defel)
+        format = defGetString(defel);
+    else
+        format = "text";
+
+    if (strcmp(format, "text") == 0)
+        opts_out->to_routine = &CopyToRoutineText;
+    else if (strcmp(format, "csv") == 0)
+    {
+        opts_out->csv_mode = true;
+        opts_out->to_routine = &CopyToRoutineCSV;
+    }
+    else if (strcmp(format, "binary") == 0)
+    {
+        opts_out->binary = true;
+        opts_out->to_routine = &CopyToRoutineBinary;
+    }
+    else
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("COPY format \"%s\" not recognized", format),
+                 parser_errposition(pstate, defel->location)));
+
+    opts_out->to_routine->CopyToInit(cstate);
+}
 
 /*
  * Send copy start/stop messages for frontend copies.  These have changed
@@ -140,15 +562,9 @@ static void
 SendCopyBegin(CopyToState cstate)
 {
     StringInfoData buf;
-    int            natts = list_length(cstate->attnumlist);
-    int16        format = (cstate->opts.binary ? 1 : 0);
-    int            i;
 
     pq_beginmessage(&buf, PqMsg_CopyOutResponse);
-    pq_sendbyte(&buf, format);    /* overall format */
-    pq_sendint16(&buf, natts);
-    for (i = 0; i < natts; i++)
-        pq_sendint16(&buf, format); /* per-column formats */
+    cstate->opts.to_routine->CopyToFillCopyOutResponse(cstate, &buf);
     pq_endmessage(&buf);
     cstate->copy_dest = COPY_FRONTEND;
 }
@@ -198,16 +614,6 @@ CopySendEndOfRow(CopyToState cstate)
     switch (cstate->copy_dest)
     {
         case COPY_FILE:
-            if (!cstate->opts.binary)
-            {
-                /* Default line termination depends on platform */
-#ifndef WIN32
-                CopySendChar(cstate, '\n');
-#else
-                CopySendString(cstate, "\r\n");
-#endif
-            }
-
             if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
                        cstate->copy_file) != 1 ||
                 ferror(cstate->copy_file))
@@ -242,10 +648,6 @@ CopySendEndOfRow(CopyToState cstate)
             }
             break;
         case COPY_FRONTEND:
-            /* The FE/BE protocol uses \n as newline for all platforms */
-            if (!cstate->opts.binary)
-                CopySendChar(cstate, '\n');
-
             /* Dump the accumulated row as one CopyData message */
             (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
             break;
@@ -431,7 +833,7 @@ BeginCopyTo(ParseState *pstate,
     oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
     /* Extract options from the statement node tree */
-    ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
+    ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , cstate, options);
 
     /* Process the source/target relation or query */
     if (rel)
@@ -748,8 +1150,6 @@ DoCopyTo(CopyToState cstate)
     bool        pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
     bool        fe_copy = (pipe && whereToSendOutput == DestRemote);
     TupleDesc    tupDesc;
-    int            num_phys_attrs;
-    ListCell   *cur;
     uint64        processed;
 
     if (fe_copy)
@@ -759,32 +1159,11 @@ DoCopyTo(CopyToState cstate)
         tupDesc = RelationGetDescr(cstate->rel);
     else
         tupDesc = cstate->queryDesc->tupDesc;
-    num_phys_attrs = tupDesc->natts;
     cstate->opts.null_print_client = cstate->opts.null_print;    /* default */
 
     /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
     cstate->fe_msgbuf = makeStringInfo();
 
-    /* Get info about the columns we need to process. */
-    cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
-    foreach(cur, cstate->attnumlist)
-    {
-        int            attnum = lfirst_int(cur);
-        Oid            out_func_oid;
-        bool        isvarlena;
-        Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
-
-        if (cstate->opts.binary)
-            getTypeBinaryOutputInfo(attr->atttypid,
-                                    &out_func_oid,
-                                    &isvarlena);
-        else
-            getTypeOutputInfo(attr->atttypid,
-                              &out_func_oid,
-                              &isvarlena);
-        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
-    }
-
     /*
      * Create a temporary memory context that we can reset once per row to
      * recover palloc'd memory.  This avoids any problems with leaks inside
@@ -795,57 +1174,7 @@ DoCopyTo(CopyToState cstate)
                                                "COPY TO",
                                                ALLOCSET_DEFAULT_SIZES);
 
-    if (cstate->opts.binary)
-    {
-        /* Generate header for a binary copy */
-        int32        tmp;
-
-        /* Signature */
-        CopySendData(cstate, BinarySignature, 11);
-        /* Flags field */
-        tmp = 0;
-        CopySendInt32(cstate, tmp);
-        /* No header extension */
-        tmp = 0;
-        CopySendInt32(cstate, tmp);
-    }
-    else
-    {
-        /*
-         * For non-binary copy, we need to convert null_print to file
-         * encoding, because it will be sent directly with CopySendString.
-         */
-        if (cstate->need_transcoding)
-            cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
-                                                              cstate->opts.null_print_len,
-                                                              cstate->file_encoding);
-
-        /* if a header has been requested send the line */
-        if (cstate->opts.header_line)
-        {
-            bool        hdr_delim = false;
-
-            foreach(cur, cstate->attnumlist)
-            {
-                int            attnum = lfirst_int(cur);
-                char       *colname;
-
-                if (hdr_delim)
-                    CopySendChar(cstate, cstate->opts.delim[0]);
-                hdr_delim = true;
-
-                colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
-
-                if (cstate->opts.csv_mode)
-                    CopyAttributeOutCSV(cstate, colname, false,
-                                        list_length(cstate->attnumlist) == 1);
-                else
-                    CopyAttributeOutText(cstate, colname);
-            }
-
-            CopySendEndOfRow(cstate);
-        }
-    }
+    cstate->opts.to_routine->CopyToStart(cstate, tupDesc);
 
     if (cstate->rel)
     {
@@ -884,13 +1213,7 @@ DoCopyTo(CopyToState cstate)
         processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
     }
 
-    if (cstate->opts.binary)
-    {
-        /* Generate trailer for a binary copy */
-        CopySendInt16(cstate, -1);
-        /* Need to flush out the trailer */
-        CopySendEndOfRow(cstate);
-    }
+    cstate->opts.to_routine->CopyToEnd(cstate);
 
     MemoryContextDelete(cstate->rowcontext);
 
@@ -906,71 +1229,15 @@ DoCopyTo(CopyToState cstate)
 static void
 CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
 {
-    bool        need_delim = false;
-    FmgrInfo   *out_functions = cstate->out_functions;
     MemoryContext oldcontext;
-    ListCell   *cur;
-    char       *string;
 
     MemoryContextReset(cstate->rowcontext);
     oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
 
-    if (cstate->opts.binary)
-    {
-        /* Binary per-tuple header */
-        CopySendInt16(cstate, list_length(cstate->attnumlist));
-    }
-
     /* Make sure the tuple is fully deconstructed */
     slot_getallattrs(slot);
 
-    foreach(cur, cstate->attnumlist)
-    {
-        int            attnum = lfirst_int(cur);
-        Datum        value = slot->tts_values[attnum - 1];
-        bool        isnull = slot->tts_isnull[attnum - 1];
-
-        if (!cstate->opts.binary)
-        {
-            if (need_delim)
-                CopySendChar(cstate, cstate->opts.delim[0]);
-            need_delim = true;
-        }
-
-        if (isnull)
-        {
-            if (!cstate->opts.binary)
-                CopySendString(cstate, cstate->opts.null_print_client);
-            else
-                CopySendInt32(cstate, -1);
-        }
-        else
-        {
-            if (!cstate->opts.binary)
-            {
-                string = OutputFunctionCall(&out_functions[attnum - 1],
-                                            value);
-                if (cstate->opts.csv_mode)
-                    CopyAttributeOutCSV(cstate, string,
-                                        cstate->opts.force_quote_flags[attnum - 1],
-                                        list_length(cstate->attnumlist) == 1);
-                else
-                    CopyAttributeOutText(cstate, string);
-            }
-            else
-            {
-                bytea       *outputbytes;
-
-                outputbytes = SendFunctionCall(&out_functions[attnum - 1],
-                                               value);
-                CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
-                CopySendData(cstate, VARDATA(outputbytes),
-                             VARSIZE(outputbytes) - VARHDRSZ);
-            }
-        }
-    }
-
-    CopySendEndOfRow(cstate);
+    cstate->opts.to_routine->CopyToOneRow(cstate, slot);
 
     MemoryContextSwitchTo(oldcontext);
 }
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index b3da3cb0be..de316cfd81 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -14,6 +14,7 @@
 #ifndef COPY_H
 #define COPY_H
 
+#include "commands/copyapi.h"
 #include "nodes/execnodes.h"
 #include "nodes/parsenodes.h"
 #include "parser/parse_node.h"
@@ -74,11 +75,11 @@ typedef struct CopyFormatOptions
     bool        convert_selectively;    /* do selective binary conversion? */
     CopyOnErrorChoice on_error; /* what to do when error happened */
     List       *convert_select; /* list of column names (can be NIL) */
+    const        CopyToRoutine *to_routine;    /* callback routines for COPY TO */
 } CopyFormatOptions;
 
-/* These are private in commands/copy[from|to].c */
+/* This is private in commands/copyfrom.c */
 typedef struct CopyFromStateData *CopyFromState;
-typedef struct CopyToStateData *CopyToState;
 
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
 typedef void (*copy_data_dest_cb) (void *data, int len);
@@ -87,7 +88,8 @@ extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
                    int stmt_location, int stmt_len,
                    uint64 *processed);
 
-extern void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, List *options);
+extern void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, void *cstate, List
*options);
+extern void ProcessCopyOptionFormatTo(ParseState *pstate, CopyFormatOptions *opts_out, CopyToState cstate, DefElem
*defel);
 extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause,
                                    const char *filename,
                                    bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List
*options);
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
new file mode 100644
index 0000000000..f8901cac51
--- /dev/null
+++ b/src/include/commands/copyapi.h
@@ -0,0 +1,62 @@
+/*-------------------------------------------------------------------------
+ *
+ * copyapi.h
+ *      API for COPY TO/FROM handlers
+ *
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/commands/copyapi.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef COPYAPI_H
+#define COPYAPI_H
+
+#include "executor/tuptable.h"
+#include "nodes/parsenodes.h"
+
+/* This is private in commands/copyto.c */
+typedef struct CopyToStateData *CopyToState;
+
+/* Routines for a COPY TO format implementation. */
+typedef struct CopyToRoutine
+{
+    /*
+     * Called when this CopyToRoutine is chosen. This can be used for
+     * initialization.
+     */
+    void        (*CopyToInit) (CopyToState cstate);
+
+    /*
+     * Called for processing one COPY TO option. This will return false when
+     * the given option is invalid.
+     */
+    bool        (*CopyToProcessOption) (CopyToState cstate, DefElem *defel);
+
+    /*
+     * Called when COPY TO via the PostgreSQL protocol is started. This must
+     * fill buf as a valid CopyOutResponse message:
+     *
+     */
+    /*--
+     * +--------+--------+--------+--------+--------+   +--------+--------+
+     * | Format | N attributes    | Attr1's format  |...| AttrN's format  |
+     * +--------+--------+--------+--------+--------+   +--------+--------+
+     * 0: text                      0: text               0: text
+     * 1: binary                    1: binary             1: binary
+     */
+    void        (*CopyToFillCopyOutResponse) (CopyToState cstate, StringInfoData *buf);
+
+    /* Called when COPY TO is started. This will send a header. */
+    void        (*CopyToStart) (CopyToState cstate, TupleDesc tupDesc);
+
+    /* Copy one row for COPY TO. */
+    void        (*CopyToOneRow) (CopyToState cstate, TupleTableSlot *slot);
+
+    /* Called when COPY TO is ended. This will send a trailer. */
+    void        (*CopyToEnd) (CopyToState cstate);
+}            CopyToRoutine;
+
+#endif                            /* COPYAPI_H */
-- 
2.43.0

From 720cda9c40d4f2f9a6c0b2cf9be5f4526da818d1 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Fri, 26 Jan 2024 17:21:53 +0900
Subject: [PATCH v9 2/2] Extract COPY FROM format implementations

This doesn't change the current behavior. This just introduces
CopyFromRoutine, which just has function pointers of format
implementation like TupleTableSlotOps, and use it for existing "text",
"csv" and "binary" format implementations.

Note that CopyFromRoutine can't be used from extensions yet because
CopyRead*() aren't exported yet. Extensions can't read data from a
source without CopyRead*(). They will be exported by subsequent
patches.
---
 src/backend/commands/copy.c              |  31 +-
 src/backend/commands/copyfrom.c          | 300 +++++++++++++---
 src/backend/commands/copyfromparse.c     | 428 +++++++++++++----------
 src/include/commands/copy.h              |   6 +-
 src/include/commands/copyapi.h           |  46 +++
 src/include/commands/copyfrom_internal.h |   4 +
 6 files changed, 561 insertions(+), 254 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index dd0fe7f0bb..7aabed5614 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -484,32 +484,19 @@ ProcessCopyOptions(ParseState *pstate,
             format_specified = true;
 
             if (is_from)
-            {
-                char       *fmt = defGetString(defel);
-
-                if (strcmp(fmt, "text") == 0)
-                     /* default format */ ;
-                else if (strcmp(fmt, "csv") == 0)
-                {
-                    opts_out->csv_mode = true;
-                }
-                else if (strcmp(fmt, "binary") == 0)
-                {
-                    opts_out->binary = true;
-                }
-                else
-                    ereport(ERROR,
-                            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                             errmsg("COPY format \"%s\" not recognized", fmt),
-                             parser_errposition(pstate, defel->location)));
-            }
+                ProcessCopyOptionFormatFrom(pstate, opts_out, cstate, defel);
             else
                 ProcessCopyOptionFormatTo(pstate, opts_out, cstate, defel);
         }
     }
     if (!format_specified)
+    {
         /* Set the default format. */
-        ProcessCopyOptionFormatTo(pstate, opts_out, cstate, NULL);
+        if (is_from)
+            ProcessCopyOptionFormatFrom(pstate, opts_out, cstate, NULL);
+        else
+            ProcessCopyOptionFormatTo(pstate, opts_out, cstate, NULL);
+    }
 
     /*
      * Extract options except "format" from the statement node tree. Unknown
@@ -857,7 +844,9 @@ ProcessCopyOptions(ParseState *pstate,
         DefElem    *defel = lfirst_node(DefElem, option);
         bool        processed = false;
 
-        if (!is_from)
+        if (is_from)
+            processed = opts_out->from_routine->CopyFromProcessOption(cstate, defel);
+        else
             processed = opts_out->to_routine->CopyToProcessOption(cstate, defel);
         if (!processed)
             ereport(ERROR,
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index fb3d4d9296..338a885e2c 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -32,6 +32,7 @@
 #include "catalog/namespace.h"
 #include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
+#include "commands/defrem.h"
 #include "commands/progress.h"
 #include "commands/trigger.h"
 #include "executor/execPartition.h"
@@ -108,6 +109,253 @@ static char *limit_printout_length(const char *str);
 
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+
+/*
+ * CopyFromRoutine implementations.
+ */
+
+/*
+ * CopyFromRoutine implementation for "text" and "csv". CopyFromTextBased*()
+ * are shared by both of "text" and "csv". CopyFromText*() are only for "text"
+ * and CopyFromCSV*() are only for "csv".
+ *
+ * We can use the same functions for all callbacks by referring
+ * cstate->opts.csv_mode but splitting callbacks to eliminate "if
+ * (cstate->opts.csv_mode)" branches from all callbacks has performance merit
+ * when many tuples are copied. So we use separated callbacks for "text" and
+ * "csv".
+ */
+
+static void
+CopyFromTextBasedInit(CopyFromState cstate)
+{
+}
+
+/*
+ * All "text" and "csv" options are parsed in ProcessCopyOptions(). We may
+ * move the code to here later.
+ */
+static bool
+CopyFromTextBasedProcessOption(CopyFromState cstate, DefElem *defel)
+{
+    return false;
+}
+
+static void
+CopyFromTextBasedFillCopyInResponse(CopyFromState cstate, StringInfoData *buf)
+{
+    int16        format = 0;
+    int            natts = list_length(cstate->attnumlist);
+    int            i;
+
+    pq_sendbyte(buf, format);    /* overall format */
+    pq_sendint16(buf, natts);
+    for (i = 0; i < natts; i++)
+        pq_sendint16(buf, format);    /* per-column formats */
+}
+
+/*
+ * This must initialize cstate->in_functions for CopyFromTextBasedOneRow().
+ */
+static void
+CopyFromTextBasedStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    AttrNumber    num_phys_attrs = tupDesc->natts;
+    AttrNumber    attr_count;
+
+    /*
+     * If encoding conversion is needed, we need another buffer to hold the
+     * converted input data.  Otherwise, we can just point input_buf to the
+     * same buffer as raw_buf.
+     */
+    if (cstate->need_transcoding)
+    {
+        cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
+        cstate->input_buf_index = cstate->input_buf_len = 0;
+    }
+    else
+        cstate->input_buf = cstate->raw_buf;
+    cstate->input_reached_eof = false;
+
+    initStringInfo(&cstate->line_buf);
+
+    /*
+     * Pick up the required catalog information for each attribute in the
+     * relation, including the input function, the element type (to pass to
+     * the input function).
+     */
+    cstate->in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+    cstate->typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
+    for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
+    {
+        Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
+        Oid            in_func_oid;
+
+        /* We don't need info for dropped attributes */
+        if (att->attisdropped)
+            continue;
+
+        /* Fetch the input function and typioparam info */
+        getTypeInputInfo(att->atttypid,
+                         &in_func_oid, &cstate->typioparams[attnum - 1]);
+        fmgr_info(in_func_oid, &cstate->in_functions[attnum - 1]);
+    }
+
+    /* create workspace for CopyReadAttributes results */
+    attr_count = list_length(cstate->attnumlist);
+    cstate->max_fields = attr_count;
+    cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
+}
+
+static void
+CopyFromTextBasedEnd(CopyFromState cstate)
+{
+}
+
+/*
+ * CopyFromRoutine implementation for "binary".
+ */
+
+static void
+CopyFromBinaryInit(CopyFromState cstate)
+{
+}
+
+/*
+ * All "binary" options are parsed in ProcessCopyOptions(). We may move the
+ * code to here later.
+ */
+static bool
+CopyFromBinaryProcessOption(CopyFromState cstate, DefElem *defel)
+{
+    return false;
+}
+
+static void
+CopyFromBinaryFillCopyInResponse(CopyFromState cstate, StringInfoData *buf)
+{
+    int16        format = 1;
+    int            natts = list_length(cstate->attnumlist);
+    int            i;
+
+    pq_sendbyte(buf, format);    /* overall format */
+    pq_sendint16(buf, natts);
+    for (i = 0; i < natts; i++)
+        pq_sendint16(buf, format);    /* per-column formats */
+}
+
+/*
+ * This must initialize cstate->in_functions for CopyFromBinaryOneRow().
+ */
+static void
+CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    AttrNumber    num_phys_attrs = tupDesc->natts;
+
+    /*
+     * Pick up the required catalog information for each attribute in the
+     * relation, including the input function, the element type (to pass to
+     * the input function).
+     */
+    cstate->in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+    cstate->typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
+    for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
+    {
+        Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
+        Oid            in_func_oid;
+
+        /* We don't need info for dropped attributes */
+        if (att->attisdropped)
+            continue;
+
+        /* Fetch the input function and typioparam info */
+        getTypeBinaryInputInfo(att->atttypid,
+                               &in_func_oid, &cstate->typioparams[attnum - 1]);
+        fmgr_info(in_func_oid, &cstate->in_functions[attnum - 1]);
+    }
+
+    /* Read and verify binary header */
+    ReceiveCopyBinaryHeader(cstate);
+}
+
+static void
+CopyFromBinaryEnd(CopyFromState cstate)
+{
+}
+
+/*
+ * CopyFromTextBased*() are shared with "csv". CopyFromText*() are only for "text".
+ */
+static const CopyFromRoutine CopyFromRoutineText = {
+    .CopyFromInit = CopyFromTextBasedInit,
+    .CopyFromProcessOption = CopyFromTextBasedProcessOption,
+    .CopyFromFillCopyInResponse = CopyFromTextBasedFillCopyInResponse,
+    .CopyFromStart = CopyFromTextBasedStart,
+    .CopyFromOneRow = CopyFromTextOneRow,
+    .CopyFromEnd = CopyFromTextBasedEnd,
+};
+
+/*
+ * CopyFromTextBased*() are shared with "text". CopyFromCSV*() are only for "csv".
+ */
+static const CopyFromRoutine CopyFromRoutineCSV = {
+    .CopyFromInit = CopyFromTextBasedInit,
+    .CopyFromProcessOption = CopyFromTextBasedProcessOption,
+    .CopyFromFillCopyInResponse = CopyFromTextBasedFillCopyInResponse,
+    .CopyFromStart = CopyFromTextBasedStart,
+    .CopyFromOneRow = CopyFromCSVOneRow,
+    .CopyFromEnd = CopyFromTextBasedEnd,
+};
+
+static const CopyFromRoutine CopyFromRoutineBinary = {
+    .CopyFromInit = CopyFromBinaryInit,
+    .CopyFromProcessOption = CopyFromBinaryProcessOption,
+    .CopyFromFillCopyInResponse = CopyFromBinaryFillCopyInResponse,
+    .CopyFromStart = CopyFromBinaryStart,
+    .CopyFromOneRow = CopyFromBinaryOneRow,
+    .CopyFromEnd = CopyFromBinaryEnd,
+};
+
+/*
+ * Process the "format" option for COPY FROM.
+ *
+ * If defel is NULL, the default format "text" is used.
+ */
+void
+ProcessCopyOptionFormatFrom(ParseState *pstate,
+                            CopyFormatOptions *opts_out,
+                            CopyFromState cstate,
+                            DefElem *defel)
+{
+    char       *format;
+
+    if (defel)
+        format = defGetString(defel);
+    else
+        format = "text";
+
+    if (strcmp(format, "text") == 0)
+        opts_out->from_routine = &CopyFromRoutineText;
+    else if (strcmp(format, "csv") == 0)
+    {
+        opts_out->csv_mode = true;
+        opts_out->from_routine = &CopyFromRoutineCSV;
+    }
+    else if (strcmp(format, "binary") == 0)
+    {
+        opts_out->binary = true;
+        opts_out->from_routine = &CopyFromRoutineBinary;
+    }
+    else
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("COPY format \"%s\" not recognized", format),
+                 parser_errposition(pstate, defel->location)));
+
+    opts_out->from_routine->CopyFromInit(cstate);
+}
+
+
 /*
  * error context callback for COPY FROM
  *
@@ -1384,9 +1632,6 @@ BeginCopyFrom(ParseState *pstate,
     TupleDesc    tupDesc;
     AttrNumber    num_phys_attrs,
                 num_defaults;
-    FmgrInfo   *in_functions;
-    Oid           *typioparams;
-    Oid            in_func_oid;
     int           *defmap;
     ExprState **defexprs;
     MemoryContext oldcontext;
@@ -1571,25 +1816,6 @@ BeginCopyFrom(ParseState *pstate,
     cstate->raw_buf_index = cstate->raw_buf_len = 0;
     cstate->raw_reached_eof = false;
 
-    if (!cstate->opts.binary)
-    {
-        /*
-         * If encoding conversion is needed, we need another buffer to hold
-         * the converted input data.  Otherwise, we can just point input_buf
-         * to the same buffer as raw_buf.
-         */
-        if (cstate->need_transcoding)
-        {
-            cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
-            cstate->input_buf_index = cstate->input_buf_len = 0;
-        }
-        else
-            cstate->input_buf = cstate->raw_buf;
-        cstate->input_reached_eof = false;
-
-        initStringInfo(&cstate->line_buf);
-    }
-
     initStringInfo(&cstate->attribute_buf);
 
     /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
@@ -1608,8 +1834,6 @@ BeginCopyFrom(ParseState *pstate,
      * the input function), and info about defaults and constraints. (Which
      * input function we use depends on text/binary format choice.)
      */
-    in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
-    typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
     defmap = (int *) palloc(num_phys_attrs * sizeof(int));
     defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
 
@@ -1621,15 +1845,6 @@ BeginCopyFrom(ParseState *pstate,
         if (att->attisdropped)
             continue;
 
-        /* Fetch the input function and typioparam info */
-        if (cstate->opts.binary)
-            getTypeBinaryInputInfo(att->atttypid,
-                                   &in_func_oid, &typioparams[attnum - 1]);
-        else
-            getTypeInputInfo(att->atttypid,
-                             &in_func_oid, &typioparams[attnum - 1]);
-        fmgr_info(in_func_oid, &in_functions[attnum - 1]);
-
         /* Get default info if available */
         defexprs[attnum - 1] = NULL;
 
@@ -1689,8 +1904,6 @@ BeginCopyFrom(ParseState *pstate,
     cstate->bytes_processed = 0;
 
     /* We keep those variables in cstate. */
-    cstate->in_functions = in_functions;
-    cstate->typioparams = typioparams;
     cstate->defmap = defmap;
     cstate->defexprs = defexprs;
     cstate->volatile_defexprs = volatile_defexprs;
@@ -1763,20 +1976,7 @@ BeginCopyFrom(ParseState *pstate,
 
     pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
 
-    if (cstate->opts.binary)
-    {
-        /* Read and verify binary header */
-        ReceiveCopyBinaryHeader(cstate);
-    }
-
-    /* create workspace for CopyReadAttributes results */
-    if (!cstate->opts.binary)
-    {
-        AttrNumber    attr_count = list_length(cstate->attnumlist);
-
-        cstate->max_fields = attr_count;
-        cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
-    }
+    cstate->opts.from_routine->CopyFromStart(cstate, tupDesc);
 
     MemoryContextSwitchTo(oldcontext);
 
@@ -1789,6 +1989,8 @@ BeginCopyFrom(ParseState *pstate,
 void
 EndCopyFrom(CopyFromState cstate)
 {
+    cstate->opts.from_routine->CopyFromEnd(cstate);
+
     /* No COPY FROM related resources except memory. */
     if (cstate->is_program)
     {
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 7cacd0b752..f6b130458b 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -171,15 +171,9 @@ void
 ReceiveCopyBegin(CopyFromState cstate)
 {
     StringInfoData buf;
-    int            natts = list_length(cstate->attnumlist);
-    int16        format = (cstate->opts.binary ? 1 : 0);
-    int            i;
 
     pq_beginmessage(&buf, PqMsg_CopyInResponse);
-    pq_sendbyte(&buf, format);    /* overall format */
-    pq_sendint16(&buf, natts);
-    for (i = 0; i < natts; i++)
-        pq_sendint16(&buf, format); /* per-column formats */
+    cstate->opts.from_routine->CopyFromFillCopyInResponse(cstate, &buf);
     pq_endmessage(&buf);
     cstate->copy_src = COPY_FRONTEND;
     cstate->fe_msgbuf = makeStringInfo();
@@ -740,8 +734,19 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
     return copied_bytes;
 }
 
+typedef int (*CopyReadAttributes) (CopyFromState cstate);
+
 /*
- * Read raw fields in the next line for COPY FROM in text or csv mode.
+ * Read raw fields in the next line for COPY FROM in text or csv
+ * mode. CopyReadAttributesText() must be used for text mode and
+ * CopyReadAttributesCSV() for csv mode. This inconvenient is for
+ * optimization. If "if (cstate->opts.csv_mode)" branch is removed, there is
+ * performance merit for COPY FROM with many tuples.
+ *
+ * NextCopyFromRawFields() can be used instead for convenience
+ * use. NextCopyFromRawFields() chooses CopyReadAttributesText() or
+ * CopyReadAttributesCSV() internally.
+ *
  * Return false if no more lines.
  *
  * An internal temporary buffer is returned via 'fields'. It is valid until
@@ -751,8 +756,8 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
  *
  * NOTE: force_not_null option are not applied to the returned fields.
  */
-bool
-NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
+static inline bool
+NextCopyFromRawFieldsInternal(CopyFromState cstate, char ***fields, int *nfields, CopyReadAttributes
copy_read_attributes)
 {
     int            fldct;
     bool        done;
@@ -775,11 +780,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
         {
             int            fldnum;
 
-            if (cstate->opts.csv_mode)
-                fldct = CopyReadAttributesCSV(cstate);
-            else
-                fldct = CopyReadAttributesText(cstate);
-
+            fldct = copy_read_attributes(cstate);
             if (fldct != list_length(cstate->attnumlist))
                 ereport(ERROR,
                         (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
@@ -830,16 +831,240 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
         return false;
 
     /* Parse the line into de-escaped field values */
-    if (cstate->opts.csv_mode)
-        fldct = CopyReadAttributesCSV(cstate);
-    else
-        fldct = CopyReadAttributesText(cstate);
+    fldct = copy_read_attributes(cstate);
 
     *fields = cstate->raw_fields;
     *nfields = fldct;
     return true;
 }
 
+/*
+ * See NextCopyFromRawFieldsInternal() for details.
+ */
+bool
+NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
+{
+    if (cstate->opts.csv_mode)
+        return NextCopyFromRawFieldsInternal(cstate, fields, nfields, CopyReadAttributesCSV);
+    else
+        return NextCopyFromRawFieldsInternal(cstate, fields, nfields, CopyReadAttributesText);
+}
+
+typedef char *(*PostpareColumnValue) (CopyFromState cstate, char *string, int m);
+
+static inline char *
+PostpareColumnValueText(CopyFromState cstate, char *string, int m)
+{
+    /* do nothing */
+    return string;
+}
+
+static inline char *
+PostpareColumnValueCSV(CopyFromState cstate, char *string, int m)
+{
+    if (string == NULL &&
+        cstate->opts.force_notnull_flags[m])
+    {
+        /*
+         * FORCE_NOT_NULL option is set and column is NULL - convert it to the
+         * NULL string.
+         */
+        string = cstate->opts.null_print;
+    }
+    else if (string != NULL && cstate->opts.force_null_flags[m]
+             && strcmp(string, cstate->opts.null_print) == 0)
+    {
+        /*
+         * FORCE_NULL option is set and column matches the NULL string. It
+         * must have been quoted, or otherwise the string would already have
+         * been set to NULL. Convert it to NULL as specified.
+         */
+        string = NULL;
+    }
+    return string;
+}
+
+/*
+ * We don't use this function as a callback directly. We define
+ * CopyFromTextOneRow() and CopyFromCSVOneRow() and use them instead. It's for
+ * eliminating a "if (cstate->opts.csv_mode)" branch. This callback is called
+ * per tuple. So this optimization will be valuable when many tuples are
+ * copied.
+ *
+ * cstate->in_functions must be initialized in CopyFromTextBasedStart().
+ */
+static inline bool
+CopyFromTextBasedOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls, CopyReadAttributes
copy_read_attributes,PostpareColumnValue postpare_column_value)
 
+{
+    TupleDesc    tupDesc;
+    AttrNumber    attr_count;
+    FmgrInfo   *in_functions = cstate->in_functions;
+    Oid           *typioparams = cstate->typioparams;
+    ExprState **defexprs = cstate->defexprs;
+    char      **field_strings;
+    ListCell   *cur;
+    int            fldct;
+    int            fieldno;
+    char       *string;
+
+    tupDesc = RelationGetDescr(cstate->rel);
+    attr_count = list_length(cstate->attnumlist);
+
+    /* read raw fields in the next line */
+    if (!NextCopyFromRawFieldsInternal(cstate, &field_strings, &fldct, copy_read_attributes))
+        return false;
+
+    /* check for overflowing fields */
+    if (attr_count > 0 && fldct > attr_count)
+        ereport(ERROR,
+                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                 errmsg("extra data after last expected column")));
+
+    fieldno = 0;
+
+    /* Loop to read the user attributes on the line. */
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        int            m = attnum - 1;
+        Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+        if (fieldno >= fldct)
+            ereport(ERROR,
+                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                     errmsg("missing data for column \"%s\"",
+                            NameStr(att->attname))));
+        string = field_strings[fieldno++];
+
+        if (cstate->convert_select_flags &&
+            !cstate->convert_select_flags[m])
+        {
+            /* ignore input field, leaving column as NULL */
+            continue;
+        }
+
+        cstate->cur_attname = NameStr(att->attname);
+        cstate->cur_attval = string;
+
+        string = postpare_column_value(cstate, string, m);
+
+        if (string != NULL)
+            nulls[m] = false;
+
+        if (cstate->defaults[m])
+        {
+            /*
+             * The caller must supply econtext and have switched into the
+             * per-tuple memory context in it.
+             */
+            Assert(econtext != NULL);
+            Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
+
+            values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
+        }
+
+        /*
+         * If ON_ERROR is specified with IGNORE, skip rows with soft errors
+         */
+        else if (!InputFunctionCallSafe(&in_functions[m],
+                                        string,
+                                        typioparams[m],
+                                        att->atttypmod,
+                                        (Node *) cstate->escontext,
+                                        &values[m]))
+        {
+            cstate->num_errors++;
+            return true;
+        }
+
+        cstate->cur_attname = NULL;
+        cstate->cur_attval = NULL;
+    }
+
+    Assert(fieldno == attr_count);
+
+    return true;
+}
+
+bool
+CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+    return CopyFromTextBasedOneRow(cstate, econtext, values, nulls, CopyReadAttributesText, PostpareColumnValueText);
+}
+
+bool
+CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+    return CopyFromTextBasedOneRow(cstate, econtext, values, nulls, CopyReadAttributesCSV, PostpareColumnValueCSV);
+}
+
+/*
+ * cstate->in_functions must be initialized in CopyFromBinaryStart().
+ */
+bool
+CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+    TupleDesc    tupDesc;
+    AttrNumber    attr_count;
+    FmgrInfo   *in_functions = cstate->in_functions;
+    Oid           *typioparams = cstate->typioparams;
+    int16        fld_count;
+    ListCell   *cur;
+
+    tupDesc = RelationGetDescr(cstate->rel);
+    attr_count = list_length(cstate->attnumlist);
+
+    cstate->cur_lineno++;
+
+    if (!CopyGetInt16(cstate, &fld_count))
+    {
+        /* EOF detected (end of file, or protocol-level EOF) */
+        return false;
+    }
+
+    if (fld_count == -1)
+    {
+        /*
+         * Received EOF marker.  Wait for the protocol-level EOF, and complain
+         * if it doesn't come immediately.  In COPY FROM STDIN, this ensures
+         * that we correctly handle CopyFail, if client chooses to send that
+         * now.  When copying from file, we could ignore the rest of the file
+         * like in text mode, but we choose to be consistent with the COPY
+         * FROM STDIN case.
+         */
+        char        dummy;
+
+        if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
+            ereport(ERROR,
+                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                     errmsg("received copy data after EOF marker")));
+        return false;
+    }
+
+    if (fld_count != attr_count)
+        ereport(ERROR,
+                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                 errmsg("row field count is %d, expected %d",
+                        (int) fld_count, attr_count)));
+
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        int            m = attnum - 1;
+        Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+        cstate->cur_attname = NameStr(att->attname);
+        values[m] = CopyReadBinaryAttribute(cstate,
+                                            &in_functions[m],
+                                            typioparams[m],
+                                            att->atttypmod,
+                                            &nulls[m]);
+        cstate->cur_attname = NULL;
+    }
+
+    return true;
+}
+
 /*
  * Read next tuple from file for COPY FROM. Return false if no more tuples.
  *
@@ -857,181 +1082,22 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 {
     TupleDesc    tupDesc;
     AttrNumber    num_phys_attrs,
-                attr_count,
                 num_defaults = cstate->num_defaults;
-    FmgrInfo   *in_functions = cstate->in_functions;
-    Oid           *typioparams = cstate->typioparams;
     int            i;
     int           *defmap = cstate->defmap;
     ExprState **defexprs = cstate->defexprs;
 
     tupDesc = RelationGetDescr(cstate->rel);
     num_phys_attrs = tupDesc->natts;
-    attr_count = list_length(cstate->attnumlist);
 
     /* Initialize all values for row to NULL */
     MemSet(values, 0, num_phys_attrs * sizeof(Datum));
     MemSet(nulls, true, num_phys_attrs * sizeof(bool));
     MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
 
-    if (!cstate->opts.binary)
-    {
-        char      **field_strings;
-        ListCell   *cur;
-        int            fldct;
-        int            fieldno;
-        char       *string;
-
-        /* read raw fields in the next line */
-        if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
-            return false;
-
-        /* check for overflowing fields */
-        if (attr_count > 0 && fldct > attr_count)
-            ereport(ERROR,
-                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                     errmsg("extra data after last expected column")));
-
-        fieldno = 0;
-
-        /* Loop to read the user attributes on the line. */
-        foreach(cur, cstate->attnumlist)
-        {
-            int            attnum = lfirst_int(cur);
-            int            m = attnum - 1;
-            Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-            if (fieldno >= fldct)
-                ereport(ERROR,
-                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         errmsg("missing data for column \"%s\"",
-                                NameStr(att->attname))));
-            string = field_strings[fieldno++];
-
-            if (cstate->convert_select_flags &&
-                !cstate->convert_select_flags[m])
-            {
-                /* ignore input field, leaving column as NULL */
-                continue;
-            }
-
-            if (cstate->opts.csv_mode)
-            {
-                if (string == NULL &&
-                    cstate->opts.force_notnull_flags[m])
-                {
-                    /*
-                     * FORCE_NOT_NULL option is set and column is NULL -
-                     * convert it to the NULL string.
-                     */
-                    string = cstate->opts.null_print;
-                }
-                else if (string != NULL && cstate->opts.force_null_flags[m]
-                         && strcmp(string, cstate->opts.null_print) == 0)
-                {
-                    /*
-                     * FORCE_NULL option is set and column matches the NULL
-                     * string. It must have been quoted, or otherwise the
-                     * string would already have been set to NULL. Convert it
-                     * to NULL as specified.
-                     */
-                    string = NULL;
-                }
-            }
-
-            cstate->cur_attname = NameStr(att->attname);
-            cstate->cur_attval = string;
-
-            if (string != NULL)
-                nulls[m] = false;
-
-            if (cstate->defaults[m])
-            {
-                /*
-                 * The caller must supply econtext and have switched into the
-                 * per-tuple memory context in it.
-                 */
-                Assert(econtext != NULL);
-                Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
-
-                values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
-            }
-
-            /*
-             * If ON_ERROR is specified with IGNORE, skip rows with soft
-             * errors
-             */
-            else if (!InputFunctionCallSafe(&in_functions[m],
-                                            string,
-                                            typioparams[m],
-                                            att->atttypmod,
-                                            (Node *) cstate->escontext,
-                                            &values[m]))
-            {
-                cstate->num_errors++;
-                return true;
-            }
-
-            cstate->cur_attname = NULL;
-            cstate->cur_attval = NULL;
-        }
-
-        Assert(fieldno == attr_count);
-    }
-    else
-    {
-        /* binary */
-        int16        fld_count;
-        ListCell   *cur;
-
-        cstate->cur_lineno++;
-
-        if (!CopyGetInt16(cstate, &fld_count))
-        {
-            /* EOF detected (end of file, or protocol-level EOF) */
-            return false;
-        }
-
-        if (fld_count == -1)
-        {
-            /*
-             * Received EOF marker.  Wait for the protocol-level EOF, and
-             * complain if it doesn't come immediately.  In COPY FROM STDIN,
-             * this ensures that we correctly handle CopyFail, if client
-             * chooses to send that now.  When copying from file, we could
-             * ignore the rest of the file like in text mode, but we choose to
-             * be consistent with the COPY FROM STDIN case.
-             */
-            char        dummy;
-
-            if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
-                ereport(ERROR,
-                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         errmsg("received copy data after EOF marker")));
-            return false;
-        }
-
-        if (fld_count != attr_count)
-            ereport(ERROR,
-                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                     errmsg("row field count is %d, expected %d",
-                            (int) fld_count, attr_count)));
-
-        foreach(cur, cstate->attnumlist)
-        {
-            int            attnum = lfirst_int(cur);
-            int            m = attnum - 1;
-            Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-            cstate->cur_attname = NameStr(att->attname);
-            values[m] = CopyReadBinaryAttribute(cstate,
-                                                &in_functions[m],
-                                                typioparams[m],
-                                                att->atttypmod,
-                                                &nulls[m]);
-            cstate->cur_attname = NULL;
-        }
-    }
+    if (!cstate->opts.from_routine->CopyFromOneRow(cstate, econtext, values,
+                                                   nulls))
+        return false;
 
     /*
      * Now compute and insert any defaults available for the columns not
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index de316cfd81..cab05a0aa0 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -75,12 +75,11 @@ typedef struct CopyFormatOptions
     bool        convert_selectively;    /* do selective binary conversion? */
     CopyOnErrorChoice on_error; /* what to do when error happened */
     List       *convert_select; /* list of column names (can be NIL) */
+    const        CopyFromRoutine *from_routine;    /* callback routines for COPY
+                                                 * FROM */
     const        CopyToRoutine *to_routine;    /* callback routines for COPY TO */
 } CopyFormatOptions;
 
-/* This is private in commands/copyfrom.c */
-typedef struct CopyFromStateData *CopyFromState;
-
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
 typedef void (*copy_data_dest_cb) (void *data, int len);
 
@@ -89,6 +88,7 @@ extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
                    uint64 *processed);
 
 extern void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, void *cstate, List
*options);
+extern void ProcessCopyOptionFormatFrom(ParseState *pstate, CopyFormatOptions *opts_out, CopyFromState cstate, DefElem
*defel);
 extern void ProcessCopyOptionFormatTo(ParseState *pstate, CopyFormatOptions *opts_out, CopyToState cstate, DefElem
*defel);
 extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause,
                                    const char *filename,
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index f8901cac51..9f5a4958aa 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -15,8 +15,54 @@
 #define COPYAPI_H
 
 #include "executor/tuptable.h"
+#include "nodes/execnodes.h"
 #include "nodes/parsenodes.h"
 
+/* This is private in commands/copyfrom.c */
+typedef struct CopyFromStateData *CopyFromState;
+
+/* Routines for a COPY FROM format implementation. */
+typedef struct CopyFromRoutine
+{
+    /*
+     * Called when this CopyFromRoutine is chosen. This can be used for
+     * initialization.
+     */
+    void        (*CopyFromInit) (CopyFromState cstate);
+
+    /*
+     * Called for processing one COPY FROM option. This will return false when
+     * the given option is invalid.
+     */
+    bool        (*CopyFromProcessOption) (CopyFromState cstate, DefElem *defel);
+
+    /*
+     * Called when COPY FROM via the PostgreSQL protocol is started. This must
+     * fill buf as a valid CopyInResponse message:
+     *
+     */
+    /*--
+     * +--------+--------+--------+--------+--------+   +--------+--------+
+     * | Format | N attributes    | Attr1's format  |...| AttrN's format  |
+     * +--------+--------+--------+--------+--------+   +--------+--------+
+     * 0: text                      0: text               0: text
+     * 1: binary                    1: binary             1: binary
+     */
+    void        (*CopyFromFillCopyInResponse) (CopyFromState cstate, StringInfoData *buf);
+
+    /*
+     * Called when COPY FROM is started. This will initialize something and
+     * receive a header.
+     */
+    void        (*CopyFromStart) (CopyFromState cstate, TupleDesc tupDesc);
+
+    /* Copy one row. It returns false if no more tuples. */
+    bool        (*CopyFromOneRow) (CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls);
+
+    /* Called when COPY FROM is ended. This will finalize something. */
+    void        (*CopyFromEnd) (CopyFromState cstate);
+}            CopyFromRoutine;
+
 /* This is private in commands/copyto.c */
 typedef struct CopyToStateData *CopyToState;
 
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc78..096b55011e 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -183,4 +183,8 @@ typedef struct CopyFromStateData
 extern void ReceiveCopyBegin(CopyFromState cstate);
 extern void ReceiveCopyBinaryHeader(CopyFromState cstate);
 
+extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls);
+extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls);
+extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls);
+
 #endif                            /* COPYFROM_INTERNAL_H */
-- 
2.43.0


В списке pgsql-hackers по дате отправления:

Предыдущее
От: Dean Rasheed
Дата:
Сообщение: Re: Supporting MERGE on updatable views
Следующее
От: "Hayato Kuroda (Fujitsu)"
Дата:
Сообщение: RE: speed up a logical replica setup