diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index cee09c7..8e0b3f2 100644 *** a/doc/src/sgml/config.sgml --- b/doc/src/sgml/config.sgml *************** SET ENABLE_SEQSCAN TO OFF; *** 2010,2017 **** --- 2010,2125 ---- You should also consider setting hot_standby_feedback as an alternative to using this parameter. + + + Synchronous Replication + + + These settings control the behavior of the built-in + synchronous replication feature. + These parameters would be set on the primary server that is + to send replication data to one or more standby servers. + + + + + synchronous_replication (boolean) + + synchronous_replication configuration parameter + + + + Specifies whether transaction commit will wait for WAL records + to be replicated before the command returns a success + indication to the client. The default setting is off. + When on, there will be a delay while the client waits + for confirmation of successful replication. That delay will + increase depending upon the physical distance and network activity + between primary and standby. The commit wait will last until the + current synchronous standby replies. + + + + On the primary, this parameter can be changed at any time; the + behavior for any one transaction is determined by the setting in + effect when it commits. It is therefore possible, and useful, to have + some transactions replicate synchronously and others asynchronously. + For example, to make a single multistatement transaction commit + asynchronously when the default is synchronous replication, issue + SET LOCAL synchronous_replication TO OFF within the + transaction. + + + + + + replication_timeout_client (integer) + + replication_timeout_client configuration parameter + + + + If the client has synchronous_replication set, + and a synchronous standby is currently available + then the commit will wait for up to replication_timeout_client + seconds before it returns success. The commit will wait + forever for a confirmation when replication_timeout_client + is set to -1. + + + If the client has synchronous_replication set, + and yet no synchronous standby is available when we commit, then the + setting of allow_standalone_primary determines whether + or not we wait. + + + + + + allow_standalone_primary (boolean) + + allow_standalone_primary configuration parameter + + + + If allow_standalone_primary is set, then the server + can operate normally whether or not replication is active. If + a client requests synchronous_replication and it is + not available, it will use asynchronous replication instead. + + + + + + + + synchronous_standby_names (string) + + synchronous_standby_names configuration parameter + + + + Specifies a list of standby names that can become the sole + synchronous standby. Other standby servers that are also on the list + become potential standbys. If the current synchronous standby + goes away it will be replaced with one of the potential standbys. + Specifying more than one standby name can allow very high availability. + + + The standby name is currently taken as the application_name of the + standby, as set in the primary_conninfo on the standby. Names are + not enforced for uniqueness, though clearly that can lead to + confusion. Specifying multiple standbys with the same name does not + allow more than one standby to be the current synchronous standby. + + + If a standby is removed from the list of servers then it will stop + being the synchronous standby, allowing another to take its place. 
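+ For example, with synchronous_standby_names = 'bill, ted' (names purely + illustrative), removing bill requires only an edit of postgresql.conf to + synchronous_standby_names = 'ted' followed by a configuration reload, + such as pg_ctl reload, since this parameter takes effect on reload. 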
+ Standbys may also be added to the list without restarting the server. + + diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index 37ba43b..d2710dd 100644 *** a/doc/src/sgml/high-availability.sgml --- b/doc/src/sgml/high-availability.sgml *************** primary_conninfo = 'host=192.168.1.50 po *** 875,880 **** --- 875,1083 ---- + + Synchronous Replication + + + Synchronous Replication + + + + PostgreSQL streaming replication is asynchronous by + default. If the primary server + crashes then some transactions that were committed may not have been + replicated to the standby server, causing data loss. The amount + of data loss is proportional to the replication delay at the time of + failover. + + + + Synchronous replication offers the ability to confirm that all changes + made by a transaction have been transferred to one synchronous standby + server. This extends the standard level of durability + offered by a transaction commit. This level of protection is referred + to as 2-safe replication in computer science theory. + + + + Synchronous replication works in the following way. When requested, + the commit of a write transaction will wait until confirmation is + received that the commit has been written to the transaction log on disk + of both the primary and standby server. The only possibility that data + can be lost is if both the primary and the standby suffer crashes at the + same time. This can provide a much higher level of durability if the + sysadmin is cautious about the placement and management of the two servers. + Waiting for confirmation increases the user's confidence that the changes + will not be lost in the event of server crashes but it also necessarily + increases the response time for the requesting transaction. The minimum + wait time is the roundtrip time between primary and standby. + + + + Read-only transactions and transaction rollbacks need not wait for + replies from standby servers. Subtransaction commits do not wait for + responses from standby servers, only top-level commits. Long-running + actions such as data loading or index building do not wait + until the very final commit message. + + + + Basic Configuration + + + Synchronous replication will be active if appropriate options are + enabled on both the primary and at least one standby server. + + + + On the primary server we need to set + + + synchronous_standby_names = 'bill, ted' + synchronous_replication = on + + + and on the standby server we need to set a non-zero value for + + + wal_receiver_status_interval = 10s + + + On the primary, synchronous_replication can be set + for particular users or databases, or dynamically by application + programs. On the standby, wal_receiver_status_interval + is a server-level setting and cannot be changed for individual + users or connections. + + + + + + Planning for Performance + + + Synchronous replication usually requires carefully planned and placed + standby servers to ensure applications perform acceptably. Waiting + does not itself consume system resources, but transaction locks continue to be + held until the transfer is confirmed. As a result, incautious use of + synchronous replication will reduce performance for database + applications because of increased response times and higher contention. + + + + PostgreSQL allows the application developer + to specify the durability level required via replication. This can be + specified for the system overall, though it can also be specified for + specific users or connections, or even individual transactions. 
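+ Assuming synchronous_replication can be assigned like any other + user-settable parameter (the role and database names below are + illustrative only), the default durability level might be pinned + per role or per database: + + ALTER ROLE payments_app SET synchronous_replication = on; + ALTER DATABASE chat SET synchronous_replication = off; + + Individual transactions can still override these defaults with + SET LOCAL synchronous_replication. 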
+ + + + For example, an application workload might consist of: + 10% of changes are important customer details, while + 90% of changes are less important data that the business can more + easily survive if it is lost, such as chat messages between users. + + + + With synchronous replication options specified at the application level + (on the primary) we can offer sync rep for the most important changes, + without slowing down the bulk of the total workload. Application level + options are an important and practical tool for allowing the benefits of + synchronous replication for high-performance applications. + + + + You should consider that the network bandwidth must be higher than + the rate of generation of WAL data. + + + + + + Planning for High Availability + + + The easiest and safest method of gaining High Availability using + synchronous replication is to configure at least two standby servers. + To understand why, we need to examine what can happen when you lose all + standby servers. + + + + Commits made when synchronous_replication is set will wait until at + least one standby responds. The response may never occur if the last, + or only, standby should crash or the network drops. What should we do in + that situation? + + + + Sitting and waiting will typically cause operational problems + because it is an effective outage of the primary server should all + sessions end up waiting. In contrast, allowing the primary server to + continue processing write transactions in the absence of a standby + puts those latest data changes at risk. So in this situation there + is a direct choice between database availability and the potential + durability of the data it contains. How we handle this situation + is controlled by allow_standalone_primary. The default + setting is on, allowing processing to continue, though + there is no recommended setting. Choosing the best setting for + allow_standalone_primary is a difficult decision and best + left to those with combined business responsibility for both data and + applications. The difficulty of this choice is the reason why we + recommend that you reduce the possibility of this situation occurring + by using multiple standby servers. + + + + A user will stop waiting once replication_timeout_client + has been reached for their specific session. Users are not waiting for + a specific standby to reply, they are waiting for a reply from any + standby, so the unavailability of any one standby is not significant + to a user. It is possible for user sessions to hit timeout even though + standbys are communicating normally. In that case, the setting of + replication_timeout_client is probably too low. + + + + When a standby first attaches to the primary, it may not be properly + synchronized. The standby is only able to become a synchronous standby + once it has become synchronized, or "caught up" with the primary. + The catch-up duration may be long immediately after the standby has + been created. If the standby is shut down, then the catch-up period + will increase according to the length of time the standby has been + down. You are advised to make sure allow_standalone_primary + is not set during the initial catch-up period. 
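+ Note that the client-side wait behavior described in this section can + be controlled per transaction. For example (the timeout value here is + illustrative), a transaction that should wait at most 30 seconds for + confirmation could be written as: + + BEGIN; + SET LOCAL synchronous_replication = on; + SET LOCAL replication_timeout_client = 30; + -- make the changes that must be replicated + COMMIT; + + If no confirmation arrives within 30 seconds the COMMIT returns + normally and a NOTICE reports that replication was not confirmed; + the commit itself has already completed locally. 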
+ + + + If the primary crashes while commits are waiting for acknowledgement, those + transactions will be marked fully committed if the primary database + recovers, no matter how allow_standalone_primary is set. + There is no way to be certain that all standbys have received all + outstanding WAL data at the time of the crash of the primary. Some + transactions may not show as committed on the standby, even though + they show as committed on the primary. The guarantee we offer is that + the application will not receive explicit acknowledgement of the + successful commit of a transaction until the WAL data is known to be + safely received by the standby. Hence this mechanism is technically + "semi-synchronous" rather than "fully synchronous" replication. Note + that replication would still not be fully synchronous even if we waited for + all standby servers, though this would reduce availability, as + described previously. + + + + If you need to re-create a standby server while transactions are + waiting, make sure that the commands to run pg_start_backup() and + pg_stop_backup() are run in a session with + synchronous_replication = off, otherwise those requests will wait + forever for the standby to appear. + + + + diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 287ad26..eb3cd6f 100644 *** a/src/backend/access/transam/twophase.c --- b/src/backend/access/transam/twophase.c *************** *** 56,61 **** --- 56,62 ---- #include "pg_trace.h" #include "pgstat.h" #include "replication/walsender.h" + #include "replication/syncrep.h" #include "storage/fd.h" #include "storage/predicate.h" #include "storage/procarray.h" *************** RecordTransactionCommitPrepared(Transact *** 2030,2035 **** --- 2031,2044 ---- MyProc->inCommit = false; END_CRIT_SECTION(); + + /* + * Wait for synchronous replication, if required. + * + * Note that at this stage we have marked clog, but still show as + * running in the procarray and continue to hold locks. + */ + SyncRepWaitForLSN(recptr); } /* diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index a0170b4..5f73226 100644 *** a/src/backend/access/transam/xact.c --- b/src/backend/access/transam/xact.c *************** *** 37,42 **** --- 37,43 ---- #include "miscadmin.h" #include "pgstat.h" #include "replication/walsender.h" + #include "replication/syncrep.h" #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/lmgr.h" *************** RecordTransactionCommit(void) *** 1055,1061 **** * if all to-be-deleted tables are temporary though, since they are lost * anyway if we crash.) */ ! if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0) { /* * Synchronous commit case: --- 1056,1062 ---- * if all to-be-deleted tables are temporary though, since they are lost * anyway if we crash.) */ ! if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0 || SyncRepRequested()) { /* * Synchronous commit case: *************** RecordTransactionCommit(void) *** 1125,1130 **** --- 1126,1139 ---- /* Compute latestXid while we have the child XIDs handy */ latestXid = TransactionIdLatest(xid, nchildren, children); + /* + * Wait for synchronous replication, if required. + * + * Note that at this stage we have marked clog, but still show as + * running in the procarray and continue to hold locks. 
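+ * To other backends the transaction therefore still appears to be in + * progress, so its effects remain invisible until the wait completes + * and we are removed from the procarray in the usual way. 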
+ */ + SyncRepWaitForLSN(XactLastRecEnd); + /* Reset XactLastRecEnd until the next transaction writes something */ XactLastRecEnd.xrecoff = 0; diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c index 7307c41..2171b50 100644 *** a/src/backend/postmaster/autovacuum.c --- b/src/backend/postmaster/autovacuum.c *************** AutoVacWorkerMain(int argc, char *argv[] *** 1527,1532 **** --- 1527,1540 ---- SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE); /* + * Force synchronous replication off to allow regular maintenance even + * if we are waiting for standbys to connect. This is important to + * ensure we aren't blocked from performing anti-wraparound tasks + * when allow_standalone_primary = false. + */ + SetConfigOption("synchronous_replication", "off", PGC_SUSET, PGC_S_OVERRIDE); + + /* * Get the info about the database we're going to work on. */ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 42c6eaf..3fe490e 100644 *** a/src/backend/replication/Makefile --- b/src/backend/replication/Makefile *************** top_builddir = ../../.. *** 13,19 **** include $(top_builddir)/src/Makefile.global OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \ ! repl_gram.o include $(top_srcdir)/src/backend/common.mk --- 13,19 ---- include $(top_builddir)/src/Makefile.global OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \ ! repl_gram.o syncrep.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index ...cf8bdd6 . *** a/src/backend/replication/syncrep.c --- b/src/backend/replication/syncrep.c *************** *** 0 **** --- 1,666 ---- + /*------------------------------------------------------------------------- + * + * syncrep.c + * + * Synchronous replication is new as of PostgreSQL 9.1. + * + * If requested, transaction commits wait until their commit LSN is + * acknowledged by the standby, or the wait hits timeout. + * + * This module contains the code for waiting and release of backends. + * All code in this module executes on the primary. The core streaming + * replication transport remains within WALreceiver/WALsender modules. + * + * The essence of this design is that it isolates all logic about + * waiting/releasing onto the primary. The primary defines which standbys + * it wishes to wait for. The standby is completely unaware of the + * durability requirements of transactions on the primary, which reduces + * the complexity of the code, streamlines standby operations, and saves + * network bandwidth because there is no requirement to ship + * per-transaction state information. + * + * The bookkeeping approach we take is that a commit is either synchronous + * or not synchronous (async). If it is async, we just fastpath out of + * here. If it is sync, then in 9.1 we wait for the flush location on the + * standby before releasing the waiting backend. Further complexity + * in that interaction is expected in later releases. + * + * The best-performing way to manage the waiting backends is to have a + * single ordered queue of waiting backends, so that we can avoid + * searching through all waiters each time we receive a reply. + * + * Starting sync replication is a multi-stage process. First, the standby + * must be a potential synchronous standby. Next, we must have caught up + * with the primary; that may take some time. 
If there is no current + * synchronous standby then the WALsender will offer a sync rep service. + * + * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group + * + * IDENTIFICATION + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ + #include "postgres.h" + + #include <unistd.h> + + #include "access/xact.h" + #include "access/xlog_internal.h" + #include "miscadmin.h" + #include "postmaster/autovacuum.h" + #include "replication/syncrep.h" + #include "replication/walsender.h" + #include "storage/latch.h" + #include "storage/ipc.h" + #include "storage/pmsignal.h" + #include "storage/proc.h" + #include "utils/builtins.h" + #include "utils/guc.h" + #include "utils/guc_tables.h" + #include "utils/memutils.h" + #include "utils/ps_status.h" + + /* User-settable parameters for sync rep */ + bool sync_rep_mode = false; /* Only set in user backends */ + int sync_rep_timeout_client = 120; /* Only set in user backends */ + bool allow_standalone_primary; + char *SyncRepStandbyNames; + + + #define IsOnSyncRepQueue() (MyProc->lwWaiting) + + static void SyncRepWaitOnQueue(XLogRecPtr XactCommitLSN); + static void SyncRepRemoveFromQueue(void); + static void SyncRepAddToQueue(void); + static bool SyncRepServiceAvailable(void); + static long SyncRepGetWaitTimeout(void); + + static bool IsPotentialSyncRepStandby(void); + static int SyncRepWakeQueue(void); + + + /* + * =========================================================== + * Synchronous Replication functions for normal user backends + * =========================================================== + */ + + /* + * Wait for synchronous replication, if requested by user. + */ + void + SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) + { + /* + * Fast exit if user has not requested sync replication, or + * streaming replication is inactive in this server. + */ + if (!SyncRepRequested() || max_wal_senders == 0) + return; + + if (allow_standalone_primary) + { + /* + * Check that the service level we want is available. + * If not, downgrade the service level to async. + */ + if (SyncRepServiceAvailable()) + SyncRepWaitOnQueue(XactCommitLSN); + } + else + { + /* + * Wait, even if service is not yet available. + * Sounds weird, but this mode exists to provide + * higher levels of protection. + */ + SyncRepWaitOnQueue(XactCommitLSN); + } + } + + /* + * Wait for specified LSN to be confirmed at the requested level + * of durability. Each proc has its own wait latch, so we perform + * a normal latch check/wait loop here. + */ + static void + SyncRepWaitOnQueue(XLogRecPtr XactCommitLSN) + { + volatile WalSndCtlData *walsndctl = WalSndCtl; + volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue); + TimestampTz now = GetCurrentTransactionStopTimestamp(); + long timeout = SyncRepGetWaitTimeout(); + char *new_status = NULL; + const char *old_status; + int len; + + ereport(DEBUG3, + (errmsg("synchronous replication waiting for %X/%X starting at %s", + XactCommitLSN.xlogid, + XactCommitLSN.xrecoff, + timestamptz_to_str(GetCurrentTransactionStopTimestamp())))); + + for (;;) + { + ResetLatch(&MyProc->waitLatch); + + /* + * First time through, add ourselves to the appropriate queue. + */ + if (!IsOnSyncRepQueue()) + { + SpinLockAcquire(&queue->qlock); + if (XLByteLE(XactCommitLSN, queue->lsn)) + { + /* No need to wait */ + SpinLockRelease(&queue->qlock); + return; + } + + /* + * Set our waitLSN so WALSender will know when to wake us. 
+ * We set this before we add ourselves to queue, so that + * any proc on the queue can be examined freely without + * taking a lock on each process in the queue. + */ + MyProc->waitLSN = XactCommitLSN; + SyncRepAddToQueue(); + SpinLockRelease(&queue->qlock); + + /* + * Alter ps display to show waiting for sync rep. + */ + old_status = get_ps_display(&len); + new_status = (char *) palloc(len + 21 + 1); + memcpy(new_status, old_status, len); + strcpy(new_status + len, " waiting for sync rep"); + set_ps_display(new_status, false); + new_status[len] = '\0'; /* truncate off " waiting" */ + } + else + { + bool release = false; + bool timed_out = false; + + SpinLockAcquire(&queue->qlock); + + /* + * Check the LSN on our queue and if it's moved far enough then + * remove us from the queue. The first time through it is + * unlikely to have moved far enough, though it is possible. + * The next time we are woken we should have better luck. + */ + if (XLByteLE(XactCommitLSN, queue->lsn)) + release = true; + else if (timeout > 0 && + TimestampDifferenceExceeds(GetCurrentTransactionStopTimestamp(), + now, timeout / 1000)) /* timeout is in usec; convert to ms */ + { + release = true; + timed_out = true; + } + + if (release) + { + SyncRepRemoveFromQueue(); + SpinLockRelease(&queue->qlock); + + if (new_status) + { + /* Reset ps display */ + set_ps_display(new_status, false); + pfree(new_status); + } + + /* + * Our response to the timeout is to simply post a NOTICE and + * then return to the user. The commit has happened, we just + * haven't been able to verify it has been replicated in the + * way requested. + */ + if (timed_out) + ereport(NOTICE, + (errmsg("synchronous replication timeout at %s", + timestamptz_to_str(now)))); + else + ereport(DEBUG3, + (errmsg("synchronous replication wait complete at %s", + timestamptz_to_str(now)))); + return; + } + + SpinLockRelease(&queue->qlock); + } + + WaitLatch(&MyProc->waitLatch, timeout); + now = GetCurrentTimestamp(); + } + } + + /* + * Remove myself from sync rep wait queue. + * + * Assume on queue at start; will not be on queue at end. + * Queue is already locked at start and remains locked on exit. + */ + static void + SyncRepRemoveFromQueue(void) + { + volatile WalSndCtlData *walsndctl = WalSndCtl; + volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue); + PGPROC *proc = queue->head; + + Assert(IsOnSyncRepQueue()); + + if (proc == MyProc) + { + if (MyProc->lwWaitLink == NULL) + { + /* + * We were the only waiter on the queue. Reset head and tail. + */ + Assert(queue->tail == MyProc); + queue->head = NULL; + queue->tail = NULL; + } + else + /* + * Move head to next proc on the queue. + */ + queue->head = MyProc->lwWaitLink; + } + else + { + bool found = false; + + while (proc->lwWaitLink != NULL) + { + /* Are we the next proc in our traversal of the queue? */ + if (proc->lwWaitLink == MyProc) + { + /* + * Remove ourselves from middle of queue. + * No need to touch head or tail. + */ + proc->lwWaitLink = MyProc->lwWaitLink; + found = true; + break; + } + + proc = proc->lwWaitLink; + } + + if (!found) + elog(WARNING, "could not locate ourselves on wait queue"); + + /* If MyProc was the tail, our predecessor becomes the new tail */ + if (proc->lwWaitLink == NULL) + { + Assert(proc != MyProc); /* impossible, since that case is the head == MyProc branch above */ + + /* Remove ourselves from tail of queue */ + Assert(queue->tail == MyProc); + queue->tail = proc; + proc->lwWaitLink = NULL; + } + } + MyProc->lwWaitLink = NULL; + MyProc->lwWaiting = false; + } + + /* + * Add myself to sync rep wait queue. 
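+ * The queue is meant to be kept ordered by waitLSN, so that + * SyncRepWakeQueue() can stop scanning at the first waiter whose LSN + * has not yet been reached; see the XXX note in the body below about + * maintaining that invariant. 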
+ * + * Assume not on queue at start; will be on queue at end. + * Queue is already locked at start and remains locked on exit. + */ + static void + SyncRepAddToQueue(void) + { + volatile WalSndCtlData *walsndctl = WalSndCtl; + volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue); + PGPROC *tail = queue->tail; + + /* + * Add myself to tail of wait queue. + */ + if (tail == NULL) + { + queue->head = MyProc; + queue->tail = MyProc; + } + else + { + /* + * XXX extra code needed here to maintain sorted invariant. + * Our approach should be the same as a racing car - slow in, fast out. + */ + Assert(tail->lwWaitLink == NULL); + tail->lwWaitLink = MyProc; + } + queue->tail = MyProc; + + MyProc->lwWaiting = true; + MyProc->lwWaitLink = NULL; + } + + /* + * Dynamically decide the sync rep wait mode. It may seem a trifle + * wasteful to do this for every transaction but we need to do this + * so we can cope sensibly with standby disconnections. It's OK to + * spend a few cycles here anyway, since while we're doing this the + * WALSender will be sending the data we want to wait for, so this + * is dead time and the user has requested to wait anyway. + */ + static bool + SyncRepServiceAvailable(void) + { + volatile WalSndCtlData *walsndctl = WalSndCtl; + bool result = false; + + SpinLockAcquire(&walsndctl->ctlmutex); + result = walsndctl->sync_rep_service_available; + SpinLockRelease(&walsndctl->ctlmutex); + + return result; + } + + /* + * Return a value that we can use directly in WaitLatch(). We need to + * handle special values, plus convert from seconds to microseconds. + * + */ + static long + SyncRepGetWaitTimeout(void) + { + if (sync_rep_timeout_client <= 0) + return -1L; + + return 1000000L * sync_rep_timeout_client; + } + + void + SyncRepCleanupAtProcExit(int code, Datum arg) + { + volatile WalSndCtlData *walsndctl = WalSndCtl; + volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue); + + if (IsOnSyncRepQueue()) + { + SpinLockAcquire(&queue->qlock); + SyncRepRemoveFromQueue(); + SpinLockRelease(&queue->qlock); + } + + if (MyProc != NULL && MyProc->ownLatch) + { + DisownLatch(&MyProc->waitLatch); + MyProc->ownLatch = false; + } + } + + /* + * =========================================================== + * Synchronous Replication functions for wal sender processes + * =========================================================== + */ + + /* + * Check if we are in the list of sync standbys. + * + * Compare the parameter SyncRepStandbyNames against the application_name + * for this WALSender. + */ + static bool + IsPotentialSyncRepStandby(void) + { + char *rawstring; + List *elemlist; + ListCell *l; + + /* Need a modifiable copy of string */ + rawstring = pstrdup(SyncRepStandbyNames); + + /* Parse string into list of identifiers */ + if (!SplitIdentifierString(rawstring, ',', &elemlist)) + { + /* syntax error in list */ + pfree(rawstring); + list_free(elemlist); + ereport(FATAL, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid list syntax for parameter \"synchronous_standby_names\""))); + return false; + } + + foreach(l, elemlist) + { + char *standby_name = (char *) lfirst(l); + + if (pg_strcasecmp(standby_name, application_name) == 0) + { + pfree(rawstring); + list_free(elemlist); + return true; + } + } + + pfree(rawstring); + list_free(elemlist); + return false; + } + + /* + * Take any action required to initialize sync rep state from config + * data. Called at WALSender startup and after each SIGHUP. 
+ */ + void + SyncRepInitConfig(void) + { + bool sync_standby = IsPotentialSyncRepStandby(); + + /* + * Determine if we are a potential sync standby and remember the result + * for handling replies from standby. + */ + if (!MyWalSnd->potential_sync_standby && sync_standby) + { + MyWalSnd->potential_sync_standby = true; + ereport(DEBUG1, + (errmsg("standby \"%s\" is a potential synchronous standby", + application_name))); + } + else if (MyWalSnd->potential_sync_standby && !sync_standby) + { + /* + * We're no longer a potential sync standby. + */ + MyWalSnd->potential_sync_standby = false; + + /* + * Stop providing the sync rep service, to let another take over. + */ + if (MyWalSnd->sync_rep_service) + { + /* + * Update state for this WAL sender. + */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sync_rep_service = false; + SpinLockRelease(&walsnd->mutex); + } + + /* + * Stop providing the sync rep service, even if there are + * waiting backends. + */ + { + volatile WalSndCtlData *walsndctl = WalSndCtl; + + SpinLockAcquire(&walsndctl->ctlmutex); + walsndctl->sync_rep_service_available = false; + SpinLockRelease(&walsndctl->ctlmutex); + } + + ereport(DEBUG1, + (errmsg("standby \"%s\" is no longer the synchronous replication standby", + application_name))); + } + } + } + + /* + * Update the LSNs on each queue based upon our latest state. This + * implements a simple policy of first-valid-standby-releases-waiter. + * + * Other policies are possible, which would change what we do here and + * perhaps also which information we store. + */ + void + SyncRepReleaseWaiters(void) + { + volatile WalSndCtlData *walsndctl = WalSndCtl; + + /* + * If this WALSender is serving a standby that is not on the list of + * potential standbys then we have nothing to do. + */ + if (!MyWalSnd->potential_sync_standby) + return; + + /* + * We're a potential sync standby. If we aren't yet offering a sync + * rep service, check whether we need to begin offering that service. + * We check this dynamically to ensure that we can continue to offer + * a service if we have multiple potential standbys and the current + * sync standby fails. + * + * We don't attempt to enable sync rep service during a base backup since + * during that action we aren't sending WAL at all, so there cannot be + * any meaningful replies. We don't enable sync rep service while we + * are still in catchup mode either, since clients might experience an + * extended wait (perhaps hours) if they waited at that point. + */ + if (!MyWalSnd->sync_rep_service && + MyWalSnd->state == WALSNDSTATE_STREAMING) + { + if (SyncRepServiceAvailable()) + { + /* + * Another WALSender is already providing the sync rep service. + */ + return; + } + else + { + bool enable_service = false; + + /* + * We're a potential sync standby and there isn't currently + * a sync standby, so we're now going to become one. Watch for + * race conditions here. + */ + { + SpinLockAcquire(&WalSndCtl->ctlmutex); + if (!WalSndCtl->sync_rep_service_available) + { + WalSndCtl->sync_rep_service_available = true; + enable_service = true; + } + SpinLockRelease(&WalSndCtl->ctlmutex); + } + + /* + * Another WALSender beat us to it and is already providing + * the sync rep service. + */ + if (!enable_service) + return; + + ereport(DEBUG1, + (errmsg("standby \"%s\" is now the synchronous replication standby", + application_name))); + + /* + * Update state for this WAL sender. 
+ */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sync_rep_service = true; + SpinLockRelease(&walsnd->mutex); + } + } + } + + /* + * Maintain queue LSNs and release waiters. + */ + { + volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue); + int numprocs = 0; + + /* + * Lock the queue. Not really necessary with just one sync standby + * but it makes clear what needs to happen. + */ + SpinLockAcquire(&queue->qlock); + if (XLByteLT(queue->lsn, MyWalSnd->flush)) + { + /* + * Set the lsn first so that when we wake backends they will + * release up to this location. + */ + queue->lsn = MyWalSnd->flush; + numprocs = SyncRepWakeQueue(); + } + SpinLockRelease(&queue->qlock); + + elog(DEBUG3, "released %d procs up to %X/%X", + numprocs, + MyWalSnd->flush.xlogid, + MyWalSnd->flush.xrecoff); + } + } + + /* + * Walk queue from head setting the latches of any procs that need + * to be woken. We don't modify the queue, we leave that for individual + * procs to release themselves. + * + * Must hold spinlock on queue. + */ + static int + SyncRepWakeQueue(void) + { + volatile WalSndCtlData *walsndctl = WalSndCtl; + volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue); + PGPROC *proc = queue->head; + int numprocs = 0; + + /* fast exit for empty queue */ + if (proc == NULL) + return 0; + + for (; proc != NULL; proc = proc->lwWaitLink) + { + /* + * Assume the queue is ordered by LSN + */ + if (XLByteLT(queue->lsn, proc->waitLSN)) + return numprocs; + + numprocs++; + SetLatch(&proc->waitLatch); + } + + return numprocs; + } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index eb99246..da4dea7 100644 *** a/src/backend/replication/walsender.c --- b/src/backend/replication/walsender.c *************** *** 66,72 **** WalSndCtlData *WalSndCtl = NULL; /* My slot in the shared memory array */ ! static WalSnd *MyWalSnd = NULL; /* Global state */ bool am_walsender = false; /* Am I a walsender process ? */ --- 66,72 ---- WalSndCtlData *WalSndCtl = NULL; /* My slot in the shared memory array */ ! WalSnd *MyWalSnd = NULL; /* Global state */ bool am_walsender = false; /* Am I a walsender process ? */ *************** WalSenderMain(void) *** 174,179 **** --- 174,181 ---- SpinLockRelease(&walsnd->mutex); } + SyncRepInitConfig(); + /* Main loop of walsender */ return WalSndLoop(); } *************** StartReplication(StartReplicationCmd * c *** 379,384 **** --- 381,396 ---- */ WalSndSetState(WALSNDSTATE_CATCHUP); + /* + * When we first start replication the standby will be behind the primary. + * For some applications, for example, synchronous replication, it is + * important to have a clear state for this initial catchup mode, so we + * can trigger actions when we change streaming state later. We may stay + * in this state for a long time, which is exactly why we want to be + * able to monitor whether or not we are still here. 
+ */ + /* Send a CopyBothResponse message, and start streaming */ pq_beginmessage(&buf, 'W'); pq_sendbyte(&buf, 0); *************** ProcessStandbyReplyMessage(void) *** 584,589 **** --- 596,603 ---- walsnd->apply = reply.apply; SpinLockRelease(&walsnd->mutex); } + + SyncRepReleaseWaiters(); } /* *************** WalSndLoop(void) *** 700,705 **** --- 714,720 ---- { got_SIGHUP = false; ProcessConfigFile(PGC_SIGHUP); + SyncRepInitConfig(); } /* *************** WalSndLoop(void) *** 771,777 **** --- 786,797 ---- * that point might wait for some time. */ if (MyWalSnd->state == WALSNDSTATE_CATCHUP && caughtup) + { + ereport(DEBUG1, + (errmsg("standby \"%s\" has now caught up with primary", + application_name))); WalSndSetState(WALSNDSTATE_STREAMING); + } ProcessRepliesIfAny(); } *************** WalSndShmemInit(void) *** 1232,1237 **** --- 1252,1258 ---- WalSndCtl = (WalSndCtlData *) ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found); + SpinLockInit(&WalSndCtl->ctlmutex); if (!found) { diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index afaf599..62d1d6b 100644 *** a/src/backend/storage/lmgr/proc.c --- b/src/backend/storage/lmgr/proc.c *************** *** 39,44 **** --- 39,45 ---- #include "access/xact.h" #include "miscadmin.h" #include "postmaster/autovacuum.h" + #include "replication/syncrep.h" #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/pmsignal.h" *************** InitProcGlobal(void) *** 196,201 **** --- 197,203 ---- PGSemaphoreCreate(&(procs[i].sem)); procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs; ProcGlobal->freeProcs = &procs[i]; + InitSharedLatch(&procs[i].waitLatch); } /* *************** InitProcGlobal(void) *** 214,219 **** --- 216,222 ---- PGSemaphoreCreate(&(procs[i].sem)); procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs; ProcGlobal->autovacFreeProcs = &procs[i]; + InitSharedLatch(&procs[i].waitLatch); } /* *************** InitProcGlobal(void) *** 224,229 **** --- 227,233 ---- { AuxiliaryProcs[i].pid = 0; /* marks auxiliary proc as not in use */ PGSemaphoreCreate(&(AuxiliaryProcs[i].sem)); + InitSharedLatch(&AuxiliaryProcs[i].waitLatch); } /* Create ProcStructLock spinlock, too */ *************** InitProcess(void) *** 326,331 **** --- 330,342 ---- SHMQueueInit(&(MyProc->myProcLocks[i])); MyProc->recoveryConflictPending = false; + /* Initialize the waitLSN for sync rep */ + MyProc->waitLSN.xlogid = 0; + MyProc->waitLSN.xrecoff = 0; + + OwnLatch((Latch *) &MyProc->waitLatch); + MyProc->ownLatch = true; + /* * We might be reusing a semaphore that belonged to a failed process. So * be careful and reinitialize its value here. (This is not strictly *************** InitProcessPhase2(void) *** 365,370 **** --- 376,382 ---- /* * Arrange to clean that up at backend exit. 
*/ + on_shmem_exit(SyncRepCleanupAtProcExit, 0); on_shmem_exit(RemoveProcFromArray, 0); } diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 55cbf75..16dced1 100644 *** a/src/backend/utils/misc/guc.c --- b/src/backend/utils/misc/guc.c *************** *** 55,60 **** --- 55,61 ---- #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" #include "postmaster/walwriter.h" + #include "replication/syncrep.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/bufmgr.h" *************** static struct config_bool ConfigureNames *** 754,759 **** --- 755,768 ---- true, NULL, NULL }, { + {"synchronous_replication", PGC_USERSET, WAL_REPLICATION, + gettext_noop("Requests synchronous replication."), + NULL + }, + &sync_rep_mode, + false, NULL, NULL + }, + { {"zero_damaged_pages", PGC_SUSET, DEVELOPER_OPTIONS, gettext_noop("Continues processing past damaged page headers."), gettext_noop("Detection of a damaged page header normally causes PostgreSQL to " *************** static struct config_bool ConfigureNames *** 1270,1275 **** --- 1279,1294 ---- }, { + {"allow_standalone_primary", PGC_SIGHUP, WAL_REPLICATION, + gettext_noop("Allow users to proceed without waiting if they request " + "synchronous replication and it is not available."), + NULL + }, + &allow_standalone_primary, + true, NULL, NULL + }, + + { {"hot_standby", PGC_POSTMASTER, WAL_STANDBY_SERVERS, gettext_noop("Allows connections and queries during recovery."), NULL *************** static struct config_int ConfigureNamesI *** 2161,2166 **** --- 2180,2195 ---- }, { + {"replication_timeout_client", PGC_USERSET, WAL_REPLICATION, + gettext_noop("Clients waiting for confirmation will time out after this duration."), + NULL, + GUC_UNIT_S + }, + &sync_rep_timeout_client, + 120, -1, INT_MAX, NULL, NULL + }, + + { {"track_activity_query_size", PGC_POSTMASTER, RESOURCES_MEM, gettext_noop("Sets the size reserved for pg_stat_activity.current_query, in bytes."), NULL, *************** static struct config_string ConfigureNam *** 2717,2722 **** --- 2746,2761 ---- }, { + {"synchronous_standby_names", PGC_SIGHUP, WAL_REPLICATION, + gettext_noop("List of potential standby names to synchronize with."), + NULL, + GUC_LIST_INPUT | GUC_IS_NAME + }, + &SyncRepStandbyNames, + "", NULL, NULL + }, + + { {"default_text_search_config", PGC_USERSET, CLIENT_CONN_LOCALE, gettext_noop("Sets default text search configuration."), NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 6726733..314b36d 100644 *** a/src/backend/utils/misc/postgresql.conf.sample --- b/src/backend/utils/misc/postgresql.conf.sample *************** *** 184,190 **** #archive_timeout = 0 # force a logfile segment switch after this # number of seconds; 0 disables ! # - Streaming Replication - #max_wal_senders = 0 # max number of walsender processes # (change requires restart) --- 184,198 ---- #archive_timeout = 0 # force a logfile segment switch after this # number of seconds; 0 disables ! # - Replication - User Settings ! ! #synchronous_replication = off # does commit wait for reply from standby ! #replication_timeout_client = 120 # -1 means wait forever ! ! # - Streaming Replication - Server Settings ! ! #allow_standalone_primary = on # allow commits to proceed when no sync standby is available ! 
#synchronous_standby_names = '' # list of server names for sync rep #max_wal_senders = 0 # max number of walsender processes # (change requires restart) diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index ...faab968 . *** a/src/include/replication/syncrep.h --- b/src/include/replication/syncrep.h *************** *** 0 **** --- 1,56 ---- + /*------------------------------------------------------------------------- + * + * syncrep.h + * Exports from replication/syncrep.c. + * + * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group + * + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ + #ifndef _SYNCREP_H + #define _SYNCREP_H + + #include "access/xlog.h" + #include "storage/proc.h" + #include "storage/shmem.h" + #include "storage/spin.h" + + #define SyncRepRequested() (sync_rep_mode) + + /* + * Each synchronous rep queue lives in the WAL sender shmem area. + */ + typedef struct SyncRepQueue + { + /* + * Current location of the head of the queue. All waiters should have + * a waitLSN that follows this value, or they are currently being woken + * to remove themselves from the queue. + */ + XLogRecPtr lsn; + + PGPROC *head; + PGPROC *tail; + + slock_t qlock; /* locks shared variables shown above */ + } SyncRepQueue; + + /* user-settable parameters for synchronous replication */ + extern bool sync_rep_mode; + extern int sync_rep_timeout_client; + extern bool allow_standalone_primary; + extern char *SyncRepStandbyNames; + + /* called by user backend */ + extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN); + + /* called by wal sender */ + extern void SyncRepInitConfig(void); + extern void SyncRepReleaseWaiters(void); + + /* callback at exit */ + extern void SyncRepCleanupAtProcExit(int code, Datum arg); + + #endif /* _SYNCREP_H */ diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 5843307..bd67622 100644 *** a/src/include/replication/walsender.h --- b/src/include/replication/walsender.h *************** *** 15,20 **** --- 15,21 ---- #include "access/xlog.h" #include "nodes/nodes.h" #include "storage/latch.h" + #include "replication/syncrep.h" #include "storage/spin.h" *************** typedef struct WalSnd *** 52,62 **** --- 53,85 ---- * to do. */ Latch latch; + + /* + * Is this WALSender currently offering a sync replication service? + */ + bool sync_rep_service; + + /* + * Is this WALSender on the list of sync standbys? + */ + bool potential_sync_standby; } WalSnd; + extern WalSnd *MyWalSnd; + /* There is one WalSndCtl struct for the whole database cluster */ typedef struct { + /* + * Sync rep wait queue, which maintains the invariant that the + * queue is sorted by LSN. 
+ */ + SyncRepQueue sync_rep_queue; + + bool sync_rep_service_available; + + slock_t ctlmutex; /* locks shared variables shown above */ + WalSnd walsnds[1]; /* VARIABLE LENGTH ARRAY */ } WalSndCtlData; diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 78dbade..27b57c8 100644 *** a/src/include/storage/proc.h --- b/src/include/storage/proc.h *************** *** 14,19 **** --- 14,21 ---- #ifndef _PROC_H_ #define _PROC_H_ + #include "access/xlog.h" + #include "storage/latch.h" #include "storage/lock.h" #include "storage/pg_sema.h" #include "utils/timestamp.h" *************** struct PGPROC *** 115,120 **** --- 117,127 ---- LOCKMASK heldLocks; /* bitmask for lock types already held on this * lock object by this backend */ + /* Info to allow us to wait for synchronous replication, if needed. */ + Latch waitLatch; + XLogRecPtr waitLSN; /* waiting for this LSN or higher */ + bool ownLatch; /* do we own the above latch? */ + /* * All PROCLOCK objects for locks held or awaited by this backend are * linked into one of these lists, according to the partition number of their locks.