📄 nbtinsert.c
字号:
rightoff; OffsetNumber start; OffsetNumber maxoff; OffsetNumber i; rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); origpage = BufferGetPage(buf); leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData)); rightpage = BufferGetPage(rbuf); _bt_pageinit(rightpage, BufferGetPageSize(rbuf)); _bt_pageinit(leftpage, BufferGetPageSize(buf)); /* init btree private data */ oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage); lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage); ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage); /* if we're splitting this page, it won't be the root when we're done */ oopaque->btpo_flags &= ~BTP_ROOT; oopaque->btpo_flags &= ~BTP_CHAIN; lopaque->btpo_flags = ropaque->btpo_flags = oopaque->btpo_flags; lopaque->btpo_prev = oopaque->btpo_prev; ropaque->btpo_prev = BufferGetBlockNumber(buf); lopaque->btpo_next = BufferGetBlockNumber(rbuf); ropaque->btpo_next = oopaque->btpo_next; lopaque->btpo_parent = ropaque->btpo_parent = oopaque->btpo_parent; /* * If the page we're splitting is not the rightmost page at its level * in the tree, then the first (0) entry on the page is the high key * for the page. We need to copy that to the right half. Otherwise * (meaning the rightmost page case), we should treat the line * pointers beginning at zero as user data. * * We leave a blank space at the start of the line table for the left * page. We'll come back later and fill it in with the high key item * we get from the right key. */ leftoff = P_FIRSTKEY; ropaque->btpo_next = oopaque->btpo_next; if (!P_RIGHTMOST(oopaque)) { /* splitting a non-rightmost page, start at the first data item */ start = P_FIRSTKEY; itemid = PageGetItemId(origpage, P_HIKEY); itemsz = ItemIdGetLength(itemid); item = (BTItem) PageGetItem(origpage, itemid); if (PageAddItem(rightpage, (Item) item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber) elog(FATAL, "btree: failed to add hikey to the right sibling"); rightoff = P_FIRSTKEY; } else { /* splitting a rightmost page, "high key" is the first data item */ start = P_HIKEY; /* the new rightmost page will not have a high key */ rightoff = P_HIKEY; } maxoff = PageGetMaxOffsetNumber(origpage); if (firstright == InvalidOffsetNumber) { Size llimit = PageGetFreeSpace(leftpage) / 2; firstright = _bt_findsplitloc(rel, origpage, start, maxoff, llimit); } for (i = start; i <= maxoff; i = OffsetNumberNext(i)) { itemid = PageGetItemId(origpage, i); itemsz = ItemIdGetLength(itemid); item = (BTItem) PageGetItem(origpage, itemid); /* decide which page to put it on */ if (i < firstright) { if (PageAddItem(leftpage, (Item) item, itemsz, leftoff, LP_USED) == InvalidOffsetNumber) elog(FATAL, "btree: failed to add item to the left sibling"); leftoff = OffsetNumberNext(leftoff); } else { if (PageAddItem(rightpage, (Item) item, itemsz, rightoff, LP_USED) == InvalidOffsetNumber) elog(FATAL, "btree: failed to add item to the right sibling"); rightoff = OffsetNumberNext(rightoff); } } /* * Okay, page has been split, high key on right page is correct. Now * set the high key on the left page to be the min key on the right * page. */ if (P_RIGHTMOST(ropaque)) itemid = PageGetItemId(rightpage, P_HIKEY); else itemid = PageGetItemId(rightpage, P_FIRSTKEY); itemsz = ItemIdGetLength(itemid); item = (BTItem) PageGetItem(rightpage, itemid); /* * We left a hole for the high key on the left page; fill it. The * modal crap is to tell the page manager to put the new item on the * page and not screw around with anything else. Whoever designed * this interface has presumably crawled back into the dung heap they * came from. No one here will admit to it. */ PageManagerModeSet(OverwritePageManagerMode); if (PageAddItem(leftpage, (Item) item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber) elog(FATAL, "btree: failed to add hikey to the left sibling"); PageManagerModeSet(ShufflePageManagerMode); /* * By here, the original data page has been split into two new halves, * and these are correct. The algorithm requires that the left page * never move during a split, so we copy the new left page back on top * of the original. Note that this is not a waste of time, since we * also require (in the page management code) that the center of a * page always be clean, and the most efficient way to guarantee this * is just to compact the data by reinserting it into a new left page. */ PageRestoreTempPage(leftpage, origpage); /* write these guys out */ _bt_wrtnorelbuf(rel, rbuf); _bt_wrtnorelbuf(rel, buf); /* * Finally, we need to grab the right sibling (if any) and fix the * prev pointer there. We are guaranteed that this is deadlock-free * since no other writer will be moving holding a lock on that page * and trying to move left, and all readers release locks on a page * before trying to fetch its neighbors. */ if (!P_RIGHTMOST(ropaque)) { sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE); spage = BufferGetPage(sbuf); sopaque = (BTPageOpaque) PageGetSpecialPointer(spage); sopaque->btpo_prev = BufferGetBlockNumber(rbuf); /* write and release the old right sibling */ _bt_wrtbuf(rel, sbuf); } /* split's done */ return rbuf;}/* * _bt_findsplitloc() -- find a safe place to split a page. * * In order to guarantee the proper handling of searches for duplicate * keys, the first duplicate in the chain must either be the first * item on the page after the split, or the entire chain must be on * one of the two pages. That is, * [1 2 2 2 3 4 5] * must become * [1] [2 2 2 3 4 5] * or * [1 2 2 2] [3 4 5] * but not * [1 2 2] [2 3 4 5]. * However, * [2 2 2 2 2 3 4] * may be split as * [2 2 2 2] [2 3 4]. */static OffsetNumber_bt_findsplitloc(Relation rel, Page page, OffsetNumber start, OffsetNumber maxoff, Size llimit){ OffsetNumber i; OffsetNumber saferight; ItemId nxtitemid, safeitemid; BTItem safeitem, nxtitem; Size nbytes; int natts; if (start >= maxoff) elog(FATAL, "btree: cannot split if start (%d) >= maxoff (%d)", start, maxoff); natts = rel->rd_rel->relnatts; saferight = start; safeitemid = PageGetItemId(page, saferight); nbytes = ItemIdGetLength(safeitemid) + sizeof(ItemIdData); safeitem = (BTItem) PageGetItem(page, safeitemid); i = OffsetNumberNext(start); while (nbytes < llimit) { /* check the next item on the page */ nxtitemid = PageGetItemId(page, i); nbytes += (ItemIdGetLength(nxtitemid) + sizeof(ItemIdData)); nxtitem = (BTItem) PageGetItem(page, nxtitemid); /* * Test against last known safe item: if the tuple we're looking * at isn't equal to the last safe one we saw, then it's our new * safe tuple. */ if (!_bt_itemcmp(rel, natts, safeitem, nxtitem, BTEqualStrategyNumber)) { safeitem = nxtitem; saferight = i; } if (i < maxoff) i = OffsetNumberNext(i); else break; } /* * If the chain of dups starts at the beginning of the page and * extends past the halfway mark, we can split it in the middle. */ if (saferight == start) saferight = i; if (saferight == maxoff && (maxoff - start) > 1) saferight = start + (maxoff - start) / 2; return saferight;}/* * _bt_newroot() -- Create a new root page for the index. * * We've just split the old root page and need to create a new one. * In order to do this, we add a new root page to the file, then lock * the metadata page and update it. This is guaranteed to be deadlock- * free, because all readers release their locks on the metadata page * before trying to lock the root, and all writers lock the root before * trying to lock the metadata page. We have a write lock on the old * root page, so we have not introduced any cycles into the waits-for * graph. * * On entry, lbuf (the old root) and rbuf (its new peer) are write- * locked. We don't drop the locks in this routine; that's done by * the caller. On exit, a new root page exists with entries for the * two new children. The new root page is neither pinned nor locked. */static void_bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf){ Buffer rootbuf; Page lpage, rpage, rootpage; BlockNumber lbkno, rbkno; BlockNumber rootbknum; BTPageOpaque rootopaque; ItemId itemid; BTItem item; Size itemsz; BTItem new_item; /* get a new root page */ rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); rootpage = BufferGetPage(rootbuf); rootbknum = BufferGetBlockNumber(rootbuf); _bt_pageinit(rootpage, BufferGetPageSize(rootbuf)); /* set btree special data */ rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage); rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE; rootopaque->btpo_flags |= BTP_ROOT; /* * Insert the internal tuple pointers. */ lbkno = BufferGetBlockNumber(lbuf); rbkno = BufferGetBlockNumber(rbuf); lpage = BufferGetPage(lbuf); rpage = BufferGetPage(rbuf); ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo_parent = ((BTPageOpaque) PageGetSpecialPointer(rpage))->btpo_parent = rootbknum; /* * step over the high key on the left page while building the left * page pointer. */ itemid = PageGetItemId(lpage, P_FIRSTKEY); itemsz = ItemIdGetLength(itemid); item = (BTItem) PageGetItem(lpage, itemid); new_item = _bt_formitem(&(item->bti_itup)); ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY); /* * insert the left page pointer into the new root page. the root page * is the rightmost page on its level so the "high key" item is the * first data item. */ if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber) elog(FATAL, "btree: failed to add leftkey to new root page"); pfree(new_item); /* * the right page is the rightmost page on the second level, so the * "high key" item is the first data item on that page as well. */ itemid = PageGetItemId(rpage, P_HIKEY); itemsz = ItemIdGetLength(itemid); item = (BTItem) PageGetItem(rpage, itemid); new_item = _bt_formitem(&(item->bti_itup)); ItemPointerSet(&(new_item->bti_itup.t_tid), rbkno, P_HIKEY); /* * insert the right page pointer into the new root page. */ if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber) elog(FATAL, "btree: failed to add rightkey to new root page"); pfree(new_item); /* write and let go of the root buffer */ _bt_wrtbuf(rel, rootbuf); /* update metadata page with new root block number */ _bt_metaproot(rel, rootbknum, 0); _bt_wrtbuf(rel, lbuf); _bt_wrtbuf(rel, rbuf);}/* * _bt_pgaddtup() -- add a tuple to a particular page in the index. * * This routine adds the tuple to the page as requested, and keeps the * write lock and reference associated with the page's buffer. It is * an error to call pgaddtup() without a write lock and reference. If * afteritem is non-null, it's the item that we expect our new item * to follow. Otherwise, we do a binary search for the correct place * and insert the new item there. */static OffsetNumber_bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem){ OffsetNumber itup_off; OffsetNumber first; Page page; BTPageOpaque opaque; BTItem chkitem; page = BufferGetPage(buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); first = P_RIGHTMOST(opaque) ? P_HIKEY : P_FIRSTKEY; if (afteritem == (BTItem) NULL) itup_off = _bt_binsrch(rel, buf, keysz, itup_scankey, BT_INSERTION); else { itup_off = first; do { chkitem = (BTItem) PageGetItem(page, PageGetItemId(page, itup_off)); itup_off = OffsetNumberNext(itup_off); } while (!BTItemSame(chkitem, afteritem)); } if (PageAddItem(page, (Item) btitem, itemsize, itup_off, LP_USED) == InvalidOffsetNumber) elog(FATAL, "btree: failed to add item to the page"); /* write the buffer, but hold our lock */ _bt_wrtnorelbuf(rel, buf); return itup_off;}/* * _bt_goesonpg() -- Does a new tuple belong on this page? * * This is part of the complexity introduced by allowing duplicate * keys into the index. The tuple belongs on this page if: * * + there is no page to the right of this one; or * + it is less than the high key on the page; or * + the item it is to follow ("afteritem") appears on this * page. */static bool_bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -