📄 scribems.mac
字号:
if(field(group_id) != dest && (tempsess->parent==UNDEFINED || tempsess->parent != from) ) {
        //tempsess->parent = from;
        //NOTE: This was to handle the case where a join timed out -- and we were taken by two parents.
        debug_macro("Scribe: WARNING: Received data from wrong parent. (group: %.8x, expected: %.8x, observed: %.8x)\n", field(group_id), tempsess->parent, from);
      }
      foreach_neighbor (neighbor_children*, afriend, tempsess->kids) {
        routeIP_data( afriend->ipaddr, field(group_id), field(priority), msg, size, field(priority) );
      }
    }
  }
  else {
    //note: else message is dropped, and a leave is returned to the sender!
    //WARNING: What if a failure caused this, and I just haven't been notified?
    if(field(group_id) != dest)
      routeIP_leave(from, field(group_id), 0, 0, -1);
  }
}
}

//Message: pushdown
//Action: recv
//
//1 - Pick sibling to join under based on semantics below.
//2 - Reset session state pertinent to parents.
//3 - Send a join to that sibling
//
//Known Problems: See FIXME
recv pushdown {
  if(dest != myhash || from == myhash) {
    // A pushdown must be addressed to this node and must not originate from it.
    debug_macro("Scribe: ERROR: A pushdown message was received and either dest(%.8x)!=myhash(%.8x) or from(%.8x)==myhash(%.8x)\n",dest,myhash,from,myhash);
  }
  else {
    children siblings = field(siblings);

    //Semantics:
    // 1- Upcall to upper layer. If not subscribed, or if they wish, return < 0,
    //    which implies to perform standard scribe pushdown.
    // 2- Standard Scribe Pushdown: probe for lowest latency
    // 3- returnCode = 0, chosen node is set in the arg
    int pushdownNode=-1;
    int returnCode=-1;

    // Nothing below runs unless this node holds a session for the group.
    if(neighbor_query(my_sessions, field(group_id))) {
      //defined in scribe_ext.h
      pushdown_selector_arg *arg = new pushdown_selector_arg();
      arg->groupId = field(group_id);
      arg->size = neighbor_size(siblings);
      if(arg->size > 0) {
        // Flatten the sibling set into a plain array for the upper layer.
        arg->siblings = new int[arg->size];
        int i=0;
        foreach_neighbor( neighbor_children*, achild, siblings) {
          arg->siblings[i++] = achild->ipaddr;
        }
      }
      else {
        arg->siblings = NULL;
      }

      returnCode = upcall_ext(PUSHDOWN_SELECTOR, (void*)arg);
      //Values for return code:
      //<0 - Let lower layer pick (-1 is the conventional value)
      //0  - I've chosen, result is in arg->chosen
      //1  - don't send any join messages, I've taken care of it.
      if(returnCode == 0) {
        pushdownNode = arg->chosen;
      }
      if(arg->size > 0 && arg->siblings) {
        delete[] arg->siblings;
      }
      delete arg;

      // Every negative code means "lower layer picks". Testing only for -1
      // let other negative values fall through to the join below with
      // pushdownNode still at its -1 placeholder.
      if(returnCode < 0 && neighbor_size(siblings) > 0) {
        //FIXME: This should be replaced with a proper probe
        neighbor_children* mysibling = neighbor_random(siblings);
        pushdownNode = mysibling->ipaddr;
      }
      else if(returnCode < 0) {
        //No node to join.
        returnCode=2;
      }

      //Update these values anyway. If no node is rejoined -- this becomes a new timeout timer.
      neighbor_session* tempsess;
      tempsess = neighbor_entry(my_sessions, field(group_id));
      tempsess->parent = UNDEFINED;
      debug_macro("REPLAY Scribe: MEMBERSHIP_CHANGE ( gr me parent ) %.8x %.8x %.8x\n", field(group_id), myhash, myhash);
      neighbor_clear(tempsess->pathtoroot);
      tempsess->ticks_since_heartbeat=0;
      tempsess->tick_count_for_renew=0;

      // returnCode <= 0 here means a target was selected, either by the
      // upper layer (0) or by the random sibling pick above (< 0).
      if(returnCode <= 0) {
        routeIP_join(pushdownNode,field(group_id), 0, 0, -1);
      }
    }
  }
}

//Message: heartbeat
//Action: recv
//
//When receiving a heartbeat
//1 - clear heartbeat state
//2 - update pathtoroot
//3 - update parent
//
//Known Problems: How do we add optimization #1: Cycle detection only when needed?
recv heartbeat {
  if(dest != myhash || from == myhash) {
    // A heartbeat must be addressed to this node and must not originate from it.
    debug_macro("Scribe: ERROR: A heartbeat message (for %.8x) was received and either dest(%.8x)!=myhash(%.8x) or from(%.8x)==myhash(%.8x)\n",field(group_id),dest,myhash,from,myhash);
  }
  else if(neighbor_query(field(pathtoroot),myhash)) {
    // This node already appears on the advertised path to the root, so the
    // heartbeat is rejected. The format string has a single conversion, so
    // only the group id is passed.
    debug_macro("Scribe: ERROR: A heartbeat message (for %.8x) was received and I was in the path to root!\n",field(group_id));
  }
  else {
    //FIXME: Add optimisation #1, cycle detection only when needed?
    int groupId = field(group_id);
#ifdef HEARTBEAT_TRACE
    debug_macro("Scribe: Received heartbeat for group %.8x\n",groupId);
#endif
    if(neighbor_query(my_sessions, groupId)) {
      neighbor_session* tempsess = neighbor_entry(my_sessions, groupId);

      // 1 - clear heartbeat state
      tempsess->ticks_since_heartbeat = 0;

      // 2 - replace the stored path to root with the one carried in the message
      neighbor_clear(tempsess->pathtoroot);
      foreach_neighbor(neighbor_uppath*,tempnode,field(pathtoroot)) {
        neighbor_add(tempsess->pathtoroot,tempnode->ipaddr);
      }
      if(!neighbor_query(tempsess->pathtoroot,from)) {
        //NOTE: This if statement should essentially be an if(true), but
        //      I added this for safety, because of some long paths seen.
        neighbor_add(tempsess->pathtoroot,from);
      }

      // 3 - adopt the sender as parent when it differs from the recorded one
      if(groupId != dest && (tempsess->parent ==UNDEFINED || tempsess->parent != from) ) {
        //FIXME: Consider printing a warning when parent changes unexpectedly.
        tempsess->parent = from;
        debug_macro("REPLAY Scribe: MEMBERSHIP_CHANGE ( gr me parent ) %.8x %.8x %.8x\n", groupId, myhash, from);
      }
    }
    else {
      //FIXME: Nothing for now -- consider sending a leave message.
      debug_macro("Scribe: WARNING: Received heartbeat for group %.8x, but not subscribed.\n",groupId);
    }
  }
}

//Message: anycast
//Action: forward
//
//Q: Do we need to hang on to Black nodes? Prior experience said stale state was causing routing loops.
//   Elimination of the spare capacity may solve this, but the general case is still problematic.
//
//Step 1: If I am a "white" node (first time), add my children
//Step 2: Pick the next node in line to process the msg.
//Step 3: Route to parent, if any
//Step 4: Fall back to routing message toward key.
forward anycast {
  // Local copy of the traversal state carried in the message. Colour values
  // used below: 0 = not yet expanded, 1 = expanded (children added), 2 = done.
  visited dfsState = field(dfsState);

  if(from == myhash) {
    return 0;
  }

#ifdef ANYCAST_TRACE
  pthread_mutex_lock(&debug_lock);
  sprintf(trace_buf_,"DEBUG: dfsState size=%d, list:\n",neighbor_size(dfsState));
  trace_print();
  // Advance by the number of characters actually written. The previous fixed
  // 14-byte stride was shorter than each entry, so entries overwrote one
  // another, and the colour argument had no matching conversion.
  // NOTE(review): trace_buf_ capacity is not visible here -- confirm it can
  // hold one entry per dfsState member.
  int traceOff=0;
  foreach_neighbor(neighbor_visited*,tmp,dfsState) {
    traceOff += sprintf(trace_buf_+traceOff, "(sib:%.8x(%d))",tmp->ipaddr,tmp->color);
  }
  sprintf(trace_buf_+traceOff,"\n");
  trace_print();
  pthread_mutex_unlock(&debug_lock);
#endif

  if(neighbor_query(my_sessions, field(group_id))) {
    neighbor_session *tempsess = neighbor_entry(my_sessions, field(group_id));
    //Step 1: If I am a "white" node (first time), add my children
    if(!neighbor_query(dfsState, myhash) || neighbor_info(dfsState, myhash, color)==0) {
      if(!neighbor_query(dfsState, myhash)) {
        neighbor_add(dfsState, myhash);
      }
      neighbor_info(dfsState, myhash, color) = 1;
      foreach_neighbor(neighbor_children*,achild,tempsess->kids) {
        if(!neighbor_query(dfsState,achild->ipaddr)) {
          neighbor_add(dfsState, achild->ipaddr);
        }
      }
    }
  }

  //Step 2: Pick the next node in line to process the msg.
  foreach_neighbor(neighbor_visited*,next,dfsState) {
    // Grouping made explicit; it matches the operator precedence of the
    // unparenthesised original. The from==myhash arm cannot be true here
    // because that case returned at the top of the handler.
    if((next->color != 2 && next->ipaddr==myhash && (!neighbor_query(my_sessions, field(group_id)))) || from==myhash) {
      // Own entry, but this node has no session for the group: mark it done
      // without offering the message to the upper layer.
      next->color = 2;
    }
    else if(next->color != 2) {
      if(next->ipaddr == myhash) {
        next->color=2;
        //Visit this node.
#ifdef ANYCAST_TRACE
        debug_macro("Scribe: All children visited. Visiting this node. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",from,field(from_addr),field(group_id));
#endif
        if(upcall_forward(0, msg, size, COMM_TYPE_ANYCAST)) {
          return 1;
        }
      }
      else {
        //Send message
#ifdef ANYCAST_TRACE
        debug_macro("Scribe: Forwarding anycast msg from %.8x (orig from: %.8x) for group %.8x to child %.8x\n",from,field(from_addr),field(group_id),next->ipaddr);
#endif
        routeIP_anycast(next->ipaddr, field(group_id), field(from_addr), dfsState, msg, size, -1);
#ifdef ANYCAST_TRACE
        debug_macro("Scribe: Done forwarding anycast msg from %.8x (orig from: %.8x) for group %.8x to child %.8x\n",from,field(from_addr),field(group_id),next->ipaddr);
#endif
        return 1;
      }
    }
  }

  //Step 3: Route to parent, if any
  //NOTE: dfsState is either all black, or empty
  if(neighbor_query(my_sessions, field(group_id))) {
    neighbor_session *tempsess = neighbor_entry(my_sessions, field(group_id));
    if(tempsess->parent==UNDEFINED) {
      //If we don't know our parent, we simply forward it toward the group ID.
      //FIXME: Is this the right thing to do, or do we drop it?
#ifdef ANYCAST_TRACE
      debug_macro("Scribe: Don't know parent, so routing towards groupid. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",from,field(from_addr),field(group_id));
#endif
      route_anycast(field(group_id), field(group_id), field(from_addr), dfsState, msg, size, -1);
      return 1;
    }
    else {
#ifdef ANYCAST_TRACE
      debug_macro("Scribe: Forwarding to parent %.8x. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",tempsess->parent,from,field(from_addr),field(group_id));
#endif
      routeIP_anycast(tempsess->parent, field(group_id), field(from_addr), dfsState, msg, size, -1);
      return 1;
    }
  }

  //Step 4: Fall back to routing message toward key.
#ifdef ANYCAST_TRACE
  debug_macro("Scribe: All options failed. Forwarding using Pastry routing. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",from,field(from_addr),field(group_id));
#endif
  route_anycast(field(group_id), field(group_id), field(from_addr), dfsState, msg, size, -1);
  return 1;
}

//Message: anycast
//Action: recv
//
//Print an error message -- the anycast is delivered to the root of the tree.
recv anycast {
  //This is the case where an anycast message was not handled anywhere. An error message is
  //printed.
#ifdef ANYCAST_TRACE
  debug_macro("WARNING: Scribe: recv anycast. No node accepted anycast message. dropping. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",from,field(from_addr),field(group_id));
#endif
  upcall_deliver(msg, size, COMM_TYPE_ANYCAST);
}

//Timer: hbTimer
//
//Perform periodic maintenance actions.
//Steps 1 and 2 only apply to groups we aren't the root for.
//STEP 1: Check to see if we're receiving data on the tree.
//STEP 2: See if we need to renew our interest to our parent
//STEP 3: Check for children who seem to have lost interest in the group
//STEP 3b: Increment ticks_since_join.
//STEP 4: Send a heartbeat to all children if no data has been sent.
//
//Q: A good API Error could resolve this. But what about scribe-liveness, not just network liveness?
timer hbTimer {
  static int timerUid = 0;
  char pathId[40];
  sprintf(pathId, "scribe_hbtimer_%.8x_%.8x", me, timerUid++);
  neighbor_children *achild;
  foreach_neighbor( neighbor_session*, tempsess, my_sessions) {
    //Steps 1 and 2 only apply to groups we aren't the root for.
    if(!tempsess->isRoot) {
      //STEP 1: Check to see if we're receiving data on the tree.
if(tempsess->ticks_since_heartbeat > HB_MISS_REPAIR) { debug_macro("Scribe: WARNING No heartbeat received (ticks_since_hb=%d), rejoining group (%.8x)\n",tempsess->ticks_since_heartbeat,tempsess->ipaddr); route_join(tempsess->ipaddr, tempsess->ipaddr, 0, 0, -1); tempsess->ticks_since_heartbeat = 0; tempsess->tick_count_for_renew = 0; tempsess->parent = UNDEFINED; debug_macro("REPLAY Scribe: MEMBERSHIP_CHANGE ( gr me parent ) %.8x %.8x %.8x\n", tempsess->ipaddr, myhash, myhash); neighbor_clear(tempsess->pathtoroot); } else { tempsess->ticks_since_heartbeat++;#ifdef HEARTBEAT_TRACE debug_macro("Scribe: Incremented ticks_since_hb=%d for group (%.8x)\n",tempsess->ticks_since_heartbeat,tempsess->ipaddr);#endif //STEP 2: See if we need to renew our interest to our parent if(tempsess->tick_count_for_renew > RENEW_TICKS) { if(tempsess->parent != UNDEFINED) {#ifdef HEARTBEAT_TRACE debug_macro("Scribe: Time to renew interest to parent (%.8x). tick_count_for_renew=%d for group (%.8x)\n",tempsess->parent,tempsess->tick_count_for_renew,tempsess->ipaddr);#endif //If we have a parent, send a renewal message to that parent. routeIP_join(tempsess->parent, tempsess->ipaddr, 0, 0, -1); } else { //Oddball case. Should only happen if RENEW_TICKS < HB_MISS_REPAIR. Basically retries a join.#ifdef HEARTBEAT_TRACE debug_macro("Scribe: WARNING: Oddball case: Time to renew interest to parent, but no known parent. tick_count_for_renew=%d for group (%.8x)\n",tempsess->tick_count_for_renew,tempsess->ipaddr);#endif route_join(tempsess->ipaddr, tempsess->ipaddr, 0, 0, -1); } tempsess->tick_count_for_renew=0; } else { tempsess->tick_count_for_renew++;#ifdef HEARTBEAT_TRACE debug_macro("Scribe: incremented tick_count_for_renew=%d for group (%.8x)\n",tempsess->tick_count_for_renew,tempsess->ipaddr);#endif } } } //Check if heartbeats need to be sent. 
bool sendHeartbeats=false; if(tempsess->tick_count_without_data > HB_TICKS) sendHeartbeats = true; // debug_macro("Scribe: hbTimer: sess=%.8x sendHeartbeats=%d\n",tempsess->ipaddr,sendHeartbeats); foreach_neighbor( neighbor_children*, achild, tempsess->kids) { //STEP 3: Check for children who seem to have lost interest in the group if(achild->ticks_since_join > DROP_CHILD_TICKS) { // debug_macro("Scribe: hbTimer: sess=%.8x dropping dead child %.8x\n",tempsess->ipaddr,achild->ipaddr); group_change *gchg = new group_change; gchg->groupId = tempsess->ipaddr; gchg->childId = achild->ipaddr; gchg->newJoin = false; upcall_ext(GROUP_CHANGE, (void*)gchg); delete gchg; neighbor_remove(tempsess->kids, achild->ipaddr); } else { //STEP 3b: Increment ticks_since_join. achild->ticks_since_join++; //STEP 4: Send a heartbeat to all children if no data has been sent. if(sendHeartbeats) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -