📄 scribems.mac
字号:
// debug_macro("Scribe: hbTimer: sess=%.8x sending hb to child %.8x\n",tempsess->ipaddr,achild->ipaddr); routeIP_heartbeat(achild->ipaddr, tempsess->ipaddr, tempsess->pathtoroot, 0, 0, -1); } } } //Reset the hb count. if(sendHeartbeats) { // debug_macro("Scribe: hbTimer: sess=%.8x clearing hb flag\n",tempsess->ipaddr); tempsess->tick_count_without_data=0; } else { tempsess->tick_count_without_data++; // debug_macro("Scribe: hbTimer: sess=%.8x incrementing tick_count_without_data (%d)\n",tempsess->ipaddr,tempsess->tick_count_without_data); } } } //Timer: printer // //prints out current state, and calls dump_state(). //Also -- prints REPLAY BANDWIDTH timer printer {#ifdef SCRIBE_TRACE pthread_mutex_lock(&debug_lock); sprintf("Scribe: Printer timer displaying summary Scribe state:\n"); trace_print(); foreach_neighbor(neighbor_session*,tempsess,my_sessions) { sprintf(trace_buf_,"Scribe session: %.8x, with %d kids.\n",tempsess->ipaddr,neighbor_size(tempsess->kids)); trace_print(); } sprintf(trace_buf_,"Scribe: Printer timer dumping state:\n"); trace_print(); pthread_mutex_lock(&debug_lock); dump_state();#endif //NOTE: These lines for evaluation purposes. extern MACEDON_Agent *globalmacedon; if (globalmacedon != this) // am i the highest layer return; if ( ( parameters.getint("streaming_time") == -1.0 || curtime > time_booted + parameters.getint("streaming_time")) ) printf("%s %f %d REPLAY_BANDWIDTH %d %d %d %d %d %d\n", get_hostname(), Scheduler::instance().clock(), pthread_self(), (int) master.get_value(), 0, // need to put in control bw (int) master_useful.get_value(), (int) master.get_value(), (int) master_useful.get_value(), 0 ); if(1) { foreach_neighbor(neighbor_session*,tempsess,my_sessions) { if(tempsess->isRoot) { debug_macro("REPLAY session: %.8x, parent: client0.\n",tempsess->ipaddr); } else { debug_macro("REPLAY session: %.8x, parent: %.8x.\n",tempsess->ipaddr,tempsess->parent); } } } } init API status_change { if(status == STATUS_READY) { state_change(joined); } } API create_group { //sufficient? will blindly send it. route_create_group( groupID, groupID, 0, 0, -1 ); //routeIP_create_group( groupID, groupID, 0, 0, -1 ); //it's not clear that routeIP isn't the right thing. } //API: Join // //1- Create State if needed, send a join if new and not root. //2- Set subscribed to 1 on the state. API join { //add neighbor... set subscribed to 1 neighbor_session *tempsess; if(!neighbor_query(my_sessions, groupID)) { //send join message, add neighbor neighbor_add(my_sessions, groupID); neighbor_session *temp = neighbor_entry(my_sessions, groupID); downcall_ext(temp->isRoot, TEST_OWNER, (void*)&groupID); temp->ticks_since_heartbeat = 0; temp->tick_count_for_renew = 0; temp->tick_count_without_data = 0; if(!temp->isRoot) { debug_macro("Scribe: API join: Joining a group (%.8x), routing join.\n",groupID); route_join( groupID, groupID, 0, 0, -1); } else { debug_macro("Scribe: API join: Joining a group (%.8x), am root.\n",groupID); debug_macro("REPLAY Scribe: MEMBERSHIP_CHANGE ( gr me parent ) %.8x %.8x %.8x\n", groupID, myhash, myhash); } upcall_notify(my_sessions, NBR_TYPE_TREE); } else { debug_macro("Scribe: API join: Subscribing to a group (%.8x), already joined.\n",groupID); } neighbor_info(my_sessions, groupID, subscribed)=1; } //API: Leave //1 - set unsubscribed. //2 - If also no children, send leave message. API leave { neighbor_session *tempsess; if(neighbor_query(my_sessions, groupID)) { tempsess = neighbor_entry(my_sessions, groupID); tempsess->subscribed = 0; if(neighbor_size(tempsess->kids) < 1) { //No longer part of this group! Remove it and notify parent. if( tempsess->parent!=UNDEFINED ) { debug_macro("Scribe: API leave: Leaving a group (%.8x), sending leave to parent (%.8x).\n",groupID,tempsess->parent); routeIP_leave (tempsess->parent, groupID, 0, 0, -1); } else { debug_macro("Scribe: API leave: WARNING: Leaving a group (%.8x), but no parent record.\n",groupID); } neighbor_clear(tempsess->kids); neighbor_remove(my_sessions, groupID); upcall_notify(my_sessions, NBR_TYPE_TREE); } } //else error, do nothing (already left?) } API multicast { if(upcall_forward(groupID, msg, size, COMM_TYPE_MULTICAST) == 0) { routeIP_data( groupID, groupID, transport, msg, size, transport ); } } API anycast { //FIXME: Some processing for when we are in the anycast group. if(!neighbor_query(my_sessions, groupID)) { visited dfsState; route_anycast( groupID, groupID, myhash, dfsState, msg, size, -1 ); } else { visited dfsState;#ifdef ANYCAST_TRACE pthread_mutex_lock(&debug_lock); sprintf(trace_buf_,"DEBUG: dfsState size=%d, list:\n",neighbor_size(dfsState)); trace_print(); int i=0; foreach_neighbor(neighbor_visited*,tmp,dfsState) { sprintf(trace_buf_+(14*i), "(sib:%.8x())",tmp->ipaddr,tmp->color); i++; } sprintf(trace_buf_+(14*i),"\n"); trace_print(); pthread_mutex_unlock(&debug_lock);#endif neighbor_session *tempsess = neighbor_entry(my_sessions, groupID); //Step 1: I am a "white" node (first time), add my children if(!neighbor_query(dfsState, myhash)) { neighbor_add(dfsState, myhash); } neighbor_info(dfsState, myhash, color) = 1; foreach_neighbor(neighbor_children*,achild,tempsess->kids) { if(!neighbor_query(dfsState,achild->ipaddr)) { neighbor_add(dfsState, achild->ipaddr); } } //Step 2: Pick the next node in line to process the msg. foreach_neighbor(neighbor_visited*,next,dfsState) { if(next->color != 2 && next->ipaddr==myhash) { next->color = 2; } else if(next->color != 2) { //Send message#ifdef ANYCAST_TRACE debug_macro("Scribe: Forwarding anycast msg from %.8x (orig from: %.8x) for group %.8x to child %.8x\n",myhash,myhash,groupID,next->ipaddr);#endif routeIP_anycast(next->ipaddr, groupID, myhash, dfsState, msg, size, -1);#ifdef ANYCAST_TRACE debug_macro("Scribe: Done forwarding anycast msg from %.8x (orig from: %.8x) for group %.8x to child %.8x\n",myhash,myhash,groupID,next->ipaddr);#endif return macedon_sendret; } } //Step 3: Route to parent, if any //NOTE: dfsState is eitehr all black, or empty if(tempsess->parent==UNDEFINED) { //If we don't know our parent, we simply forward it toward the group ID. //FIXME: Is this the right thing to do, or do we drop it?#ifdef ANYCAST_TRACE debug_macro("Scribe: Don't know parent, so routing towards groupid. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",myhash,myhash,groupID);#endif route_anycast(groupID, groupID, myhash, dfsState, msg, size, -1); return macedon_sendret; } else {#ifdef ANYCAST_TRACE debug_macro("Scribe: Forwarding to parent %.8x. (anycast msg from %.8x (orig from: %.8x) for group %.8x)\n",tempsess->parent,myhash,myhash,groupID);#endif routeIP_anycast(tempsess->parent, groupID, myhash, dfsState, msg, size, -1); return macedon_sendret; } } } //API notify: Called when its possible this node's managed ID space // changes // //1 - ignore non-peer notifications //2 - check if we are the rooot on each session //3 - (if needed) tell the new root that it is root //4 - update isRoot status. API notify { #ifdef SCRIBE_TRACE pthread_mutex_lock(&debug_lock); sprintf(trace_buf_, "Received Neighbor Notify, type: %d, size: %d.\n", type, size); trace_print(); for(int i=0; i<size; i++) { sprintf(trace_buf_+(14*i), "(nbr:%.8x)",neighbors[i]); } sprintf(trace_buf_+(14*size),"\n"); trace_print(); pthread_mutex_unlock(&debug_lock); #endif if(type == NBR_TYPE_PEER) { foreach_neighbor (neighbor_session*, tempsess, my_sessions) { int isOwner; downcall_ext(isOwner, TEST_OWNER, (void*)&tempsess->ipaddr); if(tempsess->isRoot && !isOwner ) { debug_macro("Group (%.8x) should be moved to new root.\n", tempsess->ipaddr); //Join under the new root of the session. tempsess->isRoot = false; routeIP_join(tempsess->ipaddr,tempsess->ipaddr, 0, 0, -1); } else if (!tempsess->isRoot && isOwner ) { //I am now the parent, so fix the timers (don't expect heartbeats from parents) tempsess->isRoot = true; } } } } //API downcall_ext // //PUSHDOWN: instructs scribe to push down the given child from teh given tree. // 1 - remove state // 2 - route pushdown //ADD_CHILD: a _request_ by the upper layer to add the child to the given tree. // 1 - Apply a strict set of rules before accepting it, return an appropriate // error code // 2 - Accept the new child, not do not notify upper layer (it was their request, after all) // 3 - Send a heartbeat to the new child. //TEST_OWNER: pass on this ownership request to lower layer. //DEFAULT: error code on unrecognized query API downcall_ext { //Takes int operation, void* arg neighbor_session* tempsess; switch(operation) { case PUSHDOWN: { //NOTE: THE PUSHDOWN FN. TAKES AN INT ARRAY OF SIZE 2. //arg[0]=group_id, arg[1]=pushdown_id int* args=(int*)arg; debug_macro("PUSHDOWN(gr:%.8x,no:%.8x) CALLED\n", args[0],args[1]); if(neighbor_query(my_sessions,args[0])) { tempsess=neighbor_entry(my_sessions,args[0]); if(neighbor_query(tempsess->kids, args[1])) { neighbor_remove(tempsess->kids, args[1]); route_pushdown(args[1],args[0],tempsess->kids, 0, 0, -1); } else { debug_macro("NODE NOT IN GROUP.\n"); } } else { debug_macro("GROUP NOT IN SESSION.\n"); } break; } case ADD_CHILD: { child_add_arg* childToAdd = (child_add_arg*)arg; if(childToAdd->childId != myhash) { if(neighbor_query(my_sessions, childToAdd->groupId)) { tempsess = neighbor_entry(my_sessions, childToAdd->groupId); if(neighbor_size(tempsess->kids) == MAX_CHILDREN) { debug_macro("TOO MANY CHILDREN!.\n"); return 5; } if(neighbor_query(tempsess->pathtoroot, childToAdd->childId)) { debug_macro("CHILD IS IN PATH TO ROOT.\n"); return 3; } if(!neighbor_query(tempsess->kids, childToAdd->childId)) { neighbor_add(tempsess->kids, childToAdd->childId); debug_macro("[downcall(ADD_CHILD)] added kid %.8x for group %.8x, now have %d\n", childToAdd->childId, childToAdd->groupId, neighbor_size(tempsess->kids)); neighbor_info(tempsess->kids,childToAdd->childId,ticks_since_join) = 0; routeIP_heartbeat(childToAdd->childId, childToAdd->groupId, tempsess->pathtoroot, 0, 0, -1); } else { debug_macro("CHILD ALREADY IN GROUP (sending heartbeat).\n"); routeIP_heartbeat(childToAdd->childId, childToAdd->groupId, tempsess->pathtoroot, 0, 0, -1); return 2; } } else { debug_macro("GROUP NOT IN SESSION.\n"); return 1; //ERROR ADDING CHILD } } else { debug_macro("POTENTIAL CHILD IS ME!\n"); return 4; //ERROR ADDING CHILD } break; } case TEST_OWNER: { int retval; downcall_ext(retval,TEST_OWNER,arg); return retval; } default: { debug_macro("Scribe: Unrecognized Extensible Downcall Made (type=%d).\n",operation); return -1; //CALL UNRECOGNIZED } } return 0; } //API upcall_ext: // None currently defined -- so just return the error code (-1) API upcall_ext { //Takes int operation, void* arg switch(operation) { default: debug_macro("Scribe: Unrecognized Extensible Upcall Made (type=%d).\n", operation); return -1; //CALL UNRECOGNIZED } return 0; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -