⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tapestry.c

📁 该代码是采用后缀匹配的经典结构化p2p模型
💻 C
📖 第 1 页 / 共 5 页
字号:
  }  ips[0] = args->ip;  next_hop( args->id, &ips, _redundant_lookup_num );  uint i = 0;  for( ; i < _redundant_lookup_num; i++ ) {    IPAddress next = ips[i];    if( next == 0 ) {      continue;    }    if( next == ip() ) {      // we are the surrogate root for this node, start the integration          TapDEBUG(2) << "is the surrogate root for " << args->ip << endl;            // start by sending the new node all of the nodes in your table      // up to the digit you share      int alpha = guid_compare( args->id, id_digits() );      vector<NodeInfo *> thisrow;            bool stop = true;      for( int i = 0; i <= alpha; i++ ) {		for( uint j = 0; j < _base; j++ ) {	  NodeInfo *ni = _rt->read( i, j );	  if( ni != NULL ) {	    // keep going if there is someone else on this level	    if( ni->_addr != ip() ) {	      stop = false;	    }	    thisrow.push_back( ni );	  }	}		if( stop ) break;	stop = true;	      }      nodelist_args na;      na.nodelist = thisrow;      nodelist_return nr;      record_stat(STAT_NODELIST, na.nodelist.size(), 0);      unsigned rpc = asyncRPC( args->ip, &Tapestry::handle_nodelist, 			       &na, &nr, MAXTIME );      // start the multicast      mc_args mca;      mca.new_ip = args->ip;      mca.new_id = args->id;      mca.alpha = alpha;      mca.from_lock = false;      mc_return mcr;            // make the watchlist      vector<bool *> wl;      for( int i = 0; i < alpha+1; i++ ) {	bool *level = New bool[_base];	wl.push_back(level);	for( uint j = 0; j < _base; j++ ) {	  wl[i][j] = false;	}      }      mca.watchlist = wl;            handle_mc( &mca, &mcr );      // finish up the nodelist rpc      RPCSet rpcset;      rpcset.insert(rpc);      bool ok;      rcvRPC( &rpcset, ok );      if( ok ) {	record_stat(STAT_NODELIST, 0, 0);      } else {	if( retryRPC( args->ip, &Tapestry::handle_nodelist, 		      &na, &nr, STAT_NODELIST, na.nodelist.size(), 0 ) ) {	  record_stat(STAT_NODELIST, 0, 0);	}      }      // free the nodelist      for( uint i = 0; i < na.nodelist.size(); i++ ) {	delete na.nodelist[i];      }            // free the bools!      for( int i = 0; i < alpha+1; i++ ) {	bool *level = wl[i];	delete level;      }      TapDEBUG(2) << "surrogate for " << args->ip 		  << " is done with handlejoin" << endl;      ret->surr_id = id();      break;    } else {      // not the surrogate      // recursive routing yo      TapDEBUG(2) << "Forwarding join req for " << args->ip << " to " 		  << next << endl;      bool succ = retryRPC( next, &Tapestry::handle_join, args, ret, 			    STAT_JOIN, 1, 0);      if( succ ) {	record_stat(STAT_JOIN, 1, 1);      }      if( succ && !ret->failed ) {	// success!	break;      } else {	ret->failed = false;      }    }  }  // if we were never successful, set the failed flag  if( i == _redundant_lookup_num ) {    ret->failed = true;  }  delete ips;  TapDEBUG(5) << "hj exit" << endl;}void Tapestry::handle_nodelist(nodelist_args *args, nodelist_return *ret){  TapDEBUG(5) << "nhl enter" << endl;  TapDEBUG(3) << "handling a nodelist message" << endl;  // add each to your routing table  multi_add_to_rt( &(args->nodelist), NULL );  /*  for( uint i = 0; i < args->nodelist.size(); i++ ) {    NodeInfo * curr_node = args->nodelist[i];    add_to_rt( curr_node->_addr, curr_node->_id );  }  */  TapDEBUG(5) << "nhl exit" << endl;}voidTapestry::handle_mc(mc_args *args, mc_return *ret){  TapDEBUG(5) << "mc enter" << endl;  TapDEBUG(2) << "got multicast message for " << args->new_ip << endl;  // not allowed to participate in your own join!  if( args->new_ip == ip() ) {    TapDEBUG(5) << "mc exit" << endl;    return;  }  // lock this node  _rt->set_lock( args->new_ip, args->new_id );  // make sure that it knows about you as well  mcnotify_args mca;  mca.ip = ip();  mca.id = id();  mcnotify_return mcr;  // check the watchlist and see if you know about any nodes that  // can fill those prefixes  vector<NodeInfo *> nodelist;  for( uint i = 0; i < args->watchlist.size(); i++ ) {    for( uint j = 0; j < _base; j++ ) {      if( !args->watchlist[i][j] ) {	NodeInfo * ni = _rt->read( i, j );	if( ni != NULL && ni->_addr != ip() && ni->_addr != args->new_ip ) {	  nodelist.push_back( ni );	  args->watchlist[i][j] = true;	} else {	  if( ni != NULL ) {	    delete ni;	  }	}      }    }  }  mca.nodelist = nodelist;  record_stat(STAT_MCNOTIFY, 1+nodelist.size(), 0);  unsigned mcnrpc = asyncRPC( args->new_ip, &Tapestry::handle_mcnotify, 			       &mca, &mcr, MAXTIME );  // don't go on if this is from a lock  if( !args->from_lock ) {    RPCSet rpcset;    HashMap<unsigned, mc_callinfo*> resultmap;    unsigned int numcalls = 0;    // then, find any other node that shares this prefix and multicast    for( uint i = args->alpha; i < _digits_per_id; i++ ) {      for( uint j = 0; j < _base; j++ ) {	NodeInfo *ni = _rt->read( i, j );	// don't RPC to ourselves or to the new node	if( ni == NULL || ni->_addr == ip() || ni->_addr == args->new_ip ) {	  if( ni != NULL ) {	    delete ni;	  }	  continue;	} else {	  mc_args *mca = New mc_args();	  mc_return *mcr = New mc_return();	  mca->new_ip = args->new_ip;	  mca->new_id = args->new_id;	  mca->alpha = i + 1;	  mca->from_lock = false;	  mca->watchlist = args->watchlist;	  TapDEBUG(2) << "multicasting info for " << args->new_ip << " to " << 	    ni->_addr << "/" << print_guid( ni->_id ) << endl;	  record_stat(STAT_MC, 1, 2+mca->watchlist.size()*_base);	  unsigned rpc = asyncRPC( ni->_addr, &Tapestry::handle_mc, mca, mcr,				   _rtt_timeout_factor*_rt->get_time(ni->_id));	  assert(rpc);	  resultmap.insert(rpc, New mc_callinfo(ni->_addr, mca, mcr));	  rpcset.insert(rpc);	  numcalls++;	  delete ni;	}	      }    }        // also, multicast to any locks that might need it    vector <NodeInfo *> * locks = _rt->get_locks( args->new_id );    for( uint i = 0; i < locks->size(); i++ ) {      NodeInfo *ni = (*locks)[i];      if( ni->_addr != args->new_ip ) {	mc_args *mca = New mc_args();	mc_return *mcr = New mc_return();	mca->new_ip = args->new_ip;	mca->new_id = args->new_id;	mca->alpha = args->alpha;	mca->from_lock = true;	mca->watchlist = args->watchlist;	TapDEBUG(2) << "multicasting info for " << args->new_ip << " to " << 	  ni->_addr << "/" << print_guid( ni->_id ) << " as a lock " << endl;	record_stat(STAT_MC, 1, 2+mca->watchlist.size()*_base);	Time timeout = MAXTIME;	if( _rt->contains( ni->_id ) ) {	  timeout = _rtt_timeout_factor*_rt->get_time( ni->_id );	}	unsigned rpc = asyncRPC( ni->_addr, &Tapestry::handle_mc, mca, mcr, 				 timeout );	assert(rpc);	resultmap.insert(rpc, New mc_callinfo(ni->_addr, mca, mcr));	rpcset.insert(rpc);	numcalls++;      }    }        Time startmc_time = now();    // wait for them all to return    while( rpcset.size() > 0 ) {      bool ok;      unsigned donerpc = rcvRPC( &rpcset, ok );      if( ok || now() >= startmc_time + _declare_dead_time ) {	if( ok ) {	  record_stat(STAT_MC, 0, 0);	}	mc_callinfo *ci = resultmap[donerpc];	TapDEBUG(2) << "mc to " << ci->ip << " about " << args->new_ip << 	  " is done.  ok = " << ok << endl;	delete ci;      } else {	// do it again sam	mc_callinfo *ci = resultmap[donerpc];	resultmap.remove( donerpc );	GUID ci_id = ((Tapestry *)Network::Instance()->getnode(ci->ip))->id();	Time timeout = MAXTIME;	if( _rt->contains( ci_id ) ) {	  timeout = _rtt_timeout_factor*_rt->get_time( ci_id );	}	record_stat(STAT_MC, 1, 2+ci->ma->watchlist.size()*_base);	unsigned rpc = asyncRPC( ci->ip, &Tapestry::handle_mc, ci->ma, 				 ci->mr, timeout );	assert(rpc);	rpcset.insert(rpc);	resultmap.insert(rpc, ci);      }    }  }  // finish up mcnotify  RPCSet mcnrpcset;  mcnrpcset.insert(mcnrpc);  bool ok;  rcvRPC( &mcnrpcset, ok );  if( ok ) {    record_stat(STAT_MCNOTIFY, 0, 0);  } else {    ok = retryRPC( args->new_ip, &Tapestry::handle_mcnotify, 		   &mca, &mcr, STAT_MCNOTIFY, 1+nodelist.size(), 0);    if( ok ) {      record_stat(STAT_MCNOTIFY, 0, 0);    }  }  // free the nodelist  for( uint i = 0; i < nodelist.size(); i++ ) {    delete nodelist[i];  }  if( !ok ) {    _rt->remove_lock( args->new_ip, args->new_id );    TapDEBUG(3) << "Notify to new node failed, abandoning mc!" << endl;    TapDEBUG(5) << "mc exit" << endl;    return;  }  // we wait until here to add the new node to your table  // since other simultaneous inserts will require sending the mc to  // at least one unlocked pointer, and if this guy is in the routing table  // already, we may send to it even though its locked.  add_to_rt( args->new_ip, args->new_id );  _rt->remove_lock( args->new_ip, args->new_id );  TapDEBUG(2) << "multicast done for " << args->new_ip << endl;  // TODO: object pointer transferal  TapDEBUG(5) << "mc exit" << endl;}void Tapestry::handle_nn(nn_args *args, nn_return *ret){  TapDEBUG(5) << "nn enter" << endl;  // send back all backward pointers  vector<NodeInfo *> nns;  vector<NodeInfo *> *bps = _rt->get_backpointers( args->alpha );  for( uint i = 0; i < bps->size(); i++ ) {    NodeInfo *newnode = New NodeInfo( ((*bps)[i])->_addr, ((*bps)[i])->_id );     nns.push_back( newnode );  }  // send all forward pointers at that level  for( uint i = 0; i < _base; i++ ) {    NodeInfo *newnode = _rt->read( args->alpha, i );    if( newnode != NULL ) {      nns.push_back( newnode );    }  }  // add yourself to the list  nns.push_back( New NodeInfo( ip(), id() ) );  ret->nodelist = nns;  // finally, add this guy to our table  add_to_rt( args->ip, args->id );  TapDEBUG(5) << "nn exit" << endl;}void Tapestry::handle_repair(repair_args *args, repair_return *ret){  TapDEBUG(5) << "rep enter" << endl;  // send back all backward pointers  vector<NodeInfo> nns;  for( uint j = 0; j < args->bad_ids->size(); j++ ) {    GUID bad = (*(args->bad_ids))[j];    uint level = (*(args->levels))[j];    uint digit = (*(args->digits))[j];    assert( level < _digits_per_id && digit < _base );        // TODO: limit the number?    RouteEntry *re = _rt->get_entry( level, digit );    for( uint i = 0; re != NULL && i < re->size(); i++ ) {      NodeInfo *next = re->get_at(i);      if( next != NULL && next->_addr != ip() && next->_id != bad ) {	nns.push_back( *next );      }    }      }  ret->nodelist = nns;  TapDEBUG(5) << "rep exit" << endl;}void Tapestry::handle_ping(ping_args *args, ping_return *ret){  TapDEBUG(4) << "pinged." << endl;  // do nothing}void Tapestry::handle_mcnotify(mcnotify_args *args, mcnotify_return *ret){  TapDEBUG(5) << "mcn enter" << endl;  TapDEBUG(3) << "got mcnotify from " << args->ip << endl;  NodeInfo *mc_node = New NodeInfo( args->ip, args->id );  initlist.push_back( mc_node );  // add all the nodes on the nodelist as well  nodelist_args na;  na.nodelist = args->nodelist;  nodelist_return nr;  handle_nodelist( &na, &nr );  TapDEBUG(5) << "mcn exit" << endl;}void Tapestry::add_to_rt( IPAddress new_ip, GUID new_id ){  // first get the distance to this node  // TODO: asynchronous pings would be nice  // TODO: also, maybe a ping cache of some kind  bool ok = true;  Time distance = ping( new_ip, new_id, ok );  if( ok ) {    // the RoutingTable takes care of placing (and removing) backpointers     // for us    _rt->add( new_ip, new_id, distance );    if( _lookup_learn ) {      _cachebag->remove( new_id, false );    }  }  return;}boolTapestry::stabilized(vector<GUID> lid){    // if we don't think we've joined, we can't possibly be stable  if( !joined ) {    TapDEBUG(1) << "Haven't even joined yet." << endl;    return false;  }  // for every GUID, make sure it either exists in your routing table,  // or someone else does  // TODO: do we need locking in this function?  for( uint i = 0; i < lid.size(); i++ ) {    GUID currguid = lid[i];    if( !_rt->contains(currguid) ) {      int match = guid_compare( currguid, id_digits() );      if( match == -1 ) {	TapDEBUG(1) << "doesn't have itself in the routing table!" << endl;	return false;      } else {	NodeInfo * ni = _rt->read( match, get_digit( currguid, match ) );	if( ni == NULL ) {	  TapDEBUG(1) << "has a hole in the routing table at (" << match <<	    "," << get_digit( currguid, match ) << ") where " << 	    print_guid( currguid ) << " would fit." << endl;	    return false;	} else {	  delete ni;	}      }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -