📄 heartbeat.c
字号:
o2hb_compute_request_limits(reg, max_slots, &num_bios, &slots_per_bio); o2hb_mlog_blocking(reg, &start, "allocating bios for read"); bios = kcalloc(num_bios, sizeof(struct bio *), GFP_KERNEL); o2hb_mlog_blocking_done(reg, &start); if (!bios) { status = -ENOMEM; mlog_errno(status); return status; } o2hb_bio_wait_init(&wc, num_bios); num_slots = slots_per_bio; for(i = 0; i < num_bios; i++) { start_slot = i * slots_per_bio; /* adjust num_slots at last bio */ if (max_slots < (start_slot + num_slots)) num_slots = max_slots - start_slot; bio = o2hb_setup_one_bio(reg, &wc, start_slot, num_slots, 0); if (IS_ERR(bio)) { o2hb_bio_wait_dec(&wc, num_bios - i); status = PTR_ERR(bio); mlog_errno(status); goto bail_and_wait; } bios[i] = bio; o2hb_mlog_blocking(reg, &start, "submit_bio for read"); submit_bio(READ, bio); o2hb_mlog_blocking_done(reg, &start); } status = 0;bail_and_wait: o2hb_mlog_blocking(reg, &start, "waiting for read completion"); o2hb_wait_on_io(reg, &wc); if (wc.wc_error && !status) status = wc.wc_error; o2hb_mlog_blocking_done(reg, &start); if (bios) { for(i = 0; i < num_bios; i++) if (bios[i]) bio_put(bios[i]); kfree(bios); } return status;}static int o2hb_issue_node_write(struct o2hb_region *reg, struct bio **write_bio, struct o2hb_bio_wait_ctxt *write_wc){ int status; unsigned int slot; struct bio *bio; struct timeval start; o2hb_bio_wait_init(write_wc, 1); slot = o2nm_this_node(); bio = o2hb_setup_one_bio(reg, write_wc, slot, 1, 1); if (IS_ERR(bio)) { status = PTR_ERR(bio); mlog_errno(status); goto bail; } o2hb_mlog_blocking(reg, &start, "submit_bio for write"); submit_bio(WRITE, bio); o2hb_mlog_blocking_done(reg, &start); *write_bio = bio; status = 0;bail: return status;}static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg, struct o2hb_disk_heartbeat_block *hb_block){ __le32 old_cksum; u32 ret; /* We want to compute the block crc with a 0 value in the * hb_cksum field. Save it off here and replace after the * crc. */ old_cksum = hb_block->hb_cksum; hb_block->hb_cksum = 0; ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes); hb_block->hb_cksum = old_cksum; return ret;}static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block){ mlog(ML_ERROR, "Dump slot information: seq = 0x%"MLFx64", node = %u, " "cksum = 0x%x, generation 0x%"MLFx64"\n", le64_to_cpu(hb_block->hb_seq), hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum), le64_to_cpu(hb_block->hb_generation));}static int o2hb_verify_crc(struct o2hb_region *reg, struct o2hb_disk_heartbeat_block *hb_block){ u32 read, computed; read = le32_to_cpu(hb_block->hb_cksum); computed = o2hb_compute_block_crc_le(reg, hb_block); return read == computed;}/* We want to make sure that nobody is heartbeating on top of us -- * this will help detect an invalid configuration. */static int o2hb_check_last_timestamp(struct o2hb_region *reg){ int node_num, ret; struct o2hb_disk_slot *slot; struct o2hb_disk_heartbeat_block *hb_block; node_num = o2nm_this_node(); ret = 1; slot = ®->hr_slots[node_num]; /* Don't check on our 1st timestamp */ if (slot->ds_last_time) { hb_block = slot->ds_raw_block; if (le64_to_cpu(hb_block->hb_seq) != slot->ds_last_time) ret = 0; } return ret;}static inline void o2hb_prepare_block(struct o2hb_region *reg, u64 generation){ int node_num; u64 cputime; struct o2hb_disk_slot *slot; struct o2hb_disk_heartbeat_block *hb_block; node_num = o2nm_this_node(); slot = ®->hr_slots[node_num]; hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block; memset(hb_block, 0, reg->hr_block_bytes); /* TODO: time stuff */ cputime = CURRENT_TIME.tv_sec; if (!cputime) cputime = 1; hb_block->hb_seq = cpu_to_le64(cputime); hb_block->hb_node = node_num; hb_block->hb_generation = cpu_to_le64(generation); hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS); /* This step must always happen last! */ hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg, hb_block)); mlog(ML_HB_BIO, "our node generation = 0x%"MLFx64", cksum = 0x%x\n", cpu_to_le64(generation), le32_to_cpu(hb_block->hb_cksum));}static void o2hb_fire_callbacks(struct o2hb_callback *hbcall, struct o2nm_node *node, int idx){ struct list_head *iter; struct o2hb_callback_func *f; list_for_each(iter, &hbcall->list) { f = list_entry(iter, struct o2hb_callback_func, hc_item); mlog(ML_HEARTBEAT, "calling funcs %p\n", f); (f->hc_func)(node, idx, f->hc_data); }}/* Will run the list in order until we process the passed event */static void o2hb_run_event_list(struct o2hb_node_event *queued_event){ int empty; struct o2hb_callback *hbcall; struct o2hb_node_event *event; spin_lock(&o2hb_live_lock); empty = list_empty(&queued_event->hn_item); spin_unlock(&o2hb_live_lock); if (empty) return; /* Holding callback sem assures we don't alter the callback * lists when doing this, and serializes ourselves with other * processes wanting callbacks. */ down_write(&o2hb_callback_sem); spin_lock(&o2hb_live_lock); while (!list_empty(&o2hb_node_events) && !list_empty(&queued_event->hn_item)) { event = list_entry(o2hb_node_events.next, struct o2hb_node_event, hn_item); list_del_init(&event->hn_item); spin_unlock(&o2hb_live_lock); mlog(ML_HEARTBEAT, "Node %s event for %d\n", event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN", event->hn_node_num); hbcall = hbcall_from_type(event->hn_event_type); /* We should *never* have gotten on to the list with a * bad type... This isn't something that we should try * to recover from. */ BUG_ON(IS_ERR(hbcall)); o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num); spin_lock(&o2hb_live_lock); } spin_unlock(&o2hb_live_lock); up_write(&o2hb_callback_sem);}static void o2hb_queue_node_event(struct o2hb_node_event *event, enum o2hb_callback_type type, struct o2nm_node *node, int node_num){ assert_spin_locked(&o2hb_live_lock); event->hn_event_type = type; event->hn_node = node; event->hn_node_num = node_num; mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n", type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num); list_add_tail(&event->hn_item, &o2hb_node_events);}static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot){ struct o2hb_node_event event = { .hn_item = LIST_HEAD_INIT(event.hn_item), }; struct o2nm_node *node; node = o2nm_get_node_by_num(slot->ds_node_num); if (!node) return; spin_lock(&o2hb_live_lock); if (!list_empty(&slot->ds_live_item)) { mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n", slot->ds_node_num); list_del_init(&slot->ds_live_item); if (list_empty(&o2hb_live_slots[slot->ds_node_num])) { clear_bit(slot->ds_node_num, o2hb_live_node_bitmap); o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node, slot->ds_node_num); } } spin_unlock(&o2hb_live_lock); o2hb_run_event_list(&event); o2nm_node_put(node);}static int o2hb_check_slot(struct o2hb_region *reg, struct o2hb_disk_slot *slot){ int changed = 0, gen_changed = 0; struct o2hb_node_event event = { .hn_item = LIST_HEAD_INIT(event.hn_item), }; struct o2nm_node *node; struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block; u64 cputime; unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS; unsigned int slot_dead_ms; memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes); /* Is this correct? Do we assume that the node doesn't exist * if we're not configured for him? */ node = o2nm_get_node_by_num(slot->ds_node_num); if (!node) return 0; if (!o2hb_verify_crc(reg, hb_block)) { /* all paths from here will drop o2hb_live_lock for * us. */ spin_lock(&o2hb_live_lock); /* Don't print an error on the console in this case - * a freshly formatted heartbeat area will not have a * crc set on it. */ if (list_empty(&slot->ds_live_item)) goto out; /* The node is live but pushed out a bad crc. We * consider it a transient miss but don't populate any * other values as they may be junk. */ mlog(ML_ERROR, "Node %d has written a bad crc to %s\n", slot->ds_node_num, reg->hr_dev_name); o2hb_dump_slot(hb_block); slot->ds_equal_samples++; goto fire_callbacks; } /* we don't care if these wrap.. the state transitions below * clear at the right places */ cputime = le64_to_cpu(hb_block->hb_seq); if (slot->ds_last_time != cputime) slot->ds_changed_samples++; else slot->ds_equal_samples++; slot->ds_last_time = cputime; /* The node changed heartbeat generations. We assume this to * mean it dropped off but came back before we timed out. We * want to consider it down for the time being but don't want * to lose any changed_samples state we might build up to * considering it live again. */ if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) { gen_changed = 1; slot->ds_equal_samples = 0; mlog(ML_HEARTBEAT, "Node %d changed generation (0x%"MLFx64" " "to 0x%"MLFx64")\n", slot->ds_node_num, slot->ds_last_generation, le64_to_cpu(hb_block->hb_generation)); } slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation); mlog(ML_HEARTBEAT, "Slot %d gen 0x%"MLFx64" cksum 0x%x " "seq %"MLFu64" last %"MLFu64" changed %u equal %u\n", slot->ds_node_num, slot->ds_last_generation, le32_to_cpu(hb_block->hb_cksum), le64_to_cpu(hb_block->hb_seq), slot->ds_last_time, slot->ds_changed_samples, slot->ds_equal_samples); spin_lock(&o2hb_live_lock);fire_callbacks: /* dead nodes only come to life after some number of * changes at any time during their dead time */ if (list_empty(&slot->ds_live_item) && slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) { mlog(ML_HEARTBEAT, "Node %d (id 0x%"MLFx64") joined my " "region\n", slot->ds_node_num, slot->ds_last_generation); /* first on the list generates a callback */ if (list_empty(&o2hb_live_slots[slot->ds_node_num])) { set_bit(slot->ds_node_num, o2hb_live_node_bitmap); o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node, slot->ds_node_num); changed = 1; } list_add_tail(&slot->ds_live_item, &o2hb_live_slots[slot->ds_node_num]); slot->ds_equal_samples = 0; /* We want to be sure that all nodes agree on the * number of milliseconds before a node will be * considered dead. The self-fencing timeout is * computed from this value, and a discrepancy might * result in heartbeat calling a node dead when it * hasn't self-fenced yet. */ slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms); if (slot_dead_ms && slot_dead_ms != dead_ms) { /* TODO: Perhaps we can fail the region here. */ mlog(ML_ERROR, "Node %d on device %s has a dead count " "of %u ms, but our count is %u ms.\n" "Please double check your configuration values " "for 'O2CB_HEARTBEAT_THRESHOLD'\n", slot->ds_node_num, reg->hr_dev_name, slot_dead_ms, dead_ms); } goto out; } /* if the list is dead, we're done.. */ if (list_empty(&slot->ds_live_item)) goto out; /* live nodes only go dead after enough consequtive missed * samples.. reset the missed counter whenever we see * activity */ if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) { mlog(ML_HEARTBEAT, "Node %d left my region\n", slot->ds_node_num); /* last off the live_slot generates a callback */ list_del_init(&slot->ds_live_item); if (list_empty(&o2hb_live_slots[slot->ds_node_num])) { clear_bit(slot->ds_node_num, o2hb_live_node_bitmap); o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node, slot->ds_node_num); changed = 1; } /* We don't clear this because the node is still * actually writing new blocks. */ if (!gen_changed) slot->ds_changed_samples = 0; goto out; } if (slot->ds_changed_samples) { slot->ds_changed_samples = 0; slot->ds_equal_samples = 0; }out: spin_unlock(&o2hb_live_lock); o2hb_run_event_list(&event); o2nm_node_put(node); return changed;}/* This could be faster if we just implmented a find_last_bit, but I * don't think the circumstances warrant it. */static int o2hb_highest_node(unsigned long *nodes, int numbits){ int highest, node; highest = numbits; node = -1; while ((node = find_next_bit(nodes, numbits, node + 1)) != -1) { if (node >= numbits) break; highest = node; } return highest;}static int o2hb_do_disk_heartbeat(struct o2hb_region *reg){ int i, ret, highest_node, change = 0; unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)]; struct bio *write_bio; struct o2hb_bio_wait_ctxt write_wc; struct timeval start; ret = o2nm_configured_node_map(configured_nodes, sizeof(configured_nodes)); if (ret) { mlog_errno(ret); return ret; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -