/* sinval.c — (code-viewer page banner converted to a comment; not part of the original source) */
/*-------------------------------------------------------------------------
 *
 * sinval.c
 *	  POSTGRES shared cache invalidation communication code.
 *
 * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinval.c,v 1.61 2003/10/01 21:30:52 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "storage/proc.h"
#include "storage/sinval.h"
#include "storage/sinvaladt.h"
#include "utils/tqual.h"
#include "miscadmin.h"


/****************************************************************************/
/*	CreateSharedInvalidationState()		 Initialize SI buffer				*/
/*																			*/
/*	should be called only by the POSTMASTER									*/
/****************************************************************************/
void
CreateSharedInvalidationState(int maxBackends)
{
	/* SInvalLock must be initialized already, during LWLock init */
	SIBufferInit(maxBackends);
}

/*
 * InitBackendSharedInvalidationState
 *		Initialize new backend's state info in buffer segment.
 *
 * Errors out (FATAL) if the SI segment cannot accommodate this backend.
 */
void
InitBackendSharedInvalidationState(void)
{
	int			flag;

	LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
	flag = SIBackendInit(shmInvalBuffer);
	LWLockRelease(SInvalLock);
	if (flag < 0)				/* unexpected problem */
		elog(FATAL, "shared cache invalidation initialization failed");
	if (flag == 0)				/* expected problem: MaxBackends exceeded */
		ereport(FATAL,
				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
				 errmsg("sorry, too many clients already")));
}

/*
 * SendSharedInvalidMessage
 *	Add a shared-cache-invalidation message to the global SI message queue.
 *
 * Overflow of the SI buffer is not an error here; SIInsertDataEntry signals
 * other backends to reset, so we merely log it at DEBUG4.
 */
void
SendSharedInvalidMessage(SharedInvalidationMessage *msg)
{
	bool		insertOK;

	LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
	insertOK = SIInsertDataEntry(shmInvalBuffer, msg);
	LWLockRelease(SInvalLock);
	if (!insertOK)
		elog(DEBUG4, "SI buffer overflow");
}

/*
 * ReceiveSharedInvalidMessages
 *		Process shared-cache-invalidation messages waiting for this backend
 *
 * NOTE: it is entirely possible for this routine to be invoked recursively
 * as a consequence of processing inside the invalFunction or resetFunction.
 * Hence, we must be holding no SI resources when we call them.  The only
 * bad side-effect is that SIDelExpiredDataEntries might be called extra
 * times on the way out of a nested call.
 */
void
ReceiveSharedInvalidMessages(
					  void (*invalFunction) (SharedInvalidationMessage *msg),
							 void (*resetFunction) (void))
{
	SharedInvalidationMessage data;
	int			getResult;
	bool		gotMessage = false;

	for (;;)
	{
		/*
		 * We can run SIGetDataEntry in parallel with other backends
		 * running SIGetDataEntry for themselves, since each instance will
		 * modify only fields of its own backend's ProcState, and no
		 * instance will look at fields of other backends' ProcStates. We
		 * express this by grabbing SInvalLock in shared mode.  Note that
		 * this is not exactly the normal (read-only) interpretation of a
		 * shared lock!  Look closely at the interactions before allowing
		 * SInvalLock to be grabbed in shared mode for any other reason!
		 *
		 * The routines later in this file that use shared mode are okay with
		 * this, because they aren't looking at the ProcState fields
		 * associated with SI message transfer; they only use the
		 * ProcState array as an easy way to find all the PGPROC
		 * structures.
		 */
		LWLockAcquire(SInvalLock, LW_SHARED);
		getResult = SIGetDataEntry(shmInvalBuffer, MyBackendId, &data);
		LWLockRelease(SInvalLock);
		if (getResult == 0)
			break;				/* nothing more to do */
		if (getResult < 0)
		{
			/* got a reset message */
			elog(DEBUG4, "cache state reset");
			resetFunction();
		}
		else
		{
			/* got a normal data message */
			invalFunction(&data);
		}
		gotMessage = true;
	}

	/* If we got any messages, try to release dead messages */
	if (gotMessage)
	{
		LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
		SIDelExpiredDataEntries(shmInvalBuffer);
		LWLockRelease(SInvalLock);
	}
}


/****************************************************************************/
/* Functions that need to scan the PGPROC structures of all running backends. */
/* It's a bit strange to keep these in sinval.c, since they don't have any	*/
/* direct relationship to shared-cache invalidation.  But the procState		*/
/* array in the SI segment is the only place in the system where we have	*/
/* an array of per-backend data, so it is the most convenient place to keep */
/* pointers to the backends' PGPROC structures.  We used to implement these */
/* functions with a slow, ugly search through the ShmemIndex hash table --- */
/* now they are simple loops over the SI ProcState array.					*/
/****************************************************************************/


/*
 * DatabaseHasActiveBackends -- are there any backends running in the given DB
 *
 * If 'ignoreMyself' is TRUE, ignore this particular backend while checking
 * for backends in the target database.
 *
 * This function is used to interlock DROP DATABASE against there being
 * any active backends in the target DB --- dropping the DB while active
 * backends remain would be a Bad Thing.  Note that we cannot detect here
 * the possibility of a newly-started backend that is trying to connect
 * to the doomed database, so additional interlocking is needed during
 * backend startup.
 */
bool
DatabaseHasActiveBackends(Oid databaseId, bool ignoreMyself)
{
	bool		result = false;
	SISeg	   *segP = shmInvalBuffer;
	ProcState  *stateP = segP->procState;
	int			index;

	LWLockAcquire(SInvalLock, LW_SHARED);

	for (index = 0; index < segP->lastBackend; index++)
	{
		SHMEM_OFFSET pOffset = stateP[index].procStruct;

		if (pOffset != INVALID_OFFSET)
		{
			PGPROC	   *proc = (PGPROC *) MAKE_PTR(pOffset);

			if (proc->databaseId == databaseId)
			{
				if (ignoreMyself && proc == MyProc)
					continue;

				result = true;
				break;
			}
		}
	}

	LWLockRelease(SInvalLock);

	return result;
}

/*
 * TransactionIdIsInProgress -- is given transaction running by some backend
 */
bool
TransactionIdIsInProgress(TransactionId xid)
{
	bool		result = false;
	SISeg	   *segP = shmInvalBuffer;
	ProcState  *stateP = segP->procState;
	int			index;

	LWLockAcquire(SInvalLock, LW_SHARED);

	for (index = 0; index < segP->lastBackend; index++)
	{
		SHMEM_OFFSET pOffset = stateP[index].procStruct;

		if (pOffset != INVALID_OFFSET)
		{
			PGPROC	   *proc = (PGPROC *) MAKE_PTR(pOffset);

			/* Fetch xid just once - see GetNewTransactionId */
			TransactionId pxid = proc->xid;

			if (TransactionIdEquals(pxid, xid))
			{
				result = true;
				break;
			}
		}
	}

	LWLockRelease(SInvalLock);

	return result;
}

/*
 * GetOldestXmin -- returns oldest transaction that was running
 *					when any current transaction was started.
 *
 * If allDbs is TRUE then all backends are considered; if allDbs is FALSE
 * then only backends running in my own database are considered.
 *
 * This is used by VACUUM to decide which deleted tuples must be preserved
 * in a table.	allDbs = TRUE is needed for shared relations, but allDbs =
 * FALSE is sufficient for non-shared relations, since only backends in my
 * own database could ever see the tuples in them.
 *
 * Note: we include the currently running xids in the set of considered xids.
 * This ensures that if a just-started xact has not yet set its snapshot,
 * when it does set the snapshot it cannot set xmin less than what we compute.
 */
TransactionId
GetOldestXmin(bool allDbs)
{
	SISeg	   *segP = shmInvalBuffer;
	ProcState  *stateP = segP->procState;
	TransactionId result;
	int			index;

	result = GetCurrentTransactionId();

	LWLockAcquire(SInvalLock, LW_SHARED);

	for (index = 0; index < segP->lastBackend; index++)
	{
		SHMEM_OFFSET pOffset = stateP[index].procStruct;

		if (pOffset != INVALID_OFFSET)
		{
			PGPROC	   *proc = (PGPROC *) MAKE_PTR(pOffset);

			if (allDbs || proc->databaseId == MyDatabaseId)
			{
				/* Fetch xid just once - see GetNewTransactionId */
				TransactionId xid = proc->xid;

				if (TransactionIdIsNormal(xid))
				{
					if (TransactionIdPrecedes(xid, result))
						result = xid;
					xid = proc->xmin;
					if (TransactionIdIsNormal(xid))
						if (TransactionIdPrecedes(xid, result))
							result = xid;
				}
			}
		}
	}

	LWLockRelease(SInvalLock);

	/*
	 * NOTE(recovery): the source was truncated at this point by the
	 * extraction; the closing brace below restores the end of the
	 * function, whose last statement is the return above.
	 */
	return result;
}
/*
 * (Extraction artifact: the text below was the code-viewer's keyboard-shortcut
 * help panel, not part of the source file. Translated for reference:
 * copy code Ctrl+C, search code Ctrl+F, fullscreen F11, switch theme
 * Ctrl+Shift+D, show shortcuts ?, increase font Ctrl+=, decrease font Ctrl+-.)
 */