📄 onehop.c
字号:
/*
 * Copyright (c) 2003-2005 [Anjali Gupta]
 *                    Massachusetts Institute of Technology
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

/* jy: (main additions)
 * all doRPC and xRPC have been changed to fd_xRPC that does failure detection
 * (for a lossy/congested network)
 * for every lookup, check its correctness against global state
 * to fix(?)
 *   currently failure_detection is done with the failure of one packet
 */

#include "onehop.h"
#include "consistenthash.h"
#include "observers/onehopobserver.h"
#include <iostream>

// Debug helpers. Both expand to a bare `if (...) printf(...);`, so only use
// them as a complete statement (never directly in front of an `else`).
#define TESTPRINT(me,he,tag) if (me.ip==1159 && he.ip==2047) printf("WUWU now %llu tag %s\n", now(),tag);
#define DEBUG_MSG(n,msg,s) if (p2psim_verbose && n.ip == OneHop::debug_node) printf("%llu %u,%qx %s knows debug from sender %u,%qx\n",now(), me.ip, me.id, msg, s.ip, s.id);
#define MAX_IDS_MSG 295
// Upper bound on how long a single user lookup may keep retrying
// (compared against differences of now()).
#define MAX_LOOKUP_TIME 4000
//XXX:Strange bug - delaycb gives join a 0ed argument

// Definitions of the class-wide (static) statistics counters.
long OneHop::lookups = 0;
long OneHop::failed = 0;
long OneHop::two_failed = 0;
long OneHop::old_lookups = 0;
long OneHop::old_failed = 0;
long OneHop::old_two_failed = 0;
long OneHop::num_nodes = 0;
long OneHop::joins = 0;
long OneHop::crashes = 0;
long OneHop::same_lookups = 0;
long OneHop::same_failed = 0;
Time OneHop::act_interslice = 0;
Time OneHop::tot_interslice = 0;
long OneHop::nonempty_outers = 0;
long OneHop::nonempty_leaders = 0;
Time OneHop::act_intraslice = 0;
Time OneHop::tot_intraslice = 0;
Time OneHop::total_empty = 0;
Time OneHop::total_count = 0;
Time OneHop::exp_intraslice = 0;
bw OneHop::bandwidth = 0;
bw OneHop::leader_bandwidth = 0;
bw OneHop::messages = 0;
bw OneHop::leader_messages = 0;
bw OneHop::lookup_bandwidth = 0;
bw OneHop::old_bandwidth = 0;
bw OneHop::old_lookup_bandwidth = 0;
bw OneHop::old_leader_bandwidth = 0;
unsigned OneHop::start = 0;
Time OneHop::old_time = 0;
int OneHop::_publish_time = -1; //jy: control how frequently one publishes statistics
unsigned OneHop::num_violations = 0; //jy: anjali sends too many ids in one message, calculate how many violations there are
vector<double> OneHop::sliceleader_bw_avg;
unsigned OneHop::debug_node = 1;

/*
 * Construct a node for address `i`.
 *
 * Reads the "slices", "units" and "stab" arguments from `a`, creates the
 * location table, inserts this node into it and resets all per-node
 * protocol state. The node has not joined yet (_join_complete is false).
 */
OneHop::OneHop(IPAddress i , Args& a) : P2Protocol(i)
{
  _k = a.nget<uint>("slices",5,10);
  //number of units -- must be an odd number
  _u = a.nget<uint>("units",5,10);
  loctable = New OneHopLocTable(_k, _u);
  me.id = id();
  me.ip = ip();
  loctable->add_node(me);
  retries = 0;
  _stab_timer = a.nget<uint>("stab",1000,10);
  leader_log.clear();
  outer_log.clear();
  _join_complete = false;
  _retry_timer = 1000;
  prev_slice_leader = false;
  _to_multiplier = 3; //jy doRPC suffer from timeout if the dst is dead, the min of this value = 3
  // slice_size = 0xffffffffffffffff / _k;
  // The largest CHID value divided evenly among the _k slices.
  slice_size = ((ConsistentHash::CHID)-1)/ _k;
  //lookups = 0;
  //failed = 0;
  _leaderstab_running = false;
  low_to_high.clear();
  high_to_low.clear();
  sent_low = false;
  sent_high = false;
  // Only the node with ip 1 resets the shared slice-leader bandwidth series.
  if (me.ip == 1)
    OneHop::sliceleader_bw_avg.clear();
  last_stabilize = 0;
  _wkn.ip = 0;
}

/*
 * Account for one message of kind `type` from `src` to `dst` carrying
 * `num_ids` ids plus `num_else` other units.
 *
 * A message with more than 300 ids is counted as num_ids/300 violations
 * and is charged an extra 40 * (num_ids/300) on top of num_else.
 */
void
OneHop::record_stat(IPAddress src, IPAddress dst, uint type, uint num_ids, uint num_else)
{
  assert(type <= 5);
  // printf("now %llu warning: %u sending too much %u\n", now(), me.ip, num_ids);
  //assert(num_ids < 300);
  if (num_ids > 300) {
    OneHop::num_violations += (num_ids/300);
    Node::record_bw_stat(type,num_ids,40 * (num_ids/300) + num_else);
    Node::record_inout_bw_stat(src,dst,num_ids,40 * (num_ids/300) + num_else);
  }else{
    Node::record_bw_stat(type,num_ids,num_else);
    Node::record_inout_bw_stat(src,dst,num_ids,num_else);
  }
}

//jy: check correctness of lookups
/*
 * Compare a lookup answer against the observer's global node list.
 *
 * Starting at the position upper_bound() yields for key `k` in the sorted
 * list, walk forward (wrapping around) to the first node the Network
 * reports alive, and return true iff that node has the same ip as `n`.
 *
 * Returns false when the list is empty or contains no live node; the scan
 * is limited to one full pass so it cannot spin forever in that situation.
 */
bool
OneHop::check_correctness(CHID k, IDMap n)
{
  vector<IDMap> ids = OneHopObserver::Instance(NULL)->get_sorted_nodes();
  IDMap tmp;
  tmp.id = k;
  uint idsz = ids.size();
  uint pos = upper_bound(ids.begin(), ids.end(), tmp, OneHop::IDMap::cmp) - ids.begin();
  // Leftover debugging hook for one specific simulation run.
  if (now() == 4227297 && n.ip == 504) fprintf(stderr,"shit!\n");

  // Visit each entry at most once, wrapping past the end of the list.
  bool found_alive = false;
  for (uint scanned = 0; scanned < idsz; scanned++) {
    if (pos >= idsz)
      pos = 0;
    if (Network::Instance()->alive(ids[pos].ip)) {
      found_alive = true;
      break;
    }
    pos++;
  }
  if (!found_alive)
    return false;

  DEBUG(4) << now() << ":" << ip() << "," << printID(id()) << ":key " << printID(k) << " correct? " << (ids[pos].ip==n.ip?1:0) << " reply " << n.ip << "," << printID(n.id) << " real " << ids[pos].ip << "," << printID(ids[pos].id) << endl;
  return ids[pos].ip == n.ip;
}

/*
 * Entry point for a user lookup of args["key"].
 *
 * Allocates the per-lookup bookkeeping record and hands it to
 * lookup_internal(), which owns (and eventually deletes) it.
 * Does nothing if this node is not alive.
 */
void
OneHop::lookup(Args *args)
{
  if (!alive())
    return;
  lookup_internal_args *la = New lookup_internal_args;
  la->k = args->nget<CHID>("key");
  la->start_time = now();
  la->hops = 0;
  la->timeouts = 0;
  la->timeout_lat = 0;
  la->attempts = 0;
  lookup_internal(la);
}

/*
 * Drive one lookup until an owner is found, the node dies, or
 * MAX_LOOKUP_TIME has elapsed since la->start_time.
 *
 * Takes ownership of `la`: it is deleted on every path except when the
 * join has not completed yet, in which case the same record is re-queued
 * with delaycb() and this function runs again later.
 *
 * Each round asks the node the local table believes is succ(key). A
 * non-owner reply adds the suggested owner to the table; a failed RPC
 * removes the target, schedules test_dead_inform() for it and adds it to
 * the dead_nodes list sent with the following requests.
 */
void
OneHop::lookup_internal(lookup_internal_args *la)
{
  la->attempts++;
  if (!alive() || now()-la->start_time > MAX_LOOKUP_TIME) {
    record_lookup_stat(me.ip, me.ip, now()-la->start_time, false, false, la->hops,la->timeouts,la->timeout_lat);
    delete la;
    return;
  }

  if (!_join_complete) {
    DEBUG(1) << now() << ":" << me.ip << ","<< printID(me.id) << ": failed at time "<<now()<< "coz join is incomplete last_join time " << last_join << endl;
    // Try again later; `la` stays alive for the retry.
    delaycb(1000, &OneHop::lookup_internal, la);
    /*
    record_lookup_stat(me.ip, me.ip, now()-la->start_time, false, false, la->hops,la->timeouts,la->timeout_lat);
    delete la;
    */
    return;
  }

  //DEBUG(1) << ip() << ":(slice " << slice(id()) << "): Lookup for " << k << "(slice " << slice(k) << ")" << endl;
  lookup_args a;
  lookup_ret r;
  a.key = la->k;
  a.sender = me;
  a.dead_nodes.clear();

  IDMap succ_node = me;
  // Set only when some node answered that it owns the key. Without it, a
  // loop that ran out of time at exactly MAX_LOOKUP_TIME was reported as a
  // completed lookup against whatever node happened to be tried last.
  bool found = false;
  while ((now()-la->start_time) < MAX_LOOKUP_TIME) {
    succ_node = loctable->succ(la->k);
    assert(succ_node.ip);
    record_stat(me.ip,succ_node.ip,TYPE_USER_LOOKUP,2);
    bool ok = doRPC(succ_node.ip,&OneHop::lookup_handler,&a,&r, TIMEOUT(me.ip,succ_node.ip));
    if (ok)
      record_stat(succ_node.ip,me.ip,TYPE_USER_LOOKUP,r.is_owner?0:1,1);
    if (me.ip!=succ_node.ip)
      la->hops++;
    if (!alive())
      break;

    // `r` is only meaningful when the RPC succeeded, so never read it
    // for the trace otherwise.
    const bool is_owner = ok && r.is_owner;
    DEBUG(1) << now() << ":" << me.ip << "," << printID(me.id) << " lookup done from " << succ_node.ip << "/" << printID(succ_node.id) << " for key " << printID(a.key) << ", ok: " << ok << ", is_owner: " << is_owner;
    if (ok && !r.is_owner) {
      DEBUG(1) << ", correct_owner=" << printID(r.correct_owner.id);
    }
    DEBUG(1) << endl;

    if (ok) {
      if (r.is_owner) {
        found = true;
        break;
      }else{
        DEBUG_MSG(r.correct_owner, "lookup", succ_node);
        loctable->add_node(r.correct_owner);
      }
    }else{
      la->timeouts++;
      la->timeout_lat += TIMEOUT(me.ip, succ_node.ip);
      loctable->del_node(succ_node.id);
      test_inform_dead_args *aa = New test_inform_dead_args;
      aa->suspect= succ_node;
      aa->informed = me;
      aa->justdelete = true;
      delaycb(0,&OneHop::test_dead_inform,aa);
      //return;
      // please don't tell us about this guy anymore, he sucks
      a.dead_nodes.push_back(succ_node.id);
    }
  }

  if (alive()) {
    if (!found || (now()-la->start_time) > MAX_LOOKUP_TIME) {
      // Ran out of time (or the answer arrived too late): a failed lookup.
      record_lookup_stat(me.ip, me.ip, now()-la->start_time, false, false, la->hops,la->timeouts,la->timeout_lat);
    } else if (check_correctness(la->k,succ_node)) {
      record_lookup_stat(me.ip, succ_node.ip, now()-la->start_time, true, true, la->hops,la->timeouts,la->timeout_lat);
    }else{
      record_lookup_stat(me.ip, succ_node.ip, now()-la->start_time, true, false, la->hops,la->timeouts,la->timeout_lat);
      //delaycb(100,&OneHop::lookup_internal, la);
    }
  }
  // Nothing is recorded for a node that died mid-lookup.
  delete la;
}

/* anjali's old lookup procedure
   it only tries two hops and declare failure
   why not try more hops??
voidOneHop::lookup(Args *args) { if (!alive()) return; if (!_join_complete) { //jy: this is a failure record_lookup_stat(me.ip, me.ip, 0, false, false, 1, 0, 0); return; } Time before0 = now(); //jy: record the start of lookup lookups++; CHID k = args->nget<CHID>("key"); bool same = (slice(k) == slice(me.id)); if (same) same_lookups++; //DEBUG(1) << ip() << ":(slice " << slice(id()) << "): Lookup for " << k << "(slice " << slice(k) << ")" << endl; assert(k); lookup_args *a = New lookup_args; lookup_ret r; a->key = k; a->sender = me; IDMap succ_node = loctable->succ(k); assert(succ_node.ip); record_stat(TYPE_USER_LOOKUP,2); bool ok = doRPC(succ_node.ip,&OneHop::lookup_handler,a,&r); if (ok) record_stat(TYPE_USER_LOOKUP,1); Time before1 = now(); //jy: record the end of first lookup hop if (!alive()) goto LOOKUP_DONE; lookup_bandwidth += 24; if (!ok) { failed++; two_failed++; if (same) same_failed++; DEBUG(5) << ip() << ":Lookup failed due to dead node "<< succ_node.ip << endl; IDMap *n = New IDMap(); *n = succ_node; assert(n->ip>0); delaycb(0, &OneHop::test_dead, n); IDMap succ_node = loctable->succ(succ_node.id+1); record_stat(TYPE_USER_LOOKUP,2); bool ok = doRPC(succ_node.ip, &OneHop::lookup_handler, a, &r); if (!alive()) goto LOOKUP_DONE; if (ok) record_stat(TYPE_USER_LOOKUP,1); lookup_bandwidth += 24; if (!ok) { IDMap *nn = New IDMap; *nn = succ_node; assert(nn->ip>0); delaycb(0,&OneHop::test_dead,nn); } else if (r.is_owner) { if (check_correctness(k,succ_node)) record_lookup_stat(me.ip, succ_node.ip, now()-before0, true, true, 2, 1, now()-before1); //jy: record lookup stat else record_lookup_stat(me.ip, succ_node.ip, now()-before0, false, false, 2, 1, now()-before1); //jy: record lookup stat two_failed--; } } else { if (!r.is_owner) { failed++; two_failed++; lookup_bandwidth += 20; if (same) same_failed++; DEBUG(5) << ip() << ":Lookup failed:"<< succ_node.ip << "(" << succ_node.id << ") thinks " << r.correct_owner.ip << "(" << r.correct_owner.id << ")" << endl; 
IDMap corr_owner = r.correct_owner; record_stat(TYPE_USER_LOOKUP,1); bool ok = doRPC(corr_owner.ip, &OneHop::lookup_handler, a, &r); if (ok) record_stat(TYPE_USER_LOOKUP, 1); if (!alive()) goto LOOKUP_DONE; lookup_bandwidth += 48; if (ok && r.is_owner) { two_failed--; loctable->add_node(corr_owner); LogEntry *e = New LogEntry(corr_owner, ALIVE, now()); leader_log.push_back(*e); delete e; if (check_correctness(k,corr_owner)) record_lookup_stat(me.ip, corr_owner.ip, now()-before0, true, true, 2, 0, 0); //jy: record lookup stat else record_lookup_stat(me.ip, corr_owner.ip, now()-before0, false, false, 2, 0, 0); //jy: record lookup stat }else if (!ok) { record_lookup_stat(me.ip, corr_owner.ip, now()-before0, false, false, 2, 1, now()-before1); //jy: record lookup stat }else { //ok && !r.is_owner record_lookup_stat(me.ip, corr_owner.ip, now()-before0, false, false, 2, 0, 0); //jy: record lookup stat } }else{ if (check_correctness(k,succ_node)) record_lookup_stat(me.ip, succ_node.ip, now()-before0, true, true, 1, 0, 0); //jy: record lookup stat else record_lookup_stat(me.ip, succ_node.ip, now()-before0, false, false, 1, 0, 0); //jy: record lookup stat } }LOOKUP_DONE: delete a;}*/void OneHop::lookup_handler(lookup_args *a, lookup_ret *r) { for( int i = 0; i < a->dead_nodes.size(); i++ ) { loctable->del_node(a->dead_nodes[i]); } IDMap succ_node = loctable->succ(a->sender.id); if (succ_node.id != a->sender.id) { if (!alive()) return; DEBUG(5) << now() << ":" << ip() << "," << printID(id()) << ":Found new node " << a->sender.ip << " via lookup\n"; DEBUG_MSG(a->sender,"lookup_handler",a->sender); loctable->add_node(a->sender); LogEntry *e = New LogEntry(a->sender, ALIVE, now()); leader_log.push_back(*e); delete e; } CHID key = a->key; IDMap corr_succ = loctable->succ(key); if (corr_succ.id == me.id) r->is_owner = true; else { r->is_owner = false; r->correct_owner = corr_succ; }}voidOneHop::join(Args *args){ if (!alive()) return; me.ip = ip(); me.id = 
ConsistentHash::ip2chid(me.ip); OneHopObserver::Instance(NULL)->addnode(me); //jy last_join = now(); //jy: get a random alive node from observer while (!_wkn.ip || _wkn.ip == me.ip) _wkn = OneHopObserver::Instance(NULL)->get_rand_alive_node(); if (ip() == OneHop::debug_node) DEBUG(0) << now() << ":" << me.ip << ","<< printID(me.id) << " start to join wkn " << _wkn.ip << endl; loctable->add_node(me); if (args && args->nget<uint>("first",0,10)==1) { //jy: a dirty hack, one hop does not like many nodes join at once _join_complete = true; delaycb(_stab_timer/2, &OneHop::stabilize, (void *)0); }else if (_wkn.ip != ip()) { IDMap fr; fr.id = _wkn.id; fr.ip = _wkn.ip; join_leader(fr, fr, args); } else { _join_complete = true; joins++; num_nodes++; stabilize((void *)0); publish((void *)0); } //who is my successor? if (alive()) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -