client.c
来自「基于DHT的对等协议」· C语言 代码 · 共 651 行 · 第 1/2 页
C
651 行
rs->timemark (); str data = block_t->produce_block_data (); ptr<dhash_block> ret_block = New refcounted<dhash_block> (data, rs->key.ctype); ret_block->ID = rs->key.ID; ret_block->expiration = block->expiration; ret_block->hops = rs->r.size (); ret_block->errors = rs->errors; ret_block->retries = block->errors; for (size_t i = 1; i < rs->times.size (); i++) { timespec diff = rs->times[i] - rs->times[i - 1]; ret_block->times.push_back (diff.tv_sec * 1000 + int (diff.tv_nsec/1000000)); } rs->complete (DHASH_OK, ret_block); rs = NULL; } else { if (rs->incoming_rpcs == 0 && rs->nextsucc > rs->succs.size ()) { info << myID << ": retrieve (" << rs->key << "): all RPCs returned but no good block.\n"; rs->complete (DHASH_NOENT, NULL); rs = NULL; } }}struct orderer { float d_; size_t i_; static int cmp (const void *a_, const void *b_) { const orderer *a = (orderer *) a_, *b = (orderer *) b_; return (int) (a->d_ - b->d_); }};static voidorder_succs (ptr<locationtable> locations, const Coord &me, const vec<chord_node> &succs, vec<chord_node> &out, u_long max){ //max is the number of successors we should order: the first max in the list u_long lim = max; // 0 means order them all if (max == 0 || max > succs.size ()) lim = succs.size (); orderer *d2me = New orderer [succs.size()]; for (size_t i = 0; i < lim; i++) { ptr<location> l = NULL; if (locations) l = locations->lookup (succs[i].x); if (l) { // Have actual measured latencies, so might as well use them. d2me[i].d_ = l->distance (); } else { Coord cursucc (succs[i]); d2me[i].d_ = Coord::distance_f (me, cursucc); } d2me[i].i_ = i; } qsort (d2me, lim, sizeof (*d2me), &orderer::cmp); out.clear (); for (size_t i = 0; i < lim; i++) {#ifdef VERBOSE_LOG char buf[10]; // argh. please shoot me. sprintf (buf, "%5.2f", d2me[i].d_); modlogger ("orderer", modlogger::TRACE) << d2me[i].i_ << " " << succs[d2me[i].i_] << " " << buf << "\n";#endif /* VERBOSE_LOG */ out.push_back (succs[d2me[i].i_]); } delete[] d2me; //copy any of the ones we didn't consider verbatim if (max == 0) return; long remaining = succs.size () - lim; if (remaining > 0) for (int i = 0; i < remaining; i++) out.push_back (succs[lim+i]);}// Pull block fragments down from a successor list// public interface, used by dhash_impl if completing a FETCHRECvoiddhashcli::assemble (blockID blockID, cb_ret cb, vec<chord_node> succs, route r){ ptr<rcv_state> rs = New refcounted<rcv_state> (blockID, cb); rs->r = r; if (dhash_tcp_transfers) rs->succopt = true; ptr<dhblock> blk = allocate_dhblock (blockID.ctype); doassemble (rs, blk, succs);}//---------------------------- insert -----------------------voiddhashcli::insert (ref<dhash_block> block, cbinsert_path_t cb, int options, ptr<chordID> guess){ ptr<location> l = NULL; if (guess) l = clntnode->locations->lookup (*guess); if (!(options & DHASHCLIENT_EXPIRATION_SUPPLIED)) block->expiration = (default_lifetime < 0) ? 0 : (timenow + default_lifetime); if (!l) lookup (block->ID, wrap (this, &dhashcli::insert_lookup_cb, block, cb, options)); else clntnode->get_succlist (l, wrap (this, &dhashcli::insert_succlist_cb, block, cb, *guess, options)); }voiddhashcli::insert_succlist_cb (ref<dhash_block> block, cbinsert_path_t cb, chordID guess, int options, vec<chord_node> succs, chordstat status){ if (status) { vec<chordID> rrr; cb (DHASH_CHORDERR, rrr); info << "insert_succlist_cb: failure (" << block->ID << "): " << status << "\n"; return; } route r; r.push_back (clntnode->locations->lookup (guess)); insert_lookup_cb (block, cb, options, DHASH_OK, succs, r);}voiddhashcli::insert_lookup_cb (ref<dhash_block> block, cbinsert_path_t cb, int options, dhash_stat status, vec<chord_node> succs, route r){ vec<chordID> mt; if (status) { (*cb) (DHASH_CHORDERR, mt); return; } //XXX run an allocator ptr<dhblock> blk = allocate_dhblock (block->ctype); if (succs.size () < blk->min_put ()) { // this is a failure condition, since we can't hope to reconstruct // the block reliably. info << "Not enough successors for insert: |succs| " << succs.size () << ", DFRAGS " << blk->min_put () << "\n"; (*cb) (DHASH_CHORDERR, mt); return; } if (succs.size () < blk->num_put ()) // benjie: this is not a failure condition, since if we don't // receive num_efrags number of store replies in insert_store_cb, // we are still allowed to proceed. info << "Number of successors less than desired: |succs| " << succs.size () << ", EFRAGS " << blk->num_put () << "\n"; vec<chord_node> unique_succs; filter_succs (succs, unique_succs); succs = unique_succs; while (succs.size () > blk->num_put ()) succs.pop_back (); ref<sto_state> ss = New refcounted<sto_state> (block, cb); ss->succs = succs; ss->r = r; ss->blk = blk; // Track number of times insert_store_cb is to be called. ss->out = succs.size (); for (u_int i = 0; i < succs.size(); i++) { ptr<location> dest = clntnode->locations->lookup_or_create (succs[i]); str frag = blk->generate_fragment (block, i); dhash_store::execute (clntnode, dest, blockID(block->ID, block->ctype), frag, block->expiration, wrap (this, &dhashcli::insert_store_cb, ss, i, getusec ()), get_store_status (block->ctype)); // XXX reactivate retry later } return; }voiddhashcli::insert_store_cb (ref<sto_state> ss, u_int i, u_int64_t t, dhash_stat err, chordID id, bool present){ u_int nstores = ss->blk->num_put (); u_int min_needed = ss->blk->min_put (); ss->out -= 1; info << "store " << ss->block->ID << " (frag " << i + 1 << "/" << nstores << ") -> " << id << " in " << (getusec () - t)/1000 << "ms: " << err << "\n"; if (!err) ss->good += 1; if (err == DHASH_DISKFULL) ss->diskfull = true; // Count down until all outstanding RPCs have returned if (ss->out == 0) { vec<chordID> r_ret; chordID myID = clntnode->my_ID (); if (ss->good < nstores) { warning << myID << ": store (" << ss->block->ID << "): only stored " << ss->good << " of " << nstores << " encoded.\n"; if (ss->good < min_needed) { warning << myID << ": store (" << ss->block->ID << "): failed;" " insufficient frags/blocks stored.\n"; r_ret.push_back (ss->succs[0].x); (*ss->cb) (ss->diskfull ? DHASH_DISKFULL : DHASH_ERR, r_ret); // We should do something here to try and store this fragment // somewhere else. return; } } for (unsigned int i = 0; i < ss->r.size (); i++) r_ret.push_back (ss->r[i]->id ()); (*ss->cb) (DHASH_OK, r_ret); }}//------------------ helper chord wrappers -------------voiddhashcli::lookup (chordID blockID, dhashcli_lookupcb_t cb){ clntnode->find_successor (blockID, wrap (this, &dhashcli::lookup_findsucc_cb, blockID, cb));}voiddhashcli::lookup_findsucc_cb (chordID blockID, dhashcli_lookupcb_t cb, vec<chord_node> s, route path, chordstat err){ if (err) (*cb) (DHASH_CHORDERR, s, path); else (*cb) (DHASH_OK, s, path);}//------------------ sendblock ------------------------voiddhashcli::sendblock (ptr<location> dst, blockID bid, str data, u_int32_t expiration, sendblockcb_t cb, int nonce /* = 0 */){ dhash_store::execute (clntnode, dst, bid, data, expiration, wrap (this, &dhashcli::sendblock_cb, cb, data.len ()), DHASH_FRAGMENT, nonce); //XXX choose DHASH_STORE if appropriate (i.e. full replica)}voiddhashcli::sendblock (ptr<location> dst, blockID bid_to_send, ptr<dhblock_srv> srv, sendblockcb_t cb, int nonce /* = 0 */){ trace << clntnode->my_ID () << ": dhashcli::sendblock (" << bid_to_send << ") to dest: " << dst << "\n"; srv->fetch (bid_to_send.ID, wrap (this, &dhashcli::sendblock_fetch_cb, dst, bid_to_send, cb, nonce)); trace << clntnode->my_ID () << ": dhashcli::sendblock (" << bid_to_send << ") adbd request sent\n";}voiddhashcli::sendblock_fetch_cb (ptr<location> dst, blockID bid_to_send, sendblockcb_t cb, int nonce, adb_status stat, adb_fetchdata_t obj){ trace << clntnode->my_ID () << ": dhashcli::sendblock_fetch_cb (" << bid_to_send << ") has been fetched " << dst << ", stat " << stat << "\n"; if (stat != ADB_OK) { cb (DHASH_NOENT, false, 0); return; } dhash_store::execute (clntnode, dst, bid_to_send, obj.data, obj.expiration, wrap (this, &dhashcli::sendblock_cb, cb, obj.data.len ()), get_store_status(bid_to_send.ctype), //XXX store_status broken nonce); }voiddhashcli::sendblock_cb (sendblockcb_t cb, u_int32_t sz, dhash_stat err, chordID dest, bool present){ (*cb) (err, present, sz);}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?