/*-------------------------------------------------------------------------
 *
 * nbtinsert.c
 *      Item insertion in Lehman and Yao btrees for Postgres.
 *
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *      $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.164.2.1 2008/06/11 08:40:32 heikki Exp $
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/heapam.h"
#include "access/nbtree.h"
#include "access/transam.h"
#include "miscadmin.h"
#include "utils/inval.h"


typedef struct
{
    /* context data for _bt_checksplitloc */
    Size        newitemsz;          /* size of new item to be inserted */
    int         fillfactor;         /* needed when splitting rightmost page */
    bool        is_leaf;            /* T if splitting a leaf page */
    bool        is_rightmost;       /* T if splitting a rightmost page */
    OffsetNumber newitemoff;        /* where the new item is to be inserted */
    int         leftspace;          /* space available for items on left page */
    int         rightspace;         /* space available for items on right page */
    int         olddataitemstotal;  /* space taken by old items */

    bool        have_split;         /* found a valid split? */

    /* these fields valid only if have_split is true */
    bool        newitemonleft;      /* new item on left or right of best split */
    OffsetNumber firstright;        /* best split point */
    int         best_delta;         /* best size delta so far */
} FindSplitData;


static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);

static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
                 Relation heapRel, Buffer buf, OffsetNumber ioffset,
                 ScanKey itup_scankey);
static void _bt_findinsertloc(Relation rel,
                  Buffer *bufptr,
                  OffsetNumber *offsetptr,
                  int keysz,
                  ScanKey scankey,
                  IndexTuple newtup);
static void _bt_insertonpg(Relation rel, Buffer buf,
               BTStack stack,
               IndexTuple itup,
               OffsetNumber newitemoff,
               bool split_only_page);
static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
          OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
          bool newitemonleft);
static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
                 OffsetNumber newitemoff,
                 Size newitemsz,
                 bool *newitemonleft);
static void _bt_checksplitloc(FindSplitData *state,
                  OffsetNumber firstoldonright, bool newitemonleft,
                  int dataitemstoleft, Size firstoldonrightsz);
static void _bt_pgaddtup(Relation rel, Page page,
             Size itemsize, IndexTuple itup,
             OffsetNumber itup_off, const char *where);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
            int keysz, ScanKey scankey);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer);
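
/*
 * For context, a minimal sketch (not part of the original nbtinsert.c) of
 * the work a caller such as btinsert does before reaching _bt_doinsert
 * below: the index tuple is built from the column values and its TID is
 * set to point at the heap tuple.  The wrapper name and argument list here
 * are hypothetical; only index_form_tuple(), the t_tid assignment, and the
 * _bt_doinsert() call reflect the actual calling convention ("itup is
 * filled in, including the TID").
 */
#ifdef NOT_USED
static void
example_btree_insert_call(Relation rel, Datum *values, bool *isnull,
                          ItemPointer ht_ctid, Relation heapRel,
                          bool index_is_unique)
{
    /* build an index tuple from the column values */
    IndexTuple  itup = index_form_tuple(RelationGetDescr(rel), values, isnull);

    /* point the index entry at the heap tuple it describes */
    itup->t_tid = *ht_ctid;

    /* hand the fully-formed tuple to the insertion code in this file */
    _bt_doinsert(rel, itup, index_is_unique, heapRel);

    pfree(itup);
}
#endif   /* NOT_USED */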

/*
 *  _bt_doinsert() -- Handle insertion of a single index tuple in the tree.
 *
 *      This routine is called by the public interface routines, btbuild
 *      and btinsert.  By here, itup is filled in, including the TID.
 */
void
_bt_doinsert(Relation rel, IndexTuple itup,
             bool index_is_unique, Relation heapRel)
{
    int         natts = rel->rd_rel->relnatts;
    ScanKey     itup_scankey;
    BTStack     stack;
    Buffer      buf;
    OffsetNumber offset;

    /* we need an insertion scan key to do our search, so build one */
    itup_scankey = _bt_mkscankey(rel, itup);

top:
    /* find the first page containing this key */
    stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);

    offset = InvalidOffsetNumber;

    /* trade in our read lock for a write lock */
    LockBuffer(buf, BUFFER_LOCK_UNLOCK);
    LockBuffer(buf, BT_WRITE);

    /*
     * If the page was split between the time that we surrendered our read
     * lock and acquired our write lock, then this page may no longer be the
     * right place for the key we want to insert.  In this case, we need to
     * move right in the tree.  See Lehman and Yao for an excruciatingly
     * precise description.
     */
    buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);

    /*
     * If we're not allowing duplicates, make sure the key isn't already in
     * the index.
     *
     * NOTE: obviously, _bt_check_unique can only detect keys that are
     * already in the index; so it cannot defend against concurrent
     * insertions of the same key.  We protect against that by means of
     * holding a write lock on the target page.  Any other would-be inserter
     * of the same key must acquire a write lock on the same target page, so
     * only one would-be inserter can be making the check at one time.
     * Furthermore, once we are past the check we hold write locks
     * continuously until we have performed our insertion, so no later
     * inserter can fail to see our insertion.  (This requires some care in
     * _bt_insertonpg.)
     *
     * If we must wait for another xact, we release the lock while waiting,
     * and then must start over completely.
     */
    if (index_is_unique)
    {
        TransactionId xwait;

        offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
        xwait = _bt_check_unique(rel, itup, heapRel, buf, offset,
                                 itup_scankey);

        if (TransactionIdIsValid(xwait))
        {
            /* Have to wait for the other guy ... */
            _bt_relbuf(rel, buf);
            XactLockTableWait(xwait);
            /* start over... */
            _bt_freestack(stack);
            goto top;
        }
    }

    /* do the insertion */
    _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup);
    _bt_insertonpg(rel, buf, stack, itup, offset, false);

    /* be tidy */
    _bt_freestack(stack);
    _bt_freeskey(itup_scankey);
}
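
/*
 * Note on the dirty-snapshot probe in _bt_check_unique below (added
 * explanation, not in the original file): heap_hot_search() with
 * SnapshotDirty also reports tuples whose inserting or deleting transaction
 * is still in progress.  In that case the snapshot's xmin (in-progress
 * inserter) or xmax (in-progress deleter) is set, and that XID is returned
 * so that _bt_doinsert can wait for the other transaction and retry,
 * rather than reporting a premature uniqueness violation.
 */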

/*
 *  _bt_check_unique() -- Check for violation of unique index constraint
 *
 * offset points to the first possible item that could conflict.  It can
 * also point to end-of-page, which means that the first tuple to check
 * is the first tuple on the next page.
 *
 * Returns InvalidTransactionId if there is no conflict, else an xact ID
 * we must wait for to see if it commits a conflicting tuple.  If an actual
 * conflict is detected, no return --- just ereport().
 */
static TransactionId
_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
                 Buffer buf, OffsetNumber offset, ScanKey itup_scankey)
{
    TupleDesc   itupdesc = RelationGetDescr(rel);
    int         natts = rel->rd_rel->relnatts;
    SnapshotData SnapshotDirty;
    OffsetNumber maxoff;
    Page        page;
    BTPageOpaque opaque;
    Buffer      nbuf = InvalidBuffer;

    InitDirtySnapshot(SnapshotDirty);

    page = BufferGetPage(buf);
    opaque = (BTPageOpaque) PageGetSpecialPointer(page);
    maxoff = PageGetMaxOffsetNumber(page);

    /*
     * Scan over all equal tuples, looking for live conflicts.
     */
    for (;;)
    {
        ItemId      curitemid;
        IndexTuple  curitup;
        BlockNumber nblkno;

        /*
         * make sure the offset points to an actual item before trying to
         * examine it...
         */
        if (offset <= maxoff)
        {
            curitemid = PageGetItemId(page, offset);

            /*
             * We can skip items that are marked killed.
             *
             * Formerly, we applied _bt_isequal() before checking the kill
             * flag, so as to fall out of the item loop as soon as possible.
             * However, in the presence of heavy update activity an index may
             * contain many killed items with the same key; running
             * _bt_isequal() on each killed item gets expensive.  Furthermore
             * it is likely that the non-killed version of each key appears
             * first, so that we didn't actually get to exit any sooner
             * anyway.  So now we just advance over killed items as quickly as
             * we can.  We only apply _bt_isequal() when we get to a non-killed
             * item or the end of the page.
             */
            if (!ItemIdIsDead(curitemid))
            {
                ItemPointerData htid;
                bool        all_dead;

                /*
                 * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this's
                 * how we handling NULLs - and so we must not use _bt_compare
                 * in real comparison, but only for ordering/finding items on
                 * pages. - vadim 03/24/97
                 */
                if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
                    break;      /* we're past all the equal tuples */

                /* okay, we gotta fetch the heap tuple ... */
                curitup = (IndexTuple) PageGetItem(page, curitemid);
                htid = curitup->t_tid;

                /*
                 * We check the whole HOT-chain to see if there is any tuple
                 * that satisfies SnapshotDirty.  This is necessary because we
                 * have just a single index entry for the entire chain.
                 */
                if (heap_hot_search(&htid, heapRel, &SnapshotDirty, &all_dead))
                {
                    /* it is a duplicate */
                    TransactionId xwait =
                        (TransactionIdIsValid(SnapshotDirty.xmin)) ?
                        SnapshotDirty.xmin : SnapshotDirty.xmax;

                    /*
                     * If this tuple is being updated by other transaction
                     * then we have to wait for its commit/abort.
                     */
                    if (TransactionIdIsValid(xwait))
                    {
                        if (nbuf != InvalidBuffer)
                            _bt_relbuf(rel, nbuf);
                        /* Tell _bt_doinsert to wait... */
                        return xwait;
                    }

                    /*
                     * Otherwise we have a definite conflict.  But before
                     * complaining, look to see if the tuple we want to insert
                     * is itself now committed dead --- if so, don't complain.
                     * This is a waste of time in normal scenarios but we must
                     * do it to support CREATE INDEX CONCURRENTLY.
                     *
                     * We must follow HOT-chains here because during
                     * concurrent index build, we insert the root TID though
                     * the actual tuple may be somewhere in the HOT-chain.
                     * While following the chain we might not stop at the
                     * exact tuple which triggered the insert, but that's OK
                     * because if we find a live tuple anywhere in this chain,
                     * we have a unique key conflict.  The other live tuple is
                     * not part of this chain because it had a different index
                     * entry.
                     */
                    htid = itup->t_tid;
                    if (heap_hot_search(&htid, heapRel, SnapshotSelf, NULL))
                    {
                        /* Normal case --- it's still live */
                    }
                    else
                    {
                        /*
                         * It's been deleted, so no error, and no need to
                         * continue searching
                         */
                        break;
                    }

                    ereport(ERROR,
                            (errcode(ERRCODE_UNIQUE_VIOLATION),
                             errmsg("duplicate key value violates unique constraint \"%s\"",
                                    RelationGetRelationName(rel))));
                }
                else if (all_dead)
                {
                    /*
                     * The conflicting tuple (or whole HOT chain) is dead to
                     * everyone, so we may as well mark the index entry
                     * killed.
                     */
                    ItemIdMarkDead(curitemid);
                    opaque->btpo_flags |= BTP_HAS_GARBAGE;
                    /* be sure to mark the proper buffer dirty... */
                    if (nbuf != InvalidBuffer)
                        SetBufferCommitInfoNeedsSave(nbuf);
                    else
                        SetBufferCommitInfoNeedsSave(buf);
                }
            }
        }

        /*
         * Advance to next tuple to continue checking.
         */
        if (offset < maxoff)
            offset = OffsetNumberNext(offset);
        else
        {
            /* If scankey == hikey we gotta check the next page too */
            if (P_RIGHTMOST(opaque))
                break;
            if (!_bt_isequal(itupdesc, page, P_HIKEY,
                             natts, itup_scankey))
                break;
            /* Advance to next non-dead page --- there must be one */
            for (;;)
            {
                nblkno = opaque->btpo_next;
                nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
                page = BufferGetPage(nbuf);
                opaque = (BTPageOpaque) PageGetSpecialPointer(page);
                if (!P_IGNORE(opaque))
                    break;
                if (P_RIGHTMOST(opaque))
                    elog(ERROR, "fell off the end of index \"%s\"",
                         RelationGetRelationName(rel));
            }
            maxoff = PageGetMaxOffsetNumber(page);
            offset = P_FIRSTDATAKEY(opaque);
        }
    }

    if (nbuf != InvalidBuffer)
        _bt_relbuf(rel, nbuf);

    return InvalidTransactionId;
}
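
/*
 * Note on the "get tired" heuristic in _bt_findinsertloc below (added
 * explanation, not in the original file): random() returns a value in
 * [0, MAX_RANDOM_VALUE], so the test random() <= (MAX_RANDOM_VALUE / 100)
 * succeeds with probability of roughly 1/100.  Within a long run of equal
 * keys the loop therefore keeps moving right with probability about 0.99
 * per page and gives up, on average, after about 100 pages (a geometric
 * distribution), rather than walking the entire run on every insert.
 */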

/*
 *  _bt_findinsertloc() -- Finds an insert location for a tuple
 *
 *      If the new key is equal to one or more existing keys, we can
 *      legitimately place it anywhere in the series of equal keys --- in
 *      fact, if the new key is equal to the page's "high key" we can place
 *      it on the next page.  If it is equal to the high key, and there's not
 *      room to insert the new tuple on the current page without splitting,
 *      then we can move right hoping to find more free space and avoid a
 *      split.  (We should not move right indefinitely, however, since that
 *      leads to O(N^2) insertion behavior in the presence of many equal
 *      keys.)  Once we have chosen the page to put the key on, we'll insert
 *      it before any existing equal keys because of the way _bt_binsrch()
 *      works.
 *
 *      If there's not enough room in the space, we try to make room by
 *      removing any LP_DEAD tuples.
 *
 *      On entry, *buf and *offsetptr point to the first legal position
 *      where the new tuple could be inserted.  The caller should hold an
 *      exclusive lock on *buf.  *offsetptr can also be set to
 *      InvalidOffsetNumber, in which case the function will search for the
 *      right location within the page if needed.  On exit, they point to the
 *      chosen insert location.  If _bt_findinsertloc decides to move right,
 *      the lock and pin on the original page will be released and the new
 *      page returned to the caller is exclusively locked instead.
 *
 *      newtup is the new tuple we're inserting, and scankey is an insertion
 *      type scan key for it.
 */
static void
_bt_findinsertloc(Relation rel,
                  Buffer *bufptr,
                  OffsetNumber *offsetptr,
                  int keysz,
                  ScanKey scankey,
                  IndexTuple newtup)
{
    Buffer      buf = *bufptr;
    Page        page = BufferGetPage(buf);
    Size        itemsz;
    BTPageOpaque lpageop;
    bool        movedright,
                vacuumed;
    OffsetNumber newitemoff;
    OffsetNumber firstlegaloff = *offsetptr;

    lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

    itemsz = IndexTupleDSize(*newtup);
    itemsz = MAXALIGN(itemsz);  /* be safe, PageAddItem will do this but we
                                 * need to be consistent */

    /*
     * Check whether the item can fit on a btree page at all. (Eventually, we
     * ought to try to apply TOAST methods if not.)  We actually need to be
     * able to fit three items on every page, so restrict any one item to 1/3
     * the per-page available space.  Note that at this point, itemsz doesn't
     * include the ItemId.
     */
    if (itemsz > BTMaxItemSize(page))
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("index row size %lu exceeds btree maximum, %lu",
                        (unsigned long) itemsz,
                        (unsigned long) BTMaxItemSize(page)),
                 errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
                         "Consider a function index of an MD5 hash of the value, "
                         "or use full text indexing.")));

    /*----------
     * If we will need to split the page to put the item on this page,
     * check whether we can put the tuple somewhere to the right,
     * instead.  Keep scanning right until we
     *      (a) find a page with enough free space,
     *      (b) reach the last page where the tuple can legally go, or
     *      (c) get tired of searching.
     * (c) is not flippant; it is important because if there are many
     * pages' worth of equal keys, it's better to split one of the early
     * pages than to scan all the way to the end of the run of equal keys
     * on every insert.  We implement "get tired" as a random choice,
     * since stopping after scanning a fixed number of pages wouldn't work
     * well (we'd never reach the right-hand side of previously split
     * pages).  Currently the probability of moving right is set at 0.99,
     * which may seem too high to change the behavior much, but it does an
     * excellent job of preventing O(N^2) behavior with many equal keys.
     *----------
     */
    movedright = false;
    vacuumed = false;
    while (PageGetFreeSpace(page) < itemsz)
    {
        Buffer      rbuf;

        /*
         * before considering moving right, see if we can obtain enough space
         * by erasing LP_DEAD items
         */
        if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
        {
            _bt_vacuum_one_page(rel, buf);

            /*
             * remember that we vacuumed this page, because that makes the
             * hint supplied by the caller invalid
             */
            vacuumed = true;

            if (PageGetFreeSpace(page) >= itemsz)
                break;          /* OK, now we have enough space */
        }

        /*
         * nope, so check conditions (b) and (c) enumerated above
         */
        if (P_RIGHTMOST(lpageop) ||
            _bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
            random() <= (MAX_RANDOM_VALUE / 100))
            break;

        /*
         * step right to next non-dead page
         *
         * must write-lock that page before releasing write lock on current
         * page; else someone else's _bt_check_unique scan could fail to see
         * our insertion.  write locks on intermediate dead pages won't do
         * because we don't know when they will get de-linked from the tree.
         */
        rbuf = InvalidBuffer;