📄 bullet.mac
字号:
//Copyright (c) 2004, Charles Killian, Adolfo Rodriguez, Dejan Kostic, Sooraj Bhat, and Amin Vahdat//All rights reserved.////Redistribution and use in source and binary forms, with or without//modification, are permitted provided that the following conditions are met://// * Redistributions of source code must retain the above copyright// notice, this list of conditions and the following disclaimer.// * Redistributions in binary form must reproduce the above copyright// notice, this list of conditions and the following disclaimer in// the documentation and/or other materials provided with the// distribution.// * Neither the names of Duke University nor The University of// California, San Diego, nor the names of its contributors// may be used to endorse or promote products derived from// this software without specific prior written permission.////THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"//AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE//IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE//DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE//FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL//DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR//SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER//CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,//OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE//USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE./** * Implementation of the Bullet protocol * * Adolfo Rodriguez * Dejan Kostic */#include "adolfo_filter.h"#include "smooth_filter.h"#include "working_set.h"#include "candidate_set.h"protocol bullet uses randtreeaddressing iptrace_offconstants { int BULLET_MAX_PEERS = 10; int BULLET_MAX_CHILDREN = 15; int BULLET_MAX_CANDS = 10; int BULLET_REFRESH_INTERVAL = 4; int BULLET_DECISION_INTERVAL = 5; int BULLET_PRINTER_INTERVAL = 1; int BULLET_START_RANSUB = 15; int BULLET_NORMAL_RANSUB = 5; int BULLET_SHORT_RANSUB = 1; int BULLET_DATA_PARALLEL = 2; int BULLET_DATA_PARENT = 1; double BULLET_CLOSING_RATIO = 0.5; int BULLET_STRAT_DISJOINT=1; int BULLET_STRAT_SPLITTER=2; int BULLET_STRAT_STREAMTOALL=3; int BULLET_STRAT_TRY=4;}node_types { source; receiver;}states { joined;}neighbor_types { receivers_from_me BULLET_MAX_PEERS { double last_refresh; int sent_this_refresh; double reported_total_bandwidth; double reported_peer_bandwidth; bloom_filter recipient_digest;// bitmap_digest recipient_digest; int parent_last; int parallel_last; list<int> keys; double start_time; } senders_to_me BULLET_MAX_PEERS { adolfo_filter arrivals; adolfo_filter useful; int data_count; int total_data_count; int useful_count; int total_useful_count; double start_time; } ransub_parent 1 { adolfo_filter arrivals; adolfo_filter useful; } ransub_children BULLET_MAX_CHILDREN { int sent; int gas_sent; // only for statistics int not_sent; double bandwidth_factor; // called limiting_factor in the paper double density; // called sending_factor in the paper bloom_filter kid_digest;// bitmap_digest kid_digest; int seq; int represents; candidate_set<cand_bullet_summary_ticket> gathered; }}messages { PRIORITY_HIGHEST collect { int sequence; int descendants; candidate_set<cand_bullet_summary_ticket> mycollect; } PRIORITY_HIGHEST distribute { int sequence; int population; candidate_set<cand_bullet_summary_ticket> mydistribute; } PRIORITY_HIGH update_sender { int low_key; int high_key; short mod; short mod_max; double total_bandwidth; double peer_bandwidth; flat_bloom filter;// flat_bitmap filter; } PRIORITY_HIGH request_denied { } PRIORITY_HIGH update_parent { flat_bloom filter;// flat_bitmap filter; } PRIORITY_HIGH remove { } PRIORITY_HIGH ask { int key_seq; } PRIORITY_MED papa_data { short comm_type; int key_seq; } PRIORITY_LOW peer_data { short comm_type; int key_seq; } PRIORITY_MED collect_data { int receiver; int sender; int comm_type; int priority; }}state_variables { char * got_msg nodump; int got_from; int got_key; int got_type; int got_size; int multicast_success_code; adolfo_filter master; adolfo_filter master_useful; receivers_from_me getters; senders_to_me givers; int curkey; working_set working_file; working_set parent_file; working_set parallel_file; smooth_filter smooth_bandwidth; timer refresh BULLET_REFRESH_INTERVAL; timer decision BULLET_DECISION_INTERVAL; timer printer BULLET_PRINTER_INTERVAL; timer sending; ransub_parent myparent; ransub_children mychildren; int sequence; int total_received; int total_expected; int population; int descendants; int collect_expired; int collect_missing; candidate_set<cand_bullet_summary_ticket> curset; timer ransub; //char * cached_msg;} transitions { init API init { curkey = 0; sequence = 0; total_received = 0; collect_missing = 0; //cached_msg = (char *) malloc(parameters.getint("data_packet_size")); state_change(joined); timer_resched(decision, BULLET_DECISION_INTERVAL); timer_resched(printer, BULLET_PRINTER_INTERVAL); if ( source_ == me ) { neighbor_add (myparent, 0); replay_experiment(); timer_resched(ransub, BULLET_START_RANSUB); } else { replay_init(); timer_resched(refresh, BULLET_REFRESH_INTERVAL); } }// ---------------------------------------------- // refresh// ---------------------------------------------- joined timer refresh { int mysize; unsigned char *my_filter = (unsigned char*)working_file.export_digest(mysize);// if (my_filter->tab_size != BLOOM_SIZE) {// printf("Receiver: something is wrong with your filter %x with size %d\n", my_filter, my_filter->tab_size);// exit(90);// } if (neighbor_size(givers)) { int piece = (working_file.get_last() - working_file.get_earliest())/neighbor_size(givers); sprintf(trace_buf_, "Receiver: dividing range %d (%d - %d)between %d senders\n", working_file.get_last() - working_file.get_earliest(), working_file.get_earliest(), working_file.get_last(), neighbor_size(givers)); trace_print(); int starting = working_file.get_earliest(); int ending = working_file.get_last(); int sender_num=0; foreach_neighbor (neighbor_senders_to_me*, givuh, givers) { int modval = (sender_num+sequence) % neighbor_size(givers); sprintf(trace_buf_, "Filter: sending to %x mod %d max %d low %d high %d\n", givuh->ipaddr, modval, neighbor_size(givers), starting, ending); trace_print(); int bullet_strategy = parameters.getint("bullet_strategy"); if (bullet_strategy == BULLET_STRAT_STREAMTOALL) {#if USE_BLOOM route_update_sender(givuh->ipaddr, starting, ending, 0, 1, master_useful.get_value(), givuh->useful.get_value(), *(flat_bloom*)my_filter, 0, 0, -1); #else route_update_sender(givuh->ipaddr, starting, ending, 0, 1, master_useful.get_value(), givuh->useful.get_value(), *(flat_bitmap*)my_filter, 0, 0, -1); #endif } else {#if USE_BLOOM route_update_sender(givuh->ipaddr, starting, ending, modval, neighbor_size(givers), master_useful.get_value(), givuh->useful.get_value(), *(flat_bloom*)my_filter, 0, 0, -1);#else route_update_sender(givuh->ipaddr, starting, ending, modval, neighbor_size(givers), master_useful.get_value(), givuh->useful.get_value(), *(flat_bitmap*)my_filter, 0, 0, -1);#endif } sender_num++; } } if (neighbor_size(myparent)) { neighbor_ransub_parent *papa = neighbor_random(myparent);#if USE_BLOOM route_update_parent(papa->ipaddr, *(flat_bloom*)my_filter, 0, 0, -1);#else route_update_parent(papa->ipaddr, *(flat_bitmap*)my_filter, 0, 0, -1);#endif } unsigned char *temp = (unsigned char *)my_filter; delete [] temp; } // ---------------------------------------------- // update_sender// ---------------------------------------------- joined recv update_sender { // delivers a bloom filter to start sending data neighbor_receivers_from_me *guy; double application_spacing = (double)(parameters.getint("data_packet_size")) *8/(1000.0*(double) parameters.getint("streaming_rate")); if (neighbor_size(getters)==0) // adding the first guy, start sender timer timer_resched(sending, application_spacing); if (!neighbor_query (getters, from)) { if (!neighbor_space(getters)) { route_request_denied(from, 0, 0, -1); return; } else { neighbor_add (getters, from); neighbor_receivers_from_me *gettuh = neighbor_entry(getters, from); gettuh->start_time = curtime; sprintf(trace_buf_, "Sender: Added %x as a receiver\n", from); trace_print(); } } guy = neighbor_entry (getters, from); guy->last_refresh = curtime; guy->sent_this_refresh = 0; guy->reported_total_bandwidth = field(total_bandwidth); guy->reported_peer_bandwidth = field(peer_bandwidth); guy->recipient_digest.import( (unsigned char *)&(field(filter))); guy->keys.clear(); parent_file.get_modulo_keys(guy->keys, (digest &)guy->recipient_digest, field(low_key), field(high_key), field(mod), field(mod_max) ); parallel_file.get_modulo_keys(guy->keys, (digest &)guy->recipient_digest, field(low_key), field(high_key), field(mod), field(mod_max)); sprintf(trace_buf_, "Filter: updated for %x keys %d, low %d high %d mod %d max %d\n", from, guy->keys.size(), field(low_key), field(high_key), field(mod), field(mod_max)); trace_print(); } joined recv update_parent { neighbor_ransub_children *kid; if (neighbor_query (mychildren, from)) { kid = neighbor_entry(mychildren, from); kid->kid_digest.import( (unsigned char *)&(field(filter))); } } joined recv request_denied { if (neighbor_query(givers, from)) { sprintf(trace_buf_, "Receiver: sender %.8x denied my request\n", from); trace_print(); neighbor_remove (givers, from); } } joined recv remove { // request for this node to stop sending data // printf("Sender: got remove request from %.8x\n", from); if (neighbor_query (getters, from)) { // printf("Sender: remove done from %.8x\n", from); neighbor_remove (getters, from); } } joined recv ask { // asks for a specific key, not yet implemented }// ---------------------------------------------- // sending// ---------------------------------------------- joined timer sending { int should_have_sent; if (neighbor_size(getters)!=0) { foreach_neighbor (neighbor_receivers_from_me*, getta, getters) { if (getta->keys.size()) { sprintf(trace_buf_,"need to send %x keys %d\n", getta->ipaddr, getta->keys.size()); cut_trace(); if (curtime < getta->last_refresh + BULLET_REFRESH_INTERVAL+1.0) { // allow the update to be late up to 1 second, no more double sendtime = min(curtime-getta->last_refresh, (double)BULLET_REFRESH_INTERVAL); double prev_sendtime = sendtime; should_have_sent = (int)ceil((sendtime * (getta->keys.size() + getta->sent_this_refresh)) / (double)BULLET_REFRESH_INTERVAL); if (getta->sent_this_refresh > should_have_sent) continue; int num_send = should_have_sent - getta->sent_this_refresh; num_send = min(num_send, 20); // send up to 20 since that is the queue limit anyway int trash; sprintf(trace_buf_, "Sender: for %.8x keys %d, sent %d should have %d will send %d last %f\n", getta->ipaddr, getta->keys.size(), getta->sent_this_refresh, should_have_sent, num_send, getta->last_refresh); cut_trace(); while (num_send > 0 && getta->keys.size()>0) { list< int>::iterator traverse = getta->keys.begin(); int value = *traverse;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -