📄 nbtinsert.c
字号:
/*------------------------------------------------------------------------- * * btinsert.c * Item insertion in Lehman and Yao btrees for Postgres. * * Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION * $Header: /usr/local/cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.42.2.2 1999/09/01 17:54:00 scrappy Exp $ * *------------------------------------------------------------------------- */#include "postgres.h"#include "access/heapam.h"#include "access/nbtree.h"static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem afteritem);static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright);static OffsetNumber _bt_findsplitloc(Relation rel, Page page, OffsetNumber start, OffsetNumber maxoff, Size llimit);static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);static OffsetNumber _bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem);static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem);static void _bt_updateitem(Relation rel, Size keysz, Buffer buf, BTItem oldItem, BTItem newItem);static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey);/* * _bt_doinsert() -- Handle insertion of a single btitem in the tree. * * This routine is called by the public interface routines, btbuild * and btinsert. By here, btitem is filled in, and has a unique * (xid, seqno) pair. */InsertIndexResult_bt_doinsert(Relation rel, BTItem btitem, bool index_is_unique, Relation heapRel){ ScanKey itup_scankey; IndexTuple itup; BTStack stack; Buffer buf; BlockNumber blkno; int natts = rel->rd_rel->relnatts; InsertIndexResult res; Buffer buffer; itup = &(btitem->bti_itup); /* we need a scan key to do our search, so build one */ itup_scankey = _bt_mkscankey(rel, itup); /* find the page containing this key */ stack = _bt_search(rel, natts, itup_scankey, &buf); /* trade in our read lock for a write lock */ LockBuffer(buf, BUFFER_LOCK_UNLOCK); LockBuffer(buf, BT_WRITE);l1: /* * If the page was split between the time that we surrendered our read * lock and acquired our write lock, then this page may no longer be * the right place for the key we want to insert. In this case, we * need to move right in the tree. See Lehman and Yao for an * excruciatingly precise description. */ buf = _bt_moveright(rel, buf, natts, itup_scankey, BT_WRITE); blkno = BufferGetBlockNumber(buf); /* if we're not allowing duplicates, make sure the key isn't */ /* already in the node */ if (index_is_unique) { OffsetNumber offset, maxoff; Page page; page = BufferGetPage(buf); maxoff = PageGetMaxOffsetNumber(page); offset = _bt_binsrch(rel, buf, natts, itup_scankey, BT_DESCENT); /* make sure the offset we're given points to an actual */ /* key on the page before trying to compare it */ if (!PageIsEmpty(page) && offset <= maxoff) { TupleDesc itupdesc; BTItem cbti; HeapTupleData htup; BTPageOpaque opaque; Buffer nbuf; BlockNumber nblkno; bool chtup = true; itupdesc = RelationGetDescr(rel); nbuf = InvalidBuffer; opaque = (BTPageOpaque) PageGetSpecialPointer(page); /* * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this's * how we handling NULLs - and so we must not use _bt_compare * in real comparison, but only for ordering/finding items on * pages. - vadim 03/24/97 * * while ( !_bt_compare (rel, itupdesc, page, natts, * itup_scankey, offset) ) */ while (_bt_isequal(itupdesc, page, offset, natts, itup_scankey)) { /* they're equal */ /* * Have to check is inserted heap tuple deleted one (i.e. * just moved to another place by vacuum)! */ if (chtup) { htup.t_self = btitem->bti_itup.t_tid; heap_fetch(heapRel, SnapshotDirty, &htup, &buffer); if (htup.t_data == NULL) /* YES! */ break; /* Live tuple was inserted */ ReleaseBuffer(buffer); chtup = false; } cbti = (BTItem) PageGetItem(page, PageGetItemId(page, offset)); htup.t_self = cbti->bti_itup.t_tid; heap_fetch(heapRel, SnapshotDirty, &htup, &buffer); if (htup.t_data != NULL) /* it is a duplicate */ { TransactionId xwait = (TransactionIdIsValid(SnapshotDirty->xmin)) ? SnapshotDirty->xmin : SnapshotDirty->xmax; /* * If this tuple is being updated by other transaction * then we have to wait for its commit/abort. */ ReleaseBuffer(buffer); if (TransactionIdIsValid(xwait)) { if (nbuf != InvalidBuffer) _bt_relbuf(rel, nbuf, BT_READ); _bt_relbuf(rel, buf, BT_WRITE); XactLockTableWait(xwait); buf = _bt_getbuf(rel, blkno, BT_WRITE); goto l1;/* continue from the begin */ } elog(ERROR, "Cannot insert a duplicate key into a unique index"); } /* htup null so no buffer to release */ /* get next offnum */ if (offset < maxoff) offset = OffsetNumberNext(offset); else { /* move right ? */ if (P_RIGHTMOST(opaque)) break; if (!_bt_isequal(itupdesc, page, P_HIKEY, natts, itup_scankey)) break; /* * min key of the right page is the same, ooh - so * many dead duplicates... */ nblkno = opaque->btpo_next; if (nbuf != InvalidBuffer) _bt_relbuf(rel, nbuf, BT_READ); for (nbuf = InvalidBuffer;;) { nbuf = _bt_getbuf(rel, nblkno, BT_READ); page = BufferGetPage(nbuf); maxoff = PageGetMaxOffsetNumber(page); opaque = (BTPageOpaque) PageGetSpecialPointer(page); offset = P_RIGHTMOST(opaque) ? P_HIKEY : P_FIRSTKEY; if (!PageIsEmpty(page) && offset <= maxoff) { /* Found some key */ break; } else { /* Empty or "pseudo"-empty page - get next */ nblkno = opaque->btpo_next; _bt_relbuf(rel, nbuf, BT_READ); nbuf = InvalidBuffer; if (nblkno == P_NONE) break; } } if (nbuf == InvalidBuffer) break; } } if (nbuf != InvalidBuffer) _bt_relbuf(rel, nbuf, BT_READ); } } /* do the insertion */ res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem, (BTItem) NULL); /* be tidy */ _bt_freestack(stack); _bt_freeskey(itup_scankey); return res;}/* * _bt_insertonpg() -- Insert a tuple on a particular page in the index. * * This recursive procedure does the following things: * * + if necessary, splits the target page. * + finds the right place to insert the tuple (taking into * account any changes induced by a split). * + inserts the tuple. * + if the page was split, pops the parent stack, and finds the * right place to insert the new child pointer (by walking * right using information stored in the parent stack). * + invoking itself with the appropriate tuple for the right * child page on the parent. * * On entry, we must have the right buffer on which to do the * insertion, and the buffer must be pinned and locked. On return, * we will have dropped both the pin and the write lock on the buffer. * * The locking interactions in this code are critical. You should * grok Lehman and Yao's paper before making any changes. In addition, * you need to understand how we disambiguate duplicate keys in this * implementation, in order to be able to find our location using * L&Y "move right" operations. Since we may insert duplicate user * keys, and since these dups may propogate up the tree, we use the * 'afteritem' parameter to position ourselves correctly for the * insertion on internal pages. */static InsertIndexResult_bt_insertonpg(Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem afteritem){ InsertIndexResult res; Page page; BTPageOpaque lpageop; BlockNumber itup_blkno; OffsetNumber itup_off; OffsetNumber firstright = InvalidOffsetNumber; int itemsz; bool do_split = false; bool keys_equal = false; page = BufferGetPage(buf); lpageop = (BTPageOpaque) PageGetSpecialPointer(page); itemsz = IndexTupleDSize(btitem->bti_itup) + (sizeof(BTItemData) - sizeof(IndexTupleData)); itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do * this but we need to be * consistent */ /* * If we have to insert item on the leftmost page which is the first * page in the chain of duplicates then: 1. if scankey == hikey (i.e. * - new duplicate item) then insert it here; 2. if scankey < hikey * then: 2.a if there is duplicate key(s) here - we force splitting; * 2.b else - we may "eat" this page from duplicates chain. */ if (lpageop->btpo_flags & BTP_CHAIN) { OffsetNumber maxoff = PageGetMaxOffsetNumber(page); ItemId hitemid; BTItem hitem; Assert(!P_RIGHTMOST(lpageop)); hitemid = PageGetItemId(page, P_HIKEY); hitem = (BTItem) PageGetItem(page, hitemid); if (maxoff > P_HIKEY && !_bt_itemcmp(rel, keysz, hitem, (BTItem) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY)), BTEqualStrategyNumber)) elog(FATAL, "btree: bad key on the page in the chain of duplicates"); if (!_bt_skeycmp(rel, keysz, scankey, page, hitemid, BTEqualStrategyNumber)) { if (!P_LEFTMOST(lpageop)) elog(FATAL, "btree: attempt to insert bad key on the non-leftmost page in the chain of duplicates"); if (!_bt_skeycmp(rel, keysz, scankey, page, hitemid, BTLessStrategyNumber)) elog(FATAL, "btree: attempt to insert higher key on the leftmost page in the chain of duplicates"); if (maxoff > P_HIKEY) /* have duplicate(s) */ { firstright = P_FIRSTKEY; do_split = true; } else/* "eat" page */ { Buffer pbuf; Page ppage; itup_blkno = BufferGetBlockNumber(buf); itup_off = PageAddItem(page, (Item) btitem, itemsz, P_FIRSTKEY, LP_USED); if (itup_off == InvalidOffsetNumber) elog(FATAL, "btree: failed to add item"); lpageop->btpo_flags &= ~BTP_CHAIN; pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); ppage = BufferGetPage(pbuf); PageIndexTupleDelete(ppage, stack->bts_offset); pfree(stack->bts_btitem); stack->bts_btitem = _bt_formitem(&(btitem->bti_itup)); ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), itup_blkno, P_HIKEY); _bt_wrtbuf(rel, buf); res = _bt_insertonpg(rel, pbuf, stack->bts_parent, keysz, scankey, stack->bts_btitem, NULL); ItemPointerSet(&(res->pointerData), itup_blkno, itup_off); return res; } } else { keys_equal = true; if (PageGetFreeSpace(page) < itemsz) do_split = true; } } else if (PageGetFreeSpace(page) < itemsz) do_split = true; else if (PageGetFreeSpace(page) < 3 * itemsz + 2 * sizeof(ItemIdData)) { OffsetNumber offnum = (P_RIGHTMOST(lpageop)) ? P_HIKEY : P_FIRSTKEY; OffsetNumber maxoff = PageGetMaxOffsetNumber(page); ItemId itid; BTItem previtem, chkitem; Size maxsize; Size currsize; itid = PageGetItemId(page, offnum); previtem = (BTItem) PageGetItem(page, itid); maxsize = currsize = (ItemIdGetLength(itid) + sizeof(ItemIdData)); for (offnum = OffsetNumberNext(offnum); offnum <= maxoff; offnum = OffsetNumberNext(offnum)) { itid = PageGetItemId(page, offnum); chkitem = (BTItem) PageGetItem(page, itid); if (!_bt_itemcmp(rel, keysz, previtem, chkitem, BTEqualStrategyNumber)) { if (currsize > maxsize) maxsize = currsize; currsize = 0; previtem = chkitem; } currsize += (ItemIdGetLength(itid) + sizeof(ItemIdData)); } if (currsize > maxsize) maxsize = currsize; maxsize += sizeof(PageHeaderData) + MAXALIGN(sizeof(BTPageOpaqueData)); if (maxsize >= PageGetPageSize(page) / 2) do_split = true; } if (do_split) { Buffer rbuf; Page rpage; BTItem ritem; BlockNumber rbknum; BTPageOpaque rpageop; Buffer pbuf; Page ppage; BTPageOpaque ppageop; BlockNumber bknum = BufferGetBlockNumber(buf); BTItem lowLeftItem; OffsetNumber maxoff; bool shifted = false; bool left_chained = (lpageop->btpo_flags & BTP_CHAIN) ? true : false; bool is_root = lpageop->btpo_flags & BTP_ROOT; /* * Instead of splitting leaf page in the chain of duplicates * by new duplicate, insert it into some right page. */ if ((lpageop->btpo_flags & BTP_CHAIN) && (lpageop->btpo_flags & BTP_LEAF) && keys_equal) { rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE); rpage = BufferGetPage(rbuf); rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage); /* * some checks */ if (!P_RIGHTMOST(rpageop)) /* non-rightmost page */ { /* If we have the same hikey here then * it's yet another page in chain. */ if (_bt_skeycmp(rel, keysz, scankey, rpage, PageGetItemId(rpage, P_HIKEY), BTEqualStrategyNumber)) { if (!(rpageop->btpo_flags & BTP_CHAIN)) elog(FATAL, "btree: lost page in the chain of duplicates");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -