⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 scribems.mac

📁 这是一个著名的应用层组播中间件的源码
💻 MAC
📖 第 1 页 / 共 3 页
字号:
	    //            debug_macro("Scribe: hbTimer: sess=%.8x sending hb to child %.8x\n",tempsess->ipaddr,achild->ipaddr);            routeIP_heartbeat(achild->ipaddr, tempsess->ipaddr, tempsess->pathtoroot, 0, 0, -1);          }        }      }      //Reset the hb count.      if(sendHeartbeats) {	//        debug_macro("Scribe: hbTimer: sess=%.8x clearing hb flag\n",tempsess->ipaddr);        tempsess->tick_count_without_data=0;      } else {        tempsess->tick_count_without_data++;	//        debug_macro("Scribe: hbTimer: sess=%.8x incrementing tick_count_without_data (%d)\n",tempsess->ipaddr,tempsess->tick_count_without_data);      }    }  }  //Timer: printer  //  //prints out current state, and calls dump_state().  //Also -- prints REPLAY BANDWIDTH  timer printer {#ifdef SCRIBE_TRACE    pthread_mutex_lock(&debug_lock);    sprintf("Scribe: Printer timer displaying summary Scribe state:\n");    trace_print();    foreach_neighbor(neighbor_session*,tempsess,my_sessions) {      sprintf(trace_buf_,"Scribe session: %.8x, with %d kids.\n",tempsess->ipaddr,neighbor_size(tempsess->kids));      trace_print();    }    sprintf(trace_buf_,"Scribe: Printer timer dumping state:\n");    trace_print();    pthread_mutex_lock(&debug_lock);    dump_state();#endif    //NOTE: These lines for evaluation purposes.    extern MACEDON_Agent *globalmacedon;    if (globalmacedon != this)  // am i the highest layer      return;     if ( ( parameters.getint("streaming_time") == -1.0 ||	   curtime > time_booted + parameters.getint("streaming_time")) )      printf("%s %f %d REPLAY_BANDWIDTH %d %d %d %d %d %d\n", 	     get_hostname(), Scheduler::instance().clock(), pthread_self(),	     (int) master.get_value(),	     0,   // need to put in control bw	     (int) master_useful.get_value(),	     (int) master.get_value(),	     (int) master_useful.get_value(),	     0	     );    if(1) {      foreach_neighbor(neighbor_session*,tempsess,my_sessions) {        if(tempsess->isRoot) {          debug_macro("REPLAY session: %.8x, parent: client0.\n",tempsess->ipaddr);        } else {          debug_macro("REPLAY session: %.8x, parent: %.8x.\n",tempsess->ipaddr,tempsess->parent);        }      }    }  }  init API status_change {    if(status == STATUS_READY) {      state_change(joined);    }  }    API create_group {    //sufficient?  will blindly send it.    route_create_group( groupID, groupID, 0, 0, -1 );    //routeIP_create_group( groupID, groupID, 0, 0, -1 ); //it's not clear that routeIP isn't the right thing.  }  //API: Join  //  //1- Create State if needed, send a join if new and not root.  //2- Set subscribed to 1 on the state.  API join {    //add neighbor... set subscribed to 1    neighbor_session *tempsess;    if(!neighbor_query(my_sessions, groupID)) {      //send join message, add neighbor      neighbor_add(my_sessions, groupID);			      neighbor_session *temp = neighbor_entry(my_sessions, groupID);      downcall_ext(temp->isRoot, TEST_OWNER, (void*)&groupID);      temp->ticks_since_heartbeat = 0;      temp->tick_count_for_renew = 0;      temp->tick_count_without_data = 0;      if(!temp->isRoot) {        debug_macro("Scribe: API join: Joining a group (%.8x), routing join.\n",groupID);        route_join( groupID, groupID, 0, 0, -1);      } else {        debug_macro("Scribe: API join: Joining a group (%.8x), am root.\n",groupID);        debug_macro("REPLAY Scribe: MEMBERSHIP_CHANGE ( gr me parent ) %.8x %.8x %.8x\n", groupID, myhash, myhash);      }      upcall_notify(my_sessions, NBR_TYPE_TREE);    } else {      debug_macro("Scribe: API join: Subscribing to a group (%.8x), already joined.\n",groupID);    }    neighbor_info(my_sessions, groupID, subscribed)=1;  }  //API: Leave  //1 - set unsubscribed.  //2 - If also no children, send leave message.  API leave {    neighbor_session *tempsess;    if(neighbor_query(my_sessions, groupID)) {      tempsess = neighbor_entry(my_sessions, groupID);      tempsess->subscribed = 0;      if(neighbor_size(tempsess->kids) < 1) {        //No longer part of this group!  Remove it and notify parent.        if( tempsess->parent!=UNDEFINED ) {          debug_macro("Scribe: API leave: Leaving a group (%.8x), sending leave to parent (%.8x).\n",groupID,tempsess->parent);          routeIP_leave (tempsess->parent, groupID, 0, 0, -1);        } else {          debug_macro("Scribe: API leave: WARNING: Leaving a group (%.8x), but no parent record.\n",groupID);        }        neighbor_clear(tempsess->kids);        neighbor_remove(my_sessions, groupID);        upcall_notify(my_sessions, NBR_TYPE_TREE);      }    } //else error, do nothing (already left?)  }  API multicast {    if(upcall_forward(groupID, msg, size, COMM_TYPE_MULTICAST) == 0) {      routeIP_data( groupID, groupID, transport, msg, size, transport );    }  }  API anycast {    //FIXME: Some processing for when we are in the anycast group.    if(!neighbor_query(my_sessions, groupID)) {      visited dfsState;      route_anycast( groupID, groupID, myhash, dfsState, msg, size, -1 );    } else {       visited dfsState;#ifdef ANYCAST_TRACE      pthread_mutex_lock(&debug_lock);      sprintf(trace_buf_,"DEBUG: dfsState size=%d, list:\n",neighbor_size(dfsState));      trace_print();      int i=0;      foreach_neighbor(neighbor_visited*,tmp,dfsState) {        sprintf(trace_buf_+(14*i), "(sib:%.8x())",tmp->ipaddr,tmp->color);        i++;      }      sprintf(trace_buf_+(14*i),"\n");      trace_print();      pthread_mutex_unlock(&debug_lock);#endif      neighbor_session *tempsess = neighbor_entry(my_sessions, groupID);      //Step 1: I am a "white" node (first time), add my children      if(!neighbor_query(dfsState, myhash)) {        neighbor_add(dfsState, myhash);      }       neighbor_info(dfsState, myhash, color) = 1;      foreach_neighbor(neighbor_children*,achild,tempsess->kids) {        if(!neighbor_query(dfsState,achild->ipaddr)) {          neighbor_add(dfsState, achild->ipaddr);        }       }      //Step 2: Pick the next node in line to process the msg.      foreach_neighbor(neighbor_visited*,next,dfsState) {        if(next->color != 2 && next->ipaddr==myhash) {          next->color = 2;        }        else if(next->color != 2) {          //Send message#ifdef ANYCAST_TRACE          debug_macro("Scribe: Forwarding anycast msg from %.8x (orig from: %.8x) for group %.8x to child %.8x\n",myhash,myhash,groupID,next->ipaddr);#endif          routeIP_anycast(next->ipaddr, groupID, myhash, dfsState, msg, size, -1);#ifdef ANYCAST_TRACE          debug_macro("Scribe: Done forwarding anycast msg from %.8x (orig from: %.8x) for group %.8x to child %.8x\n",myhash,myhash,groupID,next->ipaddr);#endif          return macedon_sendret;        }      }      //Step 3: Route to parent, if any      //NOTE: dfsState is eitehr all black, or empty      if(tempsess->parent==UNDEFINED) {        //If we don't know our parent, we simply forward it toward the group ID.        //FIXME: Is this the right thing to do, or do we drop it?#ifdef ANYCAST_TRACE        debug_macro("Scribe: Don't know parent, so routing towards groupid. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",myhash,myhash,groupID);#endif         route_anycast(groupID, groupID, myhash, dfsState, msg, size, -1);        return macedon_sendret;      } else {#ifdef ANYCAST_TRACE        debug_macro("Scribe: Forwarding to parent %.8x. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",tempsess->parent,myhash,myhash,groupID);#endif        routeIP_anycast(tempsess->parent, groupID, myhash, dfsState, msg, size, -1);        return macedon_sendret;      }	    }  }  //API notify: Called when its possible this node's managed ID space   //            changes  //  //1 - ignore non-peer notifications  //2 - check if we are the rooot on each session  //3 - (if needed) tell the new root that it is root  //4 - update isRoot status.  API notify {    #ifdef SCRIBE_TRACE    pthread_mutex_lock(&debug_lock);    sprintf(trace_buf_, "Received Neighbor Notify, type: %d, size: %d.\n", type, size);    trace_print();    for(int i=0; i<size; i++) {      sprintf(trace_buf_+(14*i), "(nbr:%.8x)",neighbors[i]);    }    sprintf(trace_buf_+(14*size),"\n");    trace_print();    pthread_mutex_unlock(&debug_lock);    #endif    if(type == NBR_TYPE_PEER) {      foreach_neighbor (neighbor_session*, tempsess, my_sessions) {        int isOwner;        downcall_ext(isOwner, TEST_OWNER, (void*)&tempsess->ipaddr);        if(tempsess->isRoot && !isOwner ) {          debug_macro("Group (%.8x) should be moved to new root.\n", tempsess->ipaddr);          //Join under the new root of the session.          tempsess->isRoot = false;          routeIP_join(tempsess->ipaddr,tempsess->ipaddr, 0, 0, -1);        } else if (!tempsess->isRoot && isOwner ) {          //I am now the parent, so fix the timers (don't expect heartbeats from parents)          tempsess->isRoot = true;        }      }    }  }  //API downcall_ext  //  //PUSHDOWN: instructs scribe to push down the given child from teh given tree.  //  1 - remove state  //  2 - route pushdown  //ADD_CHILD: a _request_ by the upper layer to add the child to the given tree.  //  1 - Apply a strict set of rules before accepting it, return an appropriate  //      error code  //  2 - Accept the new child, not do not notify upper layer (it was their request, after all)  //  3 - Send a heartbeat to the new child.  //TEST_OWNER: pass on this ownership request to lower layer.  //DEFAULT: error code on unrecognized query  API downcall_ext {    //Takes int operation, void* arg    neighbor_session* tempsess;	        switch(operation) {      case PUSHDOWN:          {          //NOTE: THE PUSHDOWN FN. TAKES AN INT ARRAY OF SIZE 2.          //arg[0]=group_id, arg[1]=pushdown_id          int* args=(int*)arg;          debug_macro("PUSHDOWN(gr:%.8x,no:%.8x) CALLED\n", args[0],args[1]);          if(neighbor_query(my_sessions,args[0])) {            tempsess=neighbor_entry(my_sessions,args[0]);            if(neighbor_query(tempsess->kids, args[1])) {              neighbor_remove(tempsess->kids, args[1]);              route_pushdown(args[1],args[0],tempsess->kids, 0, 0, -1);            } else {              debug_macro("NODE NOT IN GROUP.\n");            }          } else {            debug_macro("GROUP NOT IN SESSION.\n");          }          break;        }      case ADD_CHILD:        {          child_add_arg* childToAdd = (child_add_arg*)arg;          if(childToAdd->childId != myhash) {              if(neighbor_query(my_sessions, childToAdd->groupId)) {              tempsess = neighbor_entry(my_sessions, childToAdd->groupId);              if(neighbor_size(tempsess->kids) == MAX_CHILDREN) {                debug_macro("TOO MANY CHILDREN!.\n");                return 5;              }              if(neighbor_query(tempsess->pathtoroot, childToAdd->childId)) {                debug_macro("CHILD IS IN PATH TO ROOT.\n");                return 3;              }              if(!neighbor_query(tempsess->kids, childToAdd->childId)) {                neighbor_add(tempsess->kids, childToAdd->childId);                debug_macro("[downcall(ADD_CHILD)] added kid %.8x for group %.8x, now have %d\n", childToAdd->childId, childToAdd->groupId, neighbor_size(tempsess->kids));                neighbor_info(tempsess->kids,childToAdd->childId,ticks_since_join) = 0;                routeIP_heartbeat(childToAdd->childId, childToAdd->groupId, tempsess->pathtoroot, 0, 0, -1);              } else {                debug_macro("CHILD ALREADY IN GROUP (sending heartbeat).\n");                routeIP_heartbeat(childToAdd->childId, childToAdd->groupId, tempsess->pathtoroot, 0, 0, -1);                return 2;              }            } else {              debug_macro("GROUP NOT IN SESSION.\n");              return 1; //ERROR ADDING CHILD            }          } else {            debug_macro("POTENTIAL CHILD IS ME!\n");            return 4; //ERROR ADDING CHILD          }           break;        }      case TEST_OWNER:         {          int retval;          downcall_ext(retval,TEST_OWNER,arg);          return retval;        }      default:         {          debug_macro("Scribe: Unrecognized Extensible Downcall Made (type=%d).\n",operation);          return -1; //CALL UNRECOGNIZED        }    }    return 0;  }  //API upcall_ext:  //  None currently defined -- so just return the error code (-1)  API upcall_ext {    //Takes int operation, void* arg    switch(operation) {      default:         debug_macro("Scribe: Unrecognized Extensible Upcall Made (type=%d).\n", operation);        return -1; //CALL UNRECOGNIZED    }    return 0;  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -