📄 heartbeat.c
字号:
highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES); if (highest_node >= O2NM_MAX_NODES) { mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n"); return -EINVAL; } /* No sense in reading the slots of nodes that don't exist * yet. Of course, if the node definitions have holes in them * then we're reading an empty slot anyway... Consider this * best-effort. */ ret = o2hb_read_slots(reg, highest_node + 1); if (ret < 0) { mlog_errno(ret); return ret; } /* With an up to date view of the slots, we can check that no * other node has been improperly configured to heartbeat in * our slot. */ if (!o2hb_check_last_timestamp(reg)) mlog(ML_ERROR, "Device \"%s\": another node is heartbeating " "in our slot!\n", reg->hr_dev_name); /* fill in the proper info for our next heartbeat */ o2hb_prepare_block(reg, reg->hr_generation); /* And fire off the write. Note that we don't wait on this I/O * until later. */ ret = o2hb_issue_node_write(reg, &write_bio, &write_wc); if (ret < 0) { mlog_errno(ret); return ret; } o2hb_mlog_blocking(reg, &start, "checking slots"); i = -1; while((i = find_next_bit(configured_nodes, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) { change |= o2hb_check_slot(reg, ®->hr_slots[i]); } o2hb_mlog_blocking_done(reg, &start); /* * We have to be sure we've advertised ourselves on disk * before we can go to steady state. This ensures that * people we find in our steady state have seen us. */ o2hb_mlog_blocking(reg, &start, "waiting for write completion"); o2hb_wait_on_io(reg, &write_wc); o2hb_mlog_blocking_done(reg, &start); bio_put(write_bio); if (write_wc.wc_error) { /* Do not re-arm the write timeout on I/O error - we * can't be sure that the new block ever made it to * disk */ mlog(ML_ERROR, "Write error %d on device \"%s\"\n", write_wc.wc_error, reg->hr_dev_name); return write_wc.wc_error; } o2hb_arm_write_timeout(reg); /* let the person who launched us know when things are steady */ if (!change && (atomic_read(®->hr_steady_iterations) != 0)) { if (atomic_dec_and_test(®->hr_steady_iterations)) wake_up(&o2hb_steady_queue); } return 0;}/* Subtract b from a, storing the result in a. a *must* have a larger * value than b. */static void o2hb_tv_subtract(struct timeval *a, struct timeval *b){ /* just return 0 when a is after b */ if (a->tv_sec < b->tv_sec || (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec)) { a->tv_sec = 0; a->tv_usec = 0; return; } a->tv_sec -= b->tv_sec; a->tv_usec -= b->tv_usec; while ( a->tv_usec < 0 ) { a->tv_sec--; a->tv_usec += 1000000; }}static unsigned int o2hb_elapsed_msecs(struct timeval *start, struct timeval *end){ struct timeval res = *end; o2hb_tv_subtract(&res, start); return res.tv_sec * 1000 + res.tv_usec / 1000;}/* * we ride the region ref that the region dir holds. before the region * dir is removed and drops it ref it will wait to tear down this * thread. */static int o2hb_thread(void *data){ int i, ret; struct o2hb_region *reg = data; struct bio *write_bio; struct o2hb_bio_wait_ctxt write_wc; struct timeval before_hb, after_hb; unsigned int elapsed_msec; mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n"); set_user_nice(current, -20); while (!kthread_should_stop() && !reg->hr_unclean_stop) { /* We track the time spent inside * o2hb_do_disk_heartbeat so that we avoid more then * hr_timeout_ms between disk writes. On busy systems * this should result in a heartbeat which is less * likely to time itself out. */ do_gettimeofday(&before_hb); i = 0; do { ret = o2hb_do_disk_heartbeat(reg); } while (ret && ++i < 2); do_gettimeofday(&after_hb); elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb); mlog(0, "start = %lu.%lu, end = %lu.%lu, msec = %u\n", before_hb.tv_sec, before_hb.tv_usec, after_hb.tv_sec, after_hb.tv_usec, elapsed_msec); if (elapsed_msec < reg->hr_timeout_ms) { struct timeval start; /* the kthread api has blocked signals for us so no * need to record the return value. */ o2hb_mlog_blocking(reg, &start, "msleep"); msleep_interruptible(reg->hr_timeout_ms - elapsed_msec); o2hb_mlog_blocking_done(reg, &start); } } o2hb_disarm_write_timeout(reg); /* unclean stop is only used in very bad situation */ for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++) o2hb_shutdown_slot(®->hr_slots[i]); /* Explicit down notification - avoid forcing the other nodes * to timeout on this region when we could just as easily * write a clear generation - thus indicating to them that * this node has left this region. * * XXX: Should we skip this on unclean_stop? */ o2hb_prepare_block(reg, 0); ret = o2hb_issue_node_write(reg, &write_bio, &write_wc); if (ret == 0) { o2hb_wait_on_io(reg, &write_wc); bio_put(write_bio); } else { mlog_errno(ret); } mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n"); return 0;}void o2hb_init(void){ int i; for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++) INIT_LIST_HEAD(&o2hb_callbacks[i].list); for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++) INIT_LIST_HEAD(&o2hb_live_slots[i]); INIT_LIST_HEAD(&o2hb_node_events); memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));}/* if we're already in a callback then we're already serialized by the sem */void o2hb_fill_node_map_from_callback(unsigned long *map, unsigned bytes){ BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long))); memcpy(map, &o2hb_live_node_bitmap, bytes);}/* * get a map of all nodes that are heartbeating in any regions */void o2hb_fill_node_map(unsigned long *map, unsigned bytes){ /* callers want to serialize this map and callbacks so that they * can trust that they don't miss nodes coming to the party */ down_read(&o2hb_callback_sem); spin_lock(&o2hb_live_lock); o2hb_fill_node_map_from_callback(map, bytes); spin_unlock(&o2hb_live_lock); up_read(&o2hb_callback_sem);}EXPORT_SYMBOL_GPL(o2hb_fill_node_map);/* * heartbeat configfs bits. The heartbeat set is a default set under * the cluster set in nodemanager.c. */static struct o2hb_region *to_o2hb_region(struct config_item *item){ return item ? container_of(item, struct o2hb_region, hr_item) : NULL;}/* drop_item only drops its ref after killing the thread, nothing should * be using the region anymore. this has to clean up any state that * attributes might have built up. */static void o2hb_region_release(struct config_item *item){ int i; struct page *page; struct o2hb_region *reg = to_o2hb_region(item); if (reg->hr_tmp_block) kfree(reg->hr_tmp_block); if (reg->hr_slot_data) { for (i = 0; i < reg->hr_num_pages; i++) { page = reg->hr_slot_data[i]; if (page) __free_page(page); } kfree(reg->hr_slot_data); } if (reg->hr_bdev) blkdev_put(reg->hr_bdev); if (reg->hr_slots) kfree(reg->hr_slots); spin_lock(&o2hb_live_lock); list_del(®->hr_all_item); spin_unlock(&o2hb_live_lock); kfree(reg);}static int o2hb_read_block_input(struct o2hb_region *reg, const char *page, size_t count, unsigned long *ret_bytes, unsigned int *ret_bits){ unsigned long bytes; char *p = (char *)page; bytes = simple_strtoul(p, &p, 0); if (!p || (*p && (*p != '\n'))) return -EINVAL; /* Heartbeat and fs min / max block sizes are the same. */ if (bytes > 4096 || bytes < 512) return -ERANGE; if (hweight16(bytes) != 1) return -EINVAL; if (ret_bytes) *ret_bytes = bytes; if (ret_bits) *ret_bits = ffs(bytes) - 1; return 0;}static ssize_t o2hb_region_block_bytes_read(struct o2hb_region *reg, char *page){ return sprintf(page, "%u\n", reg->hr_block_bytes);}static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg, const char *page, size_t count){ int status; unsigned long block_bytes; unsigned int block_bits; if (reg->hr_bdev) return -EINVAL; status = o2hb_read_block_input(reg, page, count, &block_bytes, &block_bits); if (status) return status; reg->hr_block_bytes = (unsigned int)block_bytes; reg->hr_block_bits = block_bits; return count;}static ssize_t o2hb_region_start_block_read(struct o2hb_region *reg, char *page){ return sprintf(page, "%llu\n", reg->hr_start_block);}static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg, const char *page, size_t count){ unsigned long long tmp; char *p = (char *)page; if (reg->hr_bdev) return -EINVAL; tmp = simple_strtoull(p, &p, 0); if (!p || (*p && (*p != '\n'))) return -EINVAL; reg->hr_start_block = tmp; return count;}static ssize_t o2hb_region_blocks_read(struct o2hb_region *reg, char *page){ return sprintf(page, "%d\n", reg->hr_blocks);}static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg, const char *page, size_t count){ unsigned long tmp; char *p = (char *)page; if (reg->hr_bdev) return -EINVAL; tmp = simple_strtoul(p, &p, 0); if (!p || (*p && (*p != '\n'))) return -EINVAL; if (tmp > O2NM_MAX_NODES || tmp == 0) return -ERANGE; reg->hr_blocks = (unsigned int)tmp; return count;}static ssize_t o2hb_region_dev_read(struct o2hb_region *reg, char *page){ unsigned int ret = 0; if (reg->hr_bdev) ret = sprintf(page, "%s\n", reg->hr_dev_name); return ret;}static void o2hb_init_region_params(struct o2hb_region *reg){ reg->hr_slots_per_page = PAGE_CACHE_SIZE >> reg->hr_block_bits; reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS; mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n", reg->hr_start_block, reg->hr_blocks); mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n", reg->hr_block_bytes, reg->hr_block_bits); mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms); mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);}static int o2hb_map_slot_data(struct o2hb_region *reg){ int i, j; unsigned int last_slot; unsigned int spp = reg->hr_slots_per_page; struct page *page; char *raw; struct o2hb_disk_slot *slot; reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL); if (reg->hr_tmp_block == NULL) { mlog_errno(-ENOMEM); return -ENOMEM; } reg->hr_slots = kcalloc(reg->hr_blocks, sizeof(struct o2hb_disk_slot), GFP_KERNEL); if (reg->hr_slots == NULL) { mlog_errno(-ENOMEM); return -ENOMEM; } for(i = 0; i < reg->hr_blocks; i++) { slot = ®->hr_slots[i]; slot->ds_node_num = i; INIT_LIST_HEAD(&slot->ds_live_item); slot->ds_raw_block = NULL; } reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp; mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks " "at %u blocks per page\n", reg->hr_num_pages, reg->hr_blocks, spp); reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *), GFP_KERNEL); if (!reg->hr_slot_data) { mlog_errno(-ENOMEM); return -ENOMEM; } for(i = 0; i < reg->hr_num_pages; i++) { page = alloc_page(GFP_KERNEL); if (!page) { mlog_errno(-ENOMEM); return -ENOMEM; } reg->hr_slot_data[i] = page; last_slot = i * spp; raw = page_address(page); for (j = 0; (j < spp) && ((j + last_slot) < reg->hr_blocks); j++) { BUG_ON((j + last_slot) >= reg->hr_blocks); slot = ®->hr_slots[j + last_slot]; slot->ds_raw_block = (struct o2hb_disk_heartbeat_block *) raw; raw += reg->hr_block_bytes; } } return 0;}/* Read in all the slots available and populate the tracking * structures so that we can start with a baseline idea of what's * there. */static int o2hb_populate_slot_data(struct o2hb_region *reg){ int ret, i; struct o2hb_disk_slot *slot; struct o2hb_disk_heartbeat_block *hb_block; mlog_entry_void(); ret = o2hb_read_slots(reg, reg->hr_blocks); if (ret) { mlog_errno(ret); goto out; } /* We only want to get an idea of the values initially in each * slot, so we do no verification - o2hb_check_slot will * actually determine if each configured slot is valid and * whether any values have changed. */ for(i = 0; i < reg->hr_blocks; i++) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -