/* heartbeat.c -- OCFS2 cluster disk heartbeat (fragment) */
/* Tail of o2hb_run_event_list() (its header is above this fragment):
 * drain the global o2hb_node_events list, firing the registered up/down
 * callbacks for each queued event.  o2hb_callback_sem is held for write
 * across the whole drain so callback (un)registration cannot race with
 * delivery; o2hb_live_lock is dropped around each callback because the
 * callbacks may sleep. */
	down_write(&o2hb_callback_sem);

	spin_lock(&o2hb_live_lock);
	while (!list_empty(&o2hb_node_events)
	       && !list_empty(&queued_event->hn_item)) {
		event = list_entry(o2hb_node_events.next,
				   struct o2hb_node_event,
				   hn_item);
		list_del_init(&event->hn_item);
		spin_unlock(&o2hb_live_lock);

		mlog(ML_HEARTBEAT, "Node %s event for %d\n",
		     event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
		     event->hn_node_num);

		hbcall = hbcall_from_type(event->hn_event_type);

		/* We should *never* have gotten on to the list with a
		 * bad type... This isn't something that we should try
		 * to recover from. */
		BUG_ON(IS_ERR(hbcall));

		o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);

		/* reacquire before re-testing the list head */
		spin_lock(&o2hb_live_lock);
	}
	spin_unlock(&o2hb_live_lock);

	up_write(&o2hb_callback_sem);
}

/* Fill in @event and append it to the global o2hb_node_events list for
 * later delivery by o2hb_run_event_list().  Caller must hold
 * o2hb_live_lock (asserted below). */
static void o2hb_queue_node_event(struct o2hb_node_event *event,
				  enum o2hb_callback_type type,
				  struct o2nm_node *node,
				  int node_num)
{
	assert_spin_locked(&o2hb_live_lock);

	event->hn_event_type = type;
	event->hn_node = node;
	event->hn_node_num = node_num;

	mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
	     type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);

	list_add_tail(&event->hn_item, &o2hb_node_events);
}

/* Take @slot off the live list at region shutdown.  If this slot was
 * the node's last live slot, clear the node in the live bitmap and
 * deliver a NODE_DOWN event.  Drops/takes o2hb_live_lock internally;
 * the event is fired outside the lock via o2hb_run_event_list(). */
static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
{
	struct o2hb_node_event event =
		{ .hn_item = LIST_HEAD_INIT(event.hn_item), };
	struct o2nm_node *node;

	node = o2nm_get_node_by_num(slot->ds_node_num);
	if (!node)
		return;

	spin_lock(&o2hb_live_lock);
	if (!list_empty(&slot->ds_live_item)) {
		mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
		     slot->ds_node_num);

		list_del_init(&slot->ds_live_item);

		/* last live slot for this node -> node goes down */
		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
			clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);

			o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
					      slot->ds_node_num);
		}
	}
	spin_unlock(&o2hb_live_lock);

	o2hb_run_event_list(&event);

	o2nm_node_put(node);
}

/* Examine one node's just-read heartbeat slot and update its liveness
 * state machine: count changed/equal sequence samples, detect
 * generation changes, and transition the slot onto or off the live
 * list (queueing NODE_UP / NODE_DOWN events as needed).  Returns
 * nonzero if a node up/down event was generated.  Takes and releases
 * o2hb_live_lock; events are fired after the lock is dropped. */
static int o2hb_check_slot(struct o2hb_region *reg,
			   struct o2hb_disk_slot *slot)
{
	int changed = 0, gen_changed = 0;
	struct o2hb_node_event event =
		{ .hn_item = LIST_HEAD_INIT(event.hn_item), };
	struct o2nm_node *node;
	struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
	u64 cputime;
	unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
	unsigned int slot_dead_ms;

	/* snapshot the raw block so the I/O path can't change it under us */
	memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);

	/* Is this correct? Do we assume that the node doesn't exist
	 * if we're not configured for him? */
	node = o2nm_get_node_by_num(slot->ds_node_num);
	if (!node)
		return 0;

	if (!o2hb_verify_crc(reg, hb_block)) {
		/* all paths from here will drop o2hb_live_lock for
		 * us. */
		spin_lock(&o2hb_live_lock);

		/* Don't print an error on the console in this case -
		 * a freshly formatted heartbeat area will not have a
		 * crc set on it. */
		if (list_empty(&slot->ds_live_item))
			goto out;

		/* The node is live but pushed out a bad crc. We
		 * consider it a transient miss but don't populate any
		 * other values as they may be junk. */
		mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
		     slot->ds_node_num, reg->hr_dev_name);
		o2hb_dump_slot(hb_block);

		slot->ds_equal_samples++;
		goto fire_callbacks;
	}

	/* we don't care if these wrap.. the state transitions below
	 * clear at the right places */
	cputime = le64_to_cpu(hb_block->hb_seq);
	if (slot->ds_last_time != cputime)
		slot->ds_changed_samples++;
	else
		slot->ds_equal_samples++;
	slot->ds_last_time = cputime;

	/* The node changed heartbeat generations. We assume this to
	 * mean it dropped off but came back before we timed out. We
	 * want to consider it down for the time being but don't want
	 * to lose any changed_samples state we might build up to
	 * considering it live again. */
	if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
		gen_changed = 1;
		slot->ds_equal_samples = 0;
		mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
		     "to 0x%llx)\n", slot->ds_node_num,
		     (long long)slot->ds_last_generation,
		     (long long)le64_to_cpu(hb_block->hb_generation));
	}

	slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);

	mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
	     "seq %llu last %llu changed %u equal %u\n",
	     slot->ds_node_num, (long long)slot->ds_last_generation,
	     le32_to_cpu(hb_block->hb_cksum),
	     (unsigned long long)le64_to_cpu(hb_block->hb_seq),
	     (unsigned long long)slot->ds_last_time,
	     slot->ds_changed_samples,
	     slot->ds_equal_samples);

	spin_lock(&o2hb_live_lock);

fire_callbacks:
	/* dead nodes only come to life after some number of
	 * changes at any time during their dead time */
	if (list_empty(&slot->ds_live_item) &&
	    slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
		mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
		     slot->ds_node_num, (long long)slot->ds_last_generation);

		/* first on the list generates a callback */
		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
			set_bit(slot->ds_node_num, o2hb_live_node_bitmap);

			o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
					      slot->ds_node_num);

			changed = 1;
		}

		list_add_tail(&slot->ds_live_item,
			      &o2hb_live_slots[slot->ds_node_num]);

		slot->ds_equal_samples = 0;

		/* We want to be sure that all nodes agree on the
		 * number of milliseconds before a node will be
		 * considered dead. The self-fencing timeout is
		 * computed from this value, and a discrepancy might
		 * result in heartbeat calling a node dead when it
		 * hasn't self-fenced yet. */
		slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
		if (slot_dead_ms && slot_dead_ms != dead_ms) {
			/* TODO: Perhaps we can fail the region here. */
			mlog(ML_ERROR, "Node %d on device %s has a dead count "
			     "of %u ms, but our count is %u ms.\n"
			     "Please double check your configuration values "
			     "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
			     slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
			     dead_ms);
		}
		goto out;
	}

	/* if the list is dead, we're done.. */
	if (list_empty(&slot->ds_live_item))
		goto out;

	/* live nodes only go dead after enough consecutive missed
	 * samples..  reset the missed counter whenever we see
	 * activity */
	if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
		mlog(ML_HEARTBEAT, "Node %d left my region\n",
		     slot->ds_node_num);

		/* last off the live_slot generates a callback */
		list_del_init(&slot->ds_live_item);
		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
			clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);

			o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
					      slot->ds_node_num);
			changed = 1;
		}

		/* We don't clear this because the node is still
		 * actually writing new blocks. */
		if (!gen_changed)
			slot->ds_changed_samples = 0;
		goto out;
	}

	if (slot->ds_changed_samples) {
		slot->ds_changed_samples = 0;
		slot->ds_equal_samples = 0;
	}
out:
	spin_unlock(&o2hb_live_lock);

	o2hb_run_event_list(&event);

	o2nm_node_put(node);
	return changed;
}

/* This could be faster if we just implemented a find_last_bit, but I
 * don't think the circumstances warrant it.
*/
/* Return the highest node number set in @nodes, or @numbits if the
 * bitmap is empty.
 *
 * NOTE(review): find_next_bit() returns >= numbits (never -1) when no
 * further bit is set, so the "!= -1" loop condition is effectively
 * always true and the "node >= numbits" break is what terminates the
 * scan. */
static int o2hb_highest_node(unsigned long *nodes,
			     int numbits)
{
	int highest, node;

	highest = numbits;
	node = -1;
	while ((node = find_next_bit(nodes, numbits, node + 1)) != -1) {
		if (node >= numbits)
			break;

		highest = node;
	}

	return highest;
}

/* One heartbeat iteration for @reg: read all configured nodes' slots,
 * write our own block, check every slot for liveness transitions, then
 * wait for our write to hit the disk and re-arm the write timeout.
 * Returns 0 on success or a negative errno.  When no liveness change
 * was seen, counts down hr_steady_iterations and wakes anyone waiting
 * for the region to reach steady state. */
static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
{
	int i, ret, highest_node, change = 0;
	unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
	struct o2hb_bio_wait_ctxt write_wc;

	ret = o2nm_configured_node_map(configured_nodes,
				       sizeof(configured_nodes));
	if (ret) {
		mlog_errno(ret);
		return ret;
	}

	highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
	if (highest_node >= O2NM_MAX_NODES) {
		mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n");
		return -EINVAL;
	}

	/* No sense in reading the slots of nodes that don't exist
	 * yet. Of course, if the node definitions have holes in them
	 * then we're reading an empty slot anyway... Consider this
	 * best-effort. */
	ret = o2hb_read_slots(reg, highest_node + 1);
	if (ret < 0) {
		mlog_errno(ret);
		return ret;
	}

	/* With an up to date view of the slots, we can check that no
	 * other node has been improperly configured to heartbeat in
	 * our slot. */
	if (!o2hb_check_last_timestamp(reg))
		mlog(ML_ERROR, "Device \"%s\": another node is heartbeating "
		     "in our slot!\n", reg->hr_dev_name);

	/* fill in the proper info for our next heartbeat */
	o2hb_prepare_block(reg, reg->hr_generation);

	/* And fire off the write. Note that we don't wait on this I/O
	 * until later. */
	ret = o2hb_issue_node_write(reg, &write_wc);
	if (ret < 0) {
		mlog_errno(ret);
		return ret;
	}

	i = -1;
	while((i = find_next_bit(configured_nodes, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {

		change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
	}

	/*
	 * We have to be sure we've advertised ourselves on disk
	 * before we can go to steady state.  This ensures that
	 * people we find in our steady state have seen us.
	 */
	o2hb_wait_on_io(reg, &write_wc);
	if (write_wc.wc_error) {
		/* Do not re-arm the write timeout on I/O error - we
		 * can't be sure that the new block ever made it to
		 * disk */
		mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
		     write_wc.wc_error, reg->hr_dev_name);
		return write_wc.wc_error;
	}

	o2hb_arm_write_timeout(reg);

	/* let the person who launched us know when things are steady */
	if (!change && (atomic_read(&reg->hr_steady_iterations) != 0)) {
		if (atomic_dec_and_test(&reg->hr_steady_iterations))
			wake_up(&o2hb_steady_queue);
	}

	return 0;
}

/* Subtract b from a, storing the result in a.  If b is later than a,
 * the result is clamped to zero rather than going negative. */
static void o2hb_tv_subtract(struct timeval *a,
			     struct timeval *b)
{
	/* clamp to 0 when b is after a */
	if (a->tv_sec < b->tv_sec ||
	    (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec)) {
		a->tv_sec = 0;
		a->tv_usec = 0;
		return;
	}

	a->tv_sec -= b->tv_sec;
	a->tv_usec -= b->tv_usec;
	while ( a->tv_usec < 0 ) {
		a->tv_sec--;
		a->tv_usec += 1000000;
	}
}

/* Milliseconds elapsed between @start and @end (0 if end precedes
 * start, via o2hb_tv_subtract's clamping). */
static unsigned int o2hb_elapsed_msecs(struct timeval *start,
				       struct timeval *end)
{
	struct timeval res = *end;

	o2hb_tv_subtract(&res, start);

	return res.tv_sec * 1000 + res.tv_usec / 1000;
}

/*
 * we ride the region ref that the region dir holds.  before the region
 * dir is removed and drops it ref it will wait to tear down this
 * thread.
 */
static int o2hb_thread(void *data)
{
	int i, ret;
	struct o2hb_region *reg = data;
	struct o2hb_bio_wait_ctxt write_wc;
	struct timeval before_hb, after_hb;
	unsigned int elapsed_msec;

	mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");

	set_user_nice(current, -20);

	while (!kthread_should_stop() && !reg->hr_unclean_stop) {
		/* We track the time spent inside
		 * o2hb_do_disk_heartbeat so that we avoid more then
		 * hr_timeout_ms between disk writes. On busy systems
		 * this should result in a heartbeat which is less
		 * likely to time itself out. */
		do_gettimeofday(&before_hb);

		/* retry a failed heartbeat once before giving up the
		 * iteration */
		i = 0;
		do {
			ret = o2hb_do_disk_heartbeat(reg);
		} while (ret && ++i < 2);

		do_gettimeofday(&after_hb);
		elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);

		mlog(0, "start = %lu.%lu, end = %lu.%lu, msec = %u\n",
		     before_hb.tv_sec, (unsigned long) before_hb.tv_usec,
		     after_hb.tv_sec, (unsigned long) after_hb.tv_usec,
		     elapsed_msec);

		if (elapsed_msec < reg->hr_timeout_ms) {
			/* the kthread api has blocked signals for us so no
			 * need to record the return value. */
			msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
		}
	}

	o2hb_disarm_write_timeout(reg);

	/* unclean stop is only used in very bad situation */
	for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
		o2hb_shutdown_slot(&reg->hr_slots[i]);

	/* Explicit down notification - avoid forcing the other nodes
	 * to timeout on this region when we could just as easily
	 * write a clear generation - thus indicating to them that
	 * this node has left this region.
	 *
	 * XXX: Should we skip this on unclean_stop? */
	o2hb_prepare_block(reg, 0);
	ret = o2hb_issue_node_write(reg, &write_wc);
	if (ret == 0) {
		o2hb_wait_on_io(reg, &write_wc);
	} else {
		mlog_errno(ret);
	}

	mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n");

	return 0;
}

/* One-time module init: set up the callback lists, per-node live-slot
 * lists, the pending-event list, and clear the live-node bitmap. */
void o2hb_init(void)
{
	int i;

	for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
		INIT_LIST_HEAD(&o2hb_callbacks[i].list);

	for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
		INIT_LIST_HEAD(&o2hb_live_slots[i]);

	INIT_LIST_HEAD(&o2hb_node_events);

	memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
}

/* if we're already in a callback then we're already serialized by the sem */
static void o2hb_fill_node_map_from_callback(unsigned long *map,
					     unsigned bytes)
{
	BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) *
			sizeof(unsigned long)));

	memcpy(map, &o2hb_live_node_bitmap, bytes);
}

/*
 * get a map of all nodes that are heartbeating in any regions
 *
 * (fragment: the body of this function is truncated in this view)
 */
void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
{
	/* callers want to serialize this map and callbacks so that they
	 * can trust that they don't miss nodes coming to the party */