📄 coop-agent.old.cc
字号:
LayerInfo * this_layer = layers.arr[required_qlid]; if ( self_check(this_layer->root) == false) { /* If I am not the cluster root, then forward this to cluster root */ AppPacket *fwd_p = new AppPacket(JOIN_FORWARD); put_info_into_join_forward_packet (src_ag,qlid,original_dst,src_time,attach,fwd_p);/* DEBUG */ if ( (qlid < 0) || ( (qlid == 0) && (attach == false) ) ) { printf ("[Errcheck] 1: qlid %d attach %d state %d\n", qlid, attach, state); fflush(stdout); assert(0); }/* DEBUG */ send_pkt_wrapper(fwd_p,this_layer->root->ag.agent_id, this_layer->root->ag.agent_addr);#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jqf) : fwd to root < [ %d ] >\n", id, this_layer->root->ag.agent_id);#endif return; } if (attach == false) { void * pos = this_layer->ag_list.Locate(src_ag.agent_id); if (pos != NULL) {#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jqf) : dup delete\n", id);#endif LayerAgentInfo * this_la = this_layer->ag_list.GetAt(pos); assert (this_la != NULL); this_layer->ag_list.RemoveAt(pos); delete this_la; log_cluster_change_info(this_layer->lid);#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(this_layer,this_layer->lid);#endif } /* Create response packet */ AppPacket *resp_p = new AppPacket(JOIN_RESPONSE); put_layer_cluster_info_into_packet(this_layer,resp_p); resp_p->u.joinresp_p.accept = true; resp_p->u.joinresp_p.exp_src.agent_id = original_dst.agent_id; // resp_p->u.joinresp_p.exp_src.node_id = original_dst.node_id; memcpy(& resp_p->u.joinresp_p.exp_src.agent_addr, & original_dst.agent_addr, sizeof(struct sockaddr_in)); send_pkt_wrapper(resp_p,src_ag.agent_id, src_ag.agent_addr);#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jqf) : sent response\n", id);#endif } else { /* Attach appropriately to the cluster */ LayerAgentInfo *la_ag = new LayerAgentInfo(src_ag.agent_id,src_ag.agent_addr); if (this_layer->AddClusterMember(la_ag) == false) { delete la_ag; la_ag = this_layer->FindClusterMember(src_ag.agent_id); la_ag->refresh = true; } // if (ap->st == JOIN_QUERY) { // and not JOIN_FORWARD // la_ag->dist = /* Scheduler::my_clock() */ my_clock() - src_time; if ((la_ag->valid_dist_clock == false) || (my_clock() - la_ag->dist_clock >= DIST_CLOCK_THRESHOLD)) { // dist = MAX_RTT; struct sockaddr_in their_addr; memcpy(& their_addr, & la_ag->ag.agent_addr, sizeof(struct sockaddr_in)); their_addr.sin_port = htons(DIST_EST_PORT); la_ag->dist = estimate_rtt_wrapper(their_addr); la_ag->dist_clock = my_clock(); la_ag->valid_dist_clock = true; } #ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jqf) : *attach*\n", id);#endif // Immediate update of new member joining the cluster this_layer->cr_msgt.CancelTimer(); send_all_cluster_refresh_packet(this_layer,false,true,NULL); this_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); } return;}void coopAgent::split_cluster_using_two_partition (LayerInfo * l) { LayerAgentInfo ** ag_arr = l->CreateLayerAgentInfoArray(); assert (ag_arr != NULL); double * cost = create_cost_matrix(id,ag_arr,l->ag_list.GetSize()); int root1_index, root2_index; int * index_set1 = NULL; int * index_set2 = NULL; int set1_size, set2_size; // Partition the set of members int lower_size = l->ag_list.GetSize() / 2; two_partition(l->ag_list.GetSize(),cost,lower_size,index_set1,set1_size,root1_index,index_set2,set2_size,root2_index); assert (index_set1 != NULL); assert (index_set2 != NULL); free(cost); // After the split, cluster1 is my cluster, cluster2 is the other cluster LayerAgentInfo ** cluster1 = (LayerAgentInfo **) safe_malloc (sizeof (LayerAgentInfo *) * set1_size); bool need_to_swap = true; for (int i = 0; i < set1_size; i++) { cluster1[i] = ag_arr[index_set1[i]]; if (self_check(cluster1[i]) == true) need_to_swap = false; } LayerAgentInfo ** cluster2 = (LayerAgentInfo **) safe_malloc (sizeof (LayerAgentInfo *) * set2_size); for (int i = 0; i < set2_size; i++) cluster2[i] = ag_arr[index_set2[i]]; LayerAgentInfo * c1_root = ag_arr[root1_index]; LayerAgentInfo * c2_root = ag_arr[root2_index]; if (need_to_swap == true) { LayerAgentInfo ** tmp_cl = cluster1; cluster1 = cluster2; cluster2 = tmp_cl; int tmp_cl_size = set1_size; set1_size = set2_size; set2_size = tmp_cl_size; LayerAgentInfo * tmp_root = c1_root; c1_root = c2_root; c2_root = tmp_root; } free(ag_arr); free(index_set1); free(index_set2); // Purge my own cluster of members that are in the other cluster // Create a temporary layer to send out the root transfer message LayerInfo * tmp_layer = new LayerInfo (l->lid,this); for (int i = 0; i < set2_size; i++) { tmp_layer->AddClusterMember(cluster2[i]); l->DeleteClusterMember(cluster2[i]); } tmp_layer->root = c2_root;#ifdef LOG_COOP_JUNK printf ("[coop %d ] At %8.4f split-cluster-self lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), l->lid);#endif // Send refresh or root transfer for my cluster if (self_check(c1_root) == false) { do_internal_root_transfer(l,c1_root); } else { l->cr_msgt.CancelTimer(); send_all_cluster_refresh_packet(l,false,true,NULL); l->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); }#ifdef LOG_COOP_JUNK printf ("[coop %d ] At %8.4f split-cluster-other lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), l->lid);#endif // Transfer control of the other cluster send_cluster_remove(tmp_layer,true); // Cleanup ... delete tmp_layer; free(cluster1); free(cluster2); return;}/* This will split the cluster associated with lid. The new cluster * that contains this member will stay as is, the other cluster * will be spawned off. */void coopAgent::split_cluster_assuming_2k (LayerInfo * l) { // Currently, I am implementing a naive scheme. The leader // chooses its nearest neighbor to stay with it in its cluster. // Other more intelligent schemes will be implemented later. assert (l->ag_list.GetSize() == UPPER_2K); LinkedList<LayerAgentInfo *, double> my_cluster_list; /* In my_cluster, the agents are sorted in decreasing order of distance */ LinkedList<LayerAgentInfo *, double> other_cluster_list; /* First, split the agents into my cluster and other cluster */ /* All agents that I know distances to, are my cluster to start with */ LayerAgentInfo *self = NULL; for (void * pos = l->ag_list.GetHeadPosition(); pos != NULL; l->ag_list.GetNext(pos) ) { LayerAgentInfo *la_ag = l->ag_list.GetAt(pos); if ( self_check(la_ag) == true) { self = la_ag; continue; } if (la_ag->dist >= 0.0) { /* -1.0 is for sorting it in decreasing order of distance */ my_cluster_list.Add(la_ag,-1.0 * la_ag->dist); } else { /* Sorted by increasing distance order */ other_cluster_list.Add(la_ag,la_ag->dist); } } /* Whichever set has the higher number of agents, need to transfer * some agents to the other set */ void *pos2 = other_cluster_list.GetHeadPosition(); while ( my_cluster_list.GetSize() < (LOWER_K - 1) ) { LayerAgentInfo *a = other_cluster_list.GetAt(pos2); my_cluster_list.Add(a,-1.0 * a->dist); void * old_pos2 = pos2; other_cluster_list.GetNext(pos2); other_cluster_list.RemoveAt(old_pos2); } void *pos3 = my_cluster_list.GetHeadPosition(); while (other_cluster_list.GetSize() < LOWER_K) { LayerAgentInfo *a = my_cluster_list.GetAt(pos2); other_cluster_list.Add(a,a->dist); void * old_pos3 = pos3; my_cluster_list.GetNext(pos3); my_cluster_list.RemoveAt(old_pos3); } // Re-create my own cluster tree l->RemoveAgents(); l->AddClusterRoot(self); l->me_in_layer = true; for (void *pos4 = my_cluster_list.GetHeadPosition(); pos4 != NULL; my_cluster_list.GetNext(pos4) ) { l->AddClusterMember(my_cluster_list.GetAt(pos4)); } // Create the other cluster // SimpleTree<LayerAgentInfo *> other_cluster_tree; // void * pos5 = other_cluster_list.GetHeadPosition(); // assert (pos5 != NULL); // LayerAgentInfo * oc_root_la = other_cluster.GetAt(pos5); // void * oc_root_pos = other_cluster_tree.SetRoot(oc_root_la); // for (pos5 = other_cluster_list.GetNext(pos5); // pos5 != NULL; // other_cluster_list.GetNext(pos5) ) { // other_cluster_tree.AddChild(other_cluster_list.GetAt(pos5),oc_root_pos); // } /* HAVE TO SEND MESSAGE TO THE OTHER CLUSTER MEMBERS */ my_cluster_list.RemoveAll(); other_cluster_list.RemoveAll(); return;}void coopAgent::handle_join_response (AppPacket * ap) { /* Need to validate if this is a desired response packet */ if (valid_join_response_packet(ap) == false) return; int this_lid = ap->u.joinresp_p.layer_id;#ifdef LOG_COOP_RECV printf ("\n[coop %d ] At %8.4f (c-jr) : join-resp : lid %d from < [ %d ] > exp-src < [ %d ] >\n", id, /* Scheduler::my_clock() */ my_clock(),this_lid,ap->src_agent, ap->u.joinresp_p.exp_src.agent_id);#endif if ( (ap->src_agent == bse.agent_id)/* && (ap->src == bse.node_id)*/ ) { assert (ap->u.joinresp_p.accept == true); /* if (ap->u.joinresp_p.your_id != UNDEFINED_AGENT_ID) id = ap->u.joinresp_p.your_id; // recv new id */ // BSE-JF : Have to delete all other members in curr_join_q_lid etc. // or have to wait till all resps are recvd and decide whether to // accept BSE or the other members flush_layer(this_lid); if (this_lid+1 < MAX_LAYERS) { flush_layer(this_lid+1); } layers.arr[this_lid] = new LayerInfo(this_lid,this); LayerInfo * this_layer = layers.arr[this_lid]; jqt.CancelTimer(); /* This is the top layer returned by BSE */ put_packet_cluster_info_into_layer(ap,this_layer); if (this_lid == target_join_q_lid) { // This is the desired layer void * pos = this_layer->ag_list.Locate(id); assert (pos != NULL); LayerAgentInfo * self = this_layer->ag_list.GetAt(pos); self->dist = 0.0; LayerAgentInfo * bse_la = this_layer->FindClusterMember(bse.agent_id); assert (bse_la != NULL); // bse_la->dist = /* Scheduler::my_clock() */ my_clock() - ap->send_time; if ((bse_la->valid_dist_clock == false) || (my_clock() - bse_la->dist_clock >= DIST_CLOCK_THRESHOLD)) { // dist = MAX_RTT; struct sockaddr_in their_addr; memcpy(& their_addr, & bse_la->ag.agent_addr, sizeof(struct sockaddr_in)); their_addr.sin_port = htons(DIST_EST_PORT); bse_la->dist = estimate_rtt_wrapper(their_addr); bse_la->dist_clock = my_clock(); bse_la->valid_dist_clock = true; } this_layer->me_in_layer = true; this_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); this_layer->cr_chkt.SetTimer(CONST_CLUSTER_REFRESH_CHECK_TIMEOUT); state = ATTACHED; set_higher_layer_ping_timer();#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jr) : attach-to-bse\n\n", id);#endif log_cluster_change_info(this_layer->lid);#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(this_layer,this_layer->lid);#endif } else { assert (this_lid > target_join_q_lid); void * pos = this_layer->ag_list.Locate(id); if (pos != NULL) { // I am considered to be in this cluster ! assert ( self_check(this_layer->root) == false); // I am not root #ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jr) : part of returned cluster! delete\n", id);#endif delete_self_from_layer(this_layer); }#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jr) : continue-join-query\n\n", id);#endif continue_join_query(this_lid); } } else { /* This is a message from the higher layer cluster * We need to just save the information till the timeout */ LayerInfo * higher_layer = layers.arr[this_lid + 1]; assert (higher_layer != NULL); void * mem_pos = higher_layer->ag_list.Locate(ap->u.joinresp_p.exp_src.agent_id); assert (mem_pos != NULL); LayerAgentInfo *hl_member = higher_layer->ag_list.GetAt(mem_pos); if (ap->u.joinresp_p.accept == false) {#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jr) : join-reject\n\n", id);#endif higher_layer->DeleteClusterMember(hl_member); delete hl_member; } else { /* If the response is coming from a different agent (I thought some * one else was the cluster leader, then we need to update that here */ if ( (ap->u.joinresp_p.exp_src.agent_id != ap->src_agent) /*|| (ap->u.joinresp_p.exp_src.node_id != ap->src) */) { hl_member->ag.agent_id = ap->src_agent; // hl_member->ag.node_id = ap->src; memcpy(& hl_member->ag.agent_addr, & ap->src_agent_addr, sizeof(struct sockaddr_in)); } assert (ap->u.joinresp_p.mbr_count > 0); assert ( (hl_member->ag.agent_id == ap->u.joinresp_p.agent_arr[0].ag.agent_id) /* && (hl_member->ag.node_id == ap->u.joinresp_p.agent_arr[0].ag.node_id) */); if (hl_member->lower_layer != NULL) delete hl_member->lower_layer; else m_join_resp_received ++; hl_member->lower_layer = new LayerInfo (this_lid,this); put_packet_cluster_info_into_layer (ap,hl_member->lower_layer); LayerAgentInfo * self = hl_member->lower_layer->FindClusterMember(id); if (self != NULL) self->dist = 0.0; /* Using the time metric. Can use any other */ // hl_member->dist = /* Scheduler::my_clock() */ my_clock() - ap->send_time; if ((hl_member->valid_dist_clock == false) || (my_clock() - hl_member->dist_clock >= DIST_CLOCK_THRESHOLD)) { // dist = MAX_RTT; struct sockaddr_in their_addr; memcpy(& their_addr, & hl_member->ag.agent_addr, sizeof(struct sockaddr_in)); their_addr.sin_port = htons(DIST_EST_PORT); hl_member->dist = estimate_rtt_wrapper(their_addr); hl_member->dist_clock = my_clock(); hl_member->valid_dist_clock = true;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -