📄 procarray.c
字号:
} LWLockRelease(ProcArrayLock); /* * If none of the relevant caches overflowed, we know the Xid is not * running without looking at pg_subtrans. */ if (nxids == 0) { xc_no_overflow_inc(); return false; } /* * Step 3: have to check pg_subtrans. * * At this point, we know it's either a subtransaction of one of the Xids * in xids[], or it's not running. If it's an already-failed * subtransaction, we want to say "not running" even though its parent may * still be running. So first, check pg_clog to see if it's been aborted. */ xc_slow_answer_inc(); if (TransactionIdDidAbort(xid)) return false; /* * It isn't aborted, so check whether the transaction tree it belongs to * is still running (or, more precisely, whether it was running when we * held ProcArrayLock). */ topxid = SubTransGetTopmostTransaction(xid); Assert(TransactionIdIsValid(topxid)); if (!TransactionIdEquals(topxid, xid)) { for (i = 0; i < nxids; i++) { if (TransactionIdEquals(xids[i], topxid)) return true; } } return false;}/* * TransactionIdIsActive -- is xid the top-level XID of an active backend? * * This differs from TransactionIdIsInProgress in that it ignores prepared * transactions. Also, we ignore subtransactions since that's not needed * for current uses. */boolTransactionIdIsActive(TransactionId xid){ bool result = false; ProcArrayStruct *arrayP = procArray; int i; /* * Don't bother checking a transaction older than RecentXmin; it could not * possibly still be running. */ if (TransactionIdPrecedes(xid, RecentXmin)) return false; LWLockAcquire(ProcArrayLock, LW_SHARED); for (i = 0; i < arrayP->numProcs; i++) { volatile PGPROC *proc = arrayP->procs[i]; /* Fetch xid just once - see GetNewTransactionId */ TransactionId pxid = proc->xid; if (!TransactionIdIsValid(pxid)) continue; if (proc->pid == 0) continue; /* ignore prepared transactions */ if (TransactionIdEquals(pxid, xid)) { result = true; break; } } LWLockRelease(ProcArrayLock); return result;}/* * GetOldestXmin -- returns oldest transaction that was running * when any current transaction was started. * * If allDbs is TRUE then all backends are considered; if allDbs is FALSE * then only backends running in my own database are considered. * * If ignoreVacuum is TRUE then backends with the PROC_IN_VACUUM flag set are * ignored. * * This is used by VACUUM to decide which deleted tuples must be preserved * in a table. allDbs = TRUE is needed for shared relations, but allDbs = * FALSE is sufficient for non-shared relations, since only backends in my * own database could ever see the tuples in them. Also, we can ignore * concurrently running lazy VACUUMs because (a) they must be working on other * tables, and (b) they don't need to do snapshot-based lookups. * * This is also used to determine where to truncate pg_subtrans. allDbs * must be TRUE for that case, and ignoreVacuum FALSE. * * Note: we include all currently running xids in the set of considered xids. * This ensures that if a just-started xact has not yet set its snapshot, * when it does set the snapshot it cannot set xmin less than what we compute. * See notes in src/backend/access/transam/README. */TransactionIdGetOldestXmin(bool allDbs, bool ignoreVacuum){ ProcArrayStruct *arrayP = procArray; TransactionId result; int index; LWLockAcquire(ProcArrayLock, LW_SHARED); /* * We initialize the MIN() calculation with latestCompletedXid + 1. This * is a lower bound for the XIDs that might appear in the ProcArray later, * and so protects us against overestimating the result due to future * additions. */ result = ShmemVariableCache->latestCompletedXid; Assert(TransactionIdIsNormal(result)); TransactionIdAdvance(result); for (index = 0; index < arrayP->numProcs; index++) { volatile PGPROC *proc = arrayP->procs[index]; if (ignoreVacuum && (proc->vacuumFlags & PROC_IN_VACUUM)) continue; if (allDbs || proc->databaseId == MyDatabaseId) { /* Fetch xid just once - see GetNewTransactionId */ TransactionId xid = proc->xid; /* First consider the transaction's own Xid, if any */ if (TransactionIdIsNormal(xid) && TransactionIdPrecedes(xid, result)) result = xid; /* * Also consider the transaction's Xmin, if set. * * We must check both Xid and Xmin because a transaction might * have an Xmin but not (yet) an Xid; conversely, if it has an * Xid, that could determine some not-yet-set Xmin. */ xid = proc->xmin; /* Fetch just once */ if (TransactionIdIsNormal(xid) && TransactionIdPrecedes(xid, result)) result = xid; } } LWLockRelease(ProcArrayLock); return result;}/* * GetSnapshotData -- returns information about running transactions. * * The returned snapshot includes xmin (lowest still-running xact ID), * xmax (highest completed xact ID + 1), and a list of running xact IDs * in the range xmin <= xid < xmax. It is used as follows: * All xact IDs < xmin are considered finished. * All xact IDs >= xmax are considered still running. * For an xact ID xmin <= xid < xmax, consult list to see whether * it is considered running or not. * This ensures that the set of transactions seen as "running" by the * current xact will not change after it takes the snapshot. * * All running top-level XIDs are included in the snapshot, except for lazy * VACUUM processes. We also try to include running subtransaction XIDs, * but since PGPROC has only a limited cache area for subxact XIDs, full * information may not be available. If we find any overflowed subxid arrays, * we have to mark the snapshot's subxid data as overflowed, and extra work * will need to be done to determine what's running (see XidInMVCCSnapshot() * in tqual.c). * * We also update the following backend-global variables: * TransactionXmin: the oldest xmin of any snapshot in use in the * current transaction (this is the same as MyProc->xmin). This * is just the xmin computed for the first, serializable snapshot. * RecentXmin: the xmin computed for the most recent snapshot. XIDs * older than this are known not running any more. * RecentGlobalXmin: the global xmin (oldest TransactionXmin across all * running transactions, except those running LAZY VACUUM). This is * the same computation done by GetOldestXmin(true, true). */SnapshotGetSnapshotData(Snapshot snapshot, bool serializable){ ProcArrayStruct *arrayP = procArray; TransactionId xmin; TransactionId xmax; TransactionId globalxmin; int index; int count = 0; int subcount = 0; Assert(snapshot != NULL); /* Serializable snapshot must be computed before any other... */ Assert(serializable ? !TransactionIdIsValid(MyProc->xmin) : TransactionIdIsValid(MyProc->xmin)); /* * Allocating space for maxProcs xids is usually overkill; numProcs would * be sufficient. But it seems better to do the malloc while not holding * the lock, so we can't look at numProcs. Likewise, we allocate much * more subxip storage than is probably needed. * * This does open a possibility for avoiding repeated malloc/free: since * maxProcs does not change at runtime, we can simply reuse the previous * xip arrays if any. (This relies on the fact that all callers pass * static SnapshotData structs.) */ if (snapshot->xip == NULL) { /* * First call for this snapshot */ snapshot->xip = (TransactionId *) malloc(arrayP->maxProcs * sizeof(TransactionId)); if (snapshot->xip == NULL) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"))); Assert(snapshot->subxip == NULL); snapshot->subxip = (TransactionId *) malloc(arrayP->maxProcs * PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId)); if (snapshot->subxip == NULL) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"))); } /* * It is sufficient to get shared lock on ProcArrayLock, even if we are * going to set MyProc->xmin. */ LWLockAcquire(ProcArrayLock, LW_SHARED); /* xmax is always latestCompletedXid + 1 */ xmax = ShmemVariableCache->latestCompletedXid; Assert(TransactionIdIsNormal(xmax)); TransactionIdAdvance(xmax); /* initialize xmin calculation with xmax */ globalxmin = xmin = xmax; /* * Spin over procArray checking xid, xmin, and subxids. The goal is to * gather all active xids, find the lowest xmin, and try to record * subxids. */ for (index = 0; index < arrayP->numProcs; index++) { volatile PGPROC *proc = arrayP->procs[index]; TransactionId xid; /* Ignore procs running LAZY VACUUM */ if (proc->vacuumFlags & PROC_IN_VACUUM) continue; /* Update globalxmin to be the smallest valid xmin */ xid = proc->xmin; /* fetch just once */ if (TransactionIdIsNormal(xid) && TransactionIdPrecedes(xid, globalxmin)) globalxmin = xid; /* Fetch xid just once - see GetNewTransactionId */ xid = proc->xid; /* * If the transaction has been assigned an xid < xmax we add it to the * snapshot, and update xmin if necessary. There's no need to store * XIDs >= xmax, since we'll treat them as running anyway. We don't * bother to examine their subxids either. * * We don't include our own XID (if any) in the snapshot, but we must * include it into xmin. */ if (TransactionIdIsNormal(xid)) { if (TransactionIdFollowsOrEquals(xid, xmax)) continue; if (proc != MyProc) snapshot->xip[count++] = xid; if (TransactionIdPrecedes(xid, xmin)) xmin = xid; } /* * Save subtransaction XIDs if possible (if we've already overflowed, * there's no point). Note that the subxact XIDs must be later than * their parent, so no need to check them against xmin. We could * filter against xmax, but it seems better not to do that much work * while holding the ProcArrayLock. * * The other backend can add more subxids concurrently, but cannot * remove any. Hence it's important to fetch nxids just once. Should * be safe to use memcpy, though. (We needn't worry about missing any * xids added concurrently, because they must postdate xmax.) * * Again, our own XIDs are not included in the snapshot. */ if (subcount >= 0 && proc != MyProc) { if (proc->subxids.overflowed) subcount = -1; /* overflowed */ else { int nxids = proc->subxids.nxids; if (nxids > 0) { memcpy(snapshot->subxip + subcount, (void *) proc->subxids.xids, nxids * sizeof(TransactionId)); subcount += nxids; } } } } if (serializable) MyProc->xmin = TransactionXmin = xmin; LWLockRelease(ProcArrayLock); /* * Update globalxmin to include actual process xids. This is a slightly * different way of computing it than GetOldestXmin uses, but should give * the same result. */ if (TransactionIdPrecedes(xmin, globalxmin)) globalxmin = xmin; /* Update global variables too */ RecentGlobalXmin = globalxmin; RecentXmin = xmin; snapshot->xmin = xmin; snapshot->xmax = xmax; snapshot->xcnt = count; snapshot->subxcnt = subcount; snapshot->curcid = GetCurrentCommandId(false); return snapshot;}/* * GetTransactionsInCommit -- Get the XIDs of transactions that are committing * * Constructs an array of XIDs of transactions that are currently in commit * critical sections, as shown by having inCommit set in their PGPROC entries. * * *xids_p is set to a palloc'd array that should be freed by the caller. * The return value is the number of valid entries. * * Note that because backends set or clear inCommit without holding any lock, * the result is somewhat indeterminate, but we don't really care. Even in * a multiprocessor with delayed writes to shared memory, it should be certain * that setting of inCommit will propagate to shared memory when the backend * takes the WALInsertLock, so we cannot fail to see an xact as inCommit if * it's already inserted its commit record. Whether it takes a little while * for clearing of inCommit to propagate is unimportant for correctness. */intGetTransactionsInCommit(TransactionId **xids_p){ ProcArrayStruct *arrayP = procArray; TransactionId *xids; int nxids; int index; xids = (TransactionId *) palloc(arrayP->maxProcs * sizeof(TransactionId)); nxids = 0; LWLockAcquire(ProcArrayLock, LW_SHARED); for (index = 0; index < arrayP->numProcs; index++) { volatile PGPROC *proc = arrayP->procs[index]; /* Fetch xid just once - see GetNewTransactionId */ TransactionId pxid = proc->xid; if (proc->inCommit && TransactionIdIsValid(pxid)) xids[nxids++] = pxid; } LWLockRelease(ProcArrayLock); *xids_p = xids; return nxids;}/* * HaveTransactionsInCommit -- Are any of the specified XIDs in commit? * * This is used with the results of GetTransactionsInCommit to see if any * of the specified XIDs are still in their commit critical sections. * * Note: this is O(N^2) in the number of xacts that are/were in commit, but * those numbers should be small enough for it not to be a problem. */boolHaveTransactionsInCommit(TransactionId *xids, int nxids){ bool result = false; ProcArrayStruct *arrayP = procArray; int index; LWLockAcquire(ProcArrayLock, LW_SHARED); for (index = 0; index < arrayP->numProcs; index++) { volatile PGPROC *proc = arrayP->procs[index]; /* Fetch xid just once - see GetNewTransactionId */ TransactionId pxid = proc->xid; if (proc->inCommit && TransactionIdIsValid(pxid)) { int i; for (i = 0; i < nxids; i++) { if (xids[i] == pxid)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -