📄 coop-agent.cc
字号:
target_join_q_lid = dst_join_lid; AppPacket *p = new AppPacket(JOIN_QUERY); init_join_query_packet(p,target_join_q_lid,/* Scheduler::my_clock() */ my_clock(),false, udp_recv_agent_addr);//sunny change3 (6lines) p->src_agent=this->id; memcpy(& p->src_agent_addr, & this->udp_recv_agent_addr, sizeof(struct sockaddr_in)); p->dst_agent=bse.agent_id; memcpy(& p->dst_agent_addr, & bse.agent_addr, sizeof(struct sockaddr_in));//sunny send_pkt_wrapper(p,bse.agent_id,bse.agent_addr); /* Set the timeout to resend the request if needed */ jqt.SetTimer(CONST_JOIN_QUERY_TIMEOUT); return;}void coopAgent::init_layer_arr (void) { for (int i = 0; i < MAX_LAYERS; i++) { if (layers.arr[i] != NULL) { delete layers.arr[i]; layers.arr[i] = NULL; } } return;}void coopAgent::stop (void) { assert (started == true);//PRM-added, changed//sunny/* if ( !m_mature && isInternal() ) { down_internal++; if (SHOW_ALL) printf("internal_stop %d %f\n", id, my_clock() ); } else if ( !m_mature ) down_leaf++;*///sunny //Variable from sleeDisplay //UpCount--; g_stop++; //if ( id == 1 && m_mature ) // sleeDisplay();//PRM//#ifdef LOG_COOP_STOP printf ("[coop %d ] At %8.4f stop\n", id, /* Scheduler::my_clock() */ my_clock());//#endif jqt.CancelTimer(); cancel_higher_layer_ping_timers();//PRM qrt.CancelTimer(); double dtemp = m_recent_up_time; /* if ( m_recent_up_time < 500.0 ) dtemp = 500.0; else dtemp = m_recent_up_time; */ double down_time = my_clock(); /* double final_time = 1.0*(CUT_OFF-1)/16 + 500 ; if ( down_time > final_time ) down_time = final_time; */ if ( down_time > dtemp ) m_total_up_time += down_time - dtemp; m_recent_up_time = LONG_TIME; /* if ( id <= NUM_AGENT ) { nodes[id] = -1; agents[id] = NULL; } *///PRM delete_self_from_all_higher_layers(-1); init_layer_arr(); Agent::stop(); return;}void coopAgent::specific_send_data_pkt (char* payload, int data_len) { assert (started == true);//#ifdef LOG_DATA_PACKET_INIT printf ("[coop %d ] At %8.4f source data-pkt : seq %d length %d\n", id, my_clock(), m_data_pkt_seq, data_len);//#endif //PRM-changed //forward_data_packet(payload, data_len, id, udp_recv_agent_addr, m_data_pkt_seq ++,0,-1,true); printf("sunny: call forward_data_packet in specific_send_data_pkt\n"); forward_data_packet(payload, data_len, id, udp_recv_agent_addr, m_data_pkt_seq ++,0,-1,true, FRESH_DATA, my_clock(),0); if ( m_data_pkt_seq <= CUT_OFF ) num_pkt_init++; //PRMprintf("sunny: finished well in specific_send_data_pkt\n"); return;}int coopAgent::specific_rx_pkt_handler (Packet *p) { if (p->t != PACKET_APP) { printf ("[Err] COOP %d received unknown packet type\n", id); return -1; } AppPacket *ap = (AppPacket *)p; rx_pkt_wrapper(ap); switch (ap->st) { case JOIN_QUERY : case JOIN_FORWARD : handle_join_query_forward(ap); break; case JOIN_RESPONSE : handle_join_response(ap); break; case CLUSTER_REFRESH : handle_cluster_refresh(ap); break; case CLUSTER_MERGE : handle_cluster_merge(ap); break; case PING_QUERY : handle_ping_query(ap); break; case PING_RESPONSE : handle_ping_response(ap); break; case PACKET_DATA : return handle_data_packet(ap); case PACKET_DATA_ACK : handle_data_ack_packet(ap); break;//PRM-added case QUERY_RANDOM: handle_query_random(ap); break; case RESPONSE_RANDOM: handle_response_random(ap); break; case RETRANSMIT_REQUEST: handle_retransmit_request(ap); break; //Adpot //case REQUEST_ADOPT: //handle_request_adopt(ap); //break;//PRM default : printf ("[Err] COOP %d received unknown packet subtype\n", id); return -1; } return 0;}void coopAgent::handle_join_query_forward (AppPacket * ap) { assert ( (ap->st == JOIN_QUERY) || (ap->st == JOIN_FORWARD) ); int qlid; double src_time; bool attach; AgentInfo src_ag; AgentInfo original_dst; if (ap->st == JOIN_QUERY) { qlid = ap->u.joinq_p.q_lid; attach = ap->u.joinq_p.attach; src_ag.agent_id = ap->src_agent; memcpy(& src_ag.agent_addr, & ap->src_agent_addr, sizeof(struct sockaddr_in)); //src_ag.agent_addr = ap->src; original_dst.agent_id = id; // original_dst.node_id = n->id; memcpy(& original_dst.agent_addr, & udp_recv_agent_addr, sizeof(struct sockaddr_in)); src_time = ap->u.joinq_p.send_time; } else { qlid = ap->u.joinforward_p.q_lid; attach = ap->u.joinforward_p.attach; src_ag.agent_id = ap->u.joinforward_p.src_ag.agent_id; // src_ag.node_id = ap->u.joinforward_p.src_ag.node_id; memcpy(& src_ag.agent_addr, & ap->u.joinforward_p.src_ag.agent_addr, sizeof(struct sockaddr_in)); original_dst.agent_id = ap->u.joinforward_p.original_dst.agent_id; // original_dst.node_id = ap->u.joinforward_p.original_dst.node_id; memcpy(& original_dst.agent_addr, & ap->u.joinforward_p.original_dst.agent_addr, sizeof(struct sockaddr_in)); src_time = ap->u.joinforward_p.send_time; } int required_qlid; if (attach == true) required_qlid = qlid; else required_qlid = qlid - 1; /* First check if this agent should send a response at all */ if (valid_join_query_forward_packet(qlid,attach) == false) { AppPacket *resp_p = new AppPacket(JOIN_RESPONSE);//sunny change3 (6lines) resp_p->src_agent=this->id; memcpy(& resp_p->src_agent_addr, & this->udp_recv_agent_addr, sizeof(struct sockaddr_in)); resp_p->dst_agent=src_ag.agent_id; memcpy(& resp_p->dst_agent_addr, & src_ag.agent_addr, sizeof(struct sockaddr_in));//sunny resp_p->u.joinresp_p.layer_id = required_qlid; resp_p->u.joinresp_p.accept = false; resp_p->u.joinresp_p.mbr_count = 0; resp_p->u.joinresp_p.exp_src.agent_id = original_dst.agent_id; // resp_p->u.joinresp_p.exp_src.node_id = original_dst.node_id; memcpy(& resp_p->u.joinresp_p.exp_src.agent_addr, & original_dst.agent_addr, sizeof(struct sockaddr_in)); send_pkt_wrapper(resp_p,src_ag.agent_id, src_ag.agent_addr); return; } //#ifdef LOG_COOP_JUNK printf ("[coop %d ] At %8.4f (c-jqf) recvd-valid-join-qf : lid %d from < [ %d ] > exp-src < [ %d ] >\n", id, /* Scheduler::my_clock() */ my_clock(), qlid,ap->src_agent, src_ag.agent_id);//#endif LayerInfo * this_layer = layers.arr[required_qlid]; if ( self_check(this_layer->root) == false) { /* If I am not the cluster root, then forward this to cluster root */ AppPacket *fwd_p = new AppPacket(JOIN_FORWARD); put_info_into_join_forward_packet (src_ag,qlid,original_dst,src_time,attach,fwd_p);/* DEBUG */ if ( (qlid < 0) || ( (qlid == 0) && (attach == false) ) ) { printf ("[Errcheck] 1: qlid %d attach %d state %d\n", qlid, attach, state); fflush(stdout); assert(0); }/* DEBUG *///sunny change3 (6lines) fwd_p->src_agent=this->id; memcpy(& fwd_p->src_agent_addr, & this->udp_recv_agent_addr, sizeof(struct sockaddr_in)); fwd_p->dst_agent=this_layer->root->ag.agent_id; memcpy(& fwd_p->dst_agent_addr, & this_layer->root->ag.agent_addr, sizeof(struct sockaddr_in));//sunny send_pkt_wrapper(fwd_p,this_layer->root->ag.agent_id, this_layer->root->ag.agent_addr);//#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jqf) : fwd to root < [ %d ] >\n", id, this_layer->root->ag.agent_id);//#endif return; } if (attach == false) { void * pos = this_layer->ag_list.Locate(src_ag.agent_id); if (pos != NULL) {//#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jqf) : dup delete\n", id);//#endif LayerAgentInfo * this_la = this_layer->ag_list.GetAt(pos); assert (this_la != NULL); this_layer->ag_list.RemoveAt(pos); delete this_la; log_cluster_change_info(this_layer->lid);//#ifdef LOG_COOP_CLUSTER_CHANGE display_cluster_info(this_layer,this_layer->lid);//#endif } /* Create response packet */ AppPacket *resp_p = new AppPacket(JOIN_RESPONSE); put_layer_cluster_info_into_packet(this_layer,resp_p);//sunny change3 (6lines) resp_p->src_agent=this->id; memcpy(& resp_p->src_agent_addr, & this->udp_recv_agent_addr, sizeof(struct sockaddr_in)); resp_p->dst_agent=src_ag.agent_id; memcpy(& resp_p->dst_agent_addr, & src_ag.agent_addr, sizeof(struct sockaddr_in));//sunny resp_p->u.joinresp_p.accept = true; resp_p->u.joinresp_p.exp_src.agent_id = original_dst.agent_id; // resp_p->u.joinresp_p.exp_src.node_id = original_dst.node_id; memcpy(& resp_p->u.joinresp_p.exp_src.agent_addr, & original_dst.agent_addr, sizeof(struct sockaddr_in)); send_pkt_wrapper(resp_p,src_ag.agent_id, src_ag.agent_addr);//#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jqf) : sent response\n", id);//#endif } else { /* Attach appropriately to the cluster */ LayerAgentInfo *la_ag = new LayerAgentInfo(src_ag.agent_id,src_ag.agent_addr); if (this_layer->AddClusterMember(la_ag) == false) { delete la_ag; la_ag = this_layer->FindClusterMember(src_ag.agent_id); la_ag->refresh = true; } // if (ap->st == JOIN_QUERY) { // and not JOIN_FORWARD // la_ag->dist = /* Scheduler::my_clock() */ my_clock() - src_time; if ((la_ag->valid_dist_clock == false) || (my_clock() - la_ag->dist_clock >= DIST_CLOCK_THRESHOLD)) { // dist = MAX_RTT; struct sockaddr_in their_addr; memcpy(& their_addr, & la_ag->ag.agent_addr, sizeof(struct sockaddr_in)); their_addr.sin_port = htons(DIST_EST_PORT); la_ag->dist = estimate_rtt_wrapper(their_addr); la_ag->dist_clock = my_clock(); la_ag->valid_dist_clock = true; } //#ifdef LOG_COOP_JUNK printf ("[coop %d ] . . (c-jqf) : *attach*\n", id);//#endif // Immediate update of new member joining the cluster this_layer->cr_msgt.CancelTimer(); send_all_cluster_refresh_packet(this_layer,false,true,NULL); this_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); } return;}void coopAgent::split_cluster_using_two_partition (LayerInfo * l) { LayerAgentInfo ** ag_arr = l->CreateLayerAgentInfoArray(); assert (ag_arr != NULL); double * cost = create_cost_matrix(id,ag_arr,l->ag_list.GetSize()); int root1_index, root2_index; int * index_set1 = NULL; int * index_set2 = NULL; int set1_size, set2_size; // Partition the set of members int lower_size = l->ag_list.GetSize() / 2; two_partition(l->ag_list.GetSize(),cost,lower_size,index_set1,set1_size,root1_index,index_set2,set2_size,root2_index); assert (index_set1 != NULL); assert (index_set2 != NULL); free(cost); // After the split, cluster1 is my cluster, cluster2 is the other cluster LayerAgentInfo ** cluster1 = (LayerAgentInfo **) safe_malloc (sizeof (LayerAgentInfo *) * set1_size); bool need_to_swap = true; for (int i = 0; i < set1_size; i++) { cluster1[i] = ag_arr[index_set1[i]]; if (self_check(cluster1[i]) == true) need_to_swap = false; } LayerAgentInfo ** cluster2 = (LayerAgentInfo **) safe_malloc (sizeof (LayerAgentInfo *) * set2_size); for (int i = 0; i < set2_size; i++) cluster2[i] = ag_arr[index_set2[i]]; LayerAgentInfo * c1_root = ag_arr[root1_index]; LayerAgentInfo * c2_root = ag_arr[root2_index]; if (need_to_swap == true) { LayerAgentInfo ** tmp_cl = cluster1; cluster1 = cluster2; cluster2 = tmp_cl; int tmp_cl_size = set1_size; set1_size = set2_size; set2_size = tmp_cl_size; LayerAgentInfo * tmp_root = c1_root; c1_root = c2_root; c2_root = tmp_root; } free(ag_arr); free(index_set1); free(index_set2); // Purge my own cluster of members that are in the other cluster // Create a temporary layer to send out the root transfer message LayerInfo * tmp_layer = new LayerInfo (l->lid,this); for (int i = 0; i < set2_size; i++) { tmp_layer->AddClusterMember(cluster2[i]); l->DeleteClusterMember(cluster2[i]); } tmp_layer->root = c2_root;//#ifdef LOG_COOP_JUNK printf ("[coop %d ] At %8.4f split-cluster-self lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), l->lid);//#endif // Send refresh or root transfer for my cluster if (self_check(c1_root) == false) { do_internal_root_transfer(l,c1_root); } else { l->cr_msgt.CancelTimer(); send_all_cluster_refresh_packet(l,false,true,NULL); l->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); }//#ifdef LOG_COOP_JUNK printf ("[coop %d ] At %8.4f split-cluster-other lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), l->lid);//#endif // Transfer control of the other cluster send_cluster_remove(tmp_layer,true); // Cleanup ...
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -