📄 chord.c
字号:
/*
 * Copyright (c) 2003-2005 Jinyang Li
 *                    Massachusetts Institute of Technology
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

#include "observers/chordobserver.h"
#include "accordion.h"
#include <iostream>
#include <stdio.h>
#include <assert.h>
#include <math.h>

using namespace std;

// Defined elsewhere; when set, the constructor prints a "vis" line per node.
extern bool vis;
// True when the "static_sim" argument was supplied to the most recently
// constructed Chord node (see the constructor).
bool static_sim;
// Printed by the destructor as "total joins seen"; it is not modified in
// this part of the file.
unsigned int joins = 0;

// Routing-table size samples; sorted and summarised by the destructor.
vector<uint> Chord::rtable_sz;

#ifdef RECORD_FETCH_LATENCY
// Accumulators updated by lookup_internal() and printed by the destructor.
double _allfetchlat = 0.0;
double _allfetchsz = 0.0;
unsigned int _allfetchnum = 0;
#endif

/*
 * Construct a Chord node.
 *
 *   i    - address handed to the P2Protocol base class
 *   a    - configuration arguments; every tunable below is read from it with
 *          nget<uint>(key, default, 10) (the trailing 10 is presumably the
 *          numeric base -- TODO confirm against Args)
 *   l    - location table to use; when NULL a fresh LocTable is allocated
 *   name - optional name used to derive the node id when "randid" is 0
 */
Chord::Chord(IPAddress i, Args& a, LocTable *l, const char *name) : P2Protocol(i), _isstable (false)
{
  static_sim = (a.find("static_sim") != a.end());

  //stabilization timer
  _stab_basic_timer = a.nget<uint>("basictimer", 10000, 10);
  _stab_succlist_timer = a.nget<uint>("succlisttimer",_stab_basic_timer,10);
  //location table timeout values
  //_timeout = a.nget<uint>("timeout", 5*_stab_succlist_timer, 10);

  //successors
  _nsucc = a.nget<uint>("successors", 16, 10);

  //fragments
  _frag = a.nget<uint>("m",1,10);

  //how many successors are fragments on?
  _allfrag = a.nget<uint>("allfrag",1,10);
  assert(_allfrag <= _nsucc);

  //recursive routing?
  _recurs = a.nget<uint>("recurs",1,10);
  _recurs_direct = a.nget<uint>("recurs_direct",1,10);
  _stopearly_overshoot = a.nget<uint>("stopearlyovershoot",0,10);

  //parallel lookup? parallelism only works in iterative lookup now
  _parallel = a.nget<uint>("parallelism",1,10);
  _alpha = a.nget<uint>("alpha",1,10);

  //lookup using ipkey?
  _ipkey = a.nget<uint>("ipkey",0,10);

  _asap = a.nget<uint>("asap",_frag,10);

  _max_lookup_time = a.nget<uint>("maxlookuptime",4000,10);
  _to_multiplier = a.nget<uint>("timeout_multiplier", 3, 10);

  _random_id = a.nget<uint>("randid",0,10);

  _wkn.ip = 0;

  _learn = a.nget<uint>("learn",0,10);
  learntable = NULL;

  assert(_frag <= _nsucc);

  // Node identity: random id, or an id hashed from the name, or from the ip.
  me.ip = ip();
  assert(me.ip>0);
  if (_random_id)
    me.id = ConsistentHash::getRandID();
  else {
    if (name)
      me.id = ConsistentHash::ipname2chid(name);
    else
      me.id = ConsistentHash::ip2chid(me.ip);
  }

  if (l)
    loctable = l;
  else
    loctable = New LocTable();

  loctable->set_timeout(0);
  loctable->set_evict(false);
  loctable->init(me);

  if (vis) {
    printf ("vis %llu node %16qx\n", now (), me.id);
  }

  _stab_basic_running = false;
  _stab_basic_outstanding = 0;
  _join_scheduled = 0;
  _last_succlist_stabilized = 0;
}

/*
 * Record one message in both of Node's bandwidth counters: the per-type
 * counter and the per-(src,dst) in/out counter.
 */
void
Chord::record_stat(IPAddress src, IPAddress dst, uint type, uint num_ids, uint num_else)
{
  Node::record_bw_stat(type,num_ids,num_else);
  Node::record_inout_bw_stat(src,dst,num_ids,num_else);
}

/*
 * Destructor.  The node whose ip is 1 prints the global statistics and the
 * routing-table size summary; every node deletes its location table.
 */
Chord::~Chord()
{
  if (me.ip == 1) { //same hack as tapestry.C so statistics only gets printed once
    Node::print_stats();
    printf("<-----STATS----->\n");
    sort(rtable_sz.begin(),rtable_sz.end());
    uint totalrtable = 0;
    uint rsz = rtable_sz.size();
    for (uint i = 0; i < rsz; i++)
      totalrtable += rtable_sz[i];

    // The fraction must be multiplied by rsz *before* the cast: (uint)0.1*rsz
    // parses as ((uint)0.1)*rsz == 0 and would always select element 0.
    // With no samples there is nothing to index, so zeros are printed.
    uint p10 = 0, p50 = 0, p90 = 0;
    double avg = 0.0;
    if (rsz > 0) {
      p10 = rtable_sz[(uint)(0.1*rsz)];
      p50 = rtable_sz[(uint)(0.5*rsz)];
      p90 = rtable_sz[(uint)(0.9*rsz)];
      avg = (double)totalrtable/(double)rsz;
    }
    printf("RTABLE:: 10p:%u 50p:%u 90p:%u avg:%.2f\n", p10, p50, p90, avg);
    printf("<-----ENDSTATS----->\n");
#ifdef RECORD_FETCH_LATENCY
    printf("fetch lat: %.3f %.3f %u\n", _allfetchlat/_allfetchnum, _allfetchsz/_allfetchnum, _allfetchnum);
#endif
    printf("total joins seen %u\n",joins);
  }
  delete loctable;
}

/*
 * Format "<now> <proto>(<ip>,<id>)" into a function-local static buffer and
 * return it.  The buffer is shared by all calls, so the result is
 * overwritten by the next call; output is truncated to fit the buffer.
 */
char *
Chord::ts()
{
  static char buf[50];
  snprintf(buf, sizeof(buf), "%llu %s(%u,%qx)", now(), proto_name().c_str(), me.ip, me.id);
  return buf;
}

/*
 * Return the log-line prefix "<now> <proto>(<ip>,<id>,<first_ip>) ".
 * Output is truncated to fit the local buffer.
 */
string
Chord::header()
{
  char buf[128];
  snprintf(buf, sizeof(buf), "%llu %s(%u,%qx,%u) ", now(),proto_name().c_str(),me.ip,me.id,first_ip());
  return string(buf);
}

/*
 * Return id formatted in hexadecimal followed by a single space.
 */
string
Chord::printID(CHID id)
{
  char buf[128];
  snprintf(buf, sizeof(buf), "%qx ",id);
  return string(buf);
}

/*
 * Mark this node's outgoing edges in matrix, which is indexed as a sz-by-sz
 * row-major array with 1-based ips: entry (me.ip-1, ip-1) is set to 1 for
 * every location-table entry other than this node whose node is alive.
 */
void
Chord::add_edge(int *matrix, int sz)
{
  assert(ip() <= (uint)sz);
  vector<IDMap> v = loctable->get_all();
  for (uint i = 0; i < v.size(); i++) {
    if (v[i].ip != me.ip && Network::Instance()->getnode(v[i].ip)->alive()) {
      assert(v[i].ip <= (uint)sz);
      matrix[(me.ip-1)*sz + v[i].ip -1] = 1;
    }
  }
}

/*
 * In static simulation mode, run initstate() the first time this is called
 * on a live node.  Does nothing when static_sim is false or the node is not
 * alive.
 */
void
Chord::check_static_init()
{
  if ((!static_sim) || (!alive())) return;
  if (!_inited) {
    _inited = true;
    this->initstate();
  }
}

/*
 * Ask the location table for the next hop towards key k (queried as k-1).
 */
Chord::IDMap
Chord::next_hop(ConsistentHash::CHID k)
{
  return loctable->next_hop(k-1);
}

/*
 * Check a lookup result against the observer's sorted node list.
 *
 * Starting at the first entry after k (upper_bound) the list is scanned,
 * wrapping around, for the first node that is alive and inited.  From that
 * position on, v[i] must match consecutive entries of the list by ip.
 *
 * Returns true when every element of v matches; logs at debug level 2 and
 * returns false on the first mismatch.  When the observer knows no nodes at
 * all, only an empty v is accepted.
 */
bool Chord::check_correctness(CHID k, vector<IDMap> v)
{
  vector<IDMap> ids = ChordObserver::Instance(NULL)->get_sorted_nodes();
  uint idsz = ids.size();
  // Nothing to compare against; also avoids indexing ids and "% idsz" below.
  if (idsz == 0)
    return v.empty();

  IDMap tmp;
  tmp.id = k;
  uint pos = upper_bound(ids.begin(), ids.end(), tmp, Chord::IDMap::cmp) - ids.begin();

  uint iter = 0;
  while (iter<idsz) {
    if (pos >= idsz) pos = 0;
    Chord *node = (Chord *)Network::Instance()->getnode(ids[pos].ip);
    if (Network::Instance()->alive(ids[pos].ip) && node->inited())
      break;
    pos++;
    iter++;
  }
  // If no node qualified, the scan can leave pos == idsz; keep it in range.
  if (pos >= idsz) pos = 0;

  for (uint i = 0; i < v.size(); i++) {
    if (ids[pos].ip == v[i].ip) {
      pos = (pos+1) % idsz;
    } else {
      // Both mismatch cases report the same diagnostic.
      CDEBUG(2) << "lookup incorrect(?) key" << printID(k) << " succ should be " << ids[pos].ip << ","<< printID(ids[pos].id) << "instead of " << v[i].ip << "," << printID(v[i].id) << endl;
      return false;
    }
  }
  return true;
}

//XXX Currently, it does not handle losses
//Recursive lookup canot deal with duplicate packets
//also, add_node() does not reset status flag yet
/*
 * Send an RPC to dst, retrying with a doubled timeout after each failure.
 *
 * The attempt counter starts at 2, so at most (num_retry - 2) attempts are
 * made.  Each attempt is recorded with record_stat().  Returns true as soon
 * as doRPC succeeds; returns false when this node is no longer alive after
 * an attempt or when all attempts fail.
 */
template<class BT, class AT, class RT>
bool Chord::failure_detect(IDMap dst, void (BT::* fn)(AT *, RT *), AT *args, RT *ret,
    uint type, uint num_args_id, uint num_args_else, int num_retry)
{
  bool r;
  Time retry_to = TIMEOUT(me.ip,dst.ip);
  int checks;
  //checks = loctable->add_check(dst,0);
  checks = 2;
  //if (checks == -1) return false;
  //int tmp;
  while (checks < num_retry) {
    record_stat(me.ip, dst.ip, type,num_args_id,num_args_else);
    r = doRPC(dst.ip, fn, args, ret, retry_to);
    if (!alive()) return false;
    if (r) {
      return true;
    }
    checks++;
    retry_to = retry_to * 2;
  }
  return false;
}

/*
 * Hook called by lookup_internal() when learning is enabled; this
 * implementation ignores n.
 */
void
Chord::learn_info(IDMap n)
{
  //do nothing;
}

/*
 * This implementation never offers a replacement: it returns false and
 * leaves replacement untouched.
 */
bool
Chord::replace_node(IDMap n, IDMap &replacement)
{
  return false;
}

/*
 * Entry point for a user lookup.
 *
 * Without ipkey mode the "key" argument is the key itself.  In ipkey mode
 * "key" is an ip address: the lookup is dropped if that node is not alive,
 * otherwise the key looked up is that node's id plus one.  The freshly
 * allocated lookup_args is handed to lookup_internal(), which deletes it.
 */
void
Chord::lookup(Args *args)
{
  check_static_init();

  lookup_args *a = New lookup_args;
  if (!_ipkey) {
    a->key = args->nget<CHID>("key");
    a->ipkey = 0;
  } else {
    a->ipkey = args->nget<IPAddress>("key");
    if (!Network::Instance()->alive(a->ipkey)) {
      delete a;
      return;
    }
    // dynamic_cast yields NULL if the node is not a Chord; fail loudly
    // instead of dereferencing a null pointer.
    Chord *target = dynamic_cast<Chord *>(Network::Instance()->getnode(a->ipkey));
    assert(target);
    a->key = target->id() + 1;
  }
  CDEBUG(1) << "start looking up key " << printID(a->key) << "ipkey " << a->ipkey << endl;
  assert(a->key);
  a->start = now();
  a->latency = 0;
  a->num_to = 0;
  a->total_to = 0;
  a->retrytimes = 0;
  a->hops = 0;
  lookup_internal(a); //lookup internal deletes a
}

/*
 * Run one attempt of a user lookup and record its outcome.
 *
 * Owns a: it is deleted on every path except when the lookup is rescheduled
 * through delaycb(), in which case the later call takes it over.
 */
void
Chord::lookup_internal(lookup_args *a)
{
  vector<IDMap> v;
  IDMap lasthop;
  lasthop.ip = 0;

  a->retrytimes++;

  if (_recurs) {
    v = find_successors_recurs(a->key, _frag, TYPE_USER_LOOKUP, &lasthop,a);
  } else {
    v = find_successors(a->key, _frag, TYPE_USER_LOOKUP, &lasthop,a);
  }

  // This node stopped being alive while the lookup was in progress.
  if (!alive()) {
    delete a;
    return;
  }

  if (_learn) {
    if (lasthop.ip)
      learn_info(lasthop);
    for (uint i = 0; i < v.size(); i++)
      learn_info(v[i]);
  }

  if (a->latency >= _max_lookup_time) {
    // Out of time: record a failed lookup.
    record_lookup_stat(me.ip, lasthop.ip, a->latency, false, false, a->hops, a->num_to, a->total_to);
  }else if ((!_ipkey && v.size() > 0) || (_ipkey && lasthop.ip == a->ipkey)) {
    record_lookup_stat(me.ip, lasthop.ip, a->latency, true, true, a->hops, a->num_to, a->total_to);
#ifdef RECORD_FETCH_LATENCY
    assert(static_sim);
    assert(v.size() >= _frag);
    vector<uint> tmplat;
    tmplat.clear();
    Topology *t = Network::Instance()->gettopology();
    for (uint i = 0; i < v.size(); i++) {
      if (tmplat.size() < _allfrag)
        tmplat.push_back(2*t->latency(me.ip, v[i].ip));
    }
    sort(tmplat.begin(),tmplat.end());
    // The hack below reads the 7th smallest sample; make that precondition
    // explicit instead of reading past the end of the vector.
    assert(tmplat.size() > 6);
    _allfetchlat += (double)tmplat[6]; //XXX this is a hack
    _allfetchsz += (double)tmplat.size();
    _allfetchnum++;
#endif
    CDEBUG(1) << "lookup correct key " << printID(a->key) << "interval " << a->latency << endl;
  }else{
    if (_ipkey && a->retrytimes<=1 && (!Network::Instance()->alive(a->ipkey))) {
      // First attempt and the target is no longer alive: nothing is recorded.
    }else if (_ipkey && (a->retrytimes >2 || Network::Instance()->alive(a->ipkey))) {
      record_lookup_stat(me.ip, lasthop.ip, a->latency, false, false, a->hops, a->num_to, a->total_to);
    }else{
      CDEBUG(1) << "lookup incorrect key " << printID(a->key) << "lastnode " << lasthop.ip << "," << printID(lasthop.id) << "latency " << a->latency << " start " << a->start << endl;
      // Retry after a delay of 100, charging that delay to the lookup latency.
      a->latency += 100;
      delaycb(100, &Chord::lookup_internal, a);
      return;
    }
  }
  delete a;
}

/*
 * RPC handler: look up args->m successors of args->key as a join lookup
 * (recursive or iterative according to _recurs) and fill in ret->v,
 * ret->last and ret->dst (this node).
 */
void
Chord::find_successors_handler(find_successors_args *args, find_successors_ret *ret)
{
  check_static_init();
  if (_recurs)
    ret->v = find_successors_recurs(args->key, args->m, TYPE_JOIN_LOOKUP, &(ret->last));
  else
    ret->v = find_successors(args->key, args->m, TYPE_JOIN_LOOKUP, &(ret->last));

  if (ret->v.size() > 0)
    CDEBUG(3) <<" find_successors_handler key "<< printID(args->key) <<"succ " << ret->v[0].ip << "," << printID(ret->v[0].id) << endl;
  ret->dst = me;
}

/*
 * Iterative lookup of key.  User lookups (TYPE_USER_LOOKUP) use the
 * configured _parallel and _alpha values; every other type uses 1 for both.
 * One nextretinfo slot and one set of timeout/hop counters is created per
 * parallel query, and the task queue is seeded with this node.
 */
vector<Chord::IDMap>
Chord::find_successors(CHID key, uint m, uint type, IDMap *lasthop, lookup_args *a)
{
  //parallelism controls how many queries are inflight
  Time before;
  next_args na;
  hash_map<IPAddress, bool> asked;
  hop_info lastfinished;
  list<hop_info> tasks;
  list<hop_info> savefinished;
  bool ok;
  unsigned rpc, donerpc;
  unsigned totalrpc = 0;
  nextretinfo *reuse = NULL;
  nextretinfo *p;
  RPCSet rpcset;
  RPCSet alertset;
  hash_map<unsigned, unsigned> resultmap; //result to rpc number mapping
  hash_map<unsigned, alert_args*> alertmap;
  hop_info h;
  vector<IDMap> results;
  vector<IDMap> to_be_replaced;
  to_be_replaced.clear();
  uint outstanding, parallel, alertoutstanding;
  uint rpc_i;

  h.from = me;
  h.to = me;
  h.hop = 1;

  na.src = me;
  na.type = type;
  na.key = key;
  na.m = m;

  if (type == TYPE_USER_LOOKUP) {
    parallel = _parallel;
    na.alpha = _alpha;
  } else {
    na.alpha = 1;
    parallel = 1;
  }

  vector<nextretinfo*> rpcslots;
  for(uint i = 0; i < parallel; i++) {
    reuse = New nextretinfo;
    reuse->free = true;
    rpcslots.push_back(reuse);
  }

  //init my timeout calculations
  vector<uint> num_timeouts;
  vector<uint> time_timeouts;
  vector<uint> num_hops;
  for (uint i = 0; i < parallel; i++) {
    num_timeouts.push_back(0);
    time_timeouts.push_back(0);
    num_hops.push_back(0);
  }

  na.deadnodes.clear();
  na.retry = false;

  savefinished.clear();
  lastfinished.from.ip = 0;
  lastfinished.to.ip = me.ip;
  lastfinished.to.id = me.id;
  lastfinished.hop = 0;

  //put myself in the task queue
  tasks.push_back(h);
  asked[me.ip] = true;
  results.clear();
  outstanding = alertoutstanding = 0;

  /*
  vector<hop_info> recorded;
  recorded.clear();
  */
  CDEBUG(2) << "start lookup key " << printID(key) << "type " << type << endl;
  while (1) {
    assert(totalrpc < 100 && (na.deadnodes.size()<20));
    // Nothing queued and nothing in flight: fall back to the last finished hop.
    if ((tasks.size() == 0) && (outstanding == 0))
      tasks.push_back(lastfinished);

    while ((outstanding < parallel) && (tasks.size() > 0)) {
      h = tasks.front();
      if (ConsistentHash::betweenrightincl(h.to.id, key, lastfinished.to.id) && (outstanding > 0))
        break;
      else if ((h.to.ip == lastfinished.to.ip) && (totalrpc > 0)) {
        assert(outstanding == 0);
        na.retry = true;
        tasks.pop_front();
      }else if (ConsistentHash::betweenrightincl(lastfinished.to.id, key, h.to.id) || (totalrpc == 0)) {
        na.retry = false;
        tasks.pop_front();
      }else{
        CDEBUG(2) << "key " << printID(key) << "timeout resume to node " << lastfinished.to.ip << "," << printID(lastfinished.to.id) << endl;
        na.retry = true;
        h = lastfinished;
      }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -