📄 heartbeat.c
字号:
/* -*- mode: c; c-basic-offset: 8; -*- * vim: noexpandtab sw=8 ts=8 sts=0: * * Copyright (C) 2004, 2005 Oracle. All rights reserved. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this program; if not, write to the * Free Software Foundation, Inc., 59 Temple Place - Suite 330, * Boston, MA 021110-1307, USA. */#include <linux/kernel.h>#include <linux/sched.h>#include <linux/jiffies.h>#include <linux/module.h>#include <linux/fs.h>#include <linux/bio.h>#include <linux/blkdev.h>#include <linux/delay.h>#include <linux/file.h>#include <linux/kthread.h>#include <linux/configfs.h>#include <linux/random.h>#include <linux/crc32.h>#include <linux/time.h>#include "heartbeat.h"#include "tcp.h"#include "nodemanager.h"#include "quorum.h"#include "masklog.h"/* * The first heartbeat pass had one global thread that would serialize all hb * callback calls. This global serializing sem should only be removed once * we've made sure that all callees can deal with being called concurrently * from multiple hb region threads. */static DECLARE_RWSEM(o2hb_callback_sem);/* * multiple hb threads are watching multiple regions. A node is live * whenever any of the threads sees activity from the node in its region. */static DEFINE_SPINLOCK(o2hb_live_lock);static struct list_head o2hb_live_slots[O2NM_MAX_NODES];static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];static LIST_HEAD(o2hb_node_events);static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);static LIST_HEAD(o2hb_all_regions);static struct o2hb_callback { struct list_head list;} o2hb_callbacks[O2HB_NUM_CB];static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);#define O2HB_DEFAULT_BLOCK_BITS 9unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;/* Only sets a new threshold if there are no active regions. * * No locking or otherwise interesting code is required for reading * o2hb_dead_threshold as it can't change once regions are active and * it's not interesting to anyone until then anyway. */static void o2hb_dead_threshold_set(unsigned int threshold){ if (threshold > O2HB_MIN_DEAD_THRESHOLD) { spin_lock(&o2hb_live_lock); if (list_empty(&o2hb_all_regions)) o2hb_dead_threshold = threshold; spin_unlock(&o2hb_live_lock); }}struct o2hb_node_event { struct list_head hn_item; enum o2hb_callback_type hn_event_type; struct o2nm_node *hn_node; int hn_node_num;};struct o2hb_disk_slot { struct o2hb_disk_heartbeat_block *ds_raw_block; u8 ds_node_num; u64 ds_last_time; u64 ds_last_generation; u16 ds_equal_samples; u16 ds_changed_samples; struct list_head ds_live_item;};/* each thread owns a region.. when we're asked to tear down the region * we ask the thread to stop, who cleans up the region */struct o2hb_region { struct config_item hr_item; struct list_head hr_all_item; unsigned hr_unclean_stop:1; /* protected by the hr_callback_sem */ struct task_struct *hr_task; unsigned int hr_blocks; unsigned long long hr_start_block; unsigned int hr_block_bits; unsigned int hr_block_bytes; unsigned int hr_slots_per_page; unsigned int hr_num_pages; struct page **hr_slot_data; struct block_device *hr_bdev; struct o2hb_disk_slot *hr_slots; /* let the person setting up hb wait for it to return until it * has reached a 'steady' state. This will be fixed when we have * a more complete api that doesn't lead to this sort of fragility. */ atomic_t hr_steady_iterations; char hr_dev_name[BDEVNAME_SIZE]; unsigned int hr_timeout_ms; /* randomized as the region goes up and down so that a node * recognizes a node going up and down in one iteration */ u64 hr_generation; struct delayed_work hr_write_timeout_work; unsigned long hr_last_timeout_start; /* Used during o2hb_check_slot to hold a copy of the block * being checked because we temporarily have to zero out the * crc field. */ struct o2hb_disk_heartbeat_block *hr_tmp_block;};struct o2hb_bio_wait_ctxt { atomic_t wc_num_reqs; struct completion wc_io_complete; int wc_error;};static void o2hb_write_timeout(struct work_struct *work){ struct o2hb_region *reg = container_of(work, struct o2hb_region, hr_write_timeout_work.work); mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u " "milliseconds\n", reg->hr_dev_name, jiffies_to_msecs(jiffies - reg->hr_last_timeout_start)); o2quo_disk_timeout();}static void o2hb_arm_write_timeout(struct o2hb_region *reg){ mlog(0, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS); cancel_delayed_work(®->hr_write_timeout_work); reg->hr_last_timeout_start = jiffies; schedule_delayed_work(®->hr_write_timeout_work, msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));}static void o2hb_disarm_write_timeout(struct o2hb_region *reg){ cancel_delayed_work(®->hr_write_timeout_work); flush_scheduled_work();}static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc){ atomic_set(&wc->wc_num_reqs, 1); init_completion(&wc->wc_io_complete); wc->wc_error = 0;}/* Used in error paths too */static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc, unsigned int num){ /* sadly atomic_sub_and_test() isn't available on all platforms. The * good news is that the fast path only completes one at a time */ while(num--) { if (atomic_dec_and_test(&wc->wc_num_reqs)) { BUG_ON(num > 0); complete(&wc->wc_io_complete); } }}static void o2hb_wait_on_io(struct o2hb_region *reg, struct o2hb_bio_wait_ctxt *wc){ struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping; blk_run_address_space(mapping); o2hb_bio_wait_dec(wc, 1); wait_for_completion(&wc->wc_io_complete);}static void o2hb_bio_end_io(struct bio *bio, int error){ struct o2hb_bio_wait_ctxt *wc = bio->bi_private; if (error) { mlog(ML_ERROR, "IO Error %d\n", error); wc->wc_error = error; } o2hb_bio_wait_dec(wc, 1); bio_put(bio);}/* Setup a Bio to cover I/O against num_slots slots starting at * start_slot. */static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg, struct o2hb_bio_wait_ctxt *wc, unsigned int *current_slot, unsigned int max_slots){ int len, current_page; unsigned int vec_len, vec_start; unsigned int bits = reg->hr_block_bits; unsigned int spp = reg->hr_slots_per_page; unsigned int cs = *current_slot; struct bio *bio; struct page *page; /* Testing has shown this allocation to take long enough under * GFP_KERNEL that the local node can get fenced. It would be * nicest if we could pre-allocate these bios and avoid this * all together. */ bio = bio_alloc(GFP_ATOMIC, 16); if (!bio) { mlog(ML_ERROR, "Could not alloc slots BIO!\n"); bio = ERR_PTR(-ENOMEM); goto bail; } /* Must put everything in 512 byte sectors for the bio... */ bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9); bio->bi_bdev = reg->hr_bdev; bio->bi_private = wc; bio->bi_end_io = o2hb_bio_end_io; vec_start = (cs << bits) % PAGE_CACHE_SIZE; while(cs < max_slots) { current_page = cs / spp; page = reg->hr_slot_data[current_page]; vec_len = min(PAGE_CACHE_SIZE - vec_start, (max_slots-cs) * (PAGE_CACHE_SIZE/spp) ); mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n", current_page, vec_len, vec_start); len = bio_add_page(bio, page, vec_len, vec_start); if (len != vec_len) break; cs += vec_len / (PAGE_CACHE_SIZE/spp); vec_start = 0; }bail: *current_slot = cs; return bio;}static int o2hb_read_slots(struct o2hb_region *reg, unsigned int max_slots){ unsigned int current_slot=0; int status; struct o2hb_bio_wait_ctxt wc; struct bio *bio; o2hb_bio_wait_init(&wc); while(current_slot < max_slots) { bio = o2hb_setup_one_bio(reg, &wc, ¤t_slot, max_slots); if (IS_ERR(bio)) { status = PTR_ERR(bio); mlog_errno(status); goto bail_and_wait; } atomic_inc(&wc.wc_num_reqs); submit_bio(READ, bio); } status = 0;bail_and_wait: o2hb_wait_on_io(reg, &wc); if (wc.wc_error && !status) status = wc.wc_error; return status;}static int o2hb_issue_node_write(struct o2hb_region *reg, struct o2hb_bio_wait_ctxt *write_wc){ int status; unsigned int slot; struct bio *bio; o2hb_bio_wait_init(write_wc); slot = o2nm_this_node(); bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1); if (IS_ERR(bio)) { status = PTR_ERR(bio); mlog_errno(status); goto bail; } atomic_inc(&write_wc->wc_num_reqs); submit_bio(WRITE, bio); status = 0;bail: return status;}static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg, struct o2hb_disk_heartbeat_block *hb_block){ __le32 old_cksum; u32 ret; /* We want to compute the block crc with a 0 value in the * hb_cksum field. Save it off here and replace after the * crc. */ old_cksum = hb_block->hb_cksum; hb_block->hb_cksum = 0; ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes); hb_block->hb_cksum = old_cksum; return ret;}static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block){ mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, " "cksum = 0x%x, generation 0x%llx\n", (long long)le64_to_cpu(hb_block->hb_seq), hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum), (long long)le64_to_cpu(hb_block->hb_generation));}static int o2hb_verify_crc(struct o2hb_region *reg, struct o2hb_disk_heartbeat_block *hb_block){ u32 read, computed; read = le32_to_cpu(hb_block->hb_cksum); computed = o2hb_compute_block_crc_le(reg, hb_block); return read == computed;}/* We want to make sure that nobody is heartbeating on top of us -- * this will help detect an invalid configuration. */static int o2hb_check_last_timestamp(struct o2hb_region *reg){ int node_num, ret; struct o2hb_disk_slot *slot; struct o2hb_disk_heartbeat_block *hb_block; node_num = o2nm_this_node(); ret = 1; slot = ®->hr_slots[node_num]; /* Don't check on our 1st timestamp */ if (slot->ds_last_time) { hb_block = slot->ds_raw_block; if (le64_to_cpu(hb_block->hb_seq) != slot->ds_last_time) ret = 0; } return ret;}static inline void o2hb_prepare_block(struct o2hb_region *reg, u64 generation){ int node_num; u64 cputime; struct o2hb_disk_slot *slot; struct o2hb_disk_heartbeat_block *hb_block; node_num = o2nm_this_node(); slot = ®->hr_slots[node_num]; hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block; memset(hb_block, 0, reg->hr_block_bytes); /* TODO: time stuff */ cputime = CURRENT_TIME.tv_sec; if (!cputime) cputime = 1; hb_block->hb_seq = cpu_to_le64(cputime); hb_block->hb_node = node_num; hb_block->hb_generation = cpu_to_le64(generation); hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS); /* This step must always happen last! */ hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg, hb_block)); mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n", (long long)generation, le32_to_cpu(hb_block->hb_cksum));}static void o2hb_fire_callbacks(struct o2hb_callback *hbcall, struct o2nm_node *node, int idx){ struct list_head *iter; struct o2hb_callback_func *f; list_for_each(iter, &hbcall->list) { f = list_entry(iter, struct o2hb_callback_func, hc_item); mlog(ML_HEARTBEAT, "calling funcs %p\n", f); (f->hc_func)(node, idx, f->hc_data); }}/* Will run the list in order until we process the passed event */static void o2hb_run_event_list(struct o2hb_node_event *queued_event){ int empty; struct o2hb_callback *hbcall; struct o2hb_node_event *event; spin_lock(&o2hb_live_lock); empty = list_empty(&queued_event->hn_item); spin_unlock(&o2hb_live_lock); if (empty) return; /* Holding callback sem assures we don't alter the callback * lists when doing this, and serializes ourselves with other * processes wanting callbacks. */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -