⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 onehop.c

📁 P2P模拟器
💻 C
📖 第 1 页 / 共 4 页
字号:
  delaycb(_stab_timer, &OneHop::stabilize, (void *)0);}voidOneHop::leader_stabilize(void *x) {  if (_leaderstab_running) return;  _leaderstab_running = true;  if (!alive()) return;  if (!_join_complete) return;  /*     if (is_slice_leader(me.id, me.id)) {     DEBUG(1) << ip() << ":Slice size = " << slice_size << endl;     DEBUG(1) << ip() << ":Unit size = " << loctable->unit_size << endl;     DEBUG(1) << ip() << ":My Id = " << id() << endl;     DEBUG(1) << ip() << "I am succ of (slice mid) " << slice(me.id)*slice_size + slice_size/2 << endl;     DEBUG(1) << ip() << "I should also be succ of (unit mid) " << slice(me.id)*slice_size + unit(me.id)*(loctable->unit_size) + (loctable->unit_size)/2 << endl;     DEBUG(1) << ip() << "Code claims unit succ is " << unit_leader(me.id).ip << " with id " << unit_leader(me.id).id << endl;     DEBUG(1) << ip() << "My unit is " << unit(me.id) << " and claimed succ's unit is " << unit(unit_leader(me.id).id) << endl;     assert(is_unit_leader(me.id, me.id));     }     */  notifyevent_args send_unit;  send_unit.sender = me;  while ((inner_log.size() > 0)     && (send_unit.log.size() < MAX_IDS_MSG)) {    send_unit.log.push_back(inner_log.front());    inner_log.pop_front();  }  bool ok = false;  if (send_unit.log.size() > 0) {    while (!ok) {      IDMap succ = loctable->succ(me.id+1);      //ping the successor to see if it is alive and piggyback a log      general_ret gr;      bw data = 20 + 4*send_unit.log.size();      ok = fd_xRPC(succ.ip, &OneHop::notify_rec_handler, &send_unit, &gr, 	  ONEHOP_LEADER_STAB, send_unit.log.size());      if (ok) record_stat(succ.ip,me.ip,ONEHOP_LEADER_STAB,0,1);      if (!alive()) return;      //very ugly hack to fix accouting -- change after deadline      if (!((slice(succ.id) == slice(me.id)) && (unit(succ.id) == unit(me.id)))) {        if (is_slice_leader(me.id, me.id)) {          leader_bandwidth = leader_bandwidth - data;          leader_messages--;        }        else {          bandwidth = bandwidth - data;          messages--;        }      }      if (!ok) {        LogEntry *e = New LogEntry(succ, DEAD, now());        leader_log.push_back(*e);        send_unit.log.push_back(*e);        delete e;        if (me.id != succ.id)          loctable->del_node(succ.id);      }      if (ok && !gr.has_joined) {        DEBUG(5) << now() << ":" << ip() << "," << printID(id())	  << "Sending to an incompletely joined node\n";        if (me.id != succ.id)          loctable->del_node(succ.id);      }    }    ok = false;    while (!ok) {      IDMap pred = loctable->pred(me.id-1);      //ping the pred to see if it is alive and piggyback a log      general_ret gr;      bw data = 20+ 4*send_unit.log.size();      ok = fd_xRPC(pred.ip, &OneHop::notify_rec_handler, &send_unit, &gr, 	  ONEHOP_LEADER_STAB, send_unit.log.size());      if (ok) record_stat(pred.ip,me.ip,ONEHOP_LEADER_STAB,0,1);      if (!alive()) return;      //very ugly hack to fix accouting -- change after deadline      if (!((slice(pred.id) == slice(me.id)) && (unit(pred.id) == unit(me.id)))) {        if (is_slice_leader(me.id, me.id)) {          leader_bandwidth = leader_bandwidth - data;          leader_messages--;        }        else {          bandwidth = bandwidth - data;          messages--;        }      }      if (!ok) {        LogEntry *e = New LogEntry(pred, DEAD, now());        leader_log.push_back(*e);        send_unit.log.push_back(*e);        delete e;        if (me.id != pred.id)          loctable->del_node(pred.id);      }      if (ok && !gr.has_joined) {        DEBUG(5) << now() << ":" << ip() << "," << printID(id())	  << "Sending to an incompletely joined node\n";        if (me.id != pred.id)          loctable->del_node(pred.id);      }    }    /*       vector<IDMap> all_nodes = loctable->get_all();       for (uint i=0; i < all_nodes.size(); i++) {       if (!alive()) {       DEBUG(1) << ip() << ":Unit leader: received events but died before sending them! Yikes!\n";       return;       }       ok = false;       if ((slice(all_nodes[i].id) == slice(me.id)) && (unit(all_nodes[i].id) == unit(me.id))) {       general_ret gr;        ok = xRPC(all_nodes[i].ip, &OneHop::notify_rec_handler, &send_unit, &gr);       if (!ok) {       LogEntry *e = new LogEntry(all_nodes[i], DEAD, now());       leader_log.push_back(*e);       send_unit.log.push_back(*e);       loctable->del_node(all_nodes[i],true);       }       if (ok && !gr.has_joined) {       DEBUG(1) << ip() << "Sending to an incompletely joined node\n";       loctable->del_node(all_nodes[i],true);       }       }       }*/  }  if (is_slice_leader(me.id, me.id) && (leader_log.size() + outer_log.size() >= 5)) {    notifyevent_args send_in, send_out;    send_in.sender = me;    send_out.sender = me;    while ((leader_log.size() > 0) 	&& (send_in.log.size() < MAX_IDS_MSG)) {      send_in.log.push_back(leader_log.front());      send_out.log.push_back(leader_log.front());      leader_log.pop_front();    }    while ((outer_log.size() > 0) 	&& (send_in.log.size() < MAX_IDS_MSG)) {      send_in.log.push_back(outer_log.front());      outer_log.pop_front();    }    if (send_out.log.size() > 0) tot_interslice++;    if (send_in.log.size() > 0) {      tot_intraslice++;      exp_intraslice += _u ;    }    vector<IDMap> all_nodes = loctable->get_all();    if (send_in.log.size() > 0) {      for (uint i=0; i < all_nodes.size(); i++) {        if (!alive()) {          DEBUG(3) << now() << ":" << ip() << "," << printID(id())	    << "Received events but died before sending them! Yikes!\n";          return;        }        //all logs are already incorporated, do not send to myself        //send mesg to all unit leaders        if ((slice(me.id) == slice(all_nodes[i].id)) && is_unit_leader(all_nodes[i].id, all_nodes[i].id)) {          general_ret gr;          ok = fd_xRPC(all_nodes[i].ip, &OneHop::notify_unit_leaders, &send_in, &gr, 	      ONEHOP_LEADER_STAB, send_in.log.size());	  if (ok) record_stat(all_nodes[i].ip,me.ip,ONEHOP_LEADER_STAB,0,1);	  if (!alive()) return;          if (ok) {            if (gr.has_joined) {              act_intraslice++;              //DEBUG(2) << ip() << ":Sent intra-slice to " << all_nodes[i].id << " in unit " << unit(all_nodes[i].id) << endl;            }            if ((!gr.has_joined) && (me.id != all_nodes[i].id)) {              loctable->del_node(all_nodes[i].id);	      DEBUG(5) << now() << ":" << ip() << "," << printID(id())		<< "Sending to an incompletely joined node -- failed\n";            }          }          else {            LogEntry *e = New LogEntry(all_nodes[i], DEAD, now());            leader_log.push_back(*e);            send_in.log.push_back(*e);            send_out.log.push_back(*e);            delete e;            if (me.id != all_nodes[i].id)              loctable->del_node(all_nodes[i].id);          }        }        if (me.id != all_nodes[i].id) {          if (send_out.log.size() > 0) {            if ((is_slice_leader(slice(all_nodes[i].id)*slice_size, all_nodes[i].id))) {              general_ret gr;	                    ok = fd_xRPC(all_nodes[i].ip, &OneHop::notify_other_leaders, &send_out, &gr, 		  ONEHOP_LEADER_STAB, send_out.log.size());	      if (ok) record_stat(all_nodes[i].ip,me.ip,ONEHOP_LEADER_STAB,0,1);	      if (!alive()) return;              if (ok) {                act_interslice++;                if (!gr.has_joined) {                  if (me.id != all_nodes[i].id)                    loctable->del_node(all_nodes[i].id);		  DEBUG(5) << now() << ":" << ip() << "," << printID(id())                  << "Sending to an incompletely joined node\n";                }              }              if (!ok) {                LogEntry *e = New LogEntry(all_nodes[i], DEAD, now());                outer_log.push_back(*e);                send_in.log.push_back(*e);                delete e;                if (me.id != all_nodes[i].id)                  loctable->del_node(all_nodes[i].id);              }            }          }        }      }      for (int i=0; i < _u; i++) {        CHID n = slice(me.id)*slice_size + i*loctable->unit_size;        if (loctable->is_empty_unit (n))           total_empty++;      }      total_count++;     }  }  _leaderstab_running = false;}voidOneHop::ping_handler(notifyevent_args *args, general_ret *ret) {  ret->correct = true;  if (_join_complete)    ret->has_joined = true;  else ret->has_joined = false;  IDMap succ_node = loctable->succ(args->sender.id);  if (succ_node.id != args->sender.id) {      if (!alive()) return;      DEBUG(5) << now() << ":" << ip() << "," << printID(id())      << ":Found new node " << args->sender.ip << " via ping\n";      DEBUG_MSG(args->sender,"ping_handler directly add sender",args->sender);      loctable->add_node(args->sender);      LogEntry *e = New LogEntry(args->sender, ALIVE, now());      leader_log.push_back(*e);      delete e;  }  if ((slice(args->sender.id) == slice(me.id)) && (unit(args->sender.id) == unit(me.id))) {    for (uint i=0; i < args->log.size(); i++) {      if (args->sender.id >= me.id) { 	high_to_low.push_back(args->log[i]);	sent_low = false;      }      else {        low_to_high.push_back(args->log[i]);        sent_high = false;      }              if (args->log[i]._state == DEAD) {	if (args->log[i]._node.ip == ip()) {	  DEBUG(3) << now() << ":" << ip() << "," << printID(id())	  << ":Panic! People think I am dead, but I'm not\n";	  //exit(-1);	  LogEntry *e = New LogEntry(me, ALIVE, now());	  leader_log.push_back(*e);	  delete e;	if (args->sender.id >= me.id)	  high_to_low.pop_back();	else low_to_high.pop_back();	}	else {	  loctable->del_node(args->log[i]._node.id);	}      }      else {	DEBUG_MSG(args->log[i]._node, "ping_handler", args->sender);	loctable->add_node(args->log[i]._node);       }    }  }  ret->correct = true;  //now that all logs have been absorbed, and I know latest info  //check if I am really the right neighbor  /*if (args->up == 1) { //I am supposed to be the successor    //who succ    IDMap succ_node = loctable->succ(args->sender.id + 1);    if (succ_node.id != me.id) {//I am not the successor      assert(ConsistentHash::betweenleftincl(args->sender.id, me.id, succ_node.id));      ret->act_neighbor = succ_node;      ret->correct = false;    }  }  else {    IDMap pred_node = loctable->pred(args->sender.id - 1);    if (pred_node.id != me.id) {      assert(ConsistentHash::betweenrightincl(me.id, args->sender.id, pred_node.id));      ret->act_neighbor = pred_node;      ret->correct = false;    }  }  */             }  voidOneHop::notifyevent_handler(notifyevent_args *args, general_ret *ret){  if (_join_complete)    ret->has_joined = true;  else ret->has_joined = false;  bool ok = false;  if (!alive()) {    DEBUG(3) << now() << ":" << ip() << "," << printID(id())    << "Received events but died before sending them! Yikes!\n";    return;  }    bool me_leader = is_slice_leader(me.id, me.id);  general_ret gr;  ret->act_sliceleader.id = 0;  ret->act_sliceleader.ip = 0;  if (!me_leader) {    //not slice leader, but still got message, forward to correct slice leader    //must forward message to the correct slice leader    DEBUG(1) << now() << ":" << ip() << "," << printID(id())    << ":I am not slice leader of "<< slice(id()) << ", still I got this message!\n";    while (!ok) {      IDMap sliceleader = loctable->slice_leader(me.id);      DEBUG(1) << now() << ":" << ip() << "," << printID(id())      << ":Forwarding to " << sliceleader.ip << endl ;      ok = fd_xRPC(sliceleader.ip, &OneHop::notifyevent_handler, args, &gr, 	  ONEHOP_NOTIFY, args->log.size());      if (ok) record_stat(sliceleader.ip,me.ip,ONEHOP_NOTIFY, 0,1);      if (!alive()) return;      if (!ok) {        LogEntry *e = New LogEntry(sliceleader, DEAD, now());        leader_log.push_back(*e);        args->log.push_back(*e);        if (me.id != sliceleader.id)        loctable->del_node(sliceleader.id);	DEBUG(5) << now() << ":" << ip() << "," << printID(id())        <<":PING! Informing " << slice_leader(me.id).ip << " that old slice leader "<< sliceleader.ip << " is dead\n";        delete e;      }      else if (!gr.has_joined) {      if (me.id != sliceleader.id)        loctable->del_node(sliceleader.id);      }      ok = ok && gr.has_joined;      if (ok) {         ret->act_sliceleader = sliceleader;        LogEntry *e = New LogEntry(sliceleader, ALIVE, now());        leader_log.push_back(*e);        delete e;      }    }     }  for (uint i=0; i < args->log.size(); i++) {    if (args->log[i]._state == DEAD) {      if (!alive()) return;      if (args->log[i]._node.ip == ip()) {	DEBUG(5) << now() << ":" << ip() << "," << printID(id())        << ":Panic! People think I am dead, but I'm not\n";        LogEntry *e = New LogEntry(me, ALIVE, now());        leader_log.push_back(*e);        delete e;      }      else {        if (!alive()) return;        loctable->del_node(args->log[i]._node.id);        if (me_leader || (is_slice_leader(me.id, me.id))) {          LogEntry *e = New LogEntry(args->log[i]._node, DEAD, now());          leader_log.push_back(*e);          delete e;        }      }    }    else {      if (!alive()) return;      DEBUG_MSG(args->log[i]._node,"notifyevent_handler",args->sender);      loctable->add_node(args->log[i]._node);      if ((me_leader) || is_slice_leader(me.id, me.id)) {        LogEntry *e = New LogEntry(args->log[i]._node, ALIVE, now());        leader_log.push_back(*e);        delete e;      }    }  }  IDMap succ_node = loctable->succ(args->sender.id);  if (succ_node.id != args->sender.id) {      if (!alive()) return;      DEBUG(5) << now() << ":" << ip() << "," << printID(id())      << ":Found New node " << args->sender.ip << " via notify event\n";      DEBUG_MSG(args->sender,"notifyevent_handler directly add sender", args->sender);      loctable->add_node(args->sender);      LogEntry *e = New LogEntry(args->sender, ALIVE, now());      leader_log.push_back(*e);      delete e;  }}  void OneHop::notify_rec_handler(notifyevent_args *args, general_ret *ret){  if (_join_complete)    ret->has_joined = true;  else ret->has_joined = false;  if (!alive()) return; 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -