/* lock.c — DLM core locking (code-viewer banner removed from scraped copy) */
/***************************************************************************************************************************************************************** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.**** This copyrighted material is made available to anyone wishing to use,** modify, copy, or redistribute it subject to the terms and conditions** of the GNU General Public License v.2.***************************************************************************************************************************************************************//* Central locking logic has four stages: dlm_lock() dlm_unlock() request_lock(ls, lkb) convert_lock(ls, lkb) unlock_lock(ls, lkb) cancel_lock(ls, lkb) _request_lock(r, lkb) _convert_lock(r, lkb) _unlock_lock(r, lkb) _cancel_lock(r, lkb) do_request(r, lkb) do_convert(r, lkb) do_unlock(r, lkb) do_cancel(r, lkb) Stage 1 (lock, unlock) is mainly about checking input args and splitting into one of the four main operations: dlm_lock = request_lock dlm_lock+CONVERT = convert_lock dlm_unlock = unlock_lock dlm_unlock+CANCEL = cancel_lock Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is provided to the next stage. Stage 3, _xxxx_lock(), determines if the operation is local or remote. When remote, it calls send_xxxx(), when local it calls do_xxxx(). Stage 4, do_xxxx(), is the guts of the operation. It manipulates the given rsb and lkb and queues callbacks. For remote operations, send_xxxx() results in the corresponding do_xxxx() function being executed on the remote node. 
The connecting send/receive calls on local (L) and remote (R) nodes: L: send_xxxx() -> R: receive_xxxx() R: do_xxxx() L: receive_xxxx_reply() <- R: send_xxxx_reply()*/#include <linux/types.h>#include "dlm_internal.h"#include <linux/dlm_device.h>#include "memory.h"#include "lowcomms.h"#include "requestqueue.h"#include "util.h"#include "dir.h"#include "member.h"#include "lockspace.h"#include "ast.h"#include "lock.h"#include "rcom.h"#include "recover.h"#include "lvb_table.h"#include "user.h"#include "config.h"static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);static int send_remove(struct dlm_rsb *r);static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, struct dlm_message *ms);static int receive_extralen(struct dlm_message *ms);static void do_purge(struct dlm_ls *ls, int nodeid, int pid);static void del_timeout(struct dlm_lkb *lkb);void dlm_timeout_warn(struct dlm_lkb *lkb);/* * Lock compatibilty matrix - thanks Steve * UN = Unlocked state. Not really a state, used as a flag * PD = Padding. Used to make the matrix a nice power of two in size * Other states are the same as the VMS DLM. 
 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same) */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};

/* Shorthand: index the compat matrix with the granted mode of @gr and
   the requested mode of @rq (both are struct dlm_lkb pointers). */
#define modes_compat(gr, rq) \
        __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

/* Table lookup: nonzero if a lock at mode1 can coexist with one at mode2. */
int dlm_modes_compat(int mode1, int mode2)
{
        return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1] */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

/* Log one lkb's identifying fields to the kernel log (debugging aid). */
void dlm_print_lkb(struct dlm_lkb *lkb)
{
        printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
               " status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
               lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
               lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
               lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}

/* Log an rsb's master nodeid, flags and name (debugging aid).
   NOTE(review): res_name is printed with %s; presumably the buffer is
   NUL-terminated by the allocator — confirm against allocate_rsb(). */
void dlm_print_rsb(struct dlm_rsb *r)
{
        printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
               r->res_nodeid, r->res_flags, r->res_first_lkid,
               r->res_recover_locks_count, r->res_name);
}

/* Dump an rsb plus every lkb on its lookup, grant, convert and wait
   queues (debugging aid). */
void dlm_dump_rsb(struct dlm_rsb *r)
{
        struct dlm_lkb *lkb;

        dlm_print_rsb(r);

        printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
               list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
        printk(KERN_ERR "rsb lookup list\n");
        list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
                dlm_print_lkb(lkb);
        printk(KERN_ERR "rsb grant queue:\n");
        list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
                dlm_print_lkb(lkb);
        printk(KERN_ERR "rsb convert queue:\n");
        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
                dlm_print_lkb(lkb);
        printk(KERN_ERR "rsb wait queue:\n");
        list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
                dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

/* Take a shared hold on ls_in_recovery; blocks while recovery (which
   holds it for write) is running. */
static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
        down_read(&ls->ls_in_recovery);
}

/* Release the shared hold taken by dlm_lock_recovery(). */
void dlm_unlock_recovery(struct dlm_ls *ls)
{
        up_read(&ls->ls_in_recovery);
}

/* Non-blocking variant: returns nonzero if the shared hold was acquired. */
int dlm_lock_recovery_try(struct dlm_ls *ls)
{
        return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb
*lkb)
{
        /* NOQUEUE: the caller refuses to wait for the lock */
        return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

/* Caller asked for blocking asts to be sent even though it won't queue. */
static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
        return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
        return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
        return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
        return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

/* An rsb is remote when another node (res_nodeid > 0) masters it. */
static inline int is_remote(struct dlm_rsb *r)
{
        DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
        return !!r->res_nodeid;
}

/* Local record of a lock whose master is on another node. */
static inline int is_process_copy(struct dlm_lkb *lkb)
{
        return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

/* Master-side record of a lock owned by a process on another node;
   such an lkb must carry the owner's nodeid. */
static inline int is_master_copy(struct dlm_lkb *lkb)
{
        if (lkb->lkb_flags & DLM_IFL_MSTCPY)
                DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
        return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

/* PR<->CW conversions are special-cased: per __dlm_compat_matrix the
   two modes are mutually incompatible, so this is neither a pure up-
   nor down-conversion. */
static inline int middle_conversion(struct dlm_lkb *lkb)
{
        if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
            (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
                return 1;
        return 0;
}

/* Converting to a strictly less restrictive mode (excluding PR<->CW). */
static inline int down_conversion(struct dlm_lkb *lkb)
{
        return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
        return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
        return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
        return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
                                  DLM_IFL_OVERLAP_CANCEL));
}

/* Queue a completion ast for lkb with status rv.  Master copies get no
   local ast (the owning node delivers its own). */
static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
        if (is_master_copy(lkb))
                return;

        del_timeout(lkb);

        DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

        /* if the operation was a cancel, then return -DLM_ECANCEL, if a
           timeout caused the cancel then return -ETIMEDOUT */
        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
                lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
                rv = -ETIMEDOUT;
        }

        if (rv ==
-DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
                /* cancel was issued to break a deadlock: report -EDEADLK */
                lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
                rv = -EDEADLK;
        }

        lkb->lkb_lksb->sb_status = rv;
        lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
        dlm_add_ast(lkb, AST_COMP);
}

/* Completion ast for an overlapped unlock/cancel: report whichever of
   the two overlapping operations was pending on the lkb. */
static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        queue_cast(r, lkb,
                   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

/* Queue a blocking ast: forwarded to the owning node for a master copy,
   delivered locally otherwise. */
static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
        if (is_master_copy(lkb))
                send_bast(r, lkb, rqmode);
        else {
                lkb->lkb_bastmode = rqmode;
                dlm_add_ast(lkb, AST_BAST);
        }
}

/*
 * Basic operations on rsb's and lkb's
 */

/* Allocate and minimally initialize an rsb for @name (length @len).
   Returns NULL on allocation failure.  The caller sets hash, bucket and
   nodeid and links the rsb into the hash table. */
static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
        struct dlm_rsb *r;

        r = allocate_rsb(ls, len);
        if (!r)
                return NULL;

        r->res_ls = ls;
        r->res_length = len;
        memcpy(r->res_name, name, len);
        mutex_init(&r->res_mutex);

        INIT_LIST_HEAD(&r->res_lookup);
        INIT_LIST_HEAD(&r->res_grantqueue);
        INIT_LIST_HEAD(&r->res_convertqueue);
        INIT_LIST_HEAD(&r->res_waitqueue);
        INIT_LIST_HEAD(&r->res_root_list);
        INIT_LIST_HEAD(&r->res_recover_list);

        return r;
}

/* Linear scan of one hash chain for an rsb matching @name/@len.
   Returns 0 and sets *r_ret on a match; -EBADR if absent; -ENOTBLK if
   found but not locally mastered (res_nodeid != 0) while the caller
   required a master copy (R_MASTER). */
static int search_rsb_list(struct list_head *head, char *name, int len,
                           unsigned int flags, struct dlm_rsb **r_ret)
{
        struct dlm_rsb *r;
        int error = 0;

        list_for_each_entry(r, head, res_hashchain) {
                if (len == r->res_length && !memcmp(name, r->res_name, len))
                        goto found;
        }
        return -EBADR;

 found:
        if (r->res_nodeid && (flags & R_MASTER))
                error = -ENOTBLK;
        *r_ret = r;
        return error;
}

/* Search bucket @b's active list, then its toss (idle) list; a toss hit
   is moved back to the active list.  Caller holds the bucket lock. */
static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
                       unsigned int flags, struct dlm_rsb **r_ret)
{
        struct dlm_rsb *r;
        int error;

        error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
        if (!error) {
                kref_get(&r->res_ref);
                goto out;
        }
        error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
        if (error)
                goto out;

        list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

        if (dlm_no_directory(ls))
                goto out;

        /* the cached master of a tossed rsb may have gone stale; clear
           or mark uncertainty depending on who was recorded as master */
        if (r->res_nodeid == -1) {
                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
                r->res_first_lkid = 0;
        }
else if (r->res_nodeid > 0) {
                /* remote master cached: flag it as possibly stale */
                rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
                r->res_first_lkid = 0;
        } else {
                DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
                DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
        }
 out:
        *r_ret = r;
        return error;
}

/* Locked wrapper around _search_rsb(): takes the bucket's rwlock for
   write (the search may move an rsb from the toss list). */
static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
                      unsigned int flags, struct dlm_rsb **r_ret)
{
        int error;

        write_lock(&ls->ls_rsbtbl[b].lock);
        error = _search_rsb(ls, name, len, b, flags, r_ret);
        write_unlock(&ls->ls_rsbtbl[b].lock);
        return error;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
*/static int find_rsb(struct dlm_ls *ls, char *name, int namelen, unsigned int flags, struct dlm_rsb **r_ret){ struct dlm_rsb *r, *tmp; uint32_t hash, bucket; int error = 0; if (dlm_no_directory(ls)) flags |= R_CREATE; hash = jhash(name, namelen, 0); bucket = hash & (ls->ls_rsbtbl_size - 1); error = search_rsb(ls, name, namelen, bucket, flags, &r); if (!error) goto out; if (error == -EBADR && !(flags & R_CREATE)) goto out; /* the rsb was found but wasn't a master copy */ if (error == -ENOTBLK) goto out; error = -ENOMEM; r = create_rsb(ls, name, namelen); if (!r) goto out; r->res_hash = hash; r->res_bucket = bucket; r->res_nodeid = -1; kref_init(&r->res_ref); /* With no directory, the master can be set immediately */ if (dlm_no_directory(ls)) { int nodeid = dlm_dir_nodeid(r); if (nodeid == dlm_our_nodeid()) nodeid = 0; r->res_nodeid = nodeid; } write_lock(&ls->ls_rsbtbl[bucket].lock); error = _search_rsb(ls, name, namelen, bucket, 0, &tmp); if (!error) { write_unlock(&ls->ls_rsbtbl[bucket].lock); free_rsb(r); r = tmp; goto out; } list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list); write_unlock(&ls->ls_rsbtbl[bucket].lock); error = 0; out: *r_ret = r; return error;}int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen, unsigned int flags, struct dlm_rsb **r_ret){ return find_rsb(ls, name, namelen, flags, r_ret);}/* This is only called to add a reference when the code already holds a valid reference to the rsb, so there's no need for locking. */static inline void hold_rsb(struct dlm_rsb *r){ kref_get(&r->res_ref);}void dlm_hold_rsb(struct dlm_rsb *r){ hold_rsb(r);}
/*
 * (Code-viewer page residue, translated from Chinese: keyboard-shortcut
 *  help — copy code: Ctrl+C, search code: Ctrl+F, fullscreen: F11,
 *  switch theme: Ctrl+Shift+D, show shortcuts: ?, increase font: Ctrl+=,
 *  decrease font: Ctrl+-.)
 */