⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 scribems.mac

📁 这是一个著名的应用层组播中间件的源码
💻 MAC
📖 第 1 页 / 共 3 页
字号:
          if(field(group_id) != dest && (tempsess->parent==UNDEFINED || tempsess->parent != from) ) {
            //tempsess->parent = from;
            //NOTE: This was to handle the case where a join timed out -- and we were taken by two parents.
            debug_macro("Scribe: WARNING: Received data from wrong parent. (group: %.8x, expected: %.8x, observed: %.8x)\n", field(group_id), tempsess->parent, from);
          }
          foreach_neighbor (neighbor_children*, afriend, tempsess->kids) {
            routeIP_data( afriend->ipaddr, field(group_id), field(priority), msg, size, field(priority) );
          }
        }
      } else { //note: else message is dropped, and a leave is returned to the sender!
        //WARNING: What if a failure caused this, and I just haven't been notified?
        if(field(group_id) != dest) routeIP_leave(from, field(group_id), 0, 0, -1);
      }
    }
  }

  //Message: pushdown
  //Action: recv
  //
  //Handles a request to re-attach below one of the listed siblings.
  //1 - Pick sibling to join under based on the semantics below.
  //2 - Reset session state pertinent to parents.
  //3 - Send a join to that sibling
  //
  //The message is ignored (with an error trace) unless it is addressed to this
  //node and did not originate from this node.  Nothing is done when this node
  //holds no session for the group.
  //
  //Known Problems: See FIXME
  recv pushdown {
    if(dest != myhash || from == myhash) {
      debug_macro("Scribe: ERROR: A pushdown message was received and either dest(%.8x)!=myhash(%.8x) or from(%.8x)==myhash(%.8x)\n",dest,myhash,from,myhash);
    } else {
      children siblings = field(siblings);
      //Semantics:
      // 1- Upcall to upper layer.  If not subscribed, or if they wish, return < 0,
      //    which implies to perform standard scribe pushdown.
      // 2- Standard Scribe Pushdown: probe for lowest latency
      // 3- returnCode = 0, chosen node is set in the arg
      int pushdownNode=-1;
      int returnCode=-1;
      if(neighbor_query(my_sessions, field(group_id))) {
        //defined in scribe_ext.h
        pushdown_selector_arg *arg = new pushdown_selector_arg();
        arg->groupId = field(group_id);
        arg->size = neighbor_size(siblings);
        if(arg->size > 0) {
          //Flatten the sibling set into a plain array for the upper layer.
          arg->siblings = new int[arg->size];
          int i=0;
          foreach_neighbor( neighbor_children*, achild, siblings) {
            arg->siblings[i++] = achild->ipaddr;
          }
        } else {
          arg->siblings = NULL;
        }
        returnCode = upcall_ext(PUSHDOWN_SELECTOR, (void*)arg);
        //Values for return code:
        //<0 - Let lower layer pick (-1 is the conventional value)
        //0  - I've chosen, result is in arg->chosen
        //1  - don't send any join messages, I've taken care of it.
        if(returnCode == 0) {
          pushdownNode = arg->chosen;
        }
        if(arg->size > 0 && arg->siblings) {
          delete[] arg->siblings;
        }
        delete arg;
        //Any negative code means "standard pushdown", as documented above.
        //Testing only for -1 let other negative codes reach routeIP_join
        //with pushdownNode still at its -1 placeholder.
        if(returnCode < 0 && neighbor_size(siblings) > 0) {
          //FIXME: This should be replaced with a proper probe
          neighbor_children* mysibling = neighbor_random(siblings);
          pushdownNode = mysibling->ipaddr;
        } else if(returnCode < 0) {
          //No node to join.
          returnCode=2;
        }
        //Update these values anyway. If no node is rejoined -- this becomes a new timeout timer.
        neighbor_session* tempsess;
        tempsess = neighbor_entry(my_sessions, field(group_id));
        tempsess->parent = UNDEFINED;
        debug_macro("REPLAY Scribe: MEMBERSHIP_CHANGE ( gr me parent ) %.8x %.8x %.8x\n", field(group_id), myhash, myhash);
        neighbor_clear(tempsess->pathtoroot);
        tempsess->ticks_since_heartbeat=0;
        tempsess->tick_count_for_renew=0;
        //Codes > 0 mean no join is sent from here (1: upper layer handled it,
        //2: there was no sibling to join).
        if(returnCode <= 0) {
          routeIP_join(pushdownNode,field(group_id), 0, 0, -1);
        }
      }
    }
  }

  //Message: heartbeat
  //Action: recv
  //
  //When receiving a heartbeat
  //1 - clear heartbeat state
  //2 - update pathtoroot
  //3 - update parent
  //
  //The heartbeat is rejected (with an error trace) when it is not addressed to
  //this node, when it originated from this node, or when this node already
  //appears in the carried path to root.
  //
  //Known Problems: How do we add optimization #1: Cycle detection only when needed?
  recv heartbeat {
    if(dest != myhash || from == myhash) {
      debug_macro("Scribe: ERROR: A heartbeat message (for %.8x) was received and either dest(%.8x)!=myhash(%.8x) or from(%.8x)==myhash(%.8x)\n",field(group_id),dest,myhash,from,myhash);
    } else if(neighbor_query(field(pathtoroot),myhash)) {
      //The format has a single conversion, so only the group id is passed.
      debug_macro("Scribe: ERROR: A heartbeat message (for %.8x) was received and I was in the path to root!\n",field(group_id));
    } else {
      //FIXME: Add optimisation #1, cycle detection only when needed?
      int groupId = field(group_id);
      #ifdef HEARTBEAT_TRACE
      debug_macro("Scribe: Received heartbeat for group %.8x\n",groupId);
      #endif
      if(neighbor_query(my_sessions, groupId)) {
        neighbor_session* tempsess = neighbor_entry(my_sessions, groupId);
        tempsess->ticks_since_heartbeat = 0;
        //Replace the stored path with the one carried by the message.
        neighbor_clear(tempsess->pathtoroot);
        foreach_neighbor(neighbor_uppath*,tempnode,field(pathtoroot)) {
          neighbor_add(tempsess->pathtoroot,tempnode->ipaddr);
        }
        if(!neighbor_query(tempsess->pathtoroot,from)) {
          //NOTE: This if statement should essentially be an if(true), but
          // I added this for safety, because of some long paths seen.
          neighbor_add(tempsess->pathtoroot,from);
        }
        if(groupId != dest && (tempsess->parent ==UNDEFINED || tempsess->parent != from) ) {
          //FIXME: Consider printing a warning when parent changes unexpectedly.
          tempsess->parent = from;
          debug_macro("REPLAY Scribe: MEMBERSHIP_CHANGE ( gr me parent ) %.8x %.8x %.8x\n", groupId, myhash, from);
        }
      } else {
        //FIXME: Nothing for now -- consider sending a leave message.
        debug_macro("Scribe: WARNING: Received heartbeat for group %.8x, but not subscribed.\n",groupId);
      }
    }
  }

  //Message: anycast
  //Action: forward
  //
  //Q: Do we need to hang on to Black nodes?  Prior experience said stale state was causing routing loops.
  //   Elimination of the spare capacity may solve this, but the general case is still problematic.
  //
  //Step 1: If I am a "white" node (first time), add my children
  //Step 2: Pick the next node in line to process the msg.
  //Step 3: Route to parent, if any
  //Step 4: Fall back to routing message toward key.
forward anycast {
    //Depth-first walk of the group tree driven by the dfsState carried in the
    //message.  Colors as used below: 0 = not yet expanded, 1 = expanded
    //(children added), 2 = finished.  Returns 0 without acting when the
    //message came from this node; every routing path below returns 1.
    visited dfsState = field(dfsState);
    if(from == myhash) {
      return 0;
    }
    #ifdef ANYCAST_TRACE
    pthread_mutex_lock(&debug_lock);
    sprintf(trace_buf_,"DEBUG: dfsState size=%d, list:\n",neighbor_size(dfsState));
    trace_print();
    //Advance by the number of characters actually written.  The previous
    //fixed stride of 14 was shorter than the 16 characters each entry
    //produced, so entries overwrote each other, and the color argument was
    //passed but never formatted.
    //NOTE(review): the capacity of trace_buf_ is not visible here; this dump
    //is still unbounded, as it was before -- confirm the buffer is large enough.
    int traceOffset=0;
    foreach_neighbor(neighbor_visited*,tmp,dfsState) {
      traceOffset += sprintf(trace_buf_+traceOffset, "(sib:%.8x(%d))",tmp->ipaddr,tmp->color);
    }
    sprintf(trace_buf_+traceOffset,"\n");
    trace_print();
    pthread_mutex_unlock(&debug_lock);
    #endif
    if(neighbor_query(my_sessions, field(group_id))) {
      neighbor_session *tempsess = neighbor_entry(my_sessions, field(group_id));
      //Step 1: If I am a "white" node (first time), add my children
      if(!neighbor_query(dfsState, myhash) || neighbor_info(dfsState, myhash, color)==0) {
        if(!neighbor_query(dfsState, myhash)) {
          neighbor_add(dfsState, myhash);
        }
        neighbor_info(dfsState, myhash, color) = 1;
        foreach_neighbor(neighbor_children*,achild,tempsess->kids) {
          if(!neighbor_query(dfsState,achild->ipaddr)) {
            neighbor_add(dfsState, achild->ipaddr);
          }
        }
      }
    }
    //Step 2: Pick the next node in line to process the msg.
    foreach_neighbor(neighbor_visited*,next,dfsState) {
      //Own entry while not subscribed to the group: just mark it finished.
      //(A trailing "|| from==myhash" was dropped: it can never be true here
      //because that case returned at the top of the handler.)
      if(next->color != 2 && next->ipaddr==myhash && !neighbor_query(my_sessions, field(group_id))) {
        next->color = 2;
      }
      else if(next->color != 2) {
        if(next->ipaddr == myhash) {
          next->color=2;
          //Visit this node.
          #ifdef ANYCAST_TRACE
          debug_macro("Scribe: All children visited. Visiting this node. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",from,field(from_addr),field(group_id));
          #endif
          if(upcall_forward(0, msg, size, COMM_TYPE_ANYCAST)) {
            return 1;
          }
        } else {
          //Send message
          #ifdef ANYCAST_TRACE
          debug_macro("Scribe: Forwarding anycast msg from %.8x (orig from: %.8x) for group %.8x to child %.8x\n",from,field(from_addr),field(group_id),next->ipaddr);
          #endif
          routeIP_anycast(next->ipaddr, field(group_id), field(from_addr), dfsState, msg, size, -1);
          #ifdef ANYCAST_TRACE
          debug_macro("Scribe: Done forwarding anycast msg from %.8x (orig from: %.8x) for group %.8x to child %.8x\n",from,field(from_addr),field(group_id),next->ipaddr);
          #endif
          return 1;
        }
      }
    }
    //Step 3: Route to parent, if any
    //NOTE: dfsState is either all black, or empty
    if(neighbor_query(my_sessions, field(group_id))) {
      neighbor_session *tempsess = neighbor_entry(my_sessions, field(group_id));
      if(tempsess->parent==UNDEFINED) {
        //If we don't know our parent, we simply forward it toward the group ID.
        //FIXME: Is this the right thing to do, or do we drop it?
        #ifdef ANYCAST_TRACE
        debug_macro("Scribe: Don't know parent, so routing towards groupid. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",from,field(from_addr),field(group_id));
        #endif
        route_anycast(field(group_id), field(group_id), field(from_addr), dfsState, msg, size, -1);
        return 1;
      } else {
        #ifdef ANYCAST_TRACE
        debug_macro("Scribe: Forwarding to parent %.8x. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",tempsess->parent,from,field(from_addr),field(group_id));
        #endif
        routeIP_anycast(tempsess->parent, field(group_id), field(from_addr), dfsState, msg, size, -1);
        return 1;
      }
    }
    //Step 4: Fall back to routing message toward key.
    #ifdef ANYCAST_TRACE
    debug_macro("Scribe: All options failed. Forwarding using Pastry routing. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",from,field(from_addr),field(group_id));
    #endif
    route_anycast(field(group_id), field(group_id), field(from_addr), dfsState, msg, size, -1);
    return 1;
  }

  //Message: anycast
  //Action: recv
  //
  //Print an error message -- the anycast is delivered to the root of the tree.
  recv anycast {
    //This is the case where an anycast message was not handled anywhere. An error message is
    //printed (only when ANYCAST_TRACE is defined) and the message is handed to the upper layer.
#ifdef ANYCAST_TRACE
    debug_macro("WARNING: Scribe: recv anycast.  No node accepted anycast message. dropping. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",from,field(from_addr),field(group_id));
#endif
    upcall_deliver(msg, size, COMM_TYPE_ANYCAST);
  }

  //Timer: hbTimer
  //
  //Perform periodic maintenance actions.
  //Steps 1 and 2 only apply to groups we aren't the root for.
  //STEP 1: Check to see if we're receiving data on the tree.
  //STEP 2: See if we need to renew our interest to our parent
  //STEP 3: Check for children who seem to have lost interest in the group
  //STEP 3b: Increment ticks_since_join.
  //STEP 4: Send a heartbeat to all children if no data has been sent.
  //
  //Q: A good API Error could resolve this.  But what about scribe-liveness, not just network liveness?
  timer hbTimer {
    static int timerUid = 0;
    char pathId[40];
    sprintf(pathId, "scribe_hbtimer_%.8x_%.8x", me, timerUid++);
    neighbor_children *achild;
    foreach_neighbor( neighbor_session*, tempsess, my_sessions) {
      //Steps 1 and 2 only apply to groups we aren't the root for.
      if(!tempsess->isRoot) {
        //STEP 1: Check to see if we're receiving data on the tree.
if(tempsess->ticks_since_heartbeat > HB_MISS_REPAIR) {          debug_macro("Scribe: WARNING No heartbeat received (ticks_since_hb=%d), rejoining group (%.8x)\n",tempsess->ticks_since_heartbeat,tempsess->ipaddr);          route_join(tempsess->ipaddr, tempsess->ipaddr, 0, 0, -1);          tempsess->ticks_since_heartbeat = 0;          tempsess->tick_count_for_renew = 0;          tempsess->parent = UNDEFINED;          debug_macro("REPLAY Scribe: MEMBERSHIP_CHANGE ( gr me parent ) %.8x %.8x %.8x\n", tempsess->ipaddr, myhash, myhash);          neighbor_clear(tempsess->pathtoroot);        } else {          tempsess->ticks_since_heartbeat++;#ifdef HEARTBEAT_TRACE          debug_macro("Scribe: Incremented ticks_since_hb=%d for group (%.8x)\n",tempsess->ticks_since_heartbeat,tempsess->ipaddr);#endif          //STEP 2: See if we need to renew our interest to our parent          if(tempsess->tick_count_for_renew > RENEW_TICKS) {            if(tempsess->parent != UNDEFINED) {#ifdef HEARTBEAT_TRACE              debug_macro("Scribe: Time to renew interest to parent (%.8x). tick_count_for_renew=%d for group (%.8x)\n",tempsess->parent,tempsess->tick_count_for_renew,tempsess->ipaddr);#endif              //If we have a parent, send a renewal message to that parent.              routeIP_join(tempsess->parent, tempsess->ipaddr, 0, 0, -1);            } else {              //Oddball case.  Should only happen if RENEW_TICKS < HB_MISS_REPAIR.  Basically retries a join.#ifdef HEARTBEAT_TRACE              debug_macro("Scribe: WARNING: Oddball case: Time to renew interest to parent, but no known parent. 
tick_count_for_renew=%d for group (%.8x)\n",tempsess->tick_count_for_renew,tempsess->ipaddr);#endif              route_join(tempsess->ipaddr, tempsess->ipaddr, 0, 0, -1);            }            tempsess->tick_count_for_renew=0;          } else {            tempsess->tick_count_for_renew++;#ifdef HEARTBEAT_TRACE            debug_macro("Scribe: incremented tick_count_for_renew=%d for group (%.8x)\n",tempsess->tick_count_for_renew,tempsess->ipaddr);#endif          }        }      }	      //Check if heartbeats need to be sent.      bool sendHeartbeats=false;      if(tempsess->tick_count_without_data > HB_TICKS) sendHeartbeats = true;      //      debug_macro("Scribe: hbTimer: sess=%.8x sendHeartbeats=%d\n",tempsess->ipaddr,sendHeartbeats);      foreach_neighbor( neighbor_children*, achild, tempsess->kids) {        //STEP 3: Check for children who seem to have lost interest in the group        if(achild->ticks_since_join > DROP_CHILD_TICKS) {	  //          debug_macro("Scribe: hbTimer: sess=%.8x dropping dead child %.8x\n",tempsess->ipaddr,achild->ipaddr);          group_change *gchg = new group_change;          gchg->groupId = tempsess->ipaddr;          gchg->childId = achild->ipaddr;          gchg->newJoin = false;          upcall_ext(GROUP_CHANGE, (void*)gchg);          delete gchg;          neighbor_remove(tempsess->kids, achild->ipaddr);        } else {          //STEP 3b: Increment ticks_since_join.          achild->ticks_since_join++;          //STEP 4: Send a heartbeat to all children if no data has been sent.          if(sendHeartbeats) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -