
lock.c
Linux kernel source code (C)
Page 1 of 5
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

       dlm_lock          = request_lock
       dlm_lock+CONVERT  = convert_lock
       dlm_unlock        = unlock_lock
       dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/

#include <linux/types.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);
void dlm_timeout_warn(struct dlm_lkb *lkb);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}

void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	del_timeout(lkb);

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	/* if the operation was a cancel, then return -DLM_ECANCEL, if a
	   timeout caused the cancel then return -ETIMEDOUT */

	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
		rv = -ETIMEDOUT;
	}

	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
		rv = -EDEADLK;
	}

	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
	dlm_add_ast(lkb, AST_COMP);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb))
		send_bast(r, lkb, rqmode);
	else {
		lkb->lkb_bastmode = rqmode;
		dlm_add_ast(lkb, AST_BAST);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}

static int search_rsb_list(struct list_head *head, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error = 0;

	list_for_each_entry(r, head, res_hashchain) {
		if (len == r->res_length && !memcmp(name, r->res_name, len))
			goto found;
	}
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}

static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	write_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	write_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r, *tmp;
	uint32_t hash, bucket;
	int error = 0;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */

	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */

	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		write_unlock(&ls->ls_rsbtbl[bucket].lock);
		free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}

int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	return find_rsb(ls, name, namelen, flags, r_ret);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}
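Editor's note: the staged flow documented at the top of this file is entered through the public dlm_lock()/dlm_unlock() calls declared in linux/dlm.h. The fragment below is an illustrative sketch, not part of lock.c: a hypothetical in-kernel caller taking and releasing an EX lock. The resource name, callback names, and the assumption that "ls" came from dlm_new_lockspace() are all made up for illustration.

/* Editor's sketch, not part of lock.c: a hypothetical caller entering
 * stage 1 (dlm_lock -> request_lock).  Assumes "ls" was obtained from
 * dlm_new_lockspace(); names and callbacks are illustrative. */
#include <linux/dlm.h>

static struct dlm_lksb my_lksb;	/* status block filled in by the DLM */

static void my_compl_ast(void *astarg)
{
	/* completion AST: my_lksb.sb_status is 0 on grant, or an error
	   such as -DLM_ECANCEL if the request was cancelled */
}

static void my_blocking_ast(void *astarg, int mode)
{
	/* another node is waiting for "mode"; unlock or down-convert soon */
}

static int grab_resource(dlm_lockspace_t *ls)
{
	/* plain dlm_lock() routes to request_lock(); adding
	   DLM_LKF_CONVERT would route to convert_lock() instead */
	return dlm_lock(ls, DLM_LOCK_EX, &my_lksb, 0,
			"example-res", 11, 0,
			my_compl_ast, NULL, my_blocking_ast);
}

static int drop_resource(dlm_lockspace_t *ls)
{
	/* dlm_unlock() routes to unlock_lock(); passing DLM_LKF_CANCEL
	   would route to cancel_lock() */
	return dlm_unlock(ls, my_lksb.sb_lkid, 0, &my_lksb, NULL);
}

A return of 0 from dlm_lock() means the request was queued; the final status arrives asynchronously in the completion AST via the lksb.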
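The __dlm_compat_matrix above is consulted through modes_compat() and the exported dlm_modes_compat(), which index the table at [mode1+1][mode2+1]. A few sample lookups (editor's illustration) show what the table encodes; with DLM_LKF_QUECVT set, the stricter __quecvt_compat_matrix is used instead so that conversions queue behind other waiters.

/* Editor's illustration of __dlm_compat_matrix lookups: */
void compat_examples(void)
{
	printk(KERN_INFO "PR/PR %d PR/EX %d NL/EX %d\n",
	       dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR),  /* 1: readers share */
	       dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX),  /* 0: writer excluded */
	       dlm_modes_compat(DLM_LOCK_NL, DLM_LOCK_EX)); /* 1: NL blocks nothing */
}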
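dlm_lvb_operations encodes when a lock value block moves during a conversion: converting up out of NL returns the resource's LVB to the caller (table entry 1), while converting down out of PW or EX writes the caller's buffer to the resource (entry 0). A hedged sketch, reusing my_lksb and the ASTs from the first sketch above; the buffer size and helper name are assumptions, and the buffer must match the lvblen configured for the lockspace.

/* Editor's sketch of LVB transfer on a down-conversion.  Per
 * dlm_lvb_operations[PW+1][NL+1] == 0, converting PW -> NL writes
 * my_lvb out to the resource. */
static char my_lvb[32];	/* must match the lockspace's configured lvblen */

static int publish_and_release(dlm_lockspace_t *ls)
{
	my_lksb.sb_lvbptr = my_lvb;
	/* for a convert, the lock is identified by my_lksb.sb_lkid;
	   the name/namelen arguments are not used */
	return dlm_lock(ls, DLM_LOCK_NL, &my_lksb,
			DLM_LKF_CONVERT | DLM_LKF_VALBLK,
			NULL, 0, 0, my_compl_ast, NULL, my_blocking_ast);
}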
