📄 lock.c
字号:
elog(PANIC, "too many conflicting locks found"); return vxids;}

/*
 * AtPrepare_Locks
 *		Do the preparatory work for a PREPARE: make 2PC state file records
 *		for all locks currently held.
 *
 * Non-transactional locks are ignored, as are VXID locks.
 *
 * There are some special cases that we error out on: we can't be holding
 * any session locks (should be OK since only VACUUM uses those) and we
 * can't be holding any locks on temporary objects (since that would mess
 * up the current backend if it tries to exit before the prepared xact is
 * committed).
 *
 * Takes no arguments and returns nothing.  One TwoPhaseLockRecord is passed
 * to RegisterTwoPhaseRecord() for every qualifying entry found in
 * LockMethodLocalHash.
 *
 * NOTE(review): the temporary-object restriction described above is not
 * checked anywhere in this function's body; presumably it is enforced
 * elsewhere -- confirm.
 */
void
AtPrepare_Locks(void)
{
	HASH_SEQ_STATUS status;
	LOCALLOCK *locallock;

	/*
	 * We don't need to touch shared memory for this --- all the necessary
	 * state information is in the locallock table.
	 */
	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		TwoPhaseLockRecord record;
		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
		int i;

		/* Ignore nontransactional locks */
		if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
			continue;

		/*
		 * Ignore VXID locks.  We don't want those to be held by prepared
		 * transactions, since they aren't meaningful after a restart.
		 */
		if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
			continue;

		/* Ignore it if we don't actually hold the lock */
		if (locallock->nLocks <= 0)
			continue;

		/*
		 * Scan to verify there are no session locks.  An owner entry whose
		 * "owner" field is NULL is what this check treats as a session lock.
		 */
		for (i = locallock->numLockOwners - 1; i >= 0; i--)
		{
			/* elog not ereport since this should not happen */
			if (lockOwners[i].owner == NULL)
				elog(ERROR, "cannot PREPARE when session locks exist");
		}

		/*
		 * Create a 2PC record: the lock tag and the lock mode are the only
		 * fields copied out of the locallock entry.
		 */
		memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG));
		record.lockmode = locallock->tag.mode;

		/*
		 * NOTE(review): the meaning of the second argument (0) is defined by
		 * RegisterTwoPhaseRecord, which is not visible here -- confirm.
		 */
		RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0, &record, sizeof(TwoPhaseLockRecord));
	}
}

/*
 * PostPrepare_Locks
 *		Clean up after successful PREPARE
 *
 * Here, we want to transfer ownership of our locks to a dummy PGPROC
 * that's now associated with the prepared transaction, and we want to
 * clean out the corresponding entries in the LOCALLOCK table.
 *
 * xid is handed to TwoPhaseGetDummyProc() to obtain the dummy PGPROC that
 * receives the locks.  Nothing is returned.  The whole body runs between
 * START_CRIT_SECTION() and END_CRIT_SECTION().
 *
 * Note: by removing the LOCALLOCK entries, we are leaving dangling
 * pointers in the transaction's resource owner.  This is OK at the
 * moment since resowner.c doesn't try to free locks retail at a toplevel
 * transaction commit or abort.  We could alternatively zero out nLocks
 * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
 * but that probably costs more cycles.
 */
void
PostPrepare_Locks(TransactionId xid)
{
	PGPROC *newproc = TwoPhaseGetDummyProc(xid);
	HASH_SEQ_STATUS status;
	LOCALLOCK *locallock;
	LOCK *lock;
	PROCLOCK *proclock;
	PROCLOCKTAG proclocktag;
	bool found;
	int partition;

	/* This is a critical section: any error means big trouble */
	START_CRIT_SECTION();

	/*
	 * First we run through the locallock table and get rid of unwanted
	 * entries, then we scan the process's proclocks and transfer them to the
	 * target proc.
	 *
	 * We do this separately because we may have multiple locallock entries
	 * pointing to the same proclock, and we daren't end up with any dangling
	 * pointers.
	 */
	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		if (locallock->proclock == NULL || locallock->lock == NULL)
		{
			/*
			 * We must've run out of shared memory while trying to set up this
			 * lock.  Just forget the local entry.
			 */
			Assert(locallock->nLocks == 0);
			RemoveLocalLock(locallock);
			continue;
		}

		/* Ignore nontransactional locks */
		if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
			continue;

		/* Ignore VXID locks */
		if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
			continue;

		/* We already checked there are no session locks */

		/*
		 * Mark the proclock to show we need to release this lockmode.  The
		 * second pass below compares the accumulated releaseMask against
		 * holdMask.
		 */
		if (locallock->nLocks > 0)
			locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);

		/* And remove the locallock hashtable entry */
		RemoveLocalLock(locallock);
	}

	/*
	 * Now, scan each lock partition separately.
	 */
	for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
	{
		LWLockId partitionLock = FirstLockMgrLock + partition;
		SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]);

		/*
		 * Peek at the head of our own proclock list for this partition
		 * before taking the partition lock, so that an empty list costs no
		 * LWLock acquisition.
		 */
		proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, procLink));

		if (!proclock)
			continue;			/* needn't examine this partition */

		LWLockAcquire(partitionLock, LW_EXCLUSIVE);

		while (proclock)
		{
			PROCLOCK *nextplock;
			LOCKMASK holdMask;
			PROCLOCK *newproclock;

			/* Get link first, since we may unlink/delete this proclock */
			nextplock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, offsetof(PROCLOCK, procLink));

			Assert(proclock->tag.myProc == MyProc);

			lock = proclock->tag.myLock;

			/* Ignore nontransactional locks */
			if (!LockMethods[LOCK_LOCKMETHOD(*lock)]->transactional)
				goto next_item;

			/* Ignore VXID locks */
			if (lock->tag.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
				goto next_item;

			PROCLOCK_PRINT("PostPrepare_Locks", proclock);
			LOCK_PRINT("PostPrepare_Locks", lock, 0);
			Assert(lock->nRequested >= 0);
			Assert(lock->nGranted >= 0);
			Assert(lock->nGranted <= lock->nRequested);
			Assert((proclock->holdMask & ~lock->grantMask) == 0);

			/*
			 * Since there were no session locks, we should be releasing all
			 * locks
			 */
			if (proclock->releaseMask != proclock->holdMask)
				elog(PANIC, "we seem to have dropped a bit somewhere");

			/* Save the mask now; the old proclock is deleted just below */
			holdMask = proclock->holdMask;

			/*
			 * We cannot simply modify proclock->tag.myProc to reassign
			 * ownership of the lock, because that's part of the hash key and
			 * the proclock would then be in the wrong hash chain.  So, unlink
			 * and delete the old proclock; create a new one with the right
			 * contents; and link it into place.  We do it in this order to be
			 * certain we won't run out of shared memory (the way dynahash.c
			 * works, the deleted object is certain to be available for
			 * reallocation).
			 */
			SHMQueueDelete(&proclock->lockLink);
			SHMQueueDelete(&proclock->procLink);
			if (!hash_search(LockMethodProcLockHash, (void *) &(proclock->tag), HASH_REMOVE, NULL))
				elog(PANIC, "proclock table corrupted");

			/*
			 * Create the hash key for the new proclock table.
			 */
			proclocktag.myLock = lock;
			proclocktag.myProc = newproc;

			/*
			 * A NULL result is reported below as "out of shared memory";
			 * "found" tells us whether an entry for this key already existed.
			 */
			newproclock = (PROCLOCK *) hash_search(LockMethodProcLockHash, (void *) &proclocktag, HASH_ENTER_NULL, &found);
			if (!newproclock)
				ereport(PANIC,	/* should not happen */
						(errcode(ERRCODE_OUT_OF_MEMORY),
						 errmsg("out of shared memory"),
						 errdetail("Not enough memory for reassigning the prepared transaction's locks.")));

			/*
			 * If new, initialize the new entry
			 */
			if (!found)
			{
				newproclock->holdMask = 0;
				newproclock->releaseMask = 0;
				/* Add new proclock to appropriate lists */
				SHMQueueInsertBefore(&lock->procLocks, &newproclock->lockLink);
				SHMQueueInsertBefore(&(newproc->myProcLocks[partition]), &newproclock->procLink);
				PROCLOCK_PRINT("PostPrepare_Locks: new", newproclock);
			}
			else
			{
				PROCLOCK_PRINT("PostPrepare_Locks: found", newproclock);
				Assert((newproclock->holdMask & ~lock->grantMask) == 0);
			}

			/*
			 * Pass over the identified lock ownership.  The modes being
			 * transferred must not already be held by the target proclock.
			 */
			Assert((newproclock->holdMask & holdMask) == 0);
			newproclock->holdMask |= holdMask;

	next_item:
			proclock = nextplock;
		}						/* loop over PROCLOCKs within this partition */

		LWLockRelease(partitionLock);
	}							/* loop over partitions */

	END_CRIT_SECTION();
}

/*
 * Estimate shared-memory space used for lock tables
 *
 * Returns the sum of hash_estimate_size() for the LOCK table (sized for
 * NLOCKENTS() entries) and the PROCLOCK table (sized for twice that many),
 * plus one tenth of that sum as a safety margin.  All additions go through
 * add_size().
 */
Size
LockShmemSize(void)
{
	Size size = 0;
	long max_table_size;

	/* lock hash table */
	max_table_size = NLOCKENTS();
	size = add_size(size, hash_estimate_size(max_table_size, sizeof(LOCK)));

	/* proclock hash table: allow twice as many entries as the lock table */
	max_table_size *= 2;
	size = add_size(size, hash_estimate_size(max_table_size, sizeof(PROCLOCK)));

	/*
	 * Since NLOCKENTS is only an estimate, add 10% safety margin.
	 */
	size = add_size(size, size / 10);

	return size;
}

/*
 * GetLockStatusData - Return a summary of the lock manager's internal
 * status, for use in a user-level reporting function.
 *
 * The return data consists of an array of PROCLOCK objects, with the
 * associated PGPROC and LOCK objects for each.  Note that multiple
 * copies of the same PGPROC and/or LOCK objects are likely to appear.
 * It is the caller's responsibility to match up duplicates if wanted.
 *
 * The result and its three arrays (proclocks, procs, locks) are palloc'd;
 * each array has nelements entries, and entry N of each array is copied
 * from the same PROCLOCK and the PGPROC and LOCK its tag points to.
 *
 * The design goal is to hold the LWLocks for as short a time as possible;
 * thus, this function simply makes a copy of the necessary data and releases
 * the locks, allowing the caller to contemplate and format the data for as
 * long as it pleases.
 */
LockData *
GetLockStatusData(void)
{
	LockData *data;
	PROCLOCK *proclock;
	HASH_SEQ_STATUS seqstat;
	int els;
	int el;
	int i;

	data = (LockData *) palloc(sizeof(LockData));

	/*
	 * Acquire lock on the entire shared lock data structure.  We can't
	 * operate one partition at a time if we want to deliver a self-consistent
	 * view of the state.
	 *
	 * Since this is a read-only operation, we take shared instead of
	 * exclusive lock.  There's not a whole lot of point to this, because all
	 * the normal operations require exclusive lock, but it doesn't hurt
	 * anything either.  It will at least allow two backends to do
	 * GetLockStatusData in parallel.
	 *
	 * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
	 */
	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
		LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);

	/* Now we can safely count the number of proclocks */
	els = hash_get_num_entries(LockMethodProcLockHash);
	data->nelements = els;

	/*
	 * The three result arrays are allocated while every partition lock is
	 * held, since their size depends on the count taken just above.
	 */
	data->proclocks = (PROCLOCK *) palloc(sizeof(PROCLOCK) * els);
	data->procs = (PGPROC *) palloc(sizeof(PGPROC) * els);
	data->locks = (LOCK *) palloc(sizeof(LOCK) * els);

	/* Now scan the tables to copy the data */
	hash_seq_init(&seqstat, LockMethodProcLockHash);

	el = 0;
	while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
	{
		PGPROC *proc = proclock->tag.myProc;
		LOCK *lock = proclock->tag.myLock;

		/* Copy by value so the caller needs no locks to read the result */
		memcpy(&(data->proclocks[el]), proclock, sizeof(PROCLOCK));
		memcpy(&(data->procs[el]), proc, sizeof(PGPROC));
		memcpy(&(data->locks[el]), lock, sizeof(LOCK));

		el++;
	}

	/*
	 * And release locks.  We do this in reverse order for two reasons: (1)
	 * Anyone else who needs more than one of the locks will be trying to lock
	 * them in increasing order; we don't want to release the other process
	 * until it can get all the locks it needs.  (2) This avoids O(N^2)
	 * behavior inside LWLockRelease.
	 */
	for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
		LWLockRelease(FirstLockMgrLock + i);

	/* The scan must have visited exactly as many entries as were counted */
	Assert(el == data->nelements);

	return data;
}

/*
 * Provide the textual name of any lock mode
 *
 * lockmethodid indexes LockMethods[] and mode indexes that method's
 * lockModeNames[] array.  Both ranges are checked only with Assert().
 * The returned pointer is the table entry itself, not a copy.
 */
const char *
GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
{
	Assert(lockmethodid > 0 && lockmethodid < lengthof(LockMethods));
	Assert(mode > 0 && mode <= LockMethods[lockmethodid]->numLockModes);
	return LockMethods[lockmethodid]->lockModeNames[mode];
}

#ifdef LOCK_DEBUG
/*
 * Dump all locks in the given proc's myProcLocks lists.
 *
 * A NULL proc is accepted and produces no output.  If the proc has a
 * waitLock set, that lock is printed first; then every PROCLOCK on each
 * partition's list is printed together with its LOCK.
 *
 * Caller is responsible for having acquired appropriate LWLocks.
 */
void
DumpLocks(PGPROC *proc)
{
	SHM_QUEUE *procLocks;
	PROCLOCK *proclock;
	LOCK *lock;
	int i;

	if (proc == NULL)
		return;

	if (proc->waitLock)
		LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);

	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
	{
		procLocks = &(proc->myProcLocks[i]);

		/* Start from the head of this partition's list */
		proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, procLink));

		while (proclock)
		{
			Assert(proclock->tag.myProc == proc);

			lock = proclock->tag.myLock;

			PROCLOCK_PRINT("DumpLocks", proclock);
			LOCK_PRINT("DumpLocks", lock, 0);

			/* Advance to the next PROCLOCK on this proc's list */
			proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, offsetof(PROCLOCK, procLink));
		}
	}
}

/*
 * Dump all lmgr locks.
 *
 * Caller is responsible for having acquired appropriate LWLocks.
 */
void
DumpAllLocks(void)
{
	PGPROC *proc;
	PROCLOCK *proclock;
	LOCK *lock;
	HASH_SEQ_STATUS status;

	proc = MyProc;

	if (proc && proc->waitLock)
		LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);

	hash_seq_init(&status, LockMethodProcLockHash);

	while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
	{
		PROCLOCK_PRINT("DumpAllLocks", proclock);

		lock = proclock->tag.myLo
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -