📄 lock.c
static void toss_rsb(struct kref *kref)
{
        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
        struct dlm_ls *ls = r->res_ls;

        DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
        kref_init(&r->res_ref);
        list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
        r->res_toss_time = jiffies;
        if (r->res_lvbptr) {
                free_lvb(r->res_lvbptr);
                r->res_lvbptr = NULL;
        }
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;
        uint32_t bucket = r->res_bucket;

        write_lock(&ls->ls_rsbtbl[bucket].lock);
        kref_put(&r->res_ref, toss_rsb);
        write_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
        put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
        int rv;
        rv = kref_put(&r->res_ref, toss_rsb);
        DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

        /* All work is done after the return from kref_put() so we
           can release the write_lock before the remove and free. */

        DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        hold_rsb(r);
        lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
        if (lkb->lkb_resource) {
                put_rsb(lkb->lkb_resource);
                lkb->lkb_resource = NULL;
        }
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
        struct dlm_lkb *lkb, *tmp;
        uint32_t lkid = 0;
        uint16_t bucket;

        lkb = allocate_lkb(ls);
        if (!lkb)
                return -ENOMEM;

        lkb->lkb_nodeid = -1;
        lkb->lkb_grmode = DLM_LOCK_IV;
        kref_init(&lkb->lkb_ref);
        INIT_LIST_HEAD(&lkb->lkb_ownqueue);
        INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
        INIT_LIST_HEAD(&lkb->lkb_time_list);

        get_random_bytes(&bucket, sizeof(bucket));
        bucket &= (ls->ls_lkbtbl_size - 1);

        write_lock(&ls->ls_lkbtbl[bucket].lock);

        /* counter can roll over so we must verify lkid is not in use */

        while (lkid == 0) {
                lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;

                list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
                                    lkb_idtbl_list) {
                        if (tmp->lkb_id != lkid)
                                continue;
                        lkid = 0;
                        break;
                }
        }

        lkb->lkb_id = lkid;
        list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
        write_unlock(&ls->ls_lkbtbl[bucket].lock);

        *lkb_ret = lkb;
        return 0;
}

static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
        struct dlm_lkb *lkb;
        uint16_t bucket = (lkid >> 16);

        list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
                if (lkb->lkb_id == lkid)
                        return lkb;
        }
        return NULL;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
        struct dlm_lkb *lkb;
        uint16_t bucket = (lkid >> 16);

        if (bucket >= ls->ls_lkbtbl_size)
                return -EBADSLT;

        read_lock(&ls->ls_lkbtbl[bucket].lock);
        lkb = __find_lkb(ls, lkid);
        if (lkb)
                kref_get(&lkb->lkb_ref);
        read_unlock(&ls->ls_lkbtbl[bucket].lock);

        *lkb_ret = lkb;
        return lkb ? 0 : -ENOENT;
}
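/*
 * Illustrative note, not part of the original file: create_lkb() builds the
 * 32-bit lock id by packing the randomly chosen lkbtbl bucket into the top
 * 16 bits and the per-bucket counter into the low 16 bits, so the bucket can
 * later be recovered from the id alone, as __find_lkb()/find_lkb() do above.
 * A minimal sketch of the assumed encoding (helper names are hypothetical):
 */
#if 0
static inline uint32_t example_make_lkid(uint16_t bucket, uint16_t counter)
{
        return ((uint32_t)bucket << 16) | counter;      /* as in create_lkb() */
}

static inline uint16_t example_lkid_bucket(uint32_t lkid)
{
        return lkid >> 16;                              /* as in find_lkb() */
}
#endif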
static void kill_lkb(struct kref *kref)
{
        struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

        /* All work is done after the return from kref_put() so we
           can release the write_lock before the detach_lkb */

        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
        uint16_t bucket = (lkb->lkb_id >> 16);

        write_lock(&ls->ls_lkbtbl[bucket].lock);
        if (kref_put(&lkb->lkb_ref, kill_lkb)) {
                list_del(&lkb->lkb_idtbl_list);
                write_unlock(&ls->ls_lkbtbl[bucket].lock);

                detach_lkb(lkb);

                /* for local/process lkbs, lvbptr points to caller's lksb */
                if (lkb->lkb_lvbptr && is_master_copy(lkb))
                        free_lvb(lkb->lkb_lvbptr);
                free_lkb(lkb);
                return 1;
        } else {
                write_unlock(&ls->ls_lkbtbl[bucket].lock);
                return 0;
        }
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
        struct dlm_ls *ls;

        DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
        DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

        ls = lkb->lkb_resource->res_ls;
        return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
        kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
        int rv;
        rv = kref_put(&lkb->lkb_ref, kill_lkb);
        DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
                            int mode)
{
        struct dlm_lkb *lkb = NULL;

        list_for_each_entry(lkb, head, lkb_statequeue)
                if (lkb->lkb_rqmode < mode)
                        break;

        if (!lkb)
                list_add_tail(new, head);
        else
                __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
        kref_get(&lkb->lkb_ref);

        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

        lkb->lkb_status = status;

        switch (status) {
        case DLM_LKSTS_WAITING:
                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
                        list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
                else
                        list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
                break;
        case DLM_LKSTS_GRANTED:
                /* convention says granted locks kept in order of grmode */
                lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
                                lkb->lkb_grmode);
                break;
        case DLM_LKSTS_CONVERT:
                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
                        list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
                else
                        list_add_tail(&lkb->lkb_statequeue,
                                      &r->res_convertqueue);
                break;
        default:
                DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
        }
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        lkb->lkb_status = 0;
        list_del(&lkb->lkb_statequeue);
        unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
        hold_lkb(lkb);
        del_lkb(r, lkb);
        add_lkb(r, lkb, sts);
        unhold_lkb(lkb);
}

static int msg_reply_type(int mstype)
{
        switch (mstype) {
        case DLM_MSG_REQUEST:
                return DLM_MSG_REQUEST_REPLY;
        case DLM_MSG_CONVERT:
                return DLM_MSG_CONVERT_REPLY;
        case DLM_MSG_UNLOCK:
                return DLM_MSG_UNLOCK_REPLY;
        case DLM_MSG_CANCEL:
                return DLM_MSG_CANCEL_REPLY;
        case DLM_MSG_LOOKUP:
                return DLM_MSG_LOOKUP_REPLY;
        }
        return -1;
}
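/*
 * Illustrative sketch, not part of the original file: the assumed pairing of
 * the reference helpers above.  find_lkb() takes a reference that the caller
 * later drops with dlm_put_lkb(); add_lkb()/del_lkb() take and drop their own
 * reference as an lkb goes on and off an rsb queue, which is why, per the
 * unhold_lkb() comment, del_lkb() can never drop the last reference when it
 * runs between a find_lkb()/put_lkb() pair.  The function name is
 * hypothetical and the serialization on the rsb is elided.
 */
#if 0
static int example_dequeue(struct dlm_ls *ls, uint32_t lkid)
{
        struct dlm_lkb *lkb;
        int error;

        error = find_lkb(ls, lkid, &lkb);       /* +1 lkb reference */
        if (error)
                return error;

        /* del_lkb() undoes an earlier add_lkb(); its unhold_lkb() cannot be
           the final put because the find_lkb() reference is still held */
        del_lkb(lkb->lkb_resource, lkb);

        dlm_put_lkb(lkb);                       /* -1 lkb reference */
        return 0;
}
#endif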
/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error = 0;

        mutex_lock(&ls->ls_waiters_mutex);

        if (is_overlap_unlock(lkb) ||
            (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
                error = -EINVAL;
                goto out;
        }

        if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
                switch (mstype) {
                case DLM_MSG_UNLOCK:
                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
                        break;
                case DLM_MSG_CANCEL:
                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
                        break;
                default:
                        error = -EBUSY;
                        goto out;
                }
                lkb->lkb_wait_count++;
                hold_lkb(lkb);

                log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
                          lkb->lkb_id, lkb->lkb_wait_type, mstype,
                          lkb->lkb_wait_count, lkb->lkb_flags);
                goto out;
        }

        DLM_ASSERT(!lkb->lkb_wait_count,
                   dlm_print_lkb(lkb);
                   printk("wait_count %d\n", lkb->lkb_wait_count););

        lkb->lkb_wait_count++;
        lkb->lkb_wait_type = mstype;
        hold_lkb(lkb);
        list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
        if (error)
                log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
                          lkb->lkb_id, error, lkb->lkb_flags, mstype,
                          lkb->lkb_wait_type, lkb->lkb_resource->res_name);
        mutex_unlock(&ls->ls_waiters_mutex);
        return error;
}

/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int overlap_done = 0;

        if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
                overlap_done = 1;
                goto out_del;
        }

        if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
                overlap_done = 1;
                goto out_del;
        }

        /* N.B. type of reply may not always correspond to type of original
           msg due to lookup->request optimization, verify others? */

        if (lkb->lkb_wait_type) {
                lkb->lkb_wait_type = 0;
                goto out_del;
        }

        log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
                  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
        return -1;

 out_del:
        /* the force-unlock/cancel has completed and we haven't recvd a reply
           to the op that was in progress prior to the unlock/cancel; we
           give up on any reply to the earlier op.  FIXME: not sure when/how
           this would happen */

        if (overlap_done && lkb->lkb_wait_type) {
                log_error(ls, "remove_from_waiters %x reply %d give up on %d",
                          lkb->lkb_id, mstype, lkb->lkb_wait_type);
                lkb->lkb_wait_count--;
                lkb->lkb_wait_type = 0;
        }

        DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

        lkb->lkb_flags &= ~DLM_IFL_RESEND;
        lkb->lkb_wait_count--;
        if (!lkb->lkb_wait_count)
                list_del_init(&lkb->lkb_wait_reply);
        unhold_lkb(lkb);
        return 0;
}

static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error;

        mutex_lock(&ls->ls_waiters_mutex);
        error = _remove_from_waiters(lkb, mstype);
        mutex_unlock(&ls->ls_waiters_mutex);
        return error;
}
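/*
 * Illustrative sketch, not part of the original file: the assumed
 * request/reply pairing around the waiters list.  An lkb is placed on the
 * list before a message goes to a remote node and is taken off when the
 * matching reply type (see msg_reply_type() above) arrives.  The function
 * name is hypothetical, the actual message send is elided, and in the real
 * code the reply is of course handled asynchronously.
 */
#if 0
static int example_remote_op(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
{
        int error;

        error = add_to_waiters(lkb, mstype);    /* e.g. DLM_MSG_REQUEST */
        if (error)
                return error;

        /* ... build and send the message to the resource's master node ... */

        /* later, when the reply arrives: */
        return remove_from_waiters(lkb, msg_reply_type(mstype));
}
#endif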
/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error;

        if (ms != &ls->ls_stub_ms)
                mutex_lock(&ls->ls_waiters_mutex);
        error = _remove_from_waiters(lkb, ms->m_type);
        if (ms != &ls->ls_stub_ms)
                mutex_unlock(&ls->ls_waiters_mutex);
        return error;
}

static void dir_remove(struct dlm_rsb *r)
{
        int to_nodeid;

        if (dlm_no_directory(r->res_ls))
                return;

        to_nodeid = dlm_dir_nodeid(r);
        if (to_nodeid != dlm_our_nodeid())
                send_remove(r);
        else
                dlm_dir_remove_entry(r->res_ls, to_nodeid,
                                     r->res_name, r->res_length);
}

/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
        struct dlm_rsb *r;
        int count = 0, found;

        for (;;) {
                found = 0;
                write_lock(&ls->ls_rsbtbl[b].lock);
                list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
                                            res_hashchain) {
                        if (!time_after_eq(jiffies, r->res_toss_time +
                                           dlm_config.ci_toss_secs * HZ))
                                continue;
                        found = 1;
                        break;
                }

                if (!found) {
                        write_unlock(&ls->ls_rsbtbl[b].lock);
                        break;
                }

                if (kref_put(&r->res_ref, kill_rsb)) {
                        list_del(&r->res_hashchain);
                        write_unlock(&ls->ls_rsbtbl[b].lock);

                        if (is_master(r))
                                dir_remove(r);
                        free_rsb(r);
                        count++;
                } else {
                        write_unlock(&ls->ls_rsbtbl[b].lock);
                        log_error(ls, "tossed rsb in use %s", r->res_name);
                }
        }

        return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
        int i;

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                shrink_bucket(ls, i);
                if (dlm_locking_stopped(ls))
                        break;
                cond_resched();
        }
}

static void add_timeout(struct dlm_lkb *lkb)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;

        if (is_master_copy(lkb)) {
                lkb->lkb_timestamp = jiffies;
                return;