client.c

来自「基于DHT的对等协议」· C语言 代码 · 共 651 行 · 第 1/2 页

C
651
字号
    rs->timemark ();    str data = block_t->produce_block_data ();    ptr<dhash_block> ret_block = New refcounted<dhash_block> (data,							      rs->key.ctype);    ret_block->ID = rs->key.ID;    ret_block->expiration = block->expiration;    ret_block->hops = rs->r.size ();    ret_block->errors = rs->errors;    ret_block->retries = block->errors;        for (size_t i = 1; i < rs->times.size (); i++) {      timespec diff = rs->times[i] - rs->times[i - 1];      ret_block->times.push_back (diff.tv_sec * 1000 +			      int (diff.tv_nsec/1000000));    }        rs->complete (DHASH_OK, ret_block);    rs = NULL;  } else {    if (rs->incoming_rpcs == 0 && rs->nextsucc > rs->succs.size ()) {      info << myID << ": retrieve (" << rs->key << "): all RPCs returned but no good block.\n";      rs->complete (DHASH_NOENT, NULL);      rs = NULL;    }  }}struct orderer {  float d_;  size_t i_;  static int cmp (const void *a_, const void *b_) {    const orderer *a = (orderer *) a_, *b = (orderer *) b_;    return (int) (a->d_ - b->d_);  }};static voidorder_succs (ptr<locationtable> locations,	     const Coord &me, const vec<chord_node> &succs,	     vec<chord_node> &out, u_long max){  //max is the number of successors we should order: the first max in the list  u_long lim = max;  // 0 means order them all  if (max == 0 || max > succs.size ()) lim = succs.size ();  orderer *d2me = New orderer [succs.size()];  for (size_t i = 0; i < lim; i++) {    ptr<location> l = NULL;    if (locations)       l = locations->lookup (succs[i].x);    if (l) {      // Have actual measured latencies, so might as well use them.      d2me[i].d_ = l->distance ();    } else {      Coord cursucc (succs[i]);      d2me[i].d_ = Coord::distance_f (me, cursucc);    }    d2me[i].i_ = i;  }  qsort (d2me, lim, sizeof (*d2me), &orderer::cmp);  out.clear ();  for (size_t i = 0; i < lim; i++) {#ifdef VERBOSE_LOG    char buf[10]; // argh. please shoot me.    sprintf (buf, "%5.2f", d2me[i].d_);    modlogger ("orderer", modlogger::TRACE) << d2me[i].i_ << " "					    << succs[d2me[i].i_] << " "					    << buf << "\n";#endif /* VERBOSE_LOG */        out.push_back (succs[d2me[i].i_]);  }  delete[] d2me;  //copy any of the ones we didn't consider verbatim  if (max == 0) return;  long remaining = succs.size () - lim;  if (remaining > 0)     for (int i = 0; i < remaining; i++)      out.push_back (succs[lim+i]);}// Pull block fragments down from a successor list// public interface, used by dhash_impl if completing a FETCHRECvoiddhashcli::assemble (blockID blockID, cb_ret cb, vec<chord_node> succs, route r){  ptr<rcv_state> rs = New refcounted<rcv_state> (blockID, cb);  rs->r = r;  if (dhash_tcp_transfers)    rs->succopt = true;  ptr<dhblock> blk = allocate_dhblock (blockID.ctype);  doassemble (rs, blk, succs);}//---------------------------- insert -----------------------voiddhashcli::insert (ref<dhash_block> block, cbinsert_path_t cb, 		  int options, ptr<chordID> guess){  ptr<location> l = NULL;  if (guess)     l =  clntnode->locations->lookup (*guess);  if (!(options & DHASHCLIENT_EXPIRATION_SUPPLIED))    block->expiration = (default_lifetime < 0) ?      0 :      (timenow + default_lifetime);  if (!l)     lookup (block->ID, wrap (this, &dhashcli::insert_lookup_cb, 			     block, cb, options));  else     clntnode->get_succlist (l, wrap (this, &dhashcli::insert_succlist_cb, 				     block, cb, *guess, options));  }voiddhashcli::insert_succlist_cb (ref<dhash_block> block, cbinsert_path_t cb,			      chordID guess, int options,			      vec<chord_node> succs, chordstat status){  if (status) {    vec<chordID> rrr;    cb (DHASH_CHORDERR, rrr);    info << "insert_succlist_cb: failure (" << block->ID << "): "	 << status << "\n";    return;  }  route r;  r.push_back (clntnode->locations->lookup (guess));  insert_lookup_cb (block, cb, options, DHASH_OK, succs, r);}voiddhashcli::insert_lookup_cb (ref<dhash_block> block, cbinsert_path_t cb, 			    int options, dhash_stat status, 			    vec<chord_node> succs, route r){  vec<chordID> mt;  if (status) {    (*cb) (DHASH_CHORDERR, mt);    return;  }  //XXX run an allocator   ptr<dhblock> blk = allocate_dhblock (block->ctype);  if (succs.size () < blk->min_put ()) {    // this is a failure condition, since we can't hope to reconstruct    // the block reliably.    info << "Not enough successors for insert: |succs| " << succs.size ()	 << ", DFRAGS " << blk->min_put () << "\n";    (*cb) (DHASH_CHORDERR, mt);    return;  }  if (succs.size () < blk->num_put ())    // benjie: this is not a failure condition, since if we don't    // receive num_efrags number of store replies in insert_store_cb,    // we are still allowed to proceed.    info << "Number of successors less than desired: |succs| " << succs.size ()	 << ", EFRAGS " << blk->num_put () << "\n";  vec<chord_node> unique_succs;  filter_succs (succs, unique_succs);  succs = unique_succs;  while (succs.size () > blk->num_put ())     succs.pop_back ();  ref<sto_state> ss = New refcounted<sto_state> (block, cb);  ss->succs = succs;  ss->r = r;  ss->blk = blk;  // Track number of times insert_store_cb is to be called.  ss->out = succs.size ();  for (u_int i = 0; i < succs.size(); i++) {      ptr<location> dest = clntnode->locations->lookup_or_create (succs[i]);      str frag = blk->generate_fragment (block, i);      dhash_store::execute (clntnode, dest, 			    blockID(block->ID, block->ctype),			    frag,			    block->expiration,			    wrap (this, &dhashcli::insert_store_cb,  				  ss, i, getusec ()),			    get_store_status (block->ctype));       // XXX reactivate retry later  }  return;    }voiddhashcli::insert_store_cb (ref<sto_state> ss, u_int i, u_int64_t t,			   dhash_stat err, chordID id, bool present){  u_int nstores = ss->blk->num_put ();  u_int min_needed = ss->blk->min_put ();  ss->out -= 1;  info << "store " << ss->block->ID << " (frag " <<    i + 1 << "/" << nstores << ") -> " << id << " in " <<    (getusec () - t)/1000 << "ms: " << err << "\n";  if (!err)    ss->good += 1;  if (err == DHASH_DISKFULL)    ss->diskfull = true;  // Count down until all outstanding RPCs have returned  if (ss->out == 0) {    vec<chordID> r_ret;    chordID myID = clntnode->my_ID ();    if (ss->good < nstores) {      warning << myID << ": store (" << ss->block->ID << "): only stored "	      << ss->good << " of " << nstores << " encoded.\n";      if (ss->good < min_needed) {	warning << myID << ": store (" << ss->block->ID << "): failed;"	  " insufficient frags/blocks stored.\n";		r_ret.push_back (ss->succs[0].x);	(*ss->cb) (ss->diskfull ? DHASH_DISKFULL : DHASH_ERR, r_ret);	// We should do something here to try and store this fragment	// somewhere else.	return;      }    }        for (unsigned int i = 0; i < ss->r.size (); i++)      r_ret.push_back (ss->r[i]->id ());    (*ss->cb) (DHASH_OK, r_ret);  }}//------------------ helper chord wrappers -------------voiddhashcli::lookup (chordID blockID, dhashcli_lookupcb_t cb){  clntnode->find_successor    (blockID, wrap (this, &dhashcli::lookup_findsucc_cb, blockID, cb));}voiddhashcli::lookup_findsucc_cb (chordID blockID, dhashcli_lookupcb_t cb,			      vec<chord_node> s, route path, chordstat err){  if (err)     (*cb) (DHASH_CHORDERR, s, path);  else    (*cb) (DHASH_OK, s, path);}//------------------ sendblock ------------------------voiddhashcli::sendblock (ptr<location> dst, blockID bid, str data,		     u_int32_t expiration,		     sendblockcb_t cb, int nonce /* = 0 */){  dhash_store::execute     (clntnode, dst, bid, data, expiration,     wrap (this, &dhashcli::sendblock_cb, cb, data.len ()),     DHASH_FRAGMENT,     nonce); //XXX choose DHASH_STORE if appropriate (i.e. full replica)}voiddhashcli::sendblock (ptr<location> dst, blockID bid_to_send, 		     ptr<dhblock_srv> srv,		     sendblockcb_t cb, int nonce /* = 0 */){  trace << clntnode->my_ID () << ": dhashcli::sendblock (" << bid_to_send 	<< ") to dest: " << dst << "\n";  srv->fetch (bid_to_send.ID, 		  wrap (this, &dhashcli::sendblock_fetch_cb, dst, 			bid_to_send, cb, nonce));  trace << clntnode->my_ID () << ": dhashcli::sendblock (" << bid_to_send 	<< ") adbd request sent\n";}voiddhashcli::sendblock_fetch_cb (ptr<location> dst, blockID bid_to_send,			      sendblockcb_t cb, int nonce,			      adb_status stat, adb_fetchdata_t obj){  trace << clntnode->my_ID () << ": dhashcli::sendblock_fetch_cb (" 	<< bid_to_send << ") has been fetched " << dst << ", stat " 	<< stat << "\n";  if (stat != ADB_OK) {    cb (DHASH_NOENT, false, 0);    return;  }  dhash_store::execute     (clntnode, dst, bid_to_send, obj.data, obj.expiration,     wrap (this, &dhashcli::sendblock_cb, cb, obj.data.len ()),     get_store_status(bid_to_send.ctype), //XXX store_status broken     nonce); }voiddhashcli::sendblock_cb (sendblockcb_t cb, 		        u_int32_t sz,			dhash_stat err, chordID dest, bool present){  (*cb) (err, present, sz);}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?