📄 pastry.mac
字号:
update_state(from, field(id), true,true); if (leafset_from == from && curtime <= leafset_timestamp) { foreach_neighbor(neighbor_leafset*,n,myleafset) { route_inform(n->ipaddr,myhash, 0, 0, -1); route_leafset_info(n->ipaddr, myhash, leafset_timestamp, myleafset, 0, 0, -1); } } route_leafset_info_reply(from, myhash, field(time_sent), 0, 0, -1); } joined recv leafset_info_request { update_state(from, field(id), true,true); route_leafset_info(from, myhash, curtime, myleafset, 0, 0, -1); } joined recv leafset_info_reply { if (field(time_sent) < leafset_timestamp) { route_leafset_info(from, myhash, curtime, myleafset, 0, 0, -1); } } //NOTE: Match not covered. joined recv no_match { // [2] Section 4.1 "Node failure" do_table_maintenance(); } joined recv match { // [2] Section 4.1 "Node failure" update_state(field(found_addr),field(found_id), false,true); } any recv mapping { map.insert(field(low_hash), field(high_hash), field(map_ip)); } !joined recv data_path_req { update_state(from, field(one_hop_hash), true, false); update_state(field(source_ip), field(source_hash), true, false); routedatapathreq(field(dest_hash), field(source_ip), field(source_hash), msg, size, field(priority), 0); } joined recv data_path_req { update_state(from, field(one_hop_hash), true, true); update_state(field(source_ip), field(source_hash), true, true); routedatapathreq(field(dest_hash), field(source_ip), field(source_hash), msg, size, field(priority), 0); } recv data { update_state(from, field(from_id), true, false); //false is "safe" int rare_case, nexthop; // set by routedata routedata(field(id), msg, size, field(priority), rare_case, nexthop); if (field(rare_case)) { // the node upstream is "missing" an entry // ([2], Section 4.1, "Node failure") int found, found_addr, found_id; find_match(field(from_id),field(id),found,found_addr,found_id); if (!found) { // I don't have it route_no_match(from, 0, 0, -1); } else { // I have it route_match(from,found_addr,found_id, 0, 0, -1); } } if (rare_case && nexthop == me) { // I don't have the appropriate entry and // neither does my "downstream" neighbor (i.e. me) do_table_maintenance(); } } !joined API route { //NOTE: This may cause problems for scribe, since if( upcall_forward(me, msg, size, COMM_TYPE_UNICAST) == 0 ) { upcall_deliver(msg, size, COMM_TYPE_UNICAST); } return 0; } !joined API routeIP { //NOTE: This may cause problems for scribe, since if( upcall_forward(me, msg, size, COMM_TYPE_UNICAST) == 0 ) { upcall_deliver(msg, size, COMM_TYPE_UNICAST); } return 0; } joined API route { int rare_case, nexthop; // set by routedata return_code = routedata(dest, msg, size, transport, rare_case, nexthop); // there is no one upstream of me in this case, // so I don't have to do those other checks if (rare_case && nexthop == me) { // I don't have the appropriate entry and // neither does my "downstream" neighbor (i.e. me) do_table_maintenance(); } } joined API routeIP { int lower = map.query(dest); if (lower) { // he's in my cache if( upcall_forward(lower, msg, size, COMM_TYPE_UNICAST) == 0 ) { //debug_macro( "NOTE: ip address for %.8x in cache as %.8x, sending reliably.\n", dest, lower); if(lower == me) { upcall_deliver(msg, size, COMM_TYPE_UNICAST); return_code = 0; } else { route_data(lower, dest, 0, myhash, transport, msg, size, transport); return_code = macedon_sendret; } cached_send++; } } else { // not in my cache //debug_macro( "NOTE: ip address for %.8x not in cache, use alternate routing.\n", dest); return_code = routedatapathreq (dest, me, myhash, msg, size, transport, 1); uncached_send++; } } API transport_error { if (transport_error == TCP_PEER_ALREADY_EXISTS || transport_error == TCP_PEER_ALREADY_EXISTS_OTHER) { // ignore for now } else { debug_macro("received error %d for %.8x\n", transport_error, dest_addr); ASSERT(transport_error != NO_ERROR); for(int l = 0; l < ROWS; l++) { for(int Dl = 0; Dl < COLS; Dl++) { if (neighbor_query(mytable[l][Dl],dest_addr)) { debug_macro("Removing %.8x from routing table\n", dest_addr); neighbor_remove(mytable[l][Dl], dest_addr); } } } if (neighbor_query(myleafset,dest_addr)) { debug_macro("Removing %.8x from leafset\n", dest_addr); neighbor_remove(myhashleafset,neighbor_info(myleafset,dest_addr,id)); neighbor_remove(myleafset,dest_addr); neighbor_remove(myleft,dest_addr); neighbor_remove(myright,dest_addr); recompute_leaf_bounds(); upcall_notify(myhashleafset, NBR_TYPE_PEER); } map.remove(dest_addr); } return 0; } // API transport_error !joined API downcall_ext { switch(operation) { case TEST_OWNER: { return true; } default: { //downcall not supported return -1; } } return 0; } joined API downcall_ext { switch(operation) { case TEST_OWNER: { test_owner_arg *targ = (test_owner_arg*)arg; return (isinrange(targ->nodeId,start_space,end_space)); } default: { //downcall not supported return -1; } } return 0; } any timer printer { //dump_state(); debug_macro("REPLAY My nodeid space ( %.8x , %.8x )\n",start_space,end_space); debug_macro("REPLAY Cache performance: total: %d, hits: %d, misses: %d\n",(cached_send+uncached_send),cached_send,uncached_send); #ifdef PRINT_LEAFSET std::string leafsetstr = std::string(); foreach_neighbor(neighbor_leafset*, leaf, myleafset) { char oneleaf[50]; sprintf(oneleaf, "[ %.8x / %.8x ] ", leaf->ipaddr, leaf->id); leafsetstr.append(oneleaf); } debug_macro("REPLAY My leaves: %s\n", leafsetstr.c_str()); #endif } any timer table_maintenance { do_table_maintenance(); } joined timer leafset_maintenance { ASSERT(neighbor_size(myleafset) > 0); macedon_key dest = neighbor_random(myleafset)->ipaddr; route_leafset_info_request(dest, myhash, 0, 0, -1); } !init recv probe { route_probe_reply(from,field(id),field(time_sent), 0, 0, -1); // echo } !init recv probe_reply { if(neighbor_query(probe_history, from)) { if(curtime > neighbor_info(probe_history,from,delay) + MIN_PROBE_PERIOD ) { neighbor_remove(probe_history,from); } else { neighbor_info(probe_history,from,completed)=true; } } update_table_part2(from,field(id),curtime-field(time_sent)); } }routines{ void find_match(int from_id, int id, int& found, int& found_addr, int& found_id) { // see [2], Section 4.1, "Node failure" found=0; int l = shared_prefix_length(from_id,id); int Dl = nth_digit(id,l); // I can do this (use mytable[l][Dl]) because I share at least // 'l' digits (prefix) with 'from_id' (the upstream node) if (neighbor_empty(mytable[l][Dl])) { return; } // return just one? or all of them since they all apply? neighbor_routeset *it = neighbor_closest(mytable[l][Dl]); found_addr = it->ipaddr; found_id = it->id; found = 1; } // For each row, pick a random entry from which to pick a random // primary or secondary node from which to request its routing row void do_table_maintenance() { for (int r=0; r<ROWS; r++) { int i = randint(COLS); if (neighbor_empty(mytable[r][i])) continue; neighbor_routeset *n = neighbor_random(mytable[r][i]); if(n->ipaddr != me) { route_row_request(n->ipaddr,r,0, 0, -1); } } } // Determines how many significant bits are shared by a and b int shared_prefix_length(int a, int b) { unsigned int c = a ^ b; int shift = PASTRY_BITS, count = 0; for (int i=0; i<PASTRY_BITS/B; i++) { shift -= B; if ( c>>shift == 0 ) count++; else break; } return count; } // indexed from n=0,1,..., left to right int nth_digit(int id, int n) { int shift = PASTRY_BITS - (n+1)*B; return (id>>shift) & DIGIT_MASK; } // Sees if the proposed node is a better fit // in either the routing table or leafset void update_state(int ipaddr, int id, bool knownLive, bool joined) { //printf("DEBUG: update_state %.8x(%.8x)\n",id,ipaddr); // fflush(stdout); if (id == 0) { debug_macro("FIXME: WARNING: IGNORING POTENTIAL BOGUS NODE AT IPADDR:%.8x WITH ID 00000000\n",ipaddr); return; } if (ipaddr == me) { return; } update_table(ipaddr,id,knownLive); update_leafset(ipaddr,id,knownLive,joined); } // Kicks off probes to the input node to determine if it should be part of our // routing table void update_table(int ipaddr, int id, bool knownLive) { if (ipaddr == me) return; if(neighbor_query(probe_history,ipaddr) && ( (curtime > neighbor_info(probe_history,ipaddr,delay)+MAX_PROBE_TIME) || (neighbor_info(probe_history,ipaddr,completed) && curtime > neighbor_info(probe_history,ipaddr,delay)+MIN_PROBE_PERIOD) ) ) { neighbor_remove(probe_history,ipaddr); } if(!neighbor_query(probe_history,ipaddr)) { if(!neighbor_space(probe_history)) { neighbor_outstanding_probes* earliest = neighbor_worst(probe_history); neighbor_remove(probe_history, earliest->ipaddr); } neighbor_add(probe_history, ipaddr); neighbor_info(probe_history, ipaddr, delay) = curtime; neighbor_info(probe_history, ipaddr, completed) = false; route_probe(ipaddr,id,myhash,curtime, 0, 0, -1); } update_table_part2(ipaddr,id,-1); //For debug only. } // Once the probe is complete, determine if we should add node to the routing table void update_table_part2(int ipaddr, int id, double delay) { int l = shared_prefix_length(id, myhash); int Dl = nth_digit(id,l); // printf("DEBUG: l %d, Dl %d, id %x myhash %x\n",l, Dl, id, myhash); // fflush(stdout); #ifdef LOCALITY_TRACE debug_macro("Updating Table With Node %.8x(%.8x) [l:%d, dl:%d] which has delay %f\n",ipaddr,id,l,Dl,delay);#else#ifdef JOINING_TRACE debug_macro("Updating Table With Node %.8x(%.8x) [l:%d, dl:%d] which has delay %f\n",ipaddr,id,l,Dl,delay);#endif#endif // if the entry is empty if (neighbor_space(mytable[l][Dl]) && !neighbor_query(mytable[l][Dl], ipaddr)) {#ifdef LOCALITY_TRACE debug_macro("Adding to empty table space %.8x(%.8x) -- %f\n",ipaddr,id,delay);#endif neighbor_add(mytable[l][Dl],ipaddr); neighbor_info(mytable[l][Dl],ipaddr,id) = id; if(delay >= 0) { neighbor_info(mytable[l][Dl],ipaddr,delay) = delay; } else { neighbor_info(mytable[l][Dl],ipaddr,delay) = 9999; } row_timestamp[l] = curtime; } else if (!neighbor_query(mytable[l][Dl], ipaddr) && delay >= 0) { // set is full // replace the neighbor who has worst delay neighbor_routeset* worst = neighbor_worst(mytable[l][Dl]); if (worst->delay > delay) {#ifdef LOCALITY_TRACE debug_macro("Replacing worst neighbor %.8x(%.8x) -- %f < %f %.8x(%.8x)\n",ipaddr,id,delay,worst->delay,worst->ipaddr,worst->id);#endif neighbor_remove(mytable[l][Dl],worst->ipaddr); neighbor_add(mytable[l][Dl],ipaddr); neighbor_routeset* it = neighbor_entry(mytable[l][Dl],ipaddr); it->id = id; it->delay = delay; row_timestamp[l] = curtime; } #ifdef LOCALITY_TRACE else { debug_macro("Not replacing worst neighbor %.8x(%.8x) -- %f > %f %.8x(%.8x)\n",ipaddr,id,delay,worst->delay,worst->ipaddr,worst->id); }#endif } else if (neighbor_query(mytable[l][Dl],ipaddr) && delay > 0) { //Update the delay based on this recent probe#ifdef LOCALITY_TRACE debug_macro("Updating neighbor delay %.8x(%.8x) -- %f from %f\n",ipaddr,id,delay,neighbor_info(mytable[l][Dl],ipaddr,delay));#endif neighbor_info(mytable[l][Dl],ipaddr,delay) = delay; }#ifdef LOCALITY_TRACE else { debug_macro("No action taken %.8x(%.8x) -- %f\n",ipaddr,id,delay); }#endif } void update_leafset(int ipaddr, int id, bool knownLive, bool joined) { if (ipaddr == me) return; if ( !neighbor_query(myleafset,ipaddr) ) { int sl = neighbor_space(myleft); int sr = neighbor_space(myright); int rl = range_left(id); int rr = range_right(id); //FIXME: WHY DO WE CALL UPDATE_LEAFSET AFTER INSERT? if (rl) { if(!knownLive) { route_inform_request(ipaddr,myhash,joined,0,0,-1); } else { if (sl) { add_left(ipaddr,id); } else { insert_left(ipaddr,id); update_leafset(Lmin_ipaddr, Lmin, false, joined); } } } else if (rr) { if(!knownLive) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -