📄 bullet.mac
字号:
if (!getta->recipient_digest.contains(value)) { int mysize; unsigned char *data_msg = working_file.get_message(value, mysize); if (data_msg) { route_peer_data( getta->ipaddr, COMM_TYPE_MULTICAST, value, data_msg, mysize, -1); delete [] data_msg; } else { getta->keys.pop_front(); continue;// route_peer_data( getta->ipaddr, // COMM_TYPE_MULTICAST, // value, cached_msg, // parameters.getint("data_packet_size"), -1); } if (!macedon_sendret) { sprintf(trace_buf_, "sent %d to %.8x\n", value, getta->ipaddr); cut_trace(); getta->recipient_digest.insert(value); getta->keys.pop_front(); } else { sprintf(trace_buf_, "failed sent %d to %.8x\n", value, getta->ipaddr); cut_trace(); } getta->sent_this_refresh++; } else { sprintf(trace_buf_, "duplicate to send %d to %.8x\n", value, getta->ipaddr); cut_trace(); } num_send--; } // if (getta->keys.size()) { // list< int>::iterator traverse = getta->keys.begin(); // int value = *traverse; // sprintf(trace_buf_, "Sender: left keys %d, top %d\n", getta->keys.size(), value); // trace_print(); // } } } } double application_spacing = (double)(parameters.getint("data_packet_size")) *8/(1000.0*(double) parameters.getint("streaming_rate")); timer_resched(sending, application_spacing); } } joined recv papa_data { if (field(comm_type) != COMM_TYPE_MULTICAST) { upcall_deliver( msg, size, field(comm_type)); return; } if (!neighbor_query(myparent,from)) return; got_from = from; got_msg = msg; got_size = size; got_key = field(key_seq); got_type = BULLET_DATA_PARENT; joined_got_multi_data(); } joined recv peer_data { if (field(comm_type) != COMM_TYPE_MULTICAST) { upcall_deliver( msg, size, field(comm_type)); return; } // if (!neighbor_query(myparent,from) && // !neighbor_query(givers, from)) // return; got_from = from; got_msg = msg; got_size = size; got_key = field(key_seq); got_type = BULLET_DATA_PARALLEL; joined_got_multi_data(); } joined timer decision { // working_file.dump_stats(); // parent_file.dump_stats(); // parallel_file.dump_stats(); foreach_neighbor (neighbor_ransub_children*, kid, mychildren ) { double stream_time = parameters.getdouble("streaming_time"); if ( ( stream_time == -1.0 || curtime > time_booted + stream_time) ) { sprintf(trace_buf_, "strat: %x dens %f repr %d fctr %f sent %d not %d gas %d\n", kid->ipaddr, kid->density, kid->represents, kid->bandwidth_factor, kid->sent, kid->not_sent, kid->gas_sent); trace_print(); } kid->sent = 0; kid->gas_sent = 0; kid->not_sent = 0; } } joined timer printer { double parent_bandwidth = 0.0; double useful_parent_bandwidth = 0.0; double total_bandwidth = 0.0; double useful_bandwidth = 0.0; foreach_neighbor (neighbor_senders_to_me *, givuh, givers) { total_bandwidth+= givuh->arrivals.get_value(); useful_bandwidth+=givuh->useful.get_value(); } foreach_neighbor (neighbor_ransub_parent *, papa, myparent) { parent_bandwidth = papa->arrivals.get_value(); useful_parent_bandwidth = papa->useful.get_value(); total_bandwidth += papa->arrivals.get_value(); useful_bandwidth += papa->useful.get_value(); } total_bandwidth = master.get_value(); useful_bandwidth = master_useful.get_value(); smooth_bandwidth.update( useful_bandwidth ); double stream_time = parameters.getdouble("streaming_time"); if ( ( stream_time == -1.0 || curtime > time_booted + stream_time) ) printf("%s %f %d REPLAY_BULLET_BANDWIDTH %d %d %d %d %d %d\n", get_hostname(), Scheduler::instance().clock(), pthread_self(), (int) parent_bandwidth, 0, (int) useful_parent_bandwidth, (int) total_bandwidth, (int) useful_bandwidth, (int) smooth_bandwidth.get_value() ); }// ---------------------------------------------- // collect// ---------------------------------------------- joined API collect { // send the data to our parent in the tree sprintf(trace_buf_, "API collect\n"); cut_trace(); neighbor_ransub_parent *mypa = neighbor_random (myparent); if (mypa) { sprintf(trace_buf_, "API collect to %x\n", mypa->ipaddr); cut_trace(); // Only forward it if I have a parent route_collect_data (mypa->ipaddr, mypa->ipaddr, me, COMM_TYPE_COLLECT, transport, msg, size, transport); } return_code = 0; } joined recv collect_data { sprintf(trace_buf_, "recv collect_data type %d size %d from %x at %x\n", field(comm_type), size,from ,me); cut_trace(); // check to see if we should process this data int should_forward = upcall_forward(0, msg, size, field(comm_type)); if (should_forward) return; // Leave if upper layer says not to take this msg upcall_deliver( msg, size, field(comm_type)); neighbor_ransub_parent *mypa = neighbor_random (myparent); if (mypa) { sprintf(trace_buf_, "routing collect to %x\n", mypa->ipaddr); cut_trace(); route_collect_data (mypa->ipaddr, mypa->ipaddr, me, COMM_TYPE_COLLECT, field(priority), msg, size, field(priority)); } } // ---------------------------------------------- // multicast// ---------------------------------------------- API multicast { curkey ++; sprintf(trace_buf_, "Sender: root sending key %d\n", curkey); cut_trace(); got_from = 0; got_msg = msg; got_size = size; got_key = curkey; got_type = BULLET_DATA_PARENT; multicast_success_code = 1; joined_got_multi_data(); return_code = multicast_success_code; } // ---------------------------------------------- // ransub// ---------------------------------------------- joined timer ransub { // This code times out ransub, for failure detection if (collect_missing) { collect_expired = 1; joined_xmit_collect_if_need(); collect_expired = 0; } if (source_ == me) { if (collect_missing) { foreach_neighbor(neighbor_ransub_children *, kid, mychildren) { if (kid->seq != sequence) { sprintf(trace_buf_, "RanSub: coll still out at root for %x.\n", kid->ipaddr); trace_print(); } }// timer_resched(ransub, BULLET_SHORT_RANSUB); // return; } bullet_summary_ticket* myticket = new bullet_summary_ticket(working_set::UNIVERSE_SIZE); * myticket = working_file.get_sketch(); myticket->st.address = me; sequence++; population = descendants + 1; collect_missing=1; sprintf (trace_buf_, "RanSub: root dist seq %d, population %d\n", sequence, population); trace_print(); foreach_neighbor(neighbor_ransub_children *, kid1, mychildren) { candidate_set<cand_bullet_summary_ticket> tosend(BULLET_MAX_CANDS); if ( neighbor_space(getters) && source_ != me ) tosend.addj(myticket->st); int sofar=1; foreach_neighbor(neighbor_ransub_children *, kid2, mychildren) { if (kid1->ipaddr != kid2->ipaddr) { tosend.compact(kid2->gathered, kid2->represents, sofar); sofar+=kid2->represents; } } kid1->seq = sequence -1; sprintf(trace_buf_, "RanSub: root send dist seq %d to %x.\n", sequence, kid1->ipaddr); trace_print(); route_distribute(kid1->ipaddr, sequence, population, tosend, 0, 0, -1); } timer_resched(ransub, BULLET_NORMAL_RANSUB); delete myticket; } }// ---------------------------------------------- // distribute// ---------------------------------------------- joined recv distribute { if (!neighbor_query(myparent, from)) { sprintf(trace_buf_, "RanSub: dist from wrong parent %.8x seq %d.\n", from, field(sequence)); return; } sprintf(trace_buf_, "RanSub: dist from %.8x seq %d, expect %d.\n", from, field(sequence), sequence+1); trace_print(); bullet_summary_ticket* myticket = new bullet_summary_ticket(working_set::UNIVERSE_SIZE); * myticket = working_file.get_sketch(); myticket->st.address = me; if (sequence == 0) sequence = field(sequence) - 1; if (field(sequence) > sequence) { sequence = field(sequence); curset = field(mydistribute); // curset.printem(me); population = field(population); // if (source_ != me) { // timer_resched(ransub, BULLET_NORMAL_RANSUB-1); // } // first do the bullet stuff int best=0; double best_resemblance=1.1; // force picking at least one double resemblance; neighbor_ransub_parent *mypa; for (int i = 0; i < curset.number_candidates ;i++ ) { bullet_summary_ticket* source_ticket = new bullet_summary_ticket(working_set::UNIVERSE_SIZE); source_ticket->st = curset.candidates[i]; resemblance = working_file.compute_resemblance(*source_ticket); sprintf(trace_buf_, "Receiver: resemblance with %.8x is %f\n", source_ticket->get_address(), resemblance); cut_trace(); mypa = neighbor_random(myparent); if (source_ticket->get_address()!=me && source_ticket->get_address()!=mypa->ipaddr && source_ticket->get_address() != source_ && !neighbor_query (givers, source_ticket->get_address()) && // this guy is already a sender to me best_resemblance>resemblance ) { best_resemblance=resemblance; best=source_ticket->get_address(); } delete source_ticket; } any_close_peers(); // send the request if we found someone good and we need more bandwidth if (best_resemblance!=1.1 && smooth_bandwidth.get_value() > 0 && smooth_bandwidth.get_value() < (double)parameters.getint("streaming_rate")*1000 && best != 0 && neighbor_space(givers)) { sprintf(trace_buf_, "Receiver: Added %.8x as sender, bw %f, seq %d.\n", best, smooth_bandwidth.get_value(), sequence); trace_print(); neighbor_add(givers, best); neighbor_senders_to_me *givuh = neighbor_entry(givers, best); givuh->start_time = curtime; int mysize; unsigned char *my_filter = (unsigned char *)working_file.export_digest(mysize);#if USE_BLOOM route_update_sender(best, 0, 0, 0, 0, master_useful.get_value(), 0.0, *(flat_bloom*)my_filter, 0, 0, -1); #else route_update_sender(best, 0, 0, 0, 0, master_useful.get_value(), 0.0, *(flat_bitmap*)my_filter, 0, 0, -1); #endif delete [] my_filter; } else { sprintf(trace_buf_, "Receiver: either my bandwidth %f is zero or close to the target %d or I have noone to select from.\n", smooth_bandwidth.get_value(), parameters.getint("streaming_rate")*1000); cut_trace(); } // now the ransub processing if (neighbor_size(mychildren) == 0) { candidate_set<cand_bullet_summary_ticket> tosend(BULLET_MAX_CANDS); if ( neighbor_space(getters) && source_ != me ) tosend.addj(myticket->st); neighbor_ransub_parent *papa = neighbor_random(myparent); sprintf(trace_buf_, "RanSub: send coll seq %d to %x.\n", sequence, papa->ipaddr); trace_print(); route_collect(papa->ipaddr, sequence, 1, tosend, 0, 0, -1); } else { collect_missing = 1; foreach_neighbor(neighbor_ransub_children *, kid1, mychildren) { candidate_set<cand_bullet_summary_ticket> tosend(BULLET_MAX_CANDS); if ( neighbor_space(getters) && source_ != me ) tosend.addj(myticket->st); tosend.compact(curset, population-descendants-1, 1); int sofar = population-descendants; foreach_neighbor(neighbor_ransub_children *, kid2, mychildren) { if (kid1->ipaddr != kid2->ipaddr) { tosend.compact(kid2->gathered, kid2->represents, sofar); sofar+=kid2->represents; } } kid1->seq = sequence -1; sprintf(trace_buf_, "RanSub: send dist seq %d to %x.\n", sequence, kid1->ipaddr); trace_print(); route_distribute(kid1->ipaddr, sequence, population, tosend, 0, 0, -1); } } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -