⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 splitstreamms.mac

📁 这是一个著名的应用层组播中间件的源码
💻 MAC
📖 第 1 页 / 共 2 页
字号:
  //forward joinReq
  //
  //Handles a joinReq that is being routed through the spare capacity (anycast)
  //group on behalf of a node that wants to join a stripe.
  //
  //Message fields read: stripe_id (the stripe being joined), from_addr (the joiner).
  //Returns 1 to stop forwarding the anycast (the request was satisfied here, or the
  //joiner is already a child) and 0 to keep forwarding it.
  //
  //ADD_CHILD result codes as used below: 0 = child accepted, 2 = child already in
  //the group, anything else = keep forwarding (per the original note, Scribe
  //returns 1 when it detects a routing loop).
  forward joinReq {
    #ifdef SPARE_CAPACITY_TRACE
    debug_macro("In forward joinReq (dest:%.8x,from:%.8x,fromaddr:%.8x,gr:%.8x)\n",dest,from,field(from_addr),field(stripe_id));
    #endif

    //CHECK 1: Do we have spare capacity?
    int outdegree = compute_outdegree();
    if(outdegree < FORWARD_STRIPES) {
      //CHECK 2: Do we forward for this tree?
      if(neighbor_query(my_trees, field(stripe_id))) {
        //CHECK 3: Is this node in my path to the root?
        //NOTE: Assume Scribe handles routing loop checking, returns 1 if a loop is found!!!
        child_add_arg* carg = new child_add_arg;
        carg->groupId = field(stripe_id);
        carg->childId = field(from_addr);
        int success;
        downcall_ext(success, ADD_CHILD, carg);
        delete carg;
        #ifdef SPARE_CAPACITY_TRACE
        debug_macro("Accepted child if(%d!=0). (dest:%.8x,from:%.8x,gr:%.8x)\n",!success,dest,from,field(stripe_id));
        #endif

        if(success==2) {
          #ifdef SPARE_CAPACITY_TRACE
          debug_macro("As child is already in group, terminating anycast joinReq. (dest:%.8x,from:%.8x,gr:%.8x)\n",dest,from,field(stripe_id));
          #endif
          return 1;
        } else if(success==0) {
          //ADDING CHILD TO LOCAL SCRIBE STATE, BECAUSE SCRIBE WON'T UPCALL FROM THE DOWNCALL.
          neighbor_add(neighbor_info(my_trees,field(stripe_id),kids),field(from_addr));
          //Recompute now that the new child is counted.
          outdegree = compute_outdegree();
          if(outdegree >= FORWARD_STRIPES) {
            //outdegree is maximum.  leave the anycast group.
            //FIX: this guard used to read SPACE_CAPACITY_TRACE, so the trace below was
            //never enabled together with the other SPARE_CAPACITY_TRACE output.
            #ifdef SPARE_CAPACITY_TRACE
            debug_macro( "Outdegree at max (%d)!  Leaving spare capacity group (%.8x)?\n", outdegree, ANYCAST_ID);
            #endif
            leave(ANYCAST_ID);
          } //else there is still spare capacity
          return 1;
        } else {
          //KEEP ON FORWARDING
          return 0;
        }
      } else {
        #ifdef SPARE_CAPACITY_TRACE
        debug_macro("Sorry, we don't forward this stripe. (dest:%.8x,from:%.8x,gr:%.8x)\n",dest,from,field(stripe_id));
        #endif
      }
    } else {
      //THIS SHOULDN'T HAPPEN IN PRACTICE: a node without spare capacity should
      //already have left the anycast group.  Leave it (again) to be safe.
      #ifdef SPARE_CAPACITY_TRACE
      debug_macro("WARNING: HUH? Received joinReq & no forward capacity. (dest:%.8x,from:%.8x,gr:%.8x)\n",dest,from,field(stripe_id));
      #endif
      leave(ANYCAST_ID);
    }
    return 0;
  }

  //recv joinReq
  //
  //When a joinReq message is received, this means that no-one stopped the forwarding.  The node
  //receiving the message should be the root of the spare capacity group, and this represents a
  //failure to join.  One question is what should we do now?  This implementation just prints an
  //error message, and goes on.
  recv joinReq {
    //NOTE: This happens when an anycast message is delivered to the root of an anycast tree, in failure.
    debug_macro("ERROR: SplitStream forest creation failed!  joinReq found no parent (dest:%.8x,from:%.8x,gr:%.8x)\n",dest,from,field(stripe_id));
  }

  //API downcall_ext
  //
  //No downcalls are presently supported: every operation is logged and rejected with -1.
  API downcall_ext {
    //Takes int operation, void* arg
    switch(operation) {
      default:
        debug_macro( "Unrecognized Downcall Extension Called (operation: %d)\n", operation);
        return -1; //CALL UNRECOGNIZED
    }
    return 0; //Not reached while the switch has only a default case.
  }

  //API upcall_ext
  //
  //The following upcalls are supported:
  // - PUSHDOWN_SELECTOR: When this node is pushed down from its parent, it is sent a list of
  //   siblings. Scribe then does an extensible upcall to give protocols the option to select
  //   the node to attempt to join at.
// - GROUP_CHANGE: upcall_notify doesn't allow scribe to notify splitstream of changes to the
  //   children of sessions -- so instead it calls upcall_ext with GROUP_CHANGE to do so.  As a
  //   result of this, splitstream might initiate a pushdown, update its spare capacity group
  //   subscription status or just save the state.
  //
  //Return values used below:
  //  PUSHDOWN_SELECTOR: -1 = let Scribe choose (anycast group), 0 = psArg->chosen was filled in,
  //                      1 = an anycast joinReq was sent, Scribe should not try a join itself.
  //  GROUP_CHANGE:       0.
  //  anything else:     -1 (unrecognized operation).
  API upcall_ext {
    //Takes int operation, void* arg
    switch(operation) {
      case PUSHDOWN_SELECTOR:
        {
          //This roughly matches the description in the SOSP paper.
          pushdown_selector_arg* psArg = (pushdown_selector_arg*)arg;
          bool retset=false;
          int retval; //Only meaningful once retset is true.
          #ifdef PUSHDOWN_TRACE
          debug_macro("DEBUG: In PUSHDOWN_SELECTOR (%.8x)...\n",psArg->groupId);
          #endif
          if(psArg->groupId == ANYCAST_ID) { //Let Scribe handle spare capacity group.
            #ifdef PUSHDOWN_TRACE
            debug_macro("DEBUG: Letting scribe pick a pushdown neighbor for the anycast group (%.8x)...\n",psArg->groupId);
            #endif
            return -1;
          }
          if(psArg->size > 0) {
            scrChildren set;
            int i;
            for(i=0; i<psArg->size; i++) { //Look for a node with a common prefix with the group id.
              if(common_prefix(psArg->groupId, psArg->siblings[i]) > 0) {
                neighbor_add(set, psArg->siblings[i]);
              }
            }
            if(neighbor_size(set) > 0) { //pick randomly from the set of nodes with a common prefix.
              retval = neighbor_random(set)->ipaddr;
              retset=true;
              neighbor_clear(set);
            }
          }
          if(!retset) { //If there was no node to join, then send an anycast message, and tell Scribe not to try a join.
            #ifdef PUSHDOWN_TRACE
            debug_macro("DEBUG: Pushdown Selector couldn't join sibling, sending anycast (%.8x for group %.8x)\n",ANYCAST_ID,psArg->groupId);
            #endif
            anycast_joinReq(ANYCAST_ID, psArg->groupId, myhash, 0, 0, -1);
            return 1;
          } else {
            #ifdef PUSHDOWN_TRACE
            debug_macro("DEBUG: Pushdown Selector selecting (%.8x for group %.8x)\n",retval,psArg->groupId);
            #endif
            psArg->chosen = retval;
            return 0;
          }
        }
        break;
      case GROUP_CHANGE:
        {
          //dump_state();
          group_change* garg = (group_change*)arg;
          if(garg->groupId == ANYCAST_ID) {
            //Membership of the spare capacity group itself is not tracked here.
            break;
          }
          if(neighbor_query(my_trees, garg->groupId)) {
            neighbor_scrTree* scrTree = neighbor_entry(my_trees, garg->groupId);
            //Outdegree BEFORE this change is applied; the leave branch below relies on that.
            int outdegree = compute_outdegree();
            if(garg->newJoin)
            {
              if(!neighbor_query(scrTree->kids, garg->childId))
              {
                neighbor_add(scrTree->kids, garg->childId);
                //FIX: recompute so the thresholds below see the child that was just added
                //(same order as forward joinReq).  Previously the pre-add value was tested,
                //so pushdown and leaving the anycast group each triggered one child too late.
                outdegree = compute_outdegree();
                if(outdegree > FORWARD_STRIPES + 1) { //For debugging purposes -- if this happens alert the user
                  debug_macro( "ERROR: Outdegree too high (%d)!  How did it get this high?\n", outdegree);
                }
                if(outdegree > FORWARD_STRIPES) {
                  //Need to push a node down.
                  pushdown_arg *parg = new pushdown_arg;
                  select_pushdown_child(parg, garg); //This function implements the pushdown selection as described in the paper.
                  if(neighbor_query(my_trees,parg->groupId)) {
                    neighbor_scrTree *atree = neighbor_entry(my_trees,parg->groupId);
                    if(neighbor_query(atree->kids, parg->childId)) {
                      //NOTE: Lower layer doesn't do an upcall from the downcall, so the
                      //local child list has to be updated here.
                      neighbor_remove(atree->kids, parg->childId);
                      int trash; //Result of the PUSHDOWN downcall is intentionally ignored.
                      downcall_ext(trash, PUSHDOWN, (void*)parg);
                    }
                  }
                  delete parg;
                }
                else if(outdegree == FORWARD_STRIPES) {
                  //outdegree is maximum.  leave the anycast group.
                  //FIX: guard used to read SPACE_CAPACITY_TRACE (typo for SPARE_CAPACITY_TRACE).
                  #ifdef SPARE_CAPACITY_TRACE
                  debug_macro( "Outdegree at max (%d)!  Leaving spare capacity group (%.8x)?\n", outdegree, ANYCAST_ID);
                  #endif
                  leave(ANYCAST_ID);
                } //else there is still spare capacity
              }
              //else: child is already recorded, nothing to do.
            }
            else {
              if(neighbor_query(scrTree->kids, garg->childId)) {
                neighbor_remove(scrTree->kids, garg->childId);
                //outdegree still holds the value from before the removal: if we were
                //exactly full, we now have spare capacity again.
                if(outdegree == FORWARD_STRIPES) {
                  join(ANYCAST_ID);
                } // else spare capacity state didn't change.
              }
              //else: unknown child left, nothing to do.
            }
          }
        }
        break;
      default:
        {
          debug_macro( "Unrecognized Upcall Extension Called (operation: %d)\n", operation);
          return -1; //CALL UNRECOGNIZED
        }
    }
    return 0;
  }
} //Closes the enclosing section, whose opening line is outside this excerpt.

routines {
  //Current key generation just flips the most significant digit.
  //Question: what about overlapping keys (i.e. if there were multiple SS sessions?)
  //
  //Fills scribe_ids[0..NUM_STRIPES-1]; stripe i gets splitstream_id plus i placed in the
  //top STRIPE_SS_BITS bits of an SS_BITS-wide id.
  void generate_keys(int splitstream_id, int* scribe_ids) {
    for(int i=0; i<NUM_STRIPES; i++) {
      //Parentheses make the shift amount explicit ('-' already bound tighter than '<<').
      scribe_ids[i] = (splitstream_id + (i << (SS_BITS-STRIPE_SS_BITS)) );// % (1 << SS_BITS);
    }
  }

  //Determines length of the common prefix between id1 and id2, counted in
  //STRIPE_SS_BITS-bit digits starting from the most significant end of an
  //SS_BITS-wide id.  Returns a value in [0, SS_BITS/STRIPE_SS_BITS].
  int common_prefix(int id1, int id2) {
    int i;
    for(i=0; i<SS_BITS/STRIPE_SS_BITS; i++) {
      //Compare the leading (i+1) digits of both ids.
      int shift = SS_BITS-((i+1)*STRIPE_SS_BITS);
      if((id1>>shift) != (id2>>shift)) {
        break;
      }
    }
    return i;
  }

  //Follows the description in the SOSP paper to decide which child to push down.
void select_pushdown_child(pushdown_arg* parg, group_change* gchg) {
    //Chooses which child (parg->childId) of which stripe tree (parg->groupId) to push down.
    //  parg: output; groupId and childId are written on every path that finds a candidate.
    //  gchg: the group change that triggered the pushdown; its childId is preferred
    //        whenever it is among the candidates of the chosen stripe.
    //NOTE(review): on the final fall-through path (FIXME message at the bottom) parg is
    //left unassigned, yet the GROUP_CHANGE handler reads parg->groupId after this call
    //-- confirm that path is really unreachable or give parg a defined value.
    //NOTE(review): commonTree, ownerTree and consider are not cleared before returning,
    //whereas PUSHDOWN_SELECTOR calls neighbor_clear on its local list -- confirm whether
    //these lists need explicit cleanup.
    int common;
    int owner;
    scrTree commonTree; //Holds the common stripes.  Although there should only be one stripe with a common prefix in the general case, what if there are multiple SS trees running.
    scrTree ownerTree;  //Holds the stripes this node owns and that have more than one child.

    //STEP 1: Select a child from any tree which does not share a common prefix
    //        with my own hash. And for which I am not root.
    foreach_neighbor(neighbor_scrTree*,scrTreeNode,my_trees) {
      common = common_prefix(myhash, scrTreeNode->ipaddr);
      downcall_ext(owner,TEST_OWNER,(void*)&scrTreeNode->ipaddr);
      if(common == 0 && owner == 0) {
        if(!neighbor_empty(scrTreeNode->kids)) {
          parg->groupId = scrTreeNode->ipaddr;
          //Prefer the child that just joined if it belongs to this very stripe.
          if(gchg->groupId == scrTreeNode->ipaddr && neighbor_query(scrTreeNode->kids, gchg->childId)) {
            parg->childId = gchg->childId;
          } else {
            parg->childId = neighbor_random(scrTreeNode->kids)->ipaddr;
          }
          return;
        }
      } else if(common != 0 && owner == 0 && !neighbor_empty(scrTreeNode->kids)) { //This is any stripe with a common prefix with me.
        neighbor_add(commonTree, scrTreeNode->ipaddr);
      } else if(owner != 0 && neighbor_size(scrTreeNode->kids)>1) {
        //This is any stripe which I am owner for -- highest priority
        //(these stripes are only considered in Step 3, after every other option).
        //Note: We only add it if it has at least 2 kids, as we can't push down our only
        //kid for a stripe we own.
        neighbor_add(ownerTree, scrTreeNode->ipaddr);
      }
    }

    //STEP 2: Pick a child with the shortest common prefix with this stripe
    //        id, with a preference for the newest child.
    while(!neighbor_empty(commonTree)) {
      int commonPrefix=SS_BITS; //Shortest prefix length seen so far.
      int tmpCommon;
      scrChildren consider;     //Children tied for the shortest prefix.
      //Take a random stripe out of the candidate list.
      neighbor_scrTree* scrTreeNode = neighbor_entry(my_trees, neighbor_random(commonTree)->ipaddr);
      neighbor_remove(commonTree, scrTreeNode->ipaddr);
      if(!neighbor_empty(scrTreeNode->kids)) {
        foreach_neighbor(neighbor_scrChildren*,scrChild,scrTreeNode->kids) {
          tmpCommon = common_prefix(scrTreeNode->ipaddr, scrChild->ipaddr);
          if(tmpCommon < commonPrefix) {
            //Strictly shorter prefix: restart the tie list with this child.
            commonPrefix = tmpCommon;
            neighbor_clear(consider);
            neighbor_add(consider, scrChild->ipaddr);
          } else if(tmpCommon == commonPrefix) {
            neighbor_add(consider, scrChild->ipaddr);
          }
        }
        if(!neighbor_empty(consider)) {
          parg->groupId = scrTreeNode->ipaddr;
          if(gchg->groupId == scrTreeNode->ipaddr && neighbor_query(consider, gchg->childId)) {
            parg->childId = gchg->childId;
          } else {
            parg->childId = neighbor_random(consider)->ipaddr;
          }
          return;
        }
      } else {
        //HUH?  Step 1 only adds stripes whose child list is non-empty.
        debug_macro("FIXME: ERROR: SS Stripe should only be set if size > 0, but tested wasn't.\n");
      }
    }

    while(!neighbor_empty(ownerTree)) {
      //Step 3: If no common non-owned stripe pick a child with the shortest common prefix with
      //the owned stripe id, with a preference for the newest child.
      int commonPrefix=SS_BITS; //Shortest prefix length seen so far.
      int tmpCommon;
      scrChildren consider;     //Children tied for the shortest prefix.
      //Take a random owned stripe out of the candidate list.
      neighbor_scrTree* scrTreeNode = neighbor_entry(my_trees, neighbor_random(ownerTree)->ipaddr);
      neighbor_remove(ownerTree, scrTreeNode->ipaddr);
      if(neighbor_size(scrTreeNode->kids) > 1) {
        foreach_neighbor(neighbor_scrChildren*,scrChild,scrTreeNode->kids) {
          tmpCommon = common_prefix(scrTreeNode->ipaddr, scrChild->ipaddr);
          if(tmpCommon < commonPrefix) {
            //Strictly shorter prefix: restart the tie list with this child.
            commonPrefix = tmpCommon;
            neighbor_clear(consider);
            neighbor_add(consider, scrChild->ipaddr);
          } else if(tmpCommon == commonPrefix) {
            neighbor_add(consider, scrChild->ipaddr);
          }
        }
        if(!neighbor_empty(consider)) {
          parg->groupId = scrTreeNode->ipaddr;
          if(gchg->groupId == scrTreeNode->ipaddr && neighbor_query(consider, gchg->childId)) {
            parg->childId = gchg->childId;
          } else {
            parg->childId = neighbor_random(consider)->ipaddr;
          }
          return;
        }
      } else {
        //HUH?  Step 1 only adds owned stripes that have more than one child.
        debug_macro("FIXME: ERROR: We didn't add this stripe unless it had enough children!\n");
      }
    }

    //No candidate in any step: parg has not been written (see NOTE(review) at the top).
    debug_macro("FIXME: ERROR: How can we be forwarding too much and still have reached this point?\n");
  }

  //Computes this node's current outdegree from local state:
  //  - the number of children recorded for every stripe tree in my_trees,
  //  - minus the TEST_OWNER result for each tree, but only when source_ == me,
  //  - plus NUM_STRIPES for every entry in my_sessions.
  int compute_outdegree() {
    int outdegree = 0;
    int owner;
    foreach_neighbor(neighbor_scrTree*,scrTree,my_trees) {
      downcall_ext(owner,TEST_OWNER,(void*)&scrTree->ipaddr);
      //NOTE: Broken if non-source is the creator. -- possible -- what about accounting in create_group?
      if(source_ == me) {
        outdegree -= owner;
      }
      outdegree += neighbor_size(scrTree->kids);
    }
    outdegree += NUM_STRIPES * neighbor_size(my_sessions);
    //debug_macro("computed outdegree: %d\n", outdegree);
    return outdegree;
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -