⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 accordion.c

📁 P2P模拟器
💻 C
📖 第 1 页 / 共 4 页
字号:
/*
 * Copyright (c) 2003-2005 Jinyang Li
 *                    Massachusetts Institute of Technology
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 */

#include "accordion.h"
#include "ratecontrolqueue.h"
#include <math.h>
#include <stdio.h>
#include <assert.h>
#include <iostream>

// Routing-table size samples; summarized by the destructor of the last node.
vector<uint> Accordion::rtable_sz;
#define EST_TIMEOUT_SZ 100
// Global list of nodes; each constructor appends itself, crash() removes.
vector<IDMap> Accordion::ids;
bool Accordion::sorted;
vector<double> Accordion::sort_live;
vector<double> Accordion::sort_dead;

/* -------------- constructor ---------------- */
/*
 * Build a node for address i, reading its tunables from a.
 *
 * Reads the stabilization/lookup options, derives the bandwidth budget
 * (_bw_overhead) and burst size (_burst_sz), creates the rate-control queue
 * and the location table, and appends this node to the shared `ids` list.
 *
 * When both "min_rate" and "max_rate" are non-zero the budget is spread
 * linearly across nodes according to first_ip(), and _special tags nodes
 * whose budget equals the minimum (1), the midpoint (2) or the maximum (3).
 */
Accordion::Accordion(IPAddress i, Args& a) : P2Protocol(i)
{
  _stab_basic_timer = a.nget<uint>("basictimer", 144000, 10);
  _fixed_lookup_to = (double)(a.nget<uint>("fixed_lookup_to",90,10))/100.0;
  _fixed_stab_to = (double)(a.nget<uint>("fixed_stab_to",100,10))/100.0;
  _fixed_stab_int = a.nget<uint>("fixed_stabtimer",0,10);
  if (_fixed_stab_int)
    _parallelism = a.nget<uint>("fixed_para",0,10);
  else
    _parallelism = 1;

  _recurs = (bool) a.nget<uint>("recurs",1,10);
  _bw_overhead = a.nget<uint>("overhead_rate", 10, 10);
  _burst_sz = a.nget<uint>("burst_size",0,10);

  // Either "burst_size" is given directly, or it is derived from "burst".
  // NOTE(review): an "overhead_rate" of 0 would divide by zero below and in
  // the _adjust_interval computation -- confirm it is never configured.
  uint burst;
  if (!_burst_sz) {
    burst = a.nget<uint>("burst",10,10);
    _burst_sz = burst * _bw_overhead;
  } else {
    burst = _burst_sz/_bw_overhead;
  }

  uint min_rate = a.nget<uint>("min_rate",0,10);
  uint max_rate = a.nget<uint>("max_rate",0,10);
  uint numnodes = Network::Instance()->gettopology()->num();
  if (min_rate && max_rate) {
    // heterogeneous budgets: interpolate between min_rate and max_rate
    _bw_overhead = min_rate +
      (uint)((this->first_ip()-1)*((double)(max_rate-min_rate+1)/(double)numnodes));
    uint mid = (min_rate+max_rate)/2;
    if (_bw_overhead == min_rate)
      _special = 1;
    else if (_bw_overhead == mid)
      _special = 2;
    else if (_bw_overhead == max_rate)
      _special = 3;
    else
      _special = 0;
    _burst_sz = burst * _bw_overhead;
    _adjust_interval = burst * 500;
  } else {
    _adjust_interval = 500*_burst_sz/_bw_overhead;
    _special = 0;
  }

  _me.ip = ip();
  _me.id = ConsistentHash::ip2chid(_me.ip);
  _me.timestamp = 0;
  _me.alivetime = 0;
  _id = _me.id;

  _rate_queue = New RateControlQueue(this, (double)_bw_overhead, _burst_sz,
      _fixed_stab_int, Accordion::empty_cb);
  _next_adjust = _adjust_interval;
  _last_calculated = 0;
  _stat.clear();
  // initial probabilities 1.0, 0.9, ..., 0.1
  for (uint k = 0; k < 10; k++)
    _calculated_prob.push_back(1-0.1*k);
  _lookup_times = 0;
  _empty_times = 0;

  _nsucc = a.nget<uint>("successors",16,10);
  _to_multiplier = a.nget<uint>("timeout_multiplier", 3, 10);
  _learn_num = a.nget<uint>("learn_num",5,10);

  // clamp _max_p to the range [1, 6]
  _max_p = _burst_sz/(40 + 8 * _learn_num);
  if (_max_p > 6)
    _max_p = 6;
  else if (_max_p < 1)
    _max_p = 1;

  _stab_basic_running = false;
  _join_scheduled = 0;
  _last_stab = 0;

  loctable = New LocTable();
  loctable->init(_me);

  _wkn.ip = 0;
  ids.push_back(_me);
  _top = Network::Instance()->gettopology();
  _max_succ_gap = 0;
  _est_n = 100;

  if (_fixed_stab_int)
    _tt = _fixed_lookup_to;
  else
    _tt = 0.9;
}

/* -------------- destructor ---------------- */
/*
 * Tear the node down.
 *
 * A node that is still alive removes itself from `ids` and records every
 * outstanding lookup as failed.  The node that empties `ids` prints the
 * global statistics, including routing-table size percentiles.
 */
Accordion::~Accordion()
{
  if (alive()) {
    // erase() on end() is undefined, so only erase when we are really listed
    vector<IDMap>::iterator p = find(ids.begin(),ids.end(),_me);
    if (p != ids.end())
      ids.erase(p);

    for (HashMap<ConsistentHash::CHID, Time>::iterator i = _outstanding_lookups.begin();
        i != _outstanding_lookups.end(); ++i) {
      ADEBUG(2) << "done lookup key " << printID(i.key()) << "timeout failed started "
        << i.value() << endl;
      record_lookup_stat(_me.ip, _me.ip, now()-i.value(), false, false, 0, 0, 0);
    }

    if (ids.size() == 0)  {
      Node::print_stats();
      printf("<-----STATS----->\n");
      sort(rtable_sz.begin(),rtable_sz.end());
      uint totalrtable = 0;
      uint rsz = rtable_sz.size();
      for (uint i = 0; i < rsz; i++)
        totalrtable += rtable_sz[i];
      if (rsz > 0) {
        printf("RTABLE:: 10p:%u 50p:%u 90p:%u avg:%.2f\n", rtable_sz[(uint)(0.1*rsz)], rtable_sz[(uint)(0.5*rsz)],
            rtable_sz[(uint)(0.9*rsz)], (double)totalrtable/(double)rsz);
      } else {
        // No samples were collected: indexing an empty vector is undefined,
        // so emit the same line shape with zeros instead.
        printf("RTABLE:: 10p:%u 50p:%u 90p:%u avg:%.2f\n", 0u, 0u, 0u, 0.0);
      }
      printf("<-----ENDSTATS----->\n");
    }
  }
  delete loctable;
  delete _rate_queue;
}

/*
 * Size of a packet carrying `ids` node entries plus `others` extra units:
 * PKT_OVERHEAD + 4 per id + others.
 */
unsigned
Accordion::PKT_SZ(unsigned ids, unsigned others)
{
  return PKT_OVERHEAD + 4 * ids + others;
}

/* -------------- initstate ---------------- */
/*
 * Seed the location table straight from the global `ids` list: the next
 * _nsucc nodes after this one (as successors), the node just before it,
 * and up to 20 randomly chosen other nodes.  Also derives _est_n from the
 * ring distance covered by the successor list.
 */
void
Accordion::initstate()
{
  // NOTE(review): `sorted` is not set here; whether it is set elsewhere is
  // not visible in this part of the file.
  if (!sorted)
    sort(ids.begin(),ids.end(), IDMap::cmp);
  uint sz = ids.size();

  //add successors
  uint my_pos = find(ids.begin(), ids.end(), _me) - ids.begin();
  IDMap n;
  for (uint i = 1; i <= _nsucc; i++)  {
    n = ids[(my_pos+i) % sz];
    n.timestamp = now();
    loctable->add_node(n,true);
  }

  // When _nsucc is a multiple of sz the "_nsucc-th successor" is this node
  // itself and the ring distance may be 0; dividing by it would be undefined.
  // In that case the successor list wraps the whole ring, so use the exact
  // node count instead of the density estimate.
  ConsistentHash::CHID span =
    ConsistentHash::distance(ids[my_pos%sz].id,ids[(my_pos+_nsucc)%sz].id);
  if (span != 0) {
    _est_n = ((ConsistentHash::CHID)-1)/span;
    _est_n = _nsucc * _est_n;
  } else {
    _est_n = sz;
  }

  //add predecessor
  // (my_pos-1) wraps in unsigned arithmetic when my_pos is 0; adding sz first
  // keeps the index on the ring for every sz, not only powers of two.
  loctable->add_node(ids[(my_pos + sz - 1) % sz]);

  //add random nodes
  for (uint i = 0; i < 20; i++) {
    uint r = random() % sz;
    if (ids[r].id != _me.id) {
      n = ids[r];
      n.timestamp = now();
      loctable->add_node(n);
    }
  }

  IDMap succ = loctable->succ(_me.id+1);
  ADEBUG(3) << "inited succ " << succ.ip << "," << printID(succ.id)
    << " locsz " << loctable->size() << " succsz " << loctable->succ_size()
    << " maxp " << _max_p << endl;
}

/* -------------- join --------------------- */
/*
 * Join (args != 0) or retry joining (args == 0) the overlay.
 *
 * Picks a live well-known node at random from `ids`.  On a fresh join the
 * node state and location table are reset and the node is re-inserted into
 * `ids`.  If a successor is already known, or the "first" argument is 1,
 * successor stabilization is started right away; otherwise a join lookup
 * for key _me.id-1 is sent to the well-known node.
 */
void
Accordion::join(Args *args)
{
  if (!alive()) return;

  _last_bytes = 0;
  _last_bytes_time = now();

  // keep drawing random nodes until the well-known node is alive
  while ((!_wkn.ip) || (!Network::Instance()->alive(_wkn.ip))) {
    _wkn.ip = Network::Instance()->getnode(ids[random()%ids.size()].ip)->ip();
    _wkn.id = Network::Instance()->getnode(_wkn.ip)->id();
    _wkn.alivetime = 0;
  }

  if (args) {
    _tt = 0.9;
    _last_joined_time = now();
    _wkn.timestamp = now();
    _me.ip = ip();
    _me.id = ConsistentHash::ip2chid(_me.ip);
    _me.timestamp = now();
    _id = _me.id;
    loctable->init(_me);
    loctable->add_node(_wkn);

    // upper_bound may return end(); that iterator must not be dereferenced,
    // and inserting at end() is the right position in that case.
    vector<IDMap>::iterator p = upper_bound(ids.begin(),ids.end(),_me, IDMap::cmp);
    if (p == ids.end() || p->id!=_me.id)
      ids.insert(p,1,_me);

    if (!_fixed_stab_int)
      _parallelism = 1;
    ADEBUG(1) << "start to join " << printID(_me.id-1) << " locsz " << loctable->size()
      << " livesz " << loctable->live_size()
      << " succsz " << loctable->succ_size() << " wkn " << _wkn.ip << endl;
  } else {
    ADEBUG(1) << "repeated join " << printID(_me.id-1) << " wkn " << _wkn.ip << " locsz "
      << loctable->size() << endl;
  }

  IDMap succ = loctable->succ(_me.id+1);
  if ((args && args->nget<uint>("first",0,10)==1) || (succ.ip && succ.ip!=_me.ip)) {
    //start basic successor stabilization
    _last_joined_time = 0;
    _join_scheduled = 0;
    if (!_stab_basic_running) {
      _stab_basic_running = true;
      delaycb(0,&Accordion::stab_succ,(void *)0);
    } else {
      delaycb(0,&Accordion::fix_succ, (void *)0);
    }
    return;
  }

  // no usable successor yet: ask the well-known node to look ours up
  _join_scheduled = now();
  lookup_args *la = New lookup_args;
  lookup_ret *lr = New lookup_ret;
  bzero(la,sizeof(lookup_args));
  la->key = _me.id - 1;
  la->m = _nsucc;
  la->ori = _me;
  la->ori.alivetime = now()-_last_joined_time;
  la->ori.timestamp = 1;
  la->no_drop = true;
  la->parallelism = 1;
  la->type = TYPE_JOIN_LOOKUP;
  la->learnsz = _learn_num;
  la->overshoot = 0;
  la->nexthop = _wkn;
  _rate_queue->do_rpc(_wkn.ip, &Accordion::find_successors_handler,
      &Accordion::null_cb, la, lr, (uint)0, la->type,
      PKT_SZ(1,1), PKT_SZ(2*la->m,1),TIMEOUT(_me.ip, _wkn.ip));
}

/*
 * RPC completion callback that only refreshes the next hop's timestamp in
 * the location table and releases the argument/return structures.
 * Aborts if a->nexthop.ip is 0.  Returns PKT_OVERHEAD when b is true and
 * 0 otherwise.
 */
int
Accordion::null_cb(bool b, lookup_args *a, lookup_ret *r)
{
  if (!a->nexthop.ip)
    abort();
  a->nexthop.timestamp = now();
  loctable->update_ifexists(a->nexthop);

  // `a` was already dereferenced above and deleting a null pointer is a
  // no-op, so no null checks are needed here.
  delete a;
  delete r;

  if (b)
    return PKT_OVERHEAD;
  else
    return 0;
}

/*
 * Handles the reply to a join lookup.  Refreshes la->src, adds la->from
 * (when the reply carries any nodes) and every returned node other than
 * ourselves to the location table.  If there is still no successor the
 * join is retried via delaycb(5000, ...); otherwise stabilization is
 * started and join_learn() is invoked.
 */
void
Accordion::join_handler(lookup_args *la, lookup_ret *lr)
{
  la->src.timestamp = now();
  loctable->update_ifexists(la->src);
  la->from.timestamp = now();
  if (lr->v.size() > 0)
    loctable->add_node(la->from);

  for (uint i = 0; i < lr->v.size(); i++) {
    if (lr->v[i].ip != _me.ip)  {
      loctable->add_node(lr->v[i],true);
    }
    /*
    if (i!=0) {
      ConsistentHash::CHID gap = lr->v[i].id - lr->v[i-1].id;
      if (_max_succ_gap == 0 || gap > _max_succ_gap)
        _max_succ_gap = gap;
    }
     */
  }

  IDMap succ = loctable->succ(_me.id+1);
  if (!succ.ip) {
    ADEBUG(1) << "join_handler join failed sz " << lr->v.size()
      << " locsz " << loctable->size() << endl;
    delaycb(5000, &Accordion::join, (Args *)0);
  } else {
    _join_scheduled = 0;
    //start basic successor stabilization
    if (!_stab_basic_running) {
      _stab_basic_running = true;
      delaycb(0,&Accordion::stab_succ,(void *)0);
    } else {
      delaycb(0,&Accordion::fix_pred,(void *)0);
    }
    join_learn();

    // re-read the neighbours for the log line (after join_learn ran)
    IDMap cur_succ = loctable->succ(_me.id+1);
    IDMap pred = loctable->pred(_me.id-1);
    vector<IDMap> scs = loctable->succs(_me.id+1,100);
    ADEBUG(1) << "joined succ " << cur_succ.ip << "," << printID(cur_succ.id)
      << "locsz " << loctable->size() << " livesz "
      << loctable->live_size() << " succs: " << print_succs(scs) <<
      " scs: " << print_succs(lr->v)  << " pred " << la->from.ip
      << "," << printID(la->from.id) << " mypred " << pred.ip << endl;
  }
}

/*
 * After joining, ask the lowest-latency successor for routing entries.
 * Does nothing when fewer than _nsucc/2 successors are known or when no
 * successor qualifies as a target.
 */
void
Accordion::join_learn()
{
  vector<IDMap> scs = loctable->succs(_me.id+1,_nsucc);
  if (scs.size() < (_nsucc/2)) return;

  // pick the successor with the smallest topology latency below the cap
  Time min = 1000000;
  IDMap min_n;
  min_n.ip = 0;
  for (uint i = 0; i < scs.size(); i++) {
    if (_top->latency(_me.ip, scs[i].ip) < min) {
      min = _top->latency(_me.ip, scs[i].ip);
      min_n = scs[i];
    }
  }

  // Nothing was selected (empty list or every latency at/above the cap):
  // ip 0 means "no node" throughout this file, so do not send an RPC to it.
  if (!min_n.ip) return;

  learn_args *la = New learn_args;
  learn_ret *lr = New learn_ret;
  la->m = 3 * _learn_num; //means i want to learn from all
  la->n = min_n;
  la->src = _me;
  la->src.alivetime = now()-_last_joined_time;
  la->end = _me;
  ADEBUG(2) << "join_learn from " << la->n.ip << ","
    << printID(la->n.id) << endl;
  _rate_queue->do_rpc(min_n.ip, &Accordion::learn_handler,
      &Accordion::learn_cb, la, lr, 3, TYPE_FINGER_UP,
      PKT_SZ(0,1), PKT_SZ(2*la->m,0),TIMEOUT(_me.ip,min_n.ip));
}

/*
 * Runs on the well-known node when it receives a join lookup.
 * Aborts unless the request names this node as next hop.  If this node has
 * no successor itself the request is only logged; otherwise a copy of the
 * request, re-stamped with this node as src/from, is handed to next_recurs().
 */
void
Accordion::find_successors_handler(lookup_args *la, lookup_ret *lr)
{
  if (la->nexthop.ip == _me.ip)
    la->nexthop.alivetime = now()-_last_joined_time;
  else
    abort();

  IDMap succ = loctable->succ(_me.id+1);
  if (!succ.ip) {
    /*
      lookup_args *lla = New lookup_args;
      lla->src = _me;
      lla->src.alivetime = now()-_last_joined_time;
      lla->from = _me;
      lla->from.alivetime = now()-_last_joined_time;
      bcopy(la,lla,sizeof(lookup_args));
      lookup_ret *llr = New lookup_ret;
      llr->v.clear();
      lla->nexthop.ip = 0;
      _rate_queue->do_rpc(la->ori.ip, &Accordion::join_handler,
          &Accordion::null_cb, lla, llr, 1, TYPE_JOIN_LOOKUP,
          PKT_SZ(2*llr->v.size(),0),PKT_SZ(0,0),TIMEOUT(_me.ip, la->ori.ip));
          */
    ADEBUG(2) << "find_successors_handler failed for " << la->ori.ip
      << "," << printID(la->ori.id) << " not joined" << endl;
  } else {
    // forward a stack copy so the caller's arguments stay untouched
    lookup_args lla;
    bcopy(la,&lla,sizeof(lookup_args));
    lla.src = _me;
    lla.src.alivetime = now()-_last_joined_time;
    lla.src.timestamp = now();
    lla.from = _me;
    lla.from.alivetime = lla.src.alivetime;
    lla.no_drop = true;
    ADEBUG(3)<<"find_successors_handler key " << printID(lla.key) << " from "
      << lla.ori.ip << "," << printID(lla.ori.id) << endl;
    next_recurs(&lla,NULL);
  }

  ADEBUG(5) << " find_successors_handler reply " << PKT_SZ(0,1) << " bytes to "
    << la->ori.ip << " quota " << _rate_queue->quota() << endl;
}

/* ------------------------ crash ------------------------------------*/
/* NOTE(review): this definition continues beyond the visible part of the
 * file, so its statements are reproduced unchanged (layout only). */
void
Accordion::crash(Args *args)
{
  ADEBUG(1) << "crashed rawsz " << loctable->size(LOC_DEAD) << " locsz " << loctable->size() << " livesz " << loctable->live_size()
    << " locsz_used " << loctable->size(LOC_HEALTHY, _tt) << " livesz_used "
    << loctable->live_size(_tt) << " live_time " << now()-_last_joined_time
    << " para " << _parallelism << " timeout " << _tt << " est_n " << _est_n << endl;
  _last_joined_time = now();
  _rate_queue->stop_queue();
  loctable->del_all();
  for (HashMap<ConsistentHash::CHID, Time>::iterator i = _outstanding_lookups.begin();
      i != _outstanding_lookups.end(); ++i) {
    ADEBUG(2) << "done lookup key " << printID(i.key()) << "timeout failed started "
      << i.value() << endl;
    record_lookup_stat(_me.ip, _me.ip, now()-i.value(), false, false, 0, 0, 0);
  }
  _outstanding_lookups.clear();
  _forwarded.clear();
  _forwarded_nodrop.clear();
  vector<IDMap>::iterator p = find(ids.begin(),ids.end(),_me);
  ids.erase(p);
  _max_succ_gap = 0;
  _stat.clear();
  for (uint i = 0; i < 10; i++)
    _calculated_prob[i] = (1-0.1*i);
  _progress.clear();
  /*
  for (HashMap<ConsistentHash::CHID, Time>::iterator i = _sent.begin();
            i != _sent.end(); ++i) {
    list<IDMap>* s = (*i).value();
    if (s)  delete s;
  }
  _sent.clear();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -