⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pastry.mac

📁 这是一个著名的应用层组播中间件的源码
💻 MAC
📖 第 1 页 / 共 3 页
字号:
//Copyright (c) 2004, Charles Killian, Adolfo Rodriguez, Dejan Kostic, Sooraj Bhat, and Amin Vahdat
//All rights reserved.
//
//Redistribution and use in source and binary forms, with or without
//modification, are permitted provided that the following conditions are met:
//
//   * Redistributions of source code must retain the above copyright
//     notice, this list of conditions and the following disclaimer.
//   * Redistributions in binary form must reproduce the above copyright
//     notice, this list of conditions and the following disclaimer in
//     the documentation and/or other materials provided with the
//     distribution.
//   * Neither the names of Duke University nor The University of
//     California, San Diego, nor the names of its contributors
//     may be used to endorse or promote products derived from
//     this software without specific prior written permission.
//
//THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
//AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
//IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
//FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
//DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
//CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
//OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

/**
 *  Implementation of the Pastry protocol
 *
 *  Sooraj Bhat, Adolfo Rodriguez, Chip Killian
 */

#include "scribe-ext.h"  // Extended API
#include "macedon_cache.h"
//#define LOCALITY_TRACE
//#define JOINING_TRACE
#define PRINT_LEAFSET
typedef char character;
#include <string>

protocol pastry

addressing ip

trace_off

constants {
  int PASTRY_BITS = 32;  // Number of bits in the address
  int B = 4;  // Number of bits in each address digit; Must divide PASTRY_BITS evenly
  int ROWS = PASTRY_BITS/B; // Number of rows is the number of digits in the address
  int COLS = 16; // Number of possible values of each digit: 2^B
  int DIGIT_MASK = COLS-1; // Used to mask only one digit of address
  int MAX_LEAFSET = COLS; // Size of the leaf set
  int MAX_ROUTESET = 10; // Number of nodes for each route entry
  int HALF = MAX_LEAFSET/2; // The size of the leaf set to each side of me
  double PRINT_TIMEOUT = 2; // How often to dump stat
  double MAINTENANCE_TIMEOUT = 60; // How often to perform routing table maintenance
  double LEAFSET_TIMEOUT = 20; // How often to perform leafset maintenance
  double MIN_JOIN_TIMEOUT = 5; // When to timeout a join: base of the randomized timeout
  double MAX_JOIN_TIMEOUT = 20; // When to timeout a join: (MAX - MIN) is the random range added to the base
  double MAX_PROBE_TIME = 30; // How long to keep soft state about probe requests
  int MAX_PROBE_HISTORY = 300; // How many probe requests we can keep track of
  double MIN_PROBE_PERIOD = 5;  // The minimum time to wait between probes
  int PATHID_SIZE = 64;
}

states {
  joining unready;
  joined ready;
}

neighbor_types {
  routeset MAX_ROUTESET {  // Redundant entries in route table
    int id;
  }
  leafset MAX_LEAFSET { // Leafset
    int id;
  }
  halfset HALF { // Half the leafset, one for each "side"
    int id;
  }
  outstanding_probes MAX_PROBE_HISTORY { // Nodes I am currently probing, soft state
    bool completed;
  }
}

transports {
  TCP HIGHEST;
  TCP HIGH;
  TCP MED;
  TCP LOW;
  UDP BEST_EFFORT;
}

messages {
  HIGHEST join {
    int id;
    int who;
    int start_row;
    int rare_case; // indicates a routing "rare case"
    int from_id; // only used in the rare case
  }
  HIGH row_info {
    int id;
    double time_sent;
    int which_row;
    routeset row COLS;
  }
  HIGH row_info_reply {
    int id;
    double time_sent;
    int which_row;
  }
  HIGH leafset_info {
    int id;
    double time_sent;
    leafset leaves;
  }
  HIGH leafset_info_request {
    int id;
  }
  HIGH leafset_info_reply {
    int id;
    double time_sent;
  }
  HIGH inform_request {
    int id;
    bool joined;
  }
  HIGH inform {
    int id;
  }
  HIGH row_request {
    int which_row;
  }
  BEST_EFFORT probe {
    int id; // id of node being probed
    int senderid; // id of node sending the probe
    double time_sent; // time of probe request
  }
  BEST_EFFORT probe_reply {   // simply echo the info sent in 'probe'
    int id; // id of node being probed
    double time_sent; // time of probe request to correlate request and reply
  }
  LOW data {  // Data is sent low priority by default
    int id;
    int rare_case; // indicates a routing "rare case"
    int from_id; // previous hop hash
    int priority;
  }
  LOW data_path_req {
    int dest_hash;
    int source_ip;
    int source_hash;
    int one_hop_hash; // previous hop hash
    int priority;
  }
  MED no_match {
  }
  MED match {
    int found_addr;
    int found_id;
  }
  HIGHEST mapping {
    int low_hash;
    int high_hash;
    int map_ip;
  }
}

state_variables {
  int myhash; // Hash of my IP address

  // Leafset state
  leafset myleafset;  // The leafset referenced by IP address
  halfset myleft; // The left half of the leafset referenced by IP address
  halfset myright; // The right half of the leafset referenced by IP address
  leafset myhashleafset; // The leafset referenced by hash address
  int Lmin; // Smallest hash in left side of leafset
  int Lmax; // Largest hash in right side of leafset
  int Lmin_ipaddr; // IP address of Lmin
  int Lmax_ipaddr; // IP address of Lmax
  double leafset_timestamp; // Last time my leafset changed
  int leafset_received; // When joining, denotes leafset received
  int leafset_from; // When joining, the node who sent me my leafset, Z
  double leafset_time_sent; // When joining, this denotes the timestamp of Z's leafset
                            //   when he sent it to me
  macedon_key start_space;  // For convenience, store the start of our "owned" address space.
  macedon_key end_space;    // For convenience, store the end of our "owned" address space.

  // The routing table
  // It has 3 dimensions since each entry has backups.
  routeset mytable ROWS COLS; // The routing table
  double row_timestamp ROWS; // When each row last changed
  int row_received ROWS; // When joining, denotes each row received
  int row_from ROWS; // When joining, who sent me each row
  double row_time_sent ROWS; // When joining, the timestamp of each row sent to me

  mac_cache map; // Mapper of destination hashes to IP addresses
  outstanding_probes probe_history; // Nodes that I am currently probing
  int cached_send; // variables for measuring the efficiency of routeIP routing.
  int uncached_send;

  timer printer PRINT_TIMEOUT;
  timer table_maintenance MAINTENANCE_TIMEOUT;
  timer leafset_maintenance LEAFSET_TIMEOUT;
  timer join_timer;
}

transitions {

  // Protocol start-up: hash my own address, reset the routing table and the
  // join bookkeeping, arm the periodic timers, then either mark myself joined
  // (when I am the bootstrap node source_) or send a join to source_ and arm
  // a randomized join timeout.
  init API init {
    myhash = hashof(me);
    print_addr(myhash);
    Lmin = Lmax = myhash; // Only me in the system at first
    start_space = end_space = myhash;
    cached_send = uncached_send = 0;

    for (int i=0; i<ROWS; i++) { // First clear all route entries
      for (int j=0; j<COLS; j++) {
        routeset *entry = &(mytable[i][j]);
        neighbor_clear(*entry);
      }
    }
    for (int r=0; r<ROWS; r++) { // I am the entry for each of my digits
      routeset *entry = &(mytable[r][nth_digit(myhash,r)]);
      neighbor_add(*entry, me);
      neighbor_info(*entry, me, id) = myhash;
      row_timestamp[r] = curtime;
      row_received[r] = 0;
    }
    leafset_timestamp = curtime; // My leafset has just changed to being empty
    leafset_received = 0;
    timer_resched(printer, PRINT_TIMEOUT);
    timer_resched(table_maintenance, MAINTENANCE_TIMEOUT);
    timer_resched(leafset_maintenance, LEAFSET_TIMEOUT);
    if (source_ == me) { // I am the source, just mark myself joined
      state_change(joined);
      replay_experiment();
    }
    else { // Otherwise join at the bootstrap node
      state_change(joining);
      route_join(source_, myhash, me, 0, 0, myhash, 0, 0, -1);
      int timeout = (int)MIN_JOIN_TIMEOUT + randint((int)MAX_JOIN_TIMEOUT - (int)MIN_JOIN_TIMEOUT);
      timer_resched(join_timer, timeout); // timeout the join
      replay_init();
    }
  }

  // Join timeout while still joining: find the first row not yet received and
  // re-send the join to source_ asking to resume from that row, then re-arm
  // the randomized timeout. If every row was already received, i == ROWS.
  joining timer join_timer {
    int i;
    for (i=0; i<ROWS; i++) {  // Determine which row I am waiting for
      if (row_received[i] == 0)
        break;
    }
#ifdef JOINING_TRACE
    debug_macro("Pastry: Routing a new join because join failed.  Start row=%d\n",i);
#endif
    // Route another join, specifying which row to start with
    route_join(source_, myhash, me, i, 0, myhash, 0, 0, -1);
    int timeout = (int)MIN_JOIN_TIMEOUT + randint((int)MAX_JOIN_TIMEOUT - (int)MIN_JOIN_TIMEOUT);
    timer_resched(join_timer, timeout); //send a new join after this time if not yet joined.
  }

  // A join request reached me while joined: send the joiner my leafset if I
  // am the last hop, send it the routing-table rows from start_row up to the
  // last row I can contribute, forward the join to the next hop if there is
  // one, and finally handle the "rare case" maintenance signalling.
  joined recv join {
    int id  = field(id); // joiner's hash
    int who = field(who); // joiner's IP address
    // Initialized defensively: the note below says make_routing_decision sets
    // it "if needed", and its definition is not in this part of the file, so
    // do not rely on it always being written before it is read further down.
    int rare_case = 0;
    int nexthop = make_routing_decision(id,rare_case); // sets rare case if needed
#ifdef JOINING_TRACE
    debug_macro("Pastry: Join recd from %.8x(%.8x).  Start row=%d\n. NextHop=%.8x me=%.8x myhash=%.8x\n",id,who,field(start_row),nexthop,me,myhash);
#endif
    // if last hop, send leafset
    if (nexthop == me && who != me) {
#ifdef JOINING_TRACE
      debug_macro("Pastry: Join sending leafset to %.8x(%.8x).\n",id,who);
#endif
      route_leafset_info(who, myhash, curtime, myleafset, 0, 0, -1);
    }
    // send the relevant rows
    int start = field(start_row); // The join request tells me where to start again
    if (start < 0) {
      // start_row arrives in the message and is used to index mytable below;
      // never index before the first row.
      start = 0;
    }
    // finish is the last bit our addresses have in common
    int finish = (nexthop == me) ? ROWS-1 : shared_prefix_length(myhash, id);
#ifdef JOINING_TRACE
    debug_macro("Pastry: Join sending rows to %.8x(%.8x). startrow=%d finishrow=%d\n",id,who,start,finish);
#endif
    for (int i=start; i<=finish && who != me; i++) { // send back each row
      route_row_info(who, myhash, curtime, i, mytable[i], 0, 0, -1);
    }
    // forward the request
    if (nexthop != me) {
#ifdef JOINING_TRACE
      debug_macro("Pastry: Join forwarding join from/to %.8x(%.8x) via nexthop=%.8x.\n",id,who,nexthop);
#endif
      // route the join, moving forward a row (bit)
      route_join(nexthop, id, who, finish+1, rare_case, myhash, 0, 0, -1);
    }
    //NOTE: The below code has to do with maintenance, thus is not to be accounted for in the task here.
    // ([2], Section 4.1, "Node failure")
    if (field(rare_case)) {
      // the node upstream is "missing" an entry
      int found, found_addr, found_id;
      find_match(field(from_id),id,found,found_addr,found_id);
      if (!found) {
        // I don't have it
#ifdef JOINING_TRACE
        debug_macro("Pastry: Join sending no_match to %.8x due to field(rare_case).\n",from);
#endif
        route_no_match(from, 0, 0, -1);
      }
      else {
        // I have it
#ifdef JOINING_TRACE
        // Argument order follows the format: matched id(address), then the recipient.
        debug_macro("Pastry: Join sending match [%.8x(%.8x)] to %.8x due to field(rare_case).\n",found_id,found_addr,from);
#endif
        route_match(from,found_addr,found_id, 0, 0, -1);
      }
    }
    if (rare_case && nexthop == me) {
      // I don't have the appropriate entry and
      // neither does my "downstream" neighbor (i.e. me)
#ifdef JOINING_TRACE
      debug_macro("Pastry: Join doing table maintenance due to rare_case.\n");
#endif
      do_table_maintenance();
    }
  }

  // A peer asks for one routing-table row: hand the request to sendrow.
  joined recv row_request {
    sendrow(from,field(which_row));
  }

  // While joining, a routing-table row arrived: feed every node in it to
  // update_state, record that the row was received (plus sender and the
  // sender's timestamp), and check whether the join is now complete.
  joining recv row_info {
#ifdef JOINING_TRACE
    debug_macro("Pastry: Joining rcvd row %d from %.8x.\n",field(which_row),from);
#endif
    for (int i=0; i<COLS; i++) {
      foreach_neighbor(neighbor_routeset*,n,field(row)[i]) {
        update_state(n->ipaddr,n->id,false,false);
      }
    }
    // which_row arrives in the message; only touch the per-row arrays
    // (each of size ROWS) when it is a valid row index.
    int row = field(which_row);
    if (row >= 0 && row < ROWS) {
      row_received[row] = 1;
      // remember who to reply to
      row_from[row] = from;
      row_time_sent[row] = field(time_sent);
    }
    checkFinishedJoining();
  }

  // While joining, a leafset arrived: feed every leaf and the sender itself
  // to update_state, record the sender and its timestamp, and check whether
  // the join is now complete.
  joining recv leafset_info {
#ifdef JOINING_TRACE
    debug_macro("Pastry: Joining rcvd leafset_info from %.8x.\n",from);
#endif
    foreach_neighbor(neighbor_leafset*,n,field(leaves)) {
      update_state(n->ipaddr,n->id,false,false);
    }
    update_state(from, field(id), true,false); // to get the sender into leafset
    leafset_timestamp = curtime;
    leafset_received = 1;
    // remember who to reply to
    leafset_from = from;
    leafset_time_sent = field(time_sent);
    checkFinishedJoining();
  }

  // A peer asks to be informed about me: record the peer through update_state
  // if it reports itself joined, and always answer with an inform carrying my hash.
  joined recv inform_request {
    if(field(joined)) {
      update_state(from, field(id), true,true);
    }
    route_inform(from,myhash,0,0,-1);
  }

  // A peer announced itself: record it through update_state.
  joined recv inform {
    update_state(from, field(id), true,true);
  }

  // A routing-table row arrived after I have joined: feed every node in it to
  // update_state and acknowledge, echoing the sender's timestamp and row number.
  joined recv row_info {
    for (int i=0; i<COLS; i++) {
      foreach_neighbor(neighbor_routeset*,n,field(row)[i]) {
        update_state(n->ipaddr, n->id, false,true);
      }
    }
    route_row_info_reply(from, myhash, field(time_sent), field(which_row), 0, 0, -1);
  }

  // Acknowledgement for a row I sent: if that row changed after the echoed
  // timestamp, send the current version of the row again.
  joined recv row_info_reply {
    int i = field(which_row);
    // which_row arrives in the message; ignore indices outside [0, ROWS)
    // instead of reading past row_timestamp / mytable.
    if (i >= 0 && i < ROWS && field(time_sent) < row_timestamp[i]) {
      route_row_info(from, myhash, curtime, i, mytable[i], 0, 0, -1);
    }
  }

  //NOTE: Leafsets not covered.
  joined recv leafset_info {
    foreach_neighbor(neighbor_leafset*, leaf, field(leaves)) {
      update_state(leaf->ipaddr, leaf->id, false,true);
    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -