📄 multixact.c
字号:
* transaction ID so it can't be a multixact member. Therefore, if we * read a zero from the members array, just ignore it. * * This is all pretty messy, but the mess occurs only in infrequent corner * cases, so it seems better than holding the MultiXactGenLock for a long * time on every multixact creation. */retry: LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE); pageno = MultiXactIdToOffsetPage(multi); entryno = MultiXactIdToOffsetEntry(multi); slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, multi); offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; offptr += entryno; offset = *offptr; Assert(offset != 0); /* * Use the same increment rule as GetNewMultiXactId(), that is, don't * handle wraparound explicitly until needed. */ tmpMXact = multi + 1; if (nextMXact == tmpMXact) { /* Corner case 1: there is no next multixact */ length = nextOffset - offset; } else { MultiXactOffset nextMXOffset; /* handle wraparound if needed */ if (tmpMXact < FirstMultiXactId) tmpMXact = FirstMultiXactId; prev_pageno = pageno; pageno = MultiXactIdToOffsetPage(tmpMXact); entryno = MultiXactIdToOffsetEntry(tmpMXact); if (pageno != prev_pageno) slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, tmpMXact); offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; offptr += entryno; nextMXOffset = *offptr; if (nextMXOffset == 0) { /* Corner case 2: next multixact is still being filled in */ LWLockRelease(MultiXactOffsetControlLock); pg_usleep(1000L); goto retry; } length = nextMXOffset - offset; } LWLockRelease(MultiXactOffsetControlLock); ptr = (TransactionId *) palloc(length * sizeof(TransactionId)); *xids = ptr; /* Now get the members themselves. */ LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE); truelength = 0; prev_pageno = -1; for (i = 0; i < length; i++, offset++) { TransactionId *xactptr; pageno = MXOffsetToMemberPage(offset); entryno = MXOffsetToMemberEntry(offset); if (pageno != prev_pageno) { slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, multi); prev_pageno = pageno; } xactptr = (TransactionId *) MultiXactMemberCtl->shared->page_buffer[slotno]; xactptr += entryno; if (!TransactionIdIsValid(*xactptr)) { /* Corner case 3: we must be looking at unused slot zero */ Assert(offset == 0); continue; } ptr[truelength++] = *xactptr; } LWLockRelease(MultiXactMemberControlLock); /* * Copy the result into the local cache. */ mXactCachePut(multi, truelength, ptr); debug_elog3(DEBUG2, "GetMembers: no cache for %s", mxid_to_string(multi, truelength, ptr)); return truelength;}/* * mXactCacheGetBySet * returns a MultiXactId from the cache based on the set of * TransactionIds that compose it, or InvalidMultiXactId if * none matches. * * This is helpful, for example, if two transactions want to lock a huge * table. By using the cache, the second will use the same MultiXactId * for the majority of tuples, thus keeping MultiXactId usage low (saving * both I/O and wraparound issues). * * NB: the passed xids[] array will be sorted in-place. */static MultiXactIdmXactCacheGetBySet(int nxids, TransactionId *xids){ mXactCacheEnt *entry; debug_elog3(DEBUG2, "CacheGet: looking for %s", mxid_to_string(InvalidMultiXactId, nxids, xids)); /* sort the array so comparison is easy */ qsort(xids, nxids, sizeof(TransactionId), xidComparator); for (entry = MXactCache; entry != NULL; entry = entry->next) { if (entry->nxids != nxids) continue; /* We assume the cache entries are sorted */ if (memcmp(xids, entry->xids, nxids * sizeof(TransactionId)) == 0) { debug_elog3(DEBUG2, "CacheGet: found %u", entry->multi); return entry->multi; } } debug_elog2(DEBUG2, "CacheGet: not found :-("); return InvalidMultiXactId;}/* * mXactCacheGetById * returns the composing TransactionId set from the cache for a * given MultiXactId, if present. * * If successful, *xids is set to the address of a palloc'd copy of the * TransactionId set. Return value is number of members, or -1 on failure. */static intmXactCacheGetById(MultiXactId multi, TransactionId **xids){ mXactCacheEnt *entry; debug_elog3(DEBUG2, "CacheGet: looking for %u", multi); for (entry = MXactCache; entry != NULL; entry = entry->next) { if (entry->multi == multi) { TransactionId *ptr; Size size; size = sizeof(TransactionId) * entry->nxids; ptr = (TransactionId *) palloc(size); *xids = ptr; memcpy(ptr, entry->xids, size); debug_elog3(DEBUG2, "CacheGet: found %s", mxid_to_string(multi, entry->nxids, entry->xids)); return entry->nxids; } } debug_elog2(DEBUG2, "CacheGet: not found"); return -1;}/* * mXactCachePut * Add a new MultiXactId and its composing set into the local cache. */static voidmXactCachePut(MultiXactId multi, int nxids, TransactionId *xids){ mXactCacheEnt *entry; debug_elog3(DEBUG2, "CachePut: storing %s", mxid_to_string(multi, nxids, xids)); if (MXactContext == NULL) { /* The cache only lives as long as the current transaction */ debug_elog2(DEBUG2, "CachePut: initializing memory context"); MXactContext = AllocSetContextCreate(TopTransactionContext, "MultiXact Cache Context", ALLOCSET_SMALL_MINSIZE, ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE); } entry = (mXactCacheEnt *) MemoryContextAlloc(MXactContext, offsetof(mXactCacheEnt, xids) + nxids * sizeof(TransactionId)); entry->multi = multi; entry->nxids = nxids; memcpy(entry->xids, xids, nxids * sizeof(TransactionId)); /* mXactCacheGetBySet assumes the entries are sorted, so sort them */ qsort(entry->xids, nxids, sizeof(TransactionId), xidComparator); entry->next = MXactCache; MXactCache = entry;}/* * xidComparator * qsort comparison function for XIDs * * We don't need to use wraparound comparison for XIDs, and indeed must * not do so since that does not respect the triangle inequality! Any * old sort order will do. */static intxidComparator(const void *arg1, const void *arg2){ TransactionId xid1 = *(const TransactionId *) arg1; TransactionId xid2 = *(const TransactionId *) arg2; if (xid1 > xid2) return 1; if (xid1 < xid2) return -1; return 0;}#ifdef MULTIXACT_DEBUGstatic char *mxid_to_string(MultiXactId multi, int nxids, TransactionId *xids){ char *str = palloc(15 * (nxids + 1) + 4); int i; snprintf(str, 47, "%u %d[%u", multi, nxids, xids[0]); for (i = 1; i < nxids; i++) snprintf(str + strlen(str), 17, ", %u", xids[i]); strcat(str, "]"); return str;}#endif/* * AtEOXact_MultiXact * Handle transaction end for MultiXact * * This is called at top transaction commit or abort (we don't care which). */voidAtEOXact_MultiXact(void){ /* * Reset our OldestMemberMXactId and OldestVisibleMXactId values, both of * which should only be valid while within a transaction. * * We assume that storing a MultiXactId is atomic and so we need not take * MultiXactGenLock to do this. */ OldestMemberMXactId[MyBackendId] = InvalidMultiXactId; OldestVisibleMXactId[MyBackendId] = InvalidMultiXactId; /* * Discard the local MultiXactId cache. Since MXactContext was created as * a child of TopTransactionContext, we needn't delete it explicitly. */ MXactContext = NULL; MXactCache = NULL;}/* * Initialization of shared memory for MultiXact. We use two SLRU areas, * thus double memory. Also, reserve space for the shared MultiXactState * struct and the per-backend MultiXactId arrays (two of those, too). */SizeMultiXactShmemSize(void){ Size size;#define SHARED_MULTIXACT_STATE_SIZE \ add_size(sizeof(MultiXactStateData), \ mul_size(sizeof(MultiXactId) * 2, MaxBackends)) size = SHARED_MULTIXACT_STATE_SIZE; size = add_size(size, SimpleLruShmemSize(NUM_MXACTOFFSET_BUFFERS, 0)); size = add_size(size, SimpleLruShmemSize(NUM_MXACTMEMBER_BUFFERS, 0)); return size;}voidMultiXactShmemInit(void){ bool found; debug_elog2(DEBUG2, "Shared Memory Init for MultiXact"); MultiXactOffsetCtl->PagePrecedes = MultiXactOffsetPagePrecedes; MultiXactMemberCtl->PagePrecedes = MultiXactMemberPagePrecedes; SimpleLruInit(MultiXactOffsetCtl, "MultiXactOffset Ctl", NUM_MXACTOFFSET_BUFFERS, 0, MultiXactOffsetControlLock, "pg_multixact/offsets"); SimpleLruInit(MultiXactMemberCtl, "MultiXactMember Ctl", NUM_MXACTMEMBER_BUFFERS, 0, MultiXactMemberControlLock, "pg_multixact/members"); /* Initialize our shared state struct */ MultiXactState = ShmemInitStruct("Shared MultiXact State", SHARED_MULTIXACT_STATE_SIZE, &found); if (!IsUnderPostmaster) { Assert(!found); /* Make sure we zero out the per-backend state */ MemSet(MultiXactState, 0, SHARED_MULTIXACT_STATE_SIZE); } else Assert(found); /* * Set up array pointers. Note that perBackendXactIds[0] is wasted space * since we only use indexes 1..MaxBackends in each array. */ OldestMemberMXactId = MultiXactState->perBackendXactIds; OldestVisibleMXactId = OldestMemberMXactId + MaxBackends;}/* * This func must be called ONCE on system install. It creates the initial * MultiXact segments. (The MultiXacts directories are assumed to have been * created by initdb, and MultiXactShmemInit must have been called already.) */voidBootStrapMultiXact(void){ int slotno; LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE); /* Create and zero the first page of the offsets log */ slotno = ZeroMultiXactOffsetPage(0, false); /* Make sure it's written out */ SimpleLruWritePage(MultiXactOffsetCtl, slotno, NULL); Assert(!MultiXactOffsetCtl->shared->page_dirty[slotno]); LWLockRelease(MultiXactOffsetControlLock); LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE); /* Create and zero the first page of the members log */ slotno = ZeroMultiXactMemberPage(0, false); /* Make sure it's written out */ SimpleLruWritePage(MultiXactMemberCtl, slotno, NULL); Assert(!MultiXactMemberCtl->shared->page_dirty[slotno]); LWLockRelease(MultiXactMemberControlLock);}/* * Initialize (or reinitialize) a page of MultiXactOffset to zeroes. * If writeXlog is TRUE, also emit an XLOG record saying we did this. * * The page is not actually written, just set up in shared memory. * The slot number of the new page is returned. * * Control lock must be held at entry, and will be held at exit. */static intZeroMultiXactOffsetPage(int pageno, bool writeXlog){ int slotno; slotno = SimpleLruZeroPage(MultiXactOffsetCtl, pageno); if (writeXlog) WriteMZeroPageXlogRec(pageno, XLOG_MULTIXACT_ZERO_OFF_PAGE); return slotno;}/* * Ditto, for MultiXactMember */static intZeroMultiXactMemberPage(int pageno, bool writeXlog){ int slotno; slotno = SimpleLruZeroPage(MultiXactMemberCtl, pageno); if (writeXlog) WriteMZeroPageXlogRec(pageno, XLOG_MULTIXACT_ZERO_MEM_PAGE); return slotno;}/* * This must be called ONCE during postmaster or standalone-backend startup. * * StartupXLOG has already established nextMXact/nextOffset by calling * MultiXactSetNextMXact and/or MultiXactAdvanceNextMXact. Note that we * may already have replayed WAL data into the SLRU files. * * We don't need any locks here, really; the SLRU locks are taken * only because slru.c expects to be called with locks held. */voidStartupMultiXact(void){ MultiXactId multi = MultiXactState->nextMXact; MultiXactOffset offset = MultiXactState->nextOffset; int pageno; int entryno; /* Clean up offsets state */ LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE); /* * Initialize our idea of the latest page number. */ pageno = MultiXactIdToOffsetPage(multi); MultiXactOffsetCtl->shared->latest_page_number = pageno; /* * Zero out the remainder of the current offsets page. See notes in * StartupCLOG() for motivation. */ entryno = MultiXactIdToOffsetEntry(multi); if (entryno != 0) { int slotno; MultiXactOffset *offptr; slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, multi); offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; offptr += entryno; MemSet(offptr, 0, BLCKSZ - (entryno * sizeof(MultiXactOffset))); MultiXactOffsetCtl->shared->page_dirty[slotno] = true; } LWLockRelease(MultiXactOffsetControlLock); /* And the same for members */ LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE); /* * Initialize our idea of the latest page number. */ pageno = MXOffsetToMemberPage(offset); MultiXactMemberCtl->shared->latest_page_number = pageno; /* * Zero out the remainder of the current members page. See notes in * StartupCLOG() for motivation. */ entryno = MXOffsetToMemberEntry(offset); if (entryno != 0) { int slotno; TransactionId *xidptr;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -