📄 accordion.c
字号:
/* * Copyright (c) 2003-2005 Jinyang Li * Massachusetts Institute of Technology * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the * "Software"), to deal in the Software without restriction, including * without limitation the rights to use, copy, modify, merge, publish, * distribute, sublicense, and/or sell copies of the Software, and to * permit persons to whom the Software is furnished to do so, subject to * the following conditions: * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * */#include "accordion.h"#include "ratecontrolqueue.h"#include <math.h>#include <stdio.h>#include <assert.h>#include <iostream>vector<uint> Accordion::rtable_sz;#define EST_TIMEOUT_SZ 100vector<IDMap> Accordion::ids;bool Accordion::sorted;vector<double> Accordion::sort_live;vector<double> Accordion::sort_dead;Accordion::Accordion(IPAddress i, Args& a) : P2Protocol(i){ _stab_basic_timer = a.nget<uint>("basictimer", 144000, 10); _fixed_lookup_to = (double)(a.nget<uint>("fixed_lookup_to",90,10))/100.0; _fixed_stab_to = (double)(a.nget<uint>("fixed_stab_to",100,10))/100.0; _fixed_stab_int = a.nget<uint>("fixed_stabtimer",0,10); if (_fixed_stab_int) _parallelism = a.nget<uint>("fixed_para",0,10); else _parallelism = 1; _recurs = (bool) a.nget<uint>("recurs",1,10); _bw_overhead = a.nget<uint>("overhead_rate", 10, 10); _burst_sz = a.nget<uint>("burst_size",0,10); uint _burst; if (!_burst_sz) { _burst = a.nget<uint>("burst",10,10); _burst_sz = _burst * _bw_overhead; }else _burst = _burst_sz/_bw_overhead; uint _min_bw_overhead = a.nget<uint>("min_rate",0,10); uint _max_bw_overhead = a.nget<uint>("max_rate",0,10); uint numnodes = Network::Instance()->gettopology()->num(); if (_min_bw_overhead && _max_bw_overhead) { _bw_overhead = _min_bw_overhead + (uint)((this->first_ip()-1)*((double)(_max_bw_overhead-_min_bw_overhead+1)/(double)numnodes)); uint mid = (_min_bw_overhead+_max_bw_overhead)/2; if (_bw_overhead == _min_bw_overhead) _special = 1; else if (_bw_overhead == mid) _special = 2; else if (_bw_overhead == _max_bw_overhead) _special = 3; else _special = 0; _burst_sz = _burst * _bw_overhead; _adjust_interval = _burst * 500; }else { _adjust_interval = 500*_burst_sz/_bw_overhead; _special = 0; } _me.ip = ip(); _me.id = ConsistentHash::ip2chid(_me.ip); _me.timestamp = 0; _me.alivetime = 0; _id = _me.id; _rate_queue = New RateControlQueue(this, (double)_bw_overhead, _burst_sz, _fixed_stab_int, Accordion::empty_cb); _next_adjust = _adjust_interval; _last_calculated = 0; _stat.clear(); for (uint i = 0;i < 10; i++) _calculated_prob.push_back(1-0.1*i); _lookup_times = 0; _empty_times = 0; _nsucc = a.nget<uint>("successors",16,10); _to_multiplier = a.nget<uint>("timeout_multiplier", 3, 10); _learn_num = a.nget<uint>("learn_num",5,10); _max_p = _burst_sz/(40 + 8 * _learn_num); if (_max_p > 6) _max_p = 6; else if (_max_p < 1) _max_p = 1; _stab_basic_running = false; _join_scheduled = 0; _last_stab = 0; loctable = New LocTable(); loctable->init(_me); _wkn.ip = 0; ids.push_back(_me); _top = Network::Instance()->gettopology(); _max_succ_gap = 0; _est_n = 100; if (_fixed_stab_int) _tt = _fixed_lookup_to; else _tt = 0.9;}Accordion::~Accordion(){ if (alive()) { vector<IDMap>::iterator p = find(ids.begin(),ids.end(),_me); ids.erase(p); for (HashMap<ConsistentHash::CHID, Time>::iterator i = _outstanding_lookups.begin(); i != _outstanding_lookups.end(); ++i) { ADEBUG(2) << "done lookup key " << printID(i.key()) << "timeout failed started " << i.value() << endl; record_lookup_stat(_me.ip, _me.ip, now()-i.value(), false, false, 0, 0, 0); } if (ids.size() == 0) { Node::print_stats(); printf("<-----STATS----->\n"); sort(rtable_sz.begin(),rtable_sz.end()); uint totalrtable = 0; uint rsz = rtable_sz.size(); for (uint i = 0; i < rsz; i++) totalrtable += rtable_sz[i]; printf("RTABLE:: 10p:%u 50p:%u 90p:%u avg:%.2f\n", rtable_sz[(uint)(0.1*rsz)], rtable_sz[(uint)(0.5*rsz)], rtable_sz[(uint)(0.9*rsz)], (double)totalrtable/(double)rsz); printf("<-----ENDSTATS----->\n"); } } delete loctable; delete _rate_queue;}unsignedAccordion::PKT_SZ(unsigned ids, unsigned others){ return PKT_OVERHEAD + 4 * ids + others;}/* -------------- initstate ---------------- */voidAccordion::initstate(){ if (!sorted) sort(ids.begin(),ids.end(), IDMap::cmp); uint sz = ids.size(); //add successors uint my_pos = find(ids.begin(), ids.end(), _me) - ids.begin(); IDMap n; for (uint i = 1; i <= _nsucc; i++) { n = ids[(my_pos+i) % sz]; n.timestamp = now(); loctable->add_node(n,true); } _est_n = ((ConsistentHash::CHID)-1)/ConsistentHash::distance(ids[my_pos%sz].id,ids[(my_pos+_nsucc)%sz].id); _est_n = _nsucc * _est_n; //add predecessor loctable->add_node(ids[(my_pos-1) % sz]); //add random nodes for (uint i = 0; i < 20; i++) { uint r = random() % sz; if (ids[r].id != _me.id) { n = ids[r]; n.timestamp = now(); loctable->add_node(n); } } IDMap succ = loctable->succ(_me.id+1); ADEBUG(3) << "inited succ " << succ.ip << "," << printID(succ.id) << " locsz " << loctable->size() << " succsz " << loctable->succ_size() << " maxp " << _max_p << endl;}/* -------------- join --------------------- */voidAccordion::join(Args *args){ if (!alive()) return; _last_bytes = 0; _last_bytes_time = now(); while ((!_wkn.ip) || (!Network::Instance()->alive(_wkn.ip))) { _wkn.ip = Network::Instance()->getnode(ids[random()%ids.size()].ip)->ip(); _wkn.id = Network::Instance()->getnode(_wkn.ip)->id(); _wkn.alivetime = 0; } if (args) { _tt = 0.9; _last_joined_time = now(); _wkn.timestamp = now(); _me.ip = ip(); _me.id = ConsistentHash::ip2chid(_me.ip); _me.timestamp = now(); _id = _me.id; loctable->init(_me); loctable->add_node(_wkn); vector<IDMap>::iterator p = upper_bound(ids.begin(),ids.end(),_me, IDMap::cmp); if (p->id!=_me.id) ids.insert(p,1,_me); if (!_fixed_stab_int) _parallelism = 1; ADEBUG(1) << "start to join " << printID(_me.id-1) << " locsz " << loctable->size() << " livesz " << loctable->live_size() << " succsz " << loctable->succ_size() << " wkn " << _wkn.ip << endl; }else{ ADEBUG(1) << "repeated join " << printID(_me.id-1) << " wkn " << _wkn.ip << " locsz " << loctable->size() << endl; } IDMap succ = loctable->succ(_me.id+1); if ((args && args->nget<uint>("first",0,10)==1) || (succ.ip && succ.ip!=_me.ip)) { //start basic successor stabilization _last_joined_time = 0; _join_scheduled = 0; if (!_stab_basic_running) { _stab_basic_running = true; delaycb(0,&Accordion::stab_succ,(void *)0); } else { delaycb(0,&Accordion::fix_succ, (void *)0); } return; } _join_scheduled = now(); lookup_args *la = New lookup_args; lookup_ret *lr = New lookup_ret; bzero(la,sizeof(lookup_args)); la->key = _me.id - 1; la->m = _nsucc; la->ori = _me; la->ori.alivetime = now()-_last_joined_time; la->ori.timestamp = 1; la->no_drop = true; la->parallelism = 1; la->type = TYPE_JOIN_LOOKUP; la->learnsz = _learn_num; la->overshoot = 0; la->nexthop = _wkn; _rate_queue->do_rpc(_wkn.ip, &Accordion::find_successors_handler, &Accordion::null_cb, la, lr, (uint)0, la->type, PKT_SZ(1,1), PKT_SZ(2*la->m,1),TIMEOUT(_me.ip, _wkn.ip));}intAccordion::null_cb(bool b, lookup_args *a, lookup_ret *r){ if (!a->nexthop.ip) abort(); a->nexthop.timestamp = now(); loctable->update_ifexists(a->nexthop); if (a) delete a; if (r) delete r; if (b) return PKT_OVERHEAD; else return 0;}voidAccordion::join_handler(lookup_args *la, lookup_ret *lr){ la->src.timestamp = now(); loctable->update_ifexists(la->src); la->from.timestamp = now(); if (lr->v.size() > 0) loctable->add_node(la->from); for (uint i = 0; i < lr->v.size(); i++) { if (lr->v[i].ip != _me.ip) { loctable->add_node(lr->v[i],true); } /* if (i!=0) { ConsistentHash::CHID gap = lr->v[i].id - lr->v[i-1].id; if (_max_succ_gap == 0 || gap > _max_succ_gap) _max_succ_gap = gap; } */ } IDMap succ = loctable->succ(_me.id+1); if (!succ.ip) { ADEBUG(1) << "join_handler join failed sz " << lr->v.size() << " locsz " << loctable->size() << endl; delaycb(5000, &Accordion::join, (Args *)0); } else { _join_scheduled = 0; //start basic successor stabilization if (!_stab_basic_running) { _stab_basic_running = true; delaycb(0,&Accordion::stab_succ,(void *)0); } else { delaycb(0,&Accordion::fix_pred,(void *)0); } join_learn(); IDMap succ = loctable->succ(_me.id+1); IDMap pred = loctable->pred(_me.id-1); vector<IDMap> scs = loctable->succs(_me.id+1,100); ADEBUG(1) << "joined succ " << succ.ip << "," << printID(succ.id) << "locsz " << loctable->size() << " livesz " << loctable->live_size() << " succs: " << print_succs(scs) << " scs: " << print_succs(lr->v) << " pred " << la->from.ip << "," << printID(la->from.id) << " mypred " << pred.ip << endl; }}voidAccordion::join_learn(){ vector<IDMap> scs = loctable->succs(_me.id+1,_nsucc); if (scs.size() < (_nsucc/2)) return; Time min = 1000000; IDMap min_n; min_n.ip = 0; for (uint i = 0; i < scs.size(); i++) { if (_top->latency(_me.ip, scs[i].ip) < min) { min = _top->latency(_me.ip, scs[i].ip); min_n = scs[i]; } } learn_args *la = New learn_args; learn_ret *lr = New learn_ret; la->m = 3 * _learn_num; //means i want to learn from all la->n = min_n; la->src = _me; la->src.alivetime = now()-_last_joined_time; la->end = _me; ADEBUG(2) << "join_learn from " << la->n.ip << "," << printID(la->n.id) << endl; _rate_queue->do_rpc(min_n.ip, &Accordion::learn_handler, &Accordion::learn_cb, la, lr, 3, TYPE_FINGER_UP, PKT_SZ(0,1), PKT_SZ(2*la->m,0),TIMEOUT(_me.ip,min_n.ip));}voidAccordion::find_successors_handler(lookup_args *la, lookup_ret *lr){ if (la->nexthop.ip == _me.ip) la->nexthop.alivetime = now()-_last_joined_time; else abort(); IDMap succ = loctable->succ(_me.id+1); if (!succ.ip) { /* lookup_args *lla = New lookup_args; lla->src = _me; lla->src.alivetime = now()-_last_joined_time; lla->from = _me; lla->from.alivetime = now()-_last_joined_time; bcopy(la,lla,sizeof(lookup_args)); lookup_ret *llr = New lookup_ret; llr->v.clear(); lla->nexthop.ip = 0; _rate_queue->do_rpc(la->ori.ip, &Accordion::join_handler, &Accordion::null_cb, lla, llr, 1, TYPE_JOIN_LOOKUP, PKT_SZ(2*llr->v.size(),0),PKT_SZ(0,0),TIMEOUT(_me.ip, la->ori.ip)); */ ADEBUG(2) << "find_successors_handler failed for " << la->ori.ip << "," << printID(la->ori.id) << " not joined" << endl; }else{ lookup_args lla; bcopy(la,&lla,sizeof(lookup_args)); lla.src = _me; lla.src.alivetime = now()-_last_joined_time; lla.src.timestamp = now(); lla.from = _me; lla.from.alivetime = lla.src.alivetime; lla.no_drop = true; ADEBUG(3)<<"find_successors_handler key " << printID(lla.key) << " from " << lla.ori.ip << "," << printID(lla.ori.id) << endl; next_recurs(&lla,NULL); } ADEBUG(5) << " find_successors_handler reply " << PKT_SZ(0,1) << " bytes to " << la->ori.ip << " quota " << _rate_queue->quota() << endl;}/* ------------------------ crash ------------------------------------*/voidAccordion::crash(Args *args){ ADEBUG(1) << "crashed rawsz " << loctable->size(LOC_DEAD) << " locsz " << loctable->size() << " livesz " << loctable->live_size() << " locsz_used " << loctable->size(LOC_HEALTHY, _tt) << " livesz_used " << loctable->live_size(_tt) << " live_time " << now()-_last_joined_time << " para " << _parallelism << " timeout " << _tt << " est_n " << _est_n << endl; _last_joined_time = now(); _rate_queue->stop_queue(); loctable->del_all(); for (HashMap<ConsistentHash::CHID, Time>::iterator i = _outstanding_lookups.begin(); i != _outstanding_lookups.end(); ++i) { ADEBUG(2) << "done lookup key " << printID(i.key()) << "timeout failed started " << i.value() << endl; record_lookup_stat(_me.ip, _me.ip, now()-i.value(), false, false, 0, 0, 0); } _outstanding_lookups.clear(); _forwarded.clear(); _forwarded_nodrop.clear(); vector<IDMap>::iterator p = find(ids.begin(),ids.end(),_me); ids.erase(p); _max_succ_gap = 0; _stat.clear(); for (uint i = 0; i < 10; i++) _calculated_prob[i] = (1-0.1*i); _progress.clear(); /* for (HashMap<ConsistentHash::CHID, Time>::iterator i = _sent.begin(); i != _sent.end(); ++i) { list<IDMap>* s = (*i).value(); if (s) delete s; } _sent.clear();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -