📄 tapestry.c
字号:
/* * Copyright (c) 2003-2005 Jeremy Stribling * Massachusetts Institute of Technology * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the * "Software"), to deal in the Software without restriction, including * without limitation the rights to use, copy, modify, merge, publish, * distribute, sublicense, and/or sell copies of the Software, and to * permit persons to whom the Software is furnished to do so, subject to * the following conditions: * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. *//* $Id: tapestry.C,v 1.56 2005/04/15 20:51:30 thomer Exp $ */#include "tapestry.h"#include "p2psim/network.h"#include <stdio.h>#include <math.h>#include <iostream>#include <map>#include "p2psim/bighashmap.hh"using namespace std;unsigned long long Tapestry::_num_lookups = 0;unsigned long long Tapestry::_num_succ_lookups = 0;unsigned long long Tapestry::_num_inc_lookups = 0;unsigned long long Tapestry::_num_fail_lookups = 0;unsigned long long Tapestry::_num_hops = 0;unsigned long long Tapestry::_total_latency = 0;Tapestry::Tapestry(IPAddress i, Args a) : P2Protocol(i), _base(a.nget<uint>("base", 16, 10)), _bits_per_digit((uint) (log10(((double) _base))/log10((double) 2))), _digits_per_id((uint) 8*sizeof(GUID)/_bits_per_digit), _redundant_lookup_num(a.nget<uint>("redundant_lookup_num", 3, 10)), _redundancy(a.nget<uint>("redundancy", 3, 10)){ assert( _base <= 256 ); // for printing reasons joined = false; _joining = false; _stab_scheduled = false; _my_id = get_id_from_ip(ip()); TapDEBUG(2) << "Constructing" << endl; _rt = New RoutingTable(this, _redundancy); _waiting_for_join = New ConditionVar(); _check_nodes_waiting = New ConditionVar(); //stabilization timer _stabtimer = a.nget<uint>("stabtimer", 10000, 10); _join_num = 0; _repair_backups = a.nget<uint>("repair_backups", 0, 10); _verbose = a.nget<bool>("verbose", 0, 10); _lookup_learn = a.nget<bool>("lookuplearn", 1, 10); _direct_reply = a.nget<bool>("direct_reply", 1, 10); _lookup_cheat = a.nget<bool>("lookupcheat", 1, 10); _check_backpointers = a.nget<bool>("checkbp", 0, 10); _nn_random = a.nget<bool>("nnrandom", 0, 10); if( _lookup_learn ) { _cachebag = New RoutingTable(this, _redundancy); } _max_lookup_time = a.nget<Time>("maxlookuptime", 4000, 10); _declare_dead_time = a.nget<Time>("declare_dead", 30000, 10); _declare_dead_num = a.nget<uint>("declare_dead_num", 4, 10); _rtt_timeout_factor = a.nget<Time>("timeout_factor", 3, 10); _max_repair_num = a.nget<uint>("max_repair_num", 5, 10); _check_nodes = New vector<check_node_args *>; // init stats while (stat.size() < (uint) STAT_SIZE) { stat.push_back(0); num_msgs.push_back(0); } // initialize an array of guid digits _my_id_digits = New uint[_digits_per_id]; for( uint i = 0; i < _digits_per_id; i++ ) { _my_id_digits[i] = get_digit( id(), i ); } _recently_dead.clear(); // start the checking thread delaycb( 0, &Tapestry::check_node_loop, (void *) 0 );}Tapestry::~Tapestry(){ delete _rt; if( _lookup_learn ) { delete _cachebag; } delete _waiting_for_join; delete _check_nodes; delete _check_nodes_waiting; delete [] _my_id_digits; TapDEBUG(2) << "Destructing" << endl; // print out statistics print_stats();}voidTapestry::record_stat(stat_type type, uint num_ids, uint num_else ){ TapDEBUG(5) << "record stat " << type << endl; assert(stat.size() > (uint) type); Node::record_bw_stat( type, num_ids, num_else ); stat[type] += 20 + 4*num_ids + num_else; num_msgs[type]++;}voidTapestry::print_stats(){ TapDEBUG(1) << "STATS: " << "join " << stat[STAT_JOIN] << " " << num_msgs[STAT_JOIN] << " lookup " << stat[STAT_LOOKUP] << " " << num_msgs[STAT_LOOKUP] << " nodelist " << stat[STAT_NODELIST] << " " << num_msgs[STAT_NODELIST] << " mc " << stat[STAT_MC] << " " << num_msgs[STAT_MC] << " ping " << stat[STAT_PING] << " " << num_msgs[STAT_PING] << " backpointer " << stat[STAT_BACKPOINTER] << " " << num_msgs[STAT_BACKPOINTER] << " mcnotify " << stat[STAT_MCNOTIFY] << " " << num_msgs[STAT_MCNOTIFY] << " nn " << stat[STAT_NN] << " " << num_msgs[STAT_NN] << " repair " << stat[STAT_REPAIR] << " " << num_msgs[STAT_REPAIR] << endl; // let's print out global lookup stats if( _num_lookups > 0 ) { cout << "PARAMS: base " << _base << " stabtimer " << _stabtimer << " redundant_lookup_num " << _redundant_lookup_num << endl; cout << "total lookups: " << _num_lookups << endl; cout << "average lookup latency: " << (((double)_total_latency)/((double) _num_lookups)) << endl; cout << "average hops: " << (((double)_num_hops)/((double) _num_lookups)) << endl; cout << "success rate: " << (((double)_num_succ_lookups)/((double) _num_lookups)) << endl; cout << "incorrect rate: " << (((double)_num_inc_lookups)/((double) _num_lookups)) << endl; cout << "fail rate: " << (((double)_num_fail_lookups)/((double) _num_lookups)) << endl; _num_lookups = 0; Node::print_stats(); }}voidTapestry::lookup(Args *args) { if( !joined ) { return; } GUID key = args->nget<GUID>("key"); if( _verbose ) { TapDEBUG(0) << "Tapestry Lookup for key " << print_guid(key) << endl; } wrap_lookup_args *wla = New wrap_lookup_args(); wla->key = key; wla->starttime = now(); wla->num_tries = 1; wla->hopcount = 0; wla->num_timeouts = 0; wla->time_timeouts = 0; lookup_wrapper( wla );}voidTapestry::lookup_wrapper(wrap_lookup_args *args){ lookup_args la; la.key = args->key; la.looker = ip(); la.starttime = args->starttime; la.lasthop = ip(); la.lastrtt = 0; lookup_return lr; lr.hopcount = 0; lr.failed = false; lr.time_done = 0; lr.num_timeouts = 0; lr.time_timeouts = 0; uint curr_join = _join_num; handle_lookup( &la, &lr ); if( !alive() || _join_num != curr_join ) { delete args; TapDEBUG(2) << "Lookup aborting in wrapper, dead or rejoined" << endl; return; } args->hopcount += lr.hopcount + lr.num_timeouts; // args.hopcount is TOTAL args->num_timeouts += lr.num_timeouts; args->time_timeouts += lr.time_timeouts; if( !lr.failed && (!_lookup_cheat || (lr.owner_id == lr.real_owner_id)) && now() - args->starttime < _max_lookup_time ) { if( _direct_reply && lr.time_done == 0 ) { // I was dead when this recursive query got to me. Ignore; return; } if( _verbose ) { TapDEBUG(0) << "Lookup complete for key " << print_guid(args->key) << ": ip " << lr.owner_ip << ", id " << print_guid(lr.owner_id) << ", hops " << args->hopcount << ", numtries " << args->num_tries << endl; } _num_lookups++; _num_succ_lookups++; _num_hops += args->hopcount; if( _direct_reply ) { _total_latency += ( lr.time_done - args->starttime ); record_lookup_stat( ip(), lr.owner_ip, lr.time_done - args->starttime, true, true, args->hopcount, args->num_timeouts, args->time_timeouts); } else { _total_latency += ( now() - args->starttime ); record_lookup_stat( ip(), lr.owner_ip, now() - args->starttime, true, true, args->hopcount, args->num_timeouts, args->time_timeouts); } delete args; } else { if( now() - args->starttime < _max_lookup_time ) { args->num_tries = args->num_tries+1; if( _verbose ) { TapDEBUG(1) << "retrying failed or incorrect lookup for key " << print_guid(args->key) << ", numtries " << args->num_tries << endl; } delaycb( 100, &Tapestry::lookup_wrapper, args ); } else { // failed or timed out if( lr.failed || !_lookup_cheat || lr.owner_id == lr.real_owner_id ) { if( _verbose ) { TapDEBUG(0) << "Lookup failed for key " << print_guid(args->key) << endl; } record_lookup_stat( ip(), ip(), _max_lookup_time, false, false, args->hopcount, args->num_timeouts, args->time_timeouts ); _num_fail_lookups++; } else if( _lookup_cheat && lr.owner_id != lr.real_owner_id ) { if( _direct_reply && lr.time_done == 0 ) { // I was dead when this recursive query got to me. Ignore; return; } if( _verbose ) { TapDEBUG(0) << "Lookup incorrect for key " << print_guid(args->key) << ": ip " << lr.owner_ip << ", id " << print_guid(lr.owner_id) << ", real root " << print_guid(lr.real_owner_id) << " hops " << args->hopcount << ", numtries " << args->num_tries << endl; } record_lookup_stat( ip(), ip(), _max_lookup_time, true, false, args->hopcount, args->num_timeouts, args->time_timeouts ); _num_inc_lookups++; } else { if( _verbose ) { TapDEBUG(0) << "failed: " << lr.failed << ", lookupcheat " << _lookup_cheat << ", owner " << lr.owner_id << ", real owner " << lr.real_owner_id << ", start " << args->starttime << ", key " << print_guid(args->key) << endl; } assert(0); // this can't be! } _num_lookups++; _num_hops += args->hopcount; // all failures get the max time _total_latency += _max_lookup_time; delete args; } }}void Tapestry::handle_lookup_done(lookup_args *args, lookup_return *ret){ ret->time_done = now();}void Tapestry::handle_lookup(lookup_args *args, lookup_return *ret){ TapDEBUG(2) << "Looking up key " << print_guid(args->key) << " for node " << args->looker << endl; TapDEBUG(5) << "hl enter" << endl; // if we're learning from lookups, learn about the last hop as well as // the source of the query if( _lookup_learn && args->lasthop != ip() ) { GUID id1 = get_id_from_ip(args->looker); GUID id2 = get_id_from_ip(args->lasthop); if( !_rt->contains( id1 ) ) { _cachebag->add( args->looker, id1, MAXTIME, false ); } if( !_rt->contains( id2 ) ) { _cachebag->add( args->lasthop, id2, args->lastrtt, false ); } } // find the next hop for the key. if it's me, i'm done IPAddress *ips = New IPAddress[_redundant_lookup_num]; for( uint i = 0; i < _redundant_lookup_num; i++ ) { ips[i] = 0; } GUID *ids = New GUID[_redundant_lookup_num]; for( uint i = 0; i < _redundant_lookup_num; i++ ) { ids[i] = 0; } next_hop( args->key, &ips, &ids, _redundant_lookup_num ); uint i = 0; uint curr_join = _join_num; for( ; i < _redundant_lookup_num; i++ ) { IPAddress next = ips[i]; TapDEBUG(3) << "Trying " << next << endl; if( next == 0 ) { continue; } if( next == ip() ) { ret->owner_ip = ip(); ret->owner_id = id(); if( _lookup_cheat ) { ret->real_owner_id = lookup_cheat( args->key ); } ret->failed = false; if( _direct_reply ) { bool succ = retryRPC( args->looker, &Tapestry::handle_lookup_done, args, ret, STAT_LOOKUP, 1, 0); // no need to count return bytes, since it's recursive if( !succ ) { // the originator is no longer alive, so ignore ret->time_done = 0; } } break; } else { // it's not me, so forward the query record_stat(STAT_LOOKUP, 1, 0); Time before = now(); GUID nextid = ids[i]; // don't want to route to nodes who have recently timed out if( _rt->get_timeout( nextid ) ) { continue; } args->lasthop = ip(); args->lastrtt = _rt->get_time( nextid ); bool succ = doRPC( next, &Tapestry::handle_lookup, args, ret, _rtt_timeout_factor*_rt->get_time(nextid) ); if( succ ) { _last_heard_map[next] = now(); // every successful RPC counts as a "hop" ret->hopcount++; if( !_direct_reply || (ret->failed && args->looker == ip()) ) { // only record in the non-direct reply case OR // if it's a direct reply and a failure, we should include // those bytes exactly once at the source (to simulate a direct // reply) record_stat( STAT_LOOKUP, 1, 2 ); } } else { // remove it from our routing table check_node_args *cna = New check_node_args(); cna->ip = next; cna->id = nextid; _check_nodes->push_back( cna ); _check_nodes_waiting->notifyAll(); TapDEBUG(3) << "Forking off check (due to lookup) of (" << cna->ip << "/" << print_guid(nextid) << ")" << endl; // record the timeout stats // every unsuccessful RPC counts as a "timeout" ret->num_timeouts++; ret->time_timeouts += (now() - before); } if( succ && !ret->failed ) { break; } else { // since we're using recursive routing, we only do this check in the // case of non-success. // make sure we haven't crashed and/or started another join if( !alive() || _join_num != curr_join ) { ret->failed = true; TapDEBUG(2) << "Lookup aborting, dead or rejoined" << endl; delete ips; delete ids; return; } // print out that a failure happened if( _verbose ) { TapDEBUG(1) << "Failure happened during the lookup of key " << print_guid(args->key) << ", trying to reach node " << next << " for node " << args->looker << endl; } ret->failed = false; } } // we're retrying if we've gotten down here, so if we've // exceeded the max time we should quit if( now() - args->starttime > _max_lookup_time ) { i = _redundant_lookup_num; TapDEBUG(1) << "Timed out looking for " << print_guid(args->key) << endl; break; } } // if we were never successful, set the failed flag if( i == _redundant_lookup_num ) { TapDEBUG(1) << "setting failed to true for key " << print_guid(args->key) << endl; ret->failed = true;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -