Обсуждение: parallel pg_restore - WIP patch
Attached is my WIP patch for parallel pg_restore. It's still very rough, but seems to work. Anyone who can test this with highend equipment would be helping some. cheers andrew Index: pg_backup.h =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup.h,v retrieving revision 1.47 diff -c -r1.47 pg_backup.h *** pg_backup.h 13 Apr 2008 03:49:21 -0000 1.47 --- pg_backup.h 23 Sep 2008 18:10:58 -0000 *************** *** 123,128 **** --- 123,130 ---- int suppressDumpWarnings; /* Suppress output of WARNING entries * to stderr */ bool single_txn; + int number_of_threads; + bool truncate_before_load; bool *idWanted; /* array showing which dump IDs to emit */ } RestoreOptions; *************** *** 165,170 **** --- 167,173 ---- extern void CloseArchive(Archive *AH); extern void RestoreArchive(Archive *AH, RestoreOptions *ropt); + extern void RestoreArchiveParallel(Archive *AH, RestoreOptions *ropt); /* Open an existing archive */ extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt); Index: pg_backup_archiver.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.c,v retrieving revision 1.158 diff -c -r1.158 pg_backup_archiver.c *** pg_backup_archiver.c 5 Sep 2008 23:53:42 -0000 1.158 --- pg_backup_archiver.c 23 Sep 2008 18:10:59 -0000 *************** *** 27,38 **** --- 27,50 ---- #include <unistd.h> + #include <sys/types.h> + #include <sys/wait.h> + + #ifdef WIN32 #include <io.h> #endif #include "libpq/libpq-fs.h" + typedef struct _parallel_slot + { + pid_t pid; + TocEntry *te; + DumpId dumpId; + } ParallelSlot; + + #define NO_SLOT (-1) const char *progname; *************** *** 70,76 **** --- 82,99 ---- static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim); static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression); static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext); + static bool work_is_being_done(ParallelSlot *slot, int n_slots); + static int get_next_slot(ParallelSlot *slots, int n_slots); + static TocEntry *get_next_work_item(ArchiveHandle *AH); + static void prestore(ArchiveHandle *AH, TocEntry *te); + static void mark_work_done(ArchiveHandle *AH, pid_t worker, ParallelSlot *slots, int n_slots); + static int _restore_one_te(ArchiveHandle *ah, TocEntry *te, RestoreOptions *ropt,bool is_parallel); + static void _reduce_dependencies(ArchiveHandle * AH, TocEntry *te); + static void _fix_dependency_counts(ArchiveHandle *AH); + static void _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te); + + static ArchiveHandle *GAH; /* * Wrapper functions. *************** *** 125,137 **** /* Public */ void RestoreArchive(Archive *AHX, RestoreOptions *ropt) { ArchiveHandle *AH = (ArchiveHandle *) AHX; TocEntry *te; teReqs reqs; OutputContext sav; - bool defnDumped; AH->ropt = ropt; AH->stage = STAGE_INITIALIZING; --- 148,523 ---- /* Public */ void + RestoreArchiveParallel(Archive *AHX, RestoreOptions *ropt) + { + + ArchiveHandle *AH = (ArchiveHandle *) AHX; + ParallelSlot *slots; + int next_slot; + TocEntry *next_work_item = NULL; + int work_status; + pid_t ret_child; + int n_slots = ropt->number_of_threads; + TocEntry *te; + teReqs reqs; + + + /* AH->debugLevel = 99; */ + /* some routines that use ahlog() don't get passed AH */ + GAH = AH; + + ahlog(AH,1,"entering RestoreARchiveParallel\n"); + + + slots = (ParallelSlot *) calloc(sizeof(ParallelSlot),n_slots); + AH->ropt = ropt; + + if (ropt->create) + die_horribly(AH,modulename, + "parallel restore is incompatible with --create\n"); + + if (ropt->dropSchema) + die_horribly(AH,modulename, + "parallel restore is incompatible with --clean\n"); + + if (!ropt->useDB) + die_horribly(AH,modulename, + "parallel restore requires direct database connection\n"); + + + #ifndef HAVE_LIBZ + + /* make sure we won't need (de)compression we haven't got */ + if (AH->compression != 0 && AH->PrintTocDataPtr != NULL) + { + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + reqs = _tocEntryRequired(te, ropt, false); + if (te->hadDumper && (reqs & REQ_DATA) != 0) + die_horribly(AH, modulename, + "cannot restore from compressed archive (compression not supported in this installation)\n"); + } + } + #endif + + ahlog(AH, 1, "connecting to database for restore\n"); + if (AH->version < K_VERS_1_3) + die_horribly(AH, modulename, + "direct database connections are not supported in pre-1.3 archives\n"); + + /* XXX Should get this from the archive */ + AHX->minRemoteVersion = 070100; + AHX->maxRemoteVersion = 999999; + + /* correct dependency counts in case we're doing a partial restore */ + if (ropt->idWanted == NULL) + InitDummyWantedList(AHX,ropt); + _fix_dependency_counts(AH); + + /* + * Since we're talking to the DB directly, don't send comments since they + * obscure SQL when displaying errors + */ + AH->noTocComments = 1; + + /* Do all the early stuff in a single connection in the parent. + * There's no great point in running it in parallel and it will actually + * run faster in a single connection because we avoid all the connection + * and setup overhead, including the 0.5s sleep below. + */ + ConnectDatabase(AHX, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->requirePassword); + + + /* + * Establish important parameter values right away. + */ + _doSetFixedOutputState(AH); + + while((next_work_item = get_next_work_item(AH)) != NULL) + { + /* XXX need to improve this test in case there is no table data */ + /* need to test for indexes, FKs, PK, Unique, etc */ + if(strcmp(next_work_item->desc,"TABLE DATA") == 0) + break; + (void) _restore_one_te(AH, next_work_item, ropt, false); + + next_work_item->prestored = true; + + _reduce_dependencies(AH,next_work_item); + } + + + /* + * now close parent connection in prep for parallel step. + */ + PQfinish(AH->connection); + AH->connection = NULL; + + /* main parent loop */ + + ahlog(AH,1,"entering main loop\n"); + + while (((next_work_item = get_next_work_item(AH)) != NULL) || + (work_is_being_done(slots,n_slots))) + { + if (next_work_item != NULL && + ((next_slot = get_next_slot(slots,n_slots)) != NO_SLOT)) + { + /* there is work still to do and a worker slot available */ + + pid_t child; + + next_work_item->prestored = true; + + child = fork(); + if (child == 0) + { + prestore(AH,next_work_item); + /* should not happen ... we expect prestore to exit */ + exit(1); + } + else if (child > 0) + { + slots[next_slot].pid = child; + slots[next_slot].te = next_work_item; + slots[next_slot].dumpId = next_work_item->dumpId; + } + else + { + /* XXX fork error - handle it! */ + } + /* delay just long enough betweek forks to give the catalog some + * breathing space. Without this sleep I got + * "tuple concurrently updated" errors. + */ + pg_usleep(500000); + continue; /* in case the slots are not yet full */ + } + /* if we get here there must be work being done */ + ret_child = wait(&work_status); + + if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 0) + { + mark_work_done(AH, ret_child, slots, n_slots); + } + else if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 1) + { + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].pid == ret_child) + _inhibit_data_for_failed_table(AH, slots[i].te); + break; + } + mark_work_done(AH, ret_child, slots, n_slots); + } + else + { + /* XXX something went wrong - deal with it */ + } + } + + /* + * now process the ACLs - no need to do this in parallel + */ + + /* reconnect from parent */ + ConnectDatabase(AHX, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->requirePassword); + + /* + * Scan TOC to output ownership commands and ACLs + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + AH->currentTE = te; + + /* Work out what, if anything, we want from this entry */ + reqs = _tocEntryRequired(te, ropt, true); + + if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ + { + ahlog(AH, 1, "setting owner and privileges for %s %s\n", + te->desc, te->tag); + _printTocEntry(AH, te, ropt, false, true); + } + } + + /* clean up */ + PQfinish(AH->connection); + AH->connection = NULL; + + } + + static bool + work_is_being_done(ParallelSlot *slot, int n_slots) + { + ahlog(GAH,1,"is work being done?\n"); + while(n_slots--) + { + if (slot->pid > 0) + return true; + slot++; + } + ahlog(GAH,1,"work is not being done\n"); + return false; + } + + static int + get_next_slot(ParallelSlot *slots, int n_slots) + { + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].pid == 0) + { + ahlog(GAH,1,"available slots is %d\n",i); + return i; + } + } + ahlog(GAH,1,"No slot available\n"); + return NO_SLOT; + } + + static TocEntry* + get_next_work_item(ArchiveHandle *AH) + { + TocEntry *te; + teReqs reqs; + + /* just search from the top of the queue until we find an available item. + * Note that the queue isn't reordered in the current implementation. If + * we ever do reorder it, then certain code that processes entries from the + * current item to the end of the queue will probably need to be + * re-examined. + */ + + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + if (!te->prestored && te->nDeps < 1) + { + /* make sure it's not an ACL */ + reqs = _tocEntryRequired (te, AH->ropt, false); + if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0) + { + ahlog(AH,1,"next item is %d\n",te->dumpId); + return te; + } + } + } + ahlog(AH,1,"No item ready\n"); + return NULL; + } + + static void + prestore(ArchiveHandle *AH, TocEntry *te) + { + RestoreOptions *ropt = AH->ropt; + int retval; + + ConnectDatabase((Archive *)AH, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->requirePassword); + + /* + * Establish important parameter values right away. + */ + _doSetFixedOutputState(AH); + + retval = _restore_one_te(AH, te, ropt, true); + + PQfinish(AH->connection); + exit(retval); + + } + + static void + mark_work_done(ArchiveHandle *AH, pid_t worker, + ParallelSlot *slots, int n_slots) + { + + TocEntry *te = NULL; + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].pid == worker) + { + te = slots[i].te; + slots[i].pid = 0; + slots[i].te = NULL; + slots[i].dumpId = 0; + break; + } + } + + /* Assert (te != NULL); */ + + _reduce_dependencies(AH,te); + + + } + + + /* + * Make sure the head of each dependency chain is a live item + * + * Once this is established the property will be maintained by + * _reduce_dependencies called as items are done. + */ + static void + _fix_dependency_counts(ArchiveHandle *AH) + { + TocEntry * te; + RestoreOptions * ropt = AH->ropt; + + for (te = AH->toc->next; te != AH->toc; te = te->next) + if (te->nDeps == 0 && ! ropt->idWanted[te->dumpId -1]) + _reduce_dependencies(AH,te); + } + + static void + _reduce_dependencies(ArchiveHandle * AH, TocEntry *te) + { + DumpId item = te->dumpId; + RestoreOptions * ropt = AH->ropt; + int i; + + for (te = te->next; te != AH->toc; te = te->next) + { + for (i = 0; i < te->nDeps; i++) + if (te->dependencies[i] == item) + { + te->nDeps = te->nDeps - 1; + /* + * If this item won't in fact be done, and is now at + * 0 dependency count, we pretend it's been done and + * reduce the dependency counts of all the things that + * depend on it, by a recursive call + */ + if (te->nDeps == 0 && ! ropt->idWanted[te->dumpId -1]) + _reduce_dependencies(AH,te); + + break; + } + } + + } + + + /* Public */ + void RestoreArchive(Archive *AHX, RestoreOptions *ropt) { ArchiveHandle *AH = (ArchiveHandle *) AHX; TocEntry *te; teReqs reqs; OutputContext sav; AH->ropt = ropt; AH->stage = STAGE_INITIALIZING; *************** *** 171,176 **** --- 557,576 ---- AH->noTocComments = 1; } + #ifndef HAVE_LIBZ + + /* make sure we won't need (de)compression we haven't got */ + if (AH->compression != 0 && AH->PrintTocDataPtr != NULL) + { + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + reqs = _tocEntryRequired(te, ropt, false); + if (te->hadDumper && (reqs & REQ_DATA) != 0) + die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in thisinstallation)\n"); + } + } + #endif + /* * Work out if we have an implied data-only restore. This can happen if * the dump was data only or if the user has used a toc list to exclude *************** *** 270,409 **** */ for (te = AH->toc->next; te != AH->toc; te = te->next) { ! AH->currentTE = te; ! ! /* Work out what, if anything, we want from this entry */ ! reqs = _tocEntryRequired(te, ropt, false); ! ! /* Dump any relevant dump warnings to stderr */ ! if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0) ! { ! if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0) ! write_msg(modulename, "warning from original dump file: %s\n", te->defn); ! else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0) ! write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt); ! } ! ! defnDumped = false; ! ! if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ ! { ! ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag); ! ! _printTocEntry(AH, te, ropt, false, false); ! defnDumped = true; ! ! /* ! * If we could not create a table and --no-data-for-failed-tables ! * was given, ignore the corresponding TABLE DATA ! */ ! if (ropt->noDataForFailedTables && ! AH->lastErrorTE == te && ! strcmp(te->desc, "TABLE") == 0) ! { ! TocEntry *tes; ! ! ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n", ! te->tag); ! ! for (tes = te->next; tes != AH->toc; tes = tes->next) ! { ! if (strcmp(tes->desc, "TABLE DATA") == 0 && ! strcmp(tes->tag, te->tag) == 0 && ! strcmp(tes->namespace ? tes->namespace : "", ! te->namespace ? te->namespace : "") == 0) ! { ! /* mark it unwanted */ ! ropt->idWanted[tes->dumpId - 1] = false; ! break; ! } ! } ! } ! ! /* If we created a DB, connect to it... */ ! if (strcmp(te->desc, "DATABASE") == 0) ! { ! ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag); ! _reconnectToDB(AH, te->tag); ! } ! } ! ! /* ! * If we have a data component, then process it ! */ ! if ((reqs & REQ_DATA) != 0) ! { ! /* ! * hadDumper will be set if there is genuine data component for ! * this node. Otherwise, we need to check the defn field for ! * statements that need to be executed in data-only restores. ! */ ! if (te->hadDumper) ! { ! /* ! * If we can output the data, then restore it. ! */ ! if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0) ! { ! #ifndef HAVE_LIBZ ! if (AH->compression != 0) ! die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supportedin this installation)\n"); ! #endif ! ! _printTocEntry(AH, te, ropt, true, false); ! ! if (strcmp(te->desc, "BLOBS") == 0 || ! strcmp(te->desc, "BLOB COMMENTS") == 0) ! { ! ahlog(AH, 1, "restoring %s\n", te->desc); ! ! _selectOutputSchema(AH, "pg_catalog"); ! ! (*AH->PrintTocDataPtr) (AH, te, ropt); ! } ! else ! { ! _disableTriggersIfNecessary(AH, te, ropt); ! ! /* Select owner and schema as necessary */ ! _becomeOwner(AH, te); ! _selectOutputSchema(AH, te->namespace); ! ! ahlog(AH, 1, "restoring data for table \"%s\"\n", ! te->tag); ! ! /* ! * If we have a copy statement, use it. As of V1.3, ! * these are separate to allow easy import from ! * withing a database connection. Pre 1.3 archives can ! * not use DB connections and are sent to output only. ! * ! * For V1.3+, the table data MUST have a copy ! * statement so that we can go into appropriate mode ! * with libpq. ! */ ! if (te->copyStmt && strlen(te->copyStmt) > 0) ! { ! ahprintf(AH, "%s", te->copyStmt); ! AH->writingCopyData = true; ! } ! ! (*AH->PrintTocDataPtr) (AH, te, ropt); ! ! AH->writingCopyData = false; ! ! _enableTriggersIfNecessary(AH, te, ropt); ! } ! } ! } ! else if (!defnDumped) ! { ! /* If we haven't already dumped the defn part, do so now */ ! ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag); ! _printTocEntry(AH, te, ropt, false, false); ! } ! } ! } /* end loop over TOC entries */ /* * Scan TOC again to output ownership commands and ACLs --- 670,677 ---- */ for (te = AH->toc->next; te != AH->toc; te = te->next) { ! (void) _restore_one_te(AH, te, ropt, false); ! } /* * Scan TOC again to output ownership commands and ACLs *************** *** 451,456 **** --- 719,899 ---- } } + static int + _restore_one_te(ArchiveHandle *AH, TocEntry *te, + RestoreOptions *ropt, bool is_parallel) + { + teReqs reqs; + bool defnDumped; + int retval = 0; + + AH->currentTE = te; + + /* Work out what, if anything, we want from this entry */ + reqs = _tocEntryRequired(te, ropt, false); + + /* Dump any relevant dump warnings to stderr */ + if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0) + { + if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0) + write_msg(modulename, "warning from original dump file: %s\n", te->defn); + else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0) + write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt); + } + + defnDumped = false; + + if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ + { + ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag); + + _printTocEntry(AH, te, ropt, false, false); + defnDumped = true; + + /* + * If we could not create a table and --no-data-for-failed-tables + * was given, ignore the corresponding TABLE DATA + * + * For the parallel case this must be done in the parent, so we just + * set a return value. + */ + if (ropt->noDataForFailedTables && + AH->lastErrorTE == te && + strcmp(te->desc, "TABLE") == 0) + { + if (is_parallel) + retval = 1; + else + _inhibit_data_for_failed_table(AH,te); + } + + /* If we created a DB, connect to it... */ + /* won't happen in parallel restore */ + if (strcmp(te->desc, "DATABASE") == 0) + { + ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag); + _reconnectToDB(AH, te->tag); + } + } + + /* + * If we have a data component, then process it + */ + if ((reqs & REQ_DATA) != 0) + { + /* + * hadDumper will be set if there is genuine data component for + * this node. Otherwise, we need to check the defn field for + * statements that need to be executed in data-only restores. + */ + if (te->hadDumper) + { + /* + * If we can output the data, then restore it. + */ + if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0) + { + _printTocEntry(AH, te, ropt, true, false); + + if (strcmp(te->desc, "BLOBS") == 0 || + strcmp(te->desc, "BLOB COMMENTS") == 0) + { + ahlog(AH, 1, "restoring %s\n", te->desc); + + _selectOutputSchema(AH, "pg_catalog"); + + (*AH->PrintTocDataPtr) (AH, te, ropt); + } + else + { + _disableTriggersIfNecessary(AH, te, ropt); + + /* Select owner and schema as necessary */ + _becomeOwner(AH, te); + _selectOutputSchema(AH, te->namespace); + + ahlog(AH, 1, "restoring data for table \"%s\"\n", + te->tag); + + if (ropt->truncate_before_load) + { + if (AH->connection) + StartTransaction(AH); + else + ahprintf(AH, "BEGIN;\n\n"); + + ahprintf(AH, "TRUNCATE TABLE %s;\n\n", + fmtId(te->tag)); } + + /* + * If we have a copy statement, use it. As of V1.3, + * these are separate to allow easy import from + * withing a database connection. Pre 1.3 archives can + * not use DB connections and are sent to output only. + * + * For V1.3+, the table data MUST have a copy + * statement so that we can go into appropriate mode + * with libpq. + */ + if (te->copyStmt && strlen(te->copyStmt) > 0) + { + ahprintf(AH, "%s", te->copyStmt); + AH->writingCopyData = true; + } + + (*AH->PrintTocDataPtr) (AH, te, ropt); + + AH->writingCopyData = false; + + if (ropt->truncate_before_load) + { + if (AH->connection) + CommitTransaction(AH); + else + ahprintf(AH, "COMMIT;\n\n"); + } + + + _enableTriggersIfNecessary(AH, te, ropt); + } + } + } + else if (!defnDumped) + { + /* If we haven't already dumped the defn part, do so now */ + ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag); + _printTocEntry(AH, te, ropt, false, false); + } + } + + return retval; + } + + static void + _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te) + { + TocEntry *tes; + RestoreOptions *ropt = AH->ropt; + + ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n", + te->tag); + + for (tes = te->next; tes != AH->toc; tes = tes->next) + { + if (strcmp(tes->desc, "TABLE DATA") == 0 && + strcmp(tes->tag, te->tag) == 0 && + strcmp(tes->namespace ? tes->namespace : "", + te->namespace ? te->namespace : "") == 0) + { + /* mark it unwanted */ + ropt->idWanted[tes->dumpId - 1] = false; + + _reduce_dependencies(AH, tes); + break; + } + } + } + /* * Allocate a new RestoreOptions block. * This is mainly so we can initialize it, but also for future expansion, Index: pg_backup_archiver.h =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.h,v retrieving revision 1.76 diff -c -r1.76 pg_backup_archiver.h *** pg_backup_archiver.h 7 Nov 2007 12:24:24 -0000 1.76 --- pg_backup_archiver.h 23 Sep 2008 18:10:59 -0000 *************** *** 231,236 **** --- 231,237 ---- char *archdbname; /* DB name *read* from archive */ bool requirePassword; PGconn *connection; + char *cachepw; int connectToDB; /* Flag to indicate if direct DB connection is * required */ bool writingCopyData; /* True when we are sending COPY data */ *************** *** 284,289 **** --- 285,291 ---- DumpId dumpId; bool hadDumper; /* Archiver was passed a dumper routine (used * in restore) */ + bool prestored; /* keep track of parallel restore */ char *tag; /* index tag */ char *namespace; /* null or empty string if not in a schema */ char *tablespace; /* null if not in a tablespace; empty string Index: pg_backup_db.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_db.c,v retrieving revision 1.80 diff -c -r1.80 pg_backup_db.c *** pg_backup_db.c 16 Aug 2008 02:25:06 -0000 1.80 --- pg_backup_db.c 23 Sep 2008 18:10:59 -0000 *************** *** 206,220 **** if (AH->connection) die_horribly(AH, modulename, "already connected to a database\n"); ! if (reqPwd) { password = simple_prompt("Password: ", 100, false); if (password == NULL) die_horribly(AH, modulename, "out of memory\n"); AH->requirePassword = true; } else AH->requirePassword = false; /* * Start the connection. Loop until we have a password if requested by --- 206,226 ---- if (AH->connection) die_horribly(AH, modulename, "already connected to a database\n"); ! if (reqPwd && AH->cachepw == NULL) { password = simple_prompt("Password: ", 100, false); if (password == NULL) die_horribly(AH, modulename, "out of memory\n"); AH->requirePassword = true; } + else if (reqPwd) + { + password = AH->cachepw; + } else + { AH->requirePassword = false; + } /* * Start the connection. Loop until we have a password if requested by *************** *** 241,247 **** } while (new_pass); if (password) ! free(password); /* check to see that the backend connection was successfully made */ if (PQstatus(AH->connection) == CONNECTION_BAD) --- 247,253 ---- } while (new_pass); if (password) ! AH->cachepw = password; /* check to see that the backend connection was successfully made */ if (PQstatus(AH->connection) == CONNECTION_BAD) Index: pg_restore.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_restore.c,v retrieving revision 1.88 diff -c -r1.88 pg_restore.c *** pg_restore.c 13 Apr 2008 03:49:22 -0000 1.88 --- pg_restore.c 23 Sep 2008 18:10:59 -0000 *************** *** 78,83 **** --- 78,84 ---- static int no_data_for_failed_tables = 0; static int outputNoTablespaces = 0; static int use_setsessauth = 0; + static int truncate_before_load = 0; struct option cmdopts[] = { {"clean", 0, NULL, 'c'}, *************** *** 92,97 **** --- 93,99 ---- {"ignore-version", 0, NULL, 'i'}, {"index", 1, NULL, 'I'}, {"list", 0, NULL, 'l'}, + {"multi-thread",1,NULL,'m'}, {"no-privileges", 0, NULL, 'x'}, {"no-acl", 0, NULL, 'x'}, {"no-owner", 0, NULL, 'O'}, *************** *** 114,119 **** --- 116,122 ---- {"disable-triggers", no_argument, &disable_triggers, 1}, {"no-data-for-failed-tables", no_argument, &no_data_for_failed_tables, 1}, {"no-tablespaces", no_argument, &outputNoTablespaces, 1}, + {"truncate-before-load", no_argument, &truncate_before_load, 1}, {"use-set-session-authorization", no_argument, &use_setsessauth, 1}, {NULL, 0, NULL, 0} *************** *** 139,145 **** } } ! while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:n:Op:P:RsS:t:T:U:vWxX:1", cmdopts, NULL)) != -1) { switch (c) --- 142,148 ---- } } ! while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:m:n:Op:P:RsS:t:T:U:vWxX:1", cmdopts, NULL)) != -1) { switch (c) *************** *** 182,187 **** --- 185,194 ---- opts->tocFile = strdup(optarg); break; + case 'm': + opts->number_of_threads = atoi(optarg); /* XXX fix error checking */ + break; + case 'n': /* Dump data for this schema only */ opts->schemaNames = strdup(optarg); break; *************** *** 262,268 **** break; case 0: ! /* This covers the long options equivalent to -X xxx. */ break; case '1': /* Restore data in a single transaction */ --- 269,278 ---- break; case 0: ! /* ! * This covers the long options without a short equivalent, ! * including those equivalent to -X xxx. ! */ break; case '1': /* Restore data in a single transaction */ *************** *** 299,304 **** --- 309,329 ---- opts->noDataForFailedTables = no_data_for_failed_tables; opts->noTablespace = outputNoTablespaces; opts->use_setsessauth = use_setsessauth; + opts->truncate_before_load = truncate_before_load; + + if (opts->single_txn) + { + if (opts->number_of_threads > 1) + { + write_msg(NULL, "single transaction not compatible with multi-threading"); + exit(1); + } + else if (opts->truncate_before_load) + { + write_msg(NULL, "single transaction not compatible with truncate-before-load"); + exit(1); + } + } if (opts->formatName) { *************** *** 330,335 **** --- 355,362 ---- AH = OpenArchive(inputFileSpec, opts->format); + /* XXX looks like we'll have to do sanity checks in the parallel archiver */ + /* Let the archiver know how noisy to be */ AH->verbose = opts->verbose; *************** *** 351,356 **** --- 378,385 ---- if (opts->tocSummary) PrintTOCSummary(AH, opts); + else if (opts->number_of_threads > 1) + RestoreArchiveParallel(AH, opts); else RestoreArchive(AH, opts);
Andrew Dunstan wrote: > > Attached is my WIP patch for parallel pg_restore. It's still very rough, > but seems to work. > > Anyone who can test this with highend equipment would be helping some. tried playing with this(on a 22Gb compressed dump using 4 connections) but it does not seem to work at all for me: pg_restore: [custom archiver] out of memory pg_restore: [custom archiver] could not uncompress data: invalid block type pg_restore: [custom archiver] out of memory pg_restore: [custom archiver] could not uncompress data: invalid stored block lengths pg_restore: [custom archiver] out of memory pg_restore: [custom archiver] out of memory pg_restore: [custom archiver] could not uncompress data: invalid distance too far back pg_restore: [custom archiver] could not uncompress data: invalid distances set pg_restore: [custom archiver] could not uncompress data: invalid code lengths set pg_restore: [custom archiver] could not uncompress data: incorrect data check pg_restore: [custom archiver] could not uncompress data: invalid code lengths set pg_restore: [custom archiver] out of memory pg_restore: [custom archiver] out of memory pg_restore: [custom archiver] out of memory pg_restore: [custom archiver] out of memory pg_restore: [custom archiver] could not uncompress data: invalid literal/length code pg_restore: [custom archiver] could not uncompress data: invalid literal/length code pg_restore: [custom archiver] could not uncompress data: invalid block type each pg_restore process seem to eat a few gigabytes of memory in a few seconds. Stefan
Hi, As I'm interested in this topic, I thought I'd take a look at the patch. I have no capability to test it on high end hardware but did some basic testing on my workstation and basic review of the patch. I somehow had the impression that instead of creating a new connection for each restore item we would create the processes at the start and then send them the dumpId's they should be restoring. That would allow the controller to batch dumpId's together and expect the worker to process them in a transaction. But this is probably just an idea I created in my head. Do we know why we experience "tuple concurrently updated" errors if we spawn thread too fast? I completed some test restores using the pg_restore from head with the patch applied. The dump was a custom dump created with pg 8.2 and restored to an 8.2 database. To confirm this would work, I completed a restore using the standard single threaded mode. The schema restore successfully. The only errors reported involved non-existent roles. When I attempt to restore using parallel restore I get out of memory errors reported from _PrintData. The code returning the error is; _PrintData(... while (blkLen != 0) { if (blkLen + 1 > ctx->inSize) { free(ctx->zlibIn); ctx->zlibIn = NULL; ctx->zlibIn = (char *) malloc(blkLen + 1); if (!ctx->zlibIn) die_horribly(AH,modulename, " out of memory\n"); ctx->inSize = blkLen + 1; in = ctx->zlibIn; } It appears from my debugging and looking at the code that in _PrintData; lclContext *ctx = (lclContext *) AH->formatData; the memory context is shared across all threads. Which means that it's possible the memory contexts are stomping on each other. My GDB skills are now up to being able to reproduce this in a gdb session as there are forks going on all over the place. And if you process them in a serial fashion, there aren't any errors. I'm not sure of the fix for this. But in a parallel environment it doesn't seem possible to store the memory context in the AH. I also receive messages saying "pg_restore: [custom archiver] could not read from input file: end of file". I have not investigated these further as my current guess is they are linked to the out of memory error. Given I ran into this error at my first testing attempt I haven't evaluated much else at this point in time. Now all this could be because I'm using the 8.2 archive, but it works fine in single restore mode. The dump file is about 400M compressed and an entire archive schema was removed from the restore path with a custom restore list. Command line used; PGPORT=5432 ./pg_restore -h /var/run/postgresql -m4 --truncate-before-load -v -d tt2 -L tt.list /home/mr-russ/pg-index-test/timetable.pgdump 2> log.txt I sent the log and this email originally to the list, but I think the attachment was too large, so I've resent without anyattachements. Since my initial testing, Stefan has confirmed the problem I am having. If you have any questions, would like me to run other tests or anything, feel free to contact me. Regards Russell
Russell Smith wrote: > Hi, > > As I'm interested in this topic, I thought I'd take a look at the > patch. I have no capability to test it on high end hardware but did > some basic testing on my workstation and basic review of the patch. > > I somehow had the impression that instead of creating a new connection > for each restore item we would create the processes at the start and > then send them the dumpId's they should be restoring. That would allow > the controller to batch dumpId's together and expect the worker to > process them in a transaction. But this is probably just an idea I > created in my head. > Yes it is. To do that I would have to invent a protocol for talking to the workers, etc, and there is not the slightest chance I would get that done by November. And I don't see the virtue in processing them all in a transaction. I've provided a much simpler means of avoiding WAL logging of the COPY. > Do we know why we experience "tuple concurrently updated" errors if we > spawn thread too fast? > No. That's an open item. > I completed some test restores using the pg_restore from head with the > patch applied. The dump was a custom dump created with pg 8.2 and > restored to an 8.2 database. To confirm this would work, I completed a > restore using the standard single threaded mode. The schema restore > successfully. The only errors reported involved non-existent roles. > > When I attempt to restore using parallel restore I get out of memory > errors reported from _PrintData. The code returning the error is; > > _PrintData(... > while (blkLen != 0) > { > if (blkLen + 1 > ctx->inSize) > { > free(ctx->zlibIn); > ctx->zlibIn = NULL; > ctx->zlibIn = (char *) malloc(blkLen + 1); > if (!ctx->zlibIn) > die_horribly(AH, modulename, " out of memory\n"); > > ctx->inSize = blkLen + 1; > in = ctx->zlibIn; > } > > > It appears from my debugging and looking at the code that in _PrintData; > lclContext *ctx = (lclContext *) AH->formatData; > > the memory context is shared across all threads. Which means that it's > possible the memory contexts are stomping on each other. My GDB skills > are now up to being able to reproduce this in a gdb session as there are > forks going on all over the place. And if you process them in a serial > fashion, there aren't any errors. I'm not sure of the fix for this. > But in a parallel environment it doesn't seem possible to store the > memory context in the AH. > There are no threads, hence nothing is shared. fork() create s new process, not a new thread, and all they share are file descriptors. > I also receive messages saying "pg_restore: [custom archiver] could not > read from input file: end of file". I have not investigated these > further as my current guess is they are linked to the out of memory error. > > Given I ran into this error at my first testing attempt I haven't > evaluated much else at this point in time. Now all this could be > because I'm using the 8.2 archive, but it works fine in single restore > mode. The dump file is about 400M compressed and an entire archive > schema was removed from the restore path with a custom restore list. > > Command line used; PGPORT=5432 ./pg_restore -h /var/run/postgresql -m4 > --truncate-before-load -v -d tt2 -L tt.list > /home/mr-russ/pg-index-test/timetable.pgdump 2> log.txt > > I've attached the log.txt file so you can review the errors that I saw. > I have adjusted the "out of memory" error to include a number to work > out which one was being triggered. So you'll see "5 out of memory" in > the log file, which corresponds to the code above. > However, there does seem to be something odd happening with the compression lib, which I will investigate. Thanks for the report. cheers andrew
Stefan Kaltenbrunner wrote: > Andrew Dunstan wrote: >> >> Attached is my WIP patch for parallel pg_restore. It's still very >> rough, but seems to work. >> >> Anyone who can test this with highend equipment would be helping some. > > tried playing with this(on a 22Gb compressed dump using 4 connections) > but it does not seem to work at all for me: > > pg_restore: [custom archiver] out of memory > pg_restore: [custom archiver] could not uncompress data: invalid block > type > pg_restore: [custom archiver] out of memory > pg_restore: [custom archiver] could not uncompress data: invalid > stored block lengths > pg_restore: [custom archiver] out of memory > pg_restore: [custom archiver] out of memory > pg_restore: [custom archiver] could not uncompress data: invalid > distance too far back > pg_restore: [custom archiver] could not uncompress data: invalid > distances set > pg_restore: [custom archiver] could not uncompress data: invalid code > lengths set > pg_restore: [custom archiver] could not uncompress data: incorrect > data check > pg_restore: [custom archiver] could not uncompress data: invalid code > lengths set > pg_restore: [custom archiver] out of memory > pg_restore: [custom archiver] out of memory > pg_restore: [custom archiver] out of memory > pg_restore: [custom archiver] out of memory > pg_restore: [custom archiver] could not uncompress data: invalid > literal/length code > pg_restore: [custom archiver] could not uncompress data: invalid > literal/length code > pg_restore: [custom archiver] could not uncompress data: invalid block > type > > each pg_restore process seem to eat a few gigabytes of memory in a few > seconds. Ouch. Ok, Thanks for the report. I will investigate. cheers andrew
Andrew Dunstan wrote: >> Do we know why we experience "tuple concurrently updated" errors if we >> spawn thread too fast? >> > > No. That's an open item. Okay, I'll see if I can have a little more of a look into it. No promises as the restore the restore isn't playing nicely. > >> >> the memory context is shared across all threads. Which means that it's >> possible the memory contexts are stomping on each other. My GDB skills >> are now up to being able to reproduce this in a gdb session as there are >> forks going on all over the place. And if you process them in a serial >> fashion, there aren't any errors. I'm not sure of the fix for this. >> But in a parallel environment it doesn't seem possible to store the >> memory context in the AH. >> > > > There are no threads, hence nothing is shared. fork() create s new > process, not a new thread, and all they share are file descriptors. > > However, there does seem to be something odd happening with the > compression lib, which I will investigate. Thanks for the report. I'm sorry, I meant processes there. I'm aware there are no threads. But my feeling was that when you forked with open files you got all of the open file properties, including positions, and as you dupped the descriptor, you share all that it's pointing to with every other copy of the descriptor. My brief research on that shows that in 2005 there was a kernel mailing list discussion on this issue. http://mail.nl.linux.org/kernelnewbies/2005-09/msg00479.html was quite informative for me. I again could be wrong but worth a read. If it is true, then the file needs to be reopened by each child, it can't use the duplicated descriptor. I haven't had a change to implementation test is as it's late here. But I'd take a stab that it will solve the compression library problems. I hope this helps, not hinders Russell.
Russell Smith wrote: > I'm sorry, I meant processes there. I'm aware there are no threads. > But my feeling was that when you forked with open files you got all of > the open file properties, including positions, and as you dupped the > descriptor, you share all that it's pointing to with every other copy of > the descriptor. My brief research on that shows that in 2005 there was > a kernel mailing list discussion on this issue. > http://mail.nl.linux.org/kernelnewbies/2005-09/msg00479.html was quite > informative for me. I again could be wrong but worth a read. If it is > true, then the file needs to be reopened by each child, it can't use the > duplicated descriptor. I haven't had a change to implementation test is > as it's late here. But I'd take a stab that it will solve the > compression library problems. > > I hope this helps, not hinders > > > I'm sure that's the problem. Should be fairly easily fixable, I believe. Thanks for the info. cheers andrew
This version of the patch should fix the "shared file descriptor" bug Russell Smith noticed. It also disables the 1/2 second sleep between forks, so the performance on a small db (regression) is vastly improved. cheers andrew Index: pg_backup.h =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup.h,v retrieving revision 1.47 diff -c -r1.47 pg_backup.h *** pg_backup.h 13 Apr 2008 03:49:21 -0000 1.47 --- pg_backup.h 26 Sep 2008 15:15:38 -0000 *************** *** 123,128 **** --- 123,130 ---- int suppressDumpWarnings; /* Suppress output of WARNING entries * to stderr */ bool single_txn; + int number_of_threads; + bool truncate_before_load; bool *idWanted; /* array showing which dump IDs to emit */ } RestoreOptions; *************** *** 165,170 **** --- 167,173 ---- extern void CloseArchive(Archive *AH); extern void RestoreArchive(Archive *AH, RestoreOptions *ropt); + extern void RestoreArchiveParallel(Archive *AH, RestoreOptions *ropt); /* Open an existing archive */ extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt); Index: pg_backup_archiver.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.c,v retrieving revision 1.158 diff -c -r1.158 pg_backup_archiver.c *** pg_backup_archiver.c 5 Sep 2008 23:53:42 -0000 1.158 --- pg_backup_archiver.c 26 Sep 2008 15:15:39 -0000 *************** *** 27,38 **** --- 27,50 ---- #include <unistd.h> + #include <sys/types.h> + #include <sys/wait.h> + + #ifdef WIN32 #include <io.h> #endif #include "libpq/libpq-fs.h" + typedef struct _parallel_slot + { + pid_t pid; + TocEntry *te; + DumpId dumpId; + } ParallelSlot; + + #define NO_SLOT (-1) const char *progname; *************** *** 70,76 **** --- 82,99 ---- static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim); static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression); static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext); + static bool work_is_being_done(ParallelSlot *slot, int n_slots); + static int get_next_slot(ParallelSlot *slots, int n_slots); + static TocEntry *get_next_work_item(ArchiveHandle *AH); + static void prestore(ArchiveHandle *AH, TocEntry *te); + static void mark_work_done(ArchiveHandle *AH, pid_t worker, ParallelSlot *slots, int n_slots); + static int _restore_one_te(ArchiveHandle *ah, TocEntry *te, RestoreOptions *ropt,bool is_parallel); + static void _reduce_dependencies(ArchiveHandle * AH, TocEntry *te); + static void _fix_dependency_counts(ArchiveHandle *AH); + static void _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te); + + static ArchiveHandle *GAH; /* * Wrapper functions. *************** *** 125,137 **** /* Public */ void RestoreArchive(Archive *AHX, RestoreOptions *ropt) { ArchiveHandle *AH = (ArchiveHandle *) AHX; TocEntry *te; teReqs reqs; OutputContext sav; - bool defnDumped; AH->ropt = ropt; AH->stage = STAGE_INITIALIZING; --- 148,529 ---- /* Public */ void + RestoreArchiveParallel(Archive *AHX, RestoreOptions *ropt) + { + + ArchiveHandle *AH = (ArchiveHandle *) AHX; + ParallelSlot *slots; + int next_slot; + TocEntry *next_work_item = NULL; + int work_status; + pid_t ret_child; + int n_slots = ropt->number_of_threads; + TocEntry *te; + teReqs reqs; + + + /* AH->debugLevel = 99; */ + /* some routines that use ahlog() don't get passed AH */ + GAH = AH; + + ahlog(AH,1,"entering RestoreARchiveParallel\n"); + + + slots = (ParallelSlot *) calloc(sizeof(ParallelSlot),n_slots); + AH->ropt = ropt; + + if (ropt->create) + die_horribly(AH,modulename, + "parallel restore is incompatible with --create\n"); + + if (ropt->dropSchema) + die_horribly(AH,modulename, + "parallel restore is incompatible with --clean\n"); + + if (!ropt->useDB) + die_horribly(AH,modulename, + "parallel restore requires direct database connection\n"); + + + #ifndef HAVE_LIBZ + + /* make sure we won't need (de)compression we haven't got */ + if (AH->compression != 0 && AH->PrintTocDataPtr != NULL) + { + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + reqs = _tocEntryRequired(te, ropt, false); + if (te->hadDumper && (reqs & REQ_DATA) != 0) + die_horribly(AH, modulename, + "cannot restore from compressed archive (compression not supported in this installation)\n"); + } + } + #endif + + ahlog(AH, 1, "connecting to database for restore\n"); + if (AH->version < K_VERS_1_3) + die_horribly(AH, modulename, + "direct database connections are not supported in pre-1.3 archives\n"); + + /* XXX Should get this from the archive */ + AHX->minRemoteVersion = 070100; + AHX->maxRemoteVersion = 999999; + + /* correct dependency counts in case we're doing a partial restore */ + if (ropt->idWanted == NULL) + InitDummyWantedList(AHX,ropt); + _fix_dependency_counts(AH); + + /* + * Since we're talking to the DB directly, don't send comments since they + * obscure SQL when displaying errors + */ + AH->noTocComments = 1; + + /* Do all the early stuff in a single connection in the parent. + * There's no great point in running it in parallel and it will actually + * run faster in a single connection because we avoid all the connection + * and setup overhead, including the 0.5s sleep below. + */ + ConnectDatabase(AHX, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->requirePassword); + + + /* + * Establish important parameter values right away. + */ + _doSetFixedOutputState(AH); + + while((next_work_item = get_next_work_item(AH)) != NULL) + { + /* XXX need to improve this test in case there is no table data */ + /* need to test for indexes, FKs, PK, Unique, etc */ + if(strcmp(next_work_item->desc,"TABLE DATA") == 0) + break; + (void) _restore_one_te(AH, next_work_item, ropt, false); + + next_work_item->prestored = true; + + _reduce_dependencies(AH,next_work_item); + } + + + /* + * now close parent connection in prep for parallel step. + */ + PQfinish(AH->connection); + AH->connection = NULL; + + /* main parent loop */ + + ahlog(AH,1,"entering main loop\n"); + + while (((next_work_item = get_next_work_item(AH)) != NULL) || + (work_is_being_done(slots,n_slots))) + { + if (next_work_item != NULL && + ((next_slot = get_next_slot(slots,n_slots)) != NO_SLOT)) + { + /* there is work still to do and a worker slot available */ + + pid_t child; + + next_work_item->prestored = true; + + child = fork(); + if (child == 0) + { + prestore(AH,next_work_item); + /* should not happen ... we expect prestore to exit */ + exit(1); + } + else if (child > 0) + { + slots[next_slot].pid = child; + slots[next_slot].te = next_work_item; + slots[next_slot].dumpId = next_work_item->dumpId; + } + else + { + /* XXX fork error - handle it! */ + } + /* delay just long enough betweek forks to give the catalog some + * breathing space. Without this sleep I got + * "tuple concurrently updated" errors. + */ + /* pg_usleep(500000); */ + continue; /* in case the slots are not yet full */ + } + /* if we get here there must be work being done */ + ret_child = wait(&work_status); + + if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 0) + { + mark_work_done(AH, ret_child, slots, n_slots); + } + else if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 1) + { + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].pid == ret_child) + _inhibit_data_for_failed_table(AH, slots[i].te); + break; + } + mark_work_done(AH, ret_child, slots, n_slots); + } + else + { + /* XXX something went wrong - deal with it */ + } + } + + /* + * now process the ACLs - no need to do this in parallel + */ + + /* reconnect from parent */ + ConnectDatabase(AHX, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->requirePassword); + + /* + * Scan TOC to output ownership commands and ACLs + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + AH->currentTE = te; + + /* Work out what, if anything, we want from this entry */ + reqs = _tocEntryRequired(te, ropt, true); + + if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ + { + ahlog(AH, 1, "setting owner and privileges for %s %s\n", + te->desc, te->tag); + _printTocEntry(AH, te, ropt, false, true); + } + } + + /* clean up */ + PQfinish(AH->connection); + AH->connection = NULL; + + } + + static bool + work_is_being_done(ParallelSlot *slot, int n_slots) + { + ahlog(GAH,1,"is work being done?\n"); + while(n_slots--) + { + if (slot->pid > 0) + return true; + slot++; + } + ahlog(GAH,1,"work is not being done\n"); + return false; + } + + static int + get_next_slot(ParallelSlot *slots, int n_slots) + { + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].pid == 0) + { + ahlog(GAH,1,"available slots is %d\n",i); + return i; + } + } + ahlog(GAH,1,"No slot available\n"); + return NO_SLOT; + } + + static TocEntry* + get_next_work_item(ArchiveHandle *AH) + { + TocEntry *te; + teReqs reqs; + + /* just search from the top of the queue until we find an available item. + * Note that the queue isn't reordered in the current implementation. If + * we ever do reorder it, then certain code that processes entries from the + * current item to the end of the queue will probably need to be + * re-examined. + */ + + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + if (!te->prestored && te->nDeps < 1) + { + /* make sure it's not an ACL */ + reqs = _tocEntryRequired (te, AH->ropt, false); + if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0) + { + ahlog(AH,1,"next item is %d\n",te->dumpId); + return te; + } + } + } + ahlog(AH,1,"No item ready\n"); + return NULL; + } + + static void + prestore(ArchiveHandle *AH, TocEntry *te) + { + RestoreOptions *ropt = AH->ropt; + int retval; + + /* close and reopen the archive so we have a private copy that doesn't + * stomp on anyone else's file pointer + */ + + (AH->ReopenPtr)(AH); + + ConnectDatabase((Archive *)AH, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->requirePassword); + + /* + * Establish important parameter values right away. + */ + _doSetFixedOutputState(AH); + + retval = _restore_one_te(AH, te, ropt, true); + + PQfinish(AH->connection); + exit(retval); + + } + + static void + mark_work_done(ArchiveHandle *AH, pid_t worker, + ParallelSlot *slots, int n_slots) + { + + TocEntry *te = NULL; + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].pid == worker) + { + te = slots[i].te; + slots[i].pid = 0; + slots[i].te = NULL; + slots[i].dumpId = 0; + break; + } + } + + /* Assert (te != NULL); */ + + _reduce_dependencies(AH,te); + + + } + + + /* + * Make sure the head of each dependency chain is a live item + * + * Once this is established the property will be maintained by + * _reduce_dependencies called as items are done. + */ + static void + _fix_dependency_counts(ArchiveHandle *AH) + { + TocEntry * te; + RestoreOptions * ropt = AH->ropt; + + for (te = AH->toc->next; te != AH->toc; te = te->next) + if (te->nDeps == 0 && ! ropt->idWanted[te->dumpId -1]) + _reduce_dependencies(AH,te); + } + + static void + _reduce_dependencies(ArchiveHandle * AH, TocEntry *te) + { + DumpId item = te->dumpId; + RestoreOptions * ropt = AH->ropt; + int i; + + for (te = te->next; te != AH->toc; te = te->next) + { + for (i = 0; i < te->nDeps; i++) + if (te->dependencies[i] == item) + { + te->nDeps = te->nDeps - 1; + /* + * If this item won't in fact be done, and is now at + * 0 dependency count, we pretend it's been done and + * reduce the dependency counts of all the things that + * depend on it, by a recursive call + */ + if (te->nDeps == 0 && ! ropt->idWanted[te->dumpId -1]) + _reduce_dependencies(AH,te); + + break; + } + } + + } + + + /* Public */ + void RestoreArchive(Archive *AHX, RestoreOptions *ropt) { ArchiveHandle *AH = (ArchiveHandle *) AHX; TocEntry *te; teReqs reqs; OutputContext sav; AH->ropt = ropt; AH->stage = STAGE_INITIALIZING; *************** *** 171,176 **** --- 563,582 ---- AH->noTocComments = 1; } + #ifndef HAVE_LIBZ + + /* make sure we won't need (de)compression we haven't got */ + if (AH->compression != 0 && AH->PrintTocDataPtr != NULL) + { + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + reqs = _tocEntryRequired(te, ropt, false); + if (te->hadDumper && (reqs & REQ_DATA) != 0) + die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in thisinstallation)\n"); + } + } + #endif + /* * Work out if we have an implied data-only restore. This can happen if * the dump was data only or if the user has used a toc list to exclude *************** *** 270,409 **** */ for (te = AH->toc->next; te != AH->toc; te = te->next) { ! AH->currentTE = te; ! ! /* Work out what, if anything, we want from this entry */ ! reqs = _tocEntryRequired(te, ropt, false); ! ! /* Dump any relevant dump warnings to stderr */ ! if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0) ! { ! if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0) ! write_msg(modulename, "warning from original dump file: %s\n", te->defn); ! else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0) ! write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt); ! } ! ! defnDumped = false; ! ! if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ ! { ! ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag); ! ! _printTocEntry(AH, te, ropt, false, false); ! defnDumped = true; ! ! /* ! * If we could not create a table and --no-data-for-failed-tables ! * was given, ignore the corresponding TABLE DATA ! */ ! if (ropt->noDataForFailedTables && ! AH->lastErrorTE == te && ! strcmp(te->desc, "TABLE") == 0) ! { ! TocEntry *tes; ! ! ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n", ! te->tag); ! ! for (tes = te->next; tes != AH->toc; tes = tes->next) ! { ! if (strcmp(tes->desc, "TABLE DATA") == 0 && ! strcmp(tes->tag, te->tag) == 0 && ! strcmp(tes->namespace ? tes->namespace : "", ! te->namespace ? te->namespace : "") == 0) ! { ! /* mark it unwanted */ ! ropt->idWanted[tes->dumpId - 1] = false; ! break; ! } ! } ! } ! ! /* If we created a DB, connect to it... */ ! if (strcmp(te->desc, "DATABASE") == 0) ! { ! ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag); ! _reconnectToDB(AH, te->tag); ! } ! } ! ! /* ! * If we have a data component, then process it ! */ ! if ((reqs & REQ_DATA) != 0) ! { ! /* ! * hadDumper will be set if there is genuine data component for ! * this node. Otherwise, we need to check the defn field for ! * statements that need to be executed in data-only restores. ! */ ! if (te->hadDumper) ! { ! /* ! * If we can output the data, then restore it. ! */ ! if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0) ! { ! #ifndef HAVE_LIBZ ! if (AH->compression != 0) ! die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supportedin this installation)\n"); ! #endif ! ! _printTocEntry(AH, te, ropt, true, false); ! ! if (strcmp(te->desc, "BLOBS") == 0 || ! strcmp(te->desc, "BLOB COMMENTS") == 0) ! { ! ahlog(AH, 1, "restoring %s\n", te->desc); ! ! _selectOutputSchema(AH, "pg_catalog"); ! ! (*AH->PrintTocDataPtr) (AH, te, ropt); ! } ! else ! { ! _disableTriggersIfNecessary(AH, te, ropt); ! ! /* Select owner and schema as necessary */ ! _becomeOwner(AH, te); ! _selectOutputSchema(AH, te->namespace); ! ! ahlog(AH, 1, "restoring data for table \"%s\"\n", ! te->tag); ! ! /* ! * If we have a copy statement, use it. As of V1.3, ! * these are separate to allow easy import from ! * withing a database connection. Pre 1.3 archives can ! * not use DB connections and are sent to output only. ! * ! * For V1.3+, the table data MUST have a copy ! * statement so that we can go into appropriate mode ! * with libpq. ! */ ! if (te->copyStmt && strlen(te->copyStmt) > 0) ! { ! ahprintf(AH, "%s", te->copyStmt); ! AH->writingCopyData = true; ! } ! ! (*AH->PrintTocDataPtr) (AH, te, ropt); ! ! AH->writingCopyData = false; ! ! _enableTriggersIfNecessary(AH, te, ropt); ! } ! } ! } ! else if (!defnDumped) ! { ! /* If we haven't already dumped the defn part, do so now */ ! ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag); ! _printTocEntry(AH, te, ropt, false, false); ! } ! } ! } /* end loop over TOC entries */ /* * Scan TOC again to output ownership commands and ACLs --- 676,683 ---- */ for (te = AH->toc->next; te != AH->toc; te = te->next) { ! (void) _restore_one_te(AH, te, ropt, false); ! } /* * Scan TOC again to output ownership commands and ACLs *************** *** 451,456 **** --- 725,905 ---- } } + static int + _restore_one_te(ArchiveHandle *AH, TocEntry *te, + RestoreOptions *ropt, bool is_parallel) + { + teReqs reqs; + bool defnDumped; + int retval = 0; + + AH->currentTE = te; + + /* Work out what, if anything, we want from this entry */ + reqs = _tocEntryRequired(te, ropt, false); + + /* Dump any relevant dump warnings to stderr */ + if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0) + { + if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0) + write_msg(modulename, "warning from original dump file: %s\n", te->defn); + else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0) + write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt); + } + + defnDumped = false; + + if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ + { + ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag); + + _printTocEntry(AH, te, ropt, false, false); + defnDumped = true; + + /* + * If we could not create a table and --no-data-for-failed-tables + * was given, ignore the corresponding TABLE DATA + * + * For the parallel case this must be done in the parent, so we just + * set a return value. + */ + if (ropt->noDataForFailedTables && + AH->lastErrorTE == te && + strcmp(te->desc, "TABLE") == 0) + { + if (is_parallel) + retval = 1; + else + _inhibit_data_for_failed_table(AH,te); + } + + /* If we created a DB, connect to it... */ + /* won't happen in parallel restore */ + if (strcmp(te->desc, "DATABASE") == 0) + { + ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag); + _reconnectToDB(AH, te->tag); + } + } + + /* + * If we have a data component, then process it + */ + if ((reqs & REQ_DATA) != 0) + { + /* + * hadDumper will be set if there is genuine data component for + * this node. Otherwise, we need to check the defn field for + * statements that need to be executed in data-only restores. + */ + if (te->hadDumper) + { + /* + * If we can output the data, then restore it. + */ + if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0) + { + _printTocEntry(AH, te, ropt, true, false); + + if (strcmp(te->desc, "BLOBS") == 0 || + strcmp(te->desc, "BLOB COMMENTS") == 0) + { + ahlog(AH, 1, "restoring %s\n", te->desc); + + _selectOutputSchema(AH, "pg_catalog"); + + (*AH->PrintTocDataPtr) (AH, te, ropt); + } + else + { + _disableTriggersIfNecessary(AH, te, ropt); + + /* Select owner and schema as necessary */ + _becomeOwner(AH, te); + _selectOutputSchema(AH, te->namespace); + + ahlog(AH, 1, "restoring data for table \"%s\"\n", + te->tag); + + if (ropt->truncate_before_load) + { + if (AH->connection) + StartTransaction(AH); + else + ahprintf(AH, "BEGIN;\n\n"); + + ahprintf(AH, "TRUNCATE TABLE %s;\n\n", + fmtId(te->tag)); } + + /* + * If we have a copy statement, use it. As of V1.3, + * these are separate to allow easy import from + * withing a database connection. Pre 1.3 archives can + * not use DB connections and are sent to output only. + * + * For V1.3+, the table data MUST have a copy + * statement so that we can go into appropriate mode + * with libpq. + */ + if (te->copyStmt && strlen(te->copyStmt) > 0) + { + ahprintf(AH, "%s", te->copyStmt); + AH->writingCopyData = true; + } + + (*AH->PrintTocDataPtr) (AH, te, ropt); + + AH->writingCopyData = false; + + if (ropt->truncate_before_load) + { + if (AH->connection) + CommitTransaction(AH); + else + ahprintf(AH, "COMMIT;\n\n"); + } + + + _enableTriggersIfNecessary(AH, te, ropt); + } + } + } + else if (!defnDumped) + { + /* If we haven't already dumped the defn part, do so now */ + ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag); + _printTocEntry(AH, te, ropt, false, false); + } + } + + return retval; + } + + static void + _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te) + { + TocEntry *tes; + RestoreOptions *ropt = AH->ropt; + + ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n", + te->tag); + + for (tes = te->next; tes != AH->toc; tes = tes->next) + { + if (strcmp(tes->desc, "TABLE DATA") == 0 && + strcmp(tes->tag, te->tag) == 0 && + strcmp(tes->namespace ? tes->namespace : "", + te->namespace ? te->namespace : "") == 0) + { + /* mark it unwanted */ + ropt->idWanted[tes->dumpId - 1] = false; + + _reduce_dependencies(AH, tes); + break; + } + } + } + /* * Allocate a new RestoreOptions block. * This is mainly so we can initialize it, but also for future expansion, Index: pg_backup_archiver.h =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.h,v retrieving revision 1.76 diff -c -r1.76 pg_backup_archiver.h *** pg_backup_archiver.h 7 Nov 2007 12:24:24 -0000 1.76 --- pg_backup_archiver.h 26 Sep 2008 15:15:39 -0000 *************** *** 99,104 **** --- 99,105 ---- struct _restoreList; typedef void (*ClosePtr) (struct _archiveHandle * AH); + typedef void (*ReopenPtr) (struct _archiveHandle * AH); typedef void (*ArchiveEntryPtr) (struct _archiveHandle * AH, struct _tocEntry * te); typedef void (*StartDataPtr) (struct _archiveHandle * AH, struct _tocEntry * te); *************** *** 212,217 **** --- 213,219 ---- WriteBufPtr WriteBufPtr; /* Write a buffer of output to the archive */ ReadBufPtr ReadBufPtr; /* Read a buffer of input from the archive */ ClosePtr ClosePtr; /* Close the archive */ + ReopenPtr ReopenPtr; /* Reopen the archive */ WriteExtraTocPtr WriteExtraTocPtr; /* Write extra TOC entry data * associated with the current archive * format */ *************** *** 231,236 **** --- 233,239 ---- char *archdbname; /* DB name *read* from archive */ bool requirePassword; PGconn *connection; + char *cachepw; int connectToDB; /* Flag to indicate if direct DB connection is * required */ bool writingCopyData; /* True when we are sending COPY data */ *************** *** 284,289 **** --- 287,293 ---- DumpId dumpId; bool hadDumper; /* Archiver was passed a dumper routine (used * in restore) */ + bool prestored; /* keep track of parallel restore */ char *tag; /* index tag */ char *namespace; /* null or empty string if not in a schema */ char *tablespace; /* null if not in a tablespace; empty string Index: pg_backup_custom.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_custom.c,v retrieving revision 1.40 diff -c -r1.40 pg_backup_custom.c *** pg_backup_custom.c 28 Oct 2007 21:55:52 -0000 1.40 --- pg_backup_custom.c 26 Sep 2008 15:15:39 -0000 *************** *** 40,45 **** --- 40,46 ---- static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len); static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len); static void _CloseArchive(ArchiveHandle *AH); + static void _ReopenArchive(ArchiveHandle *AH); static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te); static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te); *************** *** 120,125 **** --- 121,127 ---- AH->WriteBufPtr = _WriteBuf; AH->ReadBufPtr = _ReadBuf; AH->ClosePtr = _CloseArchive; + AH->ReopenPtr = _ReopenArchive; AH->PrintTocDataPtr = _PrintTocData; AH->ReadExtraTocPtr = _ReadExtraToc; AH->WriteExtraTocPtr = _WriteExtraToc; *************** *** 835,840 **** --- 837,879 ---- AH->FH = NULL; } + static void + _ReopenArchive(ArchiveHandle *AH) + { + lclContext *ctx = (lclContext *) AH->formatData; + pgoff_t tpos; + + if (AH->mode == archModeWrite) + { + die_horribly(AH,modulename,"Can only reopen input archives"); + } + else if ((! AH->fSpec) || strcmp(AH->fSpec, "") == 0) + { + die_horribly(AH,modulename,"Cannot reopen stdin"); + } + + tpos = ftello(AH->FH); + + if (fclose(AH->FH) != 0) + die_horribly(AH, modulename, "could not close archive file: %s\n", + strerror(errno)); + + AH->FH = fopen(AH->fSpec, PG_BINARY_R); + if (!AH->FH) + die_horribly(AH, modulename, "could not open input file \"%s\": %s\n", + AH->fSpec, strerror(errno)); + + if (ctx->hasSeek) + { + fseeko(AH->FH, tpos, SEEK_SET); + } + else + { + die_horribly(AH,modulename,"cannot reopen non-seekable file"); + } + + } + /*-------------------------------------------------- * END OF FORMAT CALLBACKS *-------------------------------------------------- Index: pg_backup_db.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_db.c,v retrieving revision 1.80 diff -c -r1.80 pg_backup_db.c *** pg_backup_db.c 16 Aug 2008 02:25:06 -0000 1.80 --- pg_backup_db.c 26 Sep 2008 15:15:39 -0000 *************** *** 206,220 **** if (AH->connection) die_horribly(AH, modulename, "already connected to a database\n"); ! if (reqPwd) { password = simple_prompt("Password: ", 100, false); if (password == NULL) die_horribly(AH, modulename, "out of memory\n"); AH->requirePassword = true; } else AH->requirePassword = false; /* * Start the connection. Loop until we have a password if requested by --- 206,226 ---- if (AH->connection) die_horribly(AH, modulename, "already connected to a database\n"); ! if (reqPwd && AH->cachepw == NULL) { password = simple_prompt("Password: ", 100, false); if (password == NULL) die_horribly(AH, modulename, "out of memory\n"); AH->requirePassword = true; } + else if (reqPwd) + { + password = AH->cachepw; + } else + { AH->requirePassword = false; + } /* * Start the connection. Loop until we have a password if requested by *************** *** 241,247 **** } while (new_pass); if (password) ! free(password); /* check to see that the backend connection was successfully made */ if (PQstatus(AH->connection) == CONNECTION_BAD) --- 247,253 ---- } while (new_pass); if (password) ! AH->cachepw = password; /* check to see that the backend connection was successfully made */ if (PQstatus(AH->connection) == CONNECTION_BAD) Index: pg_backup_files.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_files.c,v retrieving revision 1.34 diff -c -r1.34 pg_backup_files.c *** pg_backup_files.c 28 Oct 2007 21:55:52 -0000 1.34 --- pg_backup_files.c 26 Sep 2008 15:15:39 -0000 *************** *** 87,92 **** --- 87,93 ---- AH->WriteBufPtr = _WriteBuf; AH->ReadBufPtr = _ReadBuf; AH->ClosePtr = _CloseArchive; + AH->ReopenPtr = NULL; AH->PrintTocDataPtr = _PrintTocData; AH->ReadExtraTocPtr = _ReadExtraToc; AH->WriteExtraTocPtr = _WriteExtraToc; Index: pg_backup_tar.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_tar.c,v retrieving revision 1.62 diff -c -r1.62 pg_backup_tar.c *** pg_backup_tar.c 15 Nov 2007 21:14:41 -0000 1.62 --- pg_backup_tar.c 26 Sep 2008 15:15:39 -0000 *************** *** 143,148 **** --- 143,149 ---- AH->WriteBufPtr = _WriteBuf; AH->ReadBufPtr = _ReadBuf; AH->ClosePtr = _CloseArchive; + AH->ReopenPtr = NULL; AH->PrintTocDataPtr = _PrintTocData; AH->ReadExtraTocPtr = _ReadExtraToc; AH->WriteExtraTocPtr = _WriteExtraToc; Index: pg_restore.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_restore.c,v retrieving revision 1.88 diff -c -r1.88 pg_restore.c *** pg_restore.c 13 Apr 2008 03:49:22 -0000 1.88 --- pg_restore.c 26 Sep 2008 15:15:39 -0000 *************** *** 78,83 **** --- 78,84 ---- static int no_data_for_failed_tables = 0; static int outputNoTablespaces = 0; static int use_setsessauth = 0; + static int truncate_before_load = 0; struct option cmdopts[] = { {"clean", 0, NULL, 'c'}, *************** *** 92,97 **** --- 93,99 ---- {"ignore-version", 0, NULL, 'i'}, {"index", 1, NULL, 'I'}, {"list", 0, NULL, 'l'}, + {"multi-thread",1,NULL,'m'}, {"no-privileges", 0, NULL, 'x'}, {"no-acl", 0, NULL, 'x'}, {"no-owner", 0, NULL, 'O'}, *************** *** 114,119 **** --- 116,122 ---- {"disable-triggers", no_argument, &disable_triggers, 1}, {"no-data-for-failed-tables", no_argument, &no_data_for_failed_tables, 1}, {"no-tablespaces", no_argument, &outputNoTablespaces, 1}, + {"truncate-before-load", no_argument, &truncate_before_load, 1}, {"use-set-session-authorization", no_argument, &use_setsessauth, 1}, {NULL, 0, NULL, 0} *************** *** 139,145 **** } } ! while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:n:Op:P:RsS:t:T:U:vWxX:1", cmdopts, NULL)) != -1) { switch (c) --- 142,148 ---- } } ! while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:m:n:Op:P:RsS:t:T:U:vWxX:1", cmdopts, NULL)) != -1) { switch (c) *************** *** 182,187 **** --- 185,194 ---- opts->tocFile = strdup(optarg); break; + case 'm': + opts->number_of_threads = atoi(optarg); /* XXX fix error checking */ + break; + case 'n': /* Dump data for this schema only */ opts->schemaNames = strdup(optarg); break; *************** *** 262,268 **** break; case 0: ! /* This covers the long options equivalent to -X xxx. */ break; case '1': /* Restore data in a single transaction */ --- 269,278 ---- break; case 0: ! /* ! * This covers the long options without a short equivalent, ! * including those equivalent to -X xxx. ! */ break; case '1': /* Restore data in a single transaction */ *************** *** 299,304 **** --- 309,329 ---- opts->noDataForFailedTables = no_data_for_failed_tables; opts->noTablespace = outputNoTablespaces; opts->use_setsessauth = use_setsessauth; + opts->truncate_before_load = truncate_before_load; + + if (opts->single_txn) + { + if (opts->number_of_threads > 1) + { + write_msg(NULL, "single transaction not compatible with multi-threading"); + exit(1); + } + else if (opts->truncate_before_load) + { + write_msg(NULL, "single transaction not compatible with truncate-before-load"); + exit(1); + } + } if (opts->formatName) { *************** *** 330,335 **** --- 355,362 ---- AH = OpenArchive(inputFileSpec, opts->format); + /* XXX looks like we'll have to do sanity checks in the parallel archiver */ + /* Let the archiver know how noisy to be */ AH->verbose = opts->verbose; *************** *** 351,356 **** --- 378,385 ---- if (opts->tocSummary) PrintTOCSummary(AH, opts); + else if (opts->number_of_threads > 1) + RestoreArchiveParallel(AH, opts); else RestoreArchive(AH, opts);
Andrew Dunstan wrote: > > > This version of the patch should fix the "shared file descriptor" bug > Russell Smith noticed. It also disables the 1/2 second sleep between > forks, so the performance on a small db (regression) is vastly improved. this works better but there is something fishy still - using the same dump file I get a proper restore using pg_restore normally. If I however use -m for a parallel one I only get parts (in this case only 243 of the 709 tables) of the database restored ... Stefan
Stefan Kaltenbrunner wrote: > Andrew Dunstan wrote: >> >> >> This version of the patch should fix the "shared file descriptor" bug >> Russell Smith noticed. It also disables the 1/2 second sleep between >> forks, so the performance on a small db (regression) is vastly improved. > > this works better but there is something fishy still - using the same > dump file I get a proper restore using pg_restore normally. If I > however use -m for a parallel one I only get parts (in this case only > 243 of the 709 tables) of the database restored ... > > > Yes, there are several funny things going on, including some stuff with dependencies. I'll have a new patch tomorrow with luck. Thanks for testing. cheers andrew
On Fri, 26 Sep 2008 17:10:44 -0400 Andrew Dunstan <andrew@dunslane.net> wrote: > Yes, there are several funny things going on, including some stuff > with dependencies. I'll have a new patch tomorrow with luck. Thanks > for testing. O.k. I took at look at the patch itself and although I don't understand all of it there were a couple of red flags to me: + if (ropt->create) + die_horribly(AH,modulename, + "parallel restore is incompatible with --create\n"); + This seems like an odd limitation. In my mind, the schema would not be restored in parallel. The schema before data would restore as a single thread. Even the largest schemas would only take minutes (if that). Thus something like --create should never be a problem. I also noticed you check if we have zlib? Is it even possible to use the c format without it? (that would be new to me). I noticed this line: + while((next_work_item = get_next_work_item(AH)) != NULL) + { + /* XXX need to improve this test in case there is no table data */ + /* need to test for indexes, FKs, PK, Unique, etc */ + if(strcmp(next_work_item->desc,"TABLE DATA") == 0) + break; + (void) _restore_one_te(AH, next_work_item, ropt, false); + + next_work_item->prestored = true; + + _reduce_dependencies(AH,next_work_item); + } Intead of the TABLE DATA compare, perhaps it makes sense to back patch pg_dump to have a line delimiter in the TOC? That way even if there is no TABLE DATA there would be a delimiter that says: --- BEGIN TABLE DATA --- END TABLE DATA Thus if nothing is there... nothing is there? + /* delay just long enough betweek forks to give the catalog some + * breathing space. Without this sleep I got + * "tuple concurrently updated" errors. + */ + pg_usleep(500000); + continue; /* in case the slots are not yet full */ + } Could that be solved with a lock instead? Once the lock is released.... Anyway... just some thoughts. I apologize if I misunderstood the patch. Sincerely, Joshua D. Drake > > cheers > > andrew > -- The PostgreSQL Company since 1997: http://www.commandprompt.com/ PostgreSQL Community Conference: http://www.postgresqlconference.org/ United States PostgreSQL Association: http://www.postgresql.us/
Joshua Drake wrote: > On Fri, 26 Sep 2008 17:10:44 -0400 > Andrew Dunstan <andrew@dunslane.net> wrote: > > >> Yes, there are several funny things going on, including some stuff >> with dependencies. I'll have a new patch tomorrow with luck. Thanks >> for testing. >> > > O.k. I took at look at the patch itself and although I don't understand > all of it there were a couple of red flags to me: > > + if (ropt->create) > + die_horribly(AH,modulename, > + "parallel restore is > incompatible with --create\n"); > + > > This seems like an odd limitation. In my mind, the schema would not be > restored in parallel. The schema before data would restore as a single > thread. Even the largest schemas would only take minutes (if that). > Thus something like --create should never be a problem. > Originally I had everything restoring in parallel. Now I am in fact (as the patch should have showed you) restoring the first part in a single thread like you say. Thus I probably can relax that restriction. I will look and see. > I also noticed you check if we have zlib? Is it even possible to use > the c format without it? (that would be new to me). > > I noticed this line: > > > + while((next_work_item = get_next_work_item(AH)) != NULL) > + { > + /* XXX need to improve this test in case there is no > table data */ > + /* need to test for indexes, FKs, PK, Unique, etc */ > + if(strcmp(next_work_item->desc,"TABLE DATA") == 0) > + break; > + (void) _restore_one_te(AH, next_work_item, ropt, > false); > + > + next_work_item->prestored = true; > + > + _reduce_dependencies(AH,next_work_item); > + } > > > Intead of the TABLE DATA compare, perhaps it makes sense to back patch > pg_dump to have a line delimiter in the TOC? That way even if there is > no TABLE DATA there would be a delimiter that says: > > --- BEGIN TABLE DATA > --- END TABLE DATA > > Thus if nothing is there... nothing is there? > The TOC isn't stored as a text file. So we'll need to look by entry tags. It's no big deal - there aren't a huge number. > + /* delay just long enough betweek forks to > give the catalog some > + * breathing space. Without this sleep I got > + * "tuple concurrently updated" errors. > + */ > + pg_usleep(500000); > + continue; /* in case the slots are not yet > full */ > + } > > Could that be solved with a lock instead? Once the lock is released.... > That sleep is now gone. > Anyway... just some thoughts. I apologize if I misunderstood the patch. > > > No problem. Thanks for looking. cheers andrew
Andrew Dunstan wrote: > > >> >> this works better but there is something fishy still - using the same >> dump file I get a proper restore using pg_restore normally. If I >> however use -m for a parallel one I only get parts (in this case only >> 243 of the 709 tables) of the database restored ... >> >> >> > > Yes, there are several funny things going on, including some stuff > with dependencies. I'll have a new patch tomorrow with luck. Thanks > for testing. > > OK, in this version a whole heap of bugs are fixed, mainly those to do with dependencies and saved state. I get identical row counts in the source and destination now, quite reliably. cheers andrew Index: pg_backup.h =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup.h,v retrieving revision 1.47 diff -c -r1.47 pg_backup.h *** pg_backup.h 13 Apr 2008 03:49:21 -0000 1.47 --- pg_backup.h 29 Sep 2008 02:43:51 -0000 *************** *** 123,128 **** --- 123,130 ---- int suppressDumpWarnings; /* Suppress output of WARNING entries * to stderr */ bool single_txn; + int number_of_threads; + bool truncate_before_load; bool *idWanted; /* array showing which dump IDs to emit */ } RestoreOptions; *************** *** 165,170 **** --- 167,173 ---- extern void CloseArchive(Archive *AH); extern void RestoreArchive(Archive *AH, RestoreOptions *ropt); + extern void RestoreArchiveParallel(Archive *AH, RestoreOptions *ropt); /* Open an existing archive */ extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt); Index: pg_backup_archiver.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.c,v retrieving revision 1.158 diff -c -r1.158 pg_backup_archiver.c *** pg_backup_archiver.c 5 Sep 2008 23:53:42 -0000 1.158 --- pg_backup_archiver.c 29 Sep 2008 02:43:52 -0000 *************** *** 27,38 **** --- 27,50 ---- #include <unistd.h> + #include <sys/types.h> + #include <sys/wait.h> + + #ifdef WIN32 #include <io.h> #endif #include "libpq/libpq-fs.h" + typedef struct _parallel_slot + { + pid_t pid; + TocEntry *te; + DumpId dumpId; + } ParallelSlot; + + #define NO_SLOT (-1) const char *progname; *************** *** 70,76 **** --- 82,99 ---- static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim); static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression); static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext); + static bool work_is_being_done(ParallelSlot *slot, int n_slots); + static int get_next_slot(ParallelSlot *slots, int n_slots); + static TocEntry *get_next_work_item(ArchiveHandle *AH); + static void prestore(ArchiveHandle *AH, TocEntry *te); + static void mark_work_done(ArchiveHandle *AH, pid_t worker, ParallelSlot *slots, int n_slots); + static int _restore_one_te(ArchiveHandle *ah, TocEntry *te, RestoreOptions *ropt,bool is_parallel); + static void _reduce_dependencies(ArchiveHandle * AH, TocEntry *te); + static void _fix_dependency_counts(ArchiveHandle *AH); + static void _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te); + + static ArchiveHandle *GAH; /* * Wrapper functions. *************** *** 125,137 **** /* Public */ void RestoreArchive(Archive *AHX, RestoreOptions *ropt) { ArchiveHandle *AH = (ArchiveHandle *) AHX; TocEntry *te; teReqs reqs; OutputContext sav; - bool defnDumped; AH->ropt = ropt; AH->stage = STAGE_INITIALIZING; --- 148,579 ---- /* Public */ void + RestoreArchiveParallel(Archive *AHX, RestoreOptions *ropt) + { + + ArchiveHandle *AH = (ArchiveHandle *) AHX; + ParallelSlot *slots; + int next_slot; + TocEntry *next_work_item = NULL; + int work_status; + pid_t ret_child; + int n_slots = ropt->number_of_threads; + TocEntry *te; + teReqs reqs; + + + /* AH->debugLevel = 99; */ + /* some routines that use ahlog() don't get passed AH */ + GAH = AH; + + ahlog(AH,1,"entering RestoreARchiveParallel\n"); + + + slots = (ParallelSlot *) calloc(sizeof(ParallelSlot),n_slots); + AH->ropt = ropt; + + /* + if (ropt->create) + die_horribly(AH,modulename, + "parallel restore is incompatible with --create\n"); + */ + + + if (ropt->dropSchema) + die_horribly(AH,modulename, + "parallel restore is incompatible with --clean\n"); + + if (!ropt->useDB) + die_horribly(AH,modulename, + "parallel restore requires direct database connection\n"); + + + #ifndef HAVE_LIBZ + + /* make sure we won't need (de)compression we haven't got */ + if (AH->compression != 0 && AH->PrintTocDataPtr != NULL) + { + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + reqs = _tocEntryRequired(te, ropt, false); + if (te->hadDumper && (reqs & REQ_DATA) != 0) + die_horribly(AH, modulename, + "cannot restore from compressed archive (compression not supported in this installation)\n"); + } + } + #endif + + ahlog(AH, 1, "connecting to database for restore\n"); + if (AH->version < K_VERS_1_3) + die_horribly(AH, modulename, + "direct database connections are not supported in pre-1.3 archives\n"); + + /* XXX Should get this from the archive */ + AHX->minRemoteVersion = 070100; + AHX->maxRemoteVersion = 999999; + + /* correct dependency counts in case we're doing a partial restore */ + if (ropt->idWanted == NULL) + InitDummyWantedList(AHX,ropt); + _fix_dependency_counts(AH); + + /* + * Since we're talking to the DB directly, don't send comments since they + * obscure SQL when displaying errors + */ + AH->noTocComments = 1; + + /* Do all the early stuff in a single connection in the parent. + * There's no great point in running it in parallel and it will actually + * run faster in a single connection because we avoid all the connection + * and setup overhead, including the 0.5s sleep below. + */ + ConnectDatabase(AHX, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->requirePassword); + + + /* + * Establish important parameter values right away. + */ + _doSetFixedOutputState(AH); + + while((next_work_item = get_next_work_item(AH)) != NULL) + { + /* XXX need to improve this test in case there is no table data */ + /* need to test for indexes, FKs, PK, Unique, etc */ + if(strcmp(next_work_item->desc,"TABLE DATA") == 0) + break; + (void) _restore_one_te(AH, next_work_item, ropt, false); + + next_work_item->prestored = true; + + _reduce_dependencies(AH,next_work_item); + } + + + /* + * now close parent connection in prep for parallel step. + */ + PQfinish(AH->connection); + AH->connection = NULL; + + /* blow away any preserved state from the previous connection */ + + if (AH->currSchema) + free(AH->currSchema); + AH->currSchema = strdup(""); + if (AH->currUser) + free(AH->currUser); + AH->currUser = strdup(""); + if (AH->currTablespace) + free(AH->currTablespace); + AH->currTablespace = NULL; + AH->currWithOids = -1; + + /* main parent loop */ + + ahlog(AH,1,"entering main loop\n"); + + while (((next_work_item = get_next_work_item(AH)) != NULL) || + (work_is_being_done(slots,n_slots))) + { + if (next_work_item != NULL && + ((next_slot = get_next_slot(slots,n_slots)) != NO_SLOT)) + { + /* there is work still to do and a worker slot available */ + + pid_t child; + + next_work_item->prestored = true; + + child = fork(); + if (child == 0) + { + prestore(AH,next_work_item); + /* should not happen ... we expect prestore to exit */ + exit(1); + } + else if (child > 0) + { + slots[next_slot].pid = child; + slots[next_slot].te = next_work_item; + slots[next_slot].dumpId = next_work_item->dumpId; + } + else + { + /* XXX fork error - handle it! */ + } + /* delay just long enough betweek forks to give the catalog some + * breathing space. Without this sleep I got + * "tuple concurrently updated" errors. + */ + /* pg_usleep(500000); */ + continue; /* in case the slots are not yet full */ + } + /* if we get here there must be work being done */ + ret_child = wait(&work_status); + + if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 0) + { + mark_work_done(AH, ret_child, slots, n_slots); + } + else if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 1) + { + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].pid == ret_child) + _inhibit_data_for_failed_table(AH, slots[i].te); + break; + } + mark_work_done(AH, ret_child, slots, n_slots); + } + else + { + /* XXX something went wrong - deal with it */ + } + } + + /* + * now process the ACLs - no need to do this in parallel + */ + + /* reconnect from parent */ + ConnectDatabase(AHX, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->requirePassword); + + /* + * Scan TOC to output ownership commands and ACLs + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + AH->currentTE = te; + + /* Work out what, if anything, we want from this entry */ + reqs = _tocEntryRequired(te, ropt, true); + + if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ + { + ahlog(AH, 1, "setting owner and privileges for %s %s\n", + te->desc, te->tag); + _printTocEntry(AH, te, ropt, false, true); + } + } + + /* clean up */ + PQfinish(AH->connection); + AH->connection = NULL; + + } + + static bool + work_is_being_done(ParallelSlot *slot, int n_slots) + { + ahlog(GAH,1,"is work being done?\n"); + while(n_slots--) + { + if (slot->pid > 0) + return true; + slot++; + } + ahlog(GAH,1,"work is not being done\n"); + return false; + } + + static int + get_next_slot(ParallelSlot *slots, int n_slots) + { + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].pid == 0) + { + ahlog(GAH,1,"available slots is %d\n",i); + return i; + } + } + ahlog(GAH,1,"No slot available\n"); + return NO_SLOT; + } + + static TocEntry* + get_next_work_item(ArchiveHandle *AH) + { + TocEntry *te; + teReqs reqs; + + /* just search from the top of the queue until we find an available item. + * Note that the queue isn't reordered in the current implementation. If + * we ever do reorder it, then certain code that processes entries from the + * current item to the end of the queue will probably need to be + * re-examined. + */ + + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + if (!te->prestored && te->depCount < 1) + { + /* make sure it's not an ACL */ + reqs = _tocEntryRequired (te, AH->ropt, false); + if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0) + { + ahlog(AH,1,"next item is %d\n",te->dumpId); + return te; + } + } + } + ahlog(AH,1,"No item ready\n"); + return NULL; + } + + static void + prestore(ArchiveHandle *AH, TocEntry *te) + { + RestoreOptions *ropt = AH->ropt; + int retval; + + /* close and reopen the archive so we have a private copy that doesn't + * stomp on anyone else's file pointer + */ + + (AH->ReopenPtr)(AH); + + ConnectDatabase((Archive *)AH, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->requirePassword); + + /* + * Establish important parameter values right away. + */ + _doSetFixedOutputState(AH); + + retval = _restore_one_te(AH, te, ropt, true); + + PQfinish(AH->connection); + exit(retval); + + } + + static void + mark_work_done(ArchiveHandle *AH, pid_t worker, + ParallelSlot *slots, int n_slots) + { + + TocEntry *te = NULL; + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].pid == worker) + { + te = slots[i].te; + slots[i].pid = 0; + slots[i].te = NULL; + slots[i].dumpId = 0; + break; + } + } + + /* Assert (te != NULL); */ + + _reduce_dependencies(AH,te); + + + } + + + /* + * Make sure the head of each dependency chain is a live item + * + * Once this is established the property will be maintained by + * _reduce_dependencies called as items are done. + */ + static void + _fix_dependency_counts(ArchiveHandle *AH) + { + TocEntry * te; + RestoreOptions * ropt = AH->ropt; + bool * RealDumpIds; + int i; + + + RealDumpIds = calloc(AH->maxDumpId, sizeof(bool)); + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + RealDumpIds[te->dumpId-1] = true; + if (te->depCount == 0 && ! ropt->idWanted[te->dumpId -1]) + _reduce_dependencies(AH,te); + } + + /* + * It is possible that the dependencies list items that are + * not in the archive at all. Reduce the depcounts so those get + * ignored. + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + for (i = 0; i < te->nDeps; i++) + if (!RealDumpIds[te->dependencies[i]-1]) + te->depCount--; + } + + static void + _reduce_dependencies(ArchiveHandle * AH, TocEntry *te) + { + DumpId item = te->dumpId; + RestoreOptions * ropt = AH->ropt; + int i; + + for (te = te->next; te != AH->toc; te = te->next) + { + if (te->nDeps == 0) + continue; + + for (i = 0; i < te->nDeps; i++) + if (te->dependencies[i] == item) + te->depCount = te->depCount - 1; + + /* If this is a table data item we are making available, + * make the table's dependencies depend on this item instead of + * the table definition, so they + * don't get scheduled until the data is loaded. + * Have to do this now before the main loop gets to anything + * further down the list. + */ + if (te->depCount == 0 && strcmp(te->desc,"TABLEDATA") == 0) + { + TocEntry *tes; + int j; + for (tes = te->next; tes != AH->toc; tes = tes->next) + for (j = 0; j < tes->nDeps; j++) + if (tes->dependencies[j] == item) + tes->dependencies[j] = te->dumpId; + } + + /* + * If this item won't in fact be done, and is now at + * 0 dependency count, we pretend it's been done and + * reduce the dependency counts of all the things that + * depend on it, by a recursive call + */ + if (te->depCount == 0 && ! ropt->idWanted[te->dumpId -1]) + _reduce_dependencies(AH,te); + } + + } + + + /* Public */ + void RestoreArchive(Archive *AHX, RestoreOptions *ropt) { ArchiveHandle *AH = (ArchiveHandle *) AHX; TocEntry *te; teReqs reqs; OutputContext sav; AH->ropt = ropt; AH->stage = STAGE_INITIALIZING; *************** *** 171,176 **** --- 613,632 ---- AH->noTocComments = 1; } + #ifndef HAVE_LIBZ + + /* make sure we won't need (de)compression we haven't got */ + if (AH->compression != 0 && AH->PrintTocDataPtr != NULL) + { + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + reqs = _tocEntryRequired(te, ropt, false); + if (te->hadDumper && (reqs & REQ_DATA) != 0) + die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in thisinstallation)\n"); + } + } + #endif + /* * Work out if we have an implied data-only restore. This can happen if * the dump was data only or if the user has used a toc list to exclude *************** *** 270,409 **** */ for (te = AH->toc->next; te != AH->toc; te = te->next) { ! AH->currentTE = te; ! ! /* Work out what, if anything, we want from this entry */ ! reqs = _tocEntryRequired(te, ropt, false); ! ! /* Dump any relevant dump warnings to stderr */ ! if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0) ! { ! if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0) ! write_msg(modulename, "warning from original dump file: %s\n", te->defn); ! else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0) ! write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt); ! } ! ! defnDumped = false; ! ! if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ ! { ! ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag); ! ! _printTocEntry(AH, te, ropt, false, false); ! defnDumped = true; ! ! /* ! * If we could not create a table and --no-data-for-failed-tables ! * was given, ignore the corresponding TABLE DATA ! */ ! if (ropt->noDataForFailedTables && ! AH->lastErrorTE == te && ! strcmp(te->desc, "TABLE") == 0) ! { ! TocEntry *tes; ! ! ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n", ! te->tag); ! ! for (tes = te->next; tes != AH->toc; tes = tes->next) ! { ! if (strcmp(tes->desc, "TABLE DATA") == 0 && ! strcmp(tes->tag, te->tag) == 0 && ! strcmp(tes->namespace ? tes->namespace : "", ! te->namespace ? te->namespace : "") == 0) ! { ! /* mark it unwanted */ ! ropt->idWanted[tes->dumpId - 1] = false; ! break; ! } ! } ! } ! ! /* If we created a DB, connect to it... */ ! if (strcmp(te->desc, "DATABASE") == 0) ! { ! ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag); ! _reconnectToDB(AH, te->tag); ! } ! } ! ! /* ! * If we have a data component, then process it ! */ ! if ((reqs & REQ_DATA) != 0) ! { ! /* ! * hadDumper will be set if there is genuine data component for ! * this node. Otherwise, we need to check the defn field for ! * statements that need to be executed in data-only restores. ! */ ! if (te->hadDumper) ! { ! /* ! * If we can output the data, then restore it. ! */ ! if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0) ! { ! #ifndef HAVE_LIBZ ! if (AH->compression != 0) ! die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supportedin this installation)\n"); ! #endif ! ! _printTocEntry(AH, te, ropt, true, false); ! ! if (strcmp(te->desc, "BLOBS") == 0 || ! strcmp(te->desc, "BLOB COMMENTS") == 0) ! { ! ahlog(AH, 1, "restoring %s\n", te->desc); ! ! _selectOutputSchema(AH, "pg_catalog"); ! ! (*AH->PrintTocDataPtr) (AH, te, ropt); ! } ! else ! { ! _disableTriggersIfNecessary(AH, te, ropt); ! ! /* Select owner and schema as necessary */ ! _becomeOwner(AH, te); ! _selectOutputSchema(AH, te->namespace); ! ! ahlog(AH, 1, "restoring data for table \"%s\"\n", ! te->tag); ! ! /* ! * If we have a copy statement, use it. As of V1.3, ! * these are separate to allow easy import from ! * withing a database connection. Pre 1.3 archives can ! * not use DB connections and are sent to output only. ! * ! * For V1.3+, the table data MUST have a copy ! * statement so that we can go into appropriate mode ! * with libpq. ! */ ! if (te->copyStmt && strlen(te->copyStmt) > 0) ! { ! ahprintf(AH, "%s", te->copyStmt); ! AH->writingCopyData = true; ! } ! ! (*AH->PrintTocDataPtr) (AH, te, ropt); ! ! AH->writingCopyData = false; ! ! _enableTriggersIfNecessary(AH, te, ropt); ! } ! } ! } ! else if (!defnDumped) ! { ! /* If we haven't already dumped the defn part, do so now */ ! ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag); ! _printTocEntry(AH, te, ropt, false, false); ! } ! } ! } /* end loop over TOC entries */ /* * Scan TOC again to output ownership commands and ACLs --- 726,733 ---- */ for (te = AH->toc->next; te != AH->toc; te = te->next) { ! (void) _restore_one_te(AH, te, ropt, false); ! } /* * Scan TOC again to output ownership commands and ACLs *************** *** 451,456 **** --- 775,955 ---- } } + static int + _restore_one_te(ArchiveHandle *AH, TocEntry *te, + RestoreOptions *ropt, bool is_parallel) + { + teReqs reqs; + bool defnDumped; + int retval = 0; + + AH->currentTE = te; + + /* Work out what, if anything, we want from this entry */ + reqs = _tocEntryRequired(te, ropt, false); + + /* Dump any relevant dump warnings to stderr */ + if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0) + { + if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0) + write_msg(modulename, "warning from original dump file: %s\n", te->defn); + else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0) + write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt); + } + + defnDumped = false; + + if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ + { + ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag); + + _printTocEntry(AH, te, ropt, false, false); + defnDumped = true; + + /* + * If we could not create a table and --no-data-for-failed-tables + * was given, ignore the corresponding TABLE DATA + * + * For the parallel case this must be done in the parent, so we just + * set a return value. + */ + if (ropt->noDataForFailedTables && + AH->lastErrorTE == te && + strcmp(te->desc, "TABLE") == 0) + { + if (is_parallel) + retval = 1; + else + _inhibit_data_for_failed_table(AH,te); + } + + /* If we created a DB, connect to it... */ + /* won't happen in parallel restore */ + if (strcmp(te->desc, "DATABASE") == 0) + { + ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag); + _reconnectToDB(AH, te->tag); + } + } + + /* + * If we have a data component, then process it + */ + if ((reqs & REQ_DATA) != 0) + { + /* + * hadDumper will be set if there is genuine data component for + * this node. Otherwise, we need to check the defn field for + * statements that need to be executed in data-only restores. + */ + if (te->hadDumper) + { + /* + * If we can output the data, then restore it. + */ + if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0) + { + _printTocEntry(AH, te, ropt, true, false); + + if (strcmp(te->desc, "BLOBS") == 0 || + strcmp(te->desc, "BLOB COMMENTS") == 0) + { + ahlog(AH, 1, "restoring %s\n", te->desc); + + _selectOutputSchema(AH, "pg_catalog"); + + (*AH->PrintTocDataPtr) (AH, te, ropt); + } + else + { + _disableTriggersIfNecessary(AH, te, ropt); + + /* Select owner and schema as necessary */ + _becomeOwner(AH, te); + _selectOutputSchema(AH, te->namespace); + + ahlog(AH, 1, "restoring data for table \"%s\"\n", + te->tag); + + if (ropt->truncate_before_load) + { + if (AH->connection) + StartTransaction(AH); + else + ahprintf(AH, "BEGIN;\n\n"); + + ahprintf(AH, "TRUNCATE TABLE %s;\n\n", + fmtId(te->tag)); } + + /* + * If we have a copy statement, use it. As of V1.3, + * these are separate to allow easy import from + * withing a database connection. Pre 1.3 archives can + * not use DB connections and are sent to output only. + * + * For V1.3+, the table data MUST have a copy + * statement so that we can go into appropriate mode + * with libpq. + */ + if (te->copyStmt && strlen(te->copyStmt) > 0) + { + ahprintf(AH, "%s", te->copyStmt); + AH->writingCopyData = true; + } + + (*AH->PrintTocDataPtr) (AH, te, ropt); + + AH->writingCopyData = false; + + if (ropt->truncate_before_load) + { + if (AH->connection) + CommitTransaction(AH); + else + ahprintf(AH, "COMMIT;\n\n"); + } + + + _enableTriggersIfNecessary(AH, te, ropt); + } + } + } + else if (!defnDumped) + { + /* If we haven't already dumped the defn part, do so now */ + ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag); + _printTocEntry(AH, te, ropt, false, false); + } + } + + return retval; + } + + static void + _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te) + { + TocEntry *tes; + RestoreOptions *ropt = AH->ropt; + + ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n", + te->tag); + + for (tes = te->next; tes != AH->toc; tes = tes->next) + { + if (strcmp(tes->desc, "TABLE DATA") == 0 && + strcmp(tes->tag, te->tag) == 0 && + strcmp(tes->namespace ? tes->namespace : "", + te->namespace ? te->namespace : "") == 0) + { + /* mark it unwanted */ + ropt->idWanted[tes->dumpId - 1] = false; + + _reduce_dependencies(AH, tes); + break; + } + } + } + /* * Allocate a new RestoreOptions block. * This is mainly so we can initialize it, but also for future expansion, *************** *** 653,662 **** while (te != AH->toc) { if (_tocEntryRequired(te, ropt, true) != 0) ! ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId, te->catalogId.tableoid, te->catalogId.oid, te->desc, te->namespace ? te->namespace : "-", te->tag, te->owner); te = te->next; } --- 1152,1167 ---- while (te != AH->toc) { if (_tocEntryRequired(te, ropt, true) != 0) ! { ! int i; ! ahprintf(AH, "%d;[%d: ",te->dumpId, te->nDeps); ! for (i=0 ;i<te->nDeps; i++) ! ahprintf(AH, "%d ",te->dependencies[i]); ! ahprintf(AH, "] %u %u %s %s %s %s\n", te->catalogId.tableoid, te->catalogId.oid, te->desc, te->namespace ? te->namespace : "-", te->tag, te->owner); + } te = te->next; } *************** *** 1948,1965 **** --- 2453,2473 ---- deps = (DumpId *) realloc(deps, sizeof(DumpId) * depIdx); te->dependencies = deps; te->nDeps = depIdx; + te->depCount = depIdx; } else { free(deps); te->dependencies = NULL; te->nDeps = 0; + te->depCount = 0; } } else { te->dependencies = NULL; te->nDeps = 0; + te->depCount = 0; } if (AH->ReadExtraTocPtr) Index: pg_backup_archiver.h =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.h,v retrieving revision 1.76 diff -c -r1.76 pg_backup_archiver.h *** pg_backup_archiver.h 7 Nov 2007 12:24:24 -0000 1.76 --- pg_backup_archiver.h 29 Sep 2008 02:43:52 -0000 *************** *** 99,104 **** --- 99,105 ---- struct _restoreList; typedef void (*ClosePtr) (struct _archiveHandle * AH); + typedef void (*ReopenPtr) (struct _archiveHandle * AH); typedef void (*ArchiveEntryPtr) (struct _archiveHandle * AH, struct _tocEntry * te); typedef void (*StartDataPtr) (struct _archiveHandle * AH, struct _tocEntry * te); *************** *** 212,217 **** --- 213,219 ---- WriteBufPtr WriteBufPtr; /* Write a buffer of output to the archive */ ReadBufPtr ReadBufPtr; /* Read a buffer of input from the archive */ ClosePtr ClosePtr; /* Close the archive */ + ReopenPtr ReopenPtr; /* Reopen the archive */ WriteExtraTocPtr WriteExtraTocPtr; /* Write extra TOC entry data * associated with the current archive * format */ *************** *** 231,236 **** --- 233,239 ---- char *archdbname; /* DB name *read* from archive */ bool requirePassword; PGconn *connection; + char *cachepw; int connectToDB; /* Flag to indicate if direct DB connection is * required */ bool writingCopyData; /* True when we are sending COPY data */ *************** *** 284,289 **** --- 287,293 ---- DumpId dumpId; bool hadDumper; /* Archiver was passed a dumper routine (used * in restore) */ + bool prestored; /* keep track of parallel restore */ char *tag; /* index tag */ char *namespace; /* null or empty string if not in a schema */ char *tablespace; /* null if not in a tablespace; empty string *************** *** 296,301 **** --- 300,306 ---- char *copyStmt; DumpId *dependencies; /* dumpIds of objects this one depends on */ int nDeps; /* number of dependencies */ + int depCount; /* adjustable tally of dependencies */ DataDumperPtr dataDumper; /* Routine to dump data for object */ void *dataDumperArg; /* Arg for above routine */ Index: pg_backup_custom.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_custom.c,v retrieving revision 1.40 diff -c -r1.40 pg_backup_custom.c *** pg_backup_custom.c 28 Oct 2007 21:55:52 -0000 1.40 --- pg_backup_custom.c 29 Sep 2008 02:43:52 -0000 *************** *** 40,45 **** --- 40,46 ---- static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len); static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len); static void _CloseArchive(ArchiveHandle *AH); + static void _ReopenArchive(ArchiveHandle *AH); static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te); static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te); *************** *** 120,125 **** --- 121,127 ---- AH->WriteBufPtr = _WriteBuf; AH->ReadBufPtr = _ReadBuf; AH->ClosePtr = _CloseArchive; + AH->ReopenPtr = _ReopenArchive; AH->PrintTocDataPtr = _PrintTocData; AH->ReadExtraTocPtr = _ReadExtraToc; AH->WriteExtraTocPtr = _WriteExtraToc; *************** *** 835,840 **** --- 837,879 ---- AH->FH = NULL; } + static void + _ReopenArchive(ArchiveHandle *AH) + { + lclContext *ctx = (lclContext *) AH->formatData; + pgoff_t tpos; + + if (AH->mode == archModeWrite) + { + die_horribly(AH,modulename,"Can only reopen input archives"); + } + else if ((! AH->fSpec) || strcmp(AH->fSpec, "") == 0) + { + die_horribly(AH,modulename,"Cannot reopen stdin"); + } + + tpos = ftello(AH->FH); + + if (fclose(AH->FH) != 0) + die_horribly(AH, modulename, "could not close archive file: %s\n", + strerror(errno)); + + AH->FH = fopen(AH->fSpec, PG_BINARY_R); + if (!AH->FH) + die_horribly(AH, modulename, "could not open input file \"%s\": %s\n", + AH->fSpec, strerror(errno)); + + if (ctx->hasSeek) + { + fseeko(AH->FH, tpos, SEEK_SET); + } + else + { + die_horribly(AH,modulename,"cannot reopen non-seekable file"); + } + + } + /*-------------------------------------------------- * END OF FORMAT CALLBACKS *-------------------------------------------------- Index: pg_backup_db.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_db.c,v retrieving revision 1.80 diff -c -r1.80 pg_backup_db.c *** pg_backup_db.c 16 Aug 2008 02:25:06 -0000 1.80 --- pg_backup_db.c 29 Sep 2008 02:43:52 -0000 *************** *** 138,148 **** ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n", newdb, newuser); ! if (AH->requirePassword) { password = simple_prompt("Password: ", 100, false); if (password == NULL) die_horribly(AH, modulename, "out of memory\n"); } do --- 138,153 ---- ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n", newdb, newuser); ! if (AH->requirePassword && AH->cachepw == NULL) { password = simple_prompt("Password: ", 100, false); if (password == NULL) die_horribly(AH, modulename, "out of memory\n"); + AH->requirePassword = true; + } + else if (AH->requirePassword) + { + password = AH->cachepw; } do *************** *** 174,180 **** } } while (new_pass); ! if (password) free(password); /* check for version mismatch */ --- 179,185 ---- } } while (new_pass); ! if (password != AH->cachepw) free(password); /* check for version mismatch */ *************** *** 206,220 **** if (AH->connection) die_horribly(AH, modulename, "already connected to a database\n"); ! if (reqPwd) { password = simple_prompt("Password: ", 100, false); if (password == NULL) die_horribly(AH, modulename, "out of memory\n"); AH->requirePassword = true; } else AH->requirePassword = false; /* * Start the connection. Loop until we have a password if requested by --- 211,231 ---- if (AH->connection) die_horribly(AH, modulename, "already connected to a database\n"); ! if (reqPwd && AH->cachepw == NULL) { password = simple_prompt("Password: ", 100, false); if (password == NULL) die_horribly(AH, modulename, "out of memory\n"); AH->requirePassword = true; } + else if (reqPwd) + { + password = AH->cachepw; + } else + { AH->requirePassword = false; + } /* * Start the connection. Loop until we have a password if requested by *************** *** 241,247 **** } while (new_pass); if (password) ! free(password); /* check to see that the backend connection was successfully made */ if (PQstatus(AH->connection) == CONNECTION_BAD) --- 252,258 ---- } while (new_pass); if (password) ! AH->cachepw = password; /* check to see that the backend connection was successfully made */ if (PQstatus(AH->connection) == CONNECTION_BAD) Index: pg_backup_files.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_files.c,v retrieving revision 1.34 diff -c -r1.34 pg_backup_files.c *** pg_backup_files.c 28 Oct 2007 21:55:52 -0000 1.34 --- pg_backup_files.c 29 Sep 2008 02:43:52 -0000 *************** *** 87,92 **** --- 87,93 ---- AH->WriteBufPtr = _WriteBuf; AH->ReadBufPtr = _ReadBuf; AH->ClosePtr = _CloseArchive; + AH->ReopenPtr = NULL; AH->PrintTocDataPtr = _PrintTocData; AH->ReadExtraTocPtr = _ReadExtraToc; AH->WriteExtraTocPtr = _WriteExtraToc; Index: pg_backup_tar.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_tar.c,v retrieving revision 1.62 diff -c -r1.62 pg_backup_tar.c *** pg_backup_tar.c 15 Nov 2007 21:14:41 -0000 1.62 --- pg_backup_tar.c 29 Sep 2008 02:43:52 -0000 *************** *** 143,148 **** --- 143,149 ---- AH->WriteBufPtr = _WriteBuf; AH->ReadBufPtr = _ReadBuf; AH->ClosePtr = _CloseArchive; + AH->ReopenPtr = NULL; AH->PrintTocDataPtr = _PrintTocData; AH->ReadExtraTocPtr = _ReadExtraToc; AH->WriteExtraTocPtr = _WriteExtraToc; Index: pg_restore.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_restore.c,v retrieving revision 1.88 diff -c -r1.88 pg_restore.c *** pg_restore.c 13 Apr 2008 03:49:22 -0000 1.88 --- pg_restore.c 29 Sep 2008 02:43:52 -0000 *************** *** 78,83 **** --- 78,84 ---- static int no_data_for_failed_tables = 0; static int outputNoTablespaces = 0; static int use_setsessauth = 0; + static int truncate_before_load = 0; struct option cmdopts[] = { {"clean", 0, NULL, 'c'}, *************** *** 92,97 **** --- 93,99 ---- {"ignore-version", 0, NULL, 'i'}, {"index", 1, NULL, 'I'}, {"list", 0, NULL, 'l'}, + {"multi-thread",1,NULL,'m'}, {"no-privileges", 0, NULL, 'x'}, {"no-acl", 0, NULL, 'x'}, {"no-owner", 0, NULL, 'O'}, *************** *** 114,119 **** --- 116,122 ---- {"disable-triggers", no_argument, &disable_triggers, 1}, {"no-data-for-failed-tables", no_argument, &no_data_for_failed_tables, 1}, {"no-tablespaces", no_argument, &outputNoTablespaces, 1}, + {"truncate-before-load", no_argument, &truncate_before_load, 1}, {"use-set-session-authorization", no_argument, &use_setsessauth, 1}, {NULL, 0, NULL, 0} *************** *** 139,145 **** } } ! while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:n:Op:P:RsS:t:T:U:vWxX:1", cmdopts, NULL)) != -1) { switch (c) --- 142,148 ---- } } ! while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:m:n:Op:P:RsS:t:T:U:vWxX:1", cmdopts, NULL)) != -1) { switch (c) *************** *** 182,187 **** --- 185,194 ---- opts->tocFile = strdup(optarg); break; + case 'm': + opts->number_of_threads = atoi(optarg); /* XXX fix error checking */ + break; + case 'n': /* Dump data for this schema only */ opts->schemaNames = strdup(optarg); break; *************** *** 262,268 **** break; case 0: ! /* This covers the long options equivalent to -X xxx. */ break; case '1': /* Restore data in a single transaction */ --- 269,278 ---- break; case 0: ! /* ! * This covers the long options without a short equivalent, ! * including those equivalent to -X xxx. ! */ break; case '1': /* Restore data in a single transaction */ *************** *** 299,304 **** --- 309,329 ---- opts->noDataForFailedTables = no_data_for_failed_tables; opts->noTablespace = outputNoTablespaces; opts->use_setsessauth = use_setsessauth; + opts->truncate_before_load = truncate_before_load; + + if (opts->single_txn) + { + if (opts->number_of_threads > 1) + { + write_msg(NULL, "single transaction not compatible with multi-threading"); + exit(1); + } + else if (opts->truncate_before_load) + { + write_msg(NULL, "single transaction not compatible with truncate-before-load"); + exit(1); + } + } if (opts->formatName) { *************** *** 330,335 **** --- 355,362 ---- AH = OpenArchive(inputFileSpec, opts->format); + /* XXX looks like we'll have to do sanity checks in the parallel archiver */ + /* Let the archiver know how noisy to be */ AH->verbose = opts->verbose; *************** *** 351,356 **** --- 378,385 ---- if (opts->tocSummary) PrintTOCSummary(AH, opts); + else if (opts->number_of_threads > 1) + RestoreArchiveParallel(AH, opts); else RestoreArchive(AH, opts);
Andrew Dunstan wrote: > > > Andrew Dunstan wrote: >> >> >>> >>> this works better but there is something fishy still - using the same >>> dump file I get a proper restore using pg_restore normally. If I >>> however use -m for a parallel one I only get parts (in this case only >>> 243 of the 709 tables) of the database restored ... >>> >>> >>> >> >> Yes, there are several funny things going on, including some stuff >> with dependencies. I'll have a new patch tomorrow with luck. Thanks >> for testing. >> >> > > OK, in this version a whole heap of bugs are fixed, mainly those to do > with dependencies and saved state. I get identical row counts in the > source and destination now, quite reliably. this looks much better (for a restore that usually takes 180min I can get down to 72min using -m 4) - however especially with higher concurrency I'm sometimes running into restore failures due to deadlocks happening during constraint restoration (slightly redacted): pg_restore: [archiver (db)] Error from TOC entry 7765; 2606 1460743180 FK CONSTRAINT fk_av_relations_av db_owner pg_restore: [archiver (db)] could not execute query: ERROR: deadlock detected DETAIL: Process 18100 waits for AccessExclusiveLock on relation 1460818342 of database 1460815284; blocked by process 18103. Process 18103 waits for AccessExclusiveLock on relation 1460818336 of database 1460815284; blocked by process 18100. HINT: See server log for query details. ALTER TABLE ONLY foo ADD CONSTRAINT fk_av_relations_av FOREIGN KEY (vs_id) REFERENCES bar ...
Stefan Kaltenbrunner <stefan@kaltenbrunner.cc> writes: > pg_restore: [archiver (db)] could not execute query: ERROR: deadlock > detected > DETAIL: Process 18100 waits for AccessExclusiveLock on relation > 1460818342 of database 1460815284; blocked by process 18103. > Process 18103 waits for AccessExclusiveLock on relation 1460818336 of > database 1460815284; blocked by process 18100. > HINT: See server log for query details. > ALTER TABLE ONLY foo > ADD CONSTRAINT fk_av_relations_av FOREIGN KEY (vs_id) REFERENCES > bar ... Hmm, I'll bet the restore code doesn't realize that this can't run in parallel with index creation on either table ... regards, tom lane
Tom Lane wrote: > Stefan Kaltenbrunner <stefan@kaltenbrunner.cc> writes: > >> pg_restore: [archiver (db)] could not execute query: ERROR: deadlock >> detected >> DETAIL: Process 18100 waits for AccessExclusiveLock on relation >> 1460818342 of database 1460815284; blocked by process 18103. >> Process 18103 waits for AccessExclusiveLock on relation 1460818336 of >> database 1460815284; blocked by process 18100. >> HINT: See server log for query details. >> > > >> ALTER TABLE ONLY foo >> ADD CONSTRAINT fk_av_relations_av FOREIGN KEY (vs_id) REFERENCES >> bar ... >> > > Hmm, I'll bet the restore code doesn't realize that this can't run in > parallel with index creation on either table ... > > > Yeah. Of course, it's never needed to bother with stuff like that till now. The very simple fix is probably to run a separate parallel cycle just for FKs, after the index creation. A slightly more elegant fix would probably be to add dependencies from each index that might cause this to the FK constraint. I'll work on the first for now. Is there any chance that the locks we're taking here are too strong? Intuitively it looks a bit like it. cheers andrew
Andrew Dunstan <andrew@dunslane.net> writes: > Tom Lane wrote: >> Hmm, I'll bet the restore code doesn't realize that this can't run in >> parallel with index creation on either table ... > Yeah. Of course, it's never needed to bother with stuff like that till now. > The very simple fix is probably to run a separate parallel cycle just > for FKs, after the index creation. Um, FKs could conflict with each other too, so that by itself isn't gonna fix anything. regards, tom lane
Tom Lane wrote: > Andrew Dunstan <andrew@dunslane.net> writes: > >> Tom Lane wrote: >> >>> Hmm, I'll bet the restore code doesn't realize that this can't run in >>> parallel with index creation on either table ... >>> > > >> Yeah. Of course, it's never needed to bother with stuff like that till now. >> > > >> The very simple fix is probably to run a separate parallel cycle just >> for FKs, after the index creation. >> > > Um, FKs could conflict with each other too, so that by itself isn't > gonna fix anything. > > > Good point. Looks like we'll need to make a list of "can't run in parallel with" items as well as strict dependencies. cheers andrew
Andrew Dunstan <andrew@dunslane.net> writes: > Tom Lane wrote: >> Um, FKs could conflict with each other too, so that by itself isn't >> gonna fix anything. > Good point. Looks like we'll need to make a list of "can't run in > parallel with" items as well as strict dependencies. Yeah, I was just thinking about that. The current archive format doesn't really carry enough information for this. I think there are two basic solutions we could adopt: * Extend the archive format to provide some indication that "restoring this object requires exclusive access to these dependencies". * Hardwire knowledge into pg_restore that certain types of objects require exclusive access to their dependencies. The former seems more flexible, as well as more in tune with the basic design assumption that pg_restore shouldn't have a lot of knowledge about individual archive object types. But it would mean that you couldn't use parallel restore with any pre-8.4 dumps. In the long run that's no big deal, but in the short run it's annoying. Another angle is that it's not clear what happens if the need for exclusive access changes over time. You were just speculating about reducing the lock strength required for ALTER TABLE ADD FOREIGN KEY. I don't know if that's workable or not, but certainly reducing the lock strength for some types of ALTER TABLE might be in our future. Contrarily, we don't currently try hard to lock any non-table objects (schemas, functions, etc) while building dependent objects; but that's obviously not really right, and someday we might decide to fix it. So having pg_dump prepare the list of exclusive dependencies at dump time might be the wrong thing --- it would reflect the behavior of the source server version, not the target which is what matters. Thoughts? regards, tom lane
Le lundi 29 septembre 2008, Tom Lane a écrit : > * Extend the archive format to provide some indication that "restoring > this object requires exclusive access to these dependencies". > > * Hardwire knowledge into pg_restore that certain types of objects > require exclusive access to their dependencies. Well, it seems to me that currently the FK needs in term of existing indexes and locks, and some other object lock needs, are all hardwired. Is it even safe to consider having the locks needed for certain commands not be hardwired? Provided I'm not all wrong here, I don't see how having something more flexible at restore time than at build time is a win. The drawback is that whenever you change a lock need in commands, you have to remember teaching pg_restore about it too. So my vote here is in favor of hardwired knowledge of pg_restore, matching target server code assumptions and needs. Regards, -- dim
Dimitri Fontaine wrote: > Le lundi 29 septembre 2008, Tom Lane a écrit : > >> * Extend the archive format to provide some indication that "restoring >> this object requires exclusive access to these dependencies". >> >> * Hardwire knowledge into pg_restore that certain types of objects >> require exclusive access to their dependencies. >> > > Well, it seems to me that currently the FK needs in term of existing indexes > and locks, and some other object lock needs, are all hardwired. Is it even > safe to consider having the locks needed for certain commands not be > hardwired? > > Provided I'm not all wrong here, I don't see how having something more > flexible at restore time than at build time is a win. The drawback is that > whenever you change a lock need in commands, you have to remember teaching > pg_restore about it too. > > So my vote here is in favor of hardwired knowledge of pg_restore, matching > target server code assumptions and needs. > > Well, I've had to use some knowledge of various item types already, and I have been trying not to disturb pg_dump also, so I'm inclined to build this knowledge into pg_restore. ISTM that "things that will have lock conflicts" are different and more target version dependent than "things that logically depend on other things", so we can still rely on pg_dump to some extent to provide the latter while building the former at restore time. cheers andrew
Tom Lane wrote: > Andrew Dunstan <andrew@dunslane.net> writes: >> Tom Lane wrote: >>> Um, FKs could conflict with each other too, so that by itself isn't >>> gonna fix anything. > >> Good point. Looks like we'll need to make a list of "can't run in >> parallel with" items as well as strict dependencies. > > Yeah, I was just thinking about that. The current archive format > doesn't really carry enough information for this. I think there > are two basic solutions we could adopt: > > * Extend the archive format to provide some indication that "restoring > this object requires exclusive access to these dependencies". > > * Hardwire knowledge into pg_restore that certain types of objects > require exclusive access to their dependencies. > > The former seems more flexible, as well as more in tune with the basic > design assumption that pg_restore shouldn't have a lot of knowledge > about individual archive object types. But it would mean that you > couldn't use parallel restore with any pre-8.4 dumps. In the long run > that's no big deal, but in the short run it's annoying. hmm not sure how much of a problem that really is - we usually recommend to use the pg_dump version of the target database anyway. Stefan
Stefan Kaltenbrunner wrote: > Tom Lane wrote: >> Andrew Dunstan <andrew@dunslane.net> writes: >>> Tom Lane wrote: >>>> Um, FKs could conflict with each other too, so that by itself isn't >>>> gonna fix anything. >> >>> Good point. Looks like we'll need to make a list of "can't run in >>> parallel with" items as well as strict dependencies. >> >> Yeah, I was just thinking about that. The current archive format >> doesn't really carry enough information for this. I think there >> are two basic solutions we could adopt: >> >> * Extend the archive format to provide some indication that "restoring >> this object requires exclusive access to these dependencies". >> >> * Hardwire knowledge into pg_restore that certain types of objects >> require exclusive access to their dependencies. >> >> The former seems more flexible, as well as more in tune with the basic >> design assumption that pg_restore shouldn't have a lot of knowledge >> about individual archive object types. But it would mean that you >> couldn't use parallel restore with any pre-8.4 dumps. In the long run >> that's no big deal, but in the short run it's annoying. > > hmm not sure how much of a problem that really is - we usually > recommend to use the pg_dump version of the target database anyway. > > > > We don't really need a huge amount of hardwiring as it turns out. Here is a version of the patch that tries to do what's needed in this area. cheers andrew Index: pg_backup.h =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup.h,v retrieving revision 1.47 diff -c -r1.47 pg_backup.h *** pg_backup.h 13 Apr 2008 03:49:21 -0000 1.47 --- pg_backup.h 29 Sep 2008 23:34:57 -0000 *************** *** 123,128 **** --- 123,130 ---- int suppressDumpWarnings; /* Suppress output of WARNING entries * to stderr */ bool single_txn; + int number_of_threads; + bool truncate_before_load; bool *idWanted; /* array showing which dump IDs to emit */ } RestoreOptions; *************** *** 165,170 **** --- 167,173 ---- extern void CloseArchive(Archive *AH); extern void RestoreArchive(Archive *AH, RestoreOptions *ropt); + extern void RestoreArchiveParallel(Archive *AH, RestoreOptions *ropt); /* Open an existing archive */ extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt); Index: pg_backup_archiver.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.c,v retrieving revision 1.158 diff -c -r1.158 pg_backup_archiver.c *** pg_backup_archiver.c 5 Sep 2008 23:53:42 -0000 1.158 --- pg_backup_archiver.c 29 Sep 2008 23:34:58 -0000 *************** *** 27,38 **** --- 27,51 ---- #include <unistd.h> + #include <sys/types.h> + #include <sys/wait.h> + + #ifdef WIN32 #include <io.h> #endif #include "libpq/libpq-fs.h" + typedef struct _parallel_slot + { + pid_t pid; + TocEntry *te; + DumpId dumpId; + DumpId tdeps[2]; + } ParallelSlot; + + #define NO_SLOT (-1) const char *progname; *************** *** 70,76 **** --- 83,100 ---- static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim); static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression); static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext); + static bool work_is_being_done(ParallelSlot *slot, int n_slots); + static int get_next_slot(ParallelSlot *slots, int n_slots); + static TocEntry *get_next_work_item(ArchiveHandle *AH, ParallelSlot *slots, int n_slots); + static void prestore(ArchiveHandle *AH, TocEntry *te); + static void mark_work_done(ArchiveHandle *AH, pid_t worker, ParallelSlot *slots, int n_slots); + static int _restore_one_te(ArchiveHandle *ah, TocEntry *te, RestoreOptions *ropt,bool is_parallel); + static void _reduce_dependencies(ArchiveHandle * AH, TocEntry *te); + static void _fix_dependency_counts(ArchiveHandle *AH); + static void _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te); + + static ArchiveHandle *GAH; /* * Wrapper functions. *************** *** 125,137 **** /* Public */ void RestoreArchive(Archive *AHX, RestoreOptions *ropt) { ArchiveHandle *AH = (ArchiveHandle *) AHX; TocEntry *te; teReqs reqs; OutputContext sav; - bool defnDumped; AH->ropt = ropt; AH->stage = STAGE_INITIALIZING; --- 149,633 ---- /* Public */ void + RestoreArchiveParallel(Archive *AHX, RestoreOptions *ropt) + { + + ArchiveHandle *AH = (ArchiveHandle *) AHX; + ParallelSlot *slots; + int next_slot; + TocEntry *next_work_item = NULL; + int work_status; + pid_t ret_child; + int n_slots = ropt->number_of_threads; + TocEntry *te; + teReqs reqs; + + + AH->debugLevel = 99; + /* some routines that use ahlog() don't get passed AH */ + GAH = AH; + + ahlog(AH,1,"entering RestoreARchiveParallel\n"); + + + slots = (ParallelSlot *) calloc(sizeof(ParallelSlot),n_slots); + AH->ropt = ropt; + + /* + if (ropt->create) + die_horribly(AH,modulename, + "parallel restore is incompatible with --create\n"); + */ + + + if (ropt->dropSchema) + die_horribly(AH,modulename, + "parallel restore is incompatible with --clean\n"); + + if (!ropt->useDB) + die_horribly(AH,modulename, + "parallel restore requires direct database connection\n"); + + + #ifndef HAVE_LIBZ + + /* make sure we won't need (de)compression we haven't got */ + if (AH->compression != 0 && AH->PrintTocDataPtr != NULL) + { + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + reqs = _tocEntryRequired(te, ropt, false); + if (te->hadDumper && (reqs & REQ_DATA) != 0) + die_horribly(AH, modulename, + "cannot restore from compressed archive (compression not supported in this installation)\n"); + } + } + #endif + + ahlog(AH, 1, "connecting to database for restore\n"); + if (AH->version < K_VERS_1_3) + die_horribly(AH, modulename, + "direct database connections are not supported in pre-1.3 archives\n"); + + /* XXX Should get this from the archive */ + AHX->minRemoteVersion = 070100; + AHX->maxRemoteVersion = 999999; + + /* correct dependency counts in case we're doing a partial restore */ + if (ropt->idWanted == NULL) + InitDummyWantedList(AHX,ropt); + _fix_dependency_counts(AH); + + /* + * Since we're talking to the DB directly, don't send comments since they + * obscure SQL when displaying errors + */ + AH->noTocComments = 1; + + /* Do all the early stuff in a single connection in the parent. + * There's no great point in running it in parallel and it will actually + * run faster in a single connection because we avoid all the connection + * and setup overhead, including the 0.5s sleep below. + */ + ConnectDatabase(AHX, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->requirePassword); + + + /* + * Establish important parameter values right away. + */ + _doSetFixedOutputState(AH); + + while((next_work_item = get_next_work_item(AH,NULL,0)) != NULL) + { + /* XXX need to improve this test in case there is no table data */ + /* need to test for indexes, FKs, PK, Unique, etc */ + if(strcmp(next_work_item->desc,"TABLE DATA") == 0) + break; + (void) _restore_one_te(AH, next_work_item, ropt, false); + + next_work_item->prestored = true; + + _reduce_dependencies(AH,next_work_item); + } + + + /* + * now close parent connection in prep for parallel step. + */ + PQfinish(AH->connection); + AH->connection = NULL; + + /* blow away any preserved state from the previous connection */ + + if (AH->currSchema) + free(AH->currSchema); + AH->currSchema = strdup(""); + if (AH->currUser) + free(AH->currUser); + AH->currUser = strdup(""); + if (AH->currTablespace) + free(AH->currTablespace); + AH->currTablespace = NULL; + AH->currWithOids = -1; + + /* main parent loop */ + + ahlog(AH,1,"entering main loop\n"); + + while (((next_work_item = get_next_work_item(AH,slots,n_slots)) != NULL) || + (work_is_being_done(slots,n_slots))) + { + if (next_work_item != NULL && + ((next_slot = get_next_slot(slots,n_slots)) != NO_SLOT)) + { + /* there is work still to do and a worker slot available */ + + pid_t child; + + next_work_item->prestored = true; + + child = fork(); + if (child == 0) + { + prestore(AH,next_work_item); + /* should not happen ... we expect prestore to exit */ + exit(1); + } + else if (child > 0) + { + slots[next_slot].pid = child; + slots[next_slot].te = next_work_item; + slots[next_slot].dumpId = next_work_item->dumpId; + slots[next_slot].tdeps[0] = next_work_item->tdeps[0]; + slots[next_slot].tdeps[1] = next_work_item->tdeps[1]; + } + else + { + /* XXX fork error - handle it! */ + } + continue; /* in case the slots are not yet full */ + } + /* if we get here there must be work being done */ + ret_child = wait(&work_status); + + if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 0) + { + mark_work_done(AH, ret_child, slots, n_slots); + } + else if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 1) + { + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].pid == ret_child) + _inhibit_data_for_failed_table(AH, slots[i].te); + break; + } + mark_work_done(AH, ret_child, slots, n_slots); + } + else + { + /* XXX something went wrong - deal with it */ + } + } + + /* + * now process the ACLs - no need to do this in parallel + */ + + /* reconnect from parent */ + ConnectDatabase(AHX, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->requirePassword); + + /* + * Scan TOC to output ownership commands and ACLs + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + AH->currentTE = te; + + /* Work out what, if anything, we want from this entry */ + reqs = _tocEntryRequired(te, ropt, true); + + if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ + { + ahlog(AH, 1, "setting owner and privileges for %s %s\n", + te->desc, te->tag); + _printTocEntry(AH, te, ropt, false, true); + } + } + + /* clean up */ + PQfinish(AH->connection); + AH->connection = NULL; + + } + + static bool + work_is_being_done(ParallelSlot *slot, int n_slots) + { + ahlog(GAH,1,"is work being done?\n"); + while(n_slots--) + { + if (slot->pid > 0) + return true; + slot++; + } + ahlog(GAH,1,"work is not being done\n"); + return false; + } + + static int + get_next_slot(ParallelSlot *slots, int n_slots) + { + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].pid == 0) + { + ahlog(GAH,1,"available slots is %d\n",i); + return i; + } + } + ahlog(GAH,1,"No slot available\n"); + return NO_SLOT; + } + + static TocEntry* + get_next_work_item(ArchiveHandle *AH, ParallelSlot *slots, int n_slots) + { + TocEntry *te; + teReqs reqs; + int i; + + /* just search from the top of the queue until we find an available item. + * Note that the queue isn't reordered in the current implementation. If + * we ever do reorder it, then certain code that processes entries from the + * current item to the end of the queue will probably need to be + * re-examined. + */ + + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + if (!te->prestored && te->depCount < 1) + { + /* make sure it's not an ACL */ + reqs = _tocEntryRequired (te, AH->ropt, false); + if ((reqs & (REQ_SCHEMA | REQ_DATA)) == 0) + continue; + + /* check against parallel slots for incompatible table locks */ + for (i=0; i < n_slots; i++) + { + if ((slots[i].tdeps[0] != 0 && + (te->tdeps[0] == slots[i].tdeps[0] || te->tdeps[1] == slots[i].tdeps[0])) || + (slots[i].tdeps[1] != 0 && + (te->tdeps[0] == slots[i].tdeps[1] || te->tdeps[1] == slots[i].tdeps[1]))) + { + if (strcmp(te->desc,"CONSTRAINT") == 0 || + strcmp(te->desc,"FK CONSTRAINT") == 0 || + strcmp(te->desc,"CHECK CONSTRAINT") == 0 || + strcmp(te->desc,"TRIGGER") == 0 || + strcmp(slots[i].te->desc,"CONSTRAINT") == 0 || + strcmp(slots[i].te->desc,"FK CONSTRAINT") == 0 || + strcmp(slots[i].te->desc,"CHECK CONSTRAINT") == 0 || + strcmp(slots[i].te->desc,"TRIGGER") == 0) + { + /* If either the thing that is running will have an + * AccessExclusive lock on the table, or this item + * would acquire such a lock, the item can't run yet. + */ + continue; + } + + } + } + + ahlog(AH,1,"next item is %d\n",te->dumpId); + return te; + } + } + ahlog(AH,1,"No item ready\n"); + return NULL; + } + + static void + prestore(ArchiveHandle *AH, TocEntry *te) + { + RestoreOptions *ropt = AH->ropt; + int retval; + + /* close and reopen the archive so we have a private copy that doesn't + * stomp on anyone else's file pointer + */ + + (AH->ReopenPtr)(AH); + + ConnectDatabase((Archive *)AH, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->requirePassword); + + /* + * Establish important parameter values right away. + */ + _doSetFixedOutputState(AH); + + retval = _restore_one_te(AH, te, ropt, true); + + PQfinish(AH->connection); + exit(retval); + + } + + static void + mark_work_done(ArchiveHandle *AH, pid_t worker, + ParallelSlot *slots, int n_slots) + { + + TocEntry *te = NULL; + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].pid == worker) + { + te = slots[i].te; + slots[i].pid = 0; + slots[i].te = NULL; + slots[i].dumpId = 0; + slots[i].tdeps[0] = 0; + slots[i].tdeps[1] = 0; + + break; + } + } + + /* Assert (te != NULL); */ + + _reduce_dependencies(AH,te); + + + } + + + /* + * Make sure the head of each dependency chain is a live item + * + * Once this is established the property will be maintained by + * _reduce_dependencies called as items are done. + */ + static void + _fix_dependency_counts(ArchiveHandle *AH) + { + TocEntry * te; + RestoreOptions * ropt = AH->ropt; + bool *RealDumpIds, *TableDumpIds; + DumpId d; + int i; + + + RealDumpIds = calloc(AH->maxDumpId, sizeof(bool)); + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + RealDumpIds[te->dumpId-1] = true; + if (te->depCount == 0 && ! ropt->idWanted[te->dumpId -1]) + _reduce_dependencies(AH,te); + } + + /* + * It is possible that the dependencies list items that are + * not in the archive at all. Reduce the depcounts so those get + * ignored. + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + for (i = 0; i < te->nDeps; i++) + if (!RealDumpIds[te->dependencies[i]-1]) + te->depCount--; + + TableDumpIds = calloc(AH->maxDumpId,sizeof(bool)); + for (te = AH->toc->next; te != AH->toc; te = te->next) + if (strcmp(te->desc,"TABLE") == 0) + TableDumpIds[te->dumpId-1] = true; + + for (te = AH->toc->next; te != AH->toc; te = te->next) + for (i = 0; i < te->nDeps; i++) + { + d = te->dependencies[i]; + if (TableDumpIds[d-1]) + { + if (te->tdeps[0] == d || te->tdeps[1] == d) + continue; + + if (te->tdeps[0] == 0) + te->tdeps[0] = d; + else if (te->tdeps[1] == 0) + te->tdeps[1] = d; + else + die_horribly(AH,modulename, + "item %d has a dependency on more than two tables", te->dumpId); + } + } + } + + static void + _reduce_dependencies(ArchiveHandle * AH, TocEntry *te) + { + DumpId item = te->dumpId; + RestoreOptions * ropt = AH->ropt; + int i; + + for (te = te->next; te != AH->toc; te = te->next) + { + if (te->nDeps == 0) + continue; + + for (i = 0; i < te->nDeps; i++) + if (te->dependencies[i] == item) + te->depCount = te->depCount - 1; + + /* If this is a table data item we are making available, + * make the table's dependencies depend on this item instead of + * the table definition, so they + * don't get scheduled until the data is loaded. + * Have to do this now before the main loop gets to anything + * further down the list. + */ + if (te->depCount == 0 && strcmp(te->desc,"TABLEDATA") == 0) + { + TocEntry *tes; + int j; + for (tes = te->next; tes != AH->toc; tes = tes->next) + for (j = 0; j < tes->nDeps; j++) + if (tes->dependencies[j] == item) + tes->dependencies[j] = te->dumpId; + } + + /* + * If this item won't in fact be done, and is now at + * 0 dependency count, we pretend it's been done and + * reduce the dependency counts of all the things that + * depend on it, by a recursive call + */ + if (te->depCount == 0 && ! ropt->idWanted[te->dumpId -1]) + _reduce_dependencies(AH,te); + } + + } + + + /* Public */ + void RestoreArchive(Archive *AHX, RestoreOptions *ropt) { ArchiveHandle *AH = (ArchiveHandle *) AHX; TocEntry *te; teReqs reqs; OutputContext sav; AH->ropt = ropt; AH->stage = STAGE_INITIALIZING; *************** *** 171,176 **** --- 667,686 ---- AH->noTocComments = 1; } + #ifndef HAVE_LIBZ + + /* make sure we won't need (de)compression we haven't got */ + if (AH->compression != 0 && AH->PrintTocDataPtr != NULL) + { + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + reqs = _tocEntryRequired(te, ropt, false); + if (te->hadDumper && (reqs & REQ_DATA) != 0) + die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in thisinstallation)\n"); + } + } + #endif + /* * Work out if we have an implied data-only restore. This can happen if * the dump was data only or if the user has used a toc list to exclude *************** *** 270,409 **** */ for (te = AH->toc->next; te != AH->toc; te = te->next) { ! AH->currentTE = te; ! ! /* Work out what, if anything, we want from this entry */ ! reqs = _tocEntryRequired(te, ropt, false); ! ! /* Dump any relevant dump warnings to stderr */ ! if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0) ! { ! if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0) ! write_msg(modulename, "warning from original dump file: %s\n", te->defn); ! else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0) ! write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt); ! } ! ! defnDumped = false; ! ! if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ ! { ! ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag); ! ! _printTocEntry(AH, te, ropt, false, false); ! defnDumped = true; ! ! /* ! * If we could not create a table and --no-data-for-failed-tables ! * was given, ignore the corresponding TABLE DATA ! */ ! if (ropt->noDataForFailedTables && ! AH->lastErrorTE == te && ! strcmp(te->desc, "TABLE") == 0) ! { ! TocEntry *tes; ! ! ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n", ! te->tag); ! ! for (tes = te->next; tes != AH->toc; tes = tes->next) ! { ! if (strcmp(tes->desc, "TABLE DATA") == 0 && ! strcmp(tes->tag, te->tag) == 0 && ! strcmp(tes->namespace ? tes->namespace : "", ! te->namespace ? te->namespace : "") == 0) ! { ! /* mark it unwanted */ ! ropt->idWanted[tes->dumpId - 1] = false; ! break; ! } ! } ! } ! ! /* If we created a DB, connect to it... */ ! if (strcmp(te->desc, "DATABASE") == 0) ! { ! ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag); ! _reconnectToDB(AH, te->tag); ! } ! } ! ! /* ! * If we have a data component, then process it ! */ ! if ((reqs & REQ_DATA) != 0) ! { ! /* ! * hadDumper will be set if there is genuine data component for ! * this node. Otherwise, we need to check the defn field for ! * statements that need to be executed in data-only restores. ! */ ! if (te->hadDumper) ! { ! /* ! * If we can output the data, then restore it. ! */ ! if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0) ! { ! #ifndef HAVE_LIBZ ! if (AH->compression != 0) ! die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supportedin this installation)\n"); ! #endif ! ! _printTocEntry(AH, te, ropt, true, false); ! ! if (strcmp(te->desc, "BLOBS") == 0 || ! strcmp(te->desc, "BLOB COMMENTS") == 0) ! { ! ahlog(AH, 1, "restoring %s\n", te->desc); ! ! _selectOutputSchema(AH, "pg_catalog"); ! ! (*AH->PrintTocDataPtr) (AH, te, ropt); ! } ! else ! { ! _disableTriggersIfNecessary(AH, te, ropt); ! ! /* Select owner and schema as necessary */ ! _becomeOwner(AH, te); ! _selectOutputSchema(AH, te->namespace); ! ! ahlog(AH, 1, "restoring data for table \"%s\"\n", ! te->tag); ! ! /* ! * If we have a copy statement, use it. As of V1.3, ! * these are separate to allow easy import from ! * withing a database connection. Pre 1.3 archives can ! * not use DB connections and are sent to output only. ! * ! * For V1.3+, the table data MUST have a copy ! * statement so that we can go into appropriate mode ! * with libpq. ! */ ! if (te->copyStmt && strlen(te->copyStmt) > 0) ! { ! ahprintf(AH, "%s", te->copyStmt); ! AH->writingCopyData = true; ! } ! ! (*AH->PrintTocDataPtr) (AH, te, ropt); ! ! AH->writingCopyData = false; ! ! _enableTriggersIfNecessary(AH, te, ropt); ! } ! } ! } ! else if (!defnDumped) ! { ! /* If we haven't already dumped the defn part, do so now */ ! ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag); ! _printTocEntry(AH, te, ropt, false, false); ! } ! } ! } /* end loop over TOC entries */ /* * Scan TOC again to output ownership commands and ACLs --- 780,787 ---- */ for (te = AH->toc->next; te != AH->toc; te = te->next) { ! (void) _restore_one_te(AH, te, ropt, false); ! } /* * Scan TOC again to output ownership commands and ACLs *************** *** 451,456 **** --- 829,1009 ---- } } + static int + _restore_one_te(ArchiveHandle *AH, TocEntry *te, + RestoreOptions *ropt, bool is_parallel) + { + teReqs reqs; + bool defnDumped; + int retval = 0; + + AH->currentTE = te; + + /* Work out what, if anything, we want from this entry */ + reqs = _tocEntryRequired(te, ropt, false); + + /* Dump any relevant dump warnings to stderr */ + if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0) + { + if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0) + write_msg(modulename, "warning from original dump file: %s\n", te->defn); + else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0) + write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt); + } + + defnDumped = false; + + if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ + { + ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag); + + _printTocEntry(AH, te, ropt, false, false); + defnDumped = true; + + /* + * If we could not create a table and --no-data-for-failed-tables + * was given, ignore the corresponding TABLE DATA + * + * For the parallel case this must be done in the parent, so we just + * set a return value. + */ + if (ropt->noDataForFailedTables && + AH->lastErrorTE == te && + strcmp(te->desc, "TABLE") == 0) + { + if (is_parallel) + retval = 1; + else + _inhibit_data_for_failed_table(AH,te); + } + + /* If we created a DB, connect to it... */ + /* won't happen in parallel restore */ + if (strcmp(te->desc, "DATABASE") == 0) + { + ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag); + _reconnectToDB(AH, te->tag); + } + } + + /* + * If we have a data component, then process it + */ + if ((reqs & REQ_DATA) != 0) + { + /* + * hadDumper will be set if there is genuine data component for + * this node. Otherwise, we need to check the defn field for + * statements that need to be executed in data-only restores. + */ + if (te->hadDumper) + { + /* + * If we can output the data, then restore it. + */ + if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0) + { + _printTocEntry(AH, te, ropt, true, false); + + if (strcmp(te->desc, "BLOBS") == 0 || + strcmp(te->desc, "BLOB COMMENTS") == 0) + { + ahlog(AH, 1, "restoring %s\n", te->desc); + + _selectOutputSchema(AH, "pg_catalog"); + + (*AH->PrintTocDataPtr) (AH, te, ropt); + } + else + { + _disableTriggersIfNecessary(AH, te, ropt); + + /* Select owner and schema as necessary */ + _becomeOwner(AH, te); + _selectOutputSchema(AH, te->namespace); + + ahlog(AH, 1, "restoring data for table \"%s\"\n", + te->tag); + + if (ropt->truncate_before_load) + { + if (AH->connection) + StartTransaction(AH); + else + ahprintf(AH, "BEGIN;\n\n"); + + ahprintf(AH, "TRUNCATE TABLE %s;\n\n", + fmtId(te->tag)); } + + /* + * If we have a copy statement, use it. As of V1.3, + * these are separate to allow easy import from + * withing a database connection. Pre 1.3 archives can + * not use DB connections and are sent to output only. + * + * For V1.3+, the table data MUST have a copy + * statement so that we can go into appropriate mode + * with libpq. + */ + if (te->copyStmt && strlen(te->copyStmt) > 0) + { + ahprintf(AH, "%s", te->copyStmt); + AH->writingCopyData = true; + } + + (*AH->PrintTocDataPtr) (AH, te, ropt); + + AH->writingCopyData = false; + + if (ropt->truncate_before_load) + { + if (AH->connection) + CommitTransaction(AH); + else + ahprintf(AH, "COMMIT;\n\n"); + } + + + _enableTriggersIfNecessary(AH, te, ropt); + } + } + } + else if (!defnDumped) + { + /* If we haven't already dumped the defn part, do so now */ + ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag); + _printTocEntry(AH, te, ropt, false, false); + } + } + + return retval; + } + + static void + _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te) + { + TocEntry *tes; + RestoreOptions *ropt = AH->ropt; + + ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n", + te->tag); + + for (tes = te->next; tes != AH->toc; tes = tes->next) + { + if (strcmp(tes->desc, "TABLE DATA") == 0 && + strcmp(tes->tag, te->tag) == 0 && + strcmp(tes->namespace ? tes->namespace : "", + te->namespace ? te->namespace : "") == 0) + { + /* mark it unwanted */ + ropt->idWanted[tes->dumpId - 1] = false; + + _reduce_dependencies(AH, tes); + break; + } + } + } + /* * Allocate a new RestoreOptions block. * This is mainly so we can initialize it, but also for future expansion, *************** *** 614,619 **** --- 1167,1173 ---- TocEntry *te = AH->toc->next; OutputContext sav; char *fmtName; + bool *TableDumpIds; if (ropt->filename) sav = SetOutput(AH, ropt->filename, 0 /* no compression */ ); *************** *** 650,662 **** ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n"); while (te != AH->toc) { if (_tocEntryRequired(te, ropt, true) != 0) ! ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId, te->catalogId.tableoid, te->catalogId.oid, te->desc, te->namespace ? te->namespace : "-", te->tag, te->owner); te = te->next; } --- 1204,1235 ---- ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n"); + TableDumpIds = calloc(AH->maxDumpId,sizeof(bool)); + while(te!= AH->toc) + { + if (strcmp(te->desc,"TABLE") == 0) + TableDumpIds[te->dumpId-1] = true; + te = te->next; + } + te = AH->toc->next; + while (te != AH->toc) { if (_tocEntryRequired(te, ropt, true) != 0) ! { ! int i; ! ahprintf(AH, "%d;[%d: ",te->dumpId, te->nDeps); ! for (i=0 ;i<te->nDeps; i++) ! ahprintf(AH, "%d ",te->dependencies[i]); ! ahprintf(AH, "] { "); ! for (i=0 ;i<te->nDeps; i++) ! if (TableDumpIds[te->dependencies[i]-1]) ! ahprintf(AH, "%d ",te->dependencies[i]); ! ahprintf(AH,"} %u %u %s %s %s %s\n", te->catalogId.tableoid, te->catalogId.oid, te->desc, te->namespace ? te->namespace : "-", te->tag, te->owner); + } te = te->next; } *************** *** 1948,1965 **** --- 2521,2541 ---- deps = (DumpId *) realloc(deps, sizeof(DumpId) * depIdx); te->dependencies = deps; te->nDeps = depIdx; + te->depCount = depIdx; } else { free(deps); te->dependencies = NULL; te->nDeps = 0; + te->depCount = 0; } } else { te->dependencies = NULL; te->nDeps = 0; + te->depCount = 0; } if (AH->ReadExtraTocPtr) Index: pg_backup_archiver.h =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.h,v retrieving revision 1.76 diff -c -r1.76 pg_backup_archiver.h *** pg_backup_archiver.h 7 Nov 2007 12:24:24 -0000 1.76 --- pg_backup_archiver.h 29 Sep 2008 23:34:58 -0000 *************** *** 99,104 **** --- 99,105 ---- struct _restoreList; typedef void (*ClosePtr) (struct _archiveHandle * AH); + typedef void (*ReopenPtr) (struct _archiveHandle * AH); typedef void (*ArchiveEntryPtr) (struct _archiveHandle * AH, struct _tocEntry * te); typedef void (*StartDataPtr) (struct _archiveHandle * AH, struct _tocEntry * te); *************** *** 212,217 **** --- 213,219 ---- WriteBufPtr WriteBufPtr; /* Write a buffer of output to the archive */ ReadBufPtr ReadBufPtr; /* Read a buffer of input from the archive */ ClosePtr ClosePtr; /* Close the archive */ + ReopenPtr ReopenPtr; /* Reopen the archive */ WriteExtraTocPtr WriteExtraTocPtr; /* Write extra TOC entry data * associated with the current archive * format */ *************** *** 231,236 **** --- 233,239 ---- char *archdbname; /* DB name *read* from archive */ bool requirePassword; PGconn *connection; + char *cachepw; int connectToDB; /* Flag to indicate if direct DB connection is * required */ bool writingCopyData; /* True when we are sending COPY data */ *************** *** 284,289 **** --- 287,293 ---- DumpId dumpId; bool hadDumper; /* Archiver was passed a dumper routine (used * in restore) */ + bool prestored; /* keep track of parallel restore */ char *tag; /* index tag */ char *namespace; /* null or empty string if not in a schema */ char *tablespace; /* null if not in a tablespace; empty string *************** *** 296,301 **** --- 300,307 ---- char *copyStmt; DumpId *dependencies; /* dumpIds of objects this one depends on */ int nDeps; /* number of dependencies */ + int depCount; /* adjustable tally of dependencies */ + int tdeps[2]; DataDumperPtr dataDumper; /* Routine to dump data for object */ void *dataDumperArg; /* Arg for above routine */ Index: pg_backup_custom.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_custom.c,v retrieving revision 1.40 diff -c -r1.40 pg_backup_custom.c *** pg_backup_custom.c 28 Oct 2007 21:55:52 -0000 1.40 --- pg_backup_custom.c 29 Sep 2008 23:34:58 -0000 *************** *** 40,45 **** --- 40,46 ---- static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len); static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len); static void _CloseArchive(ArchiveHandle *AH); + static void _ReopenArchive(ArchiveHandle *AH); static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te); static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te); *************** *** 120,125 **** --- 121,127 ---- AH->WriteBufPtr = _WriteBuf; AH->ReadBufPtr = _ReadBuf; AH->ClosePtr = _CloseArchive; + AH->ReopenPtr = _ReopenArchive; AH->PrintTocDataPtr = _PrintTocData; AH->ReadExtraTocPtr = _ReadExtraToc; AH->WriteExtraTocPtr = _WriteExtraToc; *************** *** 835,840 **** --- 837,879 ---- AH->FH = NULL; } + static void + _ReopenArchive(ArchiveHandle *AH) + { + lclContext *ctx = (lclContext *) AH->formatData; + pgoff_t tpos; + + if (AH->mode == archModeWrite) + { + die_horribly(AH,modulename,"Can only reopen input archives"); + } + else if ((! AH->fSpec) || strcmp(AH->fSpec, "") == 0) + { + die_horribly(AH,modulename,"Cannot reopen stdin"); + } + + tpos = ftello(AH->FH); + + if (fclose(AH->FH) != 0) + die_horribly(AH, modulename, "could not close archive file: %s\n", + strerror(errno)); + + AH->FH = fopen(AH->fSpec, PG_BINARY_R); + if (!AH->FH) + die_horribly(AH, modulename, "could not open input file \"%s\": %s\n", + AH->fSpec, strerror(errno)); + + if (ctx->hasSeek) + { + fseeko(AH->FH, tpos, SEEK_SET); + } + else + { + die_horribly(AH,modulename,"cannot reopen non-seekable file"); + } + + } + /*-------------------------------------------------- * END OF FORMAT CALLBACKS *-------------------------------------------------- Index: pg_backup_db.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_db.c,v retrieving revision 1.80 diff -c -r1.80 pg_backup_db.c *** pg_backup_db.c 16 Aug 2008 02:25:06 -0000 1.80 --- pg_backup_db.c 29 Sep 2008 23:34:58 -0000 *************** *** 138,148 **** ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n", newdb, newuser); ! if (AH->requirePassword) { password = simple_prompt("Password: ", 100, false); if (password == NULL) die_horribly(AH, modulename, "out of memory\n"); } do --- 138,153 ---- ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n", newdb, newuser); ! if (AH->requirePassword && AH->cachepw == NULL) { password = simple_prompt("Password: ", 100, false); if (password == NULL) die_horribly(AH, modulename, "out of memory\n"); + AH->requirePassword = true; + } + else if (AH->requirePassword) + { + password = AH->cachepw; } do *************** *** 174,180 **** } } while (new_pass); ! if (password) free(password); /* check for version mismatch */ --- 179,185 ---- } } while (new_pass); ! if (password != AH->cachepw) free(password); /* check for version mismatch */ *************** *** 206,220 **** if (AH->connection) die_horribly(AH, modulename, "already connected to a database\n"); ! if (reqPwd) { password = simple_prompt("Password: ", 100, false); if (password == NULL) die_horribly(AH, modulename, "out of memory\n"); AH->requirePassword = true; } else AH->requirePassword = false; /* * Start the connection. Loop until we have a password if requested by --- 211,231 ---- if (AH->connection) die_horribly(AH, modulename, "already connected to a database\n"); ! if (reqPwd && AH->cachepw == NULL) { password = simple_prompt("Password: ", 100, false); if (password == NULL) die_horribly(AH, modulename, "out of memory\n"); AH->requirePassword = true; } + else if (reqPwd) + { + password = AH->cachepw; + } else + { AH->requirePassword = false; + } /* * Start the connection. Loop until we have a password if requested by *************** *** 241,247 **** } while (new_pass); if (password) ! free(password); /* check to see that the backend connection was successfully made */ if (PQstatus(AH->connection) == CONNECTION_BAD) --- 252,258 ---- } while (new_pass); if (password) ! AH->cachepw = password; /* check to see that the backend connection was successfully made */ if (PQstatus(AH->connection) == CONNECTION_BAD) Index: pg_backup_files.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_files.c,v retrieving revision 1.34 diff -c -r1.34 pg_backup_files.c *** pg_backup_files.c 28 Oct 2007 21:55:52 -0000 1.34 --- pg_backup_files.c 29 Sep 2008 23:34:58 -0000 *************** *** 87,92 **** --- 87,93 ---- AH->WriteBufPtr = _WriteBuf; AH->ReadBufPtr = _ReadBuf; AH->ClosePtr = _CloseArchive; + AH->ReopenPtr = NULL; AH->PrintTocDataPtr = _PrintTocData; AH->ReadExtraTocPtr = _ReadExtraToc; AH->WriteExtraTocPtr = _WriteExtraToc; Index: pg_backup_tar.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_tar.c,v retrieving revision 1.62 diff -c -r1.62 pg_backup_tar.c *** pg_backup_tar.c 15 Nov 2007 21:14:41 -0000 1.62 --- pg_backup_tar.c 29 Sep 2008 23:34:58 -0000 *************** *** 143,148 **** --- 143,149 ---- AH->WriteBufPtr = _WriteBuf; AH->ReadBufPtr = _ReadBuf; AH->ClosePtr = _CloseArchive; + AH->ReopenPtr = NULL; AH->PrintTocDataPtr = _PrintTocData; AH->ReadExtraTocPtr = _ReadExtraToc; AH->WriteExtraTocPtr = _WriteExtraToc; Index: pg_restore.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_restore.c,v retrieving revision 1.88 diff -c -r1.88 pg_restore.c *** pg_restore.c 13 Apr 2008 03:49:22 -0000 1.88 --- pg_restore.c 29 Sep 2008 23:34:58 -0000 *************** *** 78,83 **** --- 78,84 ---- static int no_data_for_failed_tables = 0; static int outputNoTablespaces = 0; static int use_setsessauth = 0; + static int truncate_before_load = 0; struct option cmdopts[] = { {"clean", 0, NULL, 'c'}, *************** *** 92,97 **** --- 93,99 ---- {"ignore-version", 0, NULL, 'i'}, {"index", 1, NULL, 'I'}, {"list", 0, NULL, 'l'}, + {"multi-thread",1,NULL,'m'}, {"no-privileges", 0, NULL, 'x'}, {"no-acl", 0, NULL, 'x'}, {"no-owner", 0, NULL, 'O'}, *************** *** 114,119 **** --- 116,122 ---- {"disable-triggers", no_argument, &disable_triggers, 1}, {"no-data-for-failed-tables", no_argument, &no_data_for_failed_tables, 1}, {"no-tablespaces", no_argument, &outputNoTablespaces, 1}, + {"truncate-before-load", no_argument, &truncate_before_load, 1}, {"use-set-session-authorization", no_argument, &use_setsessauth, 1}, {NULL, 0, NULL, 0} *************** *** 139,145 **** } } ! while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:n:Op:P:RsS:t:T:U:vWxX:1", cmdopts, NULL)) != -1) { switch (c) --- 142,148 ---- } } ! while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:m:n:Op:P:RsS:t:T:U:vWxX:1", cmdopts, NULL)) != -1) { switch (c) *************** *** 182,187 **** --- 185,194 ---- opts->tocFile = strdup(optarg); break; + case 'm': + opts->number_of_threads = atoi(optarg); /* XXX fix error checking */ + break; + case 'n': /* Dump data for this schema only */ opts->schemaNames = strdup(optarg); break; *************** *** 262,268 **** break; case 0: ! /* This covers the long options equivalent to -X xxx. */ break; case '1': /* Restore data in a single transaction */ --- 269,278 ---- break; case 0: ! /* ! * This covers the long options without a short equivalent, ! * including those equivalent to -X xxx. ! */ break; case '1': /* Restore data in a single transaction */ *************** *** 299,304 **** --- 309,329 ---- opts->noDataForFailedTables = no_data_for_failed_tables; opts->noTablespace = outputNoTablespaces; opts->use_setsessauth = use_setsessauth; + opts->truncate_before_load = truncate_before_load; + + if (opts->single_txn) + { + if (opts->number_of_threads > 1) + { + write_msg(NULL, "single transaction not compatible with multi-threading"); + exit(1); + } + else if (opts->truncate_before_load) + { + write_msg(NULL, "single transaction not compatible with truncate-before-load"); + exit(1); + } + } if (opts->formatName) { *************** *** 330,335 **** --- 355,362 ---- AH = OpenArchive(inputFileSpec, opts->format); + /* XXX looks like we'll have to do sanity checks in the parallel archiver */ + /* Let the archiver know how noisy to be */ AH->verbose = opts->verbose; *************** *** 351,356 **** --- 378,385 ---- if (opts->tocSummary) PrintTOCSummary(AH, opts); + else if (opts->number_of_threads > 1) + RestoreArchiveParallel(AH, opts); else RestoreArchive(AH, opts);
> > + if (strcmp(te->desc,"CONSTRAINT") == 0 || > + strcmp(te->desc,"FK CONSTRAINT") == 0 || > + strcmp(te->desc,"CHECK CONSTRAINT") == 0 || > + strcmp(te->desc,"TRIGGER") == 0 || > + strcmp(slots[i].te->desc,"CONSTRAINT") == 0 || > + strcmp(slots[i].te->desc,"FK CONSTRAINT") == 0 || > + strcmp(slots[i].te->desc,"CHECK CONSTRAINT") == 0 || > + strcmp(slots[i].te->desc,"TRIGGER") == 0) > Really just an observation from the peanut gallery here, but every time pg_restore hard-codes this kind of thing, it introduces yet another possible side-effect bug when someone, eg, adds a new TOC type. Would it substantially decrease the benefits of the patch to skip *any* toc entry that shares dependencies with another? (rather than just those listed above).
Philip Warner wrote: >> + if (strcmp(te->desc,"CONSTRAINT") == 0 || >> + strcmp(te->desc,"FK CONSTRAINT") == 0 || >> + strcmp(te->desc,"CHECK CONSTRAINT") == 0 || >> + strcmp(te->desc,"TRIGGER") == 0 || >> + strcmp(slots[i].te->desc,"CONSTRAINT") == 0 || >> + strcmp(slots[i].te->desc,"FK CONSTRAINT") == 0 || >> + strcmp(slots[i].te->desc,"CHECK CONSTRAINT") == 0 || >> + strcmp(slots[i].te->desc,"TRIGGER") == 0) >> >> > Really just an observation from the peanut gallery here, but every time > pg_restore hard-codes this kind of thing, it introduces yet another > possible side-effect bug when someone, eg, adds a new TOC type. > > Would it substantially decrease the benefits of the patch to skip *any* > toc entry that shares dependencies with another? (rather than just those > listed above). > > > Unfortunately, it quite possibly would. You would not be able to build two indexes on the same table in parallel, even though they wouldn't have conflicting locks. cheers andrew
Andrew Dunstan wrote: > Unfortunately, it quite possibly would. You would not be able to build > two indexes on the same table in parallel, even though they wouldn't > have conflicting locks. I suppose so, but: 1. By the same logic it might speed things up; it might build two completely separate indexes and thereby avoid (some kind of) contention. In any case, it would most likely do *something* else. It should only reduce performance if (a) it can do nothing or (b) there is a benefit in building multiple indexes on the same table at the same time. 2. Perhaps if there are a limited number of items that share dependencies but which are known to be OK (ie. indexes), maybe list them in the inner loop as exceptions and allow them to run parallel. This would mean a failure to list a new TOC item type would result in worse performance rather than a crash.
Philip Warner wrote: > Andrew Dunstan wrote: > >> Unfortunately, it quite possibly would. You would not be able to build >> two indexes on the same table in parallel, even though they wouldn't >> have conflicting locks. >> > I suppose so, but: > > 1. By the same logic it might speed things up; it might build two > completely separate indexes and thereby avoid (some kind of) contention. > In any case, it would most likely do *something* else. It should only > reduce performance if (a) it can do nothing or (b) there is a benefit in > building multiple indexes on the same table at the same time. > > 2. Perhaps if there are a limited number of items that share > dependencies but which are known to be OK (ie. indexes), maybe list them > in the inner loop as exceptions and allow them to run parallel. This > would mean a failure to list a new TOC item type would result in worse > performance rather than a crash. > > > I will look at it in due course. Right now my concern is simply to get something that works that we can do some testing with. I think that's what we have now (fingers crossed). Some parts of it are jury rigged. BTW, though, building indexes for the same table together is likely to be a win AIUI, especially given the recent work on synchronised scans. cheers andrew
Andrew Dunstan wrote: > > > Stefan Kaltenbrunner wrote: >> Tom Lane wrote: >>> Andrew Dunstan <andrew@dunslane.net> writes: >>>> Tom Lane wrote: >>>>> Um, FKs could conflict with each other too, so that by itself isn't >>>>> gonna fix anything. >>> >>>> Good point. Looks like we'll need to make a list of "can't run in >>>> parallel with" items as well as strict dependencies. >>> >>> Yeah, I was just thinking about that. The current archive format >>> doesn't really carry enough information for this. I think there >>> are two basic solutions we could adopt: >>> >>> * Extend the archive format to provide some indication that "restoring >>> this object requires exclusive access to these dependencies". >>> >>> * Hardwire knowledge into pg_restore that certain types of objects >>> require exclusive access to their dependencies. >>> >>> The former seems more flexible, as well as more in tune with the basic >>> design assumption that pg_restore shouldn't have a lot of knowledge >>> about individual archive object types. But it would mean that you >>> couldn't use parallel restore with any pre-8.4 dumps. In the long run >>> that's no big deal, but in the short run it's annoying. >> >> hmm not sure how much of a problem that really is - we usually >> recommend to use the pg_dump version of the target database anyway. >> >> >> >> > > We don't really need a huge amount of hardwiring as it turns out. Here > is a version of the patch that tries to do what's needed in this area. this one is much better - however I still seem to be able to create deadlock scenarios with strange FK relations - ie FKs going in both directions between two tables. for those interested these are the timings on my 8 core testbox for my test database: single process restore: 169min -m2: 101min -m6: 64min -m8: 63min -m16: 56min Stefan