⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 heartbeat.c

📁 linux 内核源代码
💻 C
📖 第 1 页 / 共 4 页
字号:
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 */

#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/jiffies.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/bio.h>
#include <linux/blkdev.h>
#include <linux/delay.h>
#include <linux/file.h>
#include <linux/kthread.h>
#include <linux/configfs.h>
#include <linux/random.h>
#include <linux/crc32.h>
#include <linux/time.h>

#include "heartbeat.h"
#include "tcp.h"
#include "nodemanager.h"
#include "quorum.h"

#include "masklog.h"

/*
 * The first heartbeat pass had one global thread that would serialize all hb
 * callback calls.  This global serializing sem should only be removed once
 * we've made sure that all callees can deal with being called concurrently
 * from multiple hb region threads.
 */
static DECLARE_RWSEM(o2hb_callback_sem);

/*
 * multiple hb threads are watching multiple regions.  A node is live
 * whenever any of the threads sees activity from the node in its region.
 */
static DEFINE_SPINLOCK(o2hb_live_lock);
/* per-node lists of disk slots with recent activity, indexed by node number */
static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
/* bit N set == node N considered live; protected by o2hb_live_lock */
static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
static LIST_HEAD(o2hb_node_events);
static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);

static LIST_HEAD(o2hb_all_regions);

/* one callback list per o2hb_callback_type */
static struct o2hb_callback {
	struct list_head list;
} o2hb_callbacks[O2HB_NUM_CB];

static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);

#define O2HB_DEFAULT_BLOCK_BITS       9

unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;

/* Only sets a new threshold if there are no active regions.
 *
 * No locking or otherwise interesting code is required for reading
 * o2hb_dead_threshold as it can't change once regions are active and
 * it's not interesting to anyone until then anyway. */
static void o2hb_dead_threshold_set(unsigned int threshold)
{
	if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
		spin_lock(&o2hb_live_lock);
		if (list_empty(&o2hb_all_regions))
			o2hb_dead_threshold = threshold;
		spin_unlock(&o2hb_live_lock);
	}
}

/* a queued node up/down event, delivered to the registered callbacks */
struct o2hb_node_event {
	struct list_head        hn_item;
	enum o2hb_callback_type hn_event_type;
	struct o2nm_node        *hn_node;
	int                     hn_node_num;
};

/* per-node view of one heartbeat slot on disk */
struct o2hb_disk_slot {
	struct o2hb_disk_heartbeat_block *ds_raw_block;
	u8			ds_node_num;
	u64			ds_last_time;
	u64			ds_last_generation;
	u16			ds_equal_samples;
	u16			ds_changed_samples;
	struct list_head	ds_live_item;
};

/* each thread owns a region.. when we're asked to tear down the region
 * we ask the thread to stop, who cleans up the region */
struct o2hb_region {
	struct config_item	hr_item;

	struct list_head	hr_all_item;
	unsigned		hr_unclean_stop:1;

	/* protected by the hr_callback_sem */
	struct task_struct	*hr_task;

	unsigned int		hr_blocks;
	unsigned long long	hr_start_block;

	unsigned int		hr_block_bits;
	unsigned int		hr_block_bytes;

	unsigned int		hr_slots_per_page;
	unsigned int		hr_num_pages;

	struct page             **hr_slot_data;
	struct block_device	*hr_bdev;
	struct o2hb_disk_slot	*hr_slots;

	/* let the person setting up hb wait for it to return until it
	 * has reached a 'steady' state.  This will be fixed when we have
	 * a more complete api that doesn't lead to this sort of fragility. */
	atomic_t		hr_steady_iterations;

	char			hr_dev_name[BDEVNAME_SIZE];

	unsigned int		hr_timeout_ms;

	/* randomized as the region goes up and down so that a node
	 * recognizes a node going up and down in one iteration */
	u64			hr_generation;

	struct delayed_work	hr_write_timeout_work;
	unsigned long		hr_last_timeout_start;

	/* Used during o2hb_check_slot to hold a copy of the block
	 * being checked because we temporarily have to zero out the
	 * crc field. */
	struct o2hb_disk_heartbeat_block *hr_tmp_block;
};

/* tracks a batch of in-flight bios; completion fires when the count
 * (seeded with one extra reference in o2hb_bio_wait_init) drops to zero */
struct o2hb_bio_wait_ctxt {
	atomic_t          wc_num_reqs;
	struct completion wc_io_complete;
	int               wc_error;
};

/* Delayed-work handler: a heartbeat write has not completed within the
 * timeout window, so report it and let the quorum code react. */
static void o2hb_write_timeout(struct work_struct *work)
{
	struct o2hb_region *reg =
		container_of(work, struct o2hb_region,
			     hr_write_timeout_work.work);

	mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
	     "milliseconds\n", reg->hr_dev_name,
	     jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
	o2quo_disk_timeout();
}

/* (Re)arm the write timeout before issuing a heartbeat write; cancels
 * any previously queued instance first. */
static void o2hb_arm_write_timeout(struct o2hb_region *reg)
{
	mlog(0, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS);

	cancel_delayed_work(&reg->hr_write_timeout_work);
	reg->hr_last_timeout_start = jiffies;
	schedule_delayed_work(&reg->hr_write_timeout_work,
			      msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
}

/* Cancel the write timeout and wait out any instance already running. */
static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
{
	cancel_delayed_work(&reg->hr_write_timeout_work);
	flush_scheduled_work();
}

/* Seed the wait context with one extra request so completion can't fire
 * before all bios have been submitted; dropped in o2hb_wait_on_io(). */
static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
{
	atomic_set(&wc->wc_num_reqs, 1);
	init_completion(&wc->wc_io_complete);
	wc->wc_error = 0;
}

/* Used in error paths too */
static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
				     unsigned int num)
{
	/* sadly atomic_sub_and_test() isn't available on all platforms.  
The	 * good news is that the fast path only completes one at a time */	while(num--) {		if (atomic_dec_and_test(&wc->wc_num_reqs)) {			BUG_ON(num > 0);			complete(&wc->wc_io_complete);		}	}}static void o2hb_wait_on_io(struct o2hb_region *reg,			    struct o2hb_bio_wait_ctxt *wc){	struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;	blk_run_address_space(mapping);	o2hb_bio_wait_dec(wc, 1);	wait_for_completion(&wc->wc_io_complete);}static void o2hb_bio_end_io(struct bio *bio,			   int error){	struct o2hb_bio_wait_ctxt *wc = bio->bi_private;	if (error) {		mlog(ML_ERROR, "IO Error %d\n", error);		wc->wc_error = error;	}	o2hb_bio_wait_dec(wc, 1);	bio_put(bio);}/* Setup a Bio to cover I/O against num_slots slots starting at * start_slot. */static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,				      struct o2hb_bio_wait_ctxt *wc,				      unsigned int *current_slot,				      unsigned int max_slots){	int len, current_page;	unsigned int vec_len, vec_start;	unsigned int bits = reg->hr_block_bits;	unsigned int spp = reg->hr_slots_per_page;	unsigned int cs = *current_slot;	struct bio *bio;	struct page *page;	/* Testing has shown this allocation to take long enough under	 * GFP_KERNEL that the local node can get fenced. It would be	 * nicest if we could pre-allocate these bios and avoid this	 * all together. */	bio = bio_alloc(GFP_ATOMIC, 16);	if (!bio) {		mlog(ML_ERROR, "Could not alloc slots BIO!\n");		bio = ERR_PTR(-ENOMEM);		goto bail;	}	/* Must put everything in 512 byte sectors for the bio... 
*/	bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9);	bio->bi_bdev = reg->hr_bdev;	bio->bi_private = wc;	bio->bi_end_io = o2hb_bio_end_io;	vec_start = (cs << bits) % PAGE_CACHE_SIZE;	while(cs < max_slots) {		current_page = cs / spp;		page = reg->hr_slot_data[current_page];		vec_len = min(PAGE_CACHE_SIZE - vec_start,			      (max_slots-cs) * (PAGE_CACHE_SIZE/spp) );		mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",		     current_page, vec_len, vec_start);		len = bio_add_page(bio, page, vec_len, vec_start);		if (len != vec_len) break;		cs += vec_len / (PAGE_CACHE_SIZE/spp);		vec_start = 0;	}bail:	*current_slot = cs;	return bio;}static int o2hb_read_slots(struct o2hb_region *reg,			   unsigned int max_slots){	unsigned int current_slot=0;	int status;	struct o2hb_bio_wait_ctxt wc;	struct bio *bio;	o2hb_bio_wait_init(&wc);	while(current_slot < max_slots) {		bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots);		if (IS_ERR(bio)) {			status = PTR_ERR(bio);			mlog_errno(status);			goto bail_and_wait;		}		atomic_inc(&wc.wc_num_reqs);		submit_bio(READ, bio);	}	status = 0;bail_and_wait:	o2hb_wait_on_io(reg, &wc);	if (wc.wc_error && !status)		status = wc.wc_error;	return status;}static int o2hb_issue_node_write(struct o2hb_region *reg,				 struct o2hb_bio_wait_ctxt *write_wc){	int status;	unsigned int slot;	struct bio *bio;	o2hb_bio_wait_init(write_wc);	slot = o2nm_this_node();	bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1);	if (IS_ERR(bio)) {		status = PTR_ERR(bio);		mlog_errno(status);		goto bail;	}	atomic_inc(&write_wc->wc_num_reqs);	submit_bio(WRITE, bio);	status = 0;bail:	return status;}static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,				     struct o2hb_disk_heartbeat_block *hb_block){	__le32 old_cksum;	u32 ret;	/* We want to compute the block crc with a 0 value in the	 * hb_cksum field. Save it off here and replace after the	 * crc. 
*/	old_cksum = hb_block->hb_cksum;	hb_block->hb_cksum = 0;	ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);	hb_block->hb_cksum = old_cksum;	return ret;}static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block){	mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "	     "cksum = 0x%x, generation 0x%llx\n",	     (long long)le64_to_cpu(hb_block->hb_seq),	     hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),	     (long long)le64_to_cpu(hb_block->hb_generation));}static int o2hb_verify_crc(struct o2hb_region *reg,			   struct o2hb_disk_heartbeat_block *hb_block){	u32 read, computed;	read = le32_to_cpu(hb_block->hb_cksum);	computed = o2hb_compute_block_crc_le(reg, hb_block);	return read == computed;}/* We want to make sure that nobody is heartbeating on top of us -- * this will help detect an invalid configuration. */static int o2hb_check_last_timestamp(struct o2hb_region *reg){	int node_num, ret;	struct o2hb_disk_slot *slot;	struct o2hb_disk_heartbeat_block *hb_block;	node_num = o2nm_this_node();	ret = 1;	slot = &reg->hr_slots[node_num];	/* Don't check on our 1st timestamp */	if (slot->ds_last_time) {		hb_block = slot->ds_raw_block;		if (le64_to_cpu(hb_block->hb_seq) != slot->ds_last_time)			ret = 0;	}	return ret;}static inline void o2hb_prepare_block(struct o2hb_region *reg,				      u64 generation){	int node_num;	u64 cputime;	struct o2hb_disk_slot *slot;	struct o2hb_disk_heartbeat_block *hb_block;	node_num = o2nm_this_node();	slot = &reg->hr_slots[node_num];	hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;	memset(hb_block, 0, reg->hr_block_bytes);	/* TODO: time stuff */	cputime = CURRENT_TIME.tv_sec;	if (!cputime)		cputime = 1;	hb_block->hb_seq = cpu_to_le64(cputime);	hb_block->hb_node = node_num;	hb_block->hb_generation = cpu_to_le64(generation);	hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);	/* This step must always happen last! 
*/	hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,								   hb_block));	mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",	     (long long)generation,	     le32_to_cpu(hb_block->hb_cksum));}static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,				struct o2nm_node *node,				int idx){	struct list_head *iter;	struct o2hb_callback_func *f;	list_for_each(iter, &hbcall->list) {		f = list_entry(iter, struct o2hb_callback_func, hc_item);		mlog(ML_HEARTBEAT, "calling funcs %p\n", f);		(f->hc_func)(node, idx, f->hc_data);	}}/* Will run the list in order until we process the passed event */static void o2hb_run_event_list(struct o2hb_node_event *queued_event){	int empty;	struct o2hb_callback *hbcall;	struct o2hb_node_event *event;	spin_lock(&o2hb_live_lock);	empty = list_empty(&queued_event->hn_item);	spin_unlock(&o2hb_live_lock);	if (empty)		return;	/* Holding callback sem assures we don't alter the callback	 * lists when doing this, and serializes ourselves with other	 * processes wanting callbacks. */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -