📄 deadlock.c
字号:
*/ procLocks = &(lock->procLocks); proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink)); while (proclock) { proc = (PGPROC *) MAKE_PTR(proclock->tag.proc); /* A proc never blocks itself */ if (proc != checkProc) { for (lm = 1; lm <= numLockModes; lm++) { if ((proclock->holdMask & LOCKBIT_ON(lm)) && (conflictMask & LOCKBIT_ON(lm))) { /* This proc hard-blocks checkProc */ if (FindLockCycleRecurse(proc, depth + 1, softEdges, nSoftEdges)) { /* fill deadlockDetails[] */ DEADLOCK_INFO *info = &deadlockDetails[depth]; info->locktag = lock->tag; info->lockmode = checkProc->waitLockMode; info->pid = checkProc->pid; return true; } /* If no deadlock, we're done looking at this proclock */ break; } } } proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink, offsetof(PROCLOCK, lockLink)); } /* * Scan for procs that are ahead of this one in the lock's wait queue. * Those that have conflicting requests soft-block this one. This must be * done after the hard-block search, since if another proc both hard- and * soft-blocks this one, we want to call it a hard edge. * * If there is a proposed re-ordering of the lock's wait order, use that * rather than the current wait order. */ for (i = 0; i < nWaitOrders; i++) { if (waitOrders[i].lock == lock) break; } if (i < nWaitOrders) { /* Use the given hypothetical wait queue order */ PGPROC **procs = waitOrders[i].procs; queue_size = waitOrders[i].nProcs; for (i = 0; i < queue_size; i++) { proc = procs[i]; /* Done when we reach the target proc */ if (proc == checkProc) break; /* Is there a conflict with this guy's request? */ if (((1 << proc->waitLockMode) & conflictMask) != 0) { /* This proc soft-blocks checkProc */ if (FindLockCycleRecurse(proc, depth + 1, softEdges, nSoftEdges)) { /* fill deadlockDetails[] */ DEADLOCK_INFO *info = &deadlockDetails[depth]; info->locktag = lock->tag; info->lockmode = checkProc->waitLockMode; info->pid = checkProc->pid; /* * Add this edge to the list of soft edges in the cycle */ Assert(*nSoftEdges < MaxBackends); softEdges[*nSoftEdges].waiter = checkProc; softEdges[*nSoftEdges].blocker = proc; (*nSoftEdges)++; return true; } } } } else { /* Use the true lock wait queue order */ waitQueue = &(lock->waitProcs); queue_size = waitQueue->size; proc = (PGPROC *) MAKE_PTR(waitQueue->links.next); while (queue_size-- > 0) { /* Done when we reach the target proc */ if (proc == checkProc) break; /* Is there a conflict with this guy's request? */ if (((1 << proc->waitLockMode) & conflictMask) != 0) { /* This proc soft-blocks checkProc */ if (FindLockCycleRecurse(proc, depth + 1, softEdges, nSoftEdges)) { /* fill deadlockDetails[] */ DEADLOCK_INFO *info = &deadlockDetails[depth]; info->locktag = lock->tag; info->lockmode = checkProc->waitLockMode; info->pid = checkProc->pid; /* * Add this edge to the list of soft edges in the cycle */ Assert(*nSoftEdges < MaxBackends); softEdges[*nSoftEdges].waiter = checkProc; softEdges[*nSoftEdges].blocker = proc; (*nSoftEdges)++; return true; } } proc = (PGPROC *) MAKE_PTR(proc->links.next); } } /* * No conflict detected here. */ return false;}/* * ExpandConstraints -- expand a list of constraints into a set of * specific new orderings for affected wait queues * * Input is a list of soft edges to be reversed. The output is a list * of nWaitOrders WAIT_ORDER structs in waitOrders[], with PGPROC array * workspace in waitOrderProcs[]. * * Returns TRUE if able to build an ordering that satisfies all the * constraints, FALSE if not (there are contradictory constraints). */static boolExpandConstraints(EDGE *constraints, int nConstraints){ int nWaitOrderProcs = 0; int i, j; nWaitOrders = 0; /* * Scan constraint list backwards. This is because the last-added * constraint is the only one that could fail, and so we want to test it * for inconsistency first. */ for (i = nConstraints; --i >= 0;) { PGPROC *proc = constraints[i].waiter; LOCK *lock = proc->waitLock; /* Did we already make a list for this lock? */ for (j = nWaitOrders; --j >= 0;) { if (waitOrders[j].lock == lock) break; } if (j >= 0) continue; /* No, so allocate a new list */ waitOrders[nWaitOrders].lock = lock; waitOrders[nWaitOrders].procs = waitOrderProcs + nWaitOrderProcs; waitOrders[nWaitOrders].nProcs = lock->waitProcs.size; nWaitOrderProcs += lock->waitProcs.size; Assert(nWaitOrderProcs <= MaxBackends); /* * Do the topo sort. TopoSort need not examine constraints after this * one, since they must be for different locks. */ if (!TopoSort(lock, constraints, i + 1, waitOrders[nWaitOrders].procs)) return false; nWaitOrders++; } return true;}/* * TopoSort -- topological sort of a wait queue * * Generate a re-ordering of a lock's wait queue that satisfies given * constraints about certain procs preceding others. (Each such constraint * is a fact of a partial ordering.) Minimize rearrangement of the queue * not needed to achieve the partial ordering. * * This is a lot simpler and slower than, for example, the topological sort * algorithm shown in Knuth's Volume 1. However, Knuth's method doesn't * try to minimize the damage to the existing order. In practice we are * not likely to be working with more than a few constraints, so the apparent * slowness of the algorithm won't really matter. * * The initial queue ordering is taken directly from the lock's wait queue. * The output is an array of PGPROC pointers, of length equal to the lock's * wait queue length (the caller is responsible for providing this space). * The partial order is specified by an array of EDGE structs. Each EDGE * is one that we need to reverse, therefore the "waiter" must appear before * the "blocker" in the output array. The EDGE array may well contain * edges associated with other locks; these should be ignored. * * Returns TRUE if able to build an ordering that satisfies all the * constraints, FALSE if not (there are contradictory constraints). */static boolTopoSort(LOCK *lock, EDGE *constraints, int nConstraints, PGPROC **ordering) /* output argument */{ PROC_QUEUE *waitQueue = &(lock->waitProcs); int queue_size = waitQueue->size; PGPROC *proc; int i, j, k, last; /* First, fill topoProcs[] array with the procs in their current order */ proc = (PGPROC *) MAKE_PTR(waitQueue->links.next); for (i = 0; i < queue_size; i++) { topoProcs[i] = proc; proc = (PGPROC *) MAKE_PTR(proc->links.next); } /* * Scan the constraints, and for each proc in the array, generate a count * of the number of constraints that say it must be before something else, * plus a list of the constraints that say it must be after something * else. The count for the j'th proc is stored in beforeConstraints[j], * and the head of its list in afterConstraints[j]. Each constraint * stores its list link in constraints[i].link (note any constraint will * be in just one list). The array index for the before-proc of the i'th * constraint is remembered in constraints[i].pred. */ MemSet(beforeConstraints, 0, queue_size * sizeof(int)); MemSet(afterConstraints, 0, queue_size * sizeof(int)); for (i = 0; i < nConstraints; i++) { proc = constraints[i].waiter; /* Ignore constraint if not for this lock */ if (proc->waitLock != lock) continue; /* Find the waiter proc in the array */ for (j = queue_size; --j >= 0;) { if (topoProcs[j] == proc) break; } Assert(j >= 0); /* should have found a match */ /* Find the blocker proc in the array */ proc = constraints[i].blocker; for (k = queue_size; --k >= 0;) { if (topoProcs[k] == proc) break; } Assert(k >= 0); /* should have found a match */ beforeConstraints[j]++; /* waiter must come before */ /* add this constraint to list of after-constraints for blocker */ constraints[i].pred = j; constraints[i].link = afterConstraints[k]; afterConstraints[k] = i + 1; } /*-------------------- * Now scan the topoProcs array backwards. At each step, output the * last proc that has no remaining before-constraints, and decrease * the beforeConstraints count of each of the procs it was constrained * against. * i = index of ordering[] entry we want to output this time * j = search index for topoProcs[] * k = temp for scanning constraint list for proc j * last = last non-null index in topoProcs (avoid redundant searches) *-------------------- */ last = queue_size - 1; for (i = queue_size; --i >= 0;) { /* Find next candidate to output */ while (topoProcs[last] == NULL) last--; for (j = last; j >= 0; j--) { if (topoProcs[j] != NULL && beforeConstraints[j] == 0) break; } /* If no available candidate, topological sort fails */ if (j < 0) return false; /* Output candidate, and mark it done by zeroing topoProcs[] entry */ ordering[i] = topoProcs[j]; topoProcs[j] = NULL; /* Update beforeConstraints counts of its predecessors */ for (k = afterConstraints[j]; k > 0; k = constraints[k - 1].link) beforeConstraints[constraints[k - 1].pred]--; } /* Done */ return true;}#ifdef DEBUG_DEADLOCKstatic voidPrintLockQueue(LOCK *lock, const char *info){ PROC_QUEUE *waitQueue = &(lock->waitProcs); int queue_size = waitQueue->size; PGPROC *proc; int i; printf("%s lock %lx queue ", info, MAKE_OFFSET(lock)); proc = (PGPROC *) MAKE_PTR(waitQueue->links.next); for (i = 0; i < queue_size; i++) { printf(" %d", proc->pid); proc = (PGPROC *) MAKE_PTR(proc->links.next); } printf("\n"); fflush(stdout);}#endif/* * Append a description of a lockable object to buf. * * XXX probably this should be exported from lmgr.c or some such place. */static voidDescribeLockTag(StringInfo buf, const LOCKTAG *lock){ switch (lock->locktag_type) { case LOCKTAG_RELATION: appendStringInfo(buf, _("relation %u of database %u"), lock->locktag_field2, lock->locktag_field1); break; case LOCKTAG_RELATION_EXTEND: appendStringInfo(buf, _("extension of relation %u of database %u"), lock->locktag_field2, lock->locktag_field1); break; case LOCKTAG_PAGE: appendStringInfo(buf, _("page %u of relation %u of database %u"), lock->locktag_field3, lock->locktag_field2, lock->locktag_field1); break; case LOCKTAG_TUPLE: appendStringInfo(buf, _("tuple (%u,%u) of relation %u of database %u"), lock->locktag_field3, lock->locktag_field4, lock->locktag_field2, lock->locktag_field1); break; case LOCKTAG_TRANSACTION: appendStringInfo(buf, _("transaction %u"), lock->locktag_field1); break; case LOCKTAG_OBJECT: appendStringInfo(buf, _("object %u of class %u of database %u"), lock->locktag_field3, lock->locktag_field2, lock->locktag_field1); break; case LOCKTAG_USERLOCK: appendStringInfo(buf, _("user lock [%u,%u]"), lock->locktag_field1, lock->locktag_field2); break; default: appendStringInfo(buf, _("unrecognized locktag type %d"), lock->locktag_type); break; }}/* * Report a detected deadlock, with available details. */voidDeadLockReport(void){ StringInfoData buf; StringInfoData buf2; int i; initStringInfo(&buf); initStringInfo(&buf2); for (i = 0; i < nDeadlockDetails; i++) { DEADLOCK_INFO *info = &deadlockDetails[i]; int nextpid; /* The last proc waits for the first one... */ if (i < nDeadlockDetails - 1) nextpid = info[1].pid; else nextpid = deadlockDetails[0].pid; if (i > 0) appendStringInfoChar(&buf, '\n'); /* reset buf2 to hold next object description */ buf2.len = 0; buf2.data[0] = '\0'; DescribeLockTag(&buf2, &info->locktag); appendStringInfo(&buf, _("Process %d waits for %s on %s; blocked by process %d."), info->pid, GetLockmodeName(info->lockmode), buf2.data, nextpid); } ereport(ERROR, (errcode(ERRCODE_T_R_DEADLOCK_DETECTED), errmsg("deadlock detected"), errdetail("%s", buf.data)));}/* * RememberSimpleDeadLock: set up info for DeadLockReport when ProcSleep * detects a trivial (two-way) deadlock. proc1 wants to block for lockmode * on lock, but proc2 is already waiting and would be blocked by proc1. */voidRememberSimpleDeadLock(PGPROC *proc1, LOCKMODE lockmode, LOCK *lock, PGPROC *proc2){ DEADLOCK_INFO *info = &deadlockDetails[0]; info->locktag = lock->tag; info->lockmode = lockmode; info->pid = proc1->pid; info++; info->locktag = proc2->waitLock->tag; info->lockmode = proc2->waitLockMode; info->pid = proc2->pid; nDeadlockDetails = 2;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -