overcast.mac

来自「这是一个著名的应用层组播中间件的源码」· MAC 代码 · 共 863 行 · 第 1/2 页

MAC
863
字号
      // NOTE(review): the statements down to the first closing brace at
      // two-space indentation are the tail of a handler whose header lies
      // above this excerpt; they are reproduced unchanged.
      if (pops->start_probing_time == 0.0)
        pops->start_probing_time = curtime;
      //debug_macro("Probe: got seq %d need %d more from parent %.8x\n", field(sequence), pops->probes_to_get, from);
    }
    else if (neighbor_query(grandpa, from)) {
      neighbor_oparent *gramps = neighbor_entry(grandpa, from);
      gramps->probes_to_get --;
      gramps->probing_time = curtime;
      if (gramps->start_probing_time == 0.0)
        gramps->start_probing_time = curtime;
      //debug_macro("Probe: got seq %d need %d more from parent %.8x\n", field(sequence), gramps->probes_to_get, from);
    }
    any_ck_probes_finished();
  }

  // The stop_probe timer fired while in state probed: evaluate whatever
  // probes have been received so far.
  probed timer stop_probe {
    probed_probes_completed();
  }

  // The node we were probing says it is done with us: stop sending probes
  // and go back to joined.  Messages from any other node are ignored.
  probing recv probe_done {
    if (from != probed_node)
      return;
    timer_cancel (keep_probing);
    debug_macro("Probe: unlocked by %.8x with %d left\n", from, probes_to_send);
    state_change (joined);
  }

  // Periodic parent_info broadcast.  notify_on is switched on when the
  // "hold_time" parameter is -1 or once hold_time has elapsed since
  // time_booted.  Each child is sent a parent_info with probability 0.5.
  any timer sendinfo {
    double hold_time = parameters.getdouble("hold_time");
    if (hold_time == -1.0 ||
        curtime > time_booted + hold_time)
      notify_on = 1;
    foreach_neighbor (neighbor_ochildren *, afriend, kids) {
      if (drand48() < 0.5) {
        route_parent_info(afriend->ipaddr, papa, kids, height+1, notify_on, num_siblings, 0, 0, -1);
      }
    }
  }

  // parent_info received while joined.  Rebuilds the candidate sets
  // (brothers, grandpa) from the message and, if there is anybody besides
  // the parent to compare against, enters state probed and asks the parent
  // for probes first; the remaining candidates are asked one at a time by
  // the probe_requester timer.
  joined recv parent_info {
    // always probes parent and grandpa (if it exists), as well as OVERCAST_SIBLINGS_TO_PROBE siblings
    int people_to_probe = 1;  // at least have to probe parent
    double hold_time = parameters.getdouble("hold_time");
    // Once a finite hold_time has elapsed no further probing is started.
    if ( ( hold_time != -1.0 &&
           curtime > time_booted + hold_time) )
      return;
    // Only accept parent_info from our current parent.
    if (!neighbor_size(papa) || !neighbor_query(papa, from))
      return;
    num_siblings = neighbor_size(field(brothers));
    // Proceed with probability of roughly 1/(number of siblings in the message).
    if ( drand48() * (double)neighbor_size(field(brothers)) > 1.0)
      return;
    height = field(height);
    int prev_notify = notify_on;
    notify_on = field(enable_notify);
    // Notifications were just enabled: report current children (and the
    // parent, unless we are the source) to the layer above.
    if (notify_on == 1 && prev_notify == 0) {
      upcall_notify(kids, NBR_TYPE_CHILDREN);
      if (source_ != me) {
        upcall_notify(papa, NBR_TYPE_PARENT);
      }
    }
    neighbor_oparent *pops = neighbor_random(papa);
    // Copy every sibling except ourselves into brothers.
    neighbor_clear (brothers);
    foreach_neighbor (neighbor_ochildren*, b, field(brothers)) {
      if (b->ipaddr != me ) {
        people_to_probe++;
        neighbor_add (brothers, b->ipaddr);
        neighbor_ochildren *dude = neighbor_entry(brothers, b->ipaddr);
        dude->probes_to_get = OVERCAST_NUM_PROBES;
      }
    }
    while (neighbor_size(brothers) > OVERCAST_SIBLINGS_TO_PROBE) {
      // can't probe too many people
      neighbor_ochildren *eviltwin = neighbor_random (brothers);
      people_to_probe--;
      neighbor_remove (brothers, eviltwin->ipaddr);
    }
    // Include the grandparent with probability of roughly 1/num_uncles.
    neighbor_clear(grandpa);
    if ( drand48() * (double)field(num_uncles) <= 1.0) {
      if (neighbor_size(field(daddy))) {
        people_to_probe++;
        neighbor_oparent *abuelo = neighbor_random(field(daddy));
        neighbor_add(grandpa, abuelo->ipaddr);
        neighbor_oparent *gramps = neighbor_random(grandpa);
        gramps->probes_to_get = OVERCAST_NUM_PROBES;
      }
    }
    if (people_to_probe <= 1)  // no point in probing anyone if its just me and my papa
      return;
    state_change (probed);
    timer_cancel(stop_probe);
    timer_resched(stop_probe, OVERCAST_STOP_PROBE);
    debug_macro("Probe: ask parent %.8x\n", pops->ipaddr);
    pops->start_probing_time = 0.0;
    pops->probes_to_get = OVERCAST_NUM_PROBES;
    pops->lock_requested = 1;
    route_probe_request ( pops->ipaddr, pops->ipaddr, ++probe_synch, 0.0, 0, 0, -1);
    // Spread the remaining probe requests evenly over the probing window.
    probe_skip = ((double)OVERCAST_STOP_PROBE) / ((double)people_to_probe+1 );
    timer_resched(probe_requester, probe_skip);
  }

  // Time to ask the next candidate (grandparent, then siblings) for probes.
  probed timer probe_requester {
    probed_request_more_probes();
  }

  // Data packet received.  Every packet is counted in master.  COLLECT
  // traffic travels towards the parent (next hop stays me when there is no
  // parent); MULTICAST traffic is delivered locally and copied to every
  // child; other traffic is only delivered locally.  Nothing is delivered or
  // forwarded unless upcall_forward() returns 0.
  any recv data {
    master.update();
    int nexthop = me;
    int transport = field(transport);
    int comm_type = field(comm_type);
    if(comm_type == COMM_TYPE_COLLECT) {
        if(!neighbor_empty(papa)) {
          neighbor_oparent *mypa = neighbor_random (papa);
          nexthop = mypa->ipaddr;
        }
    }
    if(upcall_forward( nexthop, msg, size, comm_type) == 0) {
      if(comm_type != COMM_TYPE_COLLECT) {
        master_useful.update();
        upcall_deliver( msg, size, comm_type);
      }
      if (comm_type == COMM_TYPE_MULTICAST) {
        foreach_neighbor (neighbor_ochildren*, afriend, kids ) {
          route_data( afriend->ipaddr,
                      afriend->ipaddr, me,
                      COMM_TYPE_MULTICAST, transport, msg, size, transport);
        }
      } else if(comm_type == COMM_TYPE_COLLECT) {
        if(nexthop != me) {
          route_data(nexthop, nexthop, me, COMM_TYPE_COLLECT, transport, msg, size, transport);
        } else {
          // No parent to pass it to: the collect terminates here.
          master_useful.update();
          upcall_deliver( msg, size, comm_type);
        }
      }
    }
  }

  // Group management calls are no-ops in this protocol.
  API create_group {
  }

  API join {
  }

  API leave {
  }

  // Unicast: send msg directly to dest and report the send status.
  API route {
    route_data(dest, dest, me, COMM_TYPE_UNICAST, transport, msg, size, transport);
    return_code = macedon_sendret;
  }

  // Multicast: send one copy of msg to every child.
  API multicast {
    foreach_neighbor (neighbor_ochildren*, afriend, kids) {
      route_data(afriend->ipaddr, afriend->ipaddr, me, COMM_TYPE_MULTICAST, transport, msg, size, transport);
    }
  }

  // Collect: send msg one hop up the tree, to our parent.
  // Nothing is sent when papa is empty; the other users of
  // neighbor_random(papa) in the transitions above (recv data, timer
  // printer) also check for a parent before dereferencing the result.
  // NOTE(review): decide whether a parentless node should deliver the
  // message locally instead, as the recv data handler does.
  API collect {
    if (!neighbor_empty(papa)) {
      neighbor_oparent *mypa = neighbor_random (papa);
      route_data(mypa->ipaddr, mypa->ipaddr, me, COMM_TYPE_COLLECT, transport, msg, size, transport);
    }
  }

  //Timer: printer
  //
  //prints REPLAY BANDWIDTH
  //prints info for tree replay
  timer printer {
    if(neighbor_size(papa) > 0) {
      neighbor_oparent *mypa = neighbor_random (papa);
      debug_macro("REPLAY session: %.8x, parent: %.8x.\n",0xABCD,mypa->ipaddr);
    }
    //FIXME: Should I be using globalmacedon?
    extern MACEDON_Agent *globalmacedon;
    if (globalmacedon != this)  // am i the highest layer
      return;
    // Bandwidth is reported when "streaming_time" is -1 or has elapsed.
    // NOTE(review): pthread_self() is printed with %d; pthread_t is not
    // guaranteed to be an int - confirm on the target platform.
    if ( ( parameters.getint("streaming_time") == -1.0 ||
           curtime > time_booted + parameters.getint("streaming_time")) )
      printf("%s %f %d REPLAY_BANDWIDTH %d %d %d %d %d %d\n",
             get_hostname(), Scheduler::instance().clock(), pthread_self(),
             (int) master.get_value(),
             0,   // need to put in control bw
             (int) master_useful.get_value(),
             (int) master.get_value(),
             (int) master_useful.get_value(),
             0
             );
  }
}

routines {
  // Sends probe_done to every node (parent, grandparent, siblings) whose
  // lock_requested flag is set.
  // NOTE(review): papa is dereferenced without an emptiness check; the
  // visible callers only run while a parent is expected to exist - confirm.
  void any_unlock_nodes() {
    neighbor_oparent *pops = neighbor_random (papa);
    if (pops->lock_requested) {
      debug_macro( "Probe: unlocking parent %.8x\n", pops->ipaddr);
      route_probe_done(pops->ipaddr, 0, 0, -1);
    }
    if (neighbor_size(grandpa)) {
      neighbor_oparent *gramps = neighbor_random(grandpa);
      if (gramps->lock_requested) {
        debug_macro("Probe: unlocking grandparent %.8x\n", gramps->ipaddr);
        route_probe_done(gramps->ipaddr, 0, 0, -1);
      }
    }
    foreach_neighbor (neighbor_ochildren*, hermano, brothers) {
      if (hermano->lock_requested) {
        debug_macro("Probe: unlocking sibling %.8x\n", hermano->ipaddr);
        route_probe_done(hermano->ipaddr, 0, 0, -1);
      }
    }
  }

  // Sends probes to probed_node until probes_to_send reaches zero or a send
  // fails.  On failure the keep_probing timer is rescheduled after one
  // application packet interval
  //   data_packet_size * 8 / (1000 * streaming_rate)
  // and the loop stops; the remaining probes stay counted in probes_to_send.
  void probing_send_some_probes() {
    int garbage[OVERCAST_PAYLOAD];
    int trash;
    // The payload content is irrelevant, but it goes out on the wire, so do
    // not ship uninitialised stack bytes.
    memset(garbage, 0, sizeof(garbage));
    while (probes_to_send) {
      // Equivalent to probes_to_send >= OVERCAST_NUM_PROBES: these leading
      // probes are flagged as trash.
      if (OVERCAST_NUM_PROBES+OVERCAST_TRASH_PROBES-probes_to_send <= OVERCAST_TRASH_PROBES)
        trash = 1;
      else
        trash = 0;
      route_probe(probed_node, OVERCAST_NUM_PROBES-probes_to_send, probe_synch, trash, garbage, OVERCAST_PAYLOAD, -1);
      if (macedon_sendret) {  // send failed
        double application_spacing = (double)(parameters.getint("data_packet_size")) *8/(1000.0*(double) parameters.getint("streaming_rate"));
        timer_resched (keep_probing, application_spacing);
        break;
      }
      else
        probes_to_send--;
    }
  }

  // Checks whether every probed node (parent, grandparent if any, and all
  // brothers) has delivered at least OVERCAST_MIN_PROBES probes.  If so the
  // stop_probe timer is cancelled and the results are evaluated right away.
  void any_ck_probes_finished() {
    int finished=1;
    neighbor_oparent *pops = neighbor_random (papa);
    if (OVERCAST_NUM_PROBES-pops->probes_to_get < OVERCAST_MIN_PROBES) {
      finished=0;
    }
    if (neighbor_size(grandpa)) {
      neighbor_oparent *gramps = neighbor_random(grandpa);
      if (OVERCAST_NUM_PROBES-gramps->probes_to_get < OVERCAST_MIN_PROBES) {
        finished=0;
      }
    }
    foreach_neighbor (neighbor_ochildren*, hermano, brothers) {
      if (OVERCAST_NUM_PROBES-hermano->probes_to_get < OVERCAST_MIN_PROBES) {
        finished=0;
      }
    }
    if (finished) {
      timer_cancel (stop_probe);
      probed_probes_completed();
    }
  }

  // Evaluates the probe round.  The per-probe delay of each candidate is
  // (probing_time - start_probing_time) / probes received, computed only for
  // candidates that delivered at least OVERCAST_MIN_PROBES probes.
  //  - Parent has no earlier delay and too few probes: give up, back to joined.
  //  - Grandparent is good enough: try to join it (state joining) and return.
  //  - Otherwise the best qualifying sibling, if any, is joined instead.
  //  - Otherwise stay with the current parent (state joined).
  // Probed nodes are released through any_unlock_nodes() only on the paths
  // that reach the end of the routine.
  // NOTE(review): the two early returns (move to grandparent / sibling) skip
  // any_unlock_nodes(); confirm the locks are released elsewhere.
  // NOTE(review): the grandparent test multiplies by OVERCAST_GOOD_ENOUGH
  // while the sibling test divides by it; confirm the asymmetry is intended.
  void probed_probes_completed() {
    printf("probed_probes_completed();\n");
    int abort=0;
    neighbor_oparent *pops = neighbor_random(papa);
    if (pops->delay == 0.0 &&
        OVERCAST_NUM_PROBES-pops->probes_to_get < OVERCAST_MIN_PROBES) {
      debug_macro("Probe: not enuf probes recd from parent %.8x: %d\n", pops->ipaddr, pops->probes_to_get);
      state_change (joined);
      abort=1;
    }
    if (!abort) {
      // A parent delay from an earlier round is kept when this round did not
      // yield enough probes.
      if (OVERCAST_NUM_PROBES-pops->probes_to_get >= OVERCAST_MIN_PROBES) {
        pops->delay = (pops->probing_time-pops->start_probing_time)
          / (OVERCAST_NUM_PROBES-pops->probes_to_get);
        debug_macro("REPLAY parent %.8x delay set to %f with probes %d, times %f %f\n", pops->ipaddr, pops->delay, (OVERCAST_NUM_PROBES-pops->probes_to_get), pops->probing_time, pops->start_probing_time);
      }
      if (neighbor_size(grandpa)) {    // my grandfather exists
        neighbor_oparent *gramps = neighbor_random(grandpa);
        if (OVERCAST_NUM_PROBES-gramps->probes_to_get >= OVERCAST_MIN_PROBES) {
          gramps->delay = (gramps->probing_time-gramps->start_probing_time)
            / (OVERCAST_NUM_PROBES-gramps->probes_to_get);
          if (gramps->delay < pops->delay * OVERCAST_GOOD_ENOUGH) {
            debug_macro("REPLAY trying to move from %.8x with delay set %f to grandpa %.8x with delay %f\n", pops->ipaddr, pops->delay, gramps->ipaddr, gramps->delay);
            state_change (joining);
            route_add (gramps->ipaddr, 0, 0, -1);
            return;
          }
          else {
            sprintf(trace_buf_, "Probe: grandpa %.8x delay is %f which is too high comp to %f\n", gramps->ipaddr, gramps->delay, pops->delay);
            trace_print();
          }
        }
        else {
          sprintf(trace_buf_, "Probe: not enuf recd from grandpa %.8x: %d\n", gramps->ipaddr, OVERCAST_NUM_PROBES-gramps->probes_to_get);
          trace_print();
        }
      }
      // best holds the address of the lowest-delay qualifying sibling;
      // 0 means none has been found yet.
      int best=0;
      double best_delay=0.0;
      foreach_neighbor (neighbor_ochildren*, hermano, brothers) {
        if (OVERCAST_NUM_PROBES-hermano->probes_to_get >= OVERCAST_MIN_PROBES) {
          hermano->delay = (hermano->probing_time-hermano->start_probing_time)
            / (OVERCAST_NUM_PROBES-hermano->probes_to_get);
          if (hermano->delay < pops->delay / OVERCAST_GOOD_ENOUGH
              && (best == 0 || hermano->delay < best_delay) ) {
            best_delay = hermano->delay;
            best = hermano->ipaddr;
            sprintf(trace_buf_, "Probe: may move from %.8x with delay set %f to sibling %.8x with delay %f\n", pops->ipaddr, pops->delay, hermano->ipaddr, hermano->delay);
            trace_print();
          }
          else {
            sprintf(trace_buf_, "REPLAY sibling %.8x delay set to %f which is too high comp to %f and %f probes %d times %f %f\n", hermano->ipaddr, hermano->delay, pops->delay, best_delay, OVERCAST_NUM_PROBES-hermano->probes_to_get, hermano->probing_time, hermano->start_probing_time);
            trace_print();
          }
        }
        else {
          sprintf(trace_buf_, "Probe: not enuf recd from brother %.8x: %d\n", hermano->ipaddr, OVERCAST_NUM_PROBES-hermano->probes_to_get);
          trace_print();
        }
      }
      if (best) {    // found a sibling to move to
        state_change (joining);
        route_add (best, 0, 0, -1);
        return;
      } else {
        state_change (joined);
      }
    }
    any_unlock_nodes();
  }

  // Asks the next not-yet-asked candidate for probes: the grandparent first,
  // then the first brother whose lock_requested is still 0.  At most one
  // request is sent per call; after sending, the probe_requester timer is
  // rescheduled probe_skip later.  When everybody has been asked nothing is
  // sent and the timer is not rescheduled.
  void probed_request_more_probes() {
    neighbor_oparent *pops = neighbor_random(papa);
    if (neighbor_size(grandpa)) {
      neighbor_oparent *gramps = neighbor_random(grandpa);
      if (gramps->lock_requested == 0) {
        gramps->probes_to_get = OVERCAST_NUM_PROBES;
        gramps->start_probing_time = 0.0;
        gramps->lock_requested = 1;
        sprintf(trace_buf_, "Probe: ask grandparent %.8x\n", gramps->ipaddr);
        trace_print();
        route_probe_request ( gramps->ipaddr, pops->ipaddr, probe_synch, 0.0, 0, 0, -1);
        timer_resched(probe_requester, probe_skip);
        return;
      }
    }
    foreach_neighbor (neighbor_ochildren*, hermano, brothers) {
      if (hermano->lock_requested == 0) {
        sprintf(trace_buf_, "Probe: ask sibling %.8x\n", hermano->ipaddr);
        trace_print();
        hermano->start_probing_time = 0.0;
        hermano->probes_to_get = OVERCAST_NUM_PROBES;
        hermano->lock_requested = 1;
        route_probe_request ( hermano->ipaddr, pops->ipaddr, probe_synch, 0.0, 0, 0, -1);
        timer_resched(probe_requester, probe_skip);
        return;
      }
    }
  }

  // Loads a forced tree from the file named by the "parent_file" parameter.
  // Each usable line holds exactly two whitespace-separated host names,
  // "<node> <parent>"; lines with fewer or more tokens are skipped.  Both
  // names are resolved with gethostbyname(); a failed lookup terminates the
  // process with exit(24).  forced_children counts the lines naming this
  // node as parent, and forced_parent is set from the line naming this node
  // as child.  A missing file is reported and otherwise ignored.
  void joined_read_parent_file() {
    struct hostent *phe;
    struct hostent *mhe;
    struct in_addr paddr;
    struct in_addr myaddr;
    FILE *myfile;
    char name[200];
    char parent[200];
    char rest[1024];
    char whole_line[1024];
    char *parent_filename = parameters.getstr("parent_file");
    printf("forced parent_filename: %s\n", parent_filename);
    forced_children=0;
    myfile = (FILE *)fopen(parent_filename, "r");
    if (!myfile)
    {
      printf("nonexistent forced parent_file  %s\n", parent_filename);
      return;
    }
    // Lines are consumed in pieces of at most 511 characters.
    while (fgets(whole_line, 512, myfile)!=NULL)
    {
      int successful = 0;
      // Field widths keep over-long tokens from overrunning name/parent/rest.
      // A third conversion succeeding means trailing text, so only a count
      // of exactly 2 is accepted.
      if ((successful =sscanf(whole_line, "%199s %199s %1023[^\n]",
               name, parent, rest))!= 2)
      {
        continue;
      }
      if ((mhe = gethostbyname(name)) == 0)
      {
        printf("Bad host lookup.\n");
        exit(24);
      }
      memcpy(&myaddr, mhe->h_addr_list[0], sizeof(struct in_addr));
      if ((phe = gethostbyname(parent)) == 0)
      {
        printf("Bad host lookup.\n");
        exit(24);
      }
      memcpy(&paddr, phe->h_addr_list[0], sizeof(struct in_addr));
      // %x needs an integer argument: print the s_addr member, not the struct.
      printf("forced parent line: %s %.8x %s %.8x\n", name, myaddr.s_addr, parent, paddr.s_addr);
      if (paddr.s_addr == me) {   // i am parent
        forced_children++;
      }
      else if (myaddr.s_addr == me) {   // i am kid
        forced_parent = paddr.s_addr;
        sprintf(trace_buf_, "forced_parent: %.8x\n", forced_parent);
        trace_print();
      }
    }
    fclose(myfile);
  }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?