📄 heartbeat.c
字号:
/* -*- mode: c; c-basic-offset: 8; -*- * vim: noexpandtab sw=8 ts=8 sts=0: * * Copyright (C) 2004, 2005 Oracle. All rights reserved. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this program; if not, write to the * Free Software Foundation, Inc., 59 Temple Place - Suite 330, * Boston, MA 021110-1307, USA. */#include <linux/kernel.h>#include <linux/sched.h>#include <linux/jiffies.h>#include <linux/module.h>#include <linux/fs.h>#include <linux/bio.h>#include <linux/blkdev.h>#include <linux/delay.h>#include <linux/file.h>#include <linux/kthread.h>#include <linux/configfs.h>#include <linux/random.h>#include <linux/crc32.h>#include <linux/time.h>#include "heartbeat.h"#include "tcp.h"#include "nodemanager.h"#include "quorum.h"#include "masklog.h"/* * The first heartbeat pass had one global thread that would serialize all hb * callback calls. This global serializing sem should only be removed once * we've made sure that all callees can deal with being called concurrently * from multiple hb region threads. */static DECLARE_RWSEM(o2hb_callback_sem);/* * multiple hb threads are watching multiple regions. A node is live * whenever any of the threads sees activity from the node in its region. 
*/static spinlock_t o2hb_live_lock = SPIN_LOCK_UNLOCKED;static struct list_head o2hb_live_slots[O2NM_MAX_NODES];static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];static LIST_HEAD(o2hb_node_events);static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);static LIST_HEAD(o2hb_all_regions);static struct o2hb_callback { struct list_head list;} o2hb_callbacks[O2HB_NUM_CB];static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);#define O2HB_DEFAULT_BLOCK_BITS 9unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;/* Only sets a new threshold if there are no active regions. * * No locking or otherwise interesting code is required for reading * o2hb_dead_threshold as it can't change once regions are active and * it's not interesting to anyone until then anyway. */static void o2hb_dead_threshold_set(unsigned int threshold){ if (threshold > O2HB_MIN_DEAD_THRESHOLD) { spin_lock(&o2hb_live_lock); if (list_empty(&o2hb_all_regions)) o2hb_dead_threshold = threshold; spin_unlock(&o2hb_live_lock); }}struct o2hb_node_event { struct list_head hn_item; enum o2hb_callback_type hn_event_type; struct o2nm_node *hn_node; int hn_node_num;};struct o2hb_disk_slot { struct o2hb_disk_heartbeat_block *ds_raw_block; u8 ds_node_num; u64 ds_last_time; u64 ds_last_generation; u16 ds_equal_samples; u16 ds_changed_samples; struct list_head ds_live_item;};/* each thread owns a region.. 
when we're asked to tear down the region * we ask the thread to stop, who cleans up the region */struct o2hb_region { struct config_item hr_item; struct list_head hr_all_item; unsigned hr_unclean_stop:1; /* protected by the hr_callback_sem */ struct task_struct *hr_task; unsigned int hr_blocks; unsigned long long hr_start_block; unsigned int hr_block_bits; unsigned int hr_block_bytes; unsigned int hr_slots_per_page; unsigned int hr_num_pages; struct page **hr_slot_data; struct block_device *hr_bdev; struct o2hb_disk_slot *hr_slots; /* let the person setting up hb wait for it to return until it * has reached a 'steady' state. This will be fixed when we have * a more complete api that doesn't lead to this sort of fragility. */ atomic_t hr_steady_iterations; char hr_dev_name[BDEVNAME_SIZE]; unsigned int hr_timeout_ms; /* randomized as the region goes up and down so that a node * recognizes a node going up and down in one iteration */ u64 hr_generation; struct work_struct hr_write_timeout_work; unsigned long hr_last_timeout_start; /* Used during o2hb_check_slot to hold a copy of the block * being checked because we temporarily have to zero out the * crc field. 
*/ struct o2hb_disk_heartbeat_block *hr_tmp_block;};struct o2hb_bio_wait_ctxt { atomic_t wc_num_reqs; struct completion wc_io_complete; int wc_error;};static void o2hb_write_timeout(void *arg){ struct o2hb_region *reg = arg; mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u " "milliseconds\n", reg->hr_dev_name, jiffies_to_msecs(jiffies - reg->hr_last_timeout_start)); o2quo_disk_timeout();}static void o2hb_arm_write_timeout(struct o2hb_region *reg){ mlog(0, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS); cancel_delayed_work(®->hr_write_timeout_work); reg->hr_last_timeout_start = jiffies; schedule_delayed_work(®->hr_write_timeout_work, msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));}static void o2hb_disarm_write_timeout(struct o2hb_region *reg){ cancel_delayed_work(®->hr_write_timeout_work); flush_scheduled_work();}static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc, unsigned int num_ios){ atomic_set(&wc->wc_num_reqs, num_ios); init_completion(&wc->wc_io_complete); wc->wc_error = 0;}/* Used in error paths too */static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc, unsigned int num){ /* sadly atomic_sub_and_test() isn't available on all platforms. The * good news is that the fast path only completes one at a time */ while(num--) { if (atomic_dec_and_test(&wc->wc_num_reqs)) { BUG_ON(num > 0); complete(&wc->wc_io_complete); } }}static void o2hb_wait_on_io(struct o2hb_region *reg, struct o2hb_bio_wait_ctxt *wc){ struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping; blk_run_address_space(mapping); wait_for_completion(&wc->wc_io_complete);}static int o2hb_bio_end_io(struct bio *bio, unsigned int bytes_done, int error){ struct o2hb_bio_wait_ctxt *wc = bio->bi_private; if (error) { mlog(ML_ERROR, "IO Error %d\n", error); wc->wc_error = error; } if (bio->bi_size) return 1; o2hb_bio_wait_dec(wc, 1); return 0;}/* Setup a Bio to cover I/O against num_slots slots starting at * start_slot. 
*/
/* @reg: region supplying the block device, slot pages and geometry.
 * @wc: wait context stored in bi_private for o2hb_bio_end_io.
 * @start_slot: first slot covered by the bio.
 * @num_slots: number of slots covered.
 *
 * Returns the new bio, or ERR_PTR(-ENOMEM) if the bio cannot be
 * allocated, or ERR_PTR(-EIO) if a page cannot be added in full (the
 * bio is released in that case).  The bio is not submitted here.
 *
 * NOTE(review): nr_vecs is derived from num_slots alone, so a range
 * whose start_slot is not page aligned and which spills into one more
 * page would not be fully covered -- confirm callers never do that. */
static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
				      struct o2hb_bio_wait_ctxt *wc,
				      unsigned int start_slot,
				      unsigned int num_slots)
{
	int i, nr_vecs, len, first_page, last_page;
	unsigned int vec_len, vec_start;
	unsigned int bits = reg->hr_block_bits;
	unsigned int spp = reg->hr_slots_per_page;
	struct bio *bio;
	struct page *page;

	nr_vecs = (num_slots + spp - 1) / spp;

	/* Testing has shown this allocation to take long enough under
	 * GFP_KERNEL that the local node can get fenced. It would be
	 * nicest if we could pre-allocate these bios and avoid this
	 * all together. */
	bio = bio_alloc(GFP_ATOMIC, nr_vecs);
	if (!bio) {
		mlog(ML_ERROR, "Could not alloc slots BIO!\n");
		return ERR_PTR(-ENOMEM);
	}

	/* Must put everything in 512 byte sectors for the bio... */
	bio->bi_sector = (reg->hr_start_block + start_slot) << (bits - 9);
	bio->bi_bdev = reg->hr_bdev;
	bio->bi_private = wc;
	bio->bi_end_io = o2hb_bio_end_io;

	first_page = start_slot / spp;
	last_page = first_page + nr_vecs;
	/* Only the first page can start at a non-zero offset. */
	vec_start = (start_slot << bits) % PAGE_CACHE_SIZE;
	for (i = first_page; i < last_page; i++) {
		page = reg->hr_slot_data[i];

		vec_len = PAGE_CACHE_SIZE;
		/* last page might be short */
		if (((i + 1) * spp) > (start_slot + num_slots))
			vec_len = ((num_slots + start_slot) % spp) << bits;
		vec_len -= vec_start;

		mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
		     i, vec_len, vec_start);

		len = bio_add_page(bio, page, vec_len, vec_start);
		if (len != vec_len) {
			bio_put(bio);

			/* Single-line message: the stray newline that
			 * used to split it in two has been removed. */
			mlog(ML_ERROR, "Error adding page to bio i = %d, "
			     "vec_len = %u, len = %d, start = %u\n",
			     i, vec_len, len, vec_start);
			return ERR_PTR(-EIO);
		}

		vec_start = 0;
	}

	return bio;
}

/*
 * Compute the maximum number of sectors the bdev can handle in one bio,
 * as a power of two.
 *
 * Stolen from oracleasm, thanks Joel!
*/static int compute_max_sectors(struct block_device *bdev){ int max_pages, max_sectors, pow_two_sectors; struct request_queue *q; q = bdev_get_queue(bdev); max_pages = q->max_sectors >> (PAGE_SHIFT - 9); if (max_pages > BIO_MAX_PAGES) max_pages = BIO_MAX_PAGES; if (max_pages > q->max_phys_segments) max_pages = q->max_phys_segments; if (max_pages > q->max_hw_segments) max_pages = q->max_hw_segments; max_pages--; /* Handle I/Os that straddle a page */ max_sectors = max_pages << (PAGE_SHIFT - 9); /* Why is fls() 1-based???? */ pow_two_sectors = 1 << (fls(max_sectors) - 1); return pow_two_sectors;}static inline void o2hb_compute_request_limits(struct o2hb_region *reg, unsigned int num_slots, unsigned int *num_bios, unsigned int *slots_per_bio){ unsigned int max_sectors, io_sectors; max_sectors = compute_max_sectors(reg->hr_bdev); io_sectors = num_slots << (reg->hr_block_bits - 9); *num_bios = (io_sectors + max_sectors - 1) / max_sectors; *slots_per_bio = max_sectors >> (reg->hr_block_bits - 9); mlog(ML_HB_BIO, "My io size is %u sectors for %u slots. 
This " "device can handle %u sectors of I/O\n", io_sectors, num_slots, max_sectors); mlog(ML_HB_BIO, "Will need %u bios holding %u slots each\n", *num_bios, *slots_per_bio);}static int o2hb_read_slots(struct o2hb_region *reg, unsigned int max_slots){ unsigned int num_bios, slots_per_bio, start_slot, num_slots; int i, status; struct o2hb_bio_wait_ctxt wc; struct bio **bios; struct bio *bio; o2hb_compute_request_limits(reg, max_slots, &num_bios, &slots_per_bio); bios = kcalloc(num_bios, sizeof(struct bio *), GFP_KERNEL); if (!bios) { status = -ENOMEM; mlog_errno(status); return status; } o2hb_bio_wait_init(&wc, num_bios); num_slots = slots_per_bio; for(i = 0; i < num_bios; i++) { start_slot = i * slots_per_bio; /* adjust num_slots at last bio */ if (max_slots < (start_slot + num_slots)) num_slots = max_slots - start_slot; bio = o2hb_setup_one_bio(reg, &wc, start_slot, num_slots); if (IS_ERR(bio)) { o2hb_bio_wait_dec(&wc, num_bios - i); status = PTR_ERR(bio); mlog_errno(status); goto bail_and_wait; } bios[i] = bio; submit_bio(READ, bio); } status = 0;bail_and_wait: o2hb_wait_on_io(reg, &wc); if (wc.wc_error && !status) status = wc.wc_error; if (bios) { for(i = 0; i < num_bios; i++) if (bios[i]) bio_put(bios[i]); kfree(bios); } return status;}static int o2hb_issue_node_write(struct o2hb_region *reg, struct bio **write_bio, struct o2hb_bio_wait_ctxt *write_wc){ int status; unsigned int slot; struct bio *bio; o2hb_bio_wait_init(write_wc, 1); slot = o2nm_this_node(); bio = o2hb_setup_one_bio(reg, write_wc, slot, 1); if (IS_ERR(bio)) { status = PTR_ERR(bio); mlog_errno(status); goto bail; } submit_bio(WRITE, bio); *write_bio = bio; status = 0;bail: return status;}static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg, struct o2hb_disk_heartbeat_block *hb_block){ __le32 old_cksum; u32 ret; /* We want to compute the block crc with a 0 value in the * hb_cksum field. Save it off here and replace after the * crc. 
*/ old_cksum = hb_block->hb_cksum; hb_block->hb_cksum = 0; ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes); hb_block->hb_cksum = old_cksum; return ret;}static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -