⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tapestry.c

📁 P2P模拟器
💻 C
📖 第 1 页 / 共 5 页
字号:
    }    // make sure we haven't crashed and/or started another join    if( !alive() || _join_num != curr_join ) {      TapDEBUG(0) << "Old join " << curr_join << " aborting" << endl;      // need to delete all the currclose now      for( uint j = 0; j < _k; j++ ) {	NodeInfo *currseed = closestk[j];	delete currseed;      }      return;    }    TapDEBUG(2) << "gathered closest for level " << i << endl;    // these k are the next initlist    for( uint l = 0; l < initlist.size(); l++ ) {      delete initlist[l];    }    initlist.clear();    seeds.clear();    for( uint k = 0; k < _k; k++ ) {      NodeInfo *currclose = closestk[k];      if( currclose != NULL ) {	TapDEBUG(2) << "close is " << currclose->_addr << endl;	initlist.push_back( currclose );      }    }  }  for( uint l = 0; l < initlist.size(); l++ ) {    delete initlist[l];  }  initlist.clear();  have_joined();  TapDEBUG(5) << "j exit" << endl;  TapDEBUG(2) << "join done" << endl;  TapDEBUG(2) << *_rt << endl;}voidTapestry::handle_join(join_args *args, join_return *ret){  TapDEBUG(5) << "hj enter" << endl;  TapDEBUG(2) << "got a join message from " << args->ip << "/" <<     print_guid(args->id) << endl;  // not allowed to participate in your own join!  if( args->ip == ip() ) {    ret->failed = true;    TapDEBUG(5) << "hj exit" << endl;    return;  }  // if our join has not yet finished, we must delay the handling of this  // person's join.  while( !joined ) {    _waiting_for_join->wait();    // hmmm. if we're now dead, indicate some kind of timeout probably    if( !alive() ) {      ret->failed = true;      TapDEBUG(5) << "hj exit" << endl;      return; //TODO: ????    }  }  // route toward the root  IPAddress *ips = New IPAddress[_redundant_lookup_num];  for( uint i = 0; i < _redundant_lookup_num; i++ ) {    ips[i] = 0;  }  GUID *ids = New GUID[_redundant_lookup_num];  for( uint i = 0; i < _redundant_lookup_num; i++ ) {    ids[i] = 0;  }  ips[0] = args->ip;  next_hop( args->id, &ips, &ids, _redundant_lookup_num );  uint i = 0;  for( ; i < _redundant_lookup_num; i++ ) {    IPAddress next = ips[i];    if( next == 0 ) {      continue;    }    if( next == ip() ) {      // we are the surrogate root for this node, start the integration          TapDEBUG(2) << "is the surrogate root for " << args->ip << endl;            // start by sending the new node all of the nodes in your table      // up to the digit you share      int alpha = guid_compare( args->id, id_digits() );      vector<NodeInfo *> thisrow;            bool stop = true;      for( int i = 0; i <= alpha; i++ ) {		for( uint j = 0; j < _base; j++ ) {	  NodeInfo *ni = _rt->read( i, j );	  if( ni != NULL ) {	    // keep going if there is someone else on this level	    if( ni->_addr != ip() ) {	      stop = false;	    }	    thisrow.push_back( ni );	  }	}		if( stop ) break;	stop = true;	      }      nodelist_args na;      na.nodelist = thisrow;      nodelist_return nr;      record_stat(STAT_NODELIST, na.nodelist.size(), 0);      unsigned rpc = asyncRPC( args->ip, &Tapestry::handle_nodelist, 			       &na, &nr, MAXTIME );      // start the multicast      mc_args mca;      mca.new_ip = args->ip;      mca.new_id = args->id;      mca.alpha = alpha;      mca.from_lock = false;      mc_return mcr;            // make the watchlist      vector<bool *> wl;      for( int i = 0; i < alpha+1; i++ ) {	bool *level = New bool[_base];	wl.push_back(level);	for( uint j = 0; j < _base; j++ ) {	  wl[i][j] = false;	}      }      mca.watchlist = wl;            handle_mc( &mca, &mcr );      // finish up the nodelist rpc      RPCSet rpcset;      rpcset.insert(rpc);      bool ok;      rcvRPC( &rpcset, ok );      if( ok ) {	record_stat(STAT_NODELIST, 0, 0);      } else {	if( retryRPC( args->ip, &Tapestry::handle_nodelist, 		      &na, &nr, STAT_NODELIST, na.nodelist.size(), 0 ) ) {	  record_stat(STAT_NODELIST, 0, 0);	}      }      // free the nodelist      for( uint i = 0; i < na.nodelist.size(); i++ ) {	delete na.nodelist[i];      }            // free the bools!      for( int i = 0; i < alpha+1; i++ ) {	bool *level = wl[i];	delete level;      }      TapDEBUG(2) << "surrogate for " << args->ip 		  << " is done with handlejoin" << endl;      ret->surr_id = id();      break;    } else {      // not the surrogate      // recursive routing yo      TapDEBUG(2) << "Forwarding join req for " << args->ip << " to " 		  << next << endl;      bool succ = retryRPC( next, &Tapestry::handle_join, args, ret, 			    STAT_JOIN, 1, 0);      if( succ ) {	record_stat(STAT_JOIN, 1, 1);      }      if( succ && !ret->failed ) {	// success!	break;      } else {	ret->failed = false;      }    }  }  // if we were never successful, set the failed flag  if( i == _redundant_lookup_num ) {    ret->failed = true;  }  delete [] ips;  delete [] ids;  TapDEBUG(5) << "hj exit" << endl;}void Tapestry::handle_nodelist(nodelist_args *args, nodelist_return *ret){  TapDEBUG(5) << "nhl enter" << endl;  TapDEBUG(3) << "handling a nodelist message" << endl;  // add each to your routing table  multi_add_to_rt( &(args->nodelist), NULL );  /*  for( uint i = 0; i < args->nodelist.size(); i++ ) {    NodeInfo * curr_node = args->nodelist[i];    add_to_rt( curr_node->_addr, curr_node->_id );  }  */  TapDEBUG(5) << "nhl exit" << endl;}voidTapestry::handle_mc(mc_args *args, mc_return *ret){  TapDEBUG(5) << "mc enter" << endl;  TapDEBUG(2) << "got multicast message for " << args->new_ip << endl;  // not allowed to participate in your own join!  if( args->new_ip == ip() ) {    TapDEBUG(5) << "mc exit" << endl;    return;  }  // lock this node  _rt->set_lock( args->new_ip, args->new_id );  // make sure that it knows about you as well  mcnotify_args mca;  mca.ip = ip();  mca.id = id();  mcnotify_return mcr;  // check the watchlist and see if you know about any nodes that  // can fill those prefixes  vector<NodeInfo *> nodelist;  for( uint i = 0; i < args->watchlist.size(); i++ ) {    for( uint j = 0; j < _base; j++ ) {      if( !args->watchlist[i][j] ) {	NodeInfo * ni = _rt->read( i, j );	if( ni != NULL && ni->_addr != ip() && ni->_addr != args->new_ip ) {	  nodelist.push_back( ni );	  args->watchlist[i][j] = true;	} else {	  if( ni != NULL ) {	    delete ni;	  }	}      }    }  }  mca.nodelist = nodelist;  record_stat(STAT_MCNOTIFY, 1+nodelist.size(), 0);  unsigned mcnrpc = asyncRPC( args->new_ip, &Tapestry::handle_mcnotify, 			       &mca, &mcr, MAXTIME );  // don't go on if this is from a lock  if( !args->from_lock ) {    RPCSet rpcset;    HashMap<unsigned, mc_callinfo*> resultmap;    unsigned int numcalls = 0;    // then, find any other node that shares this prefix and multicast    for( uint i = args->alpha; i < _digits_per_id; i++ ) {      for( uint j = 0; j < _base; j++ ) {	NodeInfo *ni = _rt->read( i, j );	// don't RPC to ourselves or to the new node	if( ni == NULL || ni->_addr == ip() || ni->_addr == args->new_ip ) {	  if( ni != NULL ) {	    delete ni;	  }	  continue;	} else {	  mc_args *mca = New mc_args();	  mc_return *mcr = New mc_return();	  mca->new_ip = args->new_ip;	  mca->new_id = args->new_id;	  mca->alpha = i + 1;	  mca->from_lock = false;	  mca->watchlist = args->watchlist;	  TapDEBUG(2) << "multicasting info for " << args->new_ip << " to " << 	    ni->_addr << "/" << print_guid( ni->_id ) << endl;	  record_stat(STAT_MC, 1, 2+mca->watchlist.size()*_base);	  unsigned rpc = asyncRPC( ni->_addr, &Tapestry::handle_mc, mca, mcr,				   _rtt_timeout_factor*_rt->get_time(ni->_id));	  assert(rpc);	  resultmap.insert(rpc, New mc_callinfo(ni->_addr, mca, mcr));	  rpcset.insert(rpc);	  numcalls++;	  delete ni;	}	      }    }        // also, multicast to any locks that might need it    vector <NodeInfo *> * locks = _rt->get_locks( args->new_id );    for( uint i = 0; i < locks->size(); i++ ) {      NodeInfo *ni = (*locks)[i];      if( ni->_addr != args->new_ip ) {	mc_args *mca = New mc_args();	mc_return *mcr = New mc_return();	mca->new_ip = args->new_ip;	mca->new_id = args->new_id;	mca->alpha = args->alpha;	mca->from_lock = true;	mca->watchlist = args->watchlist;	TapDEBUG(2) << "multicasting info for " << args->new_ip << " to " << 	  ni->_addr << "/" << print_guid( ni->_id ) << " as a lock " << endl;	record_stat(STAT_MC, 1, 2+mca->watchlist.size()*_base);	Time timeout = MAXTIME;	if( _rt->contains( ni->_id ) ) {	  timeout = _rtt_timeout_factor*_rt->get_time( ni->_id );	}	unsigned rpc = asyncRPC( ni->_addr, &Tapestry::handle_mc, mca, mcr, 				 timeout );	assert(rpc);	resultmap.insert(rpc, New mc_callinfo(ni->_addr, mca, mcr));	rpcset.insert(rpc);	numcalls++;      }    }        Time startmc_time = now();    // wait for them all to return    while( rpcset.size() > 0 ) {      bool ok;      unsigned donerpc = rcvRPC( &rpcset, ok );      if( ok || now() >= startmc_time + _declare_dead_time ) {	if( ok ) {	  record_stat(STAT_MC, 0, 0);	}	mc_callinfo *ci = resultmap[donerpc];	TapDEBUG(2) << "mc to " << ci->ip << " about " << args->new_ip << 	  " is done.  ok = " << ok << endl;	delete ci;      } else {	// do it again sam	mc_callinfo *ci = resultmap[donerpc];	resultmap.remove( donerpc );	GUID ci_id = get_id_from_ip(ci->ip);	Time timeout = MAXTIME;	if( _rt->contains( ci_id ) ) {	  timeout = _rtt_timeout_factor*_rt->get_time( ci_id );	}	if( timeout == 0 ) {	  TapDEBUG(2) << "Timeout of 0 for node " << ci->ip << "/"		      << print_guid(ci_id) << "; why?" << endl;	}	record_stat(STAT_MC, 1, 2+ci->ma->watchlist.size()*_base);	unsigned rpc = asyncRPC( ci->ip, &Tapestry::handle_mc, ci->ma, 				 ci->mr, timeout );	assert(rpc);	rpcset.insert(rpc);	resultmap.insert(rpc, ci);      }    }  }  // finish up mcnotify  RPCSet mcnrpcset;  mcnrpcset.insert(mcnrpc);  bool ok;  rcvRPC( &mcnrpcset, ok );  if( ok ) {    record_stat(STAT_MCNOTIFY, 0, 0);  } else {    ok = retryRPC( args->new_ip, &Tapestry::handle_mcnotify, 		   &mca, &mcr, STAT_MCNOTIFY, 1+nodelist.size(), 0);    if( ok ) {      record_stat(STAT_MCNOTIFY, 0, 0);    }  }  // free the nodelist  for( uint i = 0; i < nodelist.size(); i++ ) {    delete nodelist[i];  }  if( !ok ) {    _rt->remove_lock( args->new_ip, args->new_id );    TapDEBUG(3) << "Notify to new node failed, abandoning mc!" << endl;    TapDEBUG(5) << "mc exit" << endl;    return;  }  // we wait until here to add the new node to your table  // since other simultaneous inserts will require sending the mc to  // at least one unlocked pointer, and if this guy is in the routing table  // already, we may send to it even though its locked.  add_to_rt( args->new_ip, args->new_id );  _rt->remove_lock( args->new_ip, args->new_id );  TapDEBUG(2) << "multicast done for " << args->new_ip << endl;  // TODO: object pointer transferal  TapDEBUG(5) << "mc exit" << endl;}void Tapestry::handle_nn(nn_args *args, nn_return *ret){  TapDEBUG(5) << "nn enter" << endl;  // send back all backward pointers  vector<NodeInfo *> nns;  vector<NodeInfo *> *bps = _rt->get_backpointers( args->alpha );  for( uint i = 0; i < bps->size() && (!_nn_random || i < _k); i++ ) {    uint index = i;    // pick the nodes randomly    if( _nn_random ) {      bool chosen;      do {	chosen = true;	index = random()%bps->size();	for( uint j = 0; j < nns.size(); j++ ) {	  if( nns[j]->_id == (*bps)[index]->_id ) {	    chosen = false;	    break;	  }	}	if( !chosen ) {	  TapDEBUG(5) << "Couldn't choose " << index << "(" 		      << (*bps)[index]->_addr << "/" 		      << print_guid((*bps)[index]->_id) 		      << ") for element " << i		      << " out of " << bps->size() << "; trying again" << endl;	} else {	  TapDEBUG(5) << "Chose " << index << "(" 		      << (*bps)[index]->_addr << "/" 		      << print_guid((*bps)[index]->_id) 		      << ") for element " << i << endl;	}      } while( !chosen );    }        NodeInfo *newnode = New NodeInfo( ((*bps)[index])->_addr, 				      ((*bps)[index])->_id );     nns.push_back( newnode );  }  // send all forward pointers at that level  for( uint i = 0; i < _base; i++ ) {    NodeInfo *newnode = _rt->read( args->alpha, i );    if( newnode != NULL ) {      nns.push_back( newnode );    }  }  // add yourself to the list  nns.push_back( New NodeInfo( ip(), id() ) );  ret->nodelist = nns;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -