📄 deadlock.c
字号:
/* * Otherwise, we have a cycle but it does not include the * start point, so say "no deadlock". */ return false; } } /* Mark proc as seen */ Assert(nVisitedProcs < MaxBackends); visitedProcs[nVisitedProcs++] = checkProc; /* * If the proc is not waiting, we have no outgoing waits-for edges. */ if (checkProc->links.next == INVALID_OFFSET) return false; lock = checkProc->waitLock; if (lock == NULL) return false; lockMethodTable = GetLocksMethodTable(lock); numLockModes = lockMethodTable->numLockModes; conflictMask = lockMethodTable->conflictTab[checkProc->waitLockMode]; /* * Scan for procs that already hold conflicting locks. These are * "hard" edges in the waits-for graph. */ lockHolders = &(lock->lockHolders); proclock = (PROCLOCK *) SHMQueueNext(lockHolders, lockHolders, offsetof(PROCLOCK, lockLink)); while (proclock) { proc = (PGPROC *) MAKE_PTR(proclock->tag.proc); /* A proc never blocks itself */ if (proc != checkProc) { for (lm = 1; lm <= numLockModes; lm++) { if (proclock->holding[lm] > 0 && ((1 << lm) & conflictMask) != 0) { /* This proc hard-blocks checkProc */ if (FindLockCycleRecurse(proc, depth + 1, softEdges, nSoftEdges)) { /* fill deadlockDetails[] */ DEADLOCK_INFO *info = &deadlockDetails[depth]; info->locktag = lock->tag; info->lockmode = checkProc->waitLockMode; info->pid = checkProc->pid; return true; } /* If no deadlock, we're done looking at this proclock */ break; } } } proclock = (PROCLOCK *) SHMQueueNext(lockHolders, &proclock->lockLink, offsetof(PROCLOCK, lockLink)); } /* * Scan for procs that are ahead of this one in the lock's wait queue. * Those that have conflicting requests soft-block this one. This * must be done after the hard-block search, since if another proc * both hard- and soft-blocks this one, we want to call it a hard * edge. * * If there is a proposed re-ordering of the lock's wait order, use that * rather than the current wait order. */ for (i = 0; i < nWaitOrders; i++) { if (waitOrders[i].lock == lock) break; } if (i < nWaitOrders) { /* Use the given hypothetical wait queue order */ PGPROC **procs = waitOrders[i].procs; queue_size = waitOrders[i].nProcs; for (i = 0; i < queue_size; i++) { proc = procs[i]; /* Done when we reach the target proc */ if (proc == checkProc) break; /* Is there a conflict with this guy's request? */ if (((1 << proc->waitLockMode) & conflictMask) != 0) { /* This proc soft-blocks checkProc */ if (FindLockCycleRecurse(proc, depth + 1, softEdges, nSoftEdges)) { /* fill deadlockDetails[] */ DEADLOCK_INFO *info = &deadlockDetails[depth]; info->locktag = lock->tag; info->lockmode = checkProc->waitLockMode; info->pid = checkProc->pid; /* * Add this edge to the list of soft edges in the * cycle */ Assert(*nSoftEdges < MaxBackends); softEdges[*nSoftEdges].waiter = checkProc; softEdges[*nSoftEdges].blocker = proc; (*nSoftEdges)++; return true; } } } } else { /* Use the true lock wait queue order */ waitQueue = &(lock->waitProcs); queue_size = waitQueue->size; proc = (PGPROC *) MAKE_PTR(waitQueue->links.next); while (queue_size-- > 0) { /* Done when we reach the target proc */ if (proc == checkProc) break; /* Is there a conflict with this guy's request? */ if (((1 << proc->waitLockMode) & conflictMask) != 0) { /* This proc soft-blocks checkProc */ if (FindLockCycleRecurse(proc, depth + 1, softEdges, nSoftEdges)) { /* fill deadlockDetails[] */ DEADLOCK_INFO *info = &deadlockDetails[depth]; info->locktag = lock->tag; info->lockmode = checkProc->waitLockMode; info->pid = checkProc->pid; /* * Add this edge to the list of soft edges in the * cycle */ Assert(*nSoftEdges < MaxBackends); softEdges[*nSoftEdges].waiter = checkProc; softEdges[*nSoftEdges].blocker = proc; (*nSoftEdges)++; return true; } } proc = (PGPROC *) MAKE_PTR(proc->links.next); } } /* * No conflict detected here. */ return false;}/* * ExpandConstraints -- expand a list of constraints into a set of * specific new orderings for affected wait queues * * Input is a list of soft edges to be reversed. The output is a list * of nWaitOrders WAIT_ORDER structs in waitOrders[], with PGPROC array * workspace in waitOrderProcs[]. * * Returns TRUE if able to build an ordering that satisfies all the * constraints, FALSE if not (there are contradictory constraints). */static boolExpandConstraints(EDGE *constraints, int nConstraints){ int nWaitOrderProcs = 0; int i, j; nWaitOrders = 0; /* * Scan constraint list backwards. This is because the last-added * constraint is the only one that could fail, and so we want to test * it for inconsistency first. */ for (i = nConstraints; --i >= 0;) { PGPROC *proc = constraints[i].waiter; LOCK *lock = proc->waitLock; /* Did we already make a list for this lock? */ for (j = nWaitOrders; --j >= 0;) { if (waitOrders[j].lock == lock) break; } if (j >= 0) continue; /* No, so allocate a new list */ waitOrders[nWaitOrders].lock = lock; waitOrders[nWaitOrders].procs = waitOrderProcs + nWaitOrderProcs; waitOrders[nWaitOrders].nProcs = lock->waitProcs.size; nWaitOrderProcs += lock->waitProcs.size; Assert(nWaitOrderProcs <= MaxBackends); /* * Do the topo sort. TopoSort need not examine constraints after * this one, since they must be for different locks. */ if (!TopoSort(lock, constraints, i + 1, waitOrders[nWaitOrders].procs)) return false; nWaitOrders++; } return true;}/* * TopoSort -- topological sort of a wait queue * * Generate a re-ordering of a lock's wait queue that satisfies given * constraints about certain procs preceding others. (Each such constraint * is a fact of a partial ordering.) Minimize rearrangement of the queue * not needed to achieve the partial ordering. * * This is a lot simpler and slower than, for example, the topological sort * algorithm shown in Knuth's Volume 1. However, Knuth's method doesn't * try to minimize the damage to the existing order. In practice we are * not likely to be working with more than a few constraints, so the apparent * slowness of the algorithm won't really matter. * * The initial queue ordering is taken directly from the lock's wait queue. * The output is an array of PGPROC pointers, of length equal to the lock's * wait queue length (the caller is responsible for providing this space). * The partial order is specified by an array of EDGE structs. Each EDGE * is one that we need to reverse, therefore the "waiter" must appear before * the "blocker" in the output array. The EDGE array may well contain * edges associated with other locks; these should be ignored. * * Returns TRUE if able to build an ordering that satisfies all the * constraints, FALSE if not (there are contradictory constraints). */static boolTopoSort(LOCK *lock, EDGE *constraints, int nConstraints, PGPROC **ordering) /* output argument */{ PROC_QUEUE *waitQueue = &(lock->waitProcs); int queue_size = waitQueue->size; PGPROC *proc; int i, j, k, last; /* First, fill topoProcs[] array with the procs in their current order */ proc = (PGPROC *) MAKE_PTR(waitQueue->links.next); for (i = 0; i < queue_size; i++) { topoProcs[i] = proc; proc = (PGPROC *) MAKE_PTR(proc->links.next); } /* * Scan the constraints, and for each proc in the array, generate a * count of the number of constraints that say it must be before * something else, plus a list of the constraints that say it must be * after something else. The count for the j'th proc is stored in * beforeConstraints[j], and the head of its list in * afterConstraints[j]. Each constraint stores its list link in * constraints[i].link (note any constraint will be in just one list). * The array index for the before-proc of the i'th constraint is * remembered in constraints[i].pred. */ MemSet(beforeConstraints, 0, queue_size * sizeof(int)); MemSet(afterConstraints, 0, queue_size * sizeof(int)); for (i = 0; i < nConstraints; i++) { proc = constraints[i].waiter; /* Ignore constraint if not for this lock */ if (proc->waitLock != lock) continue; /* Find the waiter proc in the array */ for (j = queue_size; --j >= 0;) { if (topoProcs[j] == proc) break; } Assert(j >= 0); /* should have found a match */ /* Find the blocker proc in the array */ proc = constraints[i].blocker; for (k = queue_size; --k >= 0;) { if (topoProcs[k] == proc) break; } Assert(k >= 0); /* should have found a match */ beforeConstraints[j]++; /* waiter must come before */ /* add this constraint to list of after-constraints for blocker */ constraints[i].pred = j; constraints[i].link = afterConstraints[k]; afterConstraints[k] = i + 1; } /*-------------------- * Now scan the topoProcs array backwards. At each step, output the * last proc that has no remaining before-constraints, and decrease * the beforeConstraints count of each of the procs it was constrained * against. * i = index of ordering[] entry we want to output this time * j = search index for topoProcs[] * k = temp for scanning constraint list for proc j * last = last non-null index in topoProcs (avoid redundant searches) *-------------------- */ last = queue_size - 1; for (i = queue_size; --i >= 0;) { /* Find next candidate to output */ while (topoProcs[last] == NULL) last--; for (j = last; j >= 0; j--) { if (topoProcs[j] != NULL && beforeConstraints[j] == 0) break; } /* If no available candidate, topological sort fails */ if (j < 0) return false; /* Output candidate, and mark it done by zeroing topoProcs[] entry */ ordering[i] = topoProcs[j]; topoProcs[j] = NULL; /* Update beforeConstraints counts of its predecessors */ for (k = afterConstraints[j]; k > 0; k = constraints[k - 1].link) beforeConstraints[constraints[k - 1].pred]--; } /* Done */ return true;}#ifdef DEBUG_DEADLOCKstatic voidPrintLockQueue(LOCK *lock, const char *info){ PROC_QUEUE *waitQueue = &(lock->waitProcs); int queue_size = waitQueue->size; PGPROC *proc; int i; printf("%s lock %lx queue ", info, MAKE_OFFSET(lock)); proc = (PGPROC *) MAKE_PTR(waitQueue->links.next); for (i = 0; i < queue_size; i++) { printf(" %d", proc->pid); proc = (PGPROC *) MAKE_PTR(proc->links.next); } printf("\n"); fflush(stdout);}#endif/* * Report a detected deadlock, with available details. */voidDeadLockReport(void){ StringInfoData buf; int i; initStringInfo(&buf); for (i = 0; i < nDeadlockDetails; i++) { DEADLOCK_INFO *info = &deadlockDetails[i]; int nextpid; /* The last proc waits for the first one... */ if (i < nDeadlockDetails - 1) nextpid = info[1].pid; else nextpid = deadlockDetails[0].pid; if (i > 0) appendStringInfoChar(&buf, '\n'); if (info->locktag.relId == XactLockTableId && info->locktag.dbId == 0) { /* Lock is for transaction ID */ appendStringInfo(&buf, gettext("Process %d waits for %s on transaction %u; blocked by process %d."), info->pid, GetLockmodeName(info->lockmode), info->locktag.objId.xid, nextpid); } else { /* Lock is for a relation */ appendStringInfo(&buf, gettext("Process %d waits for %s on relation %u of database %u; blocked by process %d."), info->pid, GetLockmodeName(info->lockmode), info->locktag.relId, info->locktag.dbId, nextpid); } } ereport(ERROR, (errcode(ERRCODE_T_R_DEADLOCK_DETECTED), errmsg("deadlock detected"), errdetail("%s", buf.data)));}/* * RememberSimpleDeadLock: set up info for DeadLockReport when ProcSleep * detects a trivial (two-way) deadlock. proc1 wants to block for lockmode * on lock, but proc2 is already waiting and would be blocked by proc1. */voidRememberSimpleDeadLock(PGPROC *proc1, LOCKMODE lockmode, LOCK *lock, PGPROC *proc2){ DEADLOCK_INFO *info = &deadlockDetails[0]; info->locktag = lock->tag; info->lockmode = lockmode; info->pid = proc1->pid; info++; info->locktag = proc2->waitLock->tag; info->lockmode = proc2->waitLockMode; info->pid = proc2->pid; nDeadlockDetails = 2;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -