📄 scribe.mac
字号:
else if(tempsess->parent != UNDEFINED) { route_data(tempsess->parent, field(group_id), COMM_TYPE_COLLECT, field(priority), msg, size, field(priority)); } else { debug_macro("WARNING: Collect msg dropped, no parent\n"); } } } } //Message: pushdown //Action: recv // //1 - Pick sibling to join under based on symantics below. //2 - Reset session state pertinent to parents. //3 - Send a join to that sibling // //Known Problems: See FIXME recv pushdown { if(dest != myhash || from == myhash) { debug_macro(trace_buf_,"Scribe: ERROR: A pushdown message was received and either dest(%.8x)!=myhash(%.8x) or from(%.8x)==myhash(%.8x)\n",dest,myhash,from,myhash); } else { children siblings = field(siblings); //Symantics: // 1- Upcall to upper layer. If not subscribed, or if they wish, return < 0, // which implies to perform standard scribe pushdown. // 2- Standard Scribe Pushdown: probe for lowest latency // 3- returnCode = 0, chosen node is set in the arg int pushdownNode=-1; int returnCode=-1; if(neighbor_query(my_sessions, field(group_id))) { //Only accept pushdown if it's from the expected parent. if(neighbor_info(my_sessions,field(group_id),parent) == from) { //FIXME: This if(true) should be if(upper layer is subscribed to PUSHDOWN_SELECTOR messages) if(true) { //defined in scribe_ext.h pushdown_selector_arg *arg = new pushdown_selector_arg(); arg->groupId = field(group_id); arg->size = neighbor_size(siblings); if(arg->size > 0) { arg->siblings = new int[arg->size]; int i=0; foreach_neighbor( neighbor_children*, achild, siblings) { arg->siblings[i++] = achild->ipaddr; } } else { arg->siblings = NULL; } returnCode = upcall_ext(PUSHDOWN_SELECTOR, (void*)arg); //Three values for return code: //-1 - Let lower layer pick //0 - I've chosen, result is in arg->chosen //1 - don't send any join messages, I've taken care of it. 
if(returnCode == 0) { pushdownNode = arg->chosen; } if(arg->size > 0 && arg->siblings) { delete[] arg->siblings; } delete arg; } if(returnCode == -1 && neighbor_size(siblings) > 0) { //FIXME: This should be replaced with a proper probe neighbor_children* mysibling = neighbor_random(siblings); pushdownNode = mysibling->ipaddr; } else if(returnCode == -1) { //No node to join. returnCode=2; } //Update these values anyway. If no node is rejoined -- this becomes a new timeout timer. dojoin(field(group_id)); neighbor_session* tempsess; tempsess = neighbor_entry(my_sessions, field(group_id)); unwatch(tempsess->parent); tempsess->parent = UNDEFINED; debug_macro("REPLAY Scribe: MEMBERSHIP_CHANGE ( gr me parent ) %.8x %.8x %.8x\n", field(group_id), myhash, myhash); neighbor_clear(tempsess->pathtoroot); if(returnCode <= 0) { routeIP_join(pushdownNode,field(group_id), 0, 0, -1); } } else { debug_macro("WARNING: Received pushdown from non-parent. Ignoring (gr: %.8x fr: %.8x pr: %.8x)\n",field(group_id),from,neighbor_info(my_sessions,field(group_id),parent)); //sprintf(trace_buf_,"WARNING: Received pushdown from non-parent. Ignoring (gr: %.8x fr: %.8x pr: %.8x)\n",field(group_id),from,neighbor_info(my_sessions,field(group_id),parent)); //trace_print(); } } } } //Message: membership //Action: recv // //When receiving a membership //1 - clear join timer state //2 - update pathtoroot //3 - update parent //4 - send membership to children // //Known Problems: How do we add optimization #1: Cycle detection only when needed? 
recv membership {
  if(dest != myhash || from == myhash) {
    debug_macro("Scribe: ERROR: A membership message (for %.8x) was received and either dest(%.8x)!=myhash(%.8x) or from(%.8x)==myhash(%.8x)\n",field(group_id),dest,myhash,from,myhash);
  }
  else if(neighbor_query(field(pathtoroot),myhash)) {
    //We already appear on the sender's path to root, so adopting it as parent would loop.
    //The format has a single %.8x, so only group_id is passed (dest/myhash/from/myhash
    //used to be passed as well and were silently ignored).
    debug_macro("Scribe: ERROR: A membership message (for %.8x) was received and I was in the path to root!\n",field(group_id));
  }
  else {
    //FIXME: Add optimisation #1, cycle detection only when needed?
    int groupId = field(group_id);
#ifdef MEMBERSHIP_TRACE
    debug_macro("Scribe: Received membership for group %.8x\n",groupId);
#endif
    if(neighbor_query(my_sessions, groupId)) {
      neighbor_session* tempsess = neighbor_entry(my_sessions, groupId);
      //2 - Our path to root becomes the sender's path, followed by the sender itself.
      neighbor_clear(tempsess->pathtoroot);
      foreach_neighbor(neighbor_uppath*,tempnode,field(pathtoroot)) {
        neighbor_add(tempsess->pathtoroot,tempnode->ipaddr);
      }
      if(!neighbor_query(tempsess->pathtoroot,from)) {
        //NOTE: This if statement should essentially be an if(true), but
        //      I added this for safety, because of some long paths seen.
        neighbor_add(tempsess->pathtoroot,from);
      }
      //3 - Adopt the sender as parent if it isn't already, unless the group key
      //    equals our own hash (groupId == dest, and dest == myhash here).
      //    Any previous parent is sent a leave first.
      if(groupId != dest && (tempsess->parent ==UNDEFINED || tempsess->parent != from) ) {
        //FIXME: Consider printing a warning when parent changes unexpectedly.
        unwatch(tempsess->parent);
        if(tempsess->parent != UNDEFINED) {
          routeIP_leave(tempsess->parent,tempsess->ipaddr,0,0,-1);
        }
        debug_macro("REPLAY Scribe: MEMBERSHIP_CHANGE ( gr me parent ) %.8x %.8x %.8x\n", groupId, myhash, from);
        tempsess->parent = from;
        watch(tempsess->parent);
      }
      //1 - Clear the outstanding-join state for this group.
      donejoin(tempsess->ipaddr);
      //4 - Push the updated path to root down to every child.
      foreach_neighbor(neighbor_children*,achild,tempsess->kids) {
        routeIP_membership(achild->ipaddr,tempsess->ipaddr,tempsess->pathtoroot,0,0,-1);
      }
    }
    else {
      //FIXME: Nothing for now -- consider sending a leave message.
      debug_macro("Scribe: WARNING: Received membership for group %.8x, but not subscribed.\n",groupId);
    }
  }
}

//Message: anycast
//Action: forward
//
//Q: Do we need to hang on to Black nodes?
Prior experience said stale state was causing routing loops. // Elimination of the spare capacity may solve this, but the general case is still problematic. // //Step 1: If I am a "white" node (first time), add my children //Step 2: Pick the next node in line to process the msg. //Step 3: Route to parent, if any //Step 4: Fall back to routing message toward key. forward anycast { visited dfsState = field(dfsState); if(from == myhash) { return 0; } #ifdef ANYCAST_TRACE debug_macro("DEBUG: dfsState size=%d, list:\n",neighbor_size(dfsState)); //sprintf(trace_buf_,"DEBUG: dfsState size=%d, list:\n",neighbor_size(dfsState)); //trace_print(); //int i=0; //foreach_neighbor(neighbor_visited*,tmp,dfsState) { // sprintf(trace_buf_+(14*i), "(sib:%.8x())",tmp->ipaddr,tmp->color); // i++; //} //sprintf(trace_buf_+(14*i),"\n"); //trace_print(); #endif if(neighbor_query(my_sessions, field(group_id))) { neighbor_session *tempsess = neighbor_entry(my_sessions, field(group_id)); //Step 1: If I am a "white" node (first time), add my children if(!neighbor_query(dfsState, myhash) || neighbor_info(dfsState, myhash, color)==0) { if(!neighbor_query(dfsState, myhash)) { neighbor_add(dfsState, myhash); } neighbor_info(dfsState, myhash, color) = 1; foreach_neighbor(neighbor_children*,achild,tempsess->kids) { if(!neighbor_query(dfsState,achild->ipaddr)) { neighbor_add(dfsState, achild->ipaddr); } } } } //Step 2: Pick the next node in line to process the msg. foreach_neighbor(neighbor_visited*,next,dfsState) { if(next->color != 2 && next->ipaddr==myhash && (!neighbor_query(my_sessions, field(group_id))) || from==myhash) { next->color = 2; } else if(next->color != 2) { if(next->ipaddr == myhash) { next->color=2; //Visit this node. #ifdef ANYCAST_TRACE debug_macro("Scribe: All children visited. Visiting this node. 
(anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",from,field(from_addr),field(group_id)); #endif if(upcall_forward(0, msg, size, COMM_TYPE_ANYCAST)) { return 1; } } else { //Send message #ifdef ANYCAST_TRACE debug_macro("Scribe: Forwarding anycast msg from %.8x (orig from: %.8x) for group %.8x to child %.8x\n",from,field(from_addr),field(group_id),next->ipaddr); #endif routeIP_anycast(next->ipaddr, field(group_id), field(from_addr), dfsState, msg, size, -1); #ifdef ANYCAST_TRACE debug_macro("Scribe: Done forwarding anycast msg from %.8x (orig from: %.8x) for group %.8x to child %.8x\n",from,field(from_addr),field(group_id),next->ipaddr); #endif return 1; } } } //Step 3: Route to parent, if any //NOTE: dfsState is eitehr all black, or empty if(neighbor_query(my_sessions, field(group_id))) { neighbor_session *tempsess = neighbor_entry(my_sessions, field(group_id)); if(tempsess->parent==UNDEFINED) { //If we don't know our parent, we simply forward it toward the group ID. //FIXME: Is this the right thing to do, or do we drop it? #ifdef ANYCAST_TRACE debug_macro("Scribe: Don't know parent, so routing towards groupid. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",from,field(from_addr),field(group_id)); #endif route_anycast(field(group_id), field(group_id), field(from_addr), dfsState, msg, size, -1); return 1; } else { #ifdef ANYCAST_TRACE debug_macro("Scribe: Forwarding to parent %.8x. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",tempsess->parent,from,field(from_addr),field(group_id)); #endif routeIP_anycast(tempsess->parent, field(group_id), field(from_addr), dfsState, msg, size, -1); return 1; } } //Step 4: Fall back to routing message toward key. #ifdef ANYCAST_TRACE debug_macro("Scribe: All options failed. Forwarding using Pastry routing. 
(anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",from,field(from_addr),field(group_id)); #endif route_anycast(field(group_id), field(group_id), field(from_addr), dfsState, msg, size, -1); return 1; } //Message: anycast //Action: recv // //Print an error message -- the anycast is delivered to the root of the tree. recv anycast { //This is the case where an anycast message was not handled anywhere. An error message is //returned. //Return error to source. //FIXME: Is this the right thing to do or do I drop it? //routeIP_anycastError(field(from_addr),field(group_id), 0, 0, -1);#ifdef ANYCAST_TRACE debug_macro("WARNING: Scribe: recv anycast. No node accepted anycast message. dropping. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",from,field(from_addr),field(group_id));#endif upcall_deliver(msg, size, COMM_TYPE_ANYCAST); } //Timer: joinTimer // //Ensure joins complete successfully. // //Process: Iterate over join list, incrementing delay for // each. If delay > JOIN_TIMEOUT_INT, issue a new // join. 
timer joinTimer { foreach_neighbor( neighbor_joining*, tempjoin, outstanding_joins) { ASSERT(tempjoin->ipaddr != UNDEFINED); tempjoin->delay++; if(tempjoin->delay > JOIN_TIMEOUT_INT) { debug_macro("Scribe: WARNING No membership received (delay=%d), rejoining group (%.8x)\n",(int)tempjoin->delay,tempjoin->ipaddr); //sprintf(trace_buf_,"Scribe: WARNING No membership received (delay=%d), rejoining group (%.8x)\n",tempjoin->delay,tempjoin->ipaddr); //trace_print(); tempjoin->delay=0; route_join(tempjoin->ipaddr, tempjoin->ipaddr, 0, 0, -1); } else {#ifdef MEMBERSHIP_TRACE debug_macro("Scribe: Incremented delay=%d for group (%.8x)\n",(int)tempjoin->delay,tempjoin->ipaddr); //sprintf(trace_buf_,"Scribe: Incremented delay=%d for group (%.8x)\n",tempjoin->delay,tempsess->ipaddr); //trace_print();#endif } } if(neighbor_size(outstanding_joins)>0) { timer_resched(joinTimer, JOIN_TIMER_PERIOD); } } //Timer: printer // //prints out current state, and calls dump_state(). //Also -- prints REPLAY BANDWIDTH timer printer {#ifdef SCRIBE_TRACE if(0) { debug_macro("Scribe: Printer timer displaying summary Scribe state:\n"); foreach_neighbor(neighbor_session*,tempsess,my_sessions) { debug_macro("Scribe session: %.8x, with %d kids.\n",tempsess->ipaddr,neighbor_size(tempsess->kids)); } debug_macro("Scribe: Printer timer dumping state:\n"); dump_state(); } if(1) { foreach_neighbor(neighbor_session*,tempsess,my_sessions) { if(tempsess->isRoot) { debug_macro("REPLAY session: %.8x, parent: client0.\n",tempsess->ipaddr); } else { debug_macro("REPLAY session: %.8x, parent: %.8x.\n",tempsess->ipaddr,tempsess->parent); } } }#endif //FIXME: Should I be using globalmacedon? 
extern MACEDON_Agent *globalmacedon; if (globalmacedon != this) // am i the highest layer return; if ( ( parameters.getint("streaming_time") == -1.0 || curtime > time_booted + parameters.getint("streaming_time")) ) printf("%s %f %d REPLAY_BANDWIDTH %d %d %d %d %d %d\n", get_hostname(), Scheduler::instance().clock(), pthread_self(), (int) master.get_value(), 0, // need to put in control bw (int) master_useful.get_value(), (int) master.get_value(), (int) master_useful.get_value(), 0 ); } //API error // //This is the result of the failures of "watched" nodes. //Iterate over sessions -- if it is a parent, send it a leave message, and //send a new join, //if it is a child, send it a pushdown message, finally remove it from //the watched group. // //Q: But what about scribe-liveness, not just network liveness? API error { if(type == ERROR_NEIGHBOR_FAILURE) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -