I wrote:
> it did occur to me that there is a simple way to ameliorate this
> problem: we could rearrange the code in _bt_pagedel() so it checks for
> this case before entering its critical section. Then, corruption of
> this kind is at least only an ERROR not a PANIC.
In particular, I propose the attached patch, which gets rid of
unnecessary PANIC cases in _bt_split as well as _bt_pagedel;
all of these get reported from the field every now and then.
In passing this also makes the error messages out of _bt_split
a bit more detailed.
FWIW, I think the original rationale for PANIC here was so we could
capture a core dump for study; but since no one has ever yet cooperated
by providing such a dump, it seems like not panicking is a better plan.
Barring objections, I plan to back-patch this as far as it will
conveniently go (looks like 8.2 or 8.3 or so).
regards, tom lane
Index: src/backend/access/nbtree/nbtinsert.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v
retrieving revision 1.178
diff -c -r1.178 nbtinsert.c
*** src/backend/access/nbtree/nbtinsert.c 28 Mar 2010 09:27:01 -0000 1.178
--- src/backend/access/nbtree/nbtinsert.c 28 Aug 2010 23:31:44 -0000
***************
*** 74,82 ****
static void _bt_checksplitloc(FindSplitData *state,
OffsetNumber firstoldonright, bool newitemonleft,
int dataitemstoleft, Size firstoldonrightsz);
! static void _bt_pgaddtup(Relation rel, Page page,
! Size itemsize, IndexTuple itup,
! OffsetNumber itup_off, const char *where);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
int keysz, ScanKey scankey);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
--- 74,81 ----
static void _bt_checksplitloc(FindSplitData *state,
OffsetNumber firstoldonright, bool newitemonleft,
int dataitemstoleft, Size firstoldonrightsz);
! static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
! OffsetNumber itup_off);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
int keysz, ScanKey scankey);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
***************
*** 753,759 ****
/* Do the update. No ereport(ERROR) until changes are logged */
START_CRIT_SECTION();
! _bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page");
MarkBufferDirty(buf);
--- 752,760 ----
/* Do the update. No ereport(ERROR) until changes are logged */
START_CRIT_SECTION();
! if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
! elog(PANIC, "failed to add new item to block %u in index \"%s\"",
! itup_blkno, RelationGetRelationName(rel));
MarkBufferDirty(buf);
***************
*** 879,884 ****
--- 880,887 ----
Page origpage;
Page leftpage,
rightpage;
+ BlockNumber origpagenumber,
+ rightpagenumber;
BTPageOpaque ropaque,
lopaque,
oopaque;
***************
*** 894,904 ****
--- 897,923 ----
OffsetNumber i;
bool isroot;
+ /* Acquire a new page to split into */
rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
+
+ /*
+ * origpage is the original page to be split. leftpage is a temporary
+ * buffer that receives the left-sibling data, which will be copied back
+ * into origpage on success. rightpage is the new page that receives
+ * the right-sibling data. If we fail before reaching the critical
+ * section, origpage hasn't been modified and leftpage is only workspace.
+ * In principle we shouldn't need to worry about rightpage either,
+ * because it hasn't been linked into the btree page structure; but to
+ * avoid leaving possibly-confusing junk behind, we are careful to reinit
+ * rightpage to empty before throwing any error.
+ */
origpage = BufferGetPage(buf);
leftpage = PageGetTempPage(origpage);
rightpage = BufferGetPage(rbuf);
+ origpagenumber = BufferGetBlockNumber(buf);
+ rightpagenumber = BufferGetBlockNumber(rbuf);
+
_bt_pageinit(leftpage, BufferGetPageSize(buf));
/* rightpage was already initialized by _bt_getbuf */
***************
*** 923,930 ****
lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
ropaque->btpo_flags = lopaque->btpo_flags;
lopaque->btpo_prev = oopaque->btpo_prev;
! lopaque->btpo_next = BufferGetBlockNumber(rbuf);
! ropaque->btpo_prev = BufferGetBlockNumber(buf);
ropaque->btpo_next = oopaque->btpo_next;
lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
/* Since we already have write-lock on both pages, ok to read cycleid */
--- 942,949 ----
lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
ropaque->btpo_flags = lopaque->btpo_flags;
lopaque->btpo_prev = oopaque->btpo_prev;
! lopaque->btpo_next = rightpagenumber;
! ropaque->btpo_prev = origpagenumber;
ropaque->btpo_next = oopaque->btpo_next;
lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
/* Since we already have write-lock on both pages, ok to read cycleid */
***************
*** 947,955 ****
item = (IndexTuple) PageGetItem(origpage, itemid);
if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
false, false) == InvalidOffsetNumber)
! elog(PANIC, "failed to add hikey to the right sibling"
" while splitting block %u of index \"%s\"",
! BufferGetBlockNumber(buf), RelationGetRelationName(rel));
rightoff = OffsetNumberNext(rightoff);
}
--- 966,977 ----
item = (IndexTuple) PageGetItem(origpage, itemid);
if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
false, false) == InvalidOffsetNumber)
! {
! _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
! elog(ERROR, "failed to add hikey to the right sibling"
" while splitting block %u of index \"%s\"",
! origpagenumber, RelationGetRelationName(rel));
! }
rightoff = OffsetNumberNext(rightoff);
}
***************
*** 974,982 ****
}
if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
false, false) == InvalidOffsetNumber)
! elog(PANIC, "failed to add hikey to the left sibling"
" while splitting block %u of index \"%s\"",
! BufferGetBlockNumber(buf), RelationGetRelationName(rel));
leftoff = OffsetNumberNext(leftoff);
/*
--- 996,1007 ----
}
if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
false, false) == InvalidOffsetNumber)
! {
! _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
! elog(ERROR, "failed to add hikey to the left sibling"
" while splitting block %u of index \"%s\"",
! origpagenumber, RelationGetRelationName(rel));
! }
leftoff = OffsetNumberNext(leftoff);
/*
***************
*** 998,1011 ****
{
if (newitemonleft)
{
! _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
! "left sibling");
leftoff = OffsetNumberNext(leftoff);
}
else
{
! _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
! "right sibling");
rightoff = OffsetNumberNext(rightoff);
}
}
--- 1023,1046 ----
{
if (newitemonleft)
{
! if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
! {
! _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
! elog(ERROR, "failed to add new item to the left sibling"
! " while splitting block %u of index \"%s\"",
! origpagenumber, RelationGetRelationName(rel));
! }
leftoff = OffsetNumberNext(leftoff);
}
else
{
! if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
! {
! _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
! elog(ERROR, "failed to add new item to the right sibling"
! " while splitting block %u of index \"%s\"",
! origpagenumber, RelationGetRelationName(rel));
! }
rightoff = OffsetNumberNext(rightoff);
}
}
***************
*** 1013,1026 ****
/* decide which page to put it on */
if (i < firstright)
{
! _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
! "left sibling");
leftoff = OffsetNumberNext(leftoff);
}
else
{
! _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
! "right sibling");
rightoff = OffsetNumberNext(rightoff);
}
}
--- 1048,1071 ----
/* decide which page to put it on */
if (i < firstright)
{
! if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
! {
! _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
! elog(ERROR, "failed to add old item to the left sibling"
! " while splitting block %u of index \"%s\"",
! origpagenumber, RelationGetRelationName(rel));
! }
leftoff = OffsetNumberNext(leftoff);
}
else
{
! if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
! {
! _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
! elog(ERROR, "failed to add old item to the right sibling"
! " while splitting block %u of index \"%s\"",
! origpagenumber, RelationGetRelationName(rel));
! }
rightoff = OffsetNumberNext(rightoff);
}
}
***************
*** 1034,1041 ****
* not be splitting the page).
*/
Assert(!newitemonleft);
! _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
! "right sibling");
rightoff = OffsetNumberNext(rightoff);
}
--- 1079,1091 ----
* not be splitting the page).
*/
Assert(!newitemonleft);
! if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
! {
! _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
! elog(ERROR, "failed to add new item to the right sibling"
! " while splitting block %u of index \"%s\"",
! origpagenumber, RelationGetRelationName(rel));
! }
rightoff = OffsetNumberNext(rightoff);
}
***************
*** 1047,1062 ****
* neighbors.
*/
! if (!P_RIGHTMOST(ropaque))
{
! sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
spage = BufferGetPage(sbuf);
sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
! if (sopaque->btpo_prev != ropaque->btpo_prev)
! elog(PANIC, "right sibling's left-link doesn't match: "
! "block %u links to %u instead of expected %u in index \"%s\"",
! ropaque->btpo_next, sopaque->btpo_prev, ropaque->btpo_prev,
RelationGetRelationName(rel));
/*
* Check to see if we can set the SPLIT_END flag in the right-hand
--- 1097,1115 ----
* neighbors.
*/
! if (!P_RIGHTMOST(oopaque))
{
! sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
spage = BufferGetPage(sbuf);
sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
! if (sopaque->btpo_prev != origpagenumber)
! {
! _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
! elog(ERROR, "right sibling's left-link doesn't match: "
! "block %u links to %u instead of expected %u in index \"%s\"",
! oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
RelationGetRelationName(rel));
+ }
/*
* Check to see if we can set the SPLIT_END flag in the right-hand
***************
*** 1081,1088 ****
*
* NO EREPORT(ERROR) till right sibling is updated. We can get away with
* not starting the critical section till here because we haven't been
! * scribbling on the original page yet, and we don't care about the new
! * sibling until it's linked into the btree.
*/
START_CRIT_SECTION();
--- 1134,1140 ----
*
* NO EREPORT(ERROR) till right sibling is updated. We can get away with
* not starting the critical section till here because we haven't been
! * scribbling on the original page yet; see comments above.
*/
START_CRIT_SECTION();
***************
*** 1094,1112 ****
* (in the page management code) that the center of a page always be
* clean, and the most efficient way to guarantee this is just to compact
* the data by reinserting it into a new left page. (XXX the latter
! * comment is probably obsolete.)
*
* We need to do this before writing the WAL record, so that XLogInsert
* can WAL log an image of the page if necessary.
*/
PageRestoreTempPage(leftpage, origpage);
MarkBufferDirty(buf);
MarkBufferDirty(rbuf);
if (!P_RIGHTMOST(ropaque))
{
! sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
MarkBufferDirty(sbuf);
}
--- 1146,1166 ----
* (in the page management code) that the center of a page always be
* clean, and the most efficient way to guarantee this is just to compact
* the data by reinserting it into a new left page. (XXX the latter
! * comment is probably obsolete; but in any case it's good to not scribble
! * on the original page until we enter the critical section.)
*
* We need to do this before writing the WAL record, so that XLogInsert
* can WAL log an image of the page if necessary.
*/
PageRestoreTempPage(leftpage, origpage);
+ /* leftpage, lopaque must not be used below here */
MarkBufferDirty(buf);
MarkBufferDirty(rbuf);
if (!P_RIGHTMOST(ropaque))
{
! sopaque->btpo_prev = rightpagenumber;
MarkBufferDirty(sbuf);
}
***************
*** 1120,1127 ****
XLogRecData *lastrdata;
xlrec.node = rel->rd_node;
! xlrec.leftsib = BufferGetBlockNumber(buf);
! xlrec.rightsib = BufferGetBlockNumber(rbuf);
xlrec.rnext = ropaque->btpo_next;
xlrec.level = ropaque->btpo.level;
xlrec.firstright = firstright;
--- 1174,1181 ----
XLogRecData *lastrdata;
xlrec.node = rel->rd_node;
! xlrec.leftsib = origpagenumber;
! xlrec.rightsib = rightpagenumber;
xlrec.rnext = ropaque->btpo_next;
xlrec.level = ropaque->btpo.level;
xlrec.firstright = firstright;
***************
*** 1920,1932 ****
* we insert the tuples in order, so that the given itup_off does
* represent the final position of the tuple!
*/
! static void
! _bt_pgaddtup(Relation rel,
! Page page,
Size itemsize,
IndexTuple itup,
! OffsetNumber itup_off,
! const char *where)
{
BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
IndexTupleData trunctuple;
--- 1974,1984 ----
* we insert the tuples in order, so that the given itup_off does
* represent the final position of the tuple!
*/
! static bool
! _bt_pgaddtup(Page page,
Size itemsize,
IndexTuple itup,
! OffsetNumber itup_off)
{
BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
IndexTupleData trunctuple;
***************
*** 1941,1948 ****
if (PageAddItem(page, (Item) itup, itemsize, itup_off,
false, false) == InvalidOffsetNumber)
! elog(PANIC, "failed to add item to the %s in index \"%s\"",
! where, RelationGetRelationName(rel));
}
/*
--- 1993,2001 ----
if (PageAddItem(page, (Item) itup, itemsize, itup_off,
false, false) == InvalidOffsetNumber)
! return false;
!
! return true;
}
/*
Index: src/backend/access/nbtree/nbtpage.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v
retrieving revision 1.123
diff -c -r1.123 nbtpage.c
*** src/backend/access/nbtree/nbtpage.c 6 Jul 2010 19:18:55 -0000 1.123
--- src/backend/access/nbtree/nbtpage.c 28 Aug 2010 23:31:44 -0000
***************
*** 1253,1258 ****
--- 1253,1290 ----
}
/*
+ * Check to make sure the index items we're about to delete/overwrite
+ * contain what we expect. This can fail if the index has become
+ * corrupt for some reason. We want to throw any error before entering
+ * the critical section --- otherwise it'd be a PANIC.
+ *
+ * The test on the target item is just an Assert because _bt_getstackbuf
+ * should have guaranteed it has the expected contents. The test on the
+ * next-child downlink is known to fail in the field, though.
+ */
+ page = BufferGetPage(pbuf);
+ opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+
+ #ifdef USE_ASSERT_CHECKING
+ itemid = PageGetItemId(page, poffset);
+ itup = (IndexTuple) PageGetItem(page, itemid);
+ Assert(ItemPointerGetBlockNumber(&(itup->t_tid)) == target);
+ #endif
+
+ if (!parent_half_dead)
+ {
+ OffsetNumber nextoffset;
+
+ nextoffset = OffsetNumberNext(poffset);
+ itemid = PageGetItemId(page, nextoffset);
+ itup = (IndexTuple) PageGetItem(page, itemid);
+ if (ItemPointerGetBlockNumber(&(itup->t_tid)) != rightsib)
+ elog(ERROR, "right sibling %u of block %u is not next child %u of block %u in index \"%s\"",
+ rightsib, target, ItemPointerGetBlockNumber(&(itup->t_tid)),
+ parent, RelationGetRelationName(rel));
+ }
+
+ /*
* Here we begin doing the deletion.
*/
***************
*** 1265,1272 ****
* to copy the right sibling's downlink over the target downlink, and then
* delete the following item.
*/
- page = BufferGetPage(pbuf);
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (parent_half_dead)
{
PageIndexTupleDelete(page, poffset);
--- 1297,1302 ----
***************
*** 1278,1294 ****
itemid = PageGetItemId(page, poffset);
itup = (IndexTuple) PageGetItem(page, itemid);
- Assert(ItemPointerGetBlockNumber(&(itup->t_tid)) == target);
ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
nextoffset = OffsetNumberNext(poffset);
- /* This part is just for double-checking */
- itemid = PageGetItemId(page, nextoffset);
- itup = (IndexTuple) PageGetItem(page, itemid);
- if (ItemPointerGetBlockNumber(&(itup->t_tid)) != rightsib)
- elog(PANIC, "right sibling %u of block %u is not next child of %u in index \"%s\"",
- rightsib, target, BufferGetBlockNumber(pbuf),
- RelationGetRelationName(rel));
PageIndexTupleDelete(page, nextoffset);
}
--- 1308,1316 ----