*** a/doc/src/sgml/gist.sgml --- b/doc/src/sgml/gist.sgml *************** *** 709,741 **** my_distance(PG_FUNCTION_ARGS) - - Crash Recovery - - - Usually, replay of the WAL log is sufficient to restore the integrity - of a GiST index following a database crash. However, there are some - corner cases in which the index state is not fully rebuilt. The index - will still be functionally correct, but there might be some performance - degradation. When this occurs, the index can be repaired by - VACUUMing its table, or by rebuilding the index using - REINDEX. In some cases a plain VACUUM is - not sufficient, and either VACUUM FULL or REINDEX - is needed. The need for one of these procedures is indicated by occurrence - of this log message during crash recovery: - - LOG: index NNN/NNN/NNN needs VACUUM or REINDEX to finish crash recovery - - or this log message during routine index insertions: - - LOG: index "FOO" needs VACUUM or REINDEX to finish crash recovery - - If a plain VACUUM finds itself unable to complete recovery - fully, it will return a notice: - - NOTICE: index "FOO" needs VACUUM FULL or REINDEX to finish crash recovery - - - - --- 709,712 ---- *** a/src/backend/access/gist/README --- b/src/backend/access/gist/README *************** *** 108,150 **** Penalty is used for choosing a subtree to insert; method PickSplit is used for the node splitting algorithm; method Union is used for propagating changes upward to maintain the tree properties. ! NOTICE: We modified original INSERT algorithm for performance reason. In ! particularly, it is now a single-pass algorithm. ! ! Function findLeaf is used to identify subtree for insertion. Page, in which ! insertion is proceeded, is locked as well as its parent page. Functions ! findParent and findPath are used to find parent pages, which could be changed ! because of concurrent access. Function pageSplit is recurrent and could split ! page by more than 2 pages, which could be necessary if keys have different ! lengths or more than one key are inserted (in such situation, user defined ! function pickSplit cannot guarantee free space on page). ! ! findLeaf(new-key) ! push(stack, [root, 0]) //page, LSN ! while(true) ! ptr = top of stack ! latch( ptr->page, S-mode ) ! ptr->lsn = ptr->page->lsn ! if ( exists ptr->parent AND ptr->parent->lsn < ptr->page->nsn ) ! unlatch( ptr->page ) ! pop stack ! else if ( ptr->page is not leaf ) ! push( stack, [get_best_child(ptr->page, new-key), 0] ) ! unlatch( ptr->page ) ! else ! unlatch( ptr->page ) ! latch( ptr->page, X-mode ) ! if ( ptr->page is not leaf ) ! //the only root page can become a non-leaf ! unlatch( ptr->page ) ! else if ( ptr->parent->lsn < ptr->page->nsn ) ! unlatch( ptr->page ) ! pop stack ! else ! return stack ! end ! end ! end findPath( stack item ) push stack, [root, 0, 0] // page, LSN, parent --- 108,178 ---- the node splitting algorithm; method Union is used for propagating changes upward to maintain the tree properties. ! To insert a tuple, we first have to find a suitable leaf page to insert to. ! The algorithm walks down the tree, starting from the root, along the path ! of smallest Penalty. At each step: ! ! 1. Has this page been split since we looked at the parent? If so, it's ! possible that we should be inserting to the other half instead, so retreat ! back to the parent. ! 2. If this is a leaf node, we've found our target node. ! 3. Otherwise use Penalty to pick a new target subtree. ! 4. Check the key representing the target subtree. If it doesn't already cover ! the key we're inserting, replace it with the Union of the old downlink key ! and the key being inserted. (Actually, we always call Union, and just skip ! the replacement if the Unioned key is the same as the existing key) ! 5. Replacing the key in step 4 might cause the page to be split. In that case, ! propagate the change upwards and restart the algorithm from the first parent ! that didn't need to be split. ! 6. Walk down to the target subtree, and goto 1. ! ! This differs from the insertion algorithm in the original paper. In the ! original paper, you first walk down the tree until you reach a leaf page, and ! then you adjust the downlink in the parent, and propagating the adjustment up, ! all the way up to the root in the worst case. But we adjust the downlinks to ! cover the new key already when we walk down, so that when we reach the leaf ! page, we don't need to update the parents anymore, except to insert the ! downlinks if we have to split the page. This makes crash recovery simpler: ! after inserting a key to the page, the tree is immediately self-consistent ! without having to update the parents. Even if we split a page and crash before ! inserting the downlink to the parent, the tree is self-consistent because the ! right half of the split is accessible via the rightlink of the left page ! (which replaced the original page). ! ! Note that the algorithm can walk up and down the tree before reaching a leaf ! page, if internal pages need to split while adjusting the downlinks for the ! new key. Eventually, you should reach the bottom, and proceed with the ! insertion of the new tuple. ! ! Once we've found the target page to insert to, we check if there's room ! for the new tuple. If there is, the tuple is inserted, and we're done. ! If it doesn't fit, however, the page needs to be split. Note that it is ! possible that a page needs to be split into more than two pages, if keys have ! different lengths or more than one key is being inserted at a time (which can ! happen when inserting downlinks for a page split that resulted in more than ! two pages at the lower level). After splitting a page, the parent page needs ! to be updated. The downlink for the new page needs to be inserted, and the ! downlink for the old page, which became the left half of the split, needs to ! be updated to only cover those tuples that stayed on the left page. Inserting ! the downlink in the parent can again lead to a page split, recursing up to the ! root page in the worst case. ! ! gistplacetopage is the workhorse function that performs one step of the ! insertion. If the tuple fits, it inserts it to the given page, otherwise ! it splits the page, and constructs the new downlink tuples for the split ! pages. The caller must then call gistplacetopage() on the parent page to ! insert the downlink tuples. The parent page that holds the downlink to ! the child might have migrated as a result of concurrent splits of the ! parent, gistfindCorrectParent() is used to find the parent page. ! ! Splitting the root page works slightly differently. At root split, ! gistplacetopage() allocates the new child pages and replaces the old root ! page with the new root containing downlinks to the new children, all in one ! operation. ! ! ! findPath is a subroutine of findParent, used when the correct parent page ! can't be found by following the rightlinks at the parent level: findPath( stack item ) push stack, [root, 0, 0] // page, LSN, parent *************** *** 165,170 **** findPath( stack item ) --- 193,203 ---- pop stack end + + gistFindCorrectParent is used to re-find the parent of a page during + insertion. It might have migrated to the right since we traversed down the + tree because of page splits. + findParent( stack item ) parent = item->parent latch( parent->page, X-mode ) *************** *** 184,189 **** findParent( stack item ) --- 217,225 ---- return findParent( item ) end + pageSplit function decides how to distribute keys to the new pages after + page split: + pageSplit(page, allkeys) (lkeys, rkeys) = pickSplit( allkeys ) if ( page is root ) *************** *** 204,242 **** pageSplit(page, allkeys) return newkeys - placetopage(page, keysarray) - if ( no space left on page ) - keysarray = pageSplit(page, [ extract_keys(page), keysarray]) - last page in chain gets old NSN, - original and others - new NSN equals to LSN - if ( page is root ) - make new root with keysarray - end - else - put keysarray on page - if ( length of keysarray > 1 ) - keysarray = [ union(keysarray) ] - end - end ! insert(new-key) ! stack = findLeaf(new-key) ! keysarray = [new-key] ! ptr = top of stack ! while(true) ! findParent( ptr ) //findParent latches parent page ! keysarray = placetopage(ptr->page, keysarray) ! unlatch( ptr->page ) ! pop stack; ! ptr = top of stack ! if (length of keysarray == 1) ! newboundingkey = union(oldboundingkey, keysarray) ! if (newboundingkey == oldboundingkey) ! unlatch ptr->page ! break loop ! end ! end ! end Authors: Teodor Sigaev --- 240,283 ---- return newkeys ! Concurrency control ! ------------------- ! As a rule of thumb, if you need to hold a lock on multiple pages at the ! same time, the locks should be acquired in the following order: child page ! before parent, and left-to-right at the same level. Always acquiring the ! locks in the same order avoids deadlocks. ! ! The search algorithm only looks at and locks one page at a time. Consequently ! there's a race condition between a search and a page split. A page split ! happens in two phases: 1. The page is split 2. The downlink is inserted to the ! parent. If a search looks at the parent page between those steps, before the ! downlink is inserted, it will still find the new right half by following the ! rightlink on the left half. But it must not follow the rightlink if it saw the ! downlink in the parent, or the page will be visited twice! ! ! A split initially marks the left page with the F_FOLLOW_RIGHT flag. If a scan ! sees that flag set, it knows that the right page is missing the downlink, and ! should be visited too. When split inserts the downlink to the parent, it ! clears the F_FOLLOW_RIGHT flag in the child, and sets the NSN field in the ! child page header to match the LSN of the insertion on the parent. If the ! F_FOLLOW_RIGHT flag is not set, a scan compares the NSN on the child and the ! LSN it saw in the parent. If NSN < LSN, the scan looked at the parent page ! before the downlink was inserted, so it should follow the rightlink. Otherwise ! the scan saw the downlink in the parent page, and will/did follow that as ! usual. ! ! A scan can't normally see a page with the F_FOLLOW_RIGHT flag set, because ! a page split keeps the child pages locked until the downlink has been inserted ! to the parent and the flag cleared again. But if a crash happens in the middle ! of a page split, before the downlinks are inserted into the parent, that will ! leave a page with F_FOLLOW_RIGHT in the tree. Scans handle that just fine, ! but we'll eventually want to fix that for performance reasons. And more ! importantly, dealing with pages with missing downlink pointers in the parent ! would complicate the insertion algorithm. So when an insertion sees a page ! with F_FOLLOW_RIGHT set, it immediately tries to bring the split that ! crashed in the middle to completion by adding the downlink in the parent. ! Authors: Teodor Sigaev *** a/src/backend/access/gist/gist.c --- b/src/backend/access/gist/gist.c *************** *** 31,36 **** typedef struct --- 31,42 ---- MemoryContext tmpCtx; } GISTBuildState; + /* A List of these is used represent a split-in-progress. */ + typedef struct + { + Buffer buf; /* the split page "half" */ + IndexTuple downlink; /* downlink for this half. */ + } GISTPageSplitInfo; /* non-export function prototypes */ static void gistbuildCallback(Relation index, *************** *** 43,50 **** static void gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *GISTstate); ! static void gistfindleaf(GISTInsertState *state, ! GISTSTATE *giststate); #define ROTATEDIST(d) do { \ --- 49,61 ---- IndexTuple itup, Size freespace, GISTSTATE *GISTstate); ! static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate); ! static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack, ! GISTSTATE *giststate, ! IndexTuple *tuples, int ntup, OffsetNumber oldoffnum, ! Buffer leftchild); ! static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack, ! GISTSTATE *giststate, List *splitinfo); #define ROTATEDIST(d) do { \ *************** *** 251,291 **** gistinsert(PG_FUNCTION_ARGS) /* ! * Workhouse routine for doing insertion into a GiST index. Note that ! * this routine assumes it is invoked in a short-lived memory context, ! * so it does not bother releasing palloc'd allocations. */ ! static void ! gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate) { ! GISTInsertState state; ! ! memset(&state, 0, sizeof(GISTInsertState)); ! ! state.itup = (IndexTuple *) palloc(sizeof(IndexTuple)); ! state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup)); ! memcpy(state.itup[0], itup, IndexTupleSize(itup)); ! state.ituplen = 1; ! state.freespace = freespace; ! state.r = r; ! state.key = itup->t_tid; ! state.needInsertComplete = true; ! ! state.stack = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack)); ! state.stack->blkno = GIST_ROOT_BLKNO; ! gistfindleaf(&state, giststate); ! gistmakedeal(&state, giststate); ! } ! static bool ! gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) ! { ! bool is_splitted = false; ! bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false; /* ! * if (!is_leaf) remove old key: This node's key has been modified, either * because a child split occurred or because we needed to adjust our key * for an insert in a child node. Therefore, remove the old version of * this node's key. --- 262,313 ---- /* ! * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple ! * at that offset is atomically removed along with inserting the new tuples. ! * This is used to replace a tuple with a new one. ! * ! * If 'leftchildbuf' is valid, we're inserting the downlink for the page ! * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'. ! * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set. ! * ! * If there is not enough room on the page, it is split. All the split ! * pages are kept pinned and locked and returned in *splitinfo, the caller ! * is responsible for inserting the downlinks for them. However, if ! * 'buffer' is the root page and it needs to be split, gistplacetopage() ! * performs the split as one atomic operation, and *splitinfo is set to NIL. ! * In that case, we continue to hold the root page locked, and the child ! * pages are released; note that new tuple(s) are *not* on the root page ! * but in one of the new child pages. */ ! static bool ! gistplacetopage(GISTInsertState *state, GISTSTATE *giststate, ! Buffer buffer, ! IndexTuple *itup, int ntup, OffsetNumber oldoffnum, ! Buffer leftchildbuf, ! List **splitinfo) { ! Page page = BufferGetPage(buffer); ! bool is_leaf = (GistPageIsLeaf(page)) ? true : false; ! XLogRecPtr recptr; ! int i; ! bool is_split; ! /* ! * Refuse to modify a page that's incompletely split. This should ! * not happen because we finish any incomplete splits while we walk ! * down the tree. However, it's remotely possible that another ! * concurrent inserter splits a parent page, and errors out before ! * completing the split. We will just throw an error in that case, ! * and leave any split we had in progress unfinished too. The next ! * insert that comes along will clean up the mess. ! */ ! if (GistFollowRight(page)) ! elog(ERROR, "concurrent GiST page split was incomplete"); ! *splitinfo = NIL; /* ! * if isupdate, remove old key: This node's key has been modified, either * because a child split occurred or because we needed to adjust our key * for an insert in a child node. Therefore, remove the old version of * this node's key. *************** *** 293,369 **** gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) * for WAL replay, in the non-split case we handle this by setting up a * one-element todelete array; in the split case, it's handled implicitly * because the tuple vector passed to gistSplit won't include this tuple. - * - * XXX: If we want to change fillfactors between node and leaf, fillfactor - * = (is_leaf ? state->leaf_fillfactor : state->node_fillfactor) */ ! if (gistnospace(state->stack->page, state->itup, state->ituplen, ! is_leaf ? InvalidOffsetNumber : state->stack->childoffnum, ! state->freespace)) { /* no space for insertion */ IndexTuple *itvec; int tlen; SplitedPageLayout *dist = NULL, *ptr; ! BlockNumber rrlink = InvalidBlockNumber; ! GistNSN oldnsn; ! is_splitted = true; /* ! * Form index tuples vector to split: remove old tuple if t's needed ! * and add new tuples to vector */ ! itvec = gistextractpage(state->stack->page, &tlen); ! if (!is_leaf) { /* on inner page we should remove old tuple */ ! int pos = state->stack->childoffnum - FirstOffsetNumber; tlen--; if (pos != tlen) memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos)); } ! itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen); ! dist = gistSplit(state->r, state->stack->page, itvec, tlen, giststate); ! ! state->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * tlen); ! state->ituplen = 0; ! if (state->stack->blkno != GIST_ROOT_BLKNO) { ! /* ! * if non-root split then we should not allocate new buffer, but ! * we must create temporary page to operate ! */ ! dist->buffer = state->stack->buffer; ! dist->page = PageGetTempPageCopySpecial(BufferGetPage(dist->buffer)); /* clean all flags except F_LEAF */ GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0; } ! /* make new pages and fills them */ for (ptr = dist; ptr; ptr = ptr->next) { ! int i; ! char *data; ! /* get new page */ ! if (ptr->buffer == InvalidBuffer) { ! ptr->buffer = gistNewBuffer(state->r); ! GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0); ! ptr->page = BufferGetPage(ptr->buffer); } ! ptr->block.blkno = BufferGetBlockNumber(ptr->buffer); ! /* ! * fill page, we can do it because all these pages are new (ie not ! * linked in tree or masked by temp page ! */ ! data = (char *) (ptr->list); for (i = 0; i < ptr->block.num; i++) { if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber) --- 315,448 ---- * for WAL replay, in the non-split case we handle this by setting up a * one-element todelete array; in the split case, it's handled implicitly * because the tuple vector passed to gistSplit won't include this tuple. */ ! is_split = gistnospace(page, itup, ntup, oldoffnum, state->freespace); ! if (is_split) { /* no space for insertion */ IndexTuple *itvec; int tlen; SplitedPageLayout *dist = NULL, *ptr; ! BlockNumber oldrlink = InvalidBlockNumber; ! GistNSN oldnsn = { 0, 0 }; ! SplitedPageLayout rootpg; ! BlockNumber blkno = BufferGetBlockNumber(buffer); ! bool is_rootsplit; ! is_rootsplit = (blkno == GIST_ROOT_BLKNO); /* ! * Form index tuples vector to split. If we're replacing an old tuple, ! * remove the old version from the vector. */ ! itvec = gistextractpage(page, &tlen); ! if (OffsetNumberIsValid(oldoffnum)) { /* on inner page we should remove old tuple */ ! int pos = oldoffnum - FirstOffsetNumber; tlen--; if (pos != tlen) memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos)); } ! itvec = gistjoinvector(itvec, &tlen, itup, ntup); ! dist = gistSplit(state->r, page, itvec, tlen, giststate); ! /* ! * Set up pages to work with. Allocate new buffers for all but the ! * leftmost page. The original page becomes the new leftmost page, ! * and is just replaced with the new contents. ! * ! * For a root-split, allocate new buffers for all child pages, the ! * original page is overwritten with new root page containing ! * downlinks to the new child pages. ! */ ! ptr = dist; ! if (!is_rootsplit) { ! /* save old rightlink and NSN */ ! oldrlink = GistPageGetOpaque(page)->rightlink; ! oldnsn = GistPageGetOpaque(page)->nsn; ! ! dist->buffer = buffer; ! dist->block.blkno = BufferGetBlockNumber(buffer); ! dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer)); /* clean all flags except F_LEAF */ GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0; + + ptr = ptr->next; + } + for (; ptr; ptr = ptr->next) + { + /* Allocate new page */ + ptr->buffer = gistNewBuffer(state->r); + GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0); + ptr->page = BufferGetPage(ptr->buffer); + ptr->block.blkno = BufferGetBlockNumber(ptr->buffer); } ! /* ! * Now that we know whick blocks the new pages go to, set up downlink ! * tuples to point to them. ! */ for (ptr = dist; ptr; ptr = ptr->next) { ! ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno); ! GistTupleSetValid(ptr->itup); ! } ! /* ! * If this is a root split, we construct the new root page with the ! * downlinks here directly, instead of requiring the caller to insert ! * them. Add the new root page to the list along with the child pages. ! */ ! if (is_rootsplit) ! { ! IndexTuple *downlinks; ! int ndownlinks = 0; ! int i; ! ! rootpg.buffer = buffer; ! rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer)); ! GistPageGetOpaque(rootpg.page)->flags = 0; ! ! /* Prepare a vector of all the downlinks */ ! for (ptr = dist; ptr; ptr = ptr->next) ! ndownlinks++; ! downlinks = palloc(sizeof(IndexTuple) * ndownlinks); ! for (i = 0, ptr = dist; ptr; ptr = ptr->next) ! downlinks[i++] = ptr->itup; ! ! rootpg.block.blkno = GIST_ROOT_BLKNO; ! rootpg.block.num = ndownlinks; ! rootpg.list = gistfillitupvec(downlinks, ndownlinks, ! &(rootpg.lenlist)); ! rootpg.itup = NULL; ! ! rootpg.next = dist; ! dist = &rootpg; ! } ! else ! { ! /* Prepare split-info to be returned to caller */ ! for (ptr = dist; ptr; ptr = ptr->next) { ! GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo)); ! si->buf = ptr->buffer; ! si->downlink = ptr->itup; ! *splitinfo = lappend(*splitinfo, si); } ! } ! /* ! * Fill all pages. All the pages are new, ie. freshly allocated empty ! * pages, or a temporary copy of the old page. ! */ ! for (ptr = dist; ptr; ptr = ptr->next) ! { ! char *data = (char *) (ptr->list); for (i = 0; i < ptr->block.num; i++) { if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber) *************** *** 371,646 **** gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) data += IndexTupleSize((IndexTuple) data); } ! /* set up ItemPointer and remember it for parent */ ! ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno); ! state->itup[state->ituplen] = ptr->itup; ! state->ituplen++; ! } ! /* saves old rightlink */ ! if (state->stack->blkno != GIST_ROOT_BLKNO) ! rrlink = GistPageGetOpaque(dist->page)->rightlink; START_CRIT_SECTION(); /* ! * must mark buffers dirty before XLogInsert, even though we'll still ! * be changing their opaque fields below. set up right links. */ for (ptr = dist; ptr; ptr = ptr->next) - { MarkBufferDirty(ptr->buffer); ! GistPageGetOpaque(ptr->page)->rightlink = (ptr->next) ? ! ptr->next->block.blkno : rrlink; ! } ! /* restore splitted non-root page */ ! if (state->stack->blkno != GIST_ROOT_BLKNO) ! { ! PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer)); ! dist->page = BufferGetPage(dist->buffer); ! } if (RelationNeedsWAL(state->r)) ! { ! XLogRecPtr recptr; ! XLogRecData *rdata; ! ! rdata = formSplitRdata(state->r->rd_node, state->stack->blkno, ! is_leaf, &(state->key), dist); ! ! recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); ! ! for (ptr = dist; ptr; ptr = ptr->next) ! { ! PageSetLSN(ptr->page, recptr); ! PageSetTLI(ptr->page, ThisTimeLineID); ! } ! } else ! { ! for (ptr = dist; ptr; ptr = ptr->next) ! { ! PageSetLSN(ptr->page, GetXLogRecPtrForTemp()); ! } ! } ! ! /* set up NSN */ ! oldnsn = GistPageGetOpaque(dist->page)->nsn; ! if (state->stack->blkno == GIST_ROOT_BLKNO) ! /* if root split we should put initial value */ ! oldnsn = PageGetLSN(dist->page); for (ptr = dist; ptr; ptr = ptr->next) { ! /* only for last set oldnsn */ ! GistPageGetOpaque(ptr->page)->nsn = (ptr->next) ? ! PageGetLSN(ptr->page) : oldnsn; } /* ! * release buffers, if it was a root split then release all buffers ! * because we create all buffers */ ! ptr = (state->stack->blkno == GIST_ROOT_BLKNO) ? dist : dist->next; ! for (; ptr; ptr = ptr->next) ! UnlockReleaseBuffer(ptr->buffer); ! ! if (state->stack->blkno == GIST_ROOT_BLKNO) { ! gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key)); ! state->needInsertComplete = false; } - - END_CRIT_SECTION(); } else { ! /* enough space */ START_CRIT_SECTION(); ! if (!is_leaf) ! PageIndexTupleDelete(state->stack->page, state->stack->childoffnum); ! gistfillbuffer(state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber); ! MarkBufferDirty(state->stack->buffer); if (RelationNeedsWAL(state->r)) { ! OffsetNumber noffs = 0, ! offs[1]; ! XLogRecPtr recptr; ! XLogRecData *rdata; ! if (!is_leaf) { ! /* only on inner page we should delete previous version */ ! offs[0] = state->stack->childoffnum; ! noffs = 1; } ! rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer, ! offs, noffs, ! state->itup, state->ituplen, ! &(state->key)); ! recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); ! PageSetLSN(state->stack->page, recptr); ! PageSetTLI(state->stack->page, ThisTimeLineID); } else ! PageSetLSN(state->stack->page, GetXLogRecPtrForTemp()); ! ! if (state->stack->blkno == GIST_ROOT_BLKNO) ! state->needInsertComplete = false; ! END_CRIT_SECTION(); ! if (state->ituplen > 1) ! { /* previous is_splitted==true */ ! /* ! * child was splited, so we must form union for insertion in ! * parent ! */ ! IndexTuple newtup = gistunion(state->r, state->itup, state->ituplen, giststate); ! ItemPointerSetBlockNumber(&(newtup->t_tid), state->stack->blkno); ! state->itup[0] = newtup; ! state->ituplen = 1; ! } ! else if (is_leaf) ! { ! /* ! * itup[0] store key to adjust parent, we set it to valid to ! * correct check by GistTupleIsInvalid macro in gistgetadjusted() ! */ ! ItemPointerSetBlockNumber(&(state->itup[0]->t_tid), state->stack->blkno); ! GistTupleSetValid(state->itup[0]); ! } } ! return is_splitted; } /* ! * returns stack of pages, all pages in stack are pinned, and ! * leaf is X-locked */ - static void ! gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) { ItemId iid; IndexTuple idxtuple; ! GISTPageOpaque opaque; /* ! * walk down, We don't lock page for a long time, but so we should be ! * ready to recheck path in a bad case... We remember, that page->lsn ! * should never be invalid. */ for (;;) { ! if (XLogRecPtrIsInvalid(state->stack->lsn)) ! state->stack->buffer = ReadBuffer(state->r, state->stack->blkno); ! LockBuffer(state->stack->buffer, GIST_SHARE); ! gistcheckpage(state->r, state->stack->buffer); ! state->stack->page = (Page) BufferGetPage(state->stack->buffer); ! opaque = GistPageGetOpaque(state->stack->page); ! state->stack->lsn = PageGetLSN(state->stack->page); ! Assert(!RelationNeedsWAL(state->r) || !XLogRecPtrIsInvalid(state->stack->lsn)); ! if (state->stack->blkno != GIST_ROOT_BLKNO && ! XLByteLT(state->stack->parent->lsn, opaque->nsn)) { /* ! * caused split non-root page is detected, go up to parent to ! * choose best child */ ! UnlockReleaseBuffer(state->stack->buffer); ! state->stack = state->stack->parent; continue; } ! if (!GistPageIsLeaf(state->stack->page)) { /* ! * This is an internal page, so continue to walk down the tree. We ! * find the child node that has the minimum insertion penalty and ! * recursively invoke ourselves to modify that node. Once the ! * recursive call returns, we may need to adjust the parent node ! * for two reasons: the child node split, or the key in this node ! * needs to be adjusted for the newly inserted key below us. */ ! GISTInsertStack *item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack)); ! ! state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate); ! iid = PageGetItemId(state->stack->page, state->stack->childoffnum); ! idxtuple = (IndexTuple) PageGetItem(state->stack->page, iid); ! item->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid)); ! LockBuffer(state->stack->buffer, GIST_UNLOCK); ! item->parent = state->stack; ! item->child = NULL; ! if (state->stack) ! state->stack->child = item; ! state->stack = item; ! } ! else ! { ! /* be carefull, during unlock/lock page may be changed... */ ! LockBuffer(state->stack->buffer, GIST_UNLOCK); ! LockBuffer(state->stack->buffer, GIST_EXCLUSIVE); ! state->stack->page = (Page) BufferGetPage(state->stack->buffer); ! opaque = GistPageGetOpaque(state->stack->page); ! if (state->stack->blkno == GIST_ROOT_BLKNO) { /* ! * the only page can become inner instead of leaf is a root ! * page, so for root we should recheck it */ ! if (!GistPageIsLeaf(state->stack->page)) { ! /* ! * very rarely situation: during unlock/lock index with ! * number of pages = 1 was increased ! */ ! LockBuffer(state->stack->buffer, GIST_UNLOCK); ! continue; ! } /* ! * we don't need to check root split, because checking ! * leaf/inner is enough to recognize split for root */ ! } ! else if (XLByteLT(state->stack->parent->lsn, opaque->nsn)) { ! /* ! * detecting split during unlock/lock, so we should find ! * better child on parent ! */ ! /* forget buffer */ ! UnlockReleaseBuffer(state->stack->buffer); ! state->stack = state->stack->parent; ! continue; } ! state->stack->lsn = PageGetLSN(state->stack->page); ! /* ok we found a leaf page and it X-locked */ break; } } - - /* now state->stack->(page, buffer and blkno) points to leaf page */ } /* --- 450,830 ---- data += IndexTupleSize((IndexTuple) data); } ! /* Set up rightlinks */ ! if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO) ! GistPageGetOpaque(ptr->page)->rightlink = ! ptr->next->block.blkno; ! else ! GistPageGetOpaque(ptr->page)->rightlink = oldrlink; ! ! if (ptr->next && !is_rootsplit) ! GistMarkFollowRight(ptr->page); ! else ! GistClearFollowRight(ptr->page); ! /* ! * Copy the NSN of the original page to all pages. The ! * F_FOLLOW_RIGHT flags ensure that scans will follow the ! * rightlinks until the downlinks are inserted. ! */ ! GistPageGetOpaque(ptr->page)->nsn = oldnsn; ! } START_CRIT_SECTION(); /* ! * Must mark buffers dirty before XLogInsert, even though we'll still ! * be changing their opaque fields below. */ for (ptr = dist; ptr; ptr = ptr->next) MarkBufferDirty(ptr->buffer); ! if (BufferIsValid(leftchildbuf)) ! MarkBufferDirty(leftchildbuf); ! /* ! * The first page in the chain was a temporary working copy meant ! * to replace the old page. Copy it over the old page. ! */ ! PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer)); ! dist->page = BufferGetPage(dist->buffer); + /* Write the WAL record */ if (RelationNeedsWAL(state->r)) ! recptr = gistXLogSplit(state->r->rd_node, blkno, is_leaf, ! dist, oldrlink, oldnsn, leftchildbuf); else ! recptr = GetXLogRecPtrForTemp(); for (ptr = dist; ptr; ptr = ptr->next) { ! PageSetLSN(ptr->page, recptr); ! PageSetTLI(ptr->page, ThisTimeLineID); } /* ! * Return the new child buffers to the caller. ! * ! * If this was a root split, we've already inserted the downlink ! * pointers, in the form of a new root page. Therefore we can ! * release all the new buffers, and keep just the root page locked. */ ! if (is_rootsplit) { ! for (ptr = dist->next; ptr; ptr = ptr->next) ! UnlockReleaseBuffer(ptr->buffer); } } else { ! /* ! * Enough space. We also get here if ntuples==0. ! */ START_CRIT_SECTION(); ! if (OffsetNumberIsValid(oldoffnum)) ! PageIndexTupleDelete(page, oldoffnum); ! gistfillbuffer(page, itup, ntup, InvalidOffsetNumber); ! MarkBufferDirty(buffer); ! ! if (BufferIsValid(leftchildbuf)) ! MarkBufferDirty(leftchildbuf); if (RelationNeedsWAL(state->r)) { ! OffsetNumber ndeloffs = 0, ! deloffs[1]; ! if (OffsetNumberIsValid(oldoffnum)) { ! deloffs[0] = oldoffnum; ! ndeloffs = 1; } ! recptr = gistXLogUpdate(state->r->rd_node, buffer, ! deloffs, ndeloffs, itup, ntup, ! leftchildbuf); ! PageSetLSN(page, recptr); ! PageSetTLI(page, ThisTimeLineID); } else ! { ! recptr = GetXLogRecPtrForTemp(); ! PageSetLSN(page, recptr); ! } ! *splitinfo = NIL; ! } ! /* ! * If we inserted the downlink for a child page, set NSN and clear ! * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know ! * to follow the rightlink if and only if they looked at the parent page ! * before we inserted the downlink. ! * ! * Note that we do this *after* writing the WAL record. That means that ! * the possible full page image in the WAL record does not include ! * these changes, and they must be replayed even if the page is restored ! * from the full page image. There's a chicken-and-egg problem: if we ! * updated the child pages first, we wouldn't know the recptr of the WAL ! * record we're about to write. ! */ ! if (BufferIsValid(leftchildbuf)) ! { ! Page leftpg = BufferGetPage(leftchildbuf); ! GistPageGetOpaque(leftpg)->nsn = recptr; ! GistClearFollowRight(leftpg); ! PageSetLSN(leftpg, recptr); ! PageSetTLI(leftpg, ThisTimeLineID); } ! ! END_CRIT_SECTION(); ! ! return is_split; } /* ! * Workhouse routine for doing insertion into a GiST index. Note that ! * this routine assumes it is invoked in a short-lived memory context, ! * so it does not bother releasing palloc'd allocations. */ static void ! gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate) { ItemId iid; IndexTuple idxtuple; ! GISTInsertStack firststack; ! GISTInsertStack *stack; ! GISTInsertState state; ! bool xlocked = false; ! ! memset(&state, 0, sizeof(GISTInsertState)); ! state.freespace = freespace; ! state.r = r; ! ! /* Start from the root */ ! firststack.blkno = GIST_ROOT_BLKNO; ! firststack.lsn.xrecoff = 0; ! firststack.parent = NULL; ! state.stack = stack = &firststack; /* ! * Walk down along the path of smallest penalty, updating the parent ! * pointers with the key we're inserting as we go. If we crash in the ! * middle, the tree is consistent, although the possible parent updates ! * were a waste. */ for (;;) { ! if (XLogRecPtrIsInvalid(stack->lsn)) ! stack->buffer = ReadBuffer(state.r, stack->blkno); ! ! /* ! * Be optimistic and grab shared lock first. Swap it for an ! * exclusive lock later if we need to update the page. ! */ ! if (!xlocked) ! { ! LockBuffer(stack->buffer, GIST_SHARE); ! gistcheckpage(state.r, stack->buffer); ! } ! ! stack->page = (Page) BufferGetPage(stack->buffer); ! stack->lsn = PageGetLSN(stack->page); ! Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn)); ! /* ! * If this page was split but the downlink was never inserted to ! * the parent because the inserting backend crashed before doing ! * that, fix that now. ! */ ! if (GistFollowRight(stack->page)) ! { ! if (!xlocked) ! { ! LockBuffer(stack->buffer, GIST_UNLOCK); ! LockBuffer(stack->buffer, GIST_EXCLUSIVE); ! xlocked = true; ! /* someone might've completed the split when we unlocked */ ! if (!GistFollowRight(stack->page)) ! continue; ! } ! gistfixsplit(&state, giststate); ! UnlockReleaseBuffer(stack->buffer); ! xlocked = false; ! state.stack = stack = stack->parent; ! continue; ! } ! if (stack->blkno != GIST_ROOT_BLKNO && ! XLByteLT(stack->parent->lsn, ! GistPageGetOpaque(stack->page)->nsn)) { /* ! * Concurrent split detected. There's no guarantee that the ! * downlink for this page is consistent with the tuple we're ! * inserting anymore, so go back to parent and rechoose the ! * best child. */ ! UnlockReleaseBuffer(stack->buffer); ! xlocked = false; ! state.stack = stack = stack->parent; continue; } ! if (!GistPageIsLeaf(stack->page)) { /* ! * This is an internal page so continue to walk down the tree. ! * Find the child node that has the minimum insertion penalty. */ ! BlockNumber childblkno; ! IndexTuple newtup; ! GISTInsertStack *item; ! stack->childoffnum = gistchoose(state.r, stack->page, itup, giststate); ! iid = PageGetItemId(stack->page, stack->childoffnum); ! idxtuple = (IndexTuple) PageGetItem(stack->page, iid); ! childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid)); ! /* ! * Check that it's not a leftover invalid tuple from pre-9.1 ! */ ! if (GistTupleIsInvalid(idxtuple)) ! ereport(ERROR, ! (errmsg("index \"%s\" contains an inner tuple marked as invalid", ! RelationGetRelationName(r)), ! errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."), ! errhint("Please REINDEX it."))); ! /* ! * Check that the key representing the target child node is ! * consistent with the key we're inserting. Update it if it's not. ! */ ! newtup = gistgetadjusted(state.r, idxtuple, itup, giststate); ! if (newtup) { /* ! * Swap shared lock for an exclusive one. Beware, the page ! * may change while we unlock/lock the page... */ ! if (!xlocked) { ! LockBuffer(stack->buffer, GIST_UNLOCK); ! LockBuffer(stack->buffer, GIST_EXCLUSIVE); ! xlocked = true; ! stack->page = (Page) BufferGetPage(stack->buffer); + if (!XLByteEQ(PageGetLSN(stack->page), stack->lsn)) + { + /* the page was changed while we unlocked it, retry */ + continue; + } + } /* ! * Update the tuple. ! * ! * gistinserthere() might have to split the page to make the ! * updated tuple fit. It will adjust the stack so that after ! * the call, we'll be holding a lock on the page containing ! * the tuple, which might have moved right. ! * ! * Except if this causes a root split, gistinserthere() ! * returns 'true'. In that case, stack only holds the new ! * root, and the child page was released. Have to start ! * all over. */ ! if (gistinserttuples(&state, stack, giststate, &newtup, 1, ! stack->childoffnum, InvalidBuffer)) ! { ! UnlockReleaseBuffer(stack->buffer); ! xlocked = false; ! state.stack = stack = stack->parent; ! continue; ! } } ! LockBuffer(stack->buffer, GIST_UNLOCK); ! xlocked = false; ! ! /* descend to the chosen child */ ! item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack)); ! item->blkno = childblkno; ! item->parent = stack; ! state.stack = stack = item; ! } ! else ! { ! /* ! * Leaf page. Insert the new key. We've already updated all the ! * parents on the way down, but we might have to split the page ! * if it doesn't fit. gistinserthere() will take care of that. ! */ ! ! /* ! * Swap shared lock for an exclusive one. Be careful, the page ! * may change while we unlock/lock the page... ! */ ! if (!xlocked) { ! LockBuffer(stack->buffer, GIST_UNLOCK); ! LockBuffer(stack->buffer, GIST_EXCLUSIVE); ! xlocked = true; ! stack->page = (Page) BufferGetPage(stack->buffer); ! stack->lsn = PageGetLSN(stack->page); ! if (stack->blkno == GIST_ROOT_BLKNO) ! { ! /* ! * the only page that can become inner instead of leaf ! * is the root page, so for root we should recheck it ! */ ! if (!GistPageIsLeaf(stack->page)) ! { ! /* ! * very rare situation: during unlock/lock index with ! * number of pages = 1 was increased ! */ ! LockBuffer(stack->buffer, GIST_UNLOCK); ! xlocked = false; ! continue; ! } ! /* ! * we don't need to check root split, because checking ! * leaf/inner is enough to recognize split for root ! */ ! } ! else if (GistFollowRight(stack->page) || ! XLByteLT(stack->parent->lsn, ! GistPageGetOpaque(stack->page)->nsn)) ! { ! /* ! * The page was split while we momentarily unlocked the ! * page. Go back to parent. ! */ ! UnlockReleaseBuffer(stack->buffer); ! xlocked = false; ! state.stack = stack = stack->parent; ! continue; ! } } ! /* now state.stack->(page, buffer and blkno) points to leaf page */ ! gistinserttuples(&state, stack, giststate, &itup, 1, ! InvalidOffsetNumber, InvalidBuffer); ! LockBuffer(stack->buffer, GIST_UNLOCK); ! ! /* Release any pins we might still hold before exiting */ ! for (; stack; stack = stack->parent) ! ReleaseBuffer(stack->buffer); break; } } } /* *************** *** 648,654 **** gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) * * returns from the beginning of closest parent; * ! * To prevent deadlocks, this should lock only one page simultaneously. */ GISTInsertStack * gistFindPath(Relation r, BlockNumber child) --- 832,838 ---- * * returns from the beginning of closest parent; * ! * To prevent deadlocks, this should lock only one page at a time. */ GISTInsertStack * gistFindPath(Relation r, BlockNumber child) *************** *** 683,688 **** gistFindPath(Relation r, BlockNumber child) --- 867,879 ---- top->lsn = PageGetLSN(page); + /* + * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a + * downlink. This should not normally happen.. + */ + if (GistFollowRight(page)) + elog(ERROR, "concurrent GiST page split was incomplete"); + if (top->parent && XLByteLT(top->parent->lsn, GistPageGetOpaque(page)->nsn) && GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ ) { *************** *** 711,718 **** gistFindPath(Relation r, BlockNumber child) ptr = top; while (ptr->parent) { - /* set child link */ - ptr->parent->child = ptr; /* move childoffnum.. */ if (ptr == top) { --- 902,907 ---- *************** *** 754,770 **** gistFindPath(Relation r, BlockNumber child) return NULL; } - /* ! * Returns X-locked parent of stack page */ - static void gistFindCorrectParent(Relation r, GISTInsertStack *child) { GISTInsertStack *parent = child->parent; - LockBuffer(parent->buffer, GIST_EXCLUSIVE); gistcheckpage(r, parent->buffer); parent->page = (Page) BufferGetPage(parent->buffer); --- 943,958 ---- return NULL; } /* ! * Updates the stack so that child->parent is the correct parent of the ! * child. child->parent must be exclusively locked on entry, and will ! * remain so at exit, but it might not be the same page anymore. */ static void gistFindCorrectParent(Relation r, GISTInsertStack *child) { GISTInsertStack *parent = child->parent; gistcheckpage(r, parent->buffer); parent->page = (Page) BufferGetPage(parent->buffer); *************** *** 836,918 **** gistFindCorrectParent(Relation r, GISTInsertStack *child) /* install new chain of parents to stack */ child->parent = parent; - parent->child = child; /* make recursive call to normal processing */ gistFindCorrectParent(r, child); } return; } ! void ! gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) { ! int is_splitted; ! ItemId iid; ! IndexTuple oldtup, ! newtup; ! /* walk up */ ! while (true) { ! /* ! * After this call: 1. if child page was splited, then itup contains ! * keys for each page 2. if child page wasn't splited, then itup ! * contains additional for adjustment of current key ! */ ! ! if (state->stack->parent) { ! /* ! * X-lock parent page before proceed child, gistFindCorrectParent ! * should find and lock it ! */ ! gistFindCorrectParent(state->r, state->stack); } ! is_splitted = gistplacetopage(state, giststate); ! /* parent locked above, so release child buffer */ ! UnlockReleaseBuffer(state->stack->buffer); ! /* pop parent page from stack */ ! state->stack = state->stack->parent; - /* stack is void */ - if (!state->stack) - break; ! /* ! * child did not split, so we can check is it needed to update parent ! * tuple ! */ ! if (!is_splitted) ! { ! /* parent's tuple */ ! iid = PageGetItemId(state->stack->page, state->stack->childoffnum); ! oldtup = (IndexTuple) PageGetItem(state->stack->page, iid); ! newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate); ! ! if (!newtup) ! { /* not need to update key */ ! LockBuffer(state->stack->buffer, GIST_UNLOCK); ! break; ! } ! state->itup[0] = newtup; } ! } /* while */ ! /* release all parent buffers */ ! while (state->stack) { ! ReleaseBuffer(state->stack->buffer); ! state->stack = state->stack->parent; } ! /* say to xlog that insert is completed */ ! if (state->needInsertComplete && RelationNeedsWAL(state->r)) ! gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1); } /* --- 1024,1254 ---- /* install new chain of parents to stack */ child->parent = parent; /* make recursive call to normal processing */ + LockBuffer(child->parent->buffer, GIST_EXCLUSIVE); gistFindCorrectParent(r, child); } return; } ! /* ! * Form a downlink pointer for the page in 'buf'. ! */ ! static IndexTuple ! gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate, ! GISTInsertStack *stack) { ! Page page = BufferGetPage(buf); ! OffsetNumber maxoff; ! OffsetNumber offset; ! IndexTuple downlink = NULL; ! maxoff = PageGetMaxOffsetNumber(page); ! for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset)) { ! IndexTuple ituple = (IndexTuple) ! PageGetItem(page, PageGetItemId(page, offset)); ! if (downlink == NULL) ! downlink = CopyIndexTuple(ituple); ! else { ! IndexTuple newdownlink; ! newdownlink = gistgetadjusted(rel, downlink, ituple, ! giststate); ! if (newdownlink) ! downlink = newdownlink; } ! } ! ! /* ! * If the page is completely empty, we can't form a meaningful ! * downlink for it. But we have to insert a downlink for the page. ! * Any key will do, as long as its consistent with the downlink of ! * parent page, so that we can legally insert it to the parent. ! * A minimal one that matches as few scans as possible would be best, ! * to keep scans from doing useless work, but we don't know how to ! * construct that. So we just use the downlink of the original page ! * that was split - that's as far from optimal as it can get but will ! * do.. ! */ ! if (!downlink) ! { ! ItemId iid; ! ! LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE); ! gistFindCorrectParent(rel, stack); ! iid = PageGetItemId(stack->parent->page, stack->parent->childoffnum); ! downlink = (IndexTuple) PageGetItem(stack->parent->page, iid); ! downlink = CopyIndexTuple(downlink); ! LockBuffer(stack->parent->buffer, GIST_UNLOCK); ! } ! ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf)); ! GistTupleSetValid(downlink); ! return downlink; ! } ! /* ! * Complete the incomplete split of state->stack->page. ! */ ! static void ! gistfixsplit(GISTInsertState *state, GISTSTATE *giststate) ! { ! GISTInsertStack *stack = state->stack; ! Buffer buf; ! Page page; ! List *splitinfo = NIL; ! ! elog(LOG, "fixing incomplete split in index \"%s\", block %u", ! RelationGetRelationName(state->r), stack->blkno); ! Assert(GistFollowRight(stack->page)); ! Assert(OffsetNumberIsValid(stack->parent->childoffnum)); ! ! buf = stack->buffer; ! ! /* ! * Read the chain of split pages, following the rightlinks. Construct ! * a downlink tuple for each page. ! */ ! for (;;) ! { ! GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo)); ! IndexTuple downlink; ! ! page = BufferGetPage(buf); ! ! /* Form the new downlink tuples to insert to parent */ ! downlink = gistformdownlink(state->r, buf, giststate, stack); ! ! si->buf = buf; ! si->downlink = downlink; ! ! splitinfo = lappend(splitinfo, si); ! ! if (GistFollowRight(page)) ! { ! /* lock next page */ ! buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink); ! LockBuffer(buf, GIST_EXCLUSIVE); } ! else ! break; ! } ! ! /* Insert the downlinks */ ! gistfinishsplit(state, stack, giststate, splitinfo); ! } ! ! /* ! * Insert tuples to stack->buffer. If 'oldoffnum' is valid, the new tuples ! * replace an old tuple at oldoffnum. The caller must hold an exclusive lock ! * on the page. ! * ! * If leftchild is valid, we're inserting/updating the downlink for the ! * page to the right of leftchild. We clear the F_FOLLOW_RIGHT flag and ! * update NSN on leftchild, atomically with the insertion of the downlink. ! * ! * Returns 'true' if the page had to be split. On return, we will continue ! * to hold an exclusive lock on state->stack->buffer, but if we had to split ! * the page, it might not contain the tuple we just inserted/updated. ! */ ! static bool ! gistinserttuples(GISTInsertState *state, GISTInsertStack *stack, ! GISTSTATE *giststate, ! IndexTuple *tuples, int ntup, OffsetNumber oldoffnum, ! Buffer leftchild) ! { ! List *splitinfo; ! bool is_split; ! ! is_split = gistplacetopage(state, giststate, stack->buffer, ! tuples, ntup, oldoffnum, ! leftchild, ! &splitinfo); ! if (splitinfo) ! gistfinishsplit(state, stack, giststate, splitinfo); ! ! return is_split; ! } ! ! /* ! * Finish an incomplete split by inserting/updating the downlinks in ! * parent page. 'splitinfo' contains all the child pages, exclusively-locked, ! * involved in the split, from left-to-right. ! */ ! static void ! gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack, ! GISTSTATE *giststate, List *splitinfo) ! { ! ListCell *lc; ! List *reversed; ! GISTPageSplitInfo *right; ! GISTPageSplitInfo *left; ! IndexTuple tuples[2]; ! ! /* A split always contains at least two halves */ ! Assert(list_length(splitinfo) >= 2); ! ! /* ! * We need to insert downlinks for each new page, and update the ! * downlink for the original (leftmost) page in the split. Begin at ! * the rightmost page, inserting one downlink at a time until there's ! * only two pages left. Finally insert the downlink for the last new ! * page and update the downlink for the original page as one operation. ! */ ! /* for convenience, create a copy of the list in reverse order */ ! reversed = NIL; ! foreach(lc, splitinfo) { ! reversed = lcons(lfirst(lc), reversed); } ! LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE); ! gistFindCorrectParent(state->r, stack); ! ! while(list_length(reversed) > 2) ! { ! right = (GISTPageSplitInfo *) linitial(reversed); ! left = (GISTPageSplitInfo *) lsecond(reversed); ! ! if (gistinserttuples(state, stack->parent, giststate, ! &right->downlink, 1, ! InvalidOffsetNumber, ! left->buf)) ! { ! /* ! * If the parent page was split, need to relocate the original ! * parent pointer. ! */ ! gistFindCorrectParent(state->r, stack); ! } ! UnlockReleaseBuffer(right->buf); ! reversed = list_delete_first(reversed); ! } ! ! right = (GISTPageSplitInfo *) linitial(reversed); ! left = (GISTPageSplitInfo *) lsecond(reversed); ! ! /* ! * Finally insert downlink for the remaining right page and update the ! * downlink for the original page to not contain the tuples that were ! * moved to the new pages. ! */ ! tuples[0] = left->downlink; ! tuples[1] = right->downlink; ! gistinserttuples(state, stack->parent, giststate, ! tuples, 2, ! stack->parent->childoffnum, ! left->buf); ! LockBuffer(stack->parent->buffer, GIST_UNLOCK); ! UnlockReleaseBuffer(right->buf); ! Assert(left->buf == stack->buffer); } /* *************** *** 963,970 **** gistSplit(Relation r, ROTATEDIST(res); res->block.num = v.splitVector.spl_nright; res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist)); ! res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false) ! : gist_form_invalid_tuple(GIST_ROOT_BLKNO); } if (!gistfitpage(lvectup, v.splitVector.spl_nleft)) --- 1299,1305 ---- ROTATEDIST(res); res->block.num = v.splitVector.spl_nright; res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist)); ! res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false); } if (!gistfitpage(lvectup, v.splitVector.spl_nleft)) *************** *** 986,1036 **** gistSplit(Relation r, ROTATEDIST(res); res->block.num = v.splitVector.spl_nleft; res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist)); ! res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false) ! : gist_form_invalid_tuple(GIST_ROOT_BLKNO); } return res; } /* - * buffer must be pinned and locked by caller - */ - void - gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key) - { - Page page; - - Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO); - page = BufferGetPage(buffer); - - START_CRIT_SECTION(); - - GISTInitBuffer(buffer, 0); - gistfillbuffer(page, itup, len, FirstOffsetNumber); - - MarkBufferDirty(buffer); - - if (RelationNeedsWAL(r)) - { - XLogRecPtr recptr; - XLogRecData *rdata; - - rdata = formUpdateRdata(r->rd_node, buffer, - NULL, 0, - itup, len, key); - - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata); - PageSetLSN(page, recptr); - PageSetTLI(page, ThisTimeLineID); - } - else - PageSetLSN(page, GetXLogRecPtrForTemp()); - - END_CRIT_SECTION(); - } - - /* * Fill a GISTSTATE with information about the index */ void --- 1321,1333 ---- ROTATEDIST(res); res->block.num = v.splitVector.spl_nleft; res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist)); ! res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false); } return res; } /* * Fill a GISTSTATE with information about the index */ void *** a/src/backend/access/gist/gistget.c --- b/src/backend/access/gist/gistget.c *************** *** 254,262 **** gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances, page = BufferGetPage(buffer); opaque = GistPageGetOpaque(page); ! /* check if page split occurred since visit to parent */ if (!XLogRecPtrIsInvalid(pageItem->data.parentlsn) && ! XLByteLT(pageItem->data.parentlsn, opaque->nsn) && opaque->rightlink != InvalidBlockNumber /* sanity check */ ) { /* There was a page split, follow right link to add pages */ --- 254,268 ---- page = BufferGetPage(buffer); opaque = GistPageGetOpaque(page); ! /* ! * Check if we need to follow the rightlink. We need to follow it if the ! * page was concurrently split since we visited the parent (in which case ! * parentlsn < nsn), or if the the system crashed after a page split but ! * before the downlink was inserted into the parent. ! */ if (!XLogRecPtrIsInvalid(pageItem->data.parentlsn) && ! (GistFollowRight(page) || ! XLByteLT(pageItem->data.parentlsn, opaque->nsn)) && opaque->rightlink != InvalidBlockNumber /* sanity check */ ) { /* There was a page split, follow right link to add pages */ *** a/src/backend/access/gist/gistsplit.c --- b/src/backend/access/gist/gistsplit.c *************** *** 500,557 **** gistSplitHalf(GIST_SPLITVEC *v, int len) } /* - * if it was invalid tuple then we need special processing. - * We move all invalid tuples on right page. - * - * if there is no place on left page, gistSplit will be called one more - * time for left page. - * - * Normally, we never exec this code, but after crash replay it's possible - * to get 'invalid' tuples (probability is low enough) - */ - static void - gistSplitByInvalid(GISTSTATE *giststate, GistSplitVector *v, IndexTuple *itup, int len) - { - int i; - static OffsetNumber offInvTuples[MaxOffsetNumber]; - int nOffInvTuples = 0; - - for (i = 1; i <= len; i++) - if (GistTupleIsInvalid(itup[i - 1])) - offInvTuples[nOffInvTuples++] = i; - - if (nOffInvTuples == len) - { - /* corner case, all tuples are invalid */ - v->spl_rightvalid = v->spl_leftvalid = false; - gistSplitHalf(&v->splitVector, len); - } - else - { - GistSplitUnion gsvp; - - v->splitVector.spl_right = offInvTuples; - v->splitVector.spl_nright = nOffInvTuples; - v->spl_rightvalid = false; - - v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber)); - v->splitVector.spl_nleft = 0; - for (i = 1; i <= len; i++) - if (!GistTupleIsInvalid(itup[i - 1])) - v->splitVector.spl_left[v->splitVector.spl_nleft++] = i; - v->spl_leftvalid = true; - - gsvp.equiv = NULL; - gsvp.attr = v->spl_lattr; - gsvp.len = v->splitVector.spl_nleft; - gsvp.entries = v->splitVector.spl_left; - gsvp.isnull = v->spl_lisnull; - - gistunionsubkeyvec(giststate, itup, &gsvp, 0); - } - } - - /* * trys to split page by attno key, in a case of null * values move its to separate page. */ --- 500,505 ---- *************** *** 568,579 **** gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *gist Datum datum; bool IsNull; - if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1])) - { - gistSplitByInvalid(giststate, v, itup, len); - return; - } - datum = index_getattr(itup[i - 1], attno + 1, giststate->tupdesc, &IsNull); gistdentryinit(giststate, attno, &(entryvec->vector[i]), datum, r, page, i, --- 516,521 ---- *************** *** 582,589 **** gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *gist offNullTuples[nOffNullTuples++] = i; } - v->spl_leftvalid = v->spl_rightvalid = true; - if (nOffNullTuples == len) { /* --- 524,529 ---- *** a/src/backend/access/gist/gistutil.c --- b/src/backend/access/gist/gistutil.c *************** *** 152,158 **** gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) * invalid tuple. Resulting Datums aren't compressed. */ ! bool gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey, Datum *attr, bool *isnull) { --- 152,158 ---- * invalid tuple. Resulting Datums aren't compressed. */ ! void gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey, Datum *attr, bool *isnull) { *************** *** 180,189 **** gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke Datum datum; bool IsNull; - if (GistTupleIsInvalid(itvec[j])) - return FALSE; /* signals that union with invalid tuple => - * result is invalid */ - datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull); if (IsNull) continue; --- 180,185 ---- *************** *** 218,225 **** gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke isnull[i] = FALSE; } } - - return TRUE; } /* --- 214,219 ---- *************** *** 231,238 **** gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) { memset(isnullS, TRUE, sizeof(bool) * giststate->tupdesc->natts); ! if (!gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS)) ! return gist_form_invalid_tuple(InvalidBlockNumber); return gistFormTuple(giststate, r, attrS, isnullS, false); } --- 225,231 ---- { memset(isnullS, TRUE, sizeof(bool) * giststate->tupdesc->natts); ! gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS); return gistFormTuple(giststate, r, attrS, isnullS, false); } *************** *** 328,336 **** gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis IndexTuple newtup = NULL; int i; - if (GistTupleIsInvalid(oldtup) || GistTupleIsInvalid(addtup)) - return gist_form_invalid_tuple(ItemPointerGetBlockNumber(&(oldtup->t_tid))); - gistDeCompressAtt(giststate, r, oldtup, NULL, (OffsetNumber) 0, oldentries, oldisnull); --- 321,326 ---- *************** *** 401,414 **** gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ int j; IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); - if (!GistPageIsLeaf(p) && GistTupleIsInvalid(itup)) - { - ereport(LOG, - (errmsg("index \"%s\" needs VACUUM or REINDEX to finish crash recovery", - RelationGetRelationName(r)))); - continue; - } - sum_grow = 0; for (j = 0; j < r->rd_att->natts; j++) { --- 391,396 ---- *************** *** 521,527 **** gistFormTuple(GISTSTATE *giststate, Relation r, } res = index_form_tuple(giststate->tupdesc, compatt, isnull); ! GistTupleSetValid(res); return res; } --- 503,513 ---- } res = index_form_tuple(giststate->tupdesc, compatt, isnull); ! /* ! * The offset number on tuples on internal pages is unused. For historical ! * reasons, it is set 0xffff. ! */ ! ItemPointerSetOffsetNumber( &(res->t_tid), 0xffff); return res; } *** a/src/backend/access/gist/gistvacuum.c --- b/src/backend/access/gist/gistvacuum.c *************** *** 26,38 **** #include "utils/memutils.h" - typedef struct GistBulkDeleteResult - { - IndexBulkDeleteResult std; /* common state */ - bool needReindex; - } GistBulkDeleteResult; - - /* * VACUUM cleanup: update FSM */ --- 26,31 ---- *************** *** 40,46 **** Datum gistvacuumcleanup(PG_FUNCTION_ARGS) { IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0); ! GistBulkDeleteResult *stats = (GistBulkDeleteResult *) PG_GETARG_POINTER(1); Relation rel = info->index; BlockNumber npages, blkno; --- 33,39 ---- gistvacuumcleanup(PG_FUNCTION_ARGS) { IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0); ! IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1); Relation rel = info->index; BlockNumber npages, blkno; *************** *** 56,65 **** gistvacuumcleanup(PG_FUNCTION_ARGS) /* Set up all-zero stats if gistbulkdelete wasn't called */ if (stats == NULL) { ! stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult)); /* use heap's tuple count */ ! stats->std.num_index_tuples = info->num_heap_tuples; ! stats->std.estimated_count = info->estimated_count; /* * XXX the above is wrong if index is partial. Would it be OK to just --- 49,58 ---- /* Set up all-zero stats if gistbulkdelete wasn't called */ if (stats == NULL) { ! stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); /* use heap's tuple count */ ! stats->num_index_tuples = info->num_heap_tuples; ! stats->estimated_count = info->estimated_count; /* * XXX the above is wrong if index is partial. Would it be OK to just *************** *** 67,77 **** gistvacuumcleanup(PG_FUNCTION_ARGS) */ } - if (stats->needReindex) - ereport(NOTICE, - (errmsg("index \"%s\" needs VACUUM FULL or REINDEX to finish crash recovery", - RelationGetRelationName(rel)))); - /* * Need lock unless it's local to this backend. */ --- 60,65 ---- *************** *** 112,121 **** gistvacuumcleanup(PG_FUNCTION_ARGS) IndexFreeSpaceMapVacuum(info->index); /* return statistics */ ! stats->std.pages_free = totFreePages; if (needLock) LockRelationForExtension(rel, ExclusiveLock); ! stats->std.num_pages = RelationGetNumberOfBlocks(rel); if (needLock) UnlockRelationForExtension(rel, ExclusiveLock); --- 100,109 ---- IndexFreeSpaceMapVacuum(info->index); /* return statistics */ ! stats->pages_free = totFreePages; if (needLock) LockRelationForExtension(rel, ExclusiveLock); ! stats->num_pages = RelationGetNumberOfBlocks(rel); if (needLock) UnlockRelationForExtension(rel, ExclusiveLock); *************** *** 135,141 **** pushStackIfSplited(Page page, GistBDItem *stack) GISTPageOpaque opaque = GistPageGetOpaque(page); if (stack->blkno != GIST_ROOT_BLKNO && !XLogRecPtrIsInvalid(stack->parentlsn) && ! XLByteLT(stack->parentlsn, opaque->nsn) && opaque->rightlink != InvalidBlockNumber /* sanity check */ ) { /* split page detected, install right link to the stack */ --- 123,129 ---- GISTPageOpaque opaque = GistPageGetOpaque(page); if (stack->blkno != GIST_ROOT_BLKNO && !XLogRecPtrIsInvalid(stack->parentlsn) && ! (GistFollowRight(page) || XLByteLT(stack->parentlsn, opaque->nsn)) && opaque->rightlink != InvalidBlockNumber /* sanity check */ ) { /* split page detected, install right link to the stack */ *************** *** 162,168 **** Datum gistbulkdelete(PG_FUNCTION_ARGS) { IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0); ! GistBulkDeleteResult *stats = (GistBulkDeleteResult *) PG_GETARG_POINTER(1); IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2); void *callback_state = (void *) PG_GETARG_POINTER(3); Relation rel = info->index; --- 150,156 ---- gistbulkdelete(PG_FUNCTION_ARGS) { IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0); ! IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1); IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2); void *callback_state = (void *) PG_GETARG_POINTER(3); Relation rel = info->index; *************** *** 171,180 **** gistbulkdelete(PG_FUNCTION_ARGS) /* first time through? */ if (stats == NULL) ! stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult)); /* we'll re-count the tuples each time */ ! stats->std.estimated_count = false; ! stats->std.num_index_tuples = 0; stack = (GistBDItem *) palloc0(sizeof(GistBDItem)); stack->blkno = GIST_ROOT_BLKNO; --- 159,168 ---- /* first time through? */ if (stats == NULL) ! stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); /* we'll re-count the tuples each time */ ! stats->estimated_count = false; ! stats->num_index_tuples = 0; stack = (GistBDItem *) palloc0(sizeof(GistBDItem)); stack->blkno = GIST_ROOT_BLKNO; *************** *** 232,241 **** gistbulkdelete(PG_FUNCTION_ARGS) { todelete[ntodelete] = i - ntodelete; ntodelete++; ! stats->std.tuples_removed += 1; } else ! stats->std.num_index_tuples += 1; } if (ntodelete) --- 220,229 ---- { todelete[ntodelete] = i - ntodelete; ntodelete++; ! stats->tuples_removed += 1; } else ! stats->num_index_tuples += 1; } if (ntodelete) *************** *** 250,271 **** gistbulkdelete(PG_FUNCTION_ARGS) if (RelationNeedsWAL(rel)) { - XLogRecData *rdata; XLogRecPtr recptr; - gistxlogPageUpdate *xlinfo; ! rdata = formUpdateRdata(rel->rd_node, buffer, todelete, ntodelete, ! NULL, 0, ! NULL); ! xlinfo = (gistxlogPageUpdate *) rdata->next->data; ! ! recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); PageSetLSN(page, recptr); PageSetTLI(page, ThisTimeLineID); - - pfree(xlinfo); - pfree(rdata); } else PageSetLSN(page, GetXLogRecPtrForTemp()); --- 238,250 ---- if (RelationNeedsWAL(rel)) { XLogRecPtr recptr; ! recptr = gistXLogUpdate(rel->rd_node, buffer, todelete, ntodelete, ! NULL, 0, InvalidBuffer); PageSetLSN(page, recptr); PageSetTLI(page, ThisTimeLineID); } else PageSetLSN(page, GetXLogRecPtrForTemp()); *************** *** 293,299 **** gistbulkdelete(PG_FUNCTION_ARGS) stack->next = ptr; if (GistTupleIsInvalid(idxtuple)) ! stats->needReindex = true; } } --- 272,282 ---- stack->next = ptr; if (GistTupleIsInvalid(idxtuple)) ! ereport(LOG, ! (errmsg("index \"%s\" contains an inner tuple marked as invalid", ! RelationGetRelationName(rel)), ! errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."), ! errhint("Please REINDEX it."))); } } *** a/src/backend/access/gist/gistxlog.c --- b/src/backend/access/gist/gistxlog.c *************** *** 20,34 **** #include "utils/memutils.h" #include "utils/rel.h" - - typedef struct - { - gistxlogPageUpdate *data; - int len; - IndexTuple *itup; - OffsetNumber *todelete; - } PageUpdateRecord; - typedef struct { gistxlogPage *header; --- 20,25 ---- *************** *** 41,184 **** typedef struct NewPage *page; } PageSplitRecord; - /* track for incomplete inserts, idea was taken from nbtxlog.c */ - - typedef struct gistIncompleteInsert - { - RelFileNode node; - BlockNumber origblkno; /* for splits */ - ItemPointerData key; - int lenblk; - BlockNumber *blkno; - XLogRecPtr lsn; - BlockNumber *path; - int pathlen; - } gistIncompleteInsert; - - static MemoryContext opCtx; /* working memory for operations */ - static MemoryContext insertCtx; /* holds incomplete_inserts list */ - static List *incomplete_inserts; - - - #define ItemPointerEQ(a, b) \ - ( ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \ - ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) ) - static void ! pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, ! BlockNumber *blkno, int lenblk, ! PageSplitRecord *xlinfo /* to extract blkno info */ ) { ! MemoryContext oldCxt; ! gistIncompleteInsert *ninsert; ! if (!ItemPointerIsValid(&key)) /* ! * if key is null then we should not store insertion as incomplete, ! * because it's a vacuum operation.. */ ! return; ! ! oldCxt = MemoryContextSwitchTo(insertCtx); ! ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert)); ! ! ninsert->node = node; ! ninsert->key = key; ! ninsert->lsn = lsn; ! ! if (lenblk && blkno) ! { ! ninsert->lenblk = lenblk; ! ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk); ! memcpy(ninsert->blkno, blkno, sizeof(BlockNumber) * ninsert->lenblk); ! ninsert->origblkno = *blkno; ! } ! else ! { ! int i; ! ! Assert(xlinfo); ! ninsert->lenblk = xlinfo->data->npage; ! ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk); ! for (i = 0; i < ninsert->lenblk; i++) ! ninsert->blkno[i] = xlinfo->page[i].header->blkno; ! ninsert->origblkno = xlinfo->data->origblkno; ! } ! Assert(ninsert->lenblk > 0); ! ! /* ! * Stick the new incomplete insert onto the front of the list, not the ! * back. This is so that gist_xlog_cleanup will process incompletions in ! * last-in-first-out order. ! */ ! incomplete_inserts = lcons(ninsert, incomplete_inserts); ! ! MemoryContextSwitchTo(oldCxt); ! } ! ! static void ! forgetIncompleteInsert(RelFileNode node, ItemPointerData key) ! { ! ListCell *l; ! ! if (!ItemPointerIsValid(&key)) ! return; ! ! if (incomplete_inserts == NIL) ! return; ! ! foreach(l, incomplete_inserts) ! { ! gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l); ! ! if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key))) { ! /* found */ ! incomplete_inserts = list_delete_ptr(incomplete_inserts, insert); ! pfree(insert->blkno); ! pfree(insert); ! break; ! } ! } ! } ! static void ! decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record) ! { ! char *begin = XLogRecGetData(record), ! *ptr; ! int i = 0, ! addpath = 0; ! ! decoded->data = (gistxlogPageUpdate *) begin; ! ! if (decoded->data->ntodelete) ! { ! decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath); ! addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete); ! } ! else ! decoded->todelete = NULL; ! ! decoded->len = 0; ! ptr = begin + sizeof(gistxlogPageUpdate) + addpath; ! while (ptr - begin < record->xl_len) ! { ! decoded->len++; ! ptr += IndexTupleSize((IndexTuple) ptr); ! } ! ! decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len); ! ! ptr = begin + sizeof(gistxlogPageUpdate) + addpath; ! while (ptr - begin < record->xl_len) ! { ! decoded->itup[i] = (IndexTuple) ptr; ! ptr += IndexTupleSize(decoded->itup[i]); ! i++; } } --- 32,68 ---- NewPage *page; } PageSplitRecord; static MemoryContext opCtx; /* working memory for operations */ + /* + * Replay the clearing of F_FOLLOW_RIGHT flag. + */ static void ! gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn, ! BlockNumber leftblkno) { ! Buffer buffer; ! buffer = XLogReadBuffer(node, leftblkno, false); ! if (BufferIsValid(buffer)) ! { ! Page page = (Page) BufferGetPage(buffer); /* ! * Note that we still update the page even if page LSN is equal to the ! * LSN of this record, because the updated NSN is not included in the ! * full page image. */ ! if (!XLByteLT(lsn, PageGetLSN(page))) { ! GistPageGetOpaque(page)->nsn = lsn; ! GistClearFollowRight(page); ! PageSetLSN(page, lsn); ! PageSetTLI(page, ThisTimeLineID); ! MarkBufferDirty(buffer); ! } ! UnlockReleaseBuffer(buffer); } } *************** *** 186,214 **** decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record) * redo any page update (except page split) */ static void ! gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) { ! gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record); ! PageUpdateRecord xlrec; Buffer buffer; Page page; ! /* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */ ! forgetIncompleteInsert(xldata->node, xldata->key); ! if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO) ! /* operation with root always finalizes insertion */ ! pushIncompleteInsert(xldata->node, lsn, xldata->key, ! &(xldata->blkno), 1, ! NULL); ! ! /* nothing else to do if page was backed up (and no info to do it with) */ if (record->xl_info & XLR_BKP_BLOCK_1) return; ! decodePageUpdateRecord(&xlrec, record); ! ! buffer = XLogReadBuffer(xlrec.data->node, xlrec.data->blkno, false); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); --- 70,91 ---- * redo any page update (except page split) */ static void ! gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record) { ! char *begin = XLogRecGetData(record); ! gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin; Buffer buffer; Page page; + char *data; ! if (BlockNumberIsValid(xldata->leftchild)) ! gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild); ! /* nothing more to do if page was backed up (and no info to do it with) */ if (record->xl_info & XLR_BKP_BLOCK_1) return; ! buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); *************** *** 219,246 **** gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) return; } ! if (isnewroot) ! GISTInitBuffer(buffer, 0); ! else if (xlrec.data->ntodelete) { int i; ! for (i = 0; i < xlrec.data->ntodelete; i++) ! PageIndexTupleDelete(page, xlrec.todelete[i]); if (GistPageIsLeaf(page)) GistMarkTuplesDeleted(page); } /* add tuples */ ! if (xlrec.len > 0) ! gistfillbuffer(page, xlrec.itup, xlrec.len, InvalidOffsetNumber); ! ! /* ! * special case: leafpage, nothing to insert, nothing to delete, then ! * vacuum marks page ! */ ! if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0) ! GistClearTuplesDeleted(page); if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO) --- 96,144 ---- return; } ! data = begin + sizeof(gistxlogPageUpdate); ! ! /* Delete old tuples */ ! if (xldata->ntodelete > 0) { int i; + OffsetNumber *todelete = (OffsetNumber *) data; + data += sizeof(OffsetNumber) * xldata->ntodelete; ! for (i = 0; i < xldata->ntodelete; i++) ! PageIndexTupleDelete(page, todelete[i]); if (GistPageIsLeaf(page)) GistMarkTuplesDeleted(page); } /* add tuples */ ! if (data - begin < record->xl_len) ! { ! OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber : ! OffsetNumberNext(PageGetMaxOffsetNumber(page)); ! while (data - begin < record->xl_len) ! { ! IndexTuple itup = (IndexTuple) data; ! Size sz = IndexTupleSize(itup); ! OffsetNumber l; ! data += sz; ! ! l = PageAddItem(page, (Item) itup, sz, off, false, false); ! if (l == InvalidOffsetNumber) ! elog(ERROR, "failed to add item to GiST index page, size %d bytes", ! (int) sz); ! off++; ! } ! } ! else ! { ! /* ! * special case: leafpage, nothing to insert, nothing to delete, then ! * vacuum marks page ! */ ! if (GistPageIsLeaf(page) && xldata->ntodelete == 0) ! GistClearTuplesDeleted(page); ! } if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO) *************** *** 315,355 **** decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) static void gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) { PageSplitRecord xlrec; Buffer buffer; Page page; int i; ! int flags; decodePageSplitRecord(&xlrec, record); - flags = xlrec.data->origleaf ? F_LEAF : 0; /* loop around all pages */ for (i = 0; i < xlrec.data->npage; i++) { NewPage *newpage = xlrec.page + i; buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); /* ok, clear buffer */ GISTInitBuffer(buffer, flags); /* and fill it */ gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber); PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); } - - forgetIncompleteInsert(xlrec.data->node, xlrec.data->key); - - pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key, - NULL, 0, - &xlrec); } static void --- 213,279 ---- static void gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) { + gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record); PageSplitRecord xlrec; Buffer buffer; Page page; int i; ! bool isrootsplit = false; + if (BlockNumberIsValid(xldata->leftchild)) + gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild); decodePageSplitRecord(&xlrec, record); /* loop around all pages */ for (i = 0; i < xlrec.data->npage; i++) { NewPage *newpage = xlrec.page + i; + int flags; + + if (newpage->header->blkno == GIST_ROOT_BLKNO) + { + Assert(i == 0); + isrootsplit = true; + } buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); /* ok, clear buffer */ + if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO) + flags = F_LEAF; + else + flags = 0; GISTInitBuffer(buffer, flags); /* and fill it */ gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber); + if (newpage->header->blkno == GIST_ROOT_BLKNO) + { + GistPageGetOpaque(page)->rightlink = InvalidBlockNumber; + GistPageGetOpaque(page)->nsn = xldata->orignsn; + GistClearFollowRight(page); + } + else + { + if (i < xlrec.data->npage - 1) + GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno; + else + GistPageGetOpaque(page)->rightlink = xldata->origrlink; + GistPageGetOpaque(page)->nsn = xldata->orignsn; + if (i < xlrec.data->npage - 1 && !isrootsplit) + GistMarkFollowRight(page); + else + GistClearFollowRight(page); + } + PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); } } static void *************** *** 372,395 **** gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(buffer); } - static void - gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record) - { - char *begin = XLogRecGetData(record), - *ptr; - gistxlogInsertComplete *xlrec; - - xlrec = (gistxlogInsertComplete *) begin; - - ptr = begin + sizeof(gistxlogInsertComplete); - while (ptr - begin < record->xl_len) - { - Assert(record->xl_len - (ptr - begin) >= sizeof(ItemPointerData)); - forgetIncompleteInsert(xlrec->node, *((ItemPointerData *) ptr)); - ptr += sizeof(ItemPointerData); - } - } - void gist_redo(XLogRecPtr lsn, XLogRecord *record) { --- 296,301 ---- *************** *** 401,430 **** gist_redo(XLogRecPtr lsn, XLogRecord *record) * implement a similar optimization we have in b-tree, and remove killed * tuples outside VACUUM, we'll need to handle that here. */ - RestoreBkpBlocks(lsn, record, false); oldCxt = MemoryContextSwitchTo(opCtx); switch (info) { case XLOG_GIST_PAGE_UPDATE: ! gistRedoPageUpdateRecord(lsn, record, false); break; case XLOG_GIST_PAGE_DELETE: gistRedoPageDeleteRecord(lsn, record); break; - case XLOG_GIST_NEW_ROOT: - gistRedoPageUpdateRecord(lsn, record, true); - break; case XLOG_GIST_PAGE_SPLIT: gistRedoPageSplitRecord(lsn, record); break; case XLOG_GIST_CREATE_INDEX: gistRedoCreateIndex(lsn, record); break; - case XLOG_GIST_INSERT_COMPLETE: - gistRedoCompleteInsert(lsn, record); - break; default: elog(PANIC, "gist_redo: unknown op code %u", info); } --- 307,329 ---- * implement a similar optimization we have in b-tree, and remove killed * tuples outside VACUUM, we'll need to handle that here. */ RestoreBkpBlocks(lsn, record, false); oldCxt = MemoryContextSwitchTo(opCtx); switch (info) { case XLOG_GIST_PAGE_UPDATE: ! gistRedoPageUpdateRecord(lsn, record); break; case XLOG_GIST_PAGE_DELETE: gistRedoPageDeleteRecord(lsn, record); break; case XLOG_GIST_PAGE_SPLIT: gistRedoPageSplitRecord(lsn, record); break; case XLOG_GIST_CREATE_INDEX: gistRedoCreateIndex(lsn, record); break; default: elog(PANIC, "gist_redo: unknown op code %u", info); } *************** *** 434,453 **** gist_redo(XLogRecPtr lsn, XLogRecord *record) } static void ! out_target(StringInfo buf, RelFileNode node, ItemPointerData key) { appendStringInfo(buf, "rel %u/%u/%u", node.spcNode, node.dbNode, node.relNode); - if (ItemPointerIsValid(&key)) - appendStringInfo(buf, "; tid %u/%u", - ItemPointerGetBlockNumber(&key), - ItemPointerGetOffsetNumber(&key)); } static void out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec) { ! out_target(buf, xlrec->node, xlrec->key); appendStringInfo(buf, "; block number %u", xlrec->blkno); } --- 333,348 ---- } static void ! out_target(StringInfo buf, RelFileNode node) { appendStringInfo(buf, "rel %u/%u/%u", node.spcNode, node.dbNode, node.relNode); } static void out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec) { ! out_target(buf, xlrec->node); appendStringInfo(buf, "; block number %u", xlrec->blkno); } *************** *** 463,469 **** static void out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec) { appendStringInfo(buf, "page_split: "); ! out_target(buf, xlrec->node, xlrec->key); appendStringInfo(buf, "; block number %u splits to %d pages", xlrec->origblkno, xlrec->npage); } --- 358,364 ---- out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec) { appendStringInfo(buf, "page_split: "); ! out_target(buf, xlrec->node); appendStringInfo(buf, "; block number %u splits to %d pages", xlrec->origblkno, xlrec->npage); } *************** *** 482,491 **** gist_desc(StringInfo buf, uint8 xl_info, char *rec) case XLOG_GIST_PAGE_DELETE: out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec); break; - case XLOG_GIST_NEW_ROOT: - appendStringInfo(buf, "new_root: "); - out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key); - break; case XLOG_GIST_PAGE_SPLIT: out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec); break; --- 377,382 ---- *************** *** 495,909 **** gist_desc(StringInfo buf, uint8 xl_info, char *rec) ((RelFileNode *) rec)->dbNode, ((RelFileNode *) rec)->relNode); break; - case XLOG_GIST_INSERT_COMPLETE: - appendStringInfo(buf, "complete_insert: rel %u/%u/%u", - ((gistxlogInsertComplete *) rec)->node.spcNode, - ((gistxlogInsertComplete *) rec)->node.dbNode, - ((gistxlogInsertComplete *) rec)->node.relNode); - break; default: appendStringInfo(buf, "unknown gist op code %u", info); break; } } - IndexTuple - gist_form_invalid_tuple(BlockNumber blkno) - { - /* - * we don't alloc space for null's bitmap, this is invalid tuple, be - * carefull in read and write code - */ - Size size = IndexInfoFindDataOffset(0); - IndexTuple tuple = (IndexTuple) palloc0(size); - - tuple->t_info |= size; - - ItemPointerSetBlockNumber(&(tuple->t_tid), blkno); - GistTupleSetInvalid(tuple); - - return tuple; - } - - - static void - gistxlogFindPath(Relation index, gistIncompleteInsert *insert) - { - GISTInsertStack *top; - - insert->pathlen = 0; - insert->path = NULL; - - if ((top = gistFindPath(index, insert->origblkno)) != NULL) - { - int i; - GISTInsertStack *ptr; - - for (ptr = top; ptr; ptr = ptr->parent) - insert->pathlen++; - - insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen); - - i = 0; - for (ptr = top; ptr; ptr = ptr->parent) - insert->path[i++] = ptr->blkno; - } - else - elog(ERROR, "lost parent for block %u", insert->origblkno); - } - - static SplitedPageLayout * - gistMakePageLayout(Buffer *buffers, int nbuffers) - { - SplitedPageLayout *res = NULL, - *resptr; - - while (nbuffers-- > 0) - { - Page page = BufferGetPage(buffers[nbuffers]); - IndexTuple *vec; - int veclen; - - resptr = (SplitedPageLayout *) palloc0(sizeof(SplitedPageLayout)); - - resptr->block.blkno = BufferGetBlockNumber(buffers[nbuffers]); - resptr->block.num = PageGetMaxOffsetNumber(page); - - vec = gistextractpage(page, &veclen); - resptr->list = gistfillitupvec(vec, veclen, &(resptr->lenlist)); - - resptr->next = res; - res = resptr; - } - - return res; - } - - /* - * Continue insert after crash. In normal situations, there aren't any - * incomplete inserts, but if a crash occurs partway through an insertion - * sequence, we'll need to finish making the index valid at the end of WAL - * replay. - * - * Note that we assume the index is now in a valid state, except for the - * unfinished insertion. In particular it's safe to invoke gistFindPath(); - * there shouldn't be any garbage pages for it to run into. - * - * To complete insert we can't use basic insertion algorithm because - * during insertion we can't call user-defined support functions of opclass. - * So, we insert 'invalid' tuples without real key and do it by separate algorithm. - * 'invalid' tuple should be updated by vacuum full. - */ - static void - gistContinueInsert(gistIncompleteInsert *insert) - { - IndexTuple *itup; - int i, - lenitup; - Relation index; - - index = CreateFakeRelcacheEntry(insert->node); - - /* - * needed vector itup never will be more than initial lenblkno+2, because - * during this processing Indextuple can be only smaller - */ - lenitup = insert->lenblk; - itup = (IndexTuple *) palloc(sizeof(IndexTuple) * (lenitup + 2 /* guarantee root split */ )); - - for (i = 0; i < insert->lenblk; i++) - itup[i] = gist_form_invalid_tuple(insert->blkno[i]); - - /* - * any insertion of itup[] should make LOG message about - */ - - if (insert->origblkno == GIST_ROOT_BLKNO) - { - /* - * it was split root, so we should only make new root. it can't be - * simple insert into root, we should replace all content of root. - */ - Buffer buffer = XLogReadBuffer(insert->node, GIST_ROOT_BLKNO, true); - - gistnewroot(index, buffer, itup, lenitup, NULL); - UnlockReleaseBuffer(buffer); - } - else - { - Buffer *buffers; - Page *pages; - int numbuffer; - OffsetNumber *todelete; - - /* construct path */ - gistxlogFindPath(index, insert); - - Assert(insert->pathlen > 0); - - buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ )); - pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ )); - todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ )); - - for (i = 0; i < insert->pathlen; i++) - { - int j, - k, - pituplen = 0; - uint8 xlinfo; - XLogRecData *rdata; - XLogRecPtr recptr; - Buffer tempbuffer = InvalidBuffer; - int ntodelete = 0; - - numbuffer = 1; - buffers[0] = ReadBuffer(index, insert->path[i]); - LockBuffer(buffers[0], GIST_EXCLUSIVE); - - /* - * we check buffer, because we restored page earlier - */ - gistcheckpage(index, buffers[0]); - - pages[0] = BufferGetPage(buffers[0]); - Assert(!GistPageIsLeaf(pages[0])); - - pituplen = PageGetMaxOffsetNumber(pages[0]); - - /* find remove old IndexTuples to remove */ - for (j = 0; j < pituplen && ntodelete < lenitup; j++) - { - BlockNumber blkno; - ItemId iid = PageGetItemId(pages[0], j + FirstOffsetNumber); - IndexTuple idxtup = (IndexTuple) PageGetItem(pages[0], iid); - - blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid)); - - for (k = 0; k < lenitup; k++) - if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno) - { - todelete[ntodelete] = j + FirstOffsetNumber - ntodelete; - ntodelete++; - break; - } - } - - if (ntodelete == 0) - elog(PANIC, "gistContinueInsert: cannot find pointer to page(s)"); - - /* - * we check space with subtraction only first tuple to delete, - * hope, that wiil be enough space.... - */ - - if (gistnospace(pages[0], itup, lenitup, *todelete, 0)) - { - - /* no space left on page, so we must split */ - buffers[numbuffer] = ReadBuffer(index, P_NEW); - LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE); - GISTInitBuffer(buffers[numbuffer], 0); - pages[numbuffer] = BufferGetPage(buffers[numbuffer]); - gistfillbuffer(pages[numbuffer], itup, lenitup, FirstOffsetNumber); - numbuffer++; - - if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO) - { - Buffer tmp; - - /* - * we split root, just copy content from root to new page - */ - - /* sanity check */ - if (i + 1 != insert->pathlen) - elog(PANIC, "unexpected pathlen in index \"%s\"", - RelationGetRelationName(index)); - - /* fill new page, root will be changed later */ - tempbuffer = ReadBuffer(index, P_NEW); - LockBuffer(tempbuffer, GIST_EXCLUSIVE); - memcpy(BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer)); - - /* swap buffers[0] (was root) and temp buffer */ - tmp = buffers[0]; - buffers[0] = tempbuffer; - tempbuffer = tmp; /* now in tempbuffer GIST_ROOT_BLKNO, - * it is still unchanged */ - - pages[0] = BufferGetPage(buffers[0]); - } - - START_CRIT_SECTION(); - - for (j = 0; j < ntodelete; j++) - PageIndexTupleDelete(pages[0], todelete[j]); - - xlinfo = XLOG_GIST_PAGE_SPLIT; - rdata = formSplitRdata(index->rd_node, insert->path[i], - false, &(insert->key), - gistMakePageLayout(buffers, numbuffer)); - - } - else - { - START_CRIT_SECTION(); - - for (j = 0; j < ntodelete; j++) - PageIndexTupleDelete(pages[0], todelete[j]); - gistfillbuffer(pages[0], itup, lenitup, InvalidOffsetNumber); - - xlinfo = XLOG_GIST_PAGE_UPDATE; - rdata = formUpdateRdata(index->rd_node, buffers[0], - todelete, ntodelete, - itup, lenitup, &(insert->key)); - } - - /* - * use insert->key as mark for completion of insert (form*Rdata() - * above) for following possible replays - */ - - /* write pages, we should mark it dirty befor XLogInsert() */ - for (j = 0; j < numbuffer; j++) - { - GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber; - MarkBufferDirty(buffers[j]); - } - recptr = XLogInsert(RM_GIST_ID, xlinfo, rdata); - for (j = 0; j < numbuffer; j++) - { - PageSetLSN(pages[j], recptr); - PageSetTLI(pages[j], ThisTimeLineID); - } - - END_CRIT_SECTION(); - - lenitup = numbuffer; - for (j = 0; j < numbuffer; j++) - { - itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j])); - UnlockReleaseBuffer(buffers[j]); - } - - if (tempbuffer != InvalidBuffer) - { - /* - * it was a root split, so fill it by new values - */ - gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key)); - UnlockReleaseBuffer(tempbuffer); - } - } - } - - FreeFakeRelcacheEntry(index); - - ereport(LOG, - (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery", - insert->node.spcNode, insert->node.dbNode, insert->node.relNode), - errdetail("Incomplete insertion detected during crash replay."))); - } - void gist_xlog_startup(void) { - incomplete_inserts = NIL; - insertCtx = AllocSetContextCreate(CurrentMemoryContext, - "GiST recovery temporary context", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); opCtx = createTempGistContext(); } void gist_xlog_cleanup(void) { - ListCell *l; - MemoryContext oldCxt; - - oldCxt = MemoryContextSwitchTo(opCtx); - - foreach(l, incomplete_inserts) - { - gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l); - - gistContinueInsert(insert); - MemoryContextReset(opCtx); - } - MemoryContextSwitchTo(oldCxt); - MemoryContextDelete(opCtx); - MemoryContextDelete(insertCtx); - } - - bool - gist_safe_restartpoint(void) - { - if (incomplete_inserts) - return false; - return true; } ! ! XLogRecData * ! formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf, ! ItemPointer key, SplitedPageLayout *dist) { XLogRecData *rdata; ! gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit)); SplitedPageLayout *ptr; int npage = 0, ! cur = 1; ! ptr = dist; ! while (ptr) ! { npage++; - ptr = ptr->next; - } rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2)); ! xlrec->node = node; ! xlrec->origblkno = blkno; ! xlrec->origleaf = page_is_leaf; ! xlrec->npage = (uint16) npage; ! if (key) ! xlrec->key = *key; ! else ! ItemPointerSetInvalid(&(xlrec->key)); ! rdata[0].buffer = InvalidBuffer; ! rdata[0].data = (char *) xlrec; rdata[0].len = sizeof(gistxlogPageSplit); ! rdata[0].next = NULL; ! ptr = dist; ! while (ptr) { rdata[cur].buffer = InvalidBuffer; rdata[cur].data = (char *) &(ptr->block); rdata[cur].len = sizeof(gistxlogPage); - rdata[cur - 1].next = &(rdata[cur]); cur++; rdata[cur].buffer = InvalidBuffer; rdata[cur].data = (char *) (ptr->list); rdata[cur].len = ptr->lenlist; - rdata[cur - 1].next = &(rdata[cur]); - rdata[cur].next = NULL; cur++; - ptr = ptr->next; } ! return rdata; } /* ! * Construct the rdata array for an XLOG record describing a page update ! * (deletion and/or insertion of tuples on a single index page). * * Note that both the todelete array and the tuples are marked as belonging * to the target buffer; they need not be stored in XLOG if XLogInsert decides --- 386,487 ---- ((RelFileNode *) rec)->dbNode, ((RelFileNode *) rec)->relNode); break; default: appendStringInfo(buf, "unknown gist op code %u", info); break; } } void gist_xlog_startup(void) { opCtx = createTempGistContext(); } void gist_xlog_cleanup(void) { MemoryContextDelete(opCtx); } ! /* ! * Write WAL record of a page split. ! */ ! XLogRecPtr ! gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf, ! SplitedPageLayout *dist, ! BlockNumber origrlink, GistNSN orignsn, ! Buffer leftchildbuf) { XLogRecData *rdata; ! gistxlogPageSplit xlrec; SplitedPageLayout *ptr; int npage = 0, ! cur; ! XLogRecPtr recptr; ! for (ptr = dist; ptr; ptr = ptr->next) npage++; rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2)); ! xlrec.node = node; ! xlrec.origblkno = blkno; ! xlrec.origrlink = origrlink; ! xlrec.orignsn = orignsn; ! xlrec.origleaf = page_is_leaf; ! xlrec.npage = (uint16) npage; ! xlrec.leftchild = ! BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber; ! rdata[0].data = (char *) &xlrec; rdata[0].len = sizeof(gistxlogPageSplit); ! rdata[0].buffer = InvalidBuffer; ! cur = 1; ! ! /* ! * Include a full page image of the child buf. (only necessary if a ! * checkpoint happened since the child page was split) ! */ ! if (BufferIsValid(leftchildbuf)) { + rdata[cur - 1].next = &(rdata[cur]); + rdata[cur].data = NULL; + rdata[cur].len = 0; + rdata[cur].buffer = leftchildbuf; + rdata[cur].buffer_std = true; + cur++; + } + + for (ptr = dist; ptr; ptr = ptr->next) + { + rdata[cur - 1].next = &(rdata[cur]); rdata[cur].buffer = InvalidBuffer; rdata[cur].data = (char *) &(ptr->block); rdata[cur].len = sizeof(gistxlogPage); cur++; + rdata[cur - 1].next = &(rdata[cur]); rdata[cur].buffer = InvalidBuffer; rdata[cur].data = (char *) (ptr->list); rdata[cur].len = ptr->lenlist; cur++; } + rdata[cur - 1].next = NULL; + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); ! pfree(rdata); ! return recptr; } /* ! * Write XLOG record describing a page update. The update can include any ! * number of deletions and/or insertions of tuples on a single index page. ! * ! * If this update inserts a downlink for a split page, also record that ! * the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set. * * Note that both the todelete array and the tuples are marked as belonging * to the target buffer; they need not be stored in XLOG if XLogInsert decides *************** *** 911,937 **** formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf, * at least one rdata item referencing the buffer, even when ntodelete and * ituplen are both zero; this ensures that XLogInsert knows about the buffer. */ ! XLogRecData * ! formUpdateRdata(RelFileNode node, Buffer buffer, ! OffsetNumber *todelete, int ntodelete, ! IndexTuple *itup, int ituplen, ItemPointer key) { XLogRecData *rdata; gistxlogPageUpdate *xlrec; int cur, i; ! rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen)); xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate)); xlrec->node = node; xlrec->blkno = BufferGetBlockNumber(buffer); xlrec->ntodelete = ntodelete; ! ! if (key) ! xlrec->key = *key; ! else ! ItemPointerSetInvalid(&(xlrec->key)); rdata[0].buffer = buffer; rdata[0].buffer_std = true; --- 489,514 ---- * at least one rdata item referencing the buffer, even when ntodelete and * ituplen are both zero; this ensures that XLogInsert knows about the buffer. */ ! XLogRecPtr ! gistXLogUpdate(RelFileNode node, Buffer buffer, ! OffsetNumber *todelete, int ntodelete, ! IndexTuple *itup, int ituplen, ! Buffer leftchildbuf) { XLogRecData *rdata; gistxlogPageUpdate *xlrec; int cur, i; + XLogRecPtr recptr; ! rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (4 + ituplen)); xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate)); xlrec->node = node; xlrec->blkno = BufferGetBlockNumber(buffer); xlrec->ntodelete = ntodelete; ! xlrec->leftchild = ! BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber; rdata[0].buffer = buffer; rdata[0].buffer_std = true; *************** *** 945,957 **** formUpdateRdata(RelFileNode node, Buffer buffer, rdata[1].next = &(rdata[2]); rdata[2].data = (char *) todelete; ! rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete); rdata[2].buffer = buffer; rdata[2].buffer_std = true; - rdata[2].next = NULL; - /* new tuples */ cur = 3; for (i = 0; i < ituplen; i++) { rdata[cur - 1].next = &(rdata[cur]); --- 522,534 ---- rdata[1].next = &(rdata[2]); rdata[2].data = (char *) todelete; ! rdata[2].len = sizeof(OffsetNumber) * ntodelete; rdata[2].buffer = buffer; rdata[2].buffer_std = true; cur = 3; + + /* new tuples */ for (i = 0; i < ituplen; i++) { rdata[cur - 1].next = &(rdata[cur]); *************** *** 959,996 **** formUpdateRdata(RelFileNode node, Buffer buffer, rdata[cur].len = IndexTupleSize(itup[i]); rdata[cur].buffer = buffer; rdata[cur].buffer_std = true; - rdata[cur].next = NULL; cur++; } ! return rdata; ! } ! ! XLogRecPtr ! gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len) ! { ! gistxlogInsertComplete xlrec; ! XLogRecData rdata[2]; ! XLogRecPtr recptr; ! ! Assert(len > 0); ! xlrec.node = node; ! ! rdata[0].buffer = InvalidBuffer; ! rdata[0].data = (char *) &xlrec; ! rdata[0].len = sizeof(gistxlogInsertComplete); ! rdata[0].next = &(rdata[1]); ! ! rdata[1].buffer = InvalidBuffer; ! rdata[1].data = (char *) keys; ! rdata[1].len = sizeof(ItemPointerData) * len; ! rdata[1].next = NULL; ! ! START_CRIT_SECTION(); ! ! recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata); ! END_CRIT_SECTION(); return recptr; } --- 536,561 ---- rdata[cur].len = IndexTupleSize(itup[i]); rdata[cur].buffer = buffer; rdata[cur].buffer_std = true; cur++; } ! /* ! * Include a full page image of the child buf. (only necessary if ! * a checkpoint happened since the child page was split) ! */ ! if (BufferIsValid(leftchildbuf)) ! { ! rdata[cur - 1].next = &(rdata[cur]); ! rdata[cur].data = NULL; ! rdata[cur].len = 0; ! rdata[cur].buffer = leftchildbuf; ! rdata[cur].buffer_std = true; ! cur++; ! } ! rdata[cur - 1].next = NULL; ! recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); + pfree(rdata); return recptr; } *** a/src/backend/access/transam/rmgr.c --- b/src/backend/access/transam/rmgr.c *************** *** 40,45 **** const RmgrData RmgrTable[RM_MAX_ID + 1] = { {"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint}, {"Hash", hash_redo, hash_desc, NULL, NULL, NULL}, {"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint}, ! {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, gist_safe_restartpoint}, {"Sequence", seq_redo, seq_desc, NULL, NULL, NULL} }; --- 40,45 ---- {"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint}, {"Hash", hash_redo, hash_desc, NULL, NULL, NULL}, {"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint}, ! {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, NULL}, {"Sequence", seq_redo, seq_desc, NULL, NULL, NULL} }; *** a/src/include/access/gist.h --- b/src/include/access/gist.h *************** *** 58,66 **** /* * Page opaque data in a GiST index page. */ ! #define F_LEAF (1 << 0) ! #define F_DELETED (1 << 1) ! #define F_TUPLES_DELETED (1 << 2) typedef XLogRecPtr GistNSN; --- 58,67 ---- /* * Page opaque data in a GiST index page. */ ! #define F_LEAF (1 << 0) /* leaf page */ ! #define F_DELETED (1 << 1) /* the page has been deleted */ ! #define F_TUPLES_DELETED (1 << 2) /* some tuples on the page are dead */ ! #define F_FOLLOW_RIGHT (1 << 3) /* page to the right has no downlink */ typedef XLogRecPtr GistNSN; *************** *** 132,137 **** typedef struct GISTENTRY --- 133,142 ---- #define GistMarkTuplesDeleted(page) ( GistPageGetOpaque(page)->flags |= F_TUPLES_DELETED) #define GistClearTuplesDeleted(page) ( GistPageGetOpaque(page)->flags &= ~F_TUPLES_DELETED) + #define GistFollowRight(page) ( GistPageGetOpaque(page)->flags & F_FOLLOW_RIGHT) + #define GistMarkFollowRight(page) ( GistPageGetOpaque(page)->flags |= F_FOLLOW_RIGHT) + #define GistClearFollowRight(page) ( GistPageGetOpaque(page)->flags &= ~F_FOLLOW_RIGHT) + /* * Vector of GISTENTRY structs; user-defined methods union and picksplit * take it as one of their arguments *** a/src/include/access/gist_private.h --- b/src/include/access/gist_private.h *************** *** 132,140 **** typedef GISTScanOpaqueData *GISTScanOpaque; /* XLog stuff */ #define XLOG_GIST_PAGE_UPDATE 0x00 ! #define XLOG_GIST_NEW_ROOT 0x20 #define XLOG_GIST_PAGE_SPLIT 0x30 ! #define XLOG_GIST_INSERT_COMPLETE 0x40 #define XLOG_GIST_CREATE_INDEX 0x50 #define XLOG_GIST_PAGE_DELETE 0x60 --- 132,140 ---- /* XLog stuff */ #define XLOG_GIST_PAGE_UPDATE 0x00 ! /* #define XLOG_GIST_NEW_ROOT 0x20 */ /* not used anymore */ #define XLOG_GIST_PAGE_SPLIT 0x30 ! /* #define XLOG_GIST_INSERT_COMPLETE 0x40 */ /* not used anymore */ #define XLOG_GIST_CREATE_INDEX 0x50 #define XLOG_GIST_PAGE_DELETE 0x60 *************** *** 144,152 **** typedef struct gistxlogPageUpdate BlockNumber blkno; /* ! * It used to identify completeness of insert. Sets to leaf itup */ ! ItemPointerData key; /* number of deleted offsets */ uint16 ntodelete; --- 144,153 ---- BlockNumber blkno; /* ! * If this operation completes a page split, by inserting a downlink for ! * the split page, leftchild points to the left half of the split. */ ! BlockNumber leftchild; /* number of deleted offsets */ uint16 ntodelete; *************** *** 160,170 **** typedef struct gistxlogPageSplit { RelFileNode node; BlockNumber origblkno; /* splitted page */ bool origleaf; /* was splitted page a leaf page? */ - uint16 npage; ! /* see comments on gistxlogPageUpdate */ ! ItemPointerData key; /* * follow: 1. gistxlogPage and array of IndexTupleData per page --- 161,172 ---- { RelFileNode node; BlockNumber origblkno; /* splitted page */ + BlockNumber origrlink; /* rightlink of the page before split */ + GistNSN orignsn; /* NSN of the page before split */ bool origleaf; /* was splitted page a leaf page? */ ! BlockNumber leftchild; /* like in gistxlogPageUpdate */ ! uint16 npage; /* # of pages in the split */ /* * follow: 1. gistxlogPage and array of IndexTupleData per page *************** *** 177,188 **** typedef struct gistxlogPage int num; /* number of index tuples following */ } gistxlogPage; - typedef struct gistxlogInsertComplete - { - RelFileNode node; - /* follows ItemPointerData key to clean */ - } gistxlogInsertComplete; - typedef struct gistxlogPageDelete { RelFileNode node; --- 179,184 ---- *************** *** 206,212 **** typedef struct SplitedPageLayout * GISTInsertStack used for locking buffers and transfer arguments during * insertion */ - typedef struct GISTInsertStack { /* current page */ --- 202,207 ---- *************** *** 215,221 **** typedef struct GISTInsertStack Page page; /* ! * log sequence number from page->lsn to recognize page update and * compare it with page's nsn to recognize page split */ GistNSN lsn; --- 210,216 ---- Page page; /* ! * log sequence number from page->lsn to recognize page update and * compare it with page's nsn to recognize page split */ GistNSN lsn; *************** *** 223,231 **** typedef struct GISTInsertStack /* child's offset */ OffsetNumber childoffnum; ! /* pointer to parent and child */ struct GISTInsertStack *parent; - struct GISTInsertStack *child; /* for gistFindPath */ struct GISTInsertStack *next; --- 218,225 ---- /* child's offset */ OffsetNumber childoffnum; ! /* pointer to parent */ struct GISTInsertStack *parent; /* for gistFindPath */ struct GISTInsertStack *next; *************** *** 238,249 **** typedef struct GistSplitVector Datum spl_lattr[INDEX_MAX_KEYS]; /* Union of subkeys in * spl_left */ bool spl_lisnull[INDEX_MAX_KEYS]; - bool spl_leftvalid; Datum spl_rattr[INDEX_MAX_KEYS]; /* Union of subkeys in * spl_right */ bool spl_risnull[INDEX_MAX_KEYS]; - bool spl_rightvalid; bool *spl_equiv; /* equivalent tuples which can be freely * distributed between left and right pages */ --- 232,241 ---- *************** *** 252,279 **** typedef struct GistSplitVector typedef struct { Relation r; - IndexTuple *itup; /* in/out, points to compressed entry */ - int ituplen; /* length of itup */ Size freespace; /* free space to be left */ - GISTInsertStack *stack; - bool needInsertComplete; ! /* pointer to heap tuple */ ! ItemPointerData key; } GISTInsertState; /* root page of a gist index */ #define GIST_ROOT_BLKNO 0 /* ! * mark tuples on inner pages during recovery */ #define TUPLE_IS_VALID 0xffff #define TUPLE_IS_INVALID 0xfffe #define GistTupleIsInvalid(itup) ( ItemPointerGetOffsetNumber( &((itup)->t_tid) ) == TUPLE_IS_INVALID ) #define GistTupleSetValid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_VALID ) - #define GistTupleSetInvalid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_INVALID ) /* gist.c */ extern Datum gistbuild(PG_FUNCTION_ARGS); --- 244,283 ---- typedef struct { Relation r; Size freespace; /* free space to be left */ ! GISTInsertStack *stack; } GISTInsertState; /* root page of a gist index */ #define GIST_ROOT_BLKNO 0 /* ! * Before PostgreSQL 9.1, we used rely on so-called "invalid tuples" on inner ! * pages to finish crash recovery of incomplete page splits. If a crash ! * happened in the middle of a page split, so that the downlink pointers were ! * not yet inserted, crash recovery inserted a special downlink pointer. The ! * semantics of an invalid tuple was that it if you encounter one in a scan, ! * it must always be followed, because we don't know if the tuples on the ! * child page match or not. ! * ! * We no longer create such invalid tuples, we now mark the left-half of such ! * an incomplete split with the F_FOLLOW_RIGHT flag instead, and finish the ! * split properly the next time we need to insert on that page. To retain ! * on-disk compatibility for the sake of pg_upgrade, we still store 0xffff as ! * the offset number of all inner tuples. If we encounter any invalid tuples ! * with 0xfffe during insertion, we throw an error, though scans still handle ! * them. You should only encounter invalid tuples if you pg_upgrade a pre-9.1 ! * gist index which already has invalid tuples in it because of a crash. That ! * should be rare, and you are recommended to REINDEX anyway if you have any ! * invalid tuples in an index, so throwing an error is as far as we go with ! * supporting that. */ #define TUPLE_IS_VALID 0xffff #define TUPLE_IS_INVALID 0xfffe #define GistTupleIsInvalid(itup) ( ItemPointerGetOffsetNumber( &((itup)->t_tid) ) == TUPLE_IS_INVALID ) #define GistTupleSetValid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_VALID ) /* gist.c */ extern Datum gistbuild(PG_FUNCTION_ARGS); *************** *** 281,288 **** extern Datum gistinsert(PG_FUNCTION_ARGS); extern MemoryContext createTempGistContext(void); extern void initGISTstate(GISTSTATE *giststate, Relation index); extern void freeGISTstate(GISTSTATE *giststate); - extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate); - extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key); extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *giststate); --- 285,290 ---- *************** *** 294,311 **** extern void gist_redo(XLogRecPtr lsn, XLogRecord *record); extern void gist_desc(StringInfo buf, uint8 xl_info, char *rec); extern void gist_xlog_startup(void); extern void gist_xlog_cleanup(void); - extern bool gist_safe_restartpoint(void); - extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno); - - extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer, - OffsetNumber *todelete, int ntodelete, - IndexTuple *itup, int ituplen, ItemPointer key); ! extern XLogRecData *formSplitRdata(RelFileNode node, ! BlockNumber blkno, bool page_is_leaf, ! ItemPointer key, SplitedPageLayout *dist); ! extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len); /* gistget.c */ extern Datum gistgettuple(PG_FUNCTION_ARGS); --- 296,312 ---- extern void gist_desc(StringInfo buf, uint8 xl_info, char *rec); extern void gist_xlog_startup(void); extern void gist_xlog_cleanup(void); ! extern XLogRecPtr gistXLogUpdate(RelFileNode node, Buffer buffer, ! OffsetNumber *todelete, int ntodelete, ! IndexTuple *itup, int ntup, ! Buffer leftchild); ! extern XLogRecPtr gistXLogSplit(RelFileNode node, ! BlockNumber blkno, bool page_is_leaf, ! SplitedPageLayout *dist, ! BlockNumber origrlink, GistNSN oldnsn, ! Buffer leftchild); /* gistget.c */ extern Datum gistgettuple(PG_FUNCTION_ARGS); *************** *** 357,363 **** extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, extern float gistpenalty(GISTSTATE *giststate, int attno, GISTENTRY *key1, bool isNull1, GISTENTRY *key2, bool isNull2); ! extern bool gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey, Datum *attr, bool *isnull); extern bool gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b); extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, --- 358,364 ---- extern float gistpenalty(GISTSTATE *giststate, int attno, GISTENTRY *key1, bool isNull1, GISTENTRY *key2, bool isNull2); ! extern void gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey, Datum *attr, bool *isnull); extern bool gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b); extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,