📄 nbtutils.c
字号:
/* * Test whether an indextuple satisfies a row-comparison scan condition. * * Return true if so, false if not. If not, also clear *continuescan if * it's not possible for any future tuples in the current scan direction * to pass the qual. * * This is a subroutine for _bt_checkkeys, which see for more info. */static bool_bt_check_rowcompare(ScanKey skey, IndexTuple tuple, TupleDesc tupdesc, ScanDirection dir, bool *continuescan){ ScanKey subkey = (ScanKey) DatumGetPointer(skey->sk_argument); int32 cmpresult = 0; bool result; /* First subkey should be same as the header says */ Assert(subkey->sk_attno == skey->sk_attno); /* Loop over columns of the row condition */ for (;;) { Datum datum; bool isNull; Assert(subkey->sk_flags & SK_ROW_MEMBER); datum = index_getattr(tuple, subkey->sk_attno, tupdesc, &isNull); if (isNull) { if (subkey->sk_flags & SK_BT_NULLS_FIRST) { /* * Since NULLs are sorted before non-NULLs, we know we have * reached the lower limit of the range of values for this * index attr. On a backward scan, we can stop if this qual is * one of the "must match" subset. On a forward scan, * however, we should keep going. */ if ((subkey->sk_flags & SK_BT_REQBKWD) && ScanDirectionIsBackward(dir)) *continuescan = false; } else { /* * Since NULLs are sorted after non-NULLs, we know we have * reached the upper limit of the range of values for this * index attr. On a forward scan, we can stop if this qual is * one of the "must match" subset. On a backward scan, * however, we should keep going. */ if ((subkey->sk_flags & SK_BT_REQFWD) && ScanDirectionIsForward(dir)) *continuescan = false; } /* * In any case, this indextuple doesn't match the qual. */ return false; } if (subkey->sk_flags & SK_ISNULL) { /* * Unlike the simple-scankey case, this isn't a disallowed case. * But it can never match. If all the earlier row comparison * columns are required for the scan direction, we can stop the * scan, because there can't be another tuple that will succeed. */ if (subkey != (ScanKey) DatumGetPointer(skey->sk_argument)) subkey--; if ((subkey->sk_flags & SK_BT_REQFWD) && ScanDirectionIsForward(dir)) *continuescan = false; else if ((subkey->sk_flags & SK_BT_REQBKWD) && ScanDirectionIsBackward(dir)) *continuescan = false; return false; } /* Perform the test --- three-way comparison not bool operator */ cmpresult = DatumGetInt32(FunctionCall2(&subkey->sk_func, datum, subkey->sk_argument)); if (subkey->sk_flags & SK_BT_DESC) cmpresult = -cmpresult; /* Done comparing if unequal, else advance to next column */ if (cmpresult != 0) break; if (subkey->sk_flags & SK_ROW_END) break; subkey++; } /* * At this point cmpresult indicates the overall result of the row * comparison, and subkey points to the deciding column (or the last * column if the result is "="). */ switch (subkey->sk_strategy) { /* EQ and NE cases aren't allowed here */ case BTLessStrategyNumber: result = (cmpresult < 0); break; case BTLessEqualStrategyNumber: result = (cmpresult <= 0); break; case BTGreaterEqualStrategyNumber: result = (cmpresult >= 0); break; case BTGreaterStrategyNumber: result = (cmpresult > 0); break; default: elog(ERROR, "unrecognized RowCompareType: %d", (int) subkey->sk_strategy); result = 0; /* keep compiler quiet */ break; } if (!result) { /* * Tuple fails this qual. If it's a required qual for the current * scan direction, then we can conclude no further tuples will pass, * either. Note we have to look at the deciding column, not * necessarily the first or last column of the row condition. */ if ((subkey->sk_flags & SK_BT_REQFWD) && ScanDirectionIsForward(dir)) *continuescan = false; else if ((subkey->sk_flags & SK_BT_REQBKWD) && ScanDirectionIsBackward(dir)) *continuescan = false; } return result;}/* * _bt_killitems - set LP_DEAD state for items an indexscan caller has * told us were killed * * scan->so contains information about the current page and killed tuples * thereon (generally, this should only be called if so->numKilled > 0). * * The caller must have pin on so->currPos.buf, but may or may not have * read-lock, as indicated by haveLock. Note that we assume read-lock * is sufficient for setting LP_DEAD status (which is only a hint). * * We match items by heap TID before assuming they are the right ones to * delete. We cope with cases where items have moved right due to insertions. * If an item has moved off the current page due to a split, we'll fail to * find it and do nothing (this is not an error case --- we assume the item * will eventually get marked in a future indexscan). Note that because we * hold pin on the target page continuously from initially reading the items * until applying this function, VACUUM cannot have deleted any items from * the page, and so there is no need to search left from the recorded offset. * (This observation also guarantees that the item is still the right one * to delete, which might otherwise be questionable since heap TIDs can get * recycled.) */void_bt_killitems(IndexScanDesc scan, bool haveLock){ BTScanOpaque so = (BTScanOpaque) scan->opaque; Page page; BTPageOpaque opaque; OffsetNumber minoff; OffsetNumber maxoff; int i; bool killedsomething = false; Assert(BufferIsValid(so->currPos.buf)); if (!haveLock) LockBuffer(so->currPos.buf, BT_READ); page = BufferGetPage(so->currPos.buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); minoff = P_FIRSTDATAKEY(opaque); maxoff = PageGetMaxOffsetNumber(page); for (i = 0; i < so->numKilled; i++) { int itemIndex = so->killedItems[i]; BTScanPosItem *kitem = &so->currPos.items[itemIndex]; OffsetNumber offnum = kitem->indexOffset; Assert(itemIndex >= so->currPos.firstItem && itemIndex <= so->currPos.lastItem); if (offnum < minoff) continue; /* pure paranoia */ while (offnum <= maxoff) { ItemId iid = PageGetItemId(page, offnum); IndexTuple ituple = (IndexTuple) PageGetItem(page, iid); if (ItemPointerEquals(&ituple->t_tid, &kitem->heapTid)) { /* found the item */ ItemIdMarkDead(iid); killedsomething = true; break; /* out of inner search loop */ } offnum = OffsetNumberNext(offnum); } } /* * Since this can be redone later if needed, it's treated the same as a * commit-hint-bit status update for heap tuples: we mark the buffer dirty * but don't make a WAL log entry. * * Whenever we mark anything LP_DEAD, we also set the page's * BTP_HAS_GARBAGE flag, which is likewise just a hint. */ if (killedsomething) { opaque->btpo_flags |= BTP_HAS_GARBAGE; SetBufferCommitInfoNeedsSave(so->currPos.buf); } if (!haveLock) LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK); /* * Always reset the scan state, so we don't look for same items on other * pages. */ so->numKilled = 0;}/* * The following routines manage a shared-memory area in which we track * assignment of "vacuum cycle IDs" to currently-active btree vacuuming * operations. There is a single counter which increments each time we * start a vacuum to assign it a cycle ID. Since multiple vacuums could * be active concurrently, we have to track the cycle ID for each active * vacuum; this requires at most MaxBackends entries (usually far fewer). * We assume at most one vacuum can be active for a given index. * * Access to the shared memory area is controlled by BtreeVacuumLock. * In principle we could use a separate lmgr locktag for each index, * but a single LWLock is much cheaper, and given the short time that * the lock is ever held, the concurrency hit should be minimal. */typedef struct BTOneVacInfo{ LockRelId relid; /* global identifier of an index */ BTCycleId cycleid; /* cycle ID for its active VACUUM */} BTOneVacInfo;typedef struct BTVacInfo{ BTCycleId cycle_ctr; /* cycle ID most recently assigned */ int num_vacuums; /* number of currently active VACUUMs */ int max_vacuums; /* allocated length of vacuums[] array */ BTOneVacInfo vacuums[1]; /* VARIABLE LENGTH ARRAY */} BTVacInfo;static BTVacInfo *btvacinfo;/* * _bt_vacuum_cycleid --- get the active vacuum cycle ID for an index, * or zero if there is no active VACUUM * * Note: for correct interlocking, the caller must already hold pin and * exclusive lock on each buffer it will store the cycle ID into. This * ensures that even if a VACUUM starts immediately afterwards, it cannot * process those pages until the page split is complete. */BTCycleId_bt_vacuum_cycleid(Relation rel){ BTCycleId result = 0; int i; /* Share lock is enough since this is a read-only operation */ LWLockAcquire(BtreeVacuumLock, LW_SHARED); for (i = 0; i < btvacinfo->num_vacuums; i++) { BTOneVacInfo *vac = &btvacinfo->vacuums[i]; if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId && vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId) { result = vac->cycleid; break; } } LWLockRelease(BtreeVacuumLock); return result;}/* * _bt_start_vacuum --- assign a cycle ID to a just-starting VACUUM operation * * Note: the caller must guarantee that it will eventually call * _bt_end_vacuum, else we'll permanently leak an array slot. To ensure * that this happens even in elog(FATAL) scenarios, the appropriate coding * is not just a PG_TRY, but * PG_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel)) */BTCycleId_bt_start_vacuum(Relation rel){ BTCycleId result; int i; BTOneVacInfo *vac; LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE); /* * Assign the next cycle ID, being careful to avoid zero as well as the * reserved high values. */ result = ++(btvacinfo->cycle_ctr); if (result == 0 || result > MAX_BT_CYCLE_ID) result = btvacinfo->cycle_ctr = 1; /* Let's just make sure there's no entry already for this index */ for (i = 0; i < btvacinfo->num_vacuums; i++) { vac = &btvacinfo->vacuums[i]; if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId && vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId) { /* * Unlike most places in the backend, we have to explicitly * release our LWLock before throwing an error. This is because * we expect _bt_end_vacuum() to be called before transaction * abort cleanup can run to release LWLocks. */ LWLockRelease(BtreeVacuumLock); elog(ERROR, "multiple active vacuums for index \"%s\"", RelationGetRelationName(rel)); } } /* OK, add an entry */ if (btvacinfo->num_vacuums >= btvacinfo->max_vacuums) { LWLockRelease(BtreeVacuumLock); elog(ERROR, "out of btvacinfo slots"); } vac = &btvacinfo->vacuums[btvacinfo->num_vacuums]; vac->relid = rel->rd_lockInfo.lockRelId; vac->cycleid = result; btvacinfo->num_vacuums++; LWLockRelease(BtreeVacuumLock); return result;}/* * _bt_end_vacuum --- mark a btree VACUUM operation as done * * Note: this is deliberately coded not to complain if no entry is found; * this allows the caller to put PG_TRY around the start_vacuum operation. */void_bt_end_vacuum(Relation rel){ int i; LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE); /* Find the array entry */ for (i = 0; i < btvacinfo->num_vacuums; i++) { BTOneVacInfo *vac = &btvacinfo->vacuums[i]; if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId && vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId) { /* Remove it by shifting down the last entry */ *vac = btvacinfo->vacuums[btvacinfo->num_vacuums - 1]; btvacinfo->num_vacuums--; break; } } LWLockRelease(BtreeVacuumLock);}/* * _bt_end_vacuum wrapped as an on_shmem_exit callback function */void_bt_end_vacuum_callback(int code, Datum arg){ _bt_end_vacuum((Relation) DatumGetPointer(arg));}/* * BTreeShmemSize --- report amount of shared memory space needed */SizeBTreeShmemSize(void){ Size size; size = offsetof(BTVacInfo, vacuums[0]); size = add_size(size, mul_size(MaxBackends, sizeof(BTOneVacInfo))); return size;}/* * BTreeShmemInit --- initialize this module's shared memory */voidBTreeShmemInit(void){ bool found; btvacinfo = (BTVacInfo *) ShmemInitStruct("BTree Vacuum State", BTreeShmemSize(), &found); if (!IsUnderPostmaster) { /* Initialize shared memory area */ Assert(!found); /* * It doesn't really matter what the cycle counter starts at, but * having it always start the same doesn't seem good. Seed with * low-order bits of time() instead. */ btvacinfo->cycle_ctr = (BTCycleId) time(NULL); btvacinfo->num_vacuums = 0; btvacinfo->max_vacuums = MaxBackends; } else Assert(found);}Datumbtoptions(PG_FUNCTION_ARGS){ Datum reloptions = PG_GETARG_DATUM(0); bool validate = PG_GETARG_BOOL(1); bytea *result; result = default_reloptions(reloptions, validate, BTREE_MIN_FILLFACTOR, BTREE_DEFAULT_FILLFACTOR); if (result) PG_RETURN_BYTEA_P(result); PG_RETURN_NULL();}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -