📄 nbtinsert.c
字号:
for (;;) { BlockNumber rblkno = lpageop->btpo_next; rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE); page = BufferGetPage(rbuf); lpageop = (BTPageOpaque) PageGetSpecialPointer(page); if (!P_IGNORE(lpageop)) break; if (P_RIGHTMOST(lpageop)) elog(ERROR, "fell off the end of index \"%s\"", RelationGetRelationName(rel)); } _bt_relbuf(rel, buf); buf = rbuf; movedright = true; vacuumed = false; } /* * Now we are on the right page, so find the insert position. If we moved * right at all, we know we should insert at the start of the page. If we * didn't move right, we can use the firstlegaloff hint if the caller * supplied one, unless we vacuumed the page which might have moved tuples * around making the hint invalid. If we didn't move right or can't use * the hint, find the position by searching. */ if (movedright) newitemoff = P_FIRSTDATAKEY(lpageop); else if (firstlegaloff != InvalidOffsetNumber && !vacuumed) newitemoff = firstlegaloff; else newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false); *bufptr = buf; *offsetptr = newitemoff;}/*---------- * _bt_insertonpg() -- Insert a tuple on a particular page in the index. * * This recursive procedure does the following things: * * + if necessary, splits the target page (making sure that the * split is equitable as far as post-insert free space goes). * + inserts the tuple. * + if the page was split, pops the parent stack, and finds the * right place to insert the new child pointer (by walking * right using information stored in the parent stack). * + invokes itself with the appropriate tuple for the right * child page on the parent. * + updates the metapage if a true root or fast root is split. * * On entry, we must have the right buffer in which to do the * insertion, and the buffer must be pinned and write-locked. On return, * we will have dropped both the pin and the lock on the buffer. * * The locking interactions in this code are critical. You should * grok Lehman and Yao's paper before making any changes. In addition, * you need to understand how we disambiguate duplicate keys in this * implementation, in order to be able to find our location using * L&Y "move right" operations. Since we may insert duplicate user * keys, and since these dups may propagate up the tree, we use the * 'afteritem' parameter to position ourselves correctly for the * insertion on internal pages. *---------- */static void_bt_insertonpg(Relation rel, Buffer buf, BTStack stack, IndexTuple itup, OffsetNumber newitemoff, bool split_only_page){ Page page; BTPageOpaque lpageop; OffsetNumber firstright = InvalidOffsetNumber; Size itemsz; page = BufferGetPage(buf); lpageop = (BTPageOpaque) PageGetSpecialPointer(page); itemsz = IndexTupleDSize(*itup); itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we * need to be consistent */ /* * Do we need to split the page to fit the item on it? * * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result, * so this comparison is correct even though we appear to be accounting * only for the item and not for its line pointer. */ if (PageGetFreeSpace(page) < itemsz) { bool is_root = P_ISROOT(lpageop); bool is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop); bool newitemonleft; Buffer rbuf; /* Choose the split point */ firstright = _bt_findsplitloc(rel, page, newitemoff, itemsz, &newitemonleft); /* split the buffer into left and right halves */ rbuf = _bt_split(rel, buf, firstright, newitemoff, itemsz, itup, newitemonleft); /*---------- * By here, * * + our target page has been split; * + the original tuple has been inserted; * + we have write locks on both the old (left half) * and new (right half) buffers, after the split; and * + we know the key we want to insert into the parent * (it's the "high key" on the left child page). * * We're ready to do the parent insertion. We need to hold onto the * locks for the child pages until we locate the parent, but we can * release them before doing the actual insertion (see Lehman and Yao * for the reasoning). *---------- */ _bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only); } else { Buffer metabuf = InvalidBuffer; Page metapg = NULL; BTMetaPageData *metad = NULL; OffsetNumber itup_off; BlockNumber itup_blkno; itup_off = newitemoff; itup_blkno = BufferGetBlockNumber(buf); /* * If we are doing this insert because we split a page that was the * only one on its tree level, but was not the root, it may have been * the "fast root". We need to ensure that the fast root link points * at or above the current page. We can safely acquire a lock on the * metapage here --- see comments for _bt_newroot(). */ if (split_only_page) { Assert(!P_ISLEAF(lpageop)); metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE); metapg = BufferGetPage(metabuf); metad = BTPageGetMeta(metapg); if (metad->btm_fastlevel >= lpageop->btpo.level) { /* no update wanted */ _bt_relbuf(rel, metabuf); metabuf = InvalidBuffer; } } /* Do the update. No ereport(ERROR) until changes are logged */ START_CRIT_SECTION(); _bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page"); MarkBufferDirty(buf); if (BufferIsValid(metabuf)) { metad->btm_fastroot = itup_blkno; metad->btm_fastlevel = lpageop->btpo.level; MarkBufferDirty(metabuf); } /* XLOG stuff */ if (!rel->rd_istemp) { xl_btree_insert xlrec; BlockNumber xldownlink; xl_btree_metadata xlmeta; uint8 xlinfo; XLogRecPtr recptr; XLogRecData rdata[4]; XLogRecData *nextrdata; IndexTupleData trunctuple; xlrec.target.node = rel->rd_node; ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off); rdata[0].data = (char *) &xlrec; rdata[0].len = SizeOfBtreeInsert; rdata[0].buffer = InvalidBuffer; rdata[0].next = nextrdata = &(rdata[1]); if (P_ISLEAF(lpageop)) xlinfo = XLOG_BTREE_INSERT_LEAF; else { xldownlink = ItemPointerGetBlockNumber(&(itup->t_tid)); Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY); nextrdata->data = (char *) &xldownlink; nextrdata->len = sizeof(BlockNumber); nextrdata->buffer = InvalidBuffer; nextrdata->next = nextrdata + 1; nextrdata++; xlinfo = XLOG_BTREE_INSERT_UPPER; } if (BufferIsValid(metabuf)) { xlmeta.root = metad->btm_root; xlmeta.level = metad->btm_level; xlmeta.fastroot = metad->btm_fastroot; xlmeta.fastlevel = metad->btm_fastlevel; nextrdata->data = (char *) &xlmeta; nextrdata->len = sizeof(xl_btree_metadata); nextrdata->buffer = InvalidBuffer; nextrdata->next = nextrdata + 1; nextrdata++; xlinfo = XLOG_BTREE_INSERT_META; } /* Read comments in _bt_pgaddtup */ if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop)) { trunctuple = *itup; trunctuple.t_info = sizeof(IndexTupleData); nextrdata->data = (char *) &trunctuple; nextrdata->len = sizeof(IndexTupleData); } else { nextrdata->data = (char *) itup; nextrdata->len = IndexTupleDSize(*itup); } nextrdata->buffer = buf; nextrdata->buffer_std = true; nextrdata->next = NULL; recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata); if (BufferIsValid(metabuf)) { PageSetLSN(metapg, recptr); PageSetTLI(metapg, ThisTimeLineID); } PageSetLSN(page, recptr); PageSetTLI(page, ThisTimeLineID); } END_CRIT_SECTION(); /* release buffers; send out relcache inval if metapage changed */ if (BufferIsValid(metabuf)) { if (!InRecovery) CacheInvalidateRelcache(rel); _bt_relbuf(rel, metabuf); } _bt_relbuf(rel, buf); }}/* * _bt_split() -- split a page in the btree. * * On entry, buf is the page to split, and is pinned and write-locked. * firstright is the item index of the first item to be moved to the * new right page. newitemoff etc. tell us about the new item that * must be inserted along with the data from the old page. * * Returns the new right sibling of buf, pinned and write-locked. * The pin and lock on buf are maintained. */static Buffer_bt_split(Relation rel, Buffer buf, OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem, bool newitemonleft){ Buffer rbuf; Page origpage; Page leftpage, rightpage; BTPageOpaque ropaque, lopaque, oopaque; Buffer sbuf = InvalidBuffer; Page spage = NULL; BTPageOpaque sopaque = NULL; Size itemsz; ItemId itemid; IndexTuple item; OffsetNumber leftoff, rightoff; OffsetNumber maxoff; OffsetNumber i; bool isroot; rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); origpage = BufferGetPage(buf); leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData)); rightpage = BufferGetPage(rbuf); _bt_pageinit(leftpage, BufferGetPageSize(buf)); /* rightpage was already initialized by _bt_getbuf */ /* * Copy the original page's LSN and TLI into leftpage, which will become * the updated version of the page. We need this because XLogInsert will * examine these fields and possibly dump them in a page image. */ PageSetLSN(leftpage, PageGetLSN(origpage)); PageSetTLI(leftpage, PageGetTLI(origpage)); /* init btree private data */ oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage); lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage); ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage); isroot = P_ISROOT(oopaque); /* if we're splitting this page, it won't be the root when we're done */ /* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */ lopaque->btpo_flags = oopaque->btpo_flags; lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE); ropaque->btpo_flags = lopaque->btpo_flags; lopaque->btpo_prev = oopaque->btpo_prev; lopaque->btpo_next = BufferGetBlockNumber(rbuf); ropaque->btpo_prev = BufferGetBlockNumber(buf); ropaque->btpo_next = oopaque->btpo_next; lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level; /* Since we already have write-lock on both pages, ok to read cycleid */ lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel); ropaque->btpo_cycleid = lopaque->btpo_cycleid; /* * If the page we're splitting is not the rightmost page at its level in * the tree, then the first entry on the page is the high key for the * page. We need to copy that to the right half. Otherwise (meaning the * rightmost page case), all the items on the right half will be user * data. */ rightoff = P_HIKEY; if (!P_RIGHTMOST(oopaque)) { itemid = PageGetItemId(origpage, P_HIKEY); itemsz = ItemIdGetLength(itemid); item = (IndexTuple) PageGetItem(origpage, itemid); if (PageAddItem(rightpage, (Item) item, itemsz, rightoff, false, false) == InvalidOffsetNumber) elog(PANIC, "failed to add hikey to the right sibling" " while splitting block %u of index \"%s\"", BufferGetBlockNumber(buf), RelationGetRelationName(rel)); rightoff = OffsetNumberNext(rightoff); } /* * The "high key" for the new left page will be the first key that's going * to go into the new right page. This might be either the existing data * item at position firstright, or the incoming tuple. */ leftoff = P_HIKEY; if (!newitemonleft && newitemoff == firstright) { /* incoming tuple will become first on right page */ itemsz = newitemsz; item = newitem; } else { /* existing item at firstright will become first on right page */ itemid = PageGetItemId(origpage, firstright); itemsz = ItemIdGetLength(itemid); item = (IndexTuple) PageGetItem(origpage, itemid); } if (PageAddItem(leftpage, (Item) item, itemsz, leftoff, false, false) == InvalidOffsetNumber) elog(PANIC, "failed to add hikey to the left sibling" " while splitting block %u of index \"%s\"", BufferGetBlockNumber(buf), RelationGetRelationName(rel)); leftoff = OffsetNumberNext(leftoff); /* * Now transfer all the data items to the appropriate page. * * Note: we *must* insert at least the right page's items in item-number * order, for the benefit of _bt_restore_page(). */ maxoff = PageGetMaxOffsetNumber(origpage); for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i)) { itemid = PageGetItemId(origpage, i); itemsz = ItemIdGetLength(itemid); item = (IndexTuple) PageGetItem(origpage, itemid); /* does new item belong before this one? */ if (i == newitemoff) { if (newitemonleft) { _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff, "left sibling"); leftoff = OffsetNumberNext(leftoff); } else { _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff, "right sibling"); rightoff = OffsetNumberNext(rightoff); } } /* decide which page to put it on */ if (i < firstright) { _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff, "left sibling"); leftoff = OffsetNumberNext(leftoff); } else { _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff, "right sibling"); rightoff = OffsetNumberNext(rightoff); } } /* cope with possibility that newitem goes at the end */ if (i <= newitemoff) { /* * Can't have newitemonleft here; that would imply we were told to put * *everything* on the left page, which cannot fit (if it could, we'd * not be splitting the page). */ Assert(!newitemonleft); _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff, "right sibling"); rightoff = OffsetNumberNext(rightoff); } /* * We have to grab the right sibling (if any) and fix the prev pointer * there. We are guaranteed that this is deadlock-free since no other * writer will be holding a lock on that page and trying to move left, and * all readers release locks on a page before trying to fetch its * neighbors. */ if (!P_RIGHTMOST(ropaque)) { sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE); spage = BufferGetPage(sbuf); sopaque = (BTPageOpaque) PageGetSpecialPointer(spage); if (sopaque->btpo_prev != ropaque->btpo_prev) elog(PANIC, "right sibling's left-link doesn't match: " "block %u links to %u instead of expected %u in index \"%s\"", ropaque->btpo_next, sopaque->btpo_prev, ropaque->btpo_prev, RelationGetRelationName(rel)); /* * Check to see if we can set the SPLIT_END flag in the right-hand * split page; this can save some I/O for vacuum since it need not * proceed to the right sibling. We can set the flag if the right * sibling has a different cycleid: that means it could not be part of * a group of pages that were all split off from the same ancestor * page. If you're confused, imagine that page A splits to A B and * then again, yielding A C B, while vacuum is in progress. Tuples * originally in A could now be in either B or C, hence vacuum must * examine both pages. But if D, our right sibling, has a different * cycleid then it could not contain any tuples that were in A when * the vacuum started. */ if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -