⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 scribe.mac

📁 这是一个著名的应用层组播中间件的源码
💻 MAC
📖 第 1 页 / 共 3 页
字号:
      foreach_neighbor(neighbor_session*,tempsess,my_sessions) {        if(tempsess->parent == neighbor) {          routeIP_leave(neighbor,tempsess->ipaddr,0,0,-1);          debug_macro("REPLAY Scribe: MEMBERSHIP_CHANGE ( gr me parent ) %.8x %.8x %.8x\n", tempsess->ipaddr, myhash, myhash);          route_join(tempsess->ipaddr,tempsess->ipaddr,0,0,-1);          dojoin(tempsess->ipaddr);          tempsess->parent = UNDEFINED;        }        if(neighbor_query(tempsess->kids,neighbor)) {          neighbor_remove(tempsess->kids,neighbor);          routeIP_pushdown(neighbor,tempsess->ipaddr,tempsess->kids,0,0,-1);          if(true) { //FIXME: To be replaced by if(upper layer registered for this type of message)            //group_change defined in scribe_ext.h            group_change *gchg = new group_change;            gchg->groupId = tempsess->ipaddr;            gchg->childId = neighbor;            gchg->newJoin = false;            //Notify upper layer that the child has been added to the given group.            upcall_ext(GROUP_CHANGE, (void*)gchg);            delete gchg;          }	        }      }      neighbor_remove(watched,neighbor);    }    }  API create_group {    //sufficient?  will blindly send it.    route_create_group( groupID, groupID, 0, 0 , -1);    //routeIP_create_group( groupID, groupID, 0, 0, -1 ); //it's not clear that routeIP isn't the right thing.  }  //API: Join  //  //1- Create State if needed, send a join if new and not root.  //2- Set subscribed to 1 on the state.  API join {    //add neighbor... set subscribed to 1    neighbor_session *tempsess;    if(!neighbor_query(my_sessions, groupID)) {      //send join message, add neighbor      neighbor_add(my_sessions, groupID);			      neighbor_session *temp = neighbor_entry(my_sessions, groupID);      downcall_ext(temp->isRoot, TEST_OWNER, (void*)&groupID);      if(!temp->isRoot) {        debug_macro("Scribe: API join: Joining a group (%.8x), routing join.\n",groupID);        route_join( groupID, groupID, 0, 0, -1);        dojoin(groupID);      } else {        debug_macro("Scribe: API join: Joining a group (%.8x), am root.\n",groupID);        debug_macro("REPLAY Scribe: MEMBERSHIP_CHANGE ( gr me parent ) %.8x %.8x %.8x\n", groupID, myhash, myhash);      }      upcall_notify(my_sessions, NBR_TYPE_TREE);    } else {      debug_macro("Scribe: API join: Subscribing to a group (%.8x), already joined.\n",groupID);    }    neighbor_info(my_sessions, groupID, subscribed)=1;  }  //API: Leave  //1 - set unsubscribed.  //2 - If also no children, send leave message.  API leave {    neighbor_session *tempsess;    if(neighbor_query(my_sessions, groupID)) {      tempsess = neighbor_entry(my_sessions, groupID);      tempsess->subscribed = 0;      if(neighbor_size(tempsess->kids) < 1) {        //No longer part of this group!  Remove it and notify parent.        if( tempsess->parent!=UNDEFINED ) {          debug_macro("Scribe: API leave: Leaving a group (%.8x), sending leave to parent (%.8x).\n",groupID,tempsess->parent);          routeIP_leave (tempsess->parent, groupID, 0, 0, -1);          unwatch(tempsess->parent);          tempsess->parent = UNDEFINED;        } else {          debug_macro("Scribe: API leave: WARNING: Leaving a group (%.8x), but no parent record.\n",groupID);        }        neighbor_remove(my_sessions, groupID);        donejoin(groupID);        upcall_notify(my_sessions, NBR_TYPE_TREE);      }    } //else error, do nothing (already left?)  }  API multicast {    return_code = 0;    if(upcall_forward(groupID, msg, size, COMM_TYPE_MULTICAST) == 0) {      routeIP_data( groupID, groupID, COMM_TYPE_MULTICAST, transport, msg, size, transport );      return_code = macedon_sendret;    }  }  API collect {    return_code = 0;    if (neighbor_query(my_sessions, groupID)) {      neighbor_session* tempsess = neighbor_entry(my_sessions, groupID);      //Update the parent neighbor type, if needed (need lock inside).      if( (tempsess->isRoot) ) {        upcall_deliver(msg, size, COMM_TYPE_COLLECT);      }      else if(upcall_forward(tempsess->parent, msg, size, COMM_TYPE_COLLECT) == 0) {        if( (tempsess->parent==UNDEFINED) ) {          //tempsess->parent = from;          //NOTE: This was to handle the case where a join timed out -- and we were taken by two parents.            debug_macro("WARNING: Collecting but no parent.\n");          return_code = 1;        } else {          route_data( tempsess->parent, groupID, COMM_TYPE_COLLECT, transport, msg, size, transport );          return_code = macedon_sendret;        }      }    } else {      debug_macro("WARNING: Collecting but no group.\n");      return_code = 1;    }  }  API anycast {    //FIXME: Some processing for when we are in the anycast group.    return_code = 0;    if(upcall_forward(groupID, msg, size, COMM_TYPE_ANYCAST) == 0) {      visited dfsState;      route_anycast( groupID, groupID, myhash, dfsState, msg, size, -1 );      return_code = macedon_sendret;    }  }  API route {    route_unicast_data(dest, msg, size, transport);    return_code = macedon_sendret;  }  API routeIP {    routeIP_unicast_data(dest, msg, size, transport);    return_code = macedon_sendret;  }  forward unicast_data {    return upcall_forward(dest, msg, size, COMM_TYPE_UNICAST);  }  recv unicast_data {    upcall_deliver(msg, size, COMM_TYPE_UNICAST);   }  //API notify: Called when its possible this node's managed ID space   //            changes  //  //1 - ignore non-peer notifications  //2 - check if we are the rooot on each session  //3 - (if needed) tell the new root that it is root  //4 - update isRoot status.  API notify {    #ifdef SCRIBE_TRACE    debug_macro( "Received Neighbor Notify, type: %d, size: %d.\n", type, size);    //for(int i=0; i<size; i++) {    //  sprintf(trace_buf_+(14*i), "(nbr:%.8x)",neighbors[i]);    //}    //sprintf(trace_buf_+(14*size),"\n");    //trace_print();    #endif    if(type == NBR_TYPE_PEER) {      foreach_neighbor (neighbor_session*, tempsess, my_sessions) {        int isOwner;        downcall_ext(isOwner, TEST_OWNER, (void*)&tempsess->ipaddr);        if(tempsess->isRoot && !isOwner ) {          debug_macro( "Group (%.8x) should be moved to new root.\n", tempsess->ipaddr);          //Join under the new root of the session.          tempsess->isRoot = false;          //NOTE: A _REAL_ Implementation would need to keep state on whether they are source for each group.          if(source_ == me) {             children siblings;            neighbor_clear(siblings);            neighbor_add(siblings,tempsess->ipaddr);            route_create_group( tempsess->ipaddr, tempsess->ipaddr, 0, 0 , -1);            foreach_neighbor (neighbor_children*, afriend, tempsess->kids) {              routeIP_pushdown( afriend->ipaddr, tempsess->ipaddr, siblings, 0, 0, -1 );            }            neighbor_remove(my_sessions, tempsess->ipaddr);            upcall_notify(my_sessions, NBR_TYPE_TREE);  //notify the upper layer of the state change.          } else {            routeIP_join(tempsess->ipaddr,tempsess->ipaddr, 0, 0, -1);            dojoin(tempsess->ipaddr);          }        } else if (!tempsess->isRoot && isOwner ) {          //I am now the parent, so fix the timers (don't expect heartbeats from parents)          tempsess->isRoot = true;          donejoin(tempsess->ipaddr);          unwatch(tempsess->parent);          tempsess->parent = UNDEFINED;        }      }    }  }  //API downcall_ext  //  //PUSHDOWN: instructs scribe to push down the given child from teh given tree.  //  1 - remove state  //  2 - route pushdown  //ADD_CHILD: a _request_ by the upper layer to add the child to the given tree.  //  1 - Apply a strict set of rules before accepting it, return an appropriate  //      error code  //  2 - Accept the new child, not do not notify upper layer (it was their request, after all)  //  3 - Send a heartbeat to the new child.  //TEST_OWNER: pass on this ownership request to lower layer.  //DEFAULT: error code on unrecognized query  API downcall_ext {    //Takes int operation, void* arg    neighbor_session* tempsess;	        switch(operation) {      case PUSHDOWN:          {          //NOTE: THE PUSHDOWN FN. TAKES AN INT ARRAY OF SIZE 2.          //arg[0]=group_id, arg[1]=pushdown_id          int* args=(int*)arg;          debug_macro( "PUSHDOWN(gr:%.8x,no:%.8x) CALLED\n", args[0],args[1]);          if(neighbor_query(my_sessions,args[0])) {            tempsess=neighbor_entry(my_sessions,args[0]);            if(neighbor_query(tempsess->kids, args[1])) {              neighbor_remove(tempsess->kids, args[1]);              unwatch(args[1]);              //FIXME: Should this be routeIP or route?              route_pushdown(args[1],args[0],tempsess->kids, 0, 0, -1);            } else {              debug_macro( "NODE NOT IN GROUP.\n");            }          } else {            debug_macro( "GROUP NOT IN SESSION.\n");          }          break;        }      case ADD_CHILD:        {          child_add_arg* childToAdd = (child_add_arg*)arg;          if(childToAdd->childId != myhash) {              if(neighbor_query(my_sessions, childToAdd->groupId)) {              tempsess = neighbor_entry(my_sessions, childToAdd->groupId);              if(neighbor_size(tempsess->kids) == MAX_CHILDREN) {                debug_macro( "TOO MANY CHILDREN!.\n");                return 5;              }              if(neighbor_query(tempsess->pathtoroot, childToAdd->childId) || tempsess->parent == UNDEFINED) {                debug_macro( "CHILD IS IN PATH TO ROOT OR BUSY JOINING (parent: %.8x).\n",tempsess->parent);                return 3;              }              if(!neighbor_query(tempsess->kids, childToAdd->childId)) {                neighbor_add(tempsess->kids, childToAdd->childId);                watch(childToAdd->childId);                debug_macro( "[downcall(ADD_CHILD)] added kid %.8x for group %.8x, now have %d\n", childToAdd->childId, childToAdd->groupId, neighbor_size(tempsess->kids));                routeIP_membership(childToAdd->childId, childToAdd->groupId, tempsess->pathtoroot, 0, 0, -1);              } else {                debug_macro( "CHILD ALREADY IN GROUP (sending membership).\n");                routeIP_membership(childToAdd->childId, childToAdd->groupId, tempsess->pathtoroot, 0, 0, -1);                return 2;              }            } else {              debug_macro( "GROUP NOT IN SESSION.\n");              return 1; //ERROR ADDING CHILD            }          } else {            debug_macro( "POTENTIAL CHILD IS ME!\n");            return 4; //ERROR ADDING CHILD          }           break;        }      case TEST_OWNER:         {          int retval;          downcall_ext(retval,TEST_OWNER,arg);          return retval;        }      default:         {          debug_macro( "Scribe: Unrecognized Extensible Downcall Made (type=%d).\n",operation);          return -1; //CALL UNRECOGNIZED        }    }    return 0;  }  //API upcall_ext:  //  None currently defined -- so just return the error code (-1)  API upcall_ext {    //Takes int operation, void* arg    switch(operation) {      default:         debug_macro( "Scribe: Unrecognized Extensible Upcall Made (type=%d).\n", operation);        return -1; //CALL UNRECOGNIZED    }    return 0;  }}routines {  //Common procedure to increment/add a node to be watched.  void watch(macedon_key who) {    if(who == UNDEFINED) return;#ifdef SCRIBE_TRACE    debug_macro("watch(%.8x)\n",who);#endif    if(!neighbor_query(watched,who)) {      neighbor_add( watched, who );    }    neighbor_info(watched,who,count)++;  }  //Common procedure to decrement/delete a node being watched.  void unwatch(macedon_key who) {    if(who == UNDEFINED) return;#ifdef SCRIBE_TRACE    debug_macro("unwatch(%.8x)\n",who);#endif    neighbor_info(watched,who,count)--;    if(neighbor_info(watched,who,count)==0) {      neighbor_remove(watched,who);    }  }  //Common procedure to join  void dojoin(macedon_key which) {    if(which == UNDEFINED) return;    if(!neighbor_info(my_sessions,which,isRoot)) {      if(!neighbor_query(outstanding_joins,which)) {        if(neighbor_empty(outstanding_joins)) {          timer_resched(joinTimer,JOIN_TIMER_PERIOD);        }        neighbor_add(outstanding_joins,which);      }      neighbor_info(outstanding_joins,which,delay)=0;    }    }  //Common procedure to stop monitoring a join  void donejoin(macedon_key which) {    if(which == UNDEFINED) return;    neighbor_remove(outstanding_joins,which);    if(neighbor_empty(outstanding_joins)) {      timer_cancel(joinTimer);    }  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -