📄 procarray.c
字号:
* current xact will not change after it takes the snapshot. * * Note that only top-level XIDs are included in the snapshot. We can * still apply the xmin and xmax limits to subtransaction XIDs, but we * need to work a bit harder to see if XIDs in [xmin..xmax) are running. * * We also update the following backend-global variables: * TransactionXmin: the oldest xmin of any snapshot in use in the * current transaction (this is the same as MyProc->xmin). This * is just the xmin computed for the first, serializable snapshot. * RecentXmin: the xmin computed for the most recent snapshot. XIDs * older than this are known not running any more. * RecentGlobalXmin: the global xmin (oldest TransactionXmin across all * running transactions). This is the same computation done by * GetOldestXmin(TRUE). *---------- */SnapshotGetSnapshotData(Snapshot snapshot, bool serializable){ ProcArrayStruct *arrayP = procArray; TransactionId xmin; TransactionId xmax; TransactionId globalxmin; int index; int count = 0; Assert(snapshot != NULL); /* Serializable snapshot must be computed before any other... */ Assert(serializable ? !TransactionIdIsValid(MyProc->xmin) : TransactionIdIsValid(MyProc->xmin)); /* * Allocating space for maxProcs xids is usually overkill; numProcs would * be sufficient. But it seems better to do the malloc while not holding * the lock, so we can't look at numProcs. * * This does open a possibility for avoiding repeated malloc/free: since * maxProcs does not change at runtime, we can simply reuse the previous * xip array if any. (This relies on the fact that all callers pass * static SnapshotData structs.) */ if (snapshot->xip == NULL) { /* * First call for this snapshot */ snapshot->xip = (TransactionId *) malloc(arrayP->maxProcs * sizeof(TransactionId)); if (snapshot->xip == NULL) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"))); } globalxmin = xmin = GetTopTransactionId(); /* * If we are going to set MyProc->xmin then we'd better get exclusive * lock; if not, this is a read-only operation so it can be shared. */ LWLockAcquire(ProcArrayLock, serializable ? LW_EXCLUSIVE : LW_SHARED); /*-------------------- * Unfortunately, we have to call ReadNewTransactionId() after acquiring * ProcArrayLock above. It's not good because ReadNewTransactionId() does * LWLockAcquire(XidGenLock), but *necessary*. We need to be sure that * no transactions exit the set of currently-running transactions * between the time we fetch xmax and the time we finish building our * snapshot. Otherwise we could have a situation like this: * * 1. Tx Old is running (in Read Committed mode). * 2. Tx S reads new transaction ID into xmax, then * is swapped out before acquiring ProcArrayLock. * 3. Tx New gets new transaction ID (>= S' xmax), * makes changes and commits. * 4. Tx Old changes some row R changed by Tx New and commits. * 5. Tx S finishes getting its snapshot data. It sees Tx Old as * done, but sees Tx New as still running (since New >= xmax). * * Now S will see R changed by both Tx Old and Tx New, *but* does not * see other changes made by Tx New. If S is supposed to be in * Serializable mode, this is wrong. * * By locking ProcArrayLock before we read xmax, we ensure that TX Old * cannot exit the set of running transactions seen by Tx S. Therefore * both Old and New will be seen as still running => no inconsistency. *-------------------- */ xmax = ReadNewTransactionId(); for (index = 0; index < arrayP->numProcs; index++) { PGPROC *proc = arrayP->procs[index]; /* Fetch xid just once - see GetNewTransactionId */ TransactionId xid = proc->xid; /* * Ignore my own proc (dealt with my xid above), procs not running a * transaction, and xacts started since we read the next transaction * ID. There's no need to store XIDs above what we got from * ReadNewTransactionId, since we'll treat them as running anyway. We * also assume that such xacts can't compute an xmin older than ours, * so they needn't be considered in computing globalxmin. */ if (proc == MyProc || !TransactionIdIsNormal(xid) || TransactionIdFollowsOrEquals(xid, xmax)) continue; if (TransactionIdPrecedes(xid, xmin)) xmin = xid; snapshot->xip[count] = xid; count++; /* Update globalxmin to be the smallest valid xmin */ xid = proc->xmin; if (TransactionIdIsNormal(xid)) if (TransactionIdPrecedes(xid, globalxmin)) globalxmin = xid; } if (serializable) MyProc->xmin = TransactionXmin = xmin; LWLockRelease(ProcArrayLock); /* * Update globalxmin to include actual process xids. This is a slightly * different way of computing it than GetOldestXmin uses, but should give * the same result. */ if (TransactionIdPrecedes(xmin, globalxmin)) globalxmin = xmin; /* Update global variables too */ RecentGlobalXmin = globalxmin; RecentXmin = xmin; snapshot->xmin = xmin; snapshot->xmax = xmax; snapshot->xcnt = count; snapshot->curcid = GetCurrentCommandId(); return snapshot;}/* * DatabaseHasActiveBackends -- are there any backends running in the given DB * * If 'ignoreMyself' is TRUE, ignore this particular backend while checking * for backends in the target database. * * This function is used to interlock DROP DATABASE against there being * any active backends in the target DB --- dropping the DB while active * backends remain would be a Bad Thing. Note that we cannot detect here * the possibility of a newly-started backend that is trying to connect * to the doomed database, so additional interlocking is needed during * backend startup. */boolDatabaseHasActiveBackends(Oid databaseId, bool ignoreMyself){ bool result = false; ProcArrayStruct *arrayP = procArray; int index; LWLockAcquire(ProcArrayLock, LW_SHARED); for (index = 0; index < arrayP->numProcs; index++) { PGPROC *proc = arrayP->procs[index]; if (proc->databaseId == databaseId) { if (ignoreMyself && proc == MyProc) continue; result = true; break; } } LWLockRelease(ProcArrayLock); return result;}/* * BackendPidGetProc -- get a backend's PGPROC given its PID * * Returns NULL if not found. Note that it is up to the caller to be * sure that the question remains meaningful for long enough for the * answer to be used ... */PGPROC *BackendPidGetProc(int pid){ PGPROC *result = NULL; ProcArrayStruct *arrayP = procArray; int index; if (pid == 0) /* never match dummy PGPROCs */ return NULL; LWLockAcquire(ProcArrayLock, LW_SHARED); for (index = 0; index < arrayP->numProcs; index++) { PGPROC *proc = arrayP->procs[index]; if (proc->pid == pid) { result = proc; break; } } LWLockRelease(ProcArrayLock); return result;}/* * BackendXidGetPid -- get a backend's pid given its XID * * Returns 0 if not found or it's a prepared transaction. Note that * it is up to the caller to be sure that the question remains * meaningful for long enough for the answer to be used ... * * Only main transaction Ids are considered. This function is mainly * useful for determining what backend owns a lock. */intBackendXidGetPid(TransactionId xid){ int result = 0; ProcArrayStruct *arrayP = procArray; int index; if (xid == InvalidTransactionId) /* never match invalid xid */ return 0; LWLockAcquire(ProcArrayLock, LW_SHARED); for (index = 0; index < arrayP->numProcs; index++) { PGPROC *proc = arrayP->procs[index]; if (proc->xid == xid) { result = proc->pid; break; } } LWLockRelease(ProcArrayLock); return result;}/* * IsBackendPid -- is a given pid a running backend */boolIsBackendPid(int pid){ return (BackendPidGetProc(pid) != NULL);}/* * CountActiveBackends --- count backends (other than myself) that are in * active transactions. This is used as a heuristic to decide if * a pre-XLOG-flush delay is worthwhile during commit. * * Do not count backends that are blocked waiting for locks, since they are * not going to get to run until someone else commits. */intCountActiveBackends(void){ ProcArrayStruct *arrayP = procArray; int count = 0; int index; /* * Note: for speed, we don't acquire ProcArrayLock. This is a little bit * bogus, but since we are only testing fields for zero or nonzero, it * should be OK. The result is only used for heuristic purposes anyway... */ for (index = 0; index < arrayP->numProcs; index++) { PGPROC *proc = arrayP->procs[index]; if (proc == MyProc) continue; /* do not count myself */ if (proc->pid == 0) continue; /* do not count prepared xacts */ if (proc->xid == InvalidTransactionId) continue; /* do not count if not in a transaction */ if (proc->waitLock != NULL) continue; /* do not count if blocked on a lock */ count++; } return count;}/* * CountDBBackends --- count backends that are using specified database */intCountDBBackends(Oid databaseid){ ProcArrayStruct *arrayP = procArray; int count = 0; int index; LWLockAcquire(ProcArrayLock, LW_SHARED); for (index = 0; index < arrayP->numProcs; index++) { PGPROC *proc = arrayP->procs[index]; if (proc->pid == 0) continue; /* do not count prepared xacts */ if (proc->databaseId == databaseid) count++; } LWLockRelease(ProcArrayLock); return count;}/* * CountUserBackends --- count backends that are used by specified user */intCountUserBackends(Oid roleid){ ProcArrayStruct *arrayP = procArray; int count = 0; int index; LWLockAcquire(ProcArrayLock, LW_SHARED); for (index = 0; index < arrayP->numProcs; index++) { PGPROC *proc = arrayP->procs[index]; if (proc->pid == 0) continue; /* do not count prepared xacts */ if (proc->roleId == roleid) count++; } LWLockRelease(ProcArrayLock); return count;}#define XidCacheRemove(i) \ do { \ MyProc->subxids.xids[i] = MyProc->subxids.xids[MyProc->subxids.nxids - 1]; \ MyProc->subxids.nxids--; \ } while (0)/* * XidCacheRemoveRunningXids * * Remove a bunch of TransactionIds from the list of known-running * subtransactions for my backend. Both the specified xid and those in * the xids[] array (of length nxids) are removed from the subxids cache. */voidXidCacheRemoveRunningXids(TransactionId xid, int nxids, TransactionId *xids){ int i, j; Assert(!TransactionIdEquals(xid, InvalidTransactionId)); /* * We must hold ProcArrayLock exclusively in order to remove transactions * from the PGPROC array. (See notes in GetSnapshotData.) It's possible * this could be relaxed since we know this routine is only used to abort * subtransactions, but pending closer analysis we'd best be conservative. */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); /* * Under normal circumstances xid and xids[] will be in increasing order, * as will be the entries in subxids. Scan backwards to avoid O(N^2) * behavior when removing a lot of xids. */ for (i = nxids - 1; i >= 0; i--) { TransactionId anxid = xids[i]; for (j = MyProc->subxids.nxids - 1; j >= 0; j--) { if (TransactionIdEquals(MyProc->subxids.xids[j], anxid)) { XidCacheRemove(j); break; } } /* * Ordinarily we should have found it, unless the cache has * overflowed. However it's also possible for this routine to be * invoked multiple times for the same subtransaction, in case of an * error during AbortSubTransaction. So instead of Assert, emit a * debug warning. */ if (j < 0 && !MyProc->subxids.overflowed) elog(WARNING, "did not find subXID %u in MyProc", anxid); } for (j = MyProc->subxids.nxids - 1; j >= 0; j--) { if (TransactionIdEquals(MyProc->subxids.xids[j], xid)) { XidCacheRemove(j); break; } } /* Ordinarily we should have found it, unless the cache has overflowed */ if (j < 0 && !MyProc->subxids.overflowed) elog(WARNING, "did not find subXID %u in MyProc", xid); LWLockRelease(ProcArrayLock);}#ifdef XIDCACHE_DEBUG/* * Print stats about effectiveness of XID cache */static voidDisplayXidCache(void){ fprintf(stderr, "XidCache: xmin: %ld, mainxid: %ld, childxid: %ld, slow: %ld\n", xc_by_recent_xmin, xc_by_main_xid, xc_by_child_xid, xc_slow_answer);}#endif /* XIDCACHE_DEBUG */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -