⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 bullet.mac

📁 这是一个著名的应用层组播中间件的源码
💻 MAC
📖 第 1 页 / 共 3 页
字号:
//Copyright (c) 2004, Charles Killian, Adolfo Rodriguez, Dejan Kostic, Sooraj Bhat, and Amin Vahdat//All rights reserved.////Redistribution and use in source and binary forms, with or without//modification, are permitted provided that the following conditions are met:////   * Redistributions of source code must retain the above copyright//     notice, this list of conditions and the following disclaimer.//   * Redistributions in binary form must reproduce the above copyright//     notice, this list of conditions and the following disclaimer in//     the documentation and/or other materials provided with the//     distribution.//   * Neither the names of Duke University nor The University of//     California, San Diego, nor the names of its contributors//     may be used to endorse or promote products derived from//     this software without specific prior written permission.////THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"//AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE//IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE//DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE//FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL//DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR//SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER//CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,//OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE//USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE./** *  Implementation of the Bullet protocol * *  Adolfo Rodriguez *  Dejan Kostic */#include "adolfo_filter.h"#include "smooth_filter.h"#include "working_set.h"#include "candidate_set.h"protocol bullet uses randtreeaddressing iptrace_offconstants {  int BULLET_MAX_PEERS = 10;  int BULLET_MAX_CHILDREN = 15;  int BULLET_MAX_CANDS = 10;  int BULLET_REFRESH_INTERVAL = 4;  int BULLET_DECISION_INTERVAL = 5;  int BULLET_PRINTER_INTERVAL = 1;  int BULLET_START_RANSUB = 15;  int BULLET_NORMAL_RANSUB = 5;  int BULLET_SHORT_RANSUB = 1;  int BULLET_DATA_PARALLEL = 2;  int BULLET_DATA_PARENT = 1;  double BULLET_CLOSING_RATIO = 0.5;  int BULLET_STRAT_DISJOINT=1;  int BULLET_STRAT_SPLITTER=2;  int BULLET_STRAT_STREAMTOALL=3;  int BULLET_STRAT_TRY=4;}node_types {  source;  receiver;}states {  joined;}neighbor_types {  receivers_from_me BULLET_MAX_PEERS {    double last_refresh;    int sent_this_refresh;    double reported_total_bandwidth;    double reported_peer_bandwidth;    bloom_filter recipient_digest;//      bitmap_digest recipient_digest;    int parent_last;    int parallel_last;    list<int> keys;    double start_time;  }  senders_to_me BULLET_MAX_PEERS {    adolfo_filter arrivals;    adolfo_filter useful;    int data_count;    int total_data_count;    int useful_count;    int total_useful_count;    double start_time;  }  ransub_parent 1 {    adolfo_filter arrivals;    adolfo_filter useful;  }  ransub_children BULLET_MAX_CHILDREN {    int sent;    int gas_sent;   // only for statistics    int not_sent;    double bandwidth_factor;  // called limiting_factor in the paper    double density; // called sending_factor in the paper    bloom_filter kid_digest;//      bitmap_digest kid_digest;    int seq;    int represents;    candidate_set<cand_bullet_summary_ticket> gathered;  }}messages {  PRIORITY_HIGHEST collect {    int sequence;    int descendants;    candidate_set<cand_bullet_summary_ticket> mycollect;  }  PRIORITY_HIGHEST distribute {    int sequence;    int population;    candidate_set<cand_bullet_summary_ticket> mydistribute;  }  PRIORITY_HIGH update_sender {    int low_key;    int high_key;    short mod;    short mod_max;    double total_bandwidth;    double peer_bandwidth;    flat_bloom filter;//      flat_bitmap filter;  }  PRIORITY_HIGH request_denied {  }  PRIORITY_HIGH update_parent {    flat_bloom filter;//      flat_bitmap   filter;  }  PRIORITY_HIGH remove {  }  PRIORITY_HIGH ask {    int key_seq;	      }  PRIORITY_MED papa_data {    short comm_type;    int key_seq;  }  PRIORITY_LOW peer_data {    short comm_type;    int key_seq;  }  PRIORITY_MED collect_data {     int receiver;    int sender;    int comm_type;    int priority;  }}state_variables {	   char * got_msg nodump;  int got_from;  int got_key;  int got_type;  int got_size;  int multicast_success_code;  adolfo_filter master;  adolfo_filter master_useful;  receivers_from_me getters;  senders_to_me givers;  int curkey;  working_set working_file;  working_set parent_file;  working_set parallel_file;  smooth_filter smooth_bandwidth;  timer refresh BULLET_REFRESH_INTERVAL;  timer decision BULLET_DECISION_INTERVAL;  timer printer BULLET_PRINTER_INTERVAL;  timer sending;  ransub_parent myparent;  ransub_children mychildren;  int sequence;  int total_received;  int total_expected;  int population;  int descendants;  int collect_expired;  int collect_missing;  candidate_set<cand_bullet_summary_ticket> curset;  timer ransub;  //char * cached_msg;}	  transitions {  init API init {    curkey = 0;    sequence = 0;    total_received = 0;    collect_missing = 0;    //cached_msg = (char *) malloc(parameters.getint("data_packet_size"));	      state_change(joined);    timer_resched(decision, BULLET_DECISION_INTERVAL);    timer_resched(printer, BULLET_PRINTER_INTERVAL);    if ( source_ == me )    {      neighbor_add (myparent, 0);      replay_experiment();      timer_resched(ransub, BULLET_START_RANSUB);	    }    else    {      replay_init();      timer_resched(refresh, BULLET_REFRESH_INTERVAL);    }  }// ---------------------------------------------- // refresh// ----------------------------------------------   joined timer refresh {    int mysize;    unsigned char *my_filter = (unsigned char*)working_file.export_digest(mysize);//      if (my_filter->tab_size != BLOOM_SIZE) {//        printf("Receiver: something is wrong with your filter %x with size %d\n", my_filter, my_filter->tab_size);//        exit(90);//      }    if (neighbor_size(givers)) {      int piece = (working_file.get_last() - working_file.get_earliest())/neighbor_size(givers);      sprintf(trace_buf_, "Receiver: dividing range %d (%d - %d)between %d senders\n",           working_file.get_last() - working_file.get_earliest(),          working_file.get_earliest(), working_file.get_last(),          neighbor_size(givers));       trace_print();      int starting =  working_file.get_earliest();      int ending = working_file.get_last();      int sender_num=0;      foreach_neighbor (neighbor_senders_to_me*, givuh, givers) {	int modval = (sender_num+sequence) % neighbor_size(givers);	sprintf(trace_buf_, "Filter: sending to %x mod %d max %d low %d high %d\n", givuh->ipaddr, modval, neighbor_size(givers), starting, ending);	trace_print();	int bullet_strategy = parameters.getint("bullet_strategy");	if (bullet_strategy == BULLET_STRAT_STREAMTOALL) {#if USE_BLOOM 	  route_update_sender(givuh->ipaddr, starting, ending, 0, 1, master_useful.get_value(), givuh->useful.get_value(), *(flat_bloom*)my_filter, 0, 0, -1); #else	  route_update_sender(givuh->ipaddr, starting, ending, 0, 1, master_useful.get_value(), givuh->useful.get_value(), *(flat_bitmap*)my_filter, 0, 0, -1); #endif	}	else {#if USE_BLOOM 	  route_update_sender(givuh->ipaddr, starting, ending, modval, neighbor_size(givers), master_useful.get_value(), givuh->useful.get_value(), *(flat_bloom*)my_filter, 0, 0, -1);#else	  route_update_sender(givuh->ipaddr, starting, ending, modval, neighbor_size(givers), master_useful.get_value(), givuh->useful.get_value(), *(flat_bitmap*)my_filter, 0, 0, -1);#endif	} 	sender_num++;      }    }    if (neighbor_size(myparent)) {      neighbor_ransub_parent *papa = neighbor_random(myparent);#if USE_BLOOM       route_update_parent(papa->ipaddr, *(flat_bloom*)my_filter, 0, 0, -1);#else      route_update_parent(papa->ipaddr, *(flat_bitmap*)my_filter, 0, 0, -1);#endif          }    unsigned char *temp = (unsigned char *)my_filter;    delete [] temp;  }  // ---------------------------------------------- // update_sender// ----------------------------------------------   joined recv update_sender {    // delivers a bloom filter to start sending data        neighbor_receivers_from_me *guy;    double application_spacing = (double)(parameters.getint("data_packet_size")) *8/(1000.0*(double) parameters.getint("streaming_rate"));    if (neighbor_size(getters)==0)  // adding the first guy, start sender timer      timer_resched(sending, application_spacing);    if (!neighbor_query (getters, from)) {      if (!neighbor_space(getters)) {	route_request_denied(from, 0, 0, -1);	return;      }      else {        neighbor_add (getters, from);        neighbor_receivers_from_me *gettuh = neighbor_entry(getters, from);        gettuh->start_time = curtime;        sprintf(trace_buf_, "Sender: Added %x as a receiver\n", from);        trace_print();      }     }    guy = neighbor_entry (getters, from);    guy->last_refresh = curtime;    guy->sent_this_refresh = 0;    guy->reported_total_bandwidth = field(total_bandwidth);    guy->reported_peer_bandwidth = field(peer_bandwidth);    guy->recipient_digest.import( (unsigned char *)&(field(filter)));	    guy->keys.clear();    parent_file.get_modulo_keys(guy->keys, (digest &)guy->recipient_digest, field(low_key), field(high_key), field(mod), field(mod_max) );    parallel_file.get_modulo_keys(guy->keys, (digest &)guy->recipient_digest, field(low_key), field(high_key), field(mod), field(mod_max));        sprintf(trace_buf_, "Filter: updated for %x keys %d, low %d high %d mod %d max %d\n", from, guy->keys.size(), field(low_key), field(high_key), field(mod), field(mod_max));    trace_print();  }  joined recv update_parent {    neighbor_ransub_children *kid;    if (neighbor_query (mychildren, from))    {      kid = neighbor_entry(mychildren, from);      kid->kid_digest.import( (unsigned char *)&(field(filter)));	    }  }  joined recv request_denied {    if (neighbor_query(givers, from)) {      sprintf(trace_buf_, "Receiver: sender %.8x denied my request\n", from);      trace_print();      neighbor_remove (givers, from);    }  }  joined recv remove {    // request for this node to stop sending data        //    printf("Sender: got remove request from %.8x\n", from);    if (neighbor_query (getters, from)) {      //      printf("Sender: remove done from %.8x\n", from);      neighbor_remove (getters, from);    }  }  joined recv ask {    // asks for a specific key, not yet implemented  }// ---------------------------------------------- // sending// ----------------------------------------------   joined timer sending {    int should_have_sent;    if (neighbor_size(getters)!=0) {      foreach_neighbor (neighbor_receivers_from_me*, getta, getters) {	if (getta->keys.size()) {	  sprintf(trace_buf_,"need to send %x keys %d\n", getta->ipaddr, getta->keys.size());	  cut_trace();	  if (curtime < getta->last_refresh + BULLET_REFRESH_INTERVAL+1.0) {	    // allow the update to be late up to 1 second, no more	    double sendtime = min(curtime-getta->last_refresh, (double)BULLET_REFRESH_INTERVAL);	    double prev_sendtime = sendtime;	    should_have_sent = (int)ceil((sendtime * (getta->keys.size() + getta->sent_this_refresh)) / (double)BULLET_REFRESH_INTERVAL);	    if (getta->sent_this_refresh > should_have_sent)	      continue;	    int num_send = should_have_sent - getta->sent_this_refresh;	    num_send = min(num_send, 20);  // send up to 20 since that is the queue limit anyway	    int trash;	    sprintf(trace_buf_, "Sender: for %.8x keys %d, sent %d should have %d will send %d last %f\n", getta->ipaddr, getta->keys.size(), getta->sent_this_refresh, should_have_sent, num_send, getta->last_refresh);	    cut_trace();	    while (num_send > 0 && getta->keys.size()>0) {	      list< int>::iterator traverse = getta->keys.begin();	      int value = *traverse; 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -