heartbeat.c
{
	mlog(ML_ERROR, "Dump slot information: seq = 0x%"MLFx64", node = %u, "
	     "cksum = 0x%x, generation 0x%"MLFx64"\n",
	     le64_to_cpu(hb_block->hb_seq), hb_block->hb_node,
	     le32_to_cpu(hb_block->hb_cksum),
	     le64_to_cpu(hb_block->hb_generation));
}

static int o2hb_verify_crc(struct o2hb_region *reg,
			   struct o2hb_disk_heartbeat_block *hb_block)
{
	u32 read, computed;

	read = le32_to_cpu(hb_block->hb_cksum);
	computed = o2hb_compute_block_crc_le(reg, hb_block);

	return read == computed;
}

/* We want to make sure that nobody is heartbeating on top of us --
 * this will help detect an invalid configuration. */
static int o2hb_check_last_timestamp(struct o2hb_region *reg)
{
	int node_num, ret;
	struct o2hb_disk_slot *slot;
	struct o2hb_disk_heartbeat_block *hb_block;

	node_num = o2nm_this_node();

	ret = 1;
	slot = &reg->hr_slots[node_num];
	/* Don't check on our 1st timestamp */
	if (slot->ds_last_time) {
		hb_block = slot->ds_raw_block;

		if (le64_to_cpu(hb_block->hb_seq) != slot->ds_last_time)
			ret = 0;
	}

	return ret;
}

static inline void o2hb_prepare_block(struct o2hb_region *reg,
				      u64 generation)
{
	int node_num;
	u64 cputime;
	struct o2hb_disk_slot *slot;
	struct o2hb_disk_heartbeat_block *hb_block;

	node_num = o2nm_this_node();
	slot = &reg->hr_slots[node_num];

	hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
	memset(hb_block, 0, reg->hr_block_bytes);
	/* TODO: time stuff */
	cputime = CURRENT_TIME.tv_sec;
	if (!cputime)
		cputime = 1;

	hb_block->hb_seq = cpu_to_le64(cputime);
	hb_block->hb_node = node_num;
	hb_block->hb_generation = cpu_to_le64(generation);

	/* This step must always happen last! */
	hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
								   hb_block));

	mlog(ML_HB_BIO, "our node generation = 0x%"MLFx64", cksum = 0x%x\n",
	     cpu_to_le64(generation), le32_to_cpu(hb_block->hb_cksum));
}

static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
				struct o2nm_node *node,
				int idx)
{
	struct list_head *iter;
	struct o2hb_callback_func *f;

	list_for_each(iter, &hbcall->list) {
		f = list_entry(iter, struct o2hb_callback_func, hc_item);
		mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
		(f->hc_func)(node, idx, f->hc_data);
	}
}

/* Will run the list in order until we process the passed event */
static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
{
	int empty;
	struct o2hb_callback *hbcall;
	struct o2hb_node_event *event;

	spin_lock(&o2hb_live_lock);
	empty = list_empty(&queued_event->hn_item);
	spin_unlock(&o2hb_live_lock);
	if (empty)
		return;

	/* Holding callback sem assures we don't alter the callback
	 * lists when doing this, and serializes ourselves with other
	 * processes wanting callbacks. */
	down_write(&o2hb_callback_sem);

	spin_lock(&o2hb_live_lock);
	while (!list_empty(&o2hb_node_events)
	       && !list_empty(&queued_event->hn_item)) {
		event = list_entry(o2hb_node_events.next,
				   struct o2hb_node_event,
				   hn_item);
		list_del_init(&event->hn_item);
		spin_unlock(&o2hb_live_lock);

		mlog(ML_HEARTBEAT, "Node %s event for %d\n",
		     event->hn_event_type == O2HB_NODE_UP_CB ?
		     "UP" : "DOWN",
		     event->hn_node_num);

		hbcall = hbcall_from_type(event->hn_event_type);

		/* We should *never* have gotten on to the list with a
		 * bad type... This isn't something that we should try
		 * to recover from. */
		BUG_ON(IS_ERR(hbcall));

		o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);

		spin_lock(&o2hb_live_lock);
	}
	spin_unlock(&o2hb_live_lock);

	up_write(&o2hb_callback_sem);
}

static void o2hb_queue_node_event(struct o2hb_node_event *event,
				  enum o2hb_callback_type type,
				  struct o2nm_node *node,
				  int node_num)
{
	assert_spin_locked(&o2hb_live_lock);

	event->hn_event_type = type;
	event->hn_node = node;
	event->hn_node_num = node_num;

	mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
	     type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);

	list_add_tail(&event->hn_item, &o2hb_node_events);
}

static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
{
	struct o2hb_node_event event =
		{ .hn_item = LIST_HEAD_INIT(event.hn_item), };
	struct o2nm_node *node;

	node = o2nm_get_node_by_num(slot->ds_node_num);
	if (!node)
		return;

	spin_lock(&o2hb_live_lock);
	if (!list_empty(&slot->ds_live_item)) {
		mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
		     slot->ds_node_num);

		list_del_init(&slot->ds_live_item);

		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
			clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);

			o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
					      slot->ds_node_num);
		}
	}
	spin_unlock(&o2hb_live_lock);

	o2hb_run_event_list(&event);

	o2nm_node_put(node);
}

static int o2hb_check_slot(struct o2hb_region *reg,
			   struct o2hb_disk_slot *slot)
{
	int changed = 0, gen_changed = 0;
	struct o2hb_node_event event =
		{ .hn_item = LIST_HEAD_INIT(event.hn_item), };
	struct o2nm_node *node;
	struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
	u64 cputime;

	memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);

	/* Is this correct? Do we assume that the node doesn't exist
	 * if we're not configured for him? */
	node = o2nm_get_node_by_num(slot->ds_node_num);
	if (!node)
		return 0;

	if (!o2hb_verify_crc(reg, hb_block)) {
		/* all paths from here will drop o2hb_live_lock for
		 * us. */
		spin_lock(&o2hb_live_lock);

		/* Don't print an error on the console in this case -
		 * a freshly formatted heartbeat area will not have a
		 * crc set on it. */
		if (list_empty(&slot->ds_live_item))
			goto out;

		/* The node is live but pushed out a bad crc. We
		 * consider it a transient miss but don't populate any
		 * other values as they may be junk. */
		mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
		     slot->ds_node_num, reg->hr_dev_name);
		o2hb_dump_slot(hb_block);

		slot->ds_equal_samples++;

		goto fire_callbacks;
	}

	/* we don't care if these wrap.. the state transitions below
	 * clear at the right places */
	cputime = le64_to_cpu(hb_block->hb_seq);
	if (slot->ds_last_time != cputime)
		slot->ds_changed_samples++;
	else
		slot->ds_equal_samples++;
	slot->ds_last_time = cputime;

	/* The node changed heartbeat generations. We assume this to
	 * mean it dropped off but came back before we timed out. We
	 * want to consider it down for the time being but don't want
	 * to lose any changed_samples state we might build up to
	 * considering it live again. */
	if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
		gen_changed = 1;
		slot->ds_equal_samples = 0;
		mlog(ML_HEARTBEAT, "Node %d changed generation (0x%"MLFx64" "
		     "to 0x%"MLFx64")\n", slot->ds_node_num,
		     slot->ds_last_generation,
		     le64_to_cpu(hb_block->hb_generation));
	}

	slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);

	mlog(ML_HEARTBEAT, "Slot %d gen 0x%"MLFx64" cksum 0x%x "
	     "seq %"MLFu64" last %"MLFu64" changed %u equal %u\n",
	     slot->ds_node_num, slot->ds_last_generation,
	     le32_to_cpu(hb_block->hb_cksum), le64_to_cpu(hb_block->hb_seq),
	     slot->ds_last_time, slot->ds_changed_samples,
	     slot->ds_equal_samples);

	spin_lock(&o2hb_live_lock);

fire_callbacks:
	/* dead nodes only come to life after some number of
	 * changes at any time during their dead time */
	if (list_empty(&slot->ds_live_item) &&
	    slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
		mlog(ML_HEARTBEAT, "Node %d (id 0x%"MLFx64") joined my "
		     "region\n", slot->ds_node_num, slot->ds_last_generation);

		/* first on the list generates a callback */
		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
			set_bit(slot->ds_node_num, o2hb_live_node_bitmap);

			o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
					      slot->ds_node_num);

			changed = 1;
		}

		list_add_tail(&slot->ds_live_item,
			      &o2hb_live_slots[slot->ds_node_num]);

		slot->ds_equal_samples = 0;
		goto out;
	}

	/* if the list is dead, we're done.. */
	if (list_empty(&slot->ds_live_item))
		goto out;

	/* live nodes only go dead after enough consecutive missed
	 * samples.. reset the missed counter whenever we see
	 * activity */
	if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
		mlog(ML_HEARTBEAT, "Node %d left my region\n",
		     slot->ds_node_num);

		/* last off the live_slot generates a callback */
		list_del_init(&slot->ds_live_item);
		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
			clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);

			o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
					      slot->ds_node_num);
			changed = 1;
		}

		/* We don't clear this because the node is still
		 * actually writing new blocks. */
		if (!gen_changed)
			slot->ds_changed_samples = 0;
		goto out;
	}

	if (slot->ds_changed_samples) {
		slot->ds_changed_samples = 0;
		slot->ds_equal_samples = 0;
	}
out:
	spin_unlock(&o2hb_live_lock);

	o2hb_run_event_list(&event);

	o2nm_node_put(node);
	return changed;
}

/* This could be faster if we just implemented a find_last_bit, but I
 * don't think the circumstances warrant it. */
static int o2hb_highest_node(unsigned long *nodes,
			     int numbits)
{
	int highest, node;

	highest = numbits;
	node = -1;
	while ((node = find_next_bit(nodes, numbits, node + 1)) != -1) {
		if (node >= numbits)
			break;

		highest = node;
	}

	return highest;
}

static void o2hb_do_disk_heartbeat(struct o2hb_region *reg)
{
	int i, ret, highest_node, change = 0;
	unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
	struct bio *write_bio;
	struct o2hb_bio_wait_ctxt write_wc;

	if (o2nm_configured_node_map(configured_nodes, sizeof(configured_nodes)))
		return;

	highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
	if (highest_node >= O2NM_MAX_NODES) {
		mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n");
		return;
	}

	/* No sense in reading the slots of nodes that don't exist
	 * yet. Of course, if the node definitions have holes in them
	 * then we're reading an empty slot anyway... Consider this
	 * best-effort. */
	ret = o2hb_read_slots(reg, highest_node + 1);
	if (ret < 0) {
		mlog_errno(ret);
		return;
	}

	/* With an up to date view of the slots, we can check that no
	 * other node has been improperly configured to heartbeat in
	 * our slot. */
	if (!o2hb_check_last_timestamp(reg))
		mlog(ML_ERROR, "Device \"%s\": another node is heartbeating "
		     "in our slot!\n", reg->hr_dev_name);

	/* fill in the proper info for our next heartbeat */
	o2hb_prepare_block(reg, reg->hr_generation);

	/* And fire off the write. Note that we don't wait on this I/O
	 * until later. */
	ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
	if (ret < 0) {
		mlog_errno(ret);
		return;
	}

	i = -1;
	while((i = find_next_bit(configured_nodes, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {

		change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
	}

	/*
	 * We have to be sure we've advertised ourselves on disk
	 * before we can go to steady state. This ensures that
	 * people we find in our steady state have seen us.
	 */
	o2hb_wait_on_io(reg, &write_wc);
	bio_put(write_bio);
	o2hb_arm_write_timeout(reg);

	/* let the person who launched us know when things are steady */
	if (!change && (atomic_read(&reg->hr_steady_iterations) != 0)) {
		if (atomic_dec_and_test(&reg->hr_steady_iterations))
			wake_up(&o2hb_steady_queue);
	}
}

/* Subtract b from a, storing the result in a. a *must* have a larger
 * value than b. */
static void o2hb_tv_subtract(struct timeval *a,
			     struct timeval *b)
{
	/* just return 0 when a is after b */
	if (a->tv_sec < b->tv_sec ||
	    (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec)) {
		a->tv_sec = 0;
		a->tv_usec = 0;
		return;
	}

	a->tv_sec -= b->tv_sec;
	a->tv_usec -= b->tv_usec;
	while ( a->tv_usec < 0 ) {
		a->tv_sec--;
		a->tv_usec += 1000000;
	}
}

static unsigned int o2hb_elapsed_msecs(struct timeval *start,
				       struct timeval *end)
{
	struct timeval res = *end;

	o2hb_tv_subtract(&res, start);

	return res.tv_sec * 1000 + res.tv_usec / 1000;
}

/*
 * we ride the region ref that the region dir holds. before the region
 * dir is removed and drops its ref it will wait to tear down this
 * thread.
 */
static int o2hb_thread(void *data)
{
	int i, ret;
	struct o2hb_region *reg = data;
	struct bio *write_bio;
	struct o2hb_bio_wait_ctxt write_wc;
	struct timeval before_hb, after_hb;
	unsigned int elapsed_msec;
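The elapsed-time helpers above first clamp a negative difference to zero and then borrow from tv_sec while tv_usec is negative. As a quick way to sanity-check that arithmetic outside the kernel, here is a minimal userspace sketch mirroring o2hb_tv_subtract()/o2hb_elapsed_msecs(); the tv_subtract/elapsed_msecs names and the sample timestamps are illustrative only and are not part of heartbeat.c.

/* Userspace sketch only -- mirrors the timeval arithmetic above. */
#include <stdio.h>
#include <sys/time.h>

/* Same logic as o2hb_tv_subtract(): clamp to zero if a < b, otherwise
 * subtract and borrow from tv_sec while tv_usec is negative. */
static void tv_subtract(struct timeval *a, struct timeval *b)
{
	if (a->tv_sec < b->tv_sec ||
	    (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec)) {
		a->tv_sec = 0;
		a->tv_usec = 0;
		return;
	}

	a->tv_sec -= b->tv_sec;
	a->tv_usec -= b->tv_usec;
	while (a->tv_usec < 0) {
		a->tv_sec--;
		a->tv_usec += 1000000;
	}
}

/* Same rounding as o2hb_elapsed_msecs(): whole milliseconds, truncated. */
static unsigned int elapsed_msecs(struct timeval *start, struct timeval *end)
{
	struct timeval res = *end;

	tv_subtract(&res, start);
	return res.tv_sec * 1000 + res.tv_usec / 1000;
}

int main(void)
{
	/* Illustrative timestamps: the usec field of 'end' is smaller than
	 * that of 'start', so the borrow loop runs once. */
	struct timeval start = { .tv_sec = 10, .tv_usec = 800000 };
	struct timeval end   = { .tv_sec = 13, .tv_usec = 100000 };

	printf("elapsed: %u msec\n", elapsed_msecs(&start, &end)); /* 2300 */
	return 0;
}

Building this with any C compiler and running it should print 2300, i.e. the 2.3-second gap between the two sample timestamps expressed in milliseconds.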