📄 dlmunlock.c
字号:
/* -*- mode: c; c-basic-offset: 8; -*- * vim: noexpandtab sw=8 ts=8 sts=0: * * dlmunlock.c * * underlying calls for unlocking locks * * Copyright (C) 2004 Oracle. All rights reserved. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this program; if not, write to the * Free Software Foundation, Inc., 59 Temple Place - Suite 330, * Boston, MA 021110-1307, USA. * */#include <linux/module.h>#include <linux/fs.h>#include <linux/types.h>#include <linux/slab.h>#include <linux/highmem.h>#include <linux/utsname.h>#include <linux/init.h>#include <linux/sysctl.h>#include <linux/random.h>#include <linux/blkdev.h>#include <linux/socket.h>#include <linux/inet.h>#include <linux/spinlock.h>#include <linux/delay.h>#include "cluster/heartbeat.h"#include "cluster/nodemanager.h"#include "cluster/tcp.h"#include "dlmapi.h"#include "dlmcommon.h"#define MLOG_MASK_PREFIX ML_DLM#include "cluster/masklog.h"#define DLM_UNLOCK_FREE_LOCK 0x00000001#define DLM_UNLOCK_CALL_AST 0x00000002#define DLM_UNLOCK_REMOVE_LOCK 0x00000004#define DLM_UNLOCK_REGRANT_LOCK 0x00000008#define DLM_UNLOCK_CLEAR_CONVERT_TYPE 0x00000010static enum dlm_status dlm_get_cancel_actions(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, struct dlm_lock *lock, struct dlm_lockstatus *lksb, int *actions);static enum dlm_status dlm_get_unlock_actions(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, struct dlm_lock *lock, struct dlm_lockstatus *lksb, int *actions);static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, struct dlm_lock *lock, struct dlm_lockstatus *lksb, int flags, u8 owner);/* * according to the spec: * http://opendlm.sourceforge.net/cvsmirror/opendlm/docs/dlmbook_final.pdf * * flags & LKM_CANCEL != 0: must be converting or blocked * flags & LKM_CANCEL == 0: must be granted * * So to unlock a converting lock, you must first cancel the * convert (passing LKM_CANCEL in flags), then call the unlock * again (with no LKM_CANCEL in flags). *//* * locking: * caller needs: none * taken: res->spinlock and lock->spinlock taken and dropped * held on exit: none * returns: DLM_NORMAL, DLM_NOLOCKMGR, status from network * all callers should have taken an extra ref on lock coming in */static enum dlm_status dlmunlock_common(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, struct dlm_lock *lock, struct dlm_lockstatus *lksb, int flags, int *call_ast, int master_node){ enum dlm_status status; int actions = 0; int in_use; u8 owner; mlog(0, "master_node = %d, valblk = %d\n", master_node, flags & LKM_VALBLK); if (master_node) BUG_ON(res->owner != dlm->node_num); else BUG_ON(res->owner == dlm->node_num); spin_lock(&dlm->spinlock); /* We want to be sure that we're not freeing a lock * that still has AST's pending... */ in_use = !list_empty(&lock->ast_list); spin_unlock(&dlm->spinlock); if (in_use) { mlog(ML_ERROR, "lockres %.*s: Someone is calling dlmunlock " "while waiting for an ast!", res->lockname.len, res->lockname.name); return DLM_BADPARAM; } spin_lock(&res->spinlock); if (res->state & DLM_LOCK_RES_IN_PROGRESS) { if (master_node) { mlog(ML_ERROR, "lockres in progress!\n"); spin_unlock(&res->spinlock); return DLM_FORWARD; } /* ok for this to sleep if not in a network handler */ __dlm_wait_on_lockres(res); res->state |= DLM_LOCK_RES_IN_PROGRESS; } spin_lock(&lock->spinlock); if (res->state & DLM_LOCK_RES_RECOVERING) { status = DLM_RECOVERING; goto leave; } /* see above for what the spec says about * LKM_CANCEL and the lock queue state */ if (flags & LKM_CANCEL) status = dlm_get_cancel_actions(dlm, res, lock, lksb, &actions); else status = dlm_get_unlock_actions(dlm, res, lock, lksb, &actions); if (status != DLM_NORMAL) goto leave; /* By now this has been masked out of cancel requests. */ if (flags & LKM_VALBLK) { /* make the final update to the lvb */ if (master_node) memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN); else flags |= LKM_PUT_LVB; /* let the send function * handle it. */ } if (!master_node) { owner = res->owner; /* drop locks and send message */ if (flags & LKM_CANCEL) lock->cancel_pending = 1; else lock->unlock_pending = 1; spin_unlock(&lock->spinlock); spin_unlock(&res->spinlock); status = dlm_send_remote_unlock_request(dlm, res, lock, lksb, flags, owner); spin_lock(&res->spinlock); spin_lock(&lock->spinlock); /* if the master told us the lock was already granted, * let the ast handle all of these actions */ if (status == DLM_NORMAL && lksb->status == DLM_CANCELGRANT) { actions &= ~(DLM_UNLOCK_REMOVE_LOCK| DLM_UNLOCK_REGRANT_LOCK| DLM_UNLOCK_CLEAR_CONVERT_TYPE); } else if (status == DLM_RECOVERING || status == DLM_MIGRATING || status == DLM_FORWARD) { /* must clear the actions because this unlock * is about to be retried. cannot free or do * any list manipulation. */ mlog(0, "%s:%.*s: clearing actions, %s\n", dlm->name, res->lockname.len, res->lockname.name, status==DLM_RECOVERING?"recovering": (status==DLM_MIGRATING?"migrating": "forward")); actions = 0; } if (flags & LKM_CANCEL) lock->cancel_pending = 0; else lock->unlock_pending = 0; } /* get an extra ref on lock. if we are just switching * lists here, we dont want the lock to go away. */ dlm_lock_get(lock); if (actions & DLM_UNLOCK_REMOVE_LOCK) { list_del_init(&lock->list); dlm_lock_put(lock); } if (actions & DLM_UNLOCK_REGRANT_LOCK) { dlm_lock_get(lock); list_add_tail(&lock->list, &res->granted); } if (actions & DLM_UNLOCK_CLEAR_CONVERT_TYPE) { mlog(0, "clearing convert_type at %smaster node\n", master_node ? "" : "non-"); lock->ml.convert_type = LKM_IVMODE; } /* remove the extra ref on lock */ dlm_lock_put(lock);leave: res->state &= ~DLM_LOCK_RES_IN_PROGRESS; if (!dlm_lock_on_list(&res->converting, lock)) BUG_ON(lock->ml.convert_type != LKM_IVMODE); else BUG_ON(lock->ml.convert_type == LKM_IVMODE); spin_unlock(&lock->spinlock); spin_unlock(&res->spinlock); wake_up(&res->wq); /* let the caller's final dlm_lock_put handle the actual kfree */ if (actions & DLM_UNLOCK_FREE_LOCK) { /* this should always be coupled with list removal */ BUG_ON(!(actions & DLM_UNLOCK_REMOVE_LOCK)); mlog(0, "lock %"MLFu64" should be gone now! refs=%d\n", lock->ml.cookie, atomic_read(&lock->lock_refs.refcount)-1); dlm_lock_put(lock); } if (actions & DLM_UNLOCK_CALL_AST) *call_ast = 1; /* if cancel or unlock succeeded, lvb work is done */ if (status == DLM_NORMAL) lksb->flags &= ~(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB); return status;}void dlm_commit_pending_unlock(struct dlm_lock_resource *res, struct dlm_lock *lock){ /* leave DLM_LKSB_PUT_LVB on the lksb so any final * update of the lvb will be sent to the new master */ list_del_init(&lock->list);}void dlm_commit_pending_cancel(struct dlm_lock_resource *res, struct dlm_lock *lock){ list_del_init(&lock->list); list_add_tail(&lock->list, &res->granted); lock->ml.convert_type = LKM_IVMODE;}static inline enum dlm_status dlmunlock_master(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, struct dlm_lock *lock, struct dlm_lockstatus *lksb, int flags, int *call_ast){ return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 1);}static inline enum dlm_status dlmunlock_remote(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, struct dlm_lock *lock, struct dlm_lockstatus *lksb, int flags, int *call_ast){ return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 0);}/* * locking: * caller needs: none * taken: none * held on exit: none * returns: DLM_NORMAL, DLM_NOLOCKMGR, status from network */static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, struct dlm_lock *lock, struct dlm_lockstatus *lksb, int flags, u8 owner){ struct dlm_unlock_lock unlock; int tmpret; enum dlm_status ret; int status = 0; struct kvec vec[2]; size_t veclen = 1; mlog_entry("%.*s\n", res->lockname.len, res->lockname.name); memset(&unlock, 0, sizeof(unlock)); unlock.node_idx = dlm->node_num; unlock.flags = cpu_to_be32(flags); unlock.cookie = lock->ml.cookie; unlock.namelen = res->lockname.len; memcpy(unlock.name, res->lockname.name, unlock.namelen); vec[0].iov_len = sizeof(struct dlm_unlock_lock); vec[0].iov_base = &unlock; if (flags & LKM_PUT_LVB) { /* extra data to send if we are updating lvb */ vec[1].iov_len = DLM_LVB_LEN; vec[1].iov_base = lock->lksb->lvb; veclen++; } tmpret = o2net_send_message_vec(DLM_UNLOCK_LOCK_MSG, dlm->key, vec, veclen, owner, &status); if (tmpret >= 0) { // successfully sent and received if (status == DLM_CANCELGRANT) ret = DLM_NORMAL; else if (status == DLM_FORWARD) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -