📄 splitstream.mac
字号:
trace_print(); #endif } } } //forward joinReq // //This method handles processing of messages sent requesting //to join a stripe. Comments of process are inline. forward joinReq { #ifdef SPARE_CAPACITY_TRACE sprintf(trace_buf_,"In forward joinReq (dest:%.8x,from:%.8x)\n",dest,from); trace_print(); #endif //CHECK 1: Do we have spare capacity? int outdegree = compute_outdegree(); if(outdegree < FORWARD_STRIPES) { //CHECK 2: Do we forward for this tree? if(neighbor_query(my_trees, dest)) { //CHECK 3: Is this node in my path to the root? //NOTE: Assume Scribe handles routing loop checking, returns 1 if a loop is found!!! child_add_arg* carg = new child_add_arg; carg->groupId = dest; carg->childId = from; int success; downcall_ext(success, ADD_CHILD, carg); delete carg; #ifdef SPARE_CAPACITY_TRACE sprintf(trace_buf_,"Accepted child if(%d!=0). (dest:%.8x,from:%.8x)\n",!success,dest,from); trace_print(); #endif if(success==2) { #ifdef SPARE_CAPACITY_TRACE sprintf(trace_buf_,"As child is already in group, terminating anycast joinReq. (dest:%.8x,from:%.8x)\n",dest,from); trace_print(); #endif return 1; } else if(success==0) { //ADDING CHILD TO LOCAL SCRIBE STATE, BECAUSE SCRIBE WON'T UPCALL FROM THE DOWNCALL. neighbor_add(neighbor_info(my_trees,dest,kids),from); return 1; } else { //KEEP ON FORWARDING return 0; } } else { //THIS SHOULDN'T HAPPEN IN PRACTICE. #ifdef SPARE_CAPACITY_TRACE sprintf(trace_buf_,"WARNING: Sorry, don't forward this stripe. (dest:%.8x,from:%.8x)\n",dest,from); trace_print(); #endif } } else { #ifdef SPARE_CAPACITY_TRACE sprintf(trace_buf_,"Received joinReq & no forward capacity. (dest:%.8x,from:%.8x)\n",dest,from); trace_print(); #endif } return 0; } //recv joinReq // //When a joinReq message is recieved, this means that no-one stoped the forwarding. The node receiving //the message should be the root of the stripe tree, and this represents a failure to join. //One question is what should we do now? This implementation just prints an error message, and goes //on. recv joinReq { //NOTE: This happens when an anycast message is delivered to the root of an anycast tree, in failure. sprintf(trace_buf_,"ERROR: SplitStream forest creation failed! joinReq found no parent (dest:%.8x,from:%.8x)\n",dest,from); trace_print(); } //API downcall_ext // //No downcalls are presently supported. API downcall_ext { //Takes int operation, void* arg switch(operation) { default: sprintf(trace_buf_, "Unrecognized Downcall Extension Called (operation: %d)\n", operation); trace_print(); return -1; //CALL UNRECOGNIZED } return 0; } //API upcall_ext // //The following upcalls are supported: // - PUSHDOWN_SELECTOR: When this node is pushed down from its parent, it is sent a list of // siblings. Scribe then does an extensible upcall to give protocols the option to select // the node to attempt to join at. // - GROUP_CHANGE: upcall_notify doesn't allow scribe to notify splitstream of changes to the // children of sessions -- so instead it calls upcall_ext with GROUP_CHANGE to do so. As a // results of this, splitstream might iniitiate a pushdown, or just save the state. API upcall_ext { //Takes int operation, void* arg switch(operation) { case PUSHDOWN_SELECTOR: { //This roughly matches the description in the SOSP paper. pushdown_selector_arg* psArg = (pushdown_selector_arg*)arg; bool retset=false; int retval; #ifdef PUSHDOWN_TRACE sprintf(trace_buf_,"DEBUG: In PUSHDOWN_SELECTOR (%.8x)...\n",psArg->groupId); trace_print(); #endif if(psArg->size > 0) { scrChildren set; int i; for(i=0; i<psArg->size; i++) { //Look for a node with a common prefix with the group id. if(common_prefix(psArg->groupId, psArg->siblings[i]) > 0) { neighbor_add(set, psArg->siblings[i]); } } if(neighbor_size(set) > 0) { //pick randomly from the set of nodes with a common prefix. retval = neighbor_random(set)->ipaddr; retset=true; neighbor_clear(set); } } if(!retset) { //If there was no node to join, then send an anycast message, and tell Scribe not to try a join. #ifdef PUSHDOWN_TRACE sprintf(trace_buf_,"DEBUG: Pushdown Selector couldn't join sibling, sending anycast (for group %.8x)\n",psArg->groupId); trace_print(); #endif anycast_joinReq(psArg->groupId, 0, 0, -1); return 1; } else { #ifdef PUSHDOWN_TRACE sprintf(trace_buf_,"DEBUG: Pushdown Selector selecting (%.8x for group %.8x)\n",retval,psArg->groupId); trace_print(); #endif psArg->chosen = retval; return 0; } } break; case GROUP_CHANGE: { //dump_state(); group_change* garg = (group_change*)arg; if(neighbor_query(my_trees, garg->groupId)) { neighbor_scrTree* scrTree = neighbor_entry(my_trees, garg->groupId); if(garg->newJoin) { if(!neighbor_query(scrTree->kids, garg->childId)) { neighbor_add(scrTree->kids, garg->childId); int outdegree = compute_outdegree(); if(outdegree > FORWARD_STRIPES + 1) { //For debugging purposes -- if this happens alert the user sprintf(trace_buf_, "ERROR: Outdegree too high (%d)! How did it get this high?\n", outdegree); trace_print(); } if(outdegree > FORWARD_STRIPES) { //Need to push a node down. pushdown_arg *parg = new pushdown_arg; select_pushdown_child(parg, garg); //This function implements the pushdown selection as described in the paper. if(neighbor_query(my_trees,parg->groupId)) { neighbor_scrTree *atree = neighbor_entry(my_trees,parg->groupId); if(neighbor_query(atree->kids, parg->childId)) { //NOTE: Lower layer doesn't do an upcall from the downcall. neighbor_remove(atree->kids, parg->childId); int trash; downcall_ext(trash, PUSHDOWN, (void*)parg); } } delete parg; } } else { //FIXME: What does this mean? } } else { if(neighbor_query(scrTree->kids, garg->childId)) { neighbor_remove(scrTree->kids, garg->childId); } else { //FIXME: What does this mean? } } } } break; default: { sprintf(trace_buf_, "Unrecognized Upcall Extension Called (operation: %d)\n", operation); trace_print(); return -1; //CALL UNRECOGNIZED } } return 0; }}routines { //Current key generation just flips the most significant digit. //Question: what about overlapping keys (i.e. if there were multiple SS sessions?) void generate_keys(int splitstream_id, int* scribe_ids) { for(int i=0; i<NUM_STRIPES; i++) { scribe_ids[i] = (splitstream_id + (i << SS_BITS-STRIPE_SS_BITS) );// % (1 << SS_BITS); } } //Determines length of the common prefix between id1 and id2 int common_prefix(int id1, int id2) { int i; for(i=0; i<SS_BITS/STRIPE_SS_BITS && ((id1>>(SS_BITS-((i+1)*STRIPE_SS_BITS))) == (id2>>(SS_BITS-((i+1)*STRIPE_SS_BITS))) ); i++); return i; } //Follows the description in the SOSP paper to decide which child to push down. void select_pushdown_child(pushdown_arg* parg, group_change* gchg) { int common; int owner; scrTree commonTree; //Holds the common stripes. Although there should only be one stripe with a cmomon prefix in the general case, what if there are multiple SS trees running. scrTree ownerTree; //STEP 1: Select a child from any tree which does not share a common prefix // with my own hash. And for which I am not root. foreach_neighbor(neighbor_scrTree*,scrTreeNode,my_trees) { common = common_prefix(myhash, scrTreeNode->ipaddr); downcall_ext(owner,TEST_OWNER,(void*)&scrTreeNode->ipaddr); if(common == 0 && owner == 0) { if(!neighbor_empty(scrTreeNode->kids)) { parg->groupId = scrTreeNode->ipaddr; if(gchg->groupId == scrTreeNode->ipaddr && neighbor_query(scrTreeNode->kids, gchg->childId)) { parg->childId = gchg->childId; } else { parg->childId = neighbor_random(scrTreeNode->kids)->ipaddr; } return; } } else if(common != 0 && owner == 0 && !neighbor_empty(scrTreeNode->kids)) { //This is any stripe with a common prefix with me. neighbor_add(commonTree, scrTreeNode->ipaddr); } else if(owner != 0 && neighbor_size(scrTreeNode->kids)>1) { //This is any stripe which I am owner for -- highest priority //Note: We only add it if it has at least 2 kids, as we can't push down our only //kid for a stripe we own. neighbor_add(ownerTree, scrTreeNode->ipaddr); } } //STEP 2: Pick a child with the shortest common prefix with this stripe // id, with a preference for the newest child. while(!neighbor_empty(commonTree)) { int commonPrefix=SS_BITS; int tmpCommon; scrChildren consider; neighbor_scrTree* scrTreeNode = neighbor_entry(my_trees, neighbor_random(commonTree)->ipaddr); neighbor_remove(commonTree, scrTreeNode->ipaddr); if(!neighbor_empty(scrTreeNode->kids)) { foreach_neighbor(neighbor_scrChildren*,scrChild,scrTreeNode->kids) { tmpCommon = common_prefix(scrTreeNode->ipaddr, scrChild->ipaddr); if(tmpCommon < commonPrefix) { commonPrefix = tmpCommon; neighbor_clear(consider); neighbor_add(consider, scrChild->ipaddr); } else if(tmpCommon == commonPrefix) { neighbor_add(consider, scrChild->ipaddr); } } if(!neighbor_empty(consider)) { parg->groupId = scrTreeNode->ipaddr; if(gchg->groupId == scrTreeNode->ipaddr && neighbor_query(consider, gchg->childId)) { parg->childId = gchg->childId; } else { parg->childId = neighbor_random(consider)->ipaddr; } return; } } else { //HUH? sprintf(trace_buf_,"FIXME: ERROR: SS Stripe should only be set if size > 0, but tested wasn't.\n"); trace_print(); } } while(!neighbor_empty(ownerTree)) { //Step 3: If no common non-owned stripe pick a child with the shortest common prefix with //the owned stripe id, with a preference for the newest child. int commonPrefix=SS_BITS; int tmpCommon; scrChildren consider; neighbor_scrTree* scrTreeNode = neighbor_entry(my_trees, neighbor_random(ownerTree)->ipaddr); neighbor_remove(ownerTree, scrTreeNode->ipaddr); if(neighbor_size(scrTreeNode->kids) > 1) { foreach_neighbor(neighbor_scrChildren*,scrChild,scrTreeNode->kids) { tmpCommon = common_prefix(scrTreeNode->ipaddr, scrChild->ipaddr); if(tmpCommon < commonPrefix) { commonPrefix = tmpCommon; neighbor_clear(consider); neighbor_add(consider, scrChild->ipaddr); } else if(tmpCommon == commonPrefix) { neighbor_add(consider, scrChild->ipaddr); } } if(!neighbor_empty(consider)) { parg->groupId = scrTreeNode->ipaddr; if(gchg->groupId == scrTreeNode->ipaddr && neighbor_query(consider, gchg->childId)) { parg->childId = gchg->childId; } else { parg->childId = neighbor_random(consider)->ipaddr; } return; } } else { //HUH? sprintf(trace_buf_,"FIXME: ERROR: We didn't add this stripe unless it had enough children!\n"); trace_print(); } } sprintf(trace_buf_,"FIXME: ERROR: How can we be forwarding too much and still have reached this point?\n"); trace_print(); } int compute_outdegree() { int outdegree = 0; int owner; foreach_neighbor(neighbor_scrTree*,scrTree,my_trees) { downcall_ext(owner,TEST_OWNER,(void*)&scrTree->ipaddr); //NOTE: Broken if non-source is the creator. -- possible -- what about accounting in create_group? if(source_ == me) { outdegree -= owner; } outdegree += neighbor_size(scrTree->kids); } outdegree += NUM_STRIPES * neighbor_size(my_sessions); return outdegree; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -