📄 lock.c
字号:
* LOCKACQUIRE_ALREADY_HELD incremented count for lock already held * * In the normal case where dontWait=false and the caller doesn't need to * distinguish a freshly acquired lock from one already taken earlier in * this same transaction, there is no need to examine the return value. * * Side Effects: The lock is acquired and recorded in lock tables. * * NOTE: if we wait for the lock, there is no way to abort the wait * short of aborting the transaction. */LockAcquireResultLockAcquire(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, bool dontWait){ LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid; LockMethod lockMethodTable; LOCALLOCKTAG localtag; LOCALLOCK *locallock; LOCK *lock; PROCLOCK *proclock; PROCLOCKTAG proclocktag; bool found; ResourceOwner owner; uint32 hashcode; uint32 proclock_hashcode; int partition; LWLockId partitionLock; int status; if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) elog(ERROR, "unrecognized lock method: %d", lockmethodid); lockMethodTable = LockMethods[lockmethodid]; if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes) elog(ERROR, "unrecognized lock mode: %d", lockmode);#ifdef LOCK_DEBUG if (LOCK_DEBUG_ENABLED(locktag)) elog(LOG, "LockAcquire: lock [%u,%u] %s", locktag->locktag_field1, locktag->locktag_field2, lockMethodTable->lockModeNames[lockmode]);#endif /* Session locks are never transactional, else check table */ if (!sessionLock && lockMethodTable->transactional) owner = CurrentResourceOwner; else owner = NULL; /* * Find or create a LOCALLOCK entry for this lock and lockmode */ MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */ localtag.lock = *locktag; localtag.mode = lockmode; locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash, (void *) &localtag, HASH_ENTER, &found); /* * if it's a new locallock object, initialize it */ if (!found) { locallock->lock = NULL; locallock->proclock = NULL; locallock->hashcode = LockTagHashCode(&(localtag.lock)); locallock->nLocks = 0; locallock->numLockOwners = 0; locallock->maxLockOwners = 8; locallock->lockOwners = NULL; locallock->lockOwners = (LOCALLOCKOWNER *) MemoryContextAlloc(TopMemoryContext, locallock->maxLockOwners * sizeof(LOCALLOCKOWNER)); } else { /* Make sure there will be room to remember the lock */ if (locallock->numLockOwners >= locallock->maxLockOwners) { int newsize = locallock->maxLockOwners * 2; locallock->lockOwners = (LOCALLOCKOWNER *) repalloc(locallock->lockOwners, newsize * sizeof(LOCALLOCKOWNER)); locallock->maxLockOwners = newsize; } } /* * If we already hold the lock, we can just increase the count locally. */ if (locallock->nLocks > 0) { GrantLockLocal(locallock, owner); return LOCKACQUIRE_ALREADY_HELD; } /* * Otherwise we've got to mess with the shared lock table. */ hashcode = locallock->hashcode; partition = LockHashPartition(hashcode); partitionLock = LockHashPartitionLock(hashcode); LWLockAcquire(partitionLock, LW_EXCLUSIVE); /* * Find or create a lock with this tag. * * Note: if the locallock object already existed, it might have a pointer * to the lock already ... but we probably should not assume that that * pointer is valid, since a lock object with no locks can go away * anytime. */ lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash, (void *) locktag, hashcode, HASH_ENTER_NULL, &found); if (!lock) { LWLockRelease(partitionLock); ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory"), errhint("You might need to increase max_locks_per_transaction."))); } locallock->lock = lock; /* * if it's a new lock object, initialize it */ if (!found) { lock->grantMask = 0; lock->waitMask = 0; SHMQueueInit(&(lock->procLocks)); ProcQueueInit(&(lock->waitProcs)); lock->nRequested = 0; lock->nGranted = 0; MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES); MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES); LOCK_PRINT("LockAcquire: new", lock, lockmode); } else { LOCK_PRINT("LockAcquire: found", lock, lockmode); Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0)); Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0)); Assert(lock->nGranted <= lock->nRequested); } /* * Create the hash key for the proclock table. */ proclocktag.myLock = lock; proclocktag.myProc = MyProc; proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode); /* * Find or create a proclock entry with this tag */ proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash, (void *) &proclocktag, proclock_hashcode, HASH_ENTER_NULL, &found); if (!proclock) { /* Ooops, not enough shmem for the proclock */ if (lock->nRequested == 0) { /* * There are no other requestors of this lock, so garbage-collect * the lock object. We *must* do this to avoid a permanent leak * of shared memory, because there won't be anything to cause * anyone to release the lock object later. */ Assert(SHMQueueEmpty(&(lock->procLocks))); if (!hash_search_with_hash_value(LockMethodLockHash, (void *) &(lock->tag), hashcode, HASH_REMOVE, NULL)) elog(PANIC, "lock table corrupted"); } LWLockRelease(partitionLock); ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory"), errhint("You might need to increase max_locks_per_transaction."))); } locallock->proclock = proclock; /* * If new, initialize the new entry */ if (!found) { proclock->holdMask = 0; proclock->releaseMask = 0; /* Add proclock to appropriate lists */ SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink); SHMQueueInsertBefore(&(MyProc->myProcLocks[partition]), &proclock->procLink); PROCLOCK_PRINT("LockAcquire: new", proclock); } else { PROCLOCK_PRINT("LockAcquire: found", proclock); Assert((proclock->holdMask & ~lock->grantMask) == 0);#ifdef CHECK_DEADLOCK_RISK /* * Issue warning if we already hold a lower-level lock on this object * and do not hold a lock of the requested level or higher. This * indicates a deadlock-prone coding practice (eg, we'd have a * deadlock if another backend were following the same code path at * about the same time). * * This is not enabled by default, because it may generate log entries * about user-level coding practices that are in fact safe in context. * It can be enabled to help find system-level problems. * * XXX Doing numeric comparison on the lockmodes is a hack; it'd be * better to use a table. For now, though, this works. */ { int i; for (i = lockMethodTable->numLockModes; i > 0; i--) { if (proclock->holdMask & LOCKBIT_ON(i)) { if (i >= (int) lockmode) break; /* safe: we have a lock >= req level */ elog(LOG, "deadlock risk: raising lock level" " from %s to %s on object %u/%u/%u", lockMethodTable->lockModeNames[i], lockMethodTable->lockModeNames[lockmode], lock->tag.locktag_field1, lock->tag.locktag_field2, lock->tag.locktag_field3); break; } } }#endif /* CHECK_DEADLOCK_RISK */ } /* * lock->nRequested and lock->requested[] count the total number of * requests, whether granted or waiting, so increment those immediately. * The other counts don't increment till we get the lock. */ lock->nRequested++; lock->requested[lockmode]++; Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); /* * We shouldn't already hold the desired lock; else locallock table is * broken. */ if (proclock->holdMask & LOCKBIT_ON(lockmode)) elog(ERROR, "lock %s on object %u/%u/%u is already held", lockMethodTable->lockModeNames[lockmode], lock->tag.locktag_field1, lock->tag.locktag_field2, lock->tag.locktag_field3); /* * If lock requested conflicts with locks requested by waiters, must join * wait queue. Otherwise, check for conflict with already-held locks. * (That's last because most complex check.) */ if (lockMethodTable->conflictTab[lockmode] & lock->waitMask) status = STATUS_FOUND; else status = LockCheckConflicts(lockMethodTable, lockmode, lock, proclock, MyProc); if (status == STATUS_OK) { /* No conflict with held or previously requested locks */ GrantLock(lock, proclock, lockmode); GrantLockLocal(locallock, owner); } else { Assert(status == STATUS_FOUND); /* * We can't acquire the lock immediately. If caller specified no * blocking, remove useless table entries and return NOT_AVAIL without * waiting. */ if (dontWait) { if (proclock->holdMask == 0) { SHMQueueDelete(&proclock->lockLink); SHMQueueDelete(&proclock->procLink); if (!hash_search_with_hash_value(LockMethodProcLockHash, (void *) &(proclock->tag), proclock_hashcode, HASH_REMOVE, NULL)) elog(PANIC, "proclock table corrupted"); } else PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock); lock->nRequested--; lock->requested[lockmode]--; LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode); Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0)); Assert(lock->nGranted <= lock->nRequested); LWLockRelease(partitionLock); if (locallock->nLocks == 0) RemoveLocalLock(locallock); return LOCKACQUIRE_NOT_AVAIL; } /* * Set bitmask of locks this process already holds on this object. */ MyProc->heldLocks = proclock->holdMask; /* * Sleep till someone wakes me up. */ PG_TRACE2(lock__startwait, locktag->locktag_field2, lockmode); WaitOnLock(locallock, owner); PG_TRACE2(lock__endwait, locktag->locktag_field2, lockmode); /* * NOTE: do not do any material change of state between here and * return. All required changes in locktable state must have been * done when the lock was granted to us --- see notes in WaitOnLock. */ /* * Check the proclock entry status, in case something in the ipc * communication doesn't work correctly. */ if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) { PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock); LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode); /* Should we retry ? */ LWLockRelease(partitionLock); elog(ERROR, "LockAcquire failed"); } PROCLOCK_PRINT("LockAcquire: granted", proclock); LOCK_PRINT("LockAcquire: granted", lock, lockmode); } LWLockRelease(partitionLock); return LOCKACQUIRE_OK;}/* * Subroutine to free a locallock entry */static voidRemoveLocalLock(LOCALLOCK *locallock){ pfree(locallock->lockOwners); locallock->lockOwners = NULL; if (!hash_search(LockMethodLocalHash, (void *) &(locallock->tag), HASH_REMOVE, NULL)) elog(WARNING, "locallock table corrupted");}/* * LockCheckConflicts -- test whether requested lock conflicts * with those already granted * * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict. * * NOTES: * Here's what makes this complicated: one process's locks don't * conflict with one another, no matter what purpose they are held for * (eg, session and transaction locks do not conflict). * So, we must subtract off our own locks when determining whether the * requested new lock conflicts with those already held. */intLockCheckConflicts(LockMethod lockMethodTable, LOCKMODE lockmode, LOCK *lock, PROCLOCK *proclock, PGPROC *proc){ int numLockModes = lockMethodTable->numLockModes; LOCKMASK myLocks; LOCKMASK otherLocks; int i; /* * first check for global conflicts: If no locks conflict with my request, * then I get the lock. * * Checking for conflict: lock->grantMask represents the types of * currently held locks. conflictTable[lockmode] has a bit set for each * type of lock that conflicts with request. Bitwise compare tells if * there is a conflict. */ if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask)) { PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock); return STATUS_OK; } /* * Rats. Something conflicts. But it could still be my own lock. We have * to construct a conflict mask that does not reflect our own locks, but * only lock types held by other processes. */ myLocks = proclock->holdMask; otherLocks = 0; for (i = 1; i <= numLockModes; i++) { int myHolding = (myLocks & LOCKBIT_ON(i)) ? 1 : 0; if (lock->granted[i] > myHolding) otherLocks |= LOCKBIT_ON(i); } /* * now check again for conflicts. 'otherLocks' describes the types of * locks held by other processes. If one of these conflicts with the kind * of lock that I want, there is a conflict and I have to sleep. */ if (!(lockMethodTable->conflictTab[lockmode] & otherLocks)) { /* no conflict. OK to get the lock */ PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock); return STATUS_OK; } PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock); return STATUS_FOUND;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -