Discussion: FDW Parallel Append
Hi hackers,
I am trying to improve my xdr_fdw (a foreign data wrapper that scans file systems that keep big data compacted) so that it can scan partitions in parallel.
I have set "IsForeignScanParallelSafe" to return true and added a path with "add_partial_path". My "GetForeignPaths" looks like the code below:
    path = create_foreignscan_path(root, baserel, NULL,
                                   fpinfo->rows,
                                   fpinfo->startup_cost,
                                   fpinfo->total_cost,
                                   NIL, NULL, NULL, NIL);
    add_path(baserel, (Path *) path);

    /* If appropriate, consider parallel scan */
    if (baserel->consider_parallel && baserel->lateral_relids == NULL)
        create_foreign_partial_paths(root, baserel);
...
static void
create_foreign_partial_paths(PlannerInfo *root, RelOptInfo *baserel)
{
    PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
    ForeignPath *path;
    int         parallel_workers;

    /*
     * TODO: Compute workers dynamically (in XDR we need to know the number
     * of scan partitions).
     *
     * parallel_workers = compute_parallel_worker(baserel, baserel->pages, -1,
     *                                            max_parallel_workers_per_gather);
     */
    parallel_workers = max_parallel_workers_per_gather;

    /* If any limit was set to zero, the user doesn't want a parallel scan. */
    if (parallel_workers <= 0)
        return;

    path = create_foreignscan_path(root, baserel, NULL,
                                   fpinfo->rows,
                                   fpinfo->startup_cost,
                                   fpinfo->total_cost,
                                   NIL, NULL, NULL, NIL);
    path->path.parallel_aware = true;
    path->path.parallel_safe = baserel->consider_parallel;
    path->path.parallel_workers = parallel_workers;

    /* Add an unordered partial path based on a parallel sequential scan. */
    add_partial_path(baserel, (Path *) path);
}
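As an aside on the TODO above: once the number of XDR scan partitions is known, clamping the worker count to it would avoid planning idle workers. A hypothetical sketch (xdr_compute_parallel_workers and nscan_partitions are illustrative names, not part of the posted code):

static int
xdr_compute_parallel_workers(RelOptInfo *baserel, int nscan_partitions)
{
    /* Never plan more workers than there are partitions to scan,
     * and respect the user's max_parallel_workers_per_gather limit. */
    return Min(nscan_partitions, max_parallel_workers_per_gather);
}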
For now, I don't need my FDW to be parallel aware, only parallel safe, with each worker
responsible for scanning an entire partition.
Doing so, I get a Parallel Append node that combines my partitions' data in parallel.
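For reference, a parallel-safe-only setup like this needs little more than the safety callback itself; a minimal sketch (the function name is illustrative, and it assumes xdr_fdw scans carry no backend-local state that would make them unsafe in a worker):

static bool
xdrIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
                             RangeTblEntry *rte)
{
    /* XDR scans are assumed to have no backend-local dependencies, so
     * they may run inside parallel workers.  Return false here if that
     * ever changes. */
    return true;
}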
An EXPLAIN looks like:
EXPLAIN SELECT * FROM precio WHERE fecha BETWEEN '2017-01-01' AND '2017-01-02'

Gather  (cost=1200.00..1386200.00 rows=60000000 width=126)
  Workers Planned: 2
  ->  Parallel Append  (cost=200.00..1325200.00 rows=60000000 width=60)
        ->  Foreign Scan on precio_xdr20170101  (cost=200.00..1200200.00 rows=30000000 width=60)
              Filter: ((fecha >= '2017-01-01'::date) AND (fecha <= '2017-01-02'::date))
        ->  Foreign Scan on precio_xdr20170102  (cost=200.00..1200200.00 rows=30000000 width=60)
              Filter: ((fecha >= '2017-01-01'::date) AND (fecha <= '2017-01-02'::date))
But when I execute (or EXPLAIN ANALYZE) the query, I get an error:

EXPLAIN ANALYZE SELECT * FROM precio WHERE fecha BETWEEN '2017-01-01' AND '2017-01-02'
ERROR: ExtensibleNodeMethods "XDRInfo" was not registered
CONTEXT: parallel worker
SQL state: 42704
XDRInfo is my private structure that implements the extensible node methods.
It is registered in my "xdr_fdw_handler":
Datum
xdr_fdw_handler(PG_FUNCTION_ARGS)
{
    FdwRoutine *routine = makeNode(FdwRoutine);
    ...
    RegisterXDRInfoExtension();

    PG_RETURN_POINTER(routine);
}
I think that each new worker process needs to call my RegisterXDRInfoExtension function.
But where do I call it? Is there an entry point that runs each time a new worker process is created?
I have tried handling "InitializeWorkerForeignScan", but it is not being called. I think that function
is only called for parallel-aware scans, correct?
Is there a way to handle this?
Thanks in advance,
Sanyo Moura.
Hi,

On 2018/10/31 3:25, Sanyo Moura wrote:
> Hi hackers,
>
> I am trying to improve my xdr_fdw (a foreign data wrapper that scan file
> systems that keep big data compacted) to scan partitions in parallel.
>
> But when I execute or analyze I get an error:
>
> EXPLAIN ANALYZE SELECT * FROM precio WHERE fecha BETWEEN '2017-01-01' AND
> '2017-01-02'
>
> ERROR: ExtensibleNodeMethods "XDRInfo" was not registered
> CONTEXT: parallel worker
> SQL state: 42704
>
> XDRInfo is my private structure that implement extensible methods.
> It is registered in my "xdr_fdw_handler"
>
> Datum
> xdr_fdw_handler(PG_FUNCTION_ARGS)
> {
> FdwRoutine *routine = makeNode(FdwRoutine);
> .
> .
> .
> RegisterXDRInfoExtension();
>
> PG_RETURN_POINTER(routine);
> }
>
> I think that each new worker process need to call my
> RegisterXDRInfoExtension function.
> But where do I call it? Do I have an entry point each time a new worker
> process is created?

I think you'll need to call your Register* function from the _PG_init
function of your module.  If you haven't defined the _PG_init function,
you should do that first.

Thanks,
Amit
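To make the _PG_init suggestion concrete, a sketch of such a function follows. This is not the actual xdr_fdw source; it assumes the thread's RegisterXDRInfoExtension() wraps the RegisterExtensibleNodeMethods() call for "XDRInfo":

#include "postgres.h"
#include "fmgr.h"

PG_MODULE_MAGIC;

void _PG_init(void);

/*
 * _PG_init runs whenever the shared library is loaded, in every process
 * that loads it -- including parallel workers, which restore the leader's
 * library state on startup.  Registering the "XDRInfo" extensible node
 * methods here, instead of in the FDW handler (which a worker may never
 * call), avoids the "was not registered" error seen in the thread.
 */
void
_PG_init(void)
{
    RegisterXDRInfoExtension();
}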
Hello Amit,

Thanks a lot, that worked. Now I have to deal with other concerns when launching more than one worker.
I'll try to advance my tests.
Regards,
Sanyo Moura
On Wed, Oct 31, 2018 at 01:38, Amit Langote <Langote_Amit_f8@lab.ntt.co.jp> wrote:
Hi,
On 2018/10/31 3:25, Sanyo Moura wrote:
> Hi hackers,
>
> I am trying to improve my xdr_fdw (a foreign data wrapper that scan file
> systems that keep big data compacted) to scan partitions in parallel.
>
> But when I execute or analyze I get an error:
>
> EXPLAIN ANALYZE SELECT * FROM precio WHERE fecha BETWEEN '2017-01-01' AND
> '2017-01-02'
>
> ERROR: ExtensibleNodeMethods "XDRInfo" was not registered
> CONTEXT: parallel worker
> SQL state: 42704
>
> XDRInfo is my private structure that implement extensible methods.
> It is registered in my "xdr_fdw_handler"
>
> Datum
> xdr_fdw_handler(PG_FUNCTION_ARGS)
> {
> FdwRoutine *routine = makeNode(FdwRoutine);
> .
> .
> .
> RegisterXDRInfoExtension();
>
> PG_RETURN_POINTER(routine);
> }
>
> I think that each new worker process need to call my
> RegisterXDRInfoExtension function.
> But where do I call it? Do I have an entry point each time a new worker
> process is created?
I think you'll need to call your Register* function from the _PG_init
function of your module. If you haven't defined the _PG_init function,
you should do that first.
Thanks,
Amit