📄 lock_deadlock.c
/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1996-2002
 *	Sleepycat Software.  All rights reserved.
 */
#include "db_config.h"

#ifndef lint
static const char revid[] =
    "$Id: lock_deadlock.c,v 11.54 2002/08/06 05:05:21 bostic Exp $";
#endif /* not lint */

#ifndef NO_SYSTEM_INCLUDES
#include <sys/types.h>
#include <string.h>
#endif

#include "db_int.h"
#include "dbinc/db_shash.h"
#include "dbinc/lock.h"
#include "dbinc/txn.h"
#include "dbinc/rep.h"

#define	ISSET_MAP(M, N)	((M)[(N) / 32] & (1 << (N) % 32))

#define	CLEAR_MAP(M, N) {						\
	u_int32_t __i;							\
	for (__i = 0; __i < (N); __i++)					\
		(M)[__i] = 0;						\
}

#define	SET_MAP(M, B)	((M)[(B) / 32] |= (1 << ((B) % 32)))
#define	CLR_MAP(M, B)	((M)[(B) / 32] &= ~(1 << ((B) % 32)))

#define	OR_MAP(D, S, N) {						\
	u_int32_t __i;							\
	for (__i = 0; __i < (N); __i++)					\
		D[__i] |= S[__i];					\
}

#define	BAD_KILLID	0xffffffff

typedef struct {
	int		valid;
	int		self_wait;
	u_int32_t	count;
	u_int32_t	id;
	u_int32_t	last_lock;
	u_int32_t	last_locker_id;
	db_pgno_t	pgno;
} locker_info;

static int __dd_abort __P((DB_ENV *, locker_info *));
static int __dd_build __P((DB_ENV *,
    u_int32_t, u_int32_t **, u_int32_t *, u_int32_t *, locker_info **));
static int __dd_find __P((DB_ENV *,
    u_int32_t *, locker_info *, u_int32_t, u_int32_t, u_int32_t ***));
static int __dd_isolder __P((u_int32_t, u_int32_t, u_int32_t, u_int32_t));
static int __dd_verify __P((locker_info *, u_int32_t *, u_int32_t *,
    u_int32_t *, u_int32_t, u_int32_t, u_int32_t));

#ifdef DIAGNOSTIC
static void __dd_debug
    __P((DB_ENV *, locker_info *, u_int32_t *, u_int32_t, u_int32_t));
#endif
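/*
 * Editorial sketch, not part of the original source: the macros above
 * treat an array of u_int32_t as one flat bit vector, indexed by word
 * ((N) / 32) and bit ((N) % 32).  Note that CLEAR_MAP's second argument
 * counts 32-bit words, while the other macros take a bit position.  The
 * guard macro and function name below are hypothetical, used only to
 * keep this illustration out of a real build.
 */
#ifdef DD_MAP_EXAMPLE
static void
__dd_map_example()
{
	u_int32_t map[2];

	CLEAR_MAP(map, 2);	/* Zero two words, i.e. 64 clear bits. */
	SET_MAP(map, 33);	/* Sets bit 33 % 32 == 1 of map[33 / 32]. */
	if (ISSET_MAP(map, 33))	/* True: tests the same word and bit. */
		CLR_MAP(map, 33);	/* Clears it again. */
}
#endif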
/*
 * lock_detect --
 *
 * PUBLIC: int __lock_detect __P((DB_ENV *, u_int32_t, u_int32_t, int *));
 */
int
__lock_detect(dbenv, flags, atype, abortp)
	DB_ENV *dbenv;
	u_int32_t flags, atype;
	int *abortp;
{
	DB_LOCKREGION *region;
	DB_LOCKTAB *lt;
	DB_TXNMGR *tmgr;
	locker_info *idmap;
	u_int32_t *bitmap, *copymap, **deadp, **free_me, *tmpmap;
	u_int32_t i, keeper, killid, limit, nalloc, nlockers;
	u_int32_t lock_max, txn_max;
	int ret;

	PANIC_CHECK(dbenv);
	ENV_REQUIRES_CONFIG(dbenv,
	    dbenv->lk_handle, "DB_ENV->lock_detect", DB_INIT_LOCK);

	/* Validate arguments. */
	if ((ret = __db_fchk(dbenv, "DB_ENV->lock_detect", flags, 0)) != 0)
		return (ret);
	switch (atype) {
	case DB_LOCK_DEFAULT:
	case DB_LOCK_EXPIRE:
	case DB_LOCK_MAXLOCKS:
	case DB_LOCK_MINLOCKS:
	case DB_LOCK_MINWRITE:
	case DB_LOCK_OLDEST:
	case DB_LOCK_RANDOM:
	case DB_LOCK_YOUNGEST:
		break;
	default:
		__db_err(dbenv,
	    "DB_ENV->lock_detect: unknown deadlock detection mode specified");
		return (EINVAL);
	}

	/*
	 * If this environment is a replication client, then we must use the
	 * MINWRITE detection discipline.
	 */
	if (__rep_is_client(dbenv))
		atype = DB_LOCK_MINWRITE;

	free_me = NULL;

	lt = dbenv->lk_handle;
	if (abortp != NULL)
		*abortp = 0;

	/* Check if a detector run is necessary. */
	LOCKREGION(dbenv, lt);

	/* Make a pass only if auto-detect would run. */
	region = lt->reginfo.primary;

	if (region->need_dd == 0) {
		UNLOCKREGION(dbenv, lt);
		return (0);
	}

	/* Reset need_dd, so we know we've run the detector. */
	region->need_dd = 0;

	/* Build the waits-for bitmap. */
	ret = __dd_build(dbenv, atype, &bitmap, &nlockers, &nalloc, &idmap);
	lock_max = region->stat.st_cur_maxid;
	UNLOCKREGION(dbenv, lt);

	/*
	 * We need the cur_maxid from the txn region as well.  In order
	 * to avoid tricky synchronization between the lock and txn
	 * regions, we simply unlock the lock region and then lock the
	 * txn region.  This introduces a small window during which the
	 * transaction system could then wrap.  We're willing to return
	 * the wrong answer for "oldest" or "youngest" in those rare
	 * circumstances.
	 */
	tmgr = dbenv->tx_handle;
	if (tmgr != NULL) {
		R_LOCK(dbenv, &tmgr->reginfo);
		txn_max = ((DB_TXNREGION *)tmgr->reginfo.primary)->cur_maxid;
		R_UNLOCK(dbenv, &tmgr->reginfo);
	} else
		txn_max = TXN_MAXIMUM;
	if (ret != 0 || atype == DB_LOCK_EXPIRE)
		return (ret);

	if (nlockers == 0)
		return (0);
#ifdef DIAGNOSTIC
	if (FLD_ISSET(dbenv->verbose, DB_VERB_WAITSFOR))
		__dd_debug(dbenv, idmap, bitmap, nlockers, nalloc);
#endif
	/* Now duplicate the bitmaps so we can verify deadlock participants. */
	if ((ret = __os_calloc(dbenv, (size_t)nlockers,
	    sizeof(u_int32_t) * nalloc, &copymap)) != 0)
		goto err;
	memcpy(copymap, bitmap, nlockers * sizeof(u_int32_t) * nalloc);

	if ((ret = __os_calloc(dbenv,
	    sizeof(u_int32_t), nalloc, &tmpmap)) != 0)
		goto err1;

	/* Find a deadlock. */
	if ((ret =
	    __dd_find(dbenv, bitmap, idmap, nlockers, nalloc, &deadp)) != 0)
		return (ret);

	killid = BAD_KILLID;
	free_me = deadp;
	for (; *deadp != NULL; deadp++) {
		if (abortp != NULL)
			++*abortp;
		killid = (u_int32_t)((*deadp - bitmap) / nalloc);
		limit = killid;
		keeper = BAD_KILLID;

		if (atype == DB_LOCK_DEFAULT || atype == DB_LOCK_RANDOM)
			goto dokill;
		/*
		 * It's conceivable that under XA, the locker could
		 * have gone away.
		 */
		if (killid == BAD_KILLID)
			break;

		/*
		 * Start with the id that we know is deadlocked
		 * and then examine all other set bits and see
		 * if any are a better candidate for abortion
		 * and that they are genuinely part of the
		 * deadlock.  The definition of "best":
		 *	OLDEST: smallest id
		 *	YOUNGEST: largest id
		 *	MAXLOCKS: maximum count
		 *	MINLOCKS: minimum count
		 *	MINWRITE: minimum count
		 */
		for (i = (killid + 1) % nlockers;
		    i != limit;
		    i = (i + 1) % nlockers) {
			if (!ISSET_MAP(*deadp, i))
				continue;
			switch (atype) {
			case DB_LOCK_OLDEST:
				if (__dd_isolder(idmap[killid].id,
				    idmap[i].id, lock_max, txn_max))
					continue;
				keeper = i;
				break;
			case DB_LOCK_YOUNGEST:
				if (__dd_isolder(idmap[i].id,
				    idmap[killid].id, lock_max, txn_max))
					continue;
				keeper = i;
				break;
			case DB_LOCK_MAXLOCKS:
				if (idmap[i].count < idmap[killid].count)
					continue;
				keeper = i;
				break;
			case DB_LOCK_MINLOCKS:
			case DB_LOCK_MINWRITE:
				if (idmap[i].count > idmap[killid].count)
					continue;
				keeper = i;
				break;
			default:
				killid = BAD_KILLID;
				ret = EINVAL;
				goto dokill;
			}
			if (__dd_verify(idmap, *deadp,
			    tmpmap, copymap, nlockers, nalloc, i))
				killid = i;
		}

dokill:		if (killid == BAD_KILLID)
			continue;

		/*
		 * There are cases in which our general algorithm will
		 * fail.  Returning 1 from verify indicates that the
		 * particular locker is not only involved in a deadlock,
		 * but that killing him will allow others to make forward
		 * progress.  Unfortunately, there are cases where we need
		 * to abort someone, but killing them will not necessarily
		 * ensure forward progress (imagine N readers all trying to
		 * acquire a write lock).  In such a scenario, we'll have
		 * gotten all the way through the loop, we will have found
		 * someone to keep (keeper will be valid), but killid will
		 * still be the initial deadlocker.  In this case, if the
		 * initial killid satisfies __dd_verify, kill it, else abort
		 * keeper and indicate that we need to run deadlock detection
		 * again.
		 */
		if (keeper != BAD_KILLID && killid == limit &&
		    __dd_verify(idmap, *deadp,
		    tmpmap, copymap, nlockers, nalloc, killid) == 0) {
			LOCKREGION(dbenv, lt);
			region->need_dd = 1;
			UNLOCKREGION(dbenv, lt);
			killid = keeper;
		}

		/* Kill the locker with lockid idmap[killid]. */
		if ((ret = __dd_abort(dbenv, &idmap[killid])) != 0) {
			/*
			 * It's possible that the lock was already aborted;
			 * this isn't necessarily a problem, so do not treat
			 * it as an error.
			 */
			if (ret == DB_ALREADY_ABORTED)
				ret = 0;
			else
				__db_err(dbenv,
				    "warning: unable to abort locker %lx",
				    (u_long)idmap[killid].id);
		} else if (FLD_ISSET(dbenv->verbose, DB_VERB_DEADLOCK))
			__db_err(dbenv,
			    "Aborting locker %lx", (u_long)idmap[killid].id);
	}
	__os_free(dbenv, tmpmap);
err1:	__os_free(dbenv, copymap);

err:	if (free_me != NULL)
		__os_free(dbenv, free_me);
	__os_free(dbenv, bitmap);
	__os_free(dbenv, idmap);

	return (ret);
}
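/*
 * Editorial sketch, not part of the original source: how an application
 * reaches the function above.  DB_ENV->lock_detect is the public method
 * that dispatches to __lock_detect; its arguments here (flags 0, policy
 * DB_LOCK_DEFAULT) are real API values, but the guard macro and the
 * wrapper name run_detector are hypothetical.
 */
#ifdef DD_USAGE_EXAMPLE
static int
run_detector(dbenv)
	DB_ENV *dbenv;
{
	int aborted, ret;

	/* One detector pass; aborted reports how many lockers were killed. */
	if ((ret =
	    dbenv->lock_detect(dbenv, 0, DB_LOCK_DEFAULT, &aborted)) != 0)
		dbenv->err(dbenv, ret, "DB_ENV->lock_detect");
	return (ret);
}
#endif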
/*
 * ========================================================================
 * Utilities
 */

#define	DD_INVALID_ID	((u_int32_t) -1)

static int
__dd_build(dbenv, atype, bmp, nlockers, allocp, idmap)
	DB_ENV *dbenv;
	u_int32_t atype, **bmp, *nlockers, *allocp;
	locker_info **idmap;
{
	struct __db_lock *lp;
	DB_LOCKER *lip, *lockerp, *child;
	DB_LOCKOBJ *op, *lo;
	DB_LOCKREGION *region;
	DB_LOCKTAB *lt;
	locker_info *id_array;
	db_timeval_t now;
	u_int32_t *bitmap, count, dd, *entryp, id, ndx, nentries, *tmpmap;
	u_int8_t *pptr;
	int expire_only, is_first, need_timeout, ret;

	lt = dbenv->lk_handle;
	region = lt->reginfo.primary;
	LOCK_SET_TIME_INVALID(&now);
	need_timeout = 0;
	expire_only = atype == DB_LOCK_EXPIRE;

	/*
	 * While we always check for expired timeouts, if we are called
	 * with DB_LOCK_EXPIRE, then we are only checking for timeouts
	 * (i.e., not doing deadlock detection at all).  If we aren't
	 * doing real deadlock detection, then we can skip a significant
	 * amount of the processing.  In particular, we do not build
	 * the conflict array and our caller needs to expect this.
	 */
	if (expire_only) {
		count = 0;
		nentries = 0;
		goto obj_loop;
	}

	/*
	 * We'll check how many lockers there are, add a few more in for
	 * good measure and then allocate all the structures.  Then we'll
	 * verify that we have enough room when we go back in and get the
	 * mutex the second time.
	 */
retry:	count = region->stat.st_nlockers;

	if (count == 0) {
		*nlockers = 0;
		return (0);
	}

	if (FLD_ISSET(dbenv->verbose, DB_VERB_DEADLOCK))
		__db_err(dbenv, "%lu lockers", (u_long)count);

	count += 20;
	nentries = ALIGN(count, 32) / 32;
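	/*
	 * Editorial note, not in the original source, a worked example of
	 * the sizing above: with region->stat.st_nlockers == 100, count is
	 * padded to 120, and nentries = ALIGN(120, 32) / 32 = 128 / 32 = 4,
	 * so each row of the waits-for matrix is four u_int32_t words (128
	 * bits).  The calloc below then allocates 120 rows * 4 words * 4
	 * bytes = 1920 bytes of bitmap.
	 */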
	/*
	 * Allocate enough space for a count by count bitmap matrix.
	 *
	 * XXX
	 * We can probably save the malloc's between iterations just
	 * reallocing if necessary because count grew by too much.
	 */
	if ((ret = __os_calloc(dbenv, (size_t)count,
	    sizeof(u_int32_t) * nentries, &bitmap)) != 0)
		return (ret);

	if ((ret = __os_calloc(dbenv,
	    sizeof(u_int32_t), nentries, &tmpmap)) != 0) {
		__os_free(dbenv, bitmap);
		return (ret);
	}

	if ((ret = __os_calloc(dbenv,
	    (size_t)count, sizeof(locker_info), &id_array)) != 0) {
		__os_free(dbenv, bitmap);
		__os_free(dbenv, tmpmap);
		return (ret);
	}

	/*
	 * Now go back in and actually fill in the matrix.
	 */
	if (region->stat.st_nlockers > count) {
		__os_free(dbenv, bitmap);
		__os_free(dbenv, tmpmap);
		__os_free(dbenv, id_array);
		goto retry;
	}

	/*
	 * First we go through and assign each locker a deadlock detector id.
	 */
	for (id = 0, lip = SH_TAILQ_FIRST(&region->lockers, __db_locker);
	    lip != NULL;
	    lip = SH_TAILQ_NEXT(lip, ulinks, __db_locker)) {
		if (F_ISSET(lip, DB_LOCKER_INABORT))
			continue;
		if (lip->master_locker == INVALID_ROFF) {
			lip->dd_id = id++;
			id_array[lip->dd_id].id = lip->id;
			if (atype == DB_LOCK_MINLOCKS ||
			    atype == DB_LOCK_MAXLOCKS)
				id_array[lip->dd_id].count = lip->nlocks;
			if (atype == DB_LOCK_MINWRITE)
				id_array[lip->dd_id].count = lip->nwrites;
		} else
			lip->dd_id = DD_INVALID_ID;
	}

	/*
	 * We need only consider objects that have waiters, so we use
	 * the list of objects with waiters (dd_objs) instead of traversing
	 * the entire hash table.  For each object, we traverse the waiters
	 * list and add an entry in the waitsfor matrix for each waiter/holder
	 * combination.
	 */
obj_loop:
	for (op = SH_TAILQ_FIRST(&region->dd_objs, __db_lockobj);
	    op != NULL;
	    op = SH_TAILQ_NEXT(op, dd_links, __db_lockobj)) {
		if (expire_only)
			goto look_waiters;
		CLEAR_MAP(tmpmap, nentries);

		/*
		 * First we go through and create a bit map that
		 * represents all the holders of this object.
		 */
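/*
 * Editorial sketch, not part of the original source (the listing above is
 * truncated here): the core idea the detector applies to the matrix that
 * __dd_build constructs.  Bit j in locker i's row means "i waits for j".
 * OR-ing each row with the rows of the lockers it waits for, until the
 * matrix stops changing, yields the transitive closure; two lockers that
 * each have the other's bit set then form a deadlock cycle.  The guard
 * macro, the function name, and the one-word-per-row bound of 32 lockers
 * are all hypothetical simplifications.
 */
#ifdef DD_CYCLE_EXAMPLE
static int
__dd_cycle_example(bitmap, nlockers)
	u_int32_t *bitmap;	/* One u_int32_t row per locker. */
	u_int32_t nlockers;	/* Assumed <= 32 so a row fits one word. */
{
	u_int32_t i, j;
	int changed;

	/* Transitive closure: propagate waits-for edges until stable. */
	do {
		changed = 0;
		for (i = 0; i < nlockers; i++)
			for (j = 0; j < nlockers; j++)
				if (ISSET_MAP(&bitmap[i], j) &&
				    (bitmap[i] | bitmap[j]) != bitmap[i]) {
					OR_MAP((&bitmap[i]), (&bitmap[j]), 1);
					changed = 1;
				}
	} while (changed);

	/* A mutual (transitive) wait is a deadlock cycle. */
	for (i = 0; i < nlockers; i++)
		for (j = i + 1; j < nlockers; j++)
			if (ISSET_MAP(&bitmap[i], j) &&
			    ISSET_MAP(&bitmap[j], i))
				return (1);
	return (0);
}
#endif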