⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 onehop.c

📁 P2P模拟器
💻 C
📖 第 1 页 / 共 4 页
字号:
    IDMap succ_node = loctable->succ(me.id+1);    DEBUG(1) << now() << ":" << me.ip << "," << printID(me.id) << " succ_ip "       << succ_node.ip << " succ_id " << succ_node.id << " join_complete " << _join_complete<< endl;  }}voidOneHop::publish(void *v) {  if ((((now() - 1) % 100000) == 0) || (now() > 2000000 && now() < 2500000)) {  Time interval = (now() - old_time)/1000;  DEBUG(1) << "----------------------------------------------\n";  DEBUG(1) << "GRAND STATS\n";  DEBUG(1) << "Total number of lookups in the last cycle:" << lookups - old_lookups << endl;  DEBUG(1) << "Total number of one hop failures in the last cycle:" << failed - old_failed<< endl;  DEBUG(1) << "Fraction of lookup failures:" << (double)(failed - old_failed)/(double)(lookups - old_lookups) << endl;  DEBUG(1) << "Fraction of two hop lookup failures:" << (double)(two_failed - old_two_failed)/(double)(lookups - old_lookups) << endl;  DEBUG(1) << "Total number of same slice lookups:" << same_lookups << endl;  DEBUG(1) << "Total number of same slice failures:" << same_failed << endl;  DEBUG(1) << "Fraction of same slice failures:" << (double)same_failed/(double)same_lookups << endl;  DEBUG(1) << "Number of intra-slice messages:" << tot_intraslice << endl;  DEBUG(1) << "Number of intra-slice actually received:" << act_intraslice << endl;  DEBUG(1) << "Exp number of received intra-slice mesgs:" << exp_intraslice << endl;  DEBUG(1) << "Avg number of intra-slice recepients:" << (double)act_intraslice/(double)tot_intraslice << endl;  DEBUG(1) << "Avg expected number of intra-slice recepients:" << (double)exp_intraslice/(double)tot_intraslice << endl;  DEBUG(1) << "Avg number of empty units:" << (double)total_empty/(double)total_count << endl;  DEBUG(1) << "Number of inter-slice messages:" << tot_interslice << endl;  DEBUG(1) << "Number of inter-slice actually received:" << act_interslice << endl;  DEBUG(1) << "Avg number of inter-slice recepients:" << (double)act_interslice/(double)tot_interslice << endl;  DEBUG(1) << "Total number of alive and functioning nodes:" << num_nodes << endl;  DEBUG(1) << "Total number of successful joins so far:" << joins << endl;  DEBUG(1) << "Total number of crashes so far:" << crashes << endl;  DEBUG(1) << "Total number of crashes with non-empty outers:" << nonempty_outers << endl;  DEBUG(1) << "Total number of crashes with non-empty leaders:" << nonempty_leaders << endl;  DEBUG(1) << "Average number of bytes used per second in the last cycle by normal nodes" << (double)(bandwidth - old_bandwidth)/(double)(num_nodes*interval) << endl;  DEBUG(1) << "Average number of bytes used per second in the last cycle by slice leaders" << (double)(leader_bandwidth - old_leader_bandwidth)/(double)(_k*interval) << endl;  DEBUG(1) << "Total maintenance traffic per second in the last cycle" << (double)(bandwidth + leader_bandwidth - old_bandwidth - old_leader_bandwidth)/(interval) << endl;  DEBUG(1) << "Total lookup traffic per second in the last cycle" << (double)(lookup_bandwidth - old_lookup_bandwidth)/interval << endl;  DEBUG(1) << "Average maintenance bandwidth so far " << (double) (bandwidth+leader_bandwidth)*1000/(double)now() << endl;  DEBUG(1) << "Average lookup bandwidth so far " << (double) (lookup_bandwidth)*1000/(double)now() << endl;  DEBUG(1) << "Average number of messages by normal nodes" << (double)messages*1000/((double)now()*num_nodes) << endl;  DEBUG(1) << "Average number of messages by slice leaders" << (double)leader_messages*1000/((double)now()*_k) << endl;  DEBUG(1) << "----------------------------------------------\n";  old_lookups = lookups;  old_failed = failed;  old_two_failed = two_failed;  old_bandwidth = bandwidth;  old_leader_bandwidth = leader_bandwidth;  old_lookup_bandwidth = lookup_bandwidth;  old_time = now();  }  if (_publish_time>0)    delaycb((Time)_publish_time, &OneHop::publish, (void *)0);  }voidOneHop::crash(Args *args) {  OneHopObserver::Instance(NULL)->delnode(me);  DEBUG(1) << now() << ":" << me.ip << ","<< printID(me.id) << " crashed" << endl;  if (is_slice_leader(me.id, me.id)) {    DEBUG(1) << " -- Slice leader\n";    if (join_time > 0 && Node::collect_stat() && ((now()-join_time) > 180000)) {      double avg = (double)1000.0*node_live_outbytes/(double)(now()-join_time);      if (avg < 0.01) {	printf("%llu: me %u what?! avg %.2f too small last join %llu total_bytes_sent %u\n",	    now(),ip(),avg, join_time, node_live_outbytes);      }else{	OneHop::sliceleader_bw_avg.push_back(avg);      }    }  }  //else DEBUG(1) << "\n";  num_nodes--;  crashes++;  if (outer_log.size() > 0)    nonempty_outers++;  if (leader_log.size() > 0)    nonempty_leaders++;  /*jy: don't delete old loctable  delete loctable;  loctable = New OneHopLocTable(_k, _u);  loctable->size();  loctable->set_timeout(0); //no timeouts on loctable entries   */  loctable->del_all();  leader_log.clear();  outer_log.clear();  low_to_high.clear();  high_to_low.clear();  _join_complete = false;  _wkn.ip = 0;}voidOneHop::join_leader(IDMap la, IDMap sender, Args *args) {  IPAddress leader_ip = la.ip;  DEBUG(1) << now() << ":" << ip() << "," << printID(id()) << " joining " << leader_ip << " in slice " << slice(id()) << endl;  assert(leader_ip);  join_leader_args ja;  join_leader_ret* jr = New join_leader_ret (_k, _u);  ja.key = id();  ja.ip = ip();  //send mesg to node, if it is slice leader will respond with  //routing table, else will respond with correct slice leader  record_stat( me.ip,leader_ip,ONEHOP_JOIN_LOOKUP,1);  bool ok = doRPC (leader_ip, &OneHop::join_handler, &ja, jr,       TIMEOUT(me.ip,leader_ip));  if (ok) record_stat(leader_ip,me.ip,ONEHOP_JOIN_LOOKUP,jr->table.size(),1);  if (!alive()) {    delete jr;    return;  }  bool tmpok = false;  if (ok && !jr->is_join_complete) {    DEBUG(1) << now()<< ":" << ip() << "," << printID(id()) << " the leader " << leader_ip << " is still in the join process\n";     ok = false;    tmpok = true;  }  if (ok) {    //Anjali: Fix below -- will cause n pain later, check timestamp before ignoring    //if (jr->exists) return;    if (jr->is_slice_leader) {      //found correct slice leader or there is no leader, should have initial routing table      for (uint i=0; i < jr->table.size(); i++) {	DEBUG_MSG(jr->table[i],"join",la);        loctable->add_node(jr->table[i]);      }      _join_complete = true;      num_nodes++;      joins++;      LogEntry *e = New LogEntry(me, ALIVE, now());      leader_log.push_back(*e);      delete e;            DEBUG(1) << now() << ":" << ip() <<"," << printID(id()) << ":Joined successfully" << endl;      stabilize ((void *)0);       }    else {      //should contain updated slice leader info      IDMap new_leader = jr->leader;      DEBUG(1) << now() << ":" << ip() << "," <<  printID(id()) 	<< " join_leader new " << new_leader.ip << "," << printID(id()) << endl;      join_leader(new_leader, la, args);    }  }  else {    //the process failed somewhere, try to contact well known node again    //if it could successfully inform, good, else schedule after some time     //bool dead_ok = false;    //node is really dead, not just in the process of joining,    //inform redirecting node    if (!tmpok) {      DEBUG(4) << now() << ":" << ip() << "," <<  printID(id()) 	<< ": " << leader_ip << "failed repeating informing " << sender.ip 	<< " that " << leader_ip << " has failed " << endl;      //dead_ok = inform_dead (la, sender);      test_inform_dead_args *aa = New test_inform_dead_args;      aa->justdelete = false;      aa->suspect = la;      aa->informed = sender;      delaycb(0,&OneHop::test_dead_inform,aa);    }    DEBUG(1) << now() << ":" << ip() << "," <<  printID(id())       << "rescheduling join" << endl;    delaycb (_retry_timer, &OneHop::join, (Args *)0);  }  delete jr;}voidOneHop::join_handler(join_leader_args *args, join_leader_ret *ret) {  CHID node = args->key;  //contacted looks into loctable and checks if it is the correct leader  //or if there is no correct leader  DEBUG(3) << now() << ":" << ip() << "," <<  printID(id())     << ": Contacted by " << args->ip << " for join\n";  if (!_join_complete) {    ret->is_join_complete = false;    return;  }  ret->is_join_complete = true;  if (is_slice_leader(node, id()) || loctable->is_empty(node)) {    ret->is_slice_leader = true;    ret->table = loctable->get_all();    IDMap newnode;    newnode.id = node;    newnode.ip = args->ip;    DEBUG_MSG(newnode,"join_handler",newnode);    loctable->add_node(newnode);    LogEntry *e = New LogEntry(newnode, ALIVE, now());    leader_log.push_back(*e);    delete e;    DEBUG(3) << now() << ":" << ip() << "," <<  printID(id()) << ":accepting " << newnode.ip << " for join\n";  }  else {    ret->is_slice_leader = false;    ret->leader = slice_leader(node);    DEBUG(3) << now() << ":" << ip() << "," <<  printID(id())       << ":not slice leader, not empty, so redirecting "<< args->ip << " to " << (ret->leader).ip <<endl;  }}OneHop::~OneHop() {  if (alive() && _join_complete) {    DEBUG(1) << now() << ":" << ip() << "," <<  printID(id())      <<":In slice " << slice(id()) << endl;    if (is_slice_leader(me.id, me.id)) {      DEBUG(1) << now() << ":" << ip() << "," <<  printID(id()) 	<< ":Slice leader of slice " << slice(me.id) << endl;    }  }  if (me.ip == 1) {    if (OneHop::num_violations > 0)       printf("Number of violations of sending too big a message %u\n", OneHop::num_violations);    Node::print_stats();    printf("<-----STATS----->\n");    sort(OneHop::sliceleader_bw_avg.begin(),OneHop::sliceleader_bw_avg.end());    uint sz = OneHop::sliceleader_bw_avg.size();    if (sz > 0)      printf("SLICELEADER_BW:: 50p:%.2f 90p:%.2f 95p:%.2f 99p:%.2f sz:%u\n",	  OneHop::sliceleader_bw_avg[(uint)(sz*0.5)],	  OneHop::sliceleader_bw_avg[(uint)(sz*0.9)],	  OneHop::sliceleader_bw_avg[(uint)(sz*0.95)],	  OneHop::sliceleader_bw_avg[(uint)(sz*0.99)],sz);    printf("<-----ENDSTATS----->\n");  }  delete loctable;}  voidOneHop::stabilize(void* x){  if (!alive()) return;  if (!_join_complete) return;    DEBUG(1) << now() << ":" << ip() << "," <<  printID(id())    <<" stabilize last " << last_stabilize << " slice leader "     << slice(id()) << " unit leader " << unit(id()) << endl;  last_stabilize = now();  /*  //if (now() < 2000000)  //  countertime = now();  if ((now() >= 2000000) && (now() < 2100000))    //if (((now() - countertime) % 5000) == 0) {    //  DEBUG(1) << ip() << "Stabilizing \n";      if (is_slice_leader(me.id, me.id))          DEBUG(1) << ip() << ":Slice leader of slice "<< slice(id()) << endl;    //}    */        if (is_slice_leader(me.id, me.id) || is_unit_leader(me.id, me.id))    leader_stabilize((void *)0);   if (!alive()) return;  notifyevent_args na, piggyback;  na.sender = me;   piggyback.sender = me;  piggyback.log.clear();  bool ok = false;    //I may have been slice leader some time, not any more. inform slice leader  while ((!is_slice_leader(me.id, me.id)) && (leader_log.size () > 0)      && (na.log.size() < MAX_IDS_MSG)) { //jy: limit size of one msg    DEBUG_MSG(leader_log.front()._node,"stabilize non-leader leader->na log", me);    na.log.push_back(leader_log.front());    leader_log.pop_front();  }  while ((!is_slice_leader(me.id, me.id)) && (outer_log.size () > 0)      && (na.log.size() < MAX_IDS_MSG)) { //jy: limit size of one msg    DEBUG_MSG(outer_log.front()._node,"stabilize non-leader outer->na log", me);    na.log.push_back(outer_log.front());    outer_log.pop_front();  }    while ((low_to_high.size() > 0)       && (piggyback.log.size()< MAX_IDS_MSG)){ //jy: limit size of one msg    DEBUG_MSG(low_to_high.front()._node,"stabilize non-leader low_to_high->piggyback log", me);    piggyback.log.push_back(low_to_high.front());    low_to_high.pop_front();  }  //successor ping  while (!ok) {    IDMap succ = loctable->succ(me.id+1);   //ping the successor to see if it is alive and piggyback a log    general_ret gr;    piggyback.up = 1;    bw data = 4*piggyback.log.size();    ok = fd_xRPC(succ.ip,&OneHop::ping_handler,&piggyback,&gr, 	ONEHOP_PING, piggyback.log.size());    if (ok) record_stat(succ.ip,me.ip,ONEHOP_PING,0,1);    DEBUG(4) << now() << ":" << ip() << "," << printID(me.id)       << " succ " << succ.ip << "," << printID(succ.id) << " ok? " << (ok?1:0) << endl;    if (!alive()) return;       //very ugly hack to fix accouting -- clean up     if (!((slice(succ.id) == slice(me.id)) && (unit(succ.id) == unit(me.id)))) {        if (is_slice_leader(me.id, me.id)) {          leader_bandwidth = leader_bandwidth - 20 - data;          leader_messages--;        }        else {           bandwidth = bandwidth - 20 - data;          messages--;        }    }        if (!ok) {      if (me.id != succ.id)      loctable->del_node(succ.id);      DEBUG(5) << now() << ":" << ip() << "," <<  printID(id())	<<":PING! Informing " << slice_leader(me.id).ip <<" that successor "<< succ.ip << " is dead\n";      LogEntry *e = New LogEntry(succ, DEAD, now());          na.log.push_back(*e);      piggyback.log.push_back(*e);      delete e;    }     else if (!gr.has_joined) {      if (me.id != succ.id)        loctable->del_node(succ.id);    }    ok = ok && gr.has_joined;    //if (ok && (!gr.correct))   //   loctable->add_node(gr.act_neighbor);    }      //predecessor ping and piggyback  piggyback.log.clear();  while ((high_to_low.size() > 0)       && (piggyback.log.size() < MAX_IDS_MSG)) {//jy: limit max ids in one msg    piggyback.log.push_back(high_to_low.front());    high_to_low.pop_front();  }      ok = false;  while (!ok) {    IDMap pred = loctable->pred(me.id-1);    DEBUG(4) << now() << ":" << ip() << "," <<  printID(id())      << " pred " << pred.ip << "," << printID(pred.id) << " ok? " << (ok?1:0) << endl;    general_ret gr;    piggyback.up = 0;     bw data = 20+ 4*piggyback.log.size();    ok = fd_xRPC(pred.ip, &OneHop::ping_handler, &piggyback, &gr, 	ONEHOP_PING,piggyback.log.size());    record_stat(pred.ip,me.ip,ONEHOP_PING, 0,1);    if (!alive()) return;    //very ugly hack to fix accouting -- change after deadline    if (!((slice(pred.id) == slice(me.id)) && (unit(pred.id) == unit(me.id)))) {        if (is_slice_leader(me.id, me.id)) {          leader_bandwidth = leader_bandwidth - data;          leader_messages--;        }        else {          bandwidth = bandwidth - data;          messages--;        }    }    if (!ok) {      if (me.id != pred.id)      loctable->del_node(pred.id);      DEBUG(5) << now() << ":" << ip() << "," <<  printID(id())	<<":PING! Informing " << slice_leader(me.id).ip << " that predecessor "<< pred.ip << " is dead\n";      LogEntry *e = New LogEntry(pred, DEAD, now());      na.log.push_back(*e);      piggyback.log.push_back(*e);      delete e;    }    else if (!gr.has_joined) {      if (me.id != pred.id)      loctable->del_node(pred.id);    }    ok = ok && gr.has_joined;    //if (ok && (!gr.correct))    //  loctable->add_node(gr.act_neighbor);    }    //if either successor or predecessor has died, notify slice leader   ok = false;  if (na.log.size() > 0) {    while (!ok) {      IDMap sliceleader = loctable->slice_leader(me.id);      general_ret gr;      DEBUG(1) << now() << ":" << ip() << "," << printID(id())	<< " stabilize "<< " notifyevent log sz " <<  na.log.size() << "to sliceleader "<<sliceleader.ip << endl;      ok = fd_xRPC(sliceleader.ip, &OneHop::notifyevent_handler,&na,&gr, 	  ONEHOP_NOTIFY,na.log.size());      if (ok) record_stat(sliceleader.ip,me.ip,ONEHOP_NOTIFY,0,1);      if (!alive()) return;      if (!ok) {        LogEntry *e = New LogEntry(sliceleader, DEAD, now());        na.log.push_back(*e);        if (me.id != sliceleader.id)        loctable->del_node(sliceleader.id);        DEBUG(5) << now() << ":" << ip() <<"," << printID(id())	  <<":PING! Informing " << slice_leader(me.id).ip << " that old slice leader "<< sliceleader.ip << " is dead\n";        delete e;      }       else if (!gr.has_joined) {      if (me.id != sliceleader.id)        loctable->del_node(sliceleader.id);      }      ok = ok && gr.has_joined;      if (ok) {         if (gr.act_sliceleader.ip) {	  DEBUG_MSG(gr.act_sliceleader,"stabilize",sliceleader);          loctable->add_node(gr.act_sliceleader);	}      }    }  }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -