⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 client.c

📁 chord 源码 http://pdos.csail.mit.edu/chord/
💻 C
📖 第 1 页 / 共 2 页
字号:
    rs->complete (DHASH_OK, ret_block);    rs = NULL;  } else {    if (rs->incoming_rpcs == 0 && rs->nextsucc > rs->succs.size ()) {      info << myID << ": retrieve (" << rs->key << "): all RPCs returned but no good block.\n";      rs->complete (DHASH_NOENT, NULL);      rs = NULL;    }  }}struct orderer {  float d_;  size_t i_;  static int cmp (const void *a_, const void *b_) {    const orderer *a = (orderer *) a_, *b = (orderer *) b_;    return (int) (a->d_ - b->d_);  }};static voidorder_succs (ptr<locationtable> locations,	     const Coord &me, const vec<chord_node> &succs,	     vec<chord_node> &out, u_long max){  //max is the number of successors we should order: the first max in the list  u_long lim = max;  // 0 means order them all  if (max == 0 || max > succs.size ()) lim = succs.size ();  orderer *d2me = New orderer [succs.size()];  for (size_t i = 0; i < lim; i++) {    ptr<location> l = NULL;    if (locations)       l = locations->lookup (succs[i].x);    if (l) {      // Have actual measured latencies, so might as well use them.      d2me[i].d_ = l->distance ();    } else {      Coord cursucc (succs[i]);      d2me[i].d_ = Coord::distance_f (me, cursucc);    }    d2me[i].i_ = i;  }  qsort (d2me, lim, sizeof (*d2me), &orderer::cmp);  out.clear ();  for (size_t i = 0; i < lim; i++) {#ifdef VERBOSE_LOG    char buf[10]; // argh. please shoot me.    sprintf (buf, "%5.2f", d2me[i].d_);    modlogger ("orderer", modlogger::TRACE) << d2me[i].i_ << " "					    << succs[d2me[i].i_] << " "					    << buf << "\n";#endif /* VERBOSE_LOG */        out.push_back (succs[d2me[i].i_]);  }  delete[] d2me;  //copy any of the ones we didn't consider verbatim  if (max == 0) return;  long remaining = succs.size () - lim;  if (remaining > 0)     for (int i = 0; i < remaining; i++)      out.push_back (succs[lim+i]);}// Pull block fragments down from a successor list// public interface, used by dhash_impl if completing a FETCHRECvoiddhashcli::assemble (blockID blockID, cb_ret cb, vec<chord_node> succs, route r){  ptr<rcv_state> rs = New refcounted<rcv_state> (blockID, cb);  rs->r = r;  if (dhash_tcp_transfers)    rs->succopt = true;  ptr<dhblock> blk = allocate_dhblock (blockID.ctype);  doassemble (rs, blk, succs);}//---------------------------- insert -----------------------voiddhashcli::insert (ref<dhash_block> block, cbinsert_path_t cb, 		  int options, ptr<chordID> guess){  ptr<location> l = NULL;  if (guess)     l =  clntnode->locations->lookup (*guess);  if (!l)     lookup (block->ID, wrap (this, &dhashcli::insert_lookup_cb, 			     block, cb, options));  else     clntnode->get_succlist (l, wrap (this, &dhashcli::insert_succlist_cb, 				     block, cb, *guess, options));  }voiddhashcli::insert_succlist_cb (ref<dhash_block> block, cbinsert_path_t cb,			      chordID guess, int options,			      vec<chord_node> succs, chordstat status){  if (status) {    vec<chordID> rrr;    cb (DHASH_CHORDERR, rrr);    info << "insert_succlist_cb: failure (" << block->ID << "): "	 << status << "\n";    return;  }  route r;  r.push_back (clntnode->locations->lookup (guess));  insert_lookup_cb (block, cb, options, DHASH_OK, succs, r);}u_int64_t start_insert, end_insert, total_insert = 0;voiddhashcli::insert_lookup_cb (ref<dhash_block> block, cbinsert_path_t cb, 			    int options, dhash_stat status, 			    vec<chord_node> succs, route r){  vec<chordID> mt;  if (status) {    (*cb) (DHASH_CHORDERR, mt);    return;  }  //XXX run an allocator   ptr<dhblock> blk = allocate_dhblock (block->ctype);  if (succs.size () < blk->min_put ()) {    // this is a failure condition, since we can't hope to reconstruct    // the block reliably.    info << "Not enough successors for insert: |succs| " << succs.size ()	 << ", DFRAGS " << blk->min_put () << "\n";    (*cb) (DHASH_CHORDERR, mt);    return;  }  if (succs.size () < blk->num_put ())    // benjie: this is not a failure condition, since if we don't    // receive num_efrags number of store replies in insert_store_cb,    // we are still allowed to proceed.    info << "Number of successors less than desired: |succs| " << succs.size ()	 << ", EFRAGS " << blk->num_put () << "\n";  while (succs.size () > blk->num_put ())     succs.pop_back ();  ref<sto_state> ss = New refcounted<sto_state> (block, cb);  ss->succs = succs;    timeval tp;  gettimeofday (&tp, NULL);  start_insert = tp.tv_sec * (u_int64_t) 1000000 + tp.tv_usec;  for (u_int i = 0; i < succs.size(); i++) {      // Count up for each RPC that will be dispatched      ss->out += 1;            ptr<location> dest = clntnode->locations->lookup_or_create (succs[i]);      str frag = blk->generate_fragment (block, i);      dhash_store::execute (clntnode, dest, 			    blockID(block->ID, block->ctype),			    frag,			    wrap (this, &dhashcli::insert_store_cb,  				  ss, r, i,				  blk->num_put (), 				  blk->min_put ()), 			    get_store_status(block->ctype));       // XXX reactivate retry later  }  return;    }voiddhashcli::insert_store_cb (ref<sto_state> ss, route r, u_int i,			   u_int nstores, u_int min_needed,			   dhash_stat err, chordID id, bool present){  ss->out -= 1;  if (err) {    info << "fragment/block store failed: " << ss->block->ID	 << " fragment " << i << "/" << nstores	 << ": " << err << "\n";  } else {    info << "fragment/block store ok: " << ss->block->ID	 << " fragment " << i << "/" << nstores	 << " at " << id << "\n";    ss->good += 1;  }  // Count down until all outstanding RPCs have returned  vec<chordID> r_ret;  if (ss->out == 0) {    chordID myID = clntnode->my_ID ();    if (ss->good < nstores) {      warning << myID << ": store (" << ss->block->ID << "): only stored "	      << ss->good << " of " << nstores << " encoded.\n";      if (ss->good < min_needed) {	warning << myID << ": store (" << ss->block->ID << "): failed;"	  " insufficient frags/blocks stored.\n";		r_ret.push_back (ss->succs[0].x);	(*ss->cb) (DHASH_ERR, r_ret);	// We should do something here to try and store this fragment	// somewhere else.	return;      }    }        for (unsigned int i = 0; i < r.size (); i++)      r_ret.push_back (r[i]->id ());    (*ss->cb) (DHASH_OK, r_ret);  }}//------------------ helper chord wrappers -------------voiddhashcli::lookup (chordID blockID, dhashcli_lookupcb_t cb){  clntnode->find_successor    (blockID, wrap (this, &dhashcli::lookup_findsucc_cb, blockID, cb));}voiddhashcli::lookup_findsucc_cb (chordID blockID, dhashcli_lookupcb_t cb,			      vec<chord_node> s, route path, chordstat err){  if (err)     (*cb) (DHASH_CHORDERR, s, path);  else    (*cb) (DHASH_OK, s, path);}//------------------ sendblock ------------------------voiddhashcli::sendblock (ptr<location> dst, blockID bid, str data,		     sendblockcb_t cb, int nonce /* = 0 */){  dhash_store::execute     (clntnode, dst, bid, data,     wrap (this, &dhashcli::sendblock_cb, cb),     DHASH_FRAGMENT,     nonce); //XXX choose DHASH_STORE if appropriate (i.e. full replica)}voiddhashcli::sendblock (ptr<location> dst, blockID bid_to_send, 		     ptr<dhblock_srv> srv,		     sendblockcb_t cb, int nonce /* = 0 */){  trace << clntnode->my_ID () << ": dhashcli::sendblock (" << bid_to_send 	<< ") to dest: " << dst << "\n";  srv->fetch (bid_to_send.ID, 		  wrap (this, &dhashcli::sendblock_fetch_cb, dst, 			bid_to_send, cb, nonce));  trace << clntnode->my_ID () << ": dhashcli::sendblock (" << bid_to_send 	<< ") adbd request sent\n";}voiddhashcli::sendblock_fetch_cb (ptr<location> dst, blockID bid_to_send,			      sendblockcb_t cb, int nonce, adb_status stat,			      chordID key, str data){  trace << clntnode->my_ID () << ": dhashcli::sendblock_fetch_cb (" 	<< bid_to_send << ") has been fetched " << dst << ", stat " 	<< stat << "\n";  if (stat != ADB_OK) {    cb (DHASH_NOENT, false);    return;  }  dhash_store::execute     (clntnode, dst, bid_to_send, data,     wrap (this, &dhashcli::sendblock_cb, cb),     get_store_status(bid_to_send.ctype), //XXX store_status broken     nonce); }voiddhashcli::sendblock_cb (callback<void, dhash_stat, bool>::ref cb, 			  dhash_stat err, chordID dest, bool present){  (*cb) (err, present);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -