📄 lock.c
字号:
SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue); XID_PRINT("DeadLockCheck", xidLook); for (;;) { done = (xidLook->queue.next == end); lock = (LOCK *) MAKE_PTR(xidLook->tag.lock); LOCK_PRINT("DeadLockCheck", lock, 0); if (lock->tag.relId == 0) /* user' lock */ goto nxtl; /* * waitLock is always in lockQueue of waiting proc, if !first_run * then upper caller will handle waitProcs queue of waitLock. */ if (thisProc->waitLock == lock && !first_run) goto nxtl; /* * If we found proc holding findlock and sleeping on some my other * lock then we have to check does it block me or another waiters. */ if (lock == findlock && !first_run) { LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl; int lm; Assert(xidLook->nHolding > 0); for (lm = 1; lm <= lockctl->numLockModes; lm++) { if (xidLook->holders[lm] > 0 && lockctl->conflictTab[lm] & findlock->waitMask) return true; } /* * Else - get the next lock from thisProc's lockQueue */ goto nxtl; } waitQueue = &(lock->waitProcs); waitProc = (PROC *) MAKE_PTR(waitQueue->links.prev); for (i = 0; i < waitQueue->size; i++) { if (waitProc == thisProc) { Assert(waitProc->waitLock == lock); Assert(waitProc == MyProc); waitProc = (PROC *) MAKE_PTR(waitProc->links.prev); continue; } if (lock == findlock) /* first_run also true */ { LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl; /* * If me blocked by his holdlock... */ if (lockctl->conflictTab[MyProc->token] & waitProc->holdLock) { /* and he blocked by me -> deadlock */ if (lockctl->conflictTab[waitProc->token] & MyProc->holdLock) return true; /* we shouldn't look at lockQueue of our blockers */ waitProc = (PROC *) MAKE_PTR(waitProc->links.prev); continue; } /* * If he isn't blocked by me and we request * non-conflicting lock modes - no deadlock here because * of he isn't blocked by me in any sence (explicitle or * implicitly). Note that we don't do like test if * !first_run (when thisProc is holder and non-waiter on * lock) and so we call DeadLockCheck below for every * waitProc in thisProc->lockQueue, even for waitProc-s * un-blocked by thisProc. Should we? This could save us * some time... */ if (!(lockctl->conflictTab[waitProc->token] & MyProc->holdLock) && !(lockctl->conflictTab[waitProc->token] & (1 << MyProc->token))) { waitProc = (PROC *) MAKE_PTR(waitProc->links.prev); continue; } } /* * Look in lockQueue of this waitProc, if didn't do this * before. */ for (j = 0; j < nprocs; j++) { if (checked_procs[j] == waitProc) break; } if (j >= nprocs) { Assert(nprocs < MAXBACKENDS); checked_procs[nprocs++] = waitProc; if (DeadLockCheck(waitProc, findlock)) { LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl; int holdLock; /* * Ok, but is waitProc waiting for me (thisProc) ? */ if (thisProc->waitLock == lock) { Assert(first_run); holdLock = thisProc->holdLock; } else/* should we cache holdLock ? */ { int lm, tmpMask = 2; Assert(xidLook->nHolding > 0); for (holdLock = 0, lm = 1; lm <= lockctl->numLockModes; lm++, tmpMask <<= 1) { if (xidLook->holders[lm] > 0) holdLock |= tmpMask; } Assert(holdLock != 0); } if (lockctl->conflictTab[waitProc->token] & holdLock) { /* * Last attempt to avoid deadlock - try to wakeup * myself. */ if (first_run) { if (LockResolveConflicts(DEFAULT_LOCKMETHOD, MyProc->waitLock, MyProc->token, MyProc->xid, NULL) == STATUS_OK) { GrantLock(MyProc->waitLock, MyProc->token); (MyProc->waitLock->waitProcs.size)--; ProcWakeup(MyProc, NO_ERROR); return false; } } return true; } /* * Hell! Is he blocked by any (other) holder ? */ if (LockResolveConflicts(DEFAULT_LOCKMETHOD, lock, waitProc->token, waitProc->xid, NULL) != STATUS_OK) { /* * Blocked by others - no deadlock... */#ifdef DEADLOCK_DEBUG LOCK_PRINT("DeadLockCheck: blocked by others", lock, waitProc->token);#endif waitProc = (PROC *) MAKE_PTR(waitProc->links.prev); continue; } /* * Well - wakeup this guy! This is the case of * implicit blocking: thisProc blocked someone who * blocked waitProc by the fact that he/someone is * already waiting for lock. We do this for * anti-starving. */ GrantLock(lock, waitProc->token); waitQueue->size--; waitProc = ProcWakeup(waitProc, NO_ERROR); continue; } } waitProc = (PROC *) MAKE_PTR(waitProc->links.prev); }nxtl: ; if (done) break; SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue); xidLook = tmp; } /* if we got here, no deadlock */ return false;}#ifdef NOT_USED/* * Return an array with the pids of all processes owning a lock. * This works only for user locks because normal locks have no * pid information in the corresponding XIDLookupEnt. */ArrayType *LockOwners(LOCKMETHOD lockmethod, LOCKTAG *locktag){ XIDLookupEnt *xidLook = NULL; SPINLOCK masterLock; LOCK *lock; SHMEM_OFFSET lock_offset; int count = 0; LOCKMETHODTABLE *lockMethodTable; HTAB *xidTable; bool found; int ndims, nitems, hdrlen, size; int lbounds[1], hbounds[1]; ArrayType *array; int *data_ptr; /* Assume that no one will modify the result */ static int empty_array[] = {20, 1, 0, 0, 0};#ifdef USER_LOCKS int is_user_lock; is_user_lock = (lockmethod == USER_LOCKMETHOD); if (is_user_lock) { TPRINTF(TRACE_USERLOCKS, "LockOwners: user lock tag [%u]", locktag->objId.blkno); }#endif /* This must be changed when short term locks will be used */ locktag->lockmethod = lockmethod; Assert((lockmethod >= MIN_LOCKMETHOD) && (lockmethod < NumLockMethods)); lockMethodTable = LockMethodTable[lockmethod]; if (!lockMethodTable) { elog(NOTICE, "lockMethodTable is null in LockOwners"); return (ArrayType *) &empty_array; } if (LockingIsDisabled) return (ArrayType *) &empty_array; masterLock = lockMethodTable->ctl->masterLock; SpinAcquire(masterLock); /* * Find a lock with this tag */ Assert(lockMethodTable->lockHash->hash == tag_hash); lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag, HASH_FIND, &found); /* * let the caller print its own error message, too. Do not elog(WARN). */ if (!lock) { SpinRelease(masterLock); elog(NOTICE, "LockOwners: locktable corrupted"); return (ArrayType *) &empty_array; } if (!found) { SpinRelease(masterLock);#ifdef USER_LOCKS if (is_user_lock) { TPRINTF(TRACE_USERLOCKS, "LockOwners: no lock with this tag"); return (ArrayType *) &empty_array; }#endif elog(NOTICE, "LockOwners: locktable lookup failed, no lock"); return (ArrayType *) &empty_array; } LOCK_PRINT("LockOwners: found", lock, 0); Assert((lock->nHolding > 0) && (lock->nActive > 0)); Assert(lock->nActive <= lock->nHolding); lock_offset = MAKE_OFFSET(lock); /* Construct a 1-dimensional array */ ndims = 1; hdrlen = ARR_OVERHEAD(ndims); lbounds[0] = 0; hbounds[0] = lock->nActive; size = hdrlen + sizeof(int) * hbounds[0]; array = (ArrayType *) palloc(size); MemSet(array, 0, size); memmove((char *) array, (char *) &size, sizeof(int)); memmove((char *) ARR_NDIM_PTR(array), (char *) &ndims, sizeof(int)); memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int)); memmove((char *) ARR_LBOUND(array), (char *) lbounds, ndims * sizeof(int)); SET_LO_FLAG(false, array); data_ptr = (int *) ARR_DATA_PTR(array); xidTable = lockMethodTable->xidHash; hash_seq(NULL); nitems = 0; while ((xidLook = (XIDLookupEnt *) hash_seq(xidTable)) && (xidLook != (XIDLookupEnt *) TRUE)) { if (count++ > 1000) { elog(NOTICE, "LockOwners: possible loop, giving up"); break; } if (xidLook->tag.pid == 0) { XID_PRINT("LockOwners: no pid", xidLook); continue; } if (!xidLook->tag.lock) { XID_PRINT("LockOwners: NULL LOCK", xidLook); continue; } if (xidLook->tag.lock != lock_offset) { XID_PRINT("LockOwners: different lock", xidLook); continue; } if (LOCK_LOCKMETHOD(*lock) != lockmethod) { XID_PRINT("LockOwners: other table", xidLook); continue; } if (xidLook->nHolding <= 0) { XID_PRINT("LockOwners: not holding", xidLook); continue; } if (nitems >= hbounds[0]) { elog(NOTICE, "LockOwners: array size exceeded"); break; } /* * Check that the holding process is still alive by sending him an * unused (ignored) signal. If the kill fails the process is not * alive. */ if ((xidLook->tag.pid != MyProcPid) \ &&(kill(xidLook->tag.pid, SIGCHLD)) != 0) { /* Return a negative pid to signal that process is dead */ data_ptr[nitems++] = -(xidLook->tag.pid); XID_PRINT("LockOwners: not alive", xidLook); /* XXX - TODO: remove this entry and update lock stats */ continue; } /* Found a process holding the lock */ XID_PRINT("LockOwners: holding", xidLook); data_ptr[nitems++] = xidLook->tag.pid; } SpinRelease(masterLock); /* Adjust the actual size of the array */ hbounds[0] = nitems; size = hdrlen + sizeof(int) * hbounds[0]; memmove((char *) array, (char *) &size, sizeof(int)); memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int)); return array;}#endif#ifdef DEADLOCK_DEBUG/* * Dump all locks in the proc->lockQueue. Must have already acquired * the masterLock. */voidDumpLocks(){ SHMEM_OFFSET location; PROC *proc; SHM_QUEUE *lockQueue; int done; XIDLookupEnt *xidLook = NULL; XIDLookupEnt *tmp = NULL; SHMEM_OFFSET end; SPINLOCK masterLock; int numLockModes; LOCK *lock; int count = 0; int lockmethod = DEFAULT_LOCKMETHOD; LOCKMETHODTABLE *lockMethodTable; ShmemPIDLookup(MyProcPid, &location); if (location == INVALID_OFFSET) return; proc = (PROC *) MAKE_PTR(location); if (proc != MyProc) return; lockQueue = &proc->lockQueue; Assert(lockmethod < NumLockMethods); lockMethodTable = LockMethodTable[lockmethod]; if (!lockMethodTable) return; numLockModes = lockMethodTable->ctl->numLockModes; masterLock = lockMethodTable->ctl->masterLock; if (SHMQueueEmpty(lockQueue)) return; SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue); end = MAKE_OFFSET(lockQueue); if (MyProc->waitLock) LOCK_PRINT_AUX("DumpLocks: waiting on", MyProc->waitLock, 0); for (;;) { if (count++ > 2000) { elog(NOTICE, "DumpLocks: xid loop detected, giving up"); break; } /* --------------------------- * XXX Here we assume the shared memory queue is circular and * that we know its internal structure. Should have some sort of * macros to allow one to walk it. mer 20 July 1991 * --------------------------- */ done = (xidLook->queue.next == end); lock = (LOCK *) MAKE_PTR(xidLook->tag.lock); XID_PRINT_AUX("DumpLocks", xidLook); LOCK_PRINT_AUX("DumpLocks", lock, 0); if (done) break; SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue); xidLook = tmp; }}/* * Dump all postgres locks. Must have already acquired the masterLock. */voidDumpAllLocks(){ SHMEM_OFFSET location; PROC *proc; XIDLookupEnt *xidLook = NULL; LOCK *lock; int pid; int count = 0; int lockmethod = DEFAULT_LOCKMETHOD; LOCKMETHODTABLE *lockMethodTable; HTAB *xidTable; pid = getpid(); ShmemPIDLookup(pid, &location); if (location == INVALID_OFFSET) return; proc = (PROC *) MAKE_PTR(location); if (proc != MyProc) return; Assert(lockmethod < NumLockMethods); lockMethodTable = LockMethodTable[lockmethod]; if (!lockMethodTable) return; xidTable = lockMethodTable->xidHash; if (MyProc->waitLock) LOCK_PRINT_AUX("DumpAllLocks: waiting on", MyProc->waitLock, 0); hash_seq(NULL); while ((xidLook = (XIDLookupEnt *) hash_seq(xidTable)) && (xidLook != (XIDLookupEnt *) TRUE)) { XID_PRINT_AUX("DumpAllLocks", xidLook); if (xidLook->tag.lock) { lock = (LOCK *) MAKE_PTR(xidLook->tag.lock); LOCK_PRINT_AUX("DumpAllLocks", lock, 0); } else elog(DEBUG, "DumpAllLocks: xidLook->tag.lock = NULL"); if (count++ > 2000) { elog(NOTICE, "DumpAllLocks: possible loop, giving up"); break; } }}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -