⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 heartbeat.c

📁 linux2.6.16版本
💻 C
📖 第 1 页 / 共 4 页
字号:
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/jiffies.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/bio.h>
#include <linux/blkdev.h>
#include <linux/delay.h>
#include <linux/file.h>
#include <linux/kthread.h>
#include <linux/configfs.h>
#include <linux/random.h>
#include <linux/crc32.h>
#include <linux/time.h>

#include "heartbeat.h"
#include "tcp.h"
#include "nodemanager.h"
#include "quorum.h"

#include "masklog.h"


/*
 * The first heartbeat pass had one global thread that would serialize all hb
 * callback calls.  This global serializing sem should only be removed once
 * we've made sure that all callees can deal with being called concurrently
 * from multiple hb region threads.
 */
static DECLARE_RWSEM(o2hb_callback_sem);

/*
 * multiple hb threads are watching multiple regions.  A node is live
 * whenever any of the threads sees activity from the node in its region.
*/
static spinlock_t o2hb_live_lock = SPIN_LOCK_UNLOCKED;
static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
static LIST_HEAD(o2hb_node_events);
static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);

/* Guarded by o2hb_live_lock where it is inspected in
 * o2hb_dead_threshold_set() below. */
static LIST_HEAD(o2hb_all_regions);

/* One list head per callback type; indexed up to O2HB_NUM_CB. */
static struct o2hb_callback {
	struct list_head list;
} o2hb_callbacks[O2HB_NUM_CB];

static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);

/* 1 << 9 == 512, i.e. the 512 byte sector size used for bio addressing
 * elsewhere in this file. */
#define O2HB_DEFAULT_BLOCK_BITS       9

unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;

/* Only sets a new threshold if there are no active regions.
 *
 * No locking or otherwise interesting code is required for reading
 * o2hb_dead_threshold as it can't change once regions are active and
 * it's not interesting to anyone until then anyway.
 *
 * A threshold that is not strictly greater than O2HB_MIN_DEAD_THRESHOLD
 * is silently ignored, as is any request made while o2hb_all_regions is
 * non-empty; the caller gets no indication either way. */
static void o2hb_dead_threshold_set(unsigned int threshold)
{
	if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
		/* the region list is checked and the threshold updated
		 * under the same lock hold */
		spin_lock(&o2hb_live_lock);
		if (list_empty(&o2hb_all_regions))
			o2hb_dead_threshold = threshold;
		spin_unlock(&o2hb_live_lock);
	}
}

/* A queued node event: list linkage, the callback type it belongs to and
 * the node (pointer and number) it concerns. */
struct o2hb_node_event {
	struct list_head        hn_item;
	enum o2hb_callback_type hn_event_type;
	struct o2nm_node        *hn_node;
	int                     hn_node_num;
};

/* Per-slot bookkeeping for one heartbeat block on disk.
 * NOTE(review): the exact meaning of the sample counters is defined by
 * code outside this part of the file -- confirm before relying on it. */
struct o2hb_disk_slot {
	struct o2hb_disk_heartbeat_block *ds_raw_block;
	u8			ds_node_num;
	u64			ds_last_time;
	u64			ds_last_generation;
	u16			ds_equal_samples;
	u16			ds_changed_samples;
	struct list_head	ds_live_item;
};

/* each thread owns a region..
* when we're asked to tear down the region
 * we ask the thread to stop, who cleans up the region */
struct o2hb_region {
	struct config_item	hr_item;

	struct list_head	hr_all_item;
	unsigned		hr_unclean_stop:1;

	/* protected by the hr_callback_sem
	 * NOTE(review): no hr_callback_sem is declared in the visible part
	 * of this file; presumably this means o2hb_callback_sem -- confirm. */
	struct task_struct 	*hr_task;

	unsigned int		hr_blocks;
	unsigned long long	hr_start_block;

	/* log2 of the block size, and the block size in bytes */
	unsigned int		hr_block_bits;
	unsigned int		hr_block_bytes;

	unsigned int		hr_slots_per_page;
	unsigned int		hr_num_pages;

	/* array of pages backing the slots; indexed by page number when
	 * bios are built */
	struct page             **hr_slot_data;
	struct block_device	*hr_bdev;
	struct o2hb_disk_slot	*hr_slots;

	/* let the person setting up hb wait for it to return until it
	 * has reached a 'steady' state.  This will be fixed when we have
	 * a more complete api that doesn't lead to this sort of fragility. */
	atomic_t		hr_steady_iterations;

	char			hr_dev_name[BDEVNAME_SIZE];

	unsigned int		hr_timeout_ms;

	/* randomized as the region goes up and down so that a node
	 * recognizes a node going up and down in one iteration */
	u64			hr_generation;

	/* delayed work that fires o2hb_write_timeout(), and the jiffies
	 * value recorded when it was last armed */
	struct work_struct	hr_write_timeout_work;
	unsigned long		hr_last_timeout_start;

	/* Used during o2hb_check_slot to hold a copy of the block
	 * being checked because we temporarily have to zero out the
	 * crc field. */
	struct o2hb_disk_heartbeat_block *hr_tmp_block;
};

/* Lets one waiter block until a counted set of bios has completed:
 * wc_num_reqs is the number still outstanding, wc_io_complete is
 * signalled when it drops to zero. */
struct o2hb_bio_wait_ctxt {
	atomic_t          wc_num_reqs;
	struct completion wc_io_complete;
};

/* Work function for hr_write_timeout_work.  arg is the owning
 * struct o2hb_region.  Logs how long it has been since the timeout was
 * armed and then reports the disk timeout via o2quo_disk_timeout(). */
static void o2hb_write_timeout(void *arg)
{
	struct o2hb_region *reg = arg;

	mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
	     "milliseconds\n", reg->hr_dev_name,
	     jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
	o2quo_disk_timeout();
}

/* (Re)start the write timeout: any pending instance is cancelled first,
 * the start time is recorded, and the work is scheduled to run after
 * O2HB_MAX_WRITE_TIMEOUT_MS. */
static void o2hb_arm_write_timeout(struct o2hb_region *reg)
{
	mlog(0, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS);

	cancel_delayed_work(&reg->hr_write_timeout_work);
	reg->hr_last_timeout_start = jiffies;
	schedule_delayed_work(&reg->hr_write_timeout_work,
			      msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
}

/* Stop the write timeout.  The flush follows the cancel so that an
 * instance that was already queued has finished by the time we return. */
static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
{
	cancel_delayed_work(&reg->hr_write_timeout_work);
	flush_scheduled_work();
}

/* Prepare wc to wait for num_ios completions. */
static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc,
				      unsigned int num_ios)
{
	atomic_set(&wc->wc_num_reqs, num_ios);
	init_completion(&wc->wc_io_complete);
}

/* Used in error paths too
 *
 * Drops num outstanding requests from wc, signalling the completion when
 * the count reaches zero.  Reaching zero while decrements are still owed
 * means the accounting is broken, hence the BUG_ON. */
static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
				     unsigned int num)
{
	/* sadly atomic_sub_and_test() isn't available on all platforms.  The
	 * good news is that the fast path only completes one at a time */
	while(num--) {
		if (atomic_dec_and_test(&wc->wc_num_reqs)) {
			BUG_ON(num > 0);
			complete(&wc->wc_io_complete);
		}
	}
}

/* Kick the region's block device mapping and then sleep until every
 * request accounted for in wc has been dropped. */
static void o2hb_wait_on_io(struct o2hb_region *reg,
			    struct o2hb_bio_wait_ctxt *wc)
{
	struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;

	blk_run_address_space(mapping);

	wait_for_completion(&wc->wc_io_complete);
}

/* bi_end_io handler for heartbeat bios; bi_private is the wait context.
 *
 * Returns 1 while the bio still has bytes outstanding (bi_size != 0),
 * otherwise drops one request from the wait context and returns 0.
 * An I/O error is only logged here; it is not passed on to the waiter. */
static int o2hb_bio_end_io(struct bio *bio,
			   unsigned int bytes_done,
			   int error)
{
	struct o2hb_bio_wait_ctxt *wc = bio->bi_private;

	if (error)
		mlog(ML_ERROR, "IO Error %d\n", error);

	if (bio->bi_size)
		return 1;

	o2hb_bio_wait_dec(wc, 1);
	return 0;
}

/* Setup a Bio to cover I/O against num_slots slots starting at
 * start_slot.
*/static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,				      struct o2hb_bio_wait_ctxt *wc,				      unsigned int start_slot,				      unsigned int num_slots){	int i, nr_vecs, len, first_page, last_page;	unsigned int vec_len, vec_start;	unsigned int bits = reg->hr_block_bits;	unsigned int spp = reg->hr_slots_per_page;	struct bio *bio;	struct page *page;	nr_vecs = (num_slots + spp - 1) / spp;	/* Testing has shown this allocation to take long enough under	 * GFP_KERNEL that the local node can get fenced. It would be	 * nicest if we could pre-allocate these bios and avoid this	 * all together. */	bio = bio_alloc(GFP_ATOMIC, nr_vecs);	if (!bio) {		mlog(ML_ERROR, "Could not alloc slots BIO!\n");		bio = ERR_PTR(-ENOMEM);		goto bail;	}	/* Must put everything in 512 byte sectors for the bio... */	bio->bi_sector = (reg->hr_start_block + start_slot) << (bits - 9);	bio->bi_bdev = reg->hr_bdev;	bio->bi_private = wc;	bio->bi_end_io = o2hb_bio_end_io;	first_page = start_slot / spp;	last_page = first_page + nr_vecs;	vec_start = (start_slot << bits) % PAGE_CACHE_SIZE;	for(i = first_page; i < last_page; i++) {		page = reg->hr_slot_data[i];		vec_len = PAGE_CACHE_SIZE;		/* last page might be short */		if (((i + 1) * spp) > (start_slot + num_slots))			vec_len = ((num_slots + start_slot) % spp) << bits;		vec_len -=  vec_start;		mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",		     i, vec_len, vec_start);		len = bio_add_page(bio, page, vec_len, vec_start);		if (len != vec_len) {			bio_put(bio);			bio = ERR_PTR(-EIO);			mlog(ML_ERROR, "Error adding page to bio i = %d, "			     "vec_len = %u, len = %d\n, start = %u\n",			     i, vec_len, len, vec_start);			goto bail;		}		vec_start = 0;	}bail:	return bio;}/* * Compute the maximum number of sectors the bdev can handle in one bio, * as a power of two. * * Stolen from oracleasm, thanks Joel! 
*/static int compute_max_sectors(struct block_device *bdev){	int max_pages, max_sectors, pow_two_sectors;	struct request_queue *q;	q = bdev_get_queue(bdev);	max_pages = q->max_sectors >> (PAGE_SHIFT - 9);	if (max_pages > BIO_MAX_PAGES)		max_pages = BIO_MAX_PAGES;	if (max_pages > q->max_phys_segments)		max_pages = q->max_phys_segments;	if (max_pages > q->max_hw_segments)		max_pages = q->max_hw_segments;	max_pages--; /* Handle I/Os that straddle a page */	max_sectors = max_pages << (PAGE_SHIFT - 9);	/* Why is fls() 1-based???? */	pow_two_sectors = 1 << (fls(max_sectors) - 1);	return pow_two_sectors;}static inline void o2hb_compute_request_limits(struct o2hb_region *reg,					       unsigned int num_slots,					       unsigned int *num_bios,					       unsigned int *slots_per_bio){	unsigned int max_sectors, io_sectors;	max_sectors = compute_max_sectors(reg->hr_bdev);	io_sectors = num_slots << (reg->hr_block_bits - 9);	*num_bios = (io_sectors + max_sectors - 1) / max_sectors;	*slots_per_bio = max_sectors >> (reg->hr_block_bits - 9);	mlog(ML_HB_BIO, "My io size is %u sectors for %u slots. 
This "	     "device can handle %u sectors of I/O\n", io_sectors, num_slots,	     max_sectors);	mlog(ML_HB_BIO, "Will need %u bios holding %u slots each\n",	     *num_bios, *slots_per_bio);}static int o2hb_read_slots(struct o2hb_region *reg,			   unsigned int max_slots){	unsigned int num_bios, slots_per_bio, start_slot, num_slots;	int i, status;	struct o2hb_bio_wait_ctxt wc;	struct bio **bios;	struct bio *bio;	o2hb_compute_request_limits(reg, max_slots, &num_bios, &slots_per_bio);	bios = kcalloc(num_bios, sizeof(struct bio *), GFP_KERNEL);	if (!bios) {		status = -ENOMEM;		mlog_errno(status);		return status;	}	o2hb_bio_wait_init(&wc, num_bios);	num_slots = slots_per_bio;	for(i = 0; i < num_bios; i++) {		start_slot = i * slots_per_bio;		/* adjust num_slots at last bio */		if (max_slots < (start_slot + num_slots))			num_slots = max_slots - start_slot;		bio = o2hb_setup_one_bio(reg, &wc, start_slot, num_slots);		if (IS_ERR(bio)) {			o2hb_bio_wait_dec(&wc, num_bios - i);			status = PTR_ERR(bio);			mlog_errno(status);			goto bail_and_wait;		}		bios[i] = bio;		submit_bio(READ, bio);	}	status = 0;bail_and_wait:	o2hb_wait_on_io(reg, &wc);	if (bios) {		for(i = 0; i < num_bios; i++)			if (bios[i])				bio_put(bios[i]);		kfree(bios);	}	return status;}static int o2hb_issue_node_write(struct o2hb_region *reg,				 struct bio **write_bio,				 struct o2hb_bio_wait_ctxt *write_wc){	int status;	unsigned int slot;	struct bio *bio;	o2hb_bio_wait_init(write_wc, 1);	slot = o2nm_this_node();	bio = o2hb_setup_one_bio(reg, write_wc, slot, 1);	if (IS_ERR(bio)) {		status = PTR_ERR(bio);		mlog_errno(status);		goto bail;	}	submit_bio(WRITE, bio);	*write_bio = bio;	status = 0;bail:	return status;}static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,				     struct o2hb_disk_heartbeat_block *hb_block){	__le32 old_cksum;	u32 ret;	/* We want to compute the block crc with a 0 value in the	 * hb_cksum field. Save it off here and replace after the	 * crc. 
*/	old_cksum = hb_block->hb_cksum;	hb_block->hb_cksum = 0;	ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);	hb_block->hb_cksum = old_cksum;	return ret;}static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -