overcast.mac
来自「这是一个著名的应用层组播中间件的源码」· MAC 代码 · 共 863 行 · 第 1/2 页
MAC
863 行
// ---------------------------------------------------------------------------
// NOTE: this chunk begins in the middle of a transition whose header is not
// visible here. The fragment below is the tail of that transition and is kept
// exactly as it was (only re-indented).
// ---------------------------------------------------------------------------
      if (pops->start_probing_time == 0.0) pops->start_probing_time = curtime;
      //debug_macro("Probe: got seq %d need %d more from parent %.8x\n", field(sequence), pops->probes_to_get, from);
    } else if (neighbor_query(grandpa, from)) {
      // Probe came from the grandparent: count it and timestamp it.
      neighbor_oparent *gramps = neighbor_entry(grandpa, from);
      gramps->probes_to_get --;
      gramps->probing_time = curtime;
      // The first probe seen marks the start of the measurement window.
      if (gramps->start_probing_time == 0.0) gramps->start_probing_time = curtime;
      //debug_macro("Probe: got seq %d need %d more from parent %.8x\n", field(sequence), gramps->probes_to_get, from);
    }
    any_ck_probes_finished();
  }

  // Transition headers below have the shape "<state> timer <name>" or
  // "<state> recv <message>"; presumably the body runs when that timer fires /
  // that message arrives while the node is in <state> ("any" = every state).
  // TODO confirm against the MACEDON language reference.

  // stop_probe timer in state `probed`: evaluate whatever probes were received.
  probed timer stop_probe {
    probed_probes_completed();
  }

  // probe_done message in state `probing`: the node we were sending probes to
  // released us. Messages from any other sender are ignored.
  probing recv probe_done {
    if (from != probed_node) return;
    timer_cancel (keep_probing);
    debug_macro("Probe: unlocked by %.8x with %d left\n", from, probes_to_send);
    state_change (joined);
  }

  // sendinfo timer: send a parent_info message to each child with
  // probability 0.5. notify_on is switched on when hold_time is -1 or once
  // hold_time has elapsed since time_booted.
  any timer sendinfo {
    double hold_time = parameters.getdouble("hold_time");
    if (hold_time == -1.0 || curtime > time_booted + hold_time) notify_on = 1;
    foreach_neighbor (neighbor_ochildren *, afriend, kids) {
      if (drand48() < 0.5) {
        route_parent_info(afriend->ipaddr, papa, kids, height+1, notify_on, num_siblings, 0, 0, -1);
      }
    }
  }

  // parent_info message in state `joined`: record the topology information
  // carried in the message, choose which nodes to probe (parent, optionally
  // the grandparent, and up to OVERCAST_SIBLINGS_TO_PROBE siblings), then
  // start a probe round by asking the parent for probes.
  joined recv parent_info {
    // always probes parent and grandpa (if it exists), as well as OVERCAST_SIBLINGS_TO_PROBE siblings
    int people_to_probe = 1; // at least have to probe parent
    double hold_time = parameters.getdouble("hold_time");

    // No re-probing once a finite hold_time has elapsed.
    if ( ( hold_time != -1.0 && curtime > time_booted + hold_time) ) return;
    // Only accept parent_info from our current parent.
    if (!neighbor_size(papa) || !neighbor_query(papa, from)) return;

    num_siblings = neighbor_size(field(brothers));
    // Randomly skip this round; the more siblings listed, the likelier the skip.
    if ( drand48() * (double)neighbor_size(field(brothers)) > 1.0) return;

    height = field(height);
    int prev_notify = notify_on;
    notify_on = field(enable_notify);
    // Notifications just became enabled: push the current neighbor sets up.
    if (notify_on == 1 && prev_notify == 0) {
      upcall_notify(kids, NBR_TYPE_CHILDREN);
      if (source_ != me) {
        upcall_notify(papa, NBR_TYPE_PARENT);
      }
    }

    neighbor_oparent *pops = neighbor_random(papa);

    // Rebuild the sibling set from the message, excluding ourselves.
    neighbor_clear (brothers);
    foreach_neighbor (neighbor_ochildren*, b, field(brothers)) {
      if (b->ipaddr != me ) {
        people_to_probe++;
        neighbor_add (brothers, b->ipaddr);
        neighbor_ochildren *dude = neighbor_entry(brothers, b->ipaddr);
        dude->probes_to_get = OVERCAST_NUM_PROBES;
      }
    }
    while (neighbor_size(brothers) > OVERCAST_SIBLINGS_TO_PROBE) { // can't probe too many people
      neighbor_ochildren *eviltwin = neighbor_random (brothers);
      people_to_probe--;
      neighbor_remove (brothers, eviltwin->ipaddr);
    }

    // Optionally include the grandparent (field `daddy` of the message),
    // subject to a random draw scaled by num_uncles.
    neighbor_clear(grandpa);
    if ( drand48() * (double)field(num_uncles) <= 1.0) {
      if (neighbor_size(field(daddy))) {
        people_to_probe++;
        neighbor_oparent *abuelo = neighbor_random(field(daddy));
        neighbor_add(grandpa, abuelo->ipaddr);
        neighbor_oparent *gramps = neighbor_random(grandpa);
        gramps->probes_to_get = OVERCAST_NUM_PROBES;
      }
    }

    if (people_to_probe <= 1) // no point in probing anyone if its just me and my papa
      return;

    state_change (probed);
    timer_cancel(stop_probe);
    timer_resched(stop_probe, OVERCAST_STOP_PROBE);

    debug_macro("Probe: ask parent %.8x\n", pops->ipaddr);
    pops->start_probing_time = 0.0;
    pops->probes_to_get = OVERCAST_NUM_PROBES;
    pops->lock_requested = 1;
    route_probe_request ( pops->ipaddr, pops->ipaddr, ++probe_synch, 0.0, 0, 0, -1);

    // Spread the remaining probe requests over the stop_probe interval.
    probe_skip = ((double)OVERCAST_STOP_PROBE) / ((double)people_to_probe+1 );
    timer_resched(probe_requester, probe_skip);
  }

  // probe_requester timer in state `probed`: ask the next candidate for probes.
  probed timer probe_requester {
    probed_request_more_probes();
  }

  // data message: account for it, offer it to the upper layer via
  // upcall_forward, and if that returns 0 deliver and/or forward it according
  // to comm_type (multicast goes to every child, collect goes to the parent).
  any recv data {
    master.update();
    int nexthop = me;
    int transport = field(transport);
    int comm_type = field(comm_type);

    // Collect traffic heads to the parent when there is one.
    if(comm_type == COMM_TYPE_COLLECT) {
      if(!neighbor_empty(papa)) {
        neighbor_oparent *mypa = neighbor_random (papa);
        nexthop = mypa->ipaddr;
      }
    }

    if(upcall_forward( nexthop, msg, size, comm_type) == 0) {
      if(comm_type != COMM_TYPE_COLLECT) {
        master_useful.update();
        upcall_deliver( msg, size, comm_type);
      }
      if (comm_type == COMM_TYPE_MULTICAST) {
        foreach_neighbor (neighbor_ochildren*, afriend, kids ) {
          route_data( afriend->ipaddr, afriend->ipaddr, me, COMM_TYPE_MULTICAST, transport, msg, size, transport);
        }
      } else if(comm_type == COMM_TYPE_COLLECT) {
        if(nexthop != me) {
          route_data(nexthop, nexthop, me, COMM_TYPE_COLLECT, transport, msg, size, transport);
        } else {
          // No parent to forward to: the collect message is delivered here.
          master_useful.update();
          upcall_deliver( msg, size, comm_type);
        }
      }
    }
  }

  // Group-management API entry points are intentionally empty in this protocol.
  API create_group {
  }

  API join {
  }

  API leave {
  }

  // Unicast: send the data message straight to dest and report the send status.
  API route {
    route_data(dest, dest, me, COMM_TYPE_UNICAST, transport, msg, size, transport);
    return_code = macedon_sendret;
  }

  // Multicast: send the data message to every child.
  API multicast {
    foreach_neighbor (neighbor_ochildren*, afriend, kids) {
      route_data(afriend->ipaddr, afriend->ipaddr, me, COMM_TYPE_MULTICAST, transport, msg, size, transport);
    }
  }

  // Collect: send the data message to the parent.
  // Fixed: the original dereferenced neighbor_random(papa) unconditionally and
  // omitted the terminating ';' on the route_data call. The parent set can be
  // empty (the `recv data` handler checks for exactly that), so guard it the
  // same way; with no parent nothing is sent.
  API collect {
    if (!neighbor_empty(papa)) {
      neighbor_oparent *mypa = neighbor_random (papa);
      route_data(mypa->ipaddr, mypa->ipaddr, me, COMM_TYPE_COLLECT, transport, msg, size, transport);
    }
  }

  //Timer: printer
  //
  //prints REPLAY BANDWIDTH
  //prints info for tree replay
  timer printer {
    if(neighbor_size(papa) > 0) {
      neighbor_oparent *mypa = neighbor_random (papa);
      debug_macro("REPLAY session: %.8x, parent: %.8x.\n",0xABCD,mypa->ipaddr);
    }
    //FIXME: Should I be using globalmacedon?
    extern MACEDON_Agent *globalmacedon;
    if (globalmacedon != this) // am i the highest layer
      return;
    // Only report when streaming_time is -1 or has elapsed since boot.
    // NOTE(review): pthread_self() is printed with %d; pthread_t is an opaque
    // type, so this is not portable. Left as-is to keep the log line format
    // unchanged — confirm before altering.
    if ( ( parameters.getint("streaming_time") == -1.0 || curtime > time_booted + parameters.getint("streaming_time")) )
      printf("%s %f %d REPLAY_BANDWIDTH %d %d %d %d %d %d\n",
             get_hostname(),
             Scheduler::instance().clock(),
             pthread_self(),
             (int) master.get_value(),
             0, // need to put in control bw
             (int) master_useful.get_value(),
             (int) master.get_value(),
             (int) master_useful.get_value(),
             0 );
  }
}

// Helper routines called from the transitions above.
routines {

  // Sends probe_done to every probed node (parent, grandparent, siblings)
  // whose lock_requested flag is set.
  // NOTE(review): assumes papa is non-empty — TODO confirm callers guarantee it.
  void any_unlock_nodes() {
    neighbor_oparent *pops = neighbor_random (papa);
    if (pops->lock_requested) {
      debug_macro( "Probe: unlocking parent %.8x\n", pops->ipaddr);
      route_probe_done(pops->ipaddr, 0, 0, -1);
    }
    if (neighbor_size(grandpa)) {
      neighbor_oparent *gramps = neighbor_random(grandpa);
      if (gramps->lock_requested) {
        debug_macro("Probe: unlocking grandparent %.8x\n", gramps->ipaddr);
        route_probe_done(gramps->ipaddr, 0, 0, -1);
      }
    }
    foreach_neighbor (neighbor_ochildren*, hermano, brothers) {
      if (hermano->lock_requested) {
        debug_macro("Probe: unlocking sibling %.8x\n", hermano->ipaddr);
        route_probe_done(hermano->ipaddr, 0, 0, -1);
      }
    }
  }

  // Sends probe messages to probed_node until probes_to_send reaches zero or
  // a send fails. On failure the keep_probing timer is rescheduled after one
  // packet interval computed from data_packet_size and streaming_rate.
  void probing_send_some_probes() {
    int garbage[OVERCAST_PAYLOAD];
    int trash;

    // Fixed: the payload used to be uninitialised stack memory that was then
    // handed to route_probe; zero it so no stale data leaves the process.
    memset(garbage, 0, sizeof(garbage));

    while (probes_to_send) {
      // Flag the probe as trash while the count already sent is at most
      // OVERCAST_TRASH_PROBES.
      if (OVERCAST_NUM_PROBES+OVERCAST_TRASH_PROBES-probes_to_send <= OVERCAST_TRASH_PROBES)
        trash = 1;
      else
        trash = 0;

      route_probe(probed_node, OVERCAST_NUM_PROBES-probes_to_send, probe_synch, trash, garbage, OVERCAST_PAYLOAD, -1);

      if (macedon_sendret) { // send failed
        double application_spacing = (double)(parameters.getint("data_packet_size")) *8/(1000.0*(double) parameters.getint("streaming_rate"));
        timer_resched (keep_probing, application_spacing);
        break;
      }
      else
        probes_to_send--;
    }
  }

  // Checks whether at least OVERCAST_MIN_PROBES probes have arrived from the
  // parent, the grandparent (if any) and every sibling; if so, cancels the
  // stop_probe timer and evaluates the results immediately.
  // NOTE(review): assumes papa is non-empty — TODO confirm.
  void any_ck_probes_finished() {
    int finished=1;
    neighbor_oparent *pops = neighbor_random (papa);
    if (OVERCAST_NUM_PROBES-pops->probes_to_get < OVERCAST_MIN_PROBES) {
      finished=0;
    }
    if (neighbor_size(grandpa)) {
      neighbor_oparent *gramps = neighbor_random(grandpa);
      if (OVERCAST_NUM_PROBES-gramps->probes_to_get < OVERCAST_MIN_PROBES) {
        finished=0;
      }
    }
    foreach_neighbor (neighbor_ochildren*, hermano, brothers) {
      if (OVERCAST_NUM_PROBES-hermano->probes_to_get < OVERCAST_MIN_PROBES) {
        finished=0;
      }
    }
    if (finished) {
      timer_cancel (stop_probe);
      probed_probes_completed();
    }
  }

  // Evaluates a finished probe round. Per-probe delay is computed for the
  // parent, grandparent and siblings from (last probe time - first probe
  // time) / probes received. The node then either re-joins under the
  // grandparent, re-joins under the best sibling, or stays put (state joined).
  // NOTE(review): assumes papa is non-empty — TODO confirm.
  // NOTE(review): both "move" paths return before any_unlock_nodes() runs, so
  // no probe_done is sent on those paths — confirm this is intended.
  void probed_probes_completed() {
    printf("probed_probes_completed();\n");
    int abort=0;
    neighbor_oparent *pops = neighbor_random(papa);

    // With no earlier delay estimate and too few fresh probes from the parent
    // there is nothing to compare against, so give up on this round.
    if (pops->delay == 0.0 && OVERCAST_NUM_PROBES-pops->probes_to_get < OVERCAST_MIN_PROBES) {
      // Fixed: report the number of probes received (as the grandpa/brother
      // messages below do) rather than the number still outstanding.
      debug_macro("Probe: not enuf probes recd from parent %.8x: %d\n", pops->ipaddr, OVERCAST_NUM_PROBES-pops->probes_to_get);
      state_change (joined);
      abort=1;
    }

    if (!abort) {
      if (OVERCAST_NUM_PROBES-pops->probes_to_get >= OVERCAST_MIN_PROBES) {
        pops->delay = (pops->probing_time-pops->start_probing_time) / (OVERCAST_NUM_PROBES-pops->probes_to_get);
        debug_macro("REPLAY parent %.8x delay set to %f with probes %d, times %f %f\n", pops->ipaddr, pops->delay, (OVERCAST_NUM_PROBES-pops->probes_to_get), pops->probing_time, pops->start_probing_time);
      }

      if (neighbor_size(grandpa)) { // my grandfather exists
        neighbor_oparent *gramps = neighbor_random(grandpa);
        if (OVERCAST_NUM_PROBES-gramps->probes_to_get >= OVERCAST_MIN_PROBES) {
          gramps->delay = (gramps->probing_time-gramps->start_probing_time) / (OVERCAST_NUM_PROBES-gramps->probes_to_get);
          // Grandparent threshold multiplies by OVERCAST_GOOD_ENOUGH, while the
          // sibling threshold below divides by it.
          // NOTE(review): asymmetry looks deliberate but is not explained here — confirm.
          if (gramps->delay < pops->delay * OVERCAST_GOOD_ENOUGH) {
            debug_macro("REPLAY trying to move from %.8x with delay set %f to grandpa %.8x with delay %f\n", pops->ipaddr, pops->delay, gramps->ipaddr, gramps->delay);
            state_change (joining);
            route_add (gramps->ipaddr, 0, 0, -1);
            return;
          }
          else {
            sprintf(trace_buf_, "Probe: grandpa %.8x delay is %f which is too high comp to %f\n", gramps->ipaddr, gramps->delay, pops->delay);
            trace_print();
          }
        }
        else {
          sprintf(trace_buf_, "Probe: not enuf recd from grandpa %.8x: %d\n", gramps->ipaddr, OVERCAST_NUM_PROBES-gramps->probes_to_get);
          trace_print();
        }
      }

      // Pick the sibling with the lowest delay among those passing the threshold.
      // best holds that sibling's address; 0 means "none found".
      int best=0;
      double best_delay=0.0;
      foreach_neighbor (neighbor_ochildren*, hermano, brothers) {
        if (OVERCAST_NUM_PROBES-hermano->probes_to_get >= OVERCAST_MIN_PROBES) {
          hermano->delay = (hermano->probing_time-hermano->start_probing_time) / (OVERCAST_NUM_PROBES-hermano->probes_to_get);
          if (hermano->delay < pops->delay / OVERCAST_GOOD_ENOUGH &&
              (best == 0 || hermano->delay < best_delay) ) {
            best_delay = hermano->delay;
            best = hermano->ipaddr;
            sprintf(trace_buf_, "Probe: may move from %.8x with delay set %f to sibling %.8x with delay %f\n", pops->ipaddr, pops->delay, hermano->ipaddr, hermano->delay);
            trace_print();
          }
          else {
            sprintf(trace_buf_, "REPLAY sibling %.8x delay set to %f which is too high comp to %f and %f probes %d times %f %f\n", hermano->ipaddr, hermano->delay, pops->delay, best_delay, OVERCAST_NUM_PROBES-hermano->probes_to_get, hermano->probing_time, hermano->start_probing_time);
            trace_print();
          }
        }
        else {
          sprintf(trace_buf_, "Probe: not enuf recd from brother %.8x: %d\n", hermano->ipaddr, OVERCAST_NUM_PROBES-hermano->probes_to_get);
          trace_print();
        }
      }

      if (best) { // found a sibling to move to
        state_change (joining);
        route_add (best, 0, 0, -1);
        return;
      }
      else {
        state_change (joined);
      }
    }
    any_unlock_nodes();
  }

  // Sends one probe_request to the next node that has not been asked yet
  // (grandparent first, then siblings in iteration order) and re-arms the
  // probe_requester timer. Does nothing once everyone has been asked.
  // NOTE(review): assumes papa is non-empty — TODO confirm.
  void probed_request_more_probes() {
    neighbor_oparent *pops = neighbor_random(papa);

    if (neighbor_size(grandpa)) {
      neighbor_oparent *gramps = neighbor_random(grandpa);
      if (gramps->lock_requested == 0) {
        gramps->probes_to_get = OVERCAST_NUM_PROBES;
        gramps->start_probing_time = 0.0;
        gramps->lock_requested = 1;
        sprintf(trace_buf_, "Probe: ask grandparent %.8x\n", gramps->ipaddr);
        trace_print();
        route_probe_request ( gramps->ipaddr, pops->ipaddr, probe_synch, 0.0, 0, 0, -1);
        timer_resched(probe_requester, probe_skip);
        return;
      }
    }

    foreach_neighbor (neighbor_ochildren*, hermano, brothers) {
      if (hermano->lock_requested == 0) {
        sprintf(trace_buf_, "Probe: ask sibling %.8x\n", hermano->ipaddr);
        trace_print();
        hermano->start_probing_time = 0.0;
        hermano->probes_to_get = OVERCAST_NUM_PROBES;
        hermano->lock_requested = 1;
        route_probe_request ( hermano->ipaddr, pops->ipaddr, probe_synch, 0.0, 0, 0, -1);
        timer_resched(probe_requester, probe_skip);
        return;
      }
    }
  }

  // Reads the file named by the "parent_file" parameter. Each usable line has
  // exactly two whitespace-separated host names: "<node> <parent>". Lines
  // with fewer or more fields are skipped. For every line naming this node as
  // parent, forced_children is incremented; for a line naming this node as
  // the child, forced_parent is set to the parent's address. A failed host
  // lookup terminates the process with exit code 24.
  void joined_read_parent_file() {
    struct hostent *phe;
    struct hostent *mhe;
    struct in_addr paddr;
    struct in_addr myaddr;
    FILE *myfile;
    char name[200];
    char parent[200];
    char rest[1024];
    char whole_line[1024];
    char *parent_filename = parameters.getstr("parent_file");

    printf("forced parent_filename: %s\n", parent_filename);
    forced_children=0;

    myfile = (FILE *)fopen(parent_filename, "r");
    if (!myfile) {
      printf("nonexistent forced parent_file %s\n", parent_filename);
      return;
    }

    while (fgets(whole_line, 512, myfile)!=NULL) {
      // Fixed: bound every conversion to its buffer size (a line may hold up
      // to 511 characters but name/parent hold only 200), and pass the arrays
      // themselves rather than pointers-to-array.
      // A third field (captured in rest) makes the count 3, so such lines are
      // skipped just as before.
      if (sscanf(whole_line, "%199s %199s %1023[^\n]s", name, parent, rest) != 2) {
        continue;
      }

      // gethostbyname may reuse its result storage between calls, so each
      // address is copied out before the next lookup.
      if ((mhe = gethostbyname(name)) == 0) {
        printf("Bad host lookup.\n");
        exit(24);
      }
      memcpy(&myaddr, mhe->h_addr_list[0], sizeof(struct in_addr));

      if ((phe = gethostbyname(parent)) == 0) {
        printf("Bad host lookup.\n");
        exit(24);
      }
      memcpy(&paddr, phe->h_addr_list[0], sizeof(struct in_addr));

      // Fixed: pass the integer address, not the whole struct, to %.8x.
      printf("forced parent line: %s %.8x %s %.8x\n", name, myaddr.s_addr, parent, paddr.s_addr);

      if (paddr.s_addr == me) { // i am parent
        forced_children++;
      }
      else if (myaddr.s_addr == me) { // i am kid
        forced_parent = paddr.s_addr;
        sprintf(trace_buf_, "forced_parent: %.8x\n", forced_parent);
        trace_print();
      }
    }
    fclose(myfile);
  }
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?