⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tapestry.c

📁 该代码是采用后缀匹配的经典结构化p2p模型
💻 C
📖 第 1 页 / 共 5 页
字号:
  }  while( now() < starttime + _declare_dead_time ) {    if( dst != ip() ) {      record_stat( type, num_args_id, num_args_else);    }    bool succ = doRPC(dst, fn, args, ret, timeout);    if (succ) {      return true;    }    timeout *= 2;  }  return false;}voidTapestry::check_node_loop(void * args){  ping_args pa;  ping_return pr;  // use ping callinfos   RPCSet ping_rpcset;  HashMap<unsigned, ping_callinfo*> ping_resultmap;  bool waiting = false;  unsigned condvar_token = 1;  TapDEBUG(3) << "Entered check_node_loop" << endl;  while( true ) {    if( !waiting ) {      // initialize the Channel and add it to the condition variable      // add it to the rpc map right away so no one else takes the 1 token      Channel *c = chancreate(sizeof(int*), 0);      assert( !_rpcmap[condvar_token] );      _rpcmap.insert( condvar_token, New RPCHandle(c, New Packet()) );      _check_nodes_waiting->wait_noblock(c);      ping_rpcset.insert(condvar_token);      waiting = true;      // this RPCHandle, channel and packet will get deleted by      // Node::_deleteRPC, called from rcvRPC      TapDEBUG(3) << "Starting loop in check_node_loop" << endl;      // check for any new IDs      for( vector<check_node_args *>::iterator i=_check_nodes->begin();	   i != _check_nodes->end(); i++ ) {		check_node_args *check = *i;	assert( check );	IPAddress checkip = check->ip;		GUID checkid = 	  ((Tapestry *)Network::Instance()->getnode(checkip))->id();	Time timeout;	if( checkid != ip() && _rt->contains( checkid ) ) {	  timeout = _rtt_timeout_factor * _rt->get_time( checkid );	} else {	  timeout = MAXTIME;	}		// start an asyncRPC and save the info	if( checkid != ip() ) {	  record_stat( STAT_PING, 0, 0);	}	unsigned rpc = asyncRPC( checkip, &Tapestry::handle_ping, &pa, &pr, 				 timeout );	assert(rpc);	ping_callinfo *pi = New ping_callinfo( checkip, checkid, now() );	pi->last_timeout = timeout;	ping_resultmap.insert(rpc, pi);	ping_rpcset.insert(rpc);		_rt->set_timeout( pi->id, true );		// now try to find a better node to put in your table from the cache	if( _lookup_learn ) {	  int digit = guid_compare( id(), pi->id );	  assert( digit != -1 );	  uint val = get_digit( pi->id, digit );	  NodeInfo *n = _cachebag->read( digit, val );	  if( n != NULL ) {	    _cachebag->remove( n->_id, false );	    _rt->add( n->_addr, n->_id, n->_distance, false);	    delete n;	  }	}		delete check;	      }      _check_nodes->clear();    }    // now be extremely clever and select over both the asyncRPCs and the    // channel that's waiting for new nodes to check    bool ok;    uint rpc = rcvRPC( &ping_rpcset, ok );        // TODO: what happens if we get an RPC back after dying/re-joining???    // if it's not the condvar one, do some stuff, otherwise go directly    // to the top of the loop and add more nodes    if( rpc != condvar_token ) {      assert(rpc);      ping_callinfo *pi = ping_resultmap[rpc];      if( ok ) {	// this RPC succeeded, so this node wasn't dead after all.  apologize.	if( pi->ip != ip() ) {	  record_stat( STAT_PING, 0, 0);	}	_rt->set_timeout( pi->id, false );	delete pi;      } else if( now() - pi->pingstart >= _declare_dead_time ) {	// this node is officially dead.  make it so.	_rt->remove( pi->id, false );	_rt->remove_backpointer( pi->ip, pi->id );	_recently_dead.push_back(pi->id);    	TapDEBUG(2) << "Declaring (" << pi->ip << "/" << print_guid(pi->id) 		    << ") dead in check_node_loop" << endl;	delete pi;      } else {	// otherwise schedule this person again, doubling the timeout	pi->last_timeout *= 2;	if( pi->ip != ip() ) {	  record_stat( STAT_PING, 0, 0);	}	unsigned newrpc = asyncRPC( pi->ip, &Tapestry::handle_ping, &pa, &pr, 				    pi->last_timeout );	assert(newrpc);	ping_resultmap.insert(newrpc, pi);	ping_rpcset.insert(newrpc);	      }    } else {      TapDEBUG(3) << "Notified via condvar in check_node_loop" << endl;      waiting = false;    }  }}voidTapestry::have_joined(){   joined = true;   _joining = false;   _waiting_for_join->notifyAll();   if( _verbose ) {     TapDEBUG(0) << "Finishing joining." << endl;   }   if( !_stab_scheduled && _stabtimer > 0 ) {     delaycb( random()%_stabtimer, &Tapestry::check_rt, (void *) 0 );     _stab_scheduled = true;   }}// External event that tells a node to contact the well-known node// and try to join.voidTapestry::join(Args *args){  TapDEBUG(5) << "j enter" << endl;  IPAddress wellknown_ip = args->nget<IPAddress>("wellknown");  TapDEBUG(3) << ip() << " Wellknown: " << wellknown_ip << endl;  if( _join_num == 0 && wellknown_ip == ip() ) {    notifyObservers();  } else if( _join_num > 0 ) {    notifyObservers( (ObserverInfo *) "join" );  }  if( _joining ) {    TapDEBUG(0) << "Tried to join while joining -- ignoring" << endl;    return;  }  uint curr_join = ++_join_num;  // might already be joined if init_state was used  if( joined ) {    TapDEBUG(5) << "j exit" << endl;    return;  }  _joining = true;  if( _verbose ) {    TapDEBUG(0) << "Tapestry join " << curr_join << endl;  }  // if we're the well known node, we're done  if( ip() == wellknown_ip ) {    have_joined();    TapDEBUG(5) << "j exit" << endl;    return;  }  // contact the well known machine, and have it start routing to the surrogate  join_args ja;  ja.ip = ip();  ja.id = id();  join_return jr;  uint attempts = 30;  // keep attempting to join until you can't attempt no mo'  do {    attempts--;    jr.failed = false;    bool succ = retryRPC( wellknown_ip, &Tapestry::handle_join, &ja, &jr, 			  STAT_JOIN, 1, 0);    // make sure we haven't crashed and/or started another join    if( !alive() || _join_num != curr_join ) {      TapDEBUG(0) << "Old join " << curr_join << " aborting" << endl;      return;    }        if( !succ ) {      TapDEBUG(0) << "Well known ip died!  BAD!" << endl;      return;    } else {      record_stat(STAT_JOIN, 1, 1);    }  } while( jr.failed && attempts >= 0 );  if( jr.failed ) {    TapDEBUG(0) << "All my joins failed!  BAD!" << endl;    return;  }  // now that the multicast is over, it's time for nearest neighbor  // ping everyone on the initlist  int init_level = guid_compare( jr.surr_id, id_digits() );   vector<NodeInfo *> seeds;  TapDEBUG(2) << "init level of " << init_level << endl;  for( int i = init_level; i >= 0; i-- ) {        // go through each member of the init list, add them to your routing     // table, and ask them for their forward and backward pointers     // to do this, set off two sets of asynchRPCs at once, one to ping all    // the nodes and find out their distances, and another to get the nearest    // neighbors.  these can't easily be combined since the nearest neighbor    // call does a ping itself (so measured rtt would be double).    RPCSet ping_rpcset;    HashMap<unsigned, ping_callinfo*> ping_resultmap;    Time before_ping = now();    TapDEBUG(3) << "initing level " << i << " out of " << init_level 		<< " size = " << initlist.size() << endl;    multi_add_to_rt_start( &ping_rpcset, &ping_resultmap, &initlist, 			   NULL, true );    RPCSet nn_rpcset;    HashMap<unsigned, nn_callinfo*> nn_resultmap;    unsigned int num_nncalls = 0;    for( uint j = 0; j < initlist.size(); j++ ) {      NodeInfo ni = *(initlist[j]);      // also do an async nearest neighbor call      nn_args *na = New nn_args();      na->ip = ip();      na->id = id();      na->alpha = i;      nn_return *nr = New nn_return();      record_stat(STAT_NN, 1, 1);      Time timeout = MAXTIME;      if( _rt->contains( ni._id ) ) {	timeout = _rtt_timeout_factor*_rt->get_time( ni._id );      }      unsigned rpc = asyncRPC( ni._addr, &Tapestry::handle_nn, na, nr, 			       timeout );      assert(rpc);      nn_resultmap.insert(rpc, New nn_callinfo(ni._addr, na, nr));      nn_rpcset.insert(rpc);      num_nncalls++;    }    multi_add_to_rt_end( &ping_rpcset, &ping_resultmap, before_ping, NULL, 			 false );    for( uint j = 0; j < num_nncalls; j++ ) {      bool ok;      unsigned donerpc = rcvRPC( &nn_rpcset, ok );      nn_callinfo *ncall = nn_resultmap[donerpc];      nn_return nnr = *(ncall->nr);      TapDEBUG(2) << "done with nn with " << ncall->ip << endl;      if( ok ) {	record_stat( STAT_NN, nnr.nodelist.size(), 0 );	for( uint k = 0; k < nnr.nodelist.size(); k++ ) {	  // make sure this one isn't on there yet	  // TODO: make this more efficient.  Maybe use a hash set or something	  NodeInfo *currseed = nnr.nodelist[k];	  	  // don't add ourselves, duh	  if( currseed->_id == id() ) {	    delete currseed;	    continue;	  }	  bool add = true;	  for( uint l = 0; l < seeds.size(); l++ ) {	    if( *currseed == *(seeds[l]) ) {	      add = false;	      break;	    }	  }	  if( add ) {	    seeds.push_back( currseed );	    TapDEBUG(3) << " has a seed of " << currseed->_addr << " and " << 	      print_guid( currseed->_id ) << endl;	  } else {	    delete currseed;	  }	}	TapDEBUG(3) << "done with adding seeds with " << ncall->ip << endl;      }      // TODO: for some reason the compiler gets mad if I try to delete nr      // in the destructor      delete ncall->nr;      ncall->nr = NULL;      delete ncall;    }    // make sure we haven't crashed and/or started another join    if( !alive() || _join_num != curr_join ) {      TapDEBUG(0) << "Old join " << curr_join << " aborting" << endl;      // need to delete all the seeds      for( uint j = 0; j < seeds.size(); j++ ) {	NodeInfo *currseed = seeds[j];	delete currseed;      }      return;    }    // now that we have all the seeds we want from this one,     // see if they are in the closest k    NodeInfo * closestk[_k];    // initialize to NULL    for( uint k = 0; k < _k; k++ ) {      closestk[k] = NULL;    }    bool closest_full = false;    // add these guys to routing table    map< IPAddress, Time> timing;    TapDEBUG(3) << "About to add seeds to the routing table." << endl;    multi_add_to_rt( &seeds, &timing );    TapDEBUG(3) << "Done with adding seeds to the routing table." << endl;    // make sure we haven't crashed and/or started another join    if( !alive() || _join_num != curr_join ) {      TapDEBUG(0) << "Old join " << curr_join << " aborting" << endl;      // need to delete all the seeds      for( uint j = 0; j < seeds.size(); j++ ) {	NodeInfo *currseed = seeds[j];	delete currseed;      }      return;    }    for( uint j = 0; j < seeds.size(); j++ ) {      NodeInfo *currseed = seeds[j];      // add them all to the routing table (this gets us the ping time for free      TapDEBUG(3) << "about to get distance for " << currseed->_addr << endl;      //add_to_rt( currseed->_addr, currseed->_id );      TapDEBUG(3) << "added to rt for " << currseed->_addr << endl;      //bool ok = false;      currseed->_distance = timing[currseed->_addr];      TapDEBUG(3) << "got distance for " << currseed->_addr << endl;      // is there anyone on the list farther than you?  if so, replace      bool added = false;      for( uint k = 0; k < _k; k++ ) {	NodeInfo * currclose = closestk[k];	if( currclose == NULL || 	    (closest_full && currclose->_distance > currseed->_distance ) ) {	  if( currclose != NULL ) {	    delete currclose;	  }	  closestk[k] = currseed;	  added = true;	  if( k == _k - 1 ) {	    closest_full = true;	  }	  //TapDEBUG(2) << "close is " << currseed->_addr << endl;	  break;	}      }      if( !added ) {	// we're throwing this person away, so delete the memory	delete currseed;      }    }    // make sure we haven't crashed and/or started another join    if( !alive() || _join_num != curr_join ) {      TapDEBUG(0) << "Old join " << curr_join << " aborting" << endl;      // need to delete all the currclose now      for( uint j = 0; j < _k; j++ ) {	NodeInfo *currseed = closestk[j];	delete currseed;      }      return;    }    TapDEBUG(2) << "gathered closest for level " << i << endl;    // these k are the next initlist    for( uint l = 0; l < initlist.size(); l++ ) {      delete initlist[l];    }    initlist.clear();    seeds.clear();    for( uint k = 0; k < _k; k++ ) {      NodeInfo *currclose = closestk[k];      if( currclose != NULL ) {	TapDEBUG(2) << "close is " << currclose->_addr << endl;	initlist.push_back( currclose );      }    }  }  for( uint l = 0; l < initlist.size(); l++ ) {    delete initlist[l];  }  initlist.clear();  have_joined();  TapDEBUG(5) << "j exit" << endl;  TapDEBUG(2) << "join done" << endl;  TapDEBUG(2) << *_rt << endl;}voidTapestry::handle_join(join_args *args, join_return *ret){  TapDEBUG(5) << "hj enter" << endl;  TapDEBUG(2) << "got a join message from " << args->ip << "/" <<     print_guid(args->id) << endl;  // not allowed to participate in your own join!  if( args->ip == ip() ) {    ret->failed = true;    TapDEBUG(5) << "hj exit" << endl;    return;  }  // if our join has not yet finished, we must delay the handling of this  // person's join.  while( !joined ) {    _waiting_for_join->wait();    // hmmm. if we're now dead, indicate some kind of timeout probably    if( !alive() ) {      ret->failed = true;      TapDEBUG(5) << "hj exit" << endl;      return; //TODO: ????    }  }  // route toward the root  IPAddress *ips = New IPAddress[_redundant_lookup_num];  for( uint i = 0; i < _redundant_lookup_num; i++ ) {    ips[i] = 0;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -