⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 bullet.mac

📁 这是一个著名的应用层组播中间件的源码
💻 MAC
📖 第 1 页 / 共 3 页
字号:
	      if (!getta->recipient_digest.contains(value)) {		int mysize;		unsigned char *data_msg = working_file.get_message(value, mysize);		if (data_msg) 		  {		    route_peer_data( getta->ipaddr, 				     COMM_TYPE_MULTICAST, 				     value, data_msg, 				     mysize, -1);   		    delete [] data_msg;		  }		else		  {		    getta->keys.pop_front();		    continue;//  		    route_peer_data( getta->ipaddr, //  				     COMM_TYPE_MULTICAST, //  				     value, cached_msg, //  				     parameters.getint("data_packet_size"), -1);   		  }		if (!macedon_sendret) {		  sprintf(trace_buf_, "sent %d to %.8x\n", value, getta->ipaddr);		  cut_trace();		  getta->recipient_digest.insert(value);		  getta->keys.pop_front();		}		else {		  sprintf(trace_buf_, "failed sent %d to %.8x\n", value, getta->ipaddr);		  cut_trace();		}		getta->sent_this_refresh++;	      }	      else		{		  sprintf(trace_buf_, "duplicate to send %d to %.8x\n", value, getta->ipaddr);		  cut_trace();		}	      num_send--;	    }	    //	    if (getta->keys.size()) {	    //     list< int>::iterator traverse = getta->keys.begin();	    //   int value = *traverse; 	      	    //    sprintf(trace_buf_, "Sender: left keys %d, top %d\n", getta->keys.size(), value);	    //    trace_print();	    //  }	  }	}      }      double application_spacing = (double)(parameters.getint("data_packet_size")) *8/(1000.0*(double) parameters.getint("streaming_rate"));      timer_resched(sending, application_spacing);    }  }  joined recv papa_data {    if (field(comm_type) != COMM_TYPE_MULTICAST) {      upcall_deliver( msg, size, field(comm_type));      return;    }    if (!neighbor_query(myparent,from))      return;    got_from = from;    got_msg = msg;    got_size = size;    got_key = field(key_seq);    got_type = BULLET_DATA_PARENT;    joined_got_multi_data();  }  joined recv peer_data {    if (field(comm_type) != COMM_TYPE_MULTICAST) {      upcall_deliver( msg, size, field(comm_type));      return;    }    //     if (!neighbor_query(myparent,from) &&    // 	!neighbor_query(givers, from))    //       return;    got_from = from;    got_msg = msg;    got_size = size;    got_key = field(key_seq);    got_type = BULLET_DATA_PARALLEL;    joined_got_multi_data();  }  joined timer decision {    //    working_file.dump_stats();    //    parent_file.dump_stats();    //    parallel_file.dump_stats();    foreach_neighbor (neighbor_ransub_children*, kid, mychildren ) {      double stream_time = parameters.getdouble("streaming_time");      if ( ( stream_time == -1.0 ||            curtime > time_booted + stream_time) ) {        sprintf(trace_buf_, "strat: %x dens %f repr %d fctr %f sent %d not %d gas %d\n", kid->ipaddr, kid->density, kid->represents, kid->bandwidth_factor, kid->sent, kid->not_sent, kid->gas_sent);        trace_print();      }      kid->sent = 0;      kid->gas_sent = 0;      kid->not_sent = 0;    }  }  joined timer printer {    double parent_bandwidth = 0.0;    double useful_parent_bandwidth = 0.0;    double total_bandwidth = 0.0;    double useful_bandwidth = 0.0;    foreach_neighbor (neighbor_senders_to_me *, givuh, givers) {      total_bandwidth+= givuh->arrivals.get_value();      useful_bandwidth+=givuh->useful.get_value();    }         foreach_neighbor (neighbor_ransub_parent *, papa, myparent) {      parent_bandwidth = papa->arrivals.get_value();      useful_parent_bandwidth = papa->useful.get_value();      total_bandwidth += papa->arrivals.get_value();      useful_bandwidth += papa->useful.get_value();    }    total_bandwidth = master.get_value();    useful_bandwidth = master_useful.get_value();    smooth_bandwidth.update( useful_bandwidth );    double stream_time = parameters.getdouble("streaming_time");    if ( ( stream_time == -1.0 ||          curtime > time_booted + stream_time) )      printf("%s %f %d REPLAY_BULLET_BANDWIDTH %d %d %d %d %d %d\n",           get_hostname(), Scheduler::instance().clock(), pthread_self(),          (int) parent_bandwidth,          0,             (int) useful_parent_bandwidth,          (int) total_bandwidth,          (int) useful_bandwidth,          (int) smooth_bandwidth.get_value()          );  }// ---------------------------------------------- // collect// ----------------------------------------------   joined API collect { // send the data to our parent in the tree    sprintf(trace_buf_, "API collect\n");        cut_trace();    neighbor_ransub_parent *mypa = neighbor_random (myparent);    if (mypa) {      sprintf(trace_buf_, "API collect to %x\n", mypa->ipaddr);          cut_trace();            // Only forward it if I have a parent      route_collect_data (mypa->ipaddr, mypa->ipaddr, me, COMM_TYPE_COLLECT, transport, msg, size, transport);    }    return_code = 0;  }  joined recv collect_data {    sprintf(trace_buf_, "recv collect_data type %d size %d from %x at %x\n", field(comm_type), size,from ,me);        cut_trace();    // check to see if we should process this data    int should_forward = upcall_forward(0, msg, size, field(comm_type));    if (should_forward)      return; // Leave if upper layer says not to take this msg    upcall_deliver( msg, size, field(comm_type));    neighbor_ransub_parent *mypa = neighbor_random (myparent);    if (mypa)       {      sprintf(trace_buf_, "routing collect to %x\n", mypa->ipaddr);          cut_trace();            route_collect_data (mypa->ipaddr, mypa->ipaddr, me, COMM_TYPE_COLLECT, field(priority), msg, size, field(priority));      }  }      // ---------------------------------------------- // multicast// ----------------------------------------------   API multicast {    curkey ++;    sprintf(trace_buf_, "Sender: root sending key %d\n", curkey);        cut_trace();    got_from = 0;    got_msg = msg;    got_size = size;    got_key = curkey;    got_type = BULLET_DATA_PARENT;    multicast_success_code = 1;    joined_got_multi_data();    return_code = multicast_success_code;  }  // ---------------------------------------------- // ransub// ----------------------------------------------   joined timer ransub {    // This code times out ransub, for failure detection    if (collect_missing)      {      collect_expired = 1;      joined_xmit_collect_if_need();      collect_expired = 0;    }    if (source_ == me) {      if (collect_missing) {        foreach_neighbor(neighbor_ransub_children *, kid, mychildren) {          if (kid->seq != sequence)          {            sprintf(trace_buf_, "RanSub: coll still out at root for %x.\n", kid->ipaddr);            trace_print();          }        }//          timer_resched(ransub, BULLET_SHORT_RANSUB);	//          return;      }      bullet_summary_ticket* myticket = new bullet_summary_ticket(working_set::UNIVERSE_SIZE);      * myticket = working_file.get_sketch();      myticket->st.address = me;      sequence++;      population = descendants + 1;      collect_missing=1;      sprintf (trace_buf_, "RanSub: root dist seq %d, population %d\n", sequence, population);      trace_print();      foreach_neighbor(neighbor_ransub_children *, kid1, mychildren)	{	  candidate_set<cand_bullet_summary_ticket>  tosend(BULLET_MAX_CANDS);	  if ( neighbor_space(getters) && source_ != me )  	    tosend.addj(myticket->st);	  int sofar=1;	  foreach_neighbor(neighbor_ransub_children *, kid2, mychildren) 	    {	      if (kid1->ipaddr != kid2->ipaddr) {		tosend.compact(kid2->gathered, kid2->represents, sofar);		sofar+=kid2->represents;	      }	    }	  kid1->seq = sequence -1;	  sprintf(trace_buf_, "RanSub: root send dist seq %d to %x.\n", sequence, kid1->ipaddr);	  trace_print();	  route_distribute(kid1->ipaddr, sequence, population, tosend, 0, 0, -1);	}      timer_resched(ransub, BULLET_NORMAL_RANSUB);      delete myticket;    }  }// ---------------------------------------------- // distribute// ----------------------------------------------     joined recv distribute {    if (!neighbor_query(myparent, from)) {      sprintf(trace_buf_, "RanSub: dist from wrong parent %.8x seq %d.\n", from, field(sequence));      return;    }    sprintf(trace_buf_, "RanSub: dist from %.8x seq %d, expect %d.\n", from, field(sequence), sequence+1);    trace_print();    bullet_summary_ticket* myticket = new bullet_summary_ticket(working_set::UNIVERSE_SIZE);    * myticket = working_file.get_sketch();    myticket->st.address = me;    if (sequence == 0)      sequence = field(sequence) - 1;    if (field(sequence) > sequence)    {      sequence = field(sequence);      curset = field(mydistribute);      //	curset.printem(me);      population = field(population);      // 	if (source_ != me) {      // 	  timer_resched(ransub, BULLET_NORMAL_RANSUB-1);      // 	}      // first do the bullet stuff      int best=0;      double best_resemblance=1.1; // force picking at least one      double resemblance;      neighbor_ransub_parent *mypa;            for (int i = 0; i < curset.number_candidates ;i++ )	{	  bullet_summary_ticket* source_ticket = new bullet_summary_ticket(working_set::UNIVERSE_SIZE);	  source_ticket->st = curset.candidates[i];	  resemblance = working_file.compute_resemblance(*source_ticket);	  sprintf(trace_buf_, "Receiver: resemblance with %.8x is %f\n", source_ticket->get_address(), resemblance);	  cut_trace();	  mypa = neighbor_random(myparent);	  if (source_ticket->get_address()!=me &&	      source_ticket->get_address()!=mypa->ipaddr && 	      source_ticket->get_address() != source_ &&	      !neighbor_query (givers, source_ticket->get_address()) &&	      // this guy is already a sender to me	      best_resemblance>resemblance )	    {	      best_resemblance=resemblance;	      best=source_ticket->get_address();	    }	  delete source_ticket;	}            any_close_peers();            // send the request if we found someone good and we need more bandwidth      if (best_resemblance!=1.1 && 	  smooth_bandwidth.get_value() > 0 &&	  smooth_bandwidth.get_value() < 	  (double)parameters.getint("streaming_rate")*1000 &&	  best != 0 &&	  neighbor_space(givers)) {	sprintf(trace_buf_, "Receiver: Added %.8x as sender, bw %f, seq %d.\n", best, smooth_bandwidth.get_value(), sequence);	trace_print();	neighbor_add(givers, best);	neighbor_senders_to_me *givuh = neighbor_entry(givers, best);	givuh->start_time = curtime;	int mysize;	unsigned char *my_filter = (unsigned char *)working_file.export_digest(mysize);#if USE_BLOOM 	  route_update_sender(best, 0, 0, 0, 0, master_useful.get_value(), 0.0, *(flat_bloom*)my_filter, 0, 0, -1); #else	  route_update_sender(best, 0, 0, 0, 0, master_useful.get_value(), 0.0, *(flat_bitmap*)my_filter, 0, 0, -1); #endif	delete [] my_filter;      }      else {	sprintf(trace_buf_, "Receiver: either my bandwidth %f is zero or close to the target %d or I have noone to select from.\n", smooth_bandwidth.get_value(), parameters.getint("streaming_rate")*1000);	cut_trace();      }      // now the ransub processing      if (neighbor_size(mychildren) == 0)	{	  candidate_set<cand_bullet_summary_ticket>  tosend(BULLET_MAX_CANDS);	  if ( neighbor_space(getters)  && source_ != me  )	    tosend.addj(myticket->st);		  neighbor_ransub_parent *papa = neighbor_random(myparent);	  sprintf(trace_buf_, "RanSub: send coll seq %d to %x.\n", sequence, papa->ipaddr);	  trace_print();	  route_collect(papa->ipaddr, sequence, 1, tosend, 0, 0, -1);	}      else	{       	  collect_missing = 1;	  foreach_neighbor(neighbor_ransub_children *, kid1, mychildren) 	    {	      candidate_set<cand_bullet_summary_ticket>  tosend(BULLET_MAX_CANDS);	      if ( neighbor_space(getters)  && source_ != me )		tosend.addj(myticket->st);		      tosend.compact(curset, population-descendants-1, 1);	      int sofar = population-descendants;	      foreach_neighbor(neighbor_ransub_children *, kid2, mychildren) 		{		  if (kid1->ipaddr != kid2->ipaddr) {		    tosend.compact(kid2->gathered, kid2->represents, sofar);		    sofar+=kid2->represents;		  }		}	      kid1->seq = sequence -1;	      sprintf(trace_buf_, "RanSub: send dist seq %d to %x.\n", sequence, kid1->ipaddr);	      trace_print();	      route_distribute(kid1->ipaddr, sequence, population, tosend, 0, 0, -1);      	    }	  }    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -