⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 scribe.mac

📁 这是一个著名的应用层组播中间件的源码
💻 MAC
📖 第 1 页 / 共 3 页
字号:
        else if(tempsess->parent != UNDEFINED) { route_data(tempsess->parent, field(group_id), COMM_TYPE_COLLECT, field(priority), msg, size, field(priority)); }        else {          debug_macro("WARNING: Collect msg dropped, no parent\n");        }      }    }  }  //Message: pushdown  //Action: recv  //  //1 - Pick sibling to join under based on symantics below.  //2 - Reset session state pertinent to parents.  //3 - Send a join to that sibling  //  //Known Problems: See FIXME  recv pushdown {    if(dest != myhash || from == myhash) {      debug_macro(trace_buf_,"Scribe: ERROR: A pushdown message was received and either dest(%.8x)!=myhash(%.8x) or from(%.8x)==myhash(%.8x)\n",dest,myhash,from,myhash);    } else {          children siblings = field(siblings);      //Symantics:       // 1- Upcall to upper layer.  If not subscribed, or if they wish, return < 0,       //    which implies to perform standard scribe pushdown.      // 2- Standard Scribe Pushdown: probe for lowest latency      // 3- returnCode = 0, chosen node is set in the arg      int pushdownNode=-1;      int returnCode=-1;      if(neighbor_query(my_sessions, field(group_id))) {        //Only accept pushdown if it's from the expected parent.        if(neighbor_info(my_sessions,field(group_id),parent) == from) {          //FIXME: This if(true) should be if(upper layer is subscribed to PUSHDOWN_SELECTOR messages)          if(true) {            //defined in scribe_ext.h            pushdown_selector_arg *arg = new pushdown_selector_arg();            arg->groupId = field(group_id);            arg->size = neighbor_size(siblings);            if(arg->size > 0) {              arg->siblings = new int[arg->size];              int i=0;              foreach_neighbor( neighbor_children*, achild, siblings) {                arg->siblings[i++] = achild->ipaddr;              }            } else {              arg->siblings = NULL;            }            returnCode = upcall_ext(PUSHDOWN_SELECTOR, (void*)arg);            //Three values for return code:            //-1 - Let lower layer pick            //0  - I've chosen, result is in arg->chosen            //1 - don't send any join messages, I've taken care of it.            if(returnCode == 0) {              pushdownNode = arg->chosen;            }            if(arg->size > 0 && arg->siblings) {              delete[] arg->siblings;            }              delete arg;          }          if(returnCode == -1 && neighbor_size(siblings) > 0) {            //FIXME: This should be replaced with a proper probe            neighbor_children* mysibling = neighbor_random(siblings);            pushdownNode = mysibling->ipaddr;          } else if(returnCode == -1) {            //No node to join.            returnCode=2;          }          //Update these values anyway. If no node is rejoined -- this becomes a new timeout timer.          dojoin(field(group_id));          neighbor_session* tempsess;          tempsess = neighbor_entry(my_sessions, field(group_id));          unwatch(tempsess->parent);          tempsess->parent = UNDEFINED;          debug_macro("REPLAY Scribe: MEMBERSHIP_CHANGE ( gr me parent ) %.8x %.8x %.8x\n", field(group_id), myhash, myhash);          neighbor_clear(tempsess->pathtoroot);          if(returnCode <= 0) {            routeIP_join(pushdownNode,field(group_id), 0, 0, -1);          }	        } else {          debug_macro("WARNING: Received pushdown from non-parent.  Ignoring (gr: %.8x fr: %.8x pr: %.8x)\n",field(group_id),from,neighbor_info(my_sessions,field(group_id),parent));          //sprintf(trace_buf_,"WARNING: Received pushdown from non-parent.  Ignoring (gr: %.8x fr: %.8x pr: %.8x)\n",field(group_id),from,neighbor_info(my_sessions,field(group_id),parent));          //trace_print();        }      }    }    }  //Message: membership  //Action: recv  //  //When receiving a membership  //1 - clear join timer state  //2 - update pathtoroot  //3 - update parent  //4 - send membership to children  //  //Known Problems: How do we add optimization #1: Cycle detection only when needed?  recv membership {    if(dest != myhash || from == myhash) {      debug_macro("Scribe: ERROR: A membership message (for %.8x) was received and either dest(%.8x)!=myhash(%.8x) or from(%.8x)==myhash(%.8x)\n",field(group_id),dest,myhash,from,myhash);    } else if(neighbor_query(field(pathtoroot),myhash)) {      debug_macro("Scribe: ERROR: A membership message (for %.8x) was received and I was in the path to root!\n",field(group_id),dest,myhash,from,myhash);    } else {          //FIXME: Add optimisation #1, cycle detection only when needed?      int groupId = field(group_id);      #ifdef MEMBERSHIP_TRACE      debug_macro("Scribe: Received membership for group %.8x\n",groupId);      #endif      if(neighbor_query(my_sessions, groupId)) {        neighbor_session* tempsess = neighbor_entry(my_sessions, groupId);        neighbor_clear(tempsess->pathtoroot);        foreach_neighbor(neighbor_uppath*,tempnode,field(pathtoroot)) {          neighbor_add(tempsess->pathtoroot,tempnode->ipaddr);        }        if(!neighbor_query(tempsess->pathtoroot,from)) {          //NOTE: This if statement should essentially be an if(true), but          // I added this for safety, because of some long paths seen.          neighbor_add(tempsess->pathtoroot,from);        }        if(groupId != dest && (tempsess->parent ==UNDEFINED || tempsess->parent != from) ) {          //FIXME: Consider printing a warning when parent changes unexpectedly.          unwatch(tempsess->parent);          if(tempsess->parent != UNDEFINED) {            routeIP_leave(tempsess->parent,tempsess->ipaddr,0,0,-1);          }          debug_macro("REPLAY Scribe: MEMBERSHIP_CHANGE ( gr me parent ) %.8x %.8x %.8x\n", groupId, myhash, from);          tempsess->parent = from;          watch(tempsess->parent);        }        donejoin(tempsess->ipaddr);        foreach_neighbor(neighbor_children*,achild,tempsess->kids) {          routeIP_membership(achild->ipaddr,tempsess->ipaddr,tempsess->pathtoroot,0,0,-1);        }      } else {        //FIXME: Nothing for now -- consider sending a leave message.        debug_macro("Scribe: WARNING: Received membership for group %.8x, but not subscribed.\n",groupId);      }    }    }  //Message: anycast  //Action: forward  //  //Q: Do we need to hang on to Black nodes?  Prior experience said stale state was causing routing loops.  //   Elimination of the spare capacity may solve this, but the general case is still problematic.  //  //Step 1: If I am a "white" node (first time), add my children  //Step 2: Pick the next node in line to process the msg.  //Step 3: Route to parent, if any  //Step 4: Fall back to routing message toward key.  forward anycast {    visited dfsState = field(dfsState);    if(from == myhash) {      return 0;    }    #ifdef ANYCAST_TRACE    debug_macro("DEBUG: dfsState size=%d, list:\n",neighbor_size(dfsState));    //sprintf(trace_buf_,"DEBUG: dfsState size=%d, list:\n",neighbor_size(dfsState));    //trace_print();    //int i=0;    //foreach_neighbor(neighbor_visited*,tmp,dfsState) {    //  sprintf(trace_buf_+(14*i), "(sib:%.8x())",tmp->ipaddr,tmp->color);    //  i++;    //}    //sprintf(trace_buf_+(14*i),"\n");    //trace_print();    #endif    if(neighbor_query(my_sessions, field(group_id))) {      neighbor_session *tempsess = neighbor_entry(my_sessions, field(group_id));      //Step 1: If I am a "white" node (first time), add my children      if(!neighbor_query(dfsState, myhash) || neighbor_info(dfsState, myhash, color)==0) {        if(!neighbor_query(dfsState, myhash)) {          neighbor_add(dfsState, myhash);        }         neighbor_info(dfsState, myhash, color) = 1;        foreach_neighbor(neighbor_children*,achild,tempsess->kids) {          if(!neighbor_query(dfsState,achild->ipaddr)) {            neighbor_add(dfsState, achild->ipaddr);          }         }      }    }    //Step 2: Pick the next node in line to process the msg.    foreach_neighbor(neighbor_visited*,next,dfsState) {      if(next->color != 2 && next->ipaddr==myhash && (!neighbor_query(my_sessions, field(group_id))) || from==myhash) {        next->color = 2;      }      else if(next->color != 2) {        if(next->ipaddr == myhash) {          next->color=2;          //Visit this node.          #ifdef ANYCAST_TRACE          debug_macro("Scribe: All children visited. Visiting this node. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",from,field(from_addr),field(group_id));          #endif          if(upcall_forward(0, msg, size, COMM_TYPE_ANYCAST)) {            return 1;          }        } else {          //Send message          #ifdef ANYCAST_TRACE          debug_macro("Scribe: Forwarding anycast msg from %.8x (orig from: %.8x) for group %.8x to child %.8x\n",from,field(from_addr),field(group_id),next->ipaddr);          #endif	        routeIP_anycast(next->ipaddr, field(group_id), field(from_addr), dfsState, msg, size, -1);          #ifdef ANYCAST_TRACE          debug_macro("Scribe: Done forwarding anycast msg from %.8x (orig from: %.8x) for group %.8x to child %.8x\n",from,field(from_addr),field(group_id),next->ipaddr);          #endif          return 1;        }      }    }    //Step 3: Route to parent, if any    //NOTE: dfsState is eitehr all black, or empty    if(neighbor_query(my_sessions, field(group_id))) {      neighbor_session *tempsess = neighbor_entry(my_sessions, field(group_id));      if(tempsess->parent==UNDEFINED) {        //If we don't know our parent, we simply forward it toward the group ID.        //FIXME: Is this the right thing to do, or do we drop it?        #ifdef ANYCAST_TRACE        debug_macro("Scribe: Don't know parent, so routing towards groupid. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",from,field(from_addr),field(group_id));        #endif 	      route_anycast(field(group_id), field(group_id), field(from_addr), dfsState, msg, size, -1);        return 1;      } else {        #ifdef ANYCAST_TRACE        debug_macro("Scribe: Forwarding to parent %.8x. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",tempsess->parent,from,field(from_addr),field(group_id));        #endif	      routeIP_anycast(tempsess->parent, field(group_id), field(from_addr), dfsState, msg, size, -1);        return 1;      }	    }        //Step 4: Fall back to routing message toward key.    #ifdef ANYCAST_TRACE    debug_macro("Scribe: All options failed. Forwarding using Pastry routing. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",from,field(from_addr),field(group_id));    #endif    route_anycast(field(group_id), field(group_id), field(from_addr), dfsState, msg, size, -1);    return 1;  }  //Message: anycast  //Action: recv  //  //Print an error message -- the anycast is delivered to the root of the tree.  recv anycast {    //This is the case where an anycast message was not handled anywhere. An error message is    //returned.    //Return error to source.    //FIXME: Is this the right thing to do or do I drop it?    //routeIP_anycastError(field(from_addr),field(group_id), 0, 0, -1);#ifdef ANYCAST_TRACE    debug_macro("WARNING: Scribe: recv anycast.  No node accepted anycast message. dropping. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",from,field(from_addr),field(group_id));#endif    upcall_deliver(msg, size, COMM_TYPE_ANYCAST);  }  //Timer: joinTimer  //  //Ensure joins complete successfully.  //  //Process: Iterate over join list, incrementing delay for   //          each.  If delay > JOIN_TIMEOUT_INT, issue a new  //          join.  timer joinTimer {    foreach_neighbor( neighbor_joining*, tempjoin, outstanding_joins) {      ASSERT(tempjoin->ipaddr != UNDEFINED);      tempjoin->delay++;      if(tempjoin->delay > JOIN_TIMEOUT_INT) {        debug_macro("Scribe: WARNING No membership received (delay=%d), rejoining group (%.8x)\n",(int)tempjoin->delay,tempjoin->ipaddr);        //sprintf(trace_buf_,"Scribe: WARNING No membership received (delay=%d), rejoining group (%.8x)\n",tempjoin->delay,tempjoin->ipaddr);        //trace_print();        tempjoin->delay=0;        route_join(tempjoin->ipaddr, tempjoin->ipaddr, 0, 0, -1);      } else {#ifdef MEMBERSHIP_TRACE        debug_macro("Scribe: Incremented delay=%d for group (%.8x)\n",(int)tempjoin->delay,tempjoin->ipaddr);        //sprintf(trace_buf_,"Scribe: Incremented delay=%d for group (%.8x)\n",tempjoin->delay,tempsess->ipaddr);        //trace_print();#endif      }    }    if(neighbor_size(outstanding_joins)>0) {      timer_resched(joinTimer, JOIN_TIMER_PERIOD);    }  }  //Timer: printer  //  //prints out current state, and calls dump_state().  //Also -- prints REPLAY BANDWIDTH  timer printer {#ifdef SCRIBE_TRACE    if(0) {      debug_macro("Scribe: Printer timer displaying summary Scribe state:\n");      foreach_neighbor(neighbor_session*,tempsess,my_sessions) {        debug_macro("Scribe session: %.8x, with %d kids.\n",tempsess->ipaddr,neighbor_size(tempsess->kids));      }      debug_macro("Scribe: Printer timer dumping state:\n");      dump_state();    }    if(1) {      foreach_neighbor(neighbor_session*,tempsess,my_sessions) {        if(tempsess->isRoot) {          debug_macro("REPLAY session: %.8x, parent: client0.\n",tempsess->ipaddr);        } else {          debug_macro("REPLAY session: %.8x, parent: %.8x.\n",tempsess->ipaddr,tempsess->parent);        }      }    }#endif    //FIXME: Should I be using globalmacedon?    extern MACEDON_Agent *globalmacedon;    if (globalmacedon != this)  // am i the highest layer      return;     if ( ( parameters.getint("streaming_time") == -1.0 ||	   curtime > time_booted + parameters.getint("streaming_time")) )      printf("%s %f %d REPLAY_BANDWIDTH %d %d %d %d %d %d\n", 	     get_hostname(), Scheduler::instance().clock(), pthread_self(),	     (int) master.get_value(),	     0,   // need to put in control bw	     (int) master_useful.get_value(),	     (int) master.get_value(),	     (int) master_useful.get_value(),	     0	     );  }  //API error  //  //This is the result of the failures of "watched" nodes.    //Iterate over sessions -- if it is a parent, send it a leave message, and  //send a new join,  //if it is a child, send it a pushdown message, finally remove it from  //the watched group.  //  //Q: But what about scribe-liveness, not just network liveness?  API error {    if(type == ERROR_NEIGHBOR_FAILURE) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -