📄 lock.c
字号:
* needed */ proclocktag.lock = MAKE_OFFSET(lock); proclocktag.proc = MAKE_OFFSET(MyProc); TransactionIdStore(xid, &proclocktag.xid); /* * Find or create a proclock entry with this tag */ proclockTable = lockMethodTable->proclockHash; proclock = (PROCLOCK *) hash_search(proclockTable, (void *) &proclocktag, HASH_ENTER, &found); if (!proclock) { LWLockRelease(masterLock); ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory"), errhint("You may need to increase max_locks_per_transaction."))); } /* * If new, initialize the new entry */ if (!found) { proclock->nHolding = 0; MemSet((char *) proclock->holding, 0, sizeof(int) * MAX_LOCKMODES); /* Add proclock to appropriate lists */ SHMQueueInsertBefore(&lock->lockHolders, &proclock->lockLink); SHMQueueInsertBefore(&MyProc->procHolders, &proclock->procLink); PROCLOCK_PRINT("LockAcquire: new", proclock); } else { PROCLOCK_PRINT("LockAcquire: found", proclock); Assert((proclock->nHolding >= 0) && (proclock->holding[lockmode] >= 0)); Assert(proclock->nHolding <= lock->nGranted);#ifdef CHECK_DEADLOCK_RISK /* * Issue warning if we already hold a lower-level lock on this * object and do not hold a lock of the requested level or higher. * This indicates a deadlock-prone coding practice (eg, we'd have * a deadlock if another backend were following the same code path * at about the same time). * * This is not enabled by default, because it may generate log * entries about user-level coding practices that are in fact safe * in context. It can be enabled to help find system-level * problems. * * XXX Doing numeric comparison on the lockmodes is a hack; it'd be * better to use a table. For now, though, this works. */ for (i = lockMethodTable->numLockModes; i > 0; i--) { if (proclock->holding[i] > 0) { if (i >= (int) lockmode) break; /* safe: we have a lock >= req level */ elog(LOG, "deadlock risk: raising lock level" " from %s to %s on object %u/%u/%u", lock_mode_names[i], lock_mode_names[lockmode], lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno); break; } }#endif /* CHECK_DEADLOCK_RISK */ } /* * lock->nRequested and lock->requested[] count the total number of * requests, whether granted or waiting, so increment those * immediately. The other counts don't increment till we get the lock. */ lock->nRequested++; lock->requested[lockmode]++; Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); /* * If I already hold one or more locks of the requested type, just * grant myself another one without blocking. */ if (proclock->holding[lockmode] > 0) { GrantLock(lock, proclock, lockmode); PROCLOCK_PRINT("LockAcquire: owning", proclock); LWLockRelease(masterLock); return TRUE; } /* * If this process (under any XID) is a proclock of the lock, also * grant myself another one without blocking. */ LockCountMyLocks(proclock->tag.lock, MyProc, myHolding); if (myHolding[lockmode] > 0) { GrantLock(lock, proclock, lockmode); PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock); LWLockRelease(masterLock); return TRUE; } /* * If lock requested conflicts with locks requested by waiters, must * join wait queue. Otherwise, check for conflict with already-held * locks. (That's last because most complex check.) */ if (lockMethodTable->conflictTab[lockmode] & lock->waitMask) status = STATUS_FOUND; else status = LockCheckConflicts(lockMethodTable, lockmode, lock, proclock, MyProc, myHolding); if (status == STATUS_OK) { /* No conflict with held or previously requested locks */ GrantLock(lock, proclock, lockmode); } else { Assert(status == STATUS_FOUND); /* * We can't acquire the lock immediately. If caller specified no * blocking, remove the proclock entry and return FALSE without * waiting. */ if (dontWait) { if (proclock->nHolding == 0) { SHMQueueDelete(&proclock->lockLink); SHMQueueDelete(&proclock->procLink); proclock = (PROCLOCK *) hash_search(proclockTable, (void *) proclock, HASH_REMOVE, NULL); if (!proclock) elog(WARNING, "proclock table corrupted"); } else PROCLOCK_PRINT("LockAcquire: NHOLDING", proclock); lock->nRequested--; lock->requested[lockmode]--; LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode); Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0)); Assert(lock->nGranted <= lock->nRequested); LWLockRelease(masterLock); return FALSE; } /* * Construct bitmask of locks this process holds on this object. */ { int heldLocks = 0; int tmpMask; for (i = 1, tmpMask = 2; i <= lockMethodTable->numLockModes; i++, tmpMask <<= 1) { if (myHolding[i] > 0) heldLocks |= tmpMask; } MyProc->heldLocks = heldLocks; } /* * Sleep till someone wakes me up. */ status = WaitOnLock(lockmethod, lockmode, lock, proclock); /* * NOTE: do not do any material change of state between here and * return. All required changes in locktable state must have been * done when the lock was granted to us --- see notes in * WaitOnLock. */ /* * Check the proclock entry status, in case something in the ipc * communication doesn't work correctly. */ if (!((proclock->nHolding > 0) && (proclock->holding[lockmode] > 0))) { PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock); LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode); /* Should we retry ? */ LWLockRelease(masterLock); return FALSE; } PROCLOCK_PRINT("LockAcquire: granted", proclock); LOCK_PRINT("LockAcquire: granted", lock, lockmode); } LWLockRelease(masterLock); return status == STATUS_OK;}/* * LockCheckConflicts -- test whether requested lock conflicts * with those already granted * * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict. * * NOTES: * Here's what makes this complicated: one process's locks don't * conflict with one another, even if they are held under different * transaction IDs (eg, session and xact locks do not conflict). * So, we must subtract off our own locks when determining whether the * requested new lock conflicts with those already held. * * The caller can optionally pass the process's total holding counts, if * known. If NULL is passed then these values will be computed internally. */intLockCheckConflicts(LOCKMETHODTABLE *lockMethodTable, LOCKMODE lockmode, LOCK *lock, PROCLOCK *proclock, PGPROC *proc, int *myHolding) /* myHolding[] array or NULL */{ int numLockModes = lockMethodTable->numLockModes; int bitmask; int i, tmpMask; int localHolding[MAX_LOCKMODES]; /* * first check for global conflicts: If no locks conflict with my * request, then I get the lock. * * Checking for conflict: lock->grantMask represents the types of * currently held locks. conflictTable[lockmode] has a bit set for * each type of lock that conflicts with request. Bitwise compare * tells if there is a conflict. */ if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask)) { PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock); return STATUS_OK; } /* * Rats. Something conflicts. But it could still be my own lock. We * have to construct a conflict mask that does not reflect our own * locks. Locks held by the current process under another XID also * count as "our own locks". */ if (myHolding == NULL) { /* Caller didn't do calculation of total holding for me */ LockCountMyLocks(proclock->tag.lock, proc, localHolding); myHolding = localHolding; } /* Compute mask of lock types held by other processes */ bitmask = 0; tmpMask = 2; for (i = 1; i <= numLockModes; i++, tmpMask <<= 1) { if (lock->granted[i] != myHolding[i]) bitmask |= tmpMask; } /* * now check again for conflicts. 'bitmask' describes the types of * locks held by other processes. If one of these conflicts with the * kind of lock that I want, there is a conflict and I have to sleep. */ if (!(lockMethodTable->conflictTab[lockmode] & bitmask)) { /* no conflict. OK to get the lock */ PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock); return STATUS_OK; } PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock); return STATUS_FOUND;}/* * LockCountMyLocks --- Count total number of locks held on a given lockable * object by a given process (under any transaction ID). * * XXX This could be rather slow if the process holds a large number of locks. * Perhaps it could be sped up if we kept yet a third hashtable of per- * process lock information. However, for the normal case where a transaction * doesn't hold a large number of locks, keeping such a table would probably * be a net slowdown. */static voidLockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding){ SHM_QUEUE *procHolders = &(proc->procHolders); PROCLOCK *proclock; int i; MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int)); proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders, offsetof(PROCLOCK, procLink)); while (proclock) { if (lockOffset == proclock->tag.lock) { for (i = 1; i < MAX_LOCKMODES; i++) myHolding[i] += proclock->holding[i]; } proclock = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink, offsetof(PROCLOCK, procLink)); }}/* * GrantLock -- update the lock and proclock data structures to show * the lock request has been granted. * * NOTE: if proc was blocked, it also needs to be removed from the wait list * and have its waitLock/waitHolder fields cleared. That's not done here. */voidGrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode){ lock->nGranted++; lock->granted[lockmode]++; lock->grantMask |= BITS_ON[lockmode]; if (lock->granted[lockmode] == lock->requested[lockmode]) lock->waitMask &= BITS_OFF[lockmode]; LOCK_PRINT("GrantLock", lock, lockmode); Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0)); Assert(lock->nGranted <= lock->nRequested); proclock->holding[lockmode]++; proclock->nHolding++; Assert((proclock->nHolding > 0) && (proclock->holding[lockmode] > 0));}/* * WaitOnLock -- wait to acquire a lock * * Caller must have set MyProc->heldLocks to reflect locks already held * on the lockable object by this process (under all XIDs). * * The locktable's masterLock must be held at entry. */static intWaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, LOCK *lock, PROCLOCK *proclock){ LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod]; char *new_status, *old_status; Assert(lockmethod < NumLockMethods); LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode); old_status = pstrdup(get_ps_display()); new_status = (char *) palloc(strlen(old_status) + 10); strcpy(new_status, old_status); strcat(new_status, " waiting"); set_ps_display(new_status); /* * NOTE: Think not to put any shared-state cleanup after the call to * ProcSleep, in either the normal or failure path. The lock state * must be fully set by the lock grantor, or by CheckDeadLock if we * give up waiting for the lock. This is necessary because of the * possibility that a cancel/die interrupt will interrupt ProcSleep * after someone else grants us the lock, but before we've noticed it. * Hence, after granting, the locktable state must fully reflect the * fact that we own the lock; we can't do additional work on return. * Contrariwise, if we fail, any cleanup must happen in xact abort * processing, not here, to ensure it will also happen in the * cancel/die case. */ if (ProcSleep(lockMethodTable, lockmode, lock, proclock) != STATUS_OK) { /* * We failed as a result of a deadlock, see CheckDeadLock(). Quit * now. Removal of the proclock and lock objects, if no longer * needed, will happen in xact cleanup (see above for motivation). */ LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode); LWLockRelease(lockMethodTable->masterLock); /* * Now that we aren't holding the LockMgrLock, we can give an * error report including details about the detected deadlock. */ DeadLockReport(); /* not reached */ } set_ps_display(old_status); pfree(old_status); pfree(new_status); LOCK_PRINT("WaitOnLock: wakeup on lock", lock, lockmode); return STATUS_OK;}/* * Remove a proc from the wait-queue it is on * (caller must know it is on one). * * Locktable lock must be held by caller. * * NB: this does not remove the process' proclock object, nor the lock object, * even though their counts might now have gone to zero. That will happen * during a subsequent LockReleaseAll call, which we expect will happen * during transaction cleanup. (Removal of a proc from its wait queue by * this routine can only happen if we are aborting the transaction.) */voidRemoveFromWaitQueue(PGPROC *proc){ LOCK *waitLock = proc->waitLock; LOCKMODE lockmode = proc->waitLockMode; /* Make sure proc is waiting */ Assert(proc->links.next != INVALID_OFFSET); Assert(waitLock); Assert(waitLock->waitProcs.size > 0); /* Remove proc from lock's wait queue */ SHMQueueDelete(&(proc->links)); waitLock->waitProcs.size--; /* Undo increments of request counts by waiting process */ Assert(waitLock->nRequested > 0); Assert(waitLock->nRequested > proc->waitLock->nGranted); waitLock->nRequested--; Assert(waitLock->requested[lockmode] > 0); waitLock->requested[lockmode]--; /* don't forget to clear waitMask bit if appropriate */ if (waitLock->granted[lockmode] == waitLock->requested[lockmode]) waitLock->waitMask &= BITS_OFF[lockmode]; /* Clean up the proc's own state */ proc->waitLock = NULL; proc->waitHolder = NULL; /* See if any other waiters for the lock can be woken up now */ ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);}/* * LockRelease -- look up 'locktag' in lock table 'lockmethod' and * release one 'lockmode' lock on it. * * Side Effects: find any waiting processes that are now wakable, * grant them their requested locks and awaken them. * (We have to grant the lock here to avoid a race between * the waking process and any new process to * come along and request the lock.) */boolLockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, TransactionId xid, LOCKMODE lockmode){ LOCK *lock; LWLockId masterLock; LOCKMETHODTABLE *lockMethodTable; PROCLOCK *proclock; PROCLOCKTAG proclocktag; HTAB *proclockTable; bool wakeupNeeded = false;#ifdef LOCK_DEBUG if (lockmethod == USER_LOCKMETHOD && Trace_userlocks) elog(LOG, "LockRelease: user lock tag [%u] %d", locktag->objId.blkno, lockmode);#endif /* ???????? This must be changed when short term locks will be used */ locktag->lockmethod = lockmethod; Assert(lockmethod < NumLockMethods); lockMethodTable = LockMethodTable[lockmethod]; if (!lockMethodTable) { elog(WARNING, "lockMethodTable is null in LockRelease"); return FALSE; } masterLock = lockMethodTable->masterLock; LWLockAcquire(masterLock, LW_EXCLUSIVE); /* * Find a lock with this tag */ Assert(lockMethodTable->lockHash->hash == tag_hash); lock = (LOCK *) hash_search(lockMethodTable->lockHash, (void *) locktag, HASH_FIND, NULL);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -