📄 splitstreamms.mac
字号:
//forward joinReq
//
//Handles a joinReq anycast message that is being forwarded through the spare
//capacity group on behalf of a node that wants a parent for a stripe.
//
//Returns 1 to stop forwarding the message (the sender was adopted, or was
//already a child), 0 to let the message keep travelling.
forward joinReq {
#ifdef SPARE_CAPACITY_TRACE
  debug_macro("In forward joinReq (dest:%.8x,from:%.8x,fromaddr:%.8x,gr:%.8x)\n",dest,from,field(from_addr),field(stripe_id));
#endif

  //CHECK 1: Do we have spare capacity?
  int outdegree = compute_outdegree();
  if(outdegree < FORWARD_STRIPES) {
    //CHECK 2: Do we forward for this tree?
    if(neighbor_query(my_trees, field(stripe_id))) {
      //CHECK 3: Is this node in my path to the root?
      //NOTE: Assume Scribe handles routing loop checking, returns 1 if a loop is found!!!
      child_add_arg* carg = new child_add_arg;
      carg->groupId = field(stripe_id);
      carg->childId = field(from_addr);
      int success;
      downcall_ext(success, ADD_CHILD, carg);
      delete carg;
#ifdef SPARE_CAPACITY_TRACE
      debug_macro("Accepted child if(%d!=0). (dest:%.8x,from:%.8x,gr:%.8x)\n",!success,dest,from,field(stripe_id));
#endif

      if(success==2) {
        //The lower layer reports that the sender already is a child.
#ifdef SPARE_CAPACITY_TRACE
        debug_macro("As child is already in group, terminating anycast joinReq. (dest:%.8x,from:%.8x,gr:%.8x)\n",dest,from,field(stripe_id));
#endif
        return 1;
      }

      if(success==0) {
        //ADDING CHILD TO LOCAL SCRIBE STATE, BECAUSE SCRIBE WON'T UPCALL FROM THE DOWNCALL.
        neighbor_add(neighbor_info(my_trees,field(stripe_id),kids),field(from_addr));

        //Re-evaluate capacity now that the new child is counted.
        outdegree = compute_outdegree();
        if(outdegree >= FORWARD_STRIPES) {
          //outdegree is maximum. leave the anycast group.
          //(Guard used to be spelled SPACE_CAPACITY_TRACE, so this trace could never be enabled.)
#ifdef SPARE_CAPACITY_TRACE
          debug_macro( "Outdegree at max (%d)! Leaving spare capacity group (%.8x)?\n", outdegree, ANYCAST_ID);
#endif
          leave(ANYCAST_ID);
        }
        //else there is still spare capacity
        return 1;
      }

      //Any other result (e.g. the loop case noted above): KEEP ON FORWARDING
      return 0;
    }
    else {
#ifdef SPARE_CAPACITY_TRACE
      debug_macro("Sorry, we don't forward this stripe. (dest:%.8x,from:%.8x,gr:%.8x)\n",dest,from,field(stripe_id));
#endif
    }
  }
  else {
    //THIS SHOULDN'T HAPPEN IN PRACTICE.
#ifdef SPARE_CAPACITY_TRACE
    debug_macro("WARNING: HUH? Received joinReq & no forward capacity. (dest:%.8x,from:%.8x,gr:%.8x)\n",dest,from,field(stripe_id));
#endif
    //We have no capacity, so we should not be a member of the spare capacity group.
    leave(ANYCAST_ID);
  }
  return 0;
}

//recv joinReq
//
//When a joinReq message is received, this means that no-one stopped the forwarding. The node receiving
//the message should be the root of the spare capacity group, and this represents a failure to join.
//One question is what should we do now? This implementation just prints an error message, and goes
//on.
recv joinReq {
  //NOTE: This happens when an anycast message is delivered to the root of an anycast tree, in failure.
  debug_macro("ERROR: SplitStream forest creation failed! joinReq found no parent (dest:%.8x,from:%.8x,gr:%.8x)\n",dest,from,field(stripe_id));
}

//API downcall_ext
//
//No downcalls are presently supported: every operation is reported as
//unrecognized and -1 is returned.
API downcall_ext {
  //Takes int operation, void* arg
  switch(operation) {
    default:
      debug_macro( "Unrecognized Downcall Extension Called (operation: %d)\n", operation);
      return -1; //CALL UNRECOGNIZED
  }
  return 0;
}

//API upcall_ext
//
//The following upcalls are supported:
// - PUSHDOWN_SELECTOR: When this node is pushed down from its parent, it is sent a list of
//   siblings. Scribe then does an extensible upcall to give protocols the option to select
//   the node to attempt to join at.
//   Returns -1 to let Scribe choose (spare capacity group), 1 when an anycast
//   joinReq was sent instead and Scribe should not try a join, 0 when
//   psArg->chosen has been filled in.
// - GROUP_CHANGE: upcall_notify doesn't allow scribe to notify splitstream of changes to the
//   children of sessions -- so instead it calls upcall_ext with GROUP_CHANGE to do so. As a
//   result of this, splitstream might initiate a pushdown, update its spare capacity group
//   subscription status or just save the state. Returns 0.
//Any other operation is logged and answered with -1.
API upcall_ext {
  //Takes int operation, void* arg
  switch(operation) {
    case PUSHDOWN_SELECTOR:
      {
        //This roughly matches the description in the SOSP paper.
        pushdown_selector_arg* psArg = (pushdown_selector_arg*)arg;
        bool retset=false;
        int retval = 0; //only meaningful once retset is true
#ifdef PUSHDOWN_TRACE
        debug_macro("DEBUG: In PUSHDOWN_SELECTOR (%.8x)...\n",psArg->groupId);
#endif

        if(psArg->groupId == ANYCAST_ID) {
          //Let Scribe handle spare capacity group.
#ifdef PUSHDOWN_TRACE
          debug_macro("DEBUG: Letting scribe pick a pushdown neighbor for the anycast group (%.8x)...\n",psArg->groupId);
#endif
          return -1;
        }

        if(psArg->size > 0) {
          scrChildren set;
          int i;
          for(i=0; i<psArg->size; i++) {
            //Look for a node with a common prefix with the group id.
            if(common_prefix(psArg->groupId, psArg->siblings[i]) > 0) {
              neighbor_add(set, psArg->siblings[i]);
            }
          }
          if(neighbor_size(set) > 0) {
            //pick randomly from the set of nodes with a common prefix.
            retval = neighbor_random(set)->ipaddr;
            retset=true;
            neighbor_clear(set);
          }
        }

        if(!retset) {
          //If there was no node to join, then send an anycast message, and tell Scribe not to try a join.
#ifdef PUSHDOWN_TRACE
          debug_macro("DEBUG: Pushdown Selector couldn't join sibling, sending anycast (%.8x for group %.8x)\n",ANYCAST_ID,psArg->groupId);
#endif
          anycast_joinReq(ANYCAST_ID, psArg->groupId, myhash, 0, 0, -1);
          return 1;
        }
        else {
#ifdef PUSHDOWN_TRACE
          debug_macro("DEBUG: Pushdown Selector selecting (%.8x for group %.8x)\n",retval,psArg->groupId);
#endif
          psArg->chosen = retval;
          return 0;
        }
      }
      break;

    case GROUP_CHANGE:
      {
        //dump_state();
        group_change* garg = (group_change*)arg;
        if(garg->groupId == ANYCAST_ID) {
          //Membership changes of the spare capacity group itself are ignored here.
          break;
        }
        if(neighbor_query(my_trees, garg->groupId)) {
          neighbor_scrTree* scrTree = neighbor_entry(my_trees, garg->groupId);
          //Sampled BEFORE the local kids set is updated below.
          //NOTE(review): for the join branch this means the thresholds compare the
          //pre-add outdegree, whereas forward joinReq recomputes after adding the
          //child. That looks like an off-by-one; behaviour left unchanged -- confirm.
          int outdegree = compute_outdegree();

          if(garg->newJoin) {
            if(!neighbor_query(scrTree->kids, garg->childId)) {
              neighbor_add(scrTree->kids, garg->childId);

              if(outdegree > FORWARD_STRIPES + 1) {
                //For debugging purposes -- if this happens alert the user
                debug_macro( "ERROR: Outdegree too high (%d)! How did it get this high?\n", outdegree);
              }

              if(outdegree > FORWARD_STRIPES) {
                //Need to push a node down.
                //Value-initialize: select_pushdown_child() can return without
                //writing groupId/childId, and both are read right after the call.
                pushdown_arg *parg = new pushdown_arg();
                //This function implements the pushdown selection as described in the paper.
                select_pushdown_child(parg, garg);
                if(neighbor_query(my_trees,parg->groupId)) {
                  neighbor_scrTree *atree = neighbor_entry(my_trees,parg->groupId);
                  if(neighbor_query(atree->kids, parg->childId)) {
                    //NOTE: Lower layer doesn't do an upcall from the downcall.
                    neighbor_remove(atree->kids, parg->childId);
                    int trash;
                    downcall_ext(trash, PUSHDOWN, (void*)parg);
                  }
                }
                delete parg;
              }
              else if(outdegree == FORWARD_STRIPES) {
                //outdegree is maximum. leave the anycast group.
                //(Guard used to be spelled SPACE_CAPACITY_TRACE, so this trace could never be enabled.)
#ifdef SPARE_CAPACITY_TRACE
                debug_macro( "Outdegree at max (%d)! Leaving spare capacity group (%.8x)?\n", outdegree, ANYCAST_ID);
#endif
                leave(ANYCAST_ID);
              }
              //else there is still spare capacity
            }
            //else: child already recorded locally, nothing to do.
          }
          else {
            if(neighbor_query(scrTree->kids, garg->childId)) {
              neighbor_remove(scrTree->kids, garg->childId);
              if(outdegree == FORWARD_STRIPES) {
                //We were exactly full before this child left, so capacity just opened up.
                join(ANYCAST_ID);
              }
              // else spare capacity state didn't change.
            }
            //else: child was not recorded locally, nothing to do.
          }
        }
      }
      break;

    default:
      {
        debug_macro( "Unrecognized Upcall Extension Called (operation: %d)\n", operation);
        return -1; //CALL UNRECOGNIZED
      }
  }
  return 0;
}
} //closes the enclosing section opened before this excerpt (presumably "transitions {" -- confirm)

routines {

  //Fills scribe_ids[0..NUM_STRIPES-1] with one Scribe group id per stripe.
  //Current key generation just flips the most significant digit.
  //Question: what about overlapping keys (i.e. if there were multiple SS sessions?)
  void generate_keys(int splitstream_id, int* scribe_ids) {
    for(int i=0; i<NUM_STRIPES; i++) {
      //Parentheses make the (unchanged) precedence explicit: '-' binds tighter than '<<'.
      scribe_ids[i] = (splitstream_id + (i << (SS_BITS-STRIPE_SS_BITS)) );// % (1 << SS_BITS);
    }
  }

  //Determines length of the common prefix between id1 and id2, counted in
  //digits of STRIPE_SS_BITS bits starting from the most significant end of
  //an SS_BITS-wide id. Returns a value in 0 .. SS_BITS/STRIPE_SS_BITS.
  int common_prefix(int id1, int id2) {
    const int digits = SS_BITS/STRIPE_SS_BITS;
    int i = 0;
    while(i < digits) {
      //Drop everything below the first (i+1) digits and compare what is left.
      const int shift = SS_BITS-((i+1)*STRIPE_SS_BITS);
      if((id1>>shift) != (id2>>shift)) {
        break;
      }
      i++;
    }
    return i;
  }

  //Follows the description in the SOSP paper to decide which child to push down.
  //
  //On success parg->groupId and parg->childId name the stripe and the child to
  //push down. gchg describes the membership change that triggered the pushdown;
  //its child is preferred whenever it is among the candidates.
  //If no candidate exists an error is logged and parg is left unmodified.
  void select_pushdown_child(pushdown_arg* parg, group_change* gchg) {
    int common;
    int owner;
    //Holds the common stripes. Although there should only be one stripe with a
    //common prefix in the general case, what if there are multiple SS trees running.
    scrTree commonTree;
    //Stripes this node owns (per TEST_OWNER) that have more than one child.
    scrTree ownerTree;

    //STEP 1: Select a child from any tree which does not share a common prefix
    //        with my own hash. And for which I am not root.
    foreach_neighbor(neighbor_scrTree*,scrTreeNode,my_trees) {
      common = common_prefix(myhash, scrTreeNode->ipaddr);
      downcall_ext(owner,TEST_OWNER,(void*)&scrTreeNode->ipaddr);
      if(common == 0 && owner == 0) {
        if(!neighbor_empty(scrTreeNode->kids)) {
          parg->groupId = scrTreeNode->ipaddr;
          if(gchg->groupId == scrTreeNode->ipaddr && neighbor_query(scrTreeNode->kids, gchg->childId)) {
            parg->childId = gchg->childId;
          }
          else {
            parg->childId = neighbor_random(scrTreeNode->kids)->ipaddr;
          }
          return;
        }
      }
      else if(common != 0 && owner == 0 && !neighbor_empty(scrTreeNode->kids)) {
        //This is any stripe with a common prefix with me.
        neighbor_add(commonTree, scrTreeNode->ipaddr);
      }
      else if(owner != 0 && neighbor_size(scrTreeNode->kids)>1) {
        //This is any stripe which I am owner for -- highest priority
        //(these are only consulted in Step 3, after the non-owned candidates).
        //Note: We only add it if it has at least 2 kids, as we can't push down our only
        //kid for a stripe we own.
        neighbor_add(ownerTree, scrTreeNode->ipaddr);
      }
    }

    //STEP 2: Pick a child with the shortest common prefix with this stripe
    //        id, with a preference for the newest child.
    while(!neighbor_empty(commonTree)) {
      int commonPrefix=SS_BITS;
      int tmpCommon;
      scrChildren consider;
      //Take the candidate stripes in random order, removing each as it is examined.
      neighbor_scrTree* scrTreeNode = neighbor_entry(my_trees, neighbor_random(commonTree)->ipaddr);
      neighbor_remove(commonTree, scrTreeNode->ipaddr);
      if(!neighbor_empty(scrTreeNode->kids)) {
        //Collect all children tied for the shortest prefix shared with the stripe id.
        foreach_neighbor(neighbor_scrChildren*,scrChild,scrTreeNode->kids) {
          tmpCommon = common_prefix(scrTreeNode->ipaddr, scrChild->ipaddr);
          if(tmpCommon < commonPrefix) {
            commonPrefix = tmpCommon;
            neighbor_clear(consider);
            neighbor_add(consider, scrChild->ipaddr);
          }
          else if(tmpCommon == commonPrefix) {
            neighbor_add(consider, scrChild->ipaddr);
          }
        }
        if(!neighbor_empty(consider)) {
          parg->groupId = scrTreeNode->ipaddr;
          if(gchg->groupId == scrTreeNode->ipaddr && neighbor_query(consider, gchg->childId)) {
            parg->childId = gchg->childId;
          }
          else {
            parg->childId = neighbor_random(consider)->ipaddr;
          }
          return;
        }
      }
      else {
        //HUH?
        debug_macro("FIXME: ERROR: SS Stripe should only be set if size > 0, but tested wasn't.\n");
      }
    }

    //STEP 3: If no common non-owned stripe pick a child with the shortest common prefix with
    //        the owned stripe id, with a preference for the newest child.
    while(!neighbor_empty(ownerTree)) {
      int commonPrefix=SS_BITS;
      int tmpCommon;
      scrChildren consider;
      neighbor_scrTree* scrTreeNode = neighbor_entry(my_trees, neighbor_random(ownerTree)->ipaddr);
      neighbor_remove(ownerTree, scrTreeNode->ipaddr);
      if(neighbor_size(scrTreeNode->kids) > 1) {
        foreach_neighbor(neighbor_scrChildren*,scrChild,scrTreeNode->kids) {
          tmpCommon = common_prefix(scrTreeNode->ipaddr, scrChild->ipaddr);
          if(tmpCommon < commonPrefix) {
            commonPrefix = tmpCommon;
            neighbor_clear(consider);
            neighbor_add(consider, scrChild->ipaddr);
          }
          else if(tmpCommon == commonPrefix) {
            neighbor_add(consider, scrChild->ipaddr);
          }
        }
        if(!neighbor_empty(consider)) {
          parg->groupId = scrTreeNode->ipaddr;
          if(gchg->groupId == scrTreeNode->ipaddr && neighbor_query(consider, gchg->childId)) {
            parg->childId = gchg->childId;
          }
          else {
            parg->childId = neighbor_random(consider)->ipaddr;
          }
          return;
        }
      }
      else {
        //HUH?
        debug_macro("FIXME: ERROR: We didn't add this stripe unless it had enough children!\n");
      }
    }

    //Nothing selectable: parg is left exactly as the caller passed it in.
    debug_macro("FIXME: ERROR: How can we be forwarding too much and still have reached this point?\n");
  }

  //Computes this node's current forwarding load: the number of children over
  //all trees in my_trees, plus NUM_STRIPES for every entry in my_sessions.
  //When this node is the source (source_ == me), the TEST_OWNER result for
  //each tree is subtracted from the total.
  int compute_outdegree() {
    int outdegree = 0;
    int owner;
    foreach_neighbor(neighbor_scrTree*,scrTree,my_trees) {
      downcall_ext(owner,TEST_OWNER,(void*)&scrTree->ipaddr);
      //NOTE: Broken if non-source is the creator. -- possible -- what about accounting in create_group?
      if(source_ == me) {
        outdegree -= owner;
      }
      outdegree += neighbor_size(scrTree->kids);
    }
    outdegree += NUM_STRIPES * neighbor_size(my_sessions);
    //debug_macro("computed outdegree: %d\n", outdegree);
    return outdegree;
  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -