📄 dlmrecovery.c
字号:
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmrecovery.c
 *
 * recovery stuff
 *
 * Copyright (C) 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 */

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/utsname.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/timer.h>
#include <linux/kthread.h>
#include <linux/delay.h>

#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdomain.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_RECOVERY)
#include "cluster/masklog.h"

static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node);

static int dlm_recovery_thread(void *data);
void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
static int dlm_do_recovery(struct dlm_ctxt *dlm);

static int dlm_pick_recovery_master(struct dlm_ctxt *dlm);
static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node);
static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node);
static int dlm_request_all_locks(struct dlm_ctxt *dlm,
				 u8 request_from, u8 dead_node);
static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm, u8 dead_node);

static inline int dlm_num_locks_in_lockres(struct dlm_lock_resource *res);
static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres,
					const char *lockname, int namelen,
					int total_locks, u64 cookie,
					u8 flags, u8 master);
static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
				    struct dlm_migratable_lockres *mres,
				    u8 send_to,
				    struct dlm_lock_resource *res,
				    int total_locks);
static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     struct dlm_migratable_lockres *mres);
static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm);
static int dlm_send_all_done_msg(struct dlm_ctxt *dlm,
				 u8 dead_node, u8 send_to);
static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node);
static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
					struct list_head *list, u8 dead_node);
static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
					      u8 dead_node, u8 new_master);
static void dlm_reco_ast(void *astdata);
static void dlm_reco_bast(void *astdata, int blocked_type);
static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st);
static void dlm_request_all_locks_worker(struct dlm_work_item *item,
					 void *data);
static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data);
static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res,
				      u8 *real_master);

static u64 dlm_get_next_mig_cookie(void);

static DEFINE_SPINLOCK(dlm_reco_state_lock);
/* protects dlm_mig_cookie */
static DEFINE_SPINLOCK(dlm_mig_cookie_lock);
static u64 dlm_mig_cookie = 1;

/* Hand out the current migration cookie and advance the global counter.
 * The counter starts at 1 and wraps from ~0ULL back to 1, so 0 is never
 * returned.  Serialized by dlm_mig_cookie_lock. */
static u64 dlm_get_next_mig_cookie(void)
{
	u64 c;
	spin_lock(&dlm_mig_cookie_lock);
	c = dlm_mig_cookie;
	if (dlm_mig_cookie == (~0ULL))
		dlm_mig_cookie = 1;
	else
		dlm_mig_cookie++;
	spin_unlock(&dlm_mig_cookie_lock);
	return c;
}

/* Record which node is currently being recovered.
 * Caller must hold dlm->spinlock.  The change is logged only when the
 * value actually differs from the one already stored. */
static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm,
					  u8 dead_node)
{
	assert_spin_locked(&dlm->spinlock);
	if (dlm->reco.dead_node != dead_node)
		mlog(0, "%s: changing dead_node from %u to %u\n",
		     dlm->name, dlm->reco.dead_node, dead_node);
	dlm->reco.dead_node = dead_node;
}

/* Record the node acting as recovery master.
 * Caller must hold dlm->spinlock.  Unlike dlm_set_reco_dead_node(), the
 * assignment is logged unconditionally. */
static inline void dlm_set_reco_master(struct dlm_ctxt *dlm,
				       u8 master)
{
	assert_spin_locked(&dlm->spinlock);
	mlog(0, "%s: changing new_master from %u to %u\n",
	     dlm->name, dlm->reco.new_master, master);
	dlm->reco.new_master = master;
}

/* Reset the recovery state: drop the current dead node from the
 * recovery map and set both dead_node and new_master back to
 * O2NM_INVALID_NODE_NUM.  Caller must hold dlm->spinlock. */
static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm)
{
	assert_spin_locked(&dlm->spinlock);
	/* must clear the bit before dead_node is overwritten below */
	clear_bit(dlm->reco.dead_node, dlm->recovery_map);
	dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
	dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
}

/* Locked wrapper around __dlm_reset_recovery(): takes and releases
 * dlm->spinlock itself. */
static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
{
	spin_lock(&dlm->spinlock);
	__dlm_reset_recovery(dlm);
	spin_unlock(&dlm->spinlock);
}

/* Worker function used during recovery.
 *
 * Moves every queued dlm_work_item from dlm->work_list onto a private
 * list while holding dlm->work_lock, then runs each item's callback
 * with the lock dropped.  After a callback returns, one dlm reference
 * is put and the item itself is freed. */
void dlm_dispatch_work(struct work_struct *work)
{
	struct dlm_ctxt *dlm =
		container_of(work, struct dlm_ctxt, dispatched_work);
	LIST_HEAD(tmp_list);
	struct dlm_work_item *item, *next;
	dlm_workfunc_t *workfunc;
	int tot=0;

	/* grab the whole pending list at once so work_lock is not
	 * held while the (sleeping) work functions run */
	spin_lock(&dlm->work_lock);
	list_splice_init(&dlm->work_list, &tmp_list);
	spin_unlock(&dlm->work_lock);

	/* count the items; used only for the log message below */
	list_for_each_entry(item, &tmp_list, list) {
		tot++;
	}
	mlog(0, "%s: work thread has %d work items\n", dlm->name, tot);

	list_for_each_entry_safe(item, next, &tmp_list, list) {
		workfunc = item->func;
		list_del_init(&item->list);

		/* already have ref on dlm to avoid having
		 * it disappear.  just double-check. */
		BUG_ON(item->dlm != dlm);

		/* this is allowed to sleep and
		 * call network stuff */
		workfunc(item, item->data);

		/* drop the dlm ref mentioned above (presumably taken
		 * when the item was queued -- confirm at the queueing
		 * site) and free the item */
		dlm_put(dlm);
		kfree(item);
	}
}

/*
 * RECOVERY THREAD
 */

void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
{
	/* wake the recovery thread
	 * this will wake the reco thread in one of three places
	 * 1) sleeping with no recovery happening
	 * 2) sleeping with recovery mastered elsewhere
	 * 3) recovery mastered here, waiting on reco data */

	wake_up(&dlm->dlm_reco_thread_wq);
}

/* Launch the recovery thread.
 *
 * Returns 0 on success.  If kthread_run() fails, the real error is
 * logged via mlog_errno(), dlm->dlm_reco_thread_task is reset to NULL
 * and -EINVAL is returned. */
int dlm_launch_recovery_thread(struct dlm_ctxt *dlm)
{
	mlog(0, "starting dlm recovery thread...\n");

	dlm->dlm_reco_thread_task = kthread_run(dlm_recovery_thread, dlm,
						"dlm_reco_thread");
	if (IS_ERR(dlm->dlm_reco_thread_task)) {
		mlog_errno(PTR_ERR(dlm->dlm_reco_thread_task));
		dlm->dlm_reco_thread_task = NULL;
		return -EINVAL;
	}

	return 0;
}

/* Stop the recovery thread, if one was started, and clear the task
 * pointer.  A no-op when dlm->dlm_reco_thread_task is NULL. */
void dlm_complete_recovery_thread(struct dlm_ctxt *dlm)
{
	if (dlm->dlm_reco_thread_task) {
		mlog(0, "waiting for dlm recovery thread to exit\n");
		kthread_stop(dlm->dlm_reco_thread_task);
		dlm->dlm_reco_thread_task = NULL;
	}
}

/*
 * this is lame, but here's how recovery works...
 * 1) all recovery threads cluster wide will work on recovering
 *    ONE node at a time
 * 2) negotiate who will take over all the locks for the dead node.
 *    that's right... ALL the locks.
 * 3) once a new master is chosen, everyone scans all locks
 *    and moves aside those mastered by the dead guy
 * 4) each of these locks should be locked until recovery is done
 * 5) the new master collects up all of secondary lock queue info
 *    one lock at a time, forcing each node to communicate back
 *    before continuing
 * 6) each secondary lock queue responds with the full known lock info
 * 7) once the new master has run all its locks, it sends an ALLDONE!
 *    message to everyone
 * 8) upon receiving this message, the secondary queue node unlocks
 *    and responds to the ALLDONE
 * 9) once the new master gets responses from everyone, he unlocks
 *    everything and recovery for this dead node is done
 *10) go back to 2) while there are still dead nodes
 *
 */

/* Dump the current recovery state at ML_NOTICE level: the overall
 * state/dead node/master, the per-node state of every entry on
 * dlm->reco.node_data, and the name of every lock resource on
 * dlm->reco.resources.
 *
 * NOTE(review): no lock is taken here while walking the lists;
 * presumably the caller serializes access -- confirm. */
static void dlm_print_reco_node_status(struct dlm_ctxt *dlm)
{
	struct dlm_reco_node_data *ndata;
	struct dlm_lock_resource *res;

	mlog(ML_NOTICE, "%s(%d): recovery info, state=%s, dead=%u, master=%u\n",
	     dlm->name, task_pid_nr(dlm->dlm_reco_thread_task),
	     dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive",
	     dlm->reco.dead_node, dlm->reco.new_master);

	list_for_each_entry(ndata, &dlm->reco.node_data, list) {
		char *st = "unknown";
		/* map the per-node state to a printable name */
		switch (ndata->state) {
			case DLM_RECO_NODE_DATA_INIT:
				st = "init";
				break;
			case DLM_RECO_NODE_DATA_REQUESTING:
				st = "requesting";
				break;
			case DLM_RECO_NODE_DATA_DEAD:
				st = "dead";
				break;
			case DLM_RECO_NODE_DATA_RECEIVING:
				st = "receiving";
				break;
			case DLM_RECO_NODE_DATA_REQUESTED:
				st = "requested";
				break;
			case DLM_RECO_NODE_DATA_DONE:
				st = "done";
				break;
			case DLM_RECO_NODE_DATA_FINALIZE_SENT:
				st = "finalize-sent";
				break;
			default:
				st = "bad";
				break;
		}
		mlog(ML_NOTICE, "%s: reco state, node %u, state=%s\n",
		     dlm->name, ndata->node_num, st);
	}
	list_for_each_entry(res, &dlm->reco.resources, recovering) {
		mlog(ML_NOTICE, "%s: lockres %.*s on recovering list\n",
		     dlm->name, res->lockname.len, res->lockname.name);
	}
}

/* how long the recovery thread sleeps between passes when not kicked */
#define DLM_RECO_THREAD_TIMEOUT_MS (5 * 1000)

/* Main loop of the recovery kthread.
 *
 * Until asked to stop: if this node has joined the domain, run one
 * dlm_do_recovery() pass.  -EAGAIN means "run another pass right away";
 * any other negative status is logged.  Otherwise sleep on
 * dlm_reco_thread_wq for up to DLM_RECO_THREAD_TIMEOUT_MS.  The wait
 * condition is kthread_should_stop(), so an ordinary wake_up() on the
 * queue only ends the sleep if a stop was requested; the timeout (or
 * a signal) otherwise bounds it.
 *
 * Always returns 0. */
static int dlm_recovery_thread(void *data)
{
	int status;
	struct dlm_ctxt *dlm = data;
	unsigned long timeout = msecs_to_jiffies(DLM_RECO_THREAD_TIMEOUT_MS);

	mlog(0, "dlm thread running for %s...\n", dlm->name);

	while (!kthread_should_stop()) {
		if (dlm_joined(dlm)) {
			status = dlm_do_recovery(dlm);
			if (status == -EAGAIN) {
				/* do not sleep, recheck immediately. */
				continue;
			}
			if (status < 0)
				mlog_errno(status);
		}

		wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq,
						 kthread_should_stop(),
						 timeout);
	}

	mlog(0, "quitting DLM recovery thread\n");
	return 0;
}

/* returns true when the recovery master has contacted us, i.e. when
 * dlm->reco.new_master is no longer O2NM_INVALID_NODE_NUM */
static int dlm_reco_master_ready(struct dlm_ctxt *dlm)
{
	int ready;
	spin_lock(&dlm->spinlock);
	ready = (dlm->reco.new_master != O2NM_INVALID_NODE_NUM);
	spin_unlock(&dlm->spinlock);
	return ready;
}

/* returns true if node is no longer in the domain
 * could be dead or just not joined */
int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node)
{
	int dead;
	spin_lock(&dlm->spinlock);
	dead = !test_bit(node, dlm->domain_map);
	spin_unlock(&dlm->spinlock);
	return dead;
}

/* returns true if node is not (or no longer) marked in the recovery
 * map, i.e. there is no recovery pending for it */
static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node)
{
	int recovered;
	spin_lock(&dlm->spinlock);
	recovered = !test_bit(node, dlm->recovery_map);
	spin_unlock(&dlm->spinlock);
	return recovered;
}


/* Sleep on dlm_reco_thread_wq until dlm_is_node_dead() reports the
 * node gone.
 *
 * timeout is in milliseconds; 0 means wait indefinitely.
 * Always returns 0, so a caller cannot tell from the return value
 * whether the node died or the timeout expired. */
int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
{
	if (timeout) {
		mlog(ML_NOTICE, "%s: waiting %dms for notification of "
		     "death of node %u\n", dlm->name, timeout, node);
		wait_event_timeout(dlm->dlm_reco_thread_wq,
			   dlm_is_node_dead(dlm, node),
			   msecs_to_jiffies(timeout));
	} else {
		mlog(ML_NOTICE, "%s: waiting indefinitely for notification "
		     "of death of node %u\n", dlm->name, node);
		wait_event(dlm->dlm_reco_thread_wq,
			   dlm_is_node_dead(dlm, node));
	}
	/* for now, return 0 */
	return 0;
}

/* Sleep on dlm_reco_thread_wq until dlm_is_node_recovered() reports
 * that the node is no longer in the recovery map.
 *
 * timeout is in milliseconds; 0 means wait indefinitely.
 * Always returns 0, so a caller cannot tell from the return value
 * whether recovery finished or the timeout expired. */
int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout)
{
	if (timeout) {
		mlog(0, "%s: waiting %dms for notification of "
		     "recovery of node %u\n", dlm->name, timeout, node);
		wait_event_timeout(dlm->dlm_reco_thread_wq,
			   dlm_is_node_recovered(dlm, node),
			   msecs_to_jiffies(timeout));
	} else {
		mlog(0, "%s: waiting indefinitely for notification "
		     "of recovery of node %u\n", dlm->name, node);
		wait_event(dlm->dlm_reco_thread_wq,
			   dlm_is_node_recovered(dlm, node));
	}
	/* for now, return 0 */
	return 0;
}

/* callers of the top-level api calls (dlmlock/dlmunlock) should
 * block on the dlm->reco.event when recovery is in progress.
 * the dlm recovery thread will set this state when it begins
 * recovering a dead node (as the new master or not) and clear
 * the state and wake as soon as all affected lock resources have
 * been marked with the RECOVERY flag */
static int dlm_in_recovery(struct dlm_ctxt *dlm)
{
	int in_recovery;
	spin_lock(&dlm->spinlock);
	in_recovery = !!(dlm->reco.state & DLM_RECO_STATE_ACTIVE);
	spin_unlock(&dlm->spinlock);
	return in_recovery;
}


/* Block until DLM_RECO_STATE_ACTIVE is clear.  Returns immediately if
 * no recovery is in progress. */
void dlm_wait_for_recovery(struct dlm_ctxt *dlm)
{
	if (dlm_in_recovery(dlm)) {
		/* NOTE(review): the reco fields below are read without
		 * dlm->spinlock; they feed only this debug message */
		mlog(0, "%s: reco thread %d in recovery: "
		     "state=%d, master=%u, dead=%u\n",
		     dlm->name, task_pid_nr(dlm->dlm_reco_thread_task),
		     dlm->reco.state, dlm->reco.new_master,
		     dlm->reco.dead_node);
	}
	wait_event(dlm->reco.event, !dlm_in_recovery(dlm));
}

/* Mark recovery as in progress by setting DLM_RECO_STATE_ACTIVE under
 * dlm->spinlock.  It is a BUG for the flag to be set already. */
static void dlm_begin_recovery(struct dlm_ctxt *dlm)
{
	spin_lock(&dlm->spinlock);
	BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE);
	dlm->reco.state |= DLM_RECO_STATE_ACTIVE;
	spin_unlock(&dlm->spinlock);
}

static void dlm_end_recovery(struct dlm_ctxt *dlm)
{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -