📄 scribe.mac
字号:
foreach_neighbor(neighbor_session*,tempsess,my_sessions) { if(tempsess->parent == neighbor) { routeIP_leave(neighbor,tempsess->ipaddr,0,0,-1); debug_macro("REPLAY Scribe: MEMBERSHIP_CHANGE ( gr me parent ) %.8x %.8x %.8x\n", tempsess->ipaddr, myhash, myhash); route_join(tempsess->ipaddr,tempsess->ipaddr,0,0,-1); dojoin(tempsess->ipaddr); tempsess->parent = UNDEFINED; } if(neighbor_query(tempsess->kids,neighbor)) { neighbor_remove(tempsess->kids,neighbor); routeIP_pushdown(neighbor,tempsess->ipaddr,tempsess->kids,0,0,-1); if(true) { //FIXME: To be replaced by if(upper layer registered for this type of message) //group_change defined in scribe_ext.h group_change *gchg = new group_change; gchg->groupId = tempsess->ipaddr; gchg->childId = neighbor; gchg->newJoin = false; //Notify upper layer that the child has been added to the given group. upcall_ext(GROUP_CHANGE, (void*)gchg); delete gchg; } } } neighbor_remove(watched,neighbor); } } API create_group { //sufficient? will blindly send it. route_create_group( groupID, groupID, 0, 0 , -1); //routeIP_create_group( groupID, groupID, 0, 0, -1 ); //it's not clear that routeIP isn't the right thing. } //API: Join // //1- Create State if needed, send a join if new and not root. //2- Set subscribed to 1 on the state. API join { //add neighbor... 
set subscribed to 1 neighbor_session *tempsess; if(!neighbor_query(my_sessions, groupID)) { //send join message, add neighbor neighbor_add(my_sessions, groupID); neighbor_session *temp = neighbor_entry(my_sessions, groupID); downcall_ext(temp->isRoot, TEST_OWNER, (void*)&groupID); if(!temp->isRoot) { debug_macro("Scribe: API join: Joining a group (%.8x), routing join.\n",groupID); route_join( groupID, groupID, 0, 0, -1); dojoin(groupID); } else { debug_macro("Scribe: API join: Joining a group (%.8x), am root.\n",groupID); debug_macro("REPLAY Scribe: MEMBERSHIP_CHANGE ( gr me parent ) %.8x %.8x %.8x\n", groupID, myhash, myhash); } upcall_notify(my_sessions, NBR_TYPE_TREE); } else { debug_macro("Scribe: API join: Subscribing to a group (%.8x), already joined.\n",groupID); } neighbor_info(my_sessions, groupID, subscribed)=1; } //API: Leave //1 - set unsubscribed. //2 - If also no children, send leave message. API leave { neighbor_session *tempsess; if(neighbor_query(my_sessions, groupID)) { tempsess = neighbor_entry(my_sessions, groupID); tempsess->subscribed = 0; if(neighbor_size(tempsess->kids) < 1) { //No longer part of this group! Remove it and notify parent. if( tempsess->parent!=UNDEFINED ) { debug_macro("Scribe: API leave: Leaving a group (%.8x), sending leave to parent (%.8x).\n",groupID,tempsess->parent); routeIP_leave (tempsess->parent, groupID, 0, 0, -1); unwatch(tempsess->parent); tempsess->parent = UNDEFINED; } else { debug_macro("Scribe: API leave: WARNING: Leaving a group (%.8x), but no parent record.\n",groupID); } neighbor_remove(my_sessions, groupID); donejoin(groupID); upcall_notify(my_sessions, NBR_TYPE_TREE); } } //else error, do nothing (already left?) 
} API multicast { return_code = 0; if(upcall_forward(groupID, msg, size, COMM_TYPE_MULTICAST) == 0) { routeIP_data( groupID, groupID, COMM_TYPE_MULTICAST, transport, msg, size, transport ); return_code = macedon_sendret; } } API collect { return_code = 0; if (neighbor_query(my_sessions, groupID)) { neighbor_session* tempsess = neighbor_entry(my_sessions, groupID); //Update the parent neighbor type, if needed (need lock inside). if( (tempsess->isRoot) ) { upcall_deliver(msg, size, COMM_TYPE_COLLECT); } else if(upcall_forward(tempsess->parent, msg, size, COMM_TYPE_COLLECT) == 0) { if( (tempsess->parent==UNDEFINED) ) { //tempsess->parent = from; //NOTE: This was to handle the case where a join timed out -- and we were taken by two parents. debug_macro("WARNING: Collecting but no parent.\n"); return_code = 1; } else { route_data( tempsess->parent, groupID, COMM_TYPE_COLLECT, transport, msg, size, transport ); return_code = macedon_sendret; } } } else { debug_macro("WARNING: Collecting but no group.\n"); return_code = 1; } } API anycast { //FIXME: Some processing for when we are in the anycast group. return_code = 0; if(upcall_forward(groupID, msg, size, COMM_TYPE_ANYCAST) == 0) { visited dfsState; route_anycast( groupID, groupID, myhash, dfsState, msg, size, -1 ); return_code = macedon_sendret; } } API route { route_unicast_data(dest, msg, size, transport); return_code = macedon_sendret; } API routeIP { routeIP_unicast_data(dest, msg, size, transport); return_code = macedon_sendret; } forward unicast_data { return upcall_forward(dest, msg, size, COMM_TYPE_UNICAST); } recv unicast_data { upcall_deliver(msg, size, COMM_TYPE_UNICAST); } //API notify: Called when its possible this node's managed ID space // changes // //1 - ignore non-peer notifications //2 - check if we are the rooot on each session //3 - (if needed) tell the new root that it is root //4 - update isRoot status. 
API notify {
#ifdef SCRIBE_TRACE
  debug_macro( "Received Neighbor Notify, type: %d, size: %d.\n", type, size);
  //for(int i=0; i<size; i++) {
  //  sprintf(trace_buf_+(14*i), "(nbr:%.8x)",neighbors[i]);
  //}
  //sprintf(trace_buf_+(14*size),"\n");
  //trace_print();
#endif
  //Only peer-set changes are handled; other notification types are ignored.
  if(type == NBR_TYPE_PEER) {
    foreach_neighbor (neighbor_session*, tempsess, my_sessions) {
      //Re-query the lower layer for ownership of this group.
      int isOwner;
      downcall_ext(isOwner, TEST_OWNER, (void*)&tempsess->ipaddr);
      if(tempsess->isRoot && !isOwner ) {
        debug_macro( "Group (%.8x) should be moved to new root.\n", tempsess->ipaddr);
        //Join under the new root of the session.
        tempsess->isRoot = false;
        //NOTE: A _REAL_ Implementation would need to keep state on whether they are source for each group.
        if(source_ == me) {
          //Re-create the group, push every child down with the group ID as
          //the only listed sibling, and drop the local session.
          children siblings;
          neighbor_clear(siblings);
          neighbor_add(siblings,tempsess->ipaddr);
          route_create_group( tempsess->ipaddr, tempsess->ipaddr, 0, 0 , -1);
          foreach_neighbor (neighbor_children*, afriend, tempsess->kids) {
            routeIP_pushdown( afriend->ipaddr, tempsess->ipaddr, siblings, 0, 0, -1 );
          }
          //NOTE(review): this removes an entry from my_sessions while
          //my_sessions is being iterated -- confirm foreach_neighbor
          //tolerates removal of the current entry.
          neighbor_remove(my_sessions, tempsess->ipaddr);
          upcall_notify(my_sessions, NBR_TYPE_TREE); //notify the upper layer of the state change.
        }
        else {
          routeIP_join(tempsess->ipaddr,tempsess->ipaddr, 0, 0, -1);
          dojoin(tempsess->ipaddr);
        }
      }
      else if (!tempsess->isRoot && isOwner ) {
        //I am now the parent, so fix the timers (don't expect heartbeats from parents)
        tempsess->isRoot = true;
        donejoin(tempsess->ipaddr);
        unwatch(tempsess->parent);
        tempsess->parent = UNDEFINED;
      }
    }
  }
}

//API downcall_ext
//
//PUSHDOWN: instructs scribe to push down the given child from the given tree.
//    1 - remove state
//    2 - route pushdown
//ADD_CHILD: a _request_ by the upper layer to add the child to the given tree.
//    1 - Apply a strict set of rules before accepting it, return an appropriate
//        error code
//    2 - Accept the new child, but do not notify upper layer (it was their request, after all)
//    3 - Send a heartbeat to the new child.
//TEST_OWNER: pass on this ownership request to lower layer. //DEFAULT: error code on unrecognized query API downcall_ext { //Takes int operation, void* arg neighbor_session* tempsess; switch(operation) { case PUSHDOWN: { //NOTE: THE PUSHDOWN FN. TAKES AN INT ARRAY OF SIZE 2. //arg[0]=group_id, arg[1]=pushdown_id int* args=(int*)arg; debug_macro( "PUSHDOWN(gr:%.8x,no:%.8x) CALLED\n", args[0],args[1]); if(neighbor_query(my_sessions,args[0])) { tempsess=neighbor_entry(my_sessions,args[0]); if(neighbor_query(tempsess->kids, args[1])) { neighbor_remove(tempsess->kids, args[1]); unwatch(args[1]); //FIXME: Should this be routeIP or route? route_pushdown(args[1],args[0],tempsess->kids, 0, 0, -1); } else { debug_macro( "NODE NOT IN GROUP.\n"); } } else { debug_macro( "GROUP NOT IN SESSION.\n"); } break; } case ADD_CHILD: { child_add_arg* childToAdd = (child_add_arg*)arg; if(childToAdd->childId != myhash) { if(neighbor_query(my_sessions, childToAdd->groupId)) { tempsess = neighbor_entry(my_sessions, childToAdd->groupId); if(neighbor_size(tempsess->kids) == MAX_CHILDREN) { debug_macro( "TOO MANY CHILDREN!.\n"); return 5; } if(neighbor_query(tempsess->pathtoroot, childToAdd->childId) || tempsess->parent == UNDEFINED) { debug_macro( "CHILD IS IN PATH TO ROOT OR BUSY JOINING (parent: %.8x).\n",tempsess->parent); return 3; } if(!neighbor_query(tempsess->kids, childToAdd->childId)) { neighbor_add(tempsess->kids, childToAdd->childId); watch(childToAdd->childId); debug_macro( "[downcall(ADD_CHILD)] added kid %.8x for group %.8x, now have %d\n", childToAdd->childId, childToAdd->groupId, neighbor_size(tempsess->kids)); routeIP_membership(childToAdd->childId, childToAdd->groupId, tempsess->pathtoroot, 0, 0, -1); } else { debug_macro( "CHILD ALREADY IN GROUP (sending membership).\n"); routeIP_membership(childToAdd->childId, childToAdd->groupId, tempsess->pathtoroot, 0, 0, -1); return 2; } } else { debug_macro( "GROUP NOT IN SESSION.\n"); return 1; //ERROR ADDING CHILD } } else { 
debug_macro( "POTENTIAL CHILD IS ME!\n"); return 4; //ERROR ADDING CHILD } break; } case TEST_OWNER: { int retval; downcall_ext(retval,TEST_OWNER,arg); return retval; } default: { debug_macro( "Scribe: Unrecognized Extensible Downcall Made (type=%d).\n",operation); return -1; //CALL UNRECOGNIZED } } return 0; } //API upcall_ext: // None currently defined -- so just return the error code (-1) API upcall_ext { //Takes int operation, void* arg switch(operation) { default: debug_macro( "Scribe: Unrecognized Extensible Upcall Made (type=%d).\n", operation); return -1; //CALL UNRECOGNIZED } return 0; }}routines { //Common procedure to increment/add a node to be watched. void watch(macedon_key who) { if(who == UNDEFINED) return;#ifdef SCRIBE_TRACE debug_macro("watch(%.8x)\n",who);#endif if(!neighbor_query(watched,who)) { neighbor_add( watched, who ); } neighbor_info(watched,who,count)++; } //Common procedure to decrement/delete a node being watched. void unwatch(macedon_key who) { if(who == UNDEFINED) return;#ifdef SCRIBE_TRACE debug_macro("unwatch(%.8x)\n",who);#endif neighbor_info(watched,who,count)--; if(neighbor_info(watched,who,count)==0) { neighbor_remove(watched,who); } } //Common procedure to join void dojoin(macedon_key which) { if(which == UNDEFINED) return; if(!neighbor_info(my_sessions,which,isRoot)) { if(!neighbor_query(outstanding_joins,which)) { if(neighbor_empty(outstanding_joins)) { timer_resched(joinTimer,JOIN_TIMER_PERIOD); } neighbor_add(outstanding_joins,which); } neighbor_info(outstanding_joins,which,delay)=0; } } //Common procedure to stop monitoring a join void donejoin(macedon_key which) { if(which == UNDEFINED) return; neighbor_remove(outstanding_joins,which); if(neighbor_empty(outstanding_joins)) { timer_cancel(joinTimer); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -