⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pastry.mac

📁 这是一个著名的应用层组播中间件的源码
💻 MAC
📖 第 1 页 / 共 3 页
字号:
    update_state(from, field(id), true,true);    if (leafset_from == from && curtime <= leafset_timestamp) {      foreach_neighbor(neighbor_leafset*,n,myleafset) {        route_inform(n->ipaddr,myhash, 0, 0, -1);        route_leafset_info(n->ipaddr, myhash, leafset_timestamp, myleafset, 0, 0, -1);      }    }    route_leafset_info_reply(from, myhash, field(time_sent), 0, 0, -1);  }  joined recv leafset_info_request {    update_state(from, field(id), true,true);    route_leafset_info(from, myhash, curtime, myleafset, 0, 0, -1);  }  joined recv leafset_info_reply {    if (field(time_sent) < leafset_timestamp) {      route_leafset_info(from, myhash, curtime, myleafset, 0, 0, -1);    }  }  //NOTE: Match not covered.  joined recv no_match { // [2] Section 4.1 "Node failure"    do_table_maintenance();  }  joined recv match { // [2] Section 4.1 "Node failure"    update_state(field(found_addr),field(found_id), false,true);  }  any recv mapping {    map.insert(field(low_hash), field(high_hash), field(map_ip));    }  !joined recv data_path_req {    update_state(from, field(one_hop_hash), true, false);    update_state(field(source_ip), field(source_hash), true, false);    routedatapathreq(field(dest_hash), field(source_ip), field(source_hash), msg, size, field(priority), 0);  }  joined recv data_path_req {    update_state(from, field(one_hop_hash), true, true);    update_state(field(source_ip), field(source_hash), true, true);    routedatapathreq(field(dest_hash), field(source_ip), field(source_hash), msg, size, field(priority), 0);  }  recv data {    update_state(from, field(from_id), true, false); //false is "safe"    int rare_case, nexthop; // set by routedata    routedata(field(id), msg, size, field(priority), rare_case, nexthop);    if (field(rare_case)) {      // the node upstream is "missing" an entry      // ([2], Section 4.1, "Node failure")      int found, found_addr, found_id;      find_match(field(from_id),field(id),found,found_addr,found_id);      if (!found) {        // I don't have it        route_no_match(from, 0, 0, -1);       } else {        // I have it        route_match(from,found_addr,found_id, 0, 0, -1);      }    }    if (rare_case && nexthop == me) {      // I don't have the appropriate entry and      // neither does my "downstream" neighbor (i.e. me)      do_table_maintenance();    }  }  !joined API route {    //NOTE: This may cause problems for scribe, since     if( upcall_forward(me, msg, size, COMM_TYPE_UNICAST) == 0 ) {      upcall_deliver(msg, size, COMM_TYPE_UNICAST);    }    return 0;  }  !joined API routeIP {    //NOTE: This may cause problems for scribe, since     if( upcall_forward(me, msg, size, COMM_TYPE_UNICAST) == 0 ) {      upcall_deliver(msg, size, COMM_TYPE_UNICAST);    }    return 0;  }  joined API route {    int rare_case, nexthop; // set by routedata    return_code = routedata(dest, msg, size, transport, rare_case, nexthop);    // there is no one upstream of me in this case,    // so I don't have to do those other checks    if (rare_case && nexthop == me) {      // I don't have the appropriate entry and      // neither does my "downstream" neighbor (i.e. me)      do_table_maintenance();    }    }  joined API routeIP {    int lower = map.query(dest);    if (lower) {   // he's in my cache      if( upcall_forward(lower, msg, size, COMM_TYPE_UNICAST) == 0 ) {        //debug_macro( "NOTE: ip address for %.8x in cache as %.8x, sending reliably.\n", dest, lower);        if(lower == me) {          upcall_deliver(msg, size, COMM_TYPE_UNICAST);          return_code = 0;        } else {          route_data(lower, dest, 0, myhash, transport, msg, size, transport);          return_code = macedon_sendret;        }        cached_send++;      }    }    else {             // not in my cache      //debug_macro( "NOTE: ip address for %.8x not in cache, use alternate routing.\n", dest);      return_code = routedatapathreq (dest, me, myhash, msg, size, transport, 1);      uncached_send++;    }  }  API transport_error {    if (transport_error == TCP_PEER_ALREADY_EXISTS ||        transport_error == TCP_PEER_ALREADY_EXISTS_OTHER) {      // ignore for now    }    else {      debug_macro("received error %d for %.8x\n", transport_error, dest_addr);      ASSERT(transport_error != NO_ERROR);      for(int l = 0; l < ROWS; l++) {        for(int Dl = 0; Dl < COLS; Dl++) {          if (neighbor_query(mytable[l][Dl],dest_addr)) {            debug_macro("Removing %.8x from routing table\n", dest_addr);            neighbor_remove(mytable[l][Dl], dest_addr);          }        }      }      if (neighbor_query(myleafset,dest_addr)) {        debug_macro("Removing %.8x from leafset\n", dest_addr);        neighbor_remove(myhashleafset,neighbor_info(myleafset,dest_addr,id));        neighbor_remove(myleafset,dest_addr);        neighbor_remove(myleft,dest_addr);        neighbor_remove(myright,dest_addr);        recompute_leaf_bounds();        upcall_notify(myhashleafset, NBR_TYPE_PEER);      }      map.remove(dest_addr);    }    return 0;  } // API transport_error  !joined API downcall_ext {    switch(operation) {      case TEST_OWNER:          {          return true;        }      default:         { //downcall not supported          return -1;        }    }    return 0;  }  joined API downcall_ext {    switch(operation) {      case TEST_OWNER:          {          test_owner_arg *targ = (test_owner_arg*)arg;          return (isinrange(targ->nodeId,start_space,end_space));        }      default:         { //downcall not supported          return -1;        }    }    return 0;  }  any timer printer {    //dump_state();    debug_macro("REPLAY My nodeid space ( %.8x , %.8x )\n",start_space,end_space);    debug_macro("REPLAY Cache performance: total: %d, hits: %d, misses: %d\n",(cached_send+uncached_send),cached_send,uncached_send);    #ifdef PRINT_LEAFSET    std::string leafsetstr = std::string();    foreach_neighbor(neighbor_leafset*, leaf, myleafset) {      char oneleaf[50];      sprintf(oneleaf, "[ %.8x / %.8x ] ", leaf->ipaddr, leaf->id);      leafsetstr.append(oneleaf);    }    debug_macro("REPLAY My leaves: %s\n", leafsetstr.c_str());    #endif  }  any timer table_maintenance {    do_table_maintenance();  }  joined timer leafset_maintenance {    ASSERT(neighbor_size(myleafset) > 0);    macedon_key dest = neighbor_random(myleafset)->ipaddr;    route_leafset_info_request(dest, myhash, 0, 0, -1);  }  !init recv probe {    route_probe_reply(from,field(id),field(time_sent), 0, 0, -1); // echo  }  !init recv probe_reply {    if(neighbor_query(probe_history, from)) {      if(curtime > neighbor_info(probe_history,from,delay) + MIN_PROBE_PERIOD ) {        neighbor_remove(probe_history,from);      } else {        neighbor_info(probe_history,from,completed)=true;      }    }    update_table_part2(from,field(id),curtime-field(time_sent));  }  }routines{  void find_match(int from_id, int id, int& found, int& found_addr, int& found_id)  {    // see [2], Section 4.1, "Node failure"    found=0;    int l = shared_prefix_length(from_id,id);    int Dl = nth_digit(id,l);    // I can do this (use mytable[l][Dl]) because I share at least    // 'l' digits (prefix) with 'from_id' (the upstream node)    if (neighbor_empty(mytable[l][Dl])) {      return;    }      // return just one? or all of them since they all apply?    neighbor_routeset *it = neighbor_closest(mytable[l][Dl]);    found_addr = it->ipaddr;    found_id = it->id;    found = 1;  }  // For each row, pick a random entry from which to pick a random  // primary or secondary node from which to request its routing row  void do_table_maintenance()  {    for (int r=0; r<ROWS; r++) {      int i = randint(COLS);      if (neighbor_empty(mytable[r][i])) 	continue;      neighbor_routeset *n = neighbor_random(mytable[r][i]);      if(n->ipaddr != me) {        route_row_request(n->ipaddr,r,0, 0, -1);      }    }  }  // Determines how many significant bits are shared by a and b  int shared_prefix_length(int a, int b)  {    unsigned int c = a ^ b;    int shift = PASTRY_BITS, count = 0;    for (int i=0; i<PASTRY_BITS/B; i++) {      shift -= B;      if ( c>>shift == 0 )        count++;      else        break;    }    return count;  }  // indexed from n=0,1,..., left to right  int nth_digit(int id, int n)  {    int shift = PASTRY_BITS - (n+1)*B;    return (id>>shift) & DIGIT_MASK;  }  // Sees if the proposed node is a better fit  // in either the routing table or leafset  void update_state(int ipaddr, int id, bool knownLive, bool joined)  {    //printf("DEBUG: update_state %.8x(%.8x)\n",id,ipaddr);    //    fflush(stdout);    if (id == 0) {      debug_macro("FIXME: WARNING: IGNORING POTENTIAL BOGUS NODE AT IPADDR:%.8x WITH ID 00000000\n",ipaddr);      return;    }    if (ipaddr == me) {      return;    }      update_table(ipaddr,id,knownLive);    update_leafset(ipaddr,id,knownLive,joined);  }  // Kicks off probes to the input node to determine if it should be part of our  // routing table  void update_table(int ipaddr, int id, bool knownLive)  {				    if (ipaddr == me) return;    if(neighbor_query(probe_history,ipaddr) &&         ( (curtime > neighbor_info(probe_history,ipaddr,delay)+MAX_PROBE_TIME) ||        (neighbor_info(probe_history,ipaddr,completed) && curtime > neighbor_info(probe_history,ipaddr,delay)+MIN_PROBE_PERIOD) ) )        {      neighbor_remove(probe_history,ipaddr);     }      if(!neighbor_query(probe_history,ipaddr)) {      if(!neighbor_space(probe_history)) {        neighbor_outstanding_probes* earliest = neighbor_worst(probe_history);        neighbor_remove(probe_history, earliest->ipaddr);      }      neighbor_add(probe_history, ipaddr);      neighbor_info(probe_history, ipaddr, delay) = curtime;      neighbor_info(probe_history, ipaddr, completed) = false;      route_probe(ipaddr,id,myhash,curtime, 0, 0, -1);    }      update_table_part2(ipaddr,id,-1); //For debug only.  }  // Once the probe is complete, determine if we should add node to the routing table  void update_table_part2(int ipaddr, int id, double delay)  {    int l = shared_prefix_length(id, myhash);    int Dl = nth_digit(id,l);        //    printf("DEBUG: l %d, Dl %d, id %x myhash %x\n",l, Dl, id, myhash);    //    fflush(stdout);    #ifdef LOCALITY_TRACE    debug_macro("Updating Table With Node %.8x(%.8x) [l:%d, dl:%d] which has delay %f\n",ipaddr,id,l,Dl,delay);#else#ifdef JOINING_TRACE    debug_macro("Updating Table With Node %.8x(%.8x) [l:%d, dl:%d] which has delay %f\n",ipaddr,id,l,Dl,delay);#endif#endif        // if the entry is empty    if (neighbor_space(mytable[l][Dl]) && !neighbor_query(mytable[l][Dl], ipaddr)) {#ifdef LOCALITY_TRACE      debug_macro("Adding to empty table space %.8x(%.8x) -- %f\n",ipaddr,id,delay);#endif            neighbor_add(mytable[l][Dl],ipaddr);      neighbor_info(mytable[l][Dl],ipaddr,id) = id;      if(delay >= 0) {        neighbor_info(mytable[l][Dl],ipaddr,delay) = delay;      } else {        neighbor_info(mytable[l][Dl],ipaddr,delay) = 9999;      }      row_timestamp[l] = curtime;    }		        else if (!neighbor_query(mytable[l][Dl], ipaddr) && delay >= 0) { // set is full      // replace the neighbor who has worst delay      neighbor_routeset* worst = neighbor_worst(mytable[l][Dl]);      if (worst->delay > delay)      {#ifdef LOCALITY_TRACE        debug_macro("Replacing worst neighbor %.8x(%.8x) -- %f < %f %.8x(%.8x)\n",ipaddr,id,delay,worst->delay,worst->ipaddr,worst->id);#endif        neighbor_remove(mytable[l][Dl],worst->ipaddr);        neighbor_add(mytable[l][Dl],ipaddr);        neighbor_routeset* it = neighbor_entry(mytable[l][Dl],ipaddr);        it->id = id;        it->delay = delay;        row_timestamp[l] = curtime;      } #ifdef LOCALITY_TRACE      else {        debug_macro("Not replacing worst neighbor %.8x(%.8x) -- %f > %f %.8x(%.8x)\n",ipaddr,id,delay,worst->delay,worst->ipaddr,worst->id);      }#endif    } else if (neighbor_query(mytable[l][Dl],ipaddr) && delay > 0) {      //Update the delay based on this recent probe#ifdef LOCALITY_TRACE      debug_macro("Updating neighbor delay %.8x(%.8x) -- %f from %f\n",ipaddr,id,delay,neighbor_info(mytable[l][Dl],ipaddr,delay));#endif      neighbor_info(mytable[l][Dl],ipaddr,delay) = delay;    }#ifdef LOCALITY_TRACE    else {      debug_macro("No action taken %.8x(%.8x) -- %f\n",ipaddr,id,delay);    }#endif  }  void update_leafset(int ipaddr, int id, bool knownLive, bool joined)  {    if (ipaddr == me) return;    if ( !neighbor_query(myleafset,ipaddr) ) {      int sl = neighbor_space(myleft);      int sr = neighbor_space(myright);      int rl = range_left(id);      int rr = range_right(id);      //FIXME: WHY DO WE CALL UPDATE_LEAFSET AFTER INSERT?      if (rl) {        if(!knownLive) {          route_inform_request(ipaddr,myhash,joined,0,0,-1);        } else {          if (sl) {            add_left(ipaddr,id);          }          else {            insert_left(ipaddr,id);            update_leafset(Lmin_ipaddr, Lmin, false, joined);          }        }      }      else if (rr) {        if(!knownLive) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -