📄 client.c
字号:
rs->complete (DHASH_OK, ret_block); rs = NULL; } else { if (rs->incoming_rpcs == 0 && rs->nextsucc > rs->succs.size ()) { info << myID << ": retrieve (" << rs->key << "): all RPCs returned but no good block.\n"; rs->complete (DHASH_NOENT, NULL); rs = NULL; } }}struct orderer { float d_; size_t i_; static int cmp (const void *a_, const void *b_) { const orderer *a = (orderer *) a_, *b = (orderer *) b_; return (int) (a->d_ - b->d_); }};static voidorder_succs (ptr<locationtable> locations, const Coord &me, const vec<chord_node> &succs, vec<chord_node> &out, u_long max){ //max is the number of successors we should order: the first max in the list u_long lim = max; // 0 means order them all if (max == 0 || max > succs.size ()) lim = succs.size (); orderer *d2me = New orderer [succs.size()]; for (size_t i = 0; i < lim; i++) { ptr<location> l = NULL; if (locations) l = locations->lookup (succs[i].x); if (l) { // Have actual measured latencies, so might as well use them. d2me[i].d_ = l->distance (); } else { Coord cursucc (succs[i]); d2me[i].d_ = Coord::distance_f (me, cursucc); } d2me[i].i_ = i; } qsort (d2me, lim, sizeof (*d2me), &orderer::cmp); out.clear (); for (size_t i = 0; i < lim; i++) {#ifdef VERBOSE_LOG char buf[10]; // argh. please shoot me. sprintf (buf, "%5.2f", d2me[i].d_); modlogger ("orderer", modlogger::TRACE) << d2me[i].i_ << " " << succs[d2me[i].i_] << " " << buf << "\n";#endif /* VERBOSE_LOG */ out.push_back (succs[d2me[i].i_]); } delete[] d2me; //copy any of the ones we didn't consider verbatim if (max == 0) return; long remaining = succs.size () - lim; if (remaining > 0) for (int i = 0; i < remaining; i++) out.push_back (succs[lim+i]);}// Pull block fragments down from a successor list// public interface, used by dhash_impl if completing a FETCHRECvoiddhashcli::assemble (blockID blockID, cb_ret cb, vec<chord_node> succs, route r){ ptr<rcv_state> rs = New refcounted<rcv_state> (blockID, cb); rs->r = r; if (dhash_tcp_transfers) rs->succopt = true; ptr<dhblock> blk = allocate_dhblock (blockID.ctype); doassemble (rs, blk, succs);}//---------------------------- insert -----------------------voiddhashcli::insert (ref<dhash_block> block, cbinsert_path_t cb, int options, ptr<chordID> guess){ ptr<location> l = NULL; if (guess) l = clntnode->locations->lookup (*guess); if (!l) lookup (block->ID, wrap (this, &dhashcli::insert_lookup_cb, block, cb, options)); else clntnode->get_succlist (l, wrap (this, &dhashcli::insert_succlist_cb, block, cb, *guess, options)); }voiddhashcli::insert_succlist_cb (ref<dhash_block> block, cbinsert_path_t cb, chordID guess, int options, vec<chord_node> succs, chordstat status){ if (status) { vec<chordID> rrr; cb (DHASH_CHORDERR, rrr); info << "insert_succlist_cb: failure (" << block->ID << "): " << status << "\n"; return; } route r; r.push_back (clntnode->locations->lookup (guess)); insert_lookup_cb (block, cb, options, DHASH_OK, succs, r);}u_int64_t start_insert, end_insert, total_insert = 0;voiddhashcli::insert_lookup_cb (ref<dhash_block> block, cbinsert_path_t cb, int options, dhash_stat status, vec<chord_node> succs, route r){ vec<chordID> mt; if (status) { (*cb) (DHASH_CHORDERR, mt); return; } //XXX run an allocator ptr<dhblock> blk = allocate_dhblock (block->ctype); if (succs.size () < blk->min_put ()) { // this is a failure condition, since we can't hope to reconstruct // the block reliably. info << "Not enough successors for insert: |succs| " << succs.size () << ", DFRAGS " << blk->min_put () << "\n"; (*cb) (DHASH_CHORDERR, mt); return; } if (succs.size () < blk->num_put ()) // benjie: this is not a failure condition, since if we don't // receive num_efrags number of store replies in insert_store_cb, // we are still allowed to proceed. info << "Number of successors less than desired: |succs| " << succs.size () << ", EFRAGS " << blk->num_put () << "\n"; while (succs.size () > blk->num_put ()) succs.pop_back (); ref<sto_state> ss = New refcounted<sto_state> (block, cb); ss->succs = succs; timeval tp; gettimeofday (&tp, NULL); start_insert = tp.tv_sec * (u_int64_t) 1000000 + tp.tv_usec; for (u_int i = 0; i < succs.size(); i++) { // Count up for each RPC that will be dispatched ss->out += 1; ptr<location> dest = clntnode->locations->lookup_or_create (succs[i]); str frag = blk->generate_fragment (block, i); dhash_store::execute (clntnode, dest, blockID(block->ID, block->ctype), frag, wrap (this, &dhashcli::insert_store_cb, ss, r, i, blk->num_put (), blk->min_put ()), get_store_status(block->ctype)); // XXX reactivate retry later } return; }voiddhashcli::insert_store_cb (ref<sto_state> ss, route r, u_int i, u_int nstores, u_int min_needed, dhash_stat err, chordID id, bool present){ ss->out -= 1; if (err) { info << "fragment/block store failed: " << ss->block->ID << " fragment " << i << "/" << nstores << ": " << err << "\n"; } else { info << "fragment/block store ok: " << ss->block->ID << " fragment " << i << "/" << nstores << " at " << id << "\n"; ss->good += 1; } // Count down until all outstanding RPCs have returned vec<chordID> r_ret; if (ss->out == 0) { chordID myID = clntnode->my_ID (); if (ss->good < nstores) { warning << myID << ": store (" << ss->block->ID << "): only stored " << ss->good << " of " << nstores << " encoded.\n"; if (ss->good < min_needed) { warning << myID << ": store (" << ss->block->ID << "): failed;" " insufficient frags/blocks stored.\n"; r_ret.push_back (ss->succs[0].x); (*ss->cb) (DHASH_ERR, r_ret); // We should do something here to try and store this fragment // somewhere else. return; } } for (unsigned int i = 0; i < r.size (); i++) r_ret.push_back (r[i]->id ()); (*ss->cb) (DHASH_OK, r_ret); }}//------------------ helper chord wrappers -------------voiddhashcli::lookup (chordID blockID, dhashcli_lookupcb_t cb){ clntnode->find_successor (blockID, wrap (this, &dhashcli::lookup_findsucc_cb, blockID, cb));}voiddhashcli::lookup_findsucc_cb (chordID blockID, dhashcli_lookupcb_t cb, vec<chord_node> s, route path, chordstat err){ if (err) (*cb) (DHASH_CHORDERR, s, path); else (*cb) (DHASH_OK, s, path);}//------------------ sendblock ------------------------voiddhashcli::sendblock (ptr<location> dst, blockID bid, str data, sendblockcb_t cb, int nonce /* = 0 */){ dhash_store::execute (clntnode, dst, bid, data, wrap (this, &dhashcli::sendblock_cb, cb), DHASH_FRAGMENT, nonce); //XXX choose DHASH_STORE if appropriate (i.e. full replica)}voiddhashcli::sendblock (ptr<location> dst, blockID bid_to_send, ptr<dhblock_srv> srv, sendblockcb_t cb, int nonce /* = 0 */){ trace << clntnode->my_ID () << ": dhashcli::sendblock (" << bid_to_send << ") to dest: " << dst << "\n"; srv->fetch (bid_to_send.ID, wrap (this, &dhashcli::sendblock_fetch_cb, dst, bid_to_send, cb, nonce)); trace << clntnode->my_ID () << ": dhashcli::sendblock (" << bid_to_send << ") adbd request sent\n";}voiddhashcli::sendblock_fetch_cb (ptr<location> dst, blockID bid_to_send, sendblockcb_t cb, int nonce, adb_status stat, chordID key, str data){ trace << clntnode->my_ID () << ": dhashcli::sendblock_fetch_cb (" << bid_to_send << ") has been fetched " << dst << ", stat " << stat << "\n"; if (stat != ADB_OK) { cb (DHASH_NOENT, false); return; } dhash_store::execute (clntnode, dst, bid_to_send, data, wrap (this, &dhashcli::sendblock_cb, cb), get_store_status(bid_to_send.ctype), //XXX store_status broken nonce); }voiddhashcli::sendblock_cb (callback<void, dhash_stat, bool>::ref cb, dhash_stat err, chordID dest, bool present){ (*cb) (err, present);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -