⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 coop-agent.cc

📁 这是P2P流媒体方案-NICE的实现源码
💻 CC
📖 第 1 页 / 共 5 页
字号:
    /*
     * NOTE(review): the text below was extracted from a web code viewer and
     * its original line breaks were lost. Several statements and whole
     * functions share one physical line, line comments run into the code
     * that originally followed them, and a few printf format strings are
     * split across physical lines. The text is left exactly as found; the
     * line breaks must be restored before it can compile. The summaries
     * below describe the code as read with line breaks at their presumed
     * original positions - confirm against a clean copy of coop-agent.cc.
     *
     * The chunk opens with the tail of a function whose beginning is not
     * visible here, and it stops partway through
     * split_cluster_using_two_partition (at the "Cleanup" marker).
     *
     * Contents, in order:
     *
     * (tail of an earlier function)
     *   Records dst_join_lid in target_join_q_lid, allocates a JOIN_QUERY
     *   AppPacket, fills it via init_join_query_packet, stamps this agent's
     *   id/address as source and bse's id/address as destination, sends it
     *   with send_pkt_wrapper, and arms jqt with CONST_JOIN_QUERY_TIMEOUT so
     *   the request can be resent.
     *
     * coopAgent::init_layer_arr()
     *   For each i in [0, MAX_LAYERS): deletes layers.arr[i] when it is not
     *   NULL and resets the slot to NULL.
     *
     * coopAgent::stop()
     *   Asserts started. Presumably increments g_stop (it sits between
     *   commented-out lines, so whether it is live depends on the restored
     *   line breaks), then prints a stop log line. Cancels jqt, the
     *   higher-layer ping timers and qrt. When my_clock() is later than
     *   m_recent_up_time, adds the difference to m_total_up_time; then sets
     *   m_recent_up_time to LONG_TIME. Finally calls
     *   delete_self_from_all_higher_layers(-1), init_layer_arr() and
     *   Agent::stop().
     *
     * coopAgent::specific_send_data_pkt(char* payload, int data_len)
     *   Asserts started, prints the sequence number and length, then calls
     *   forward_data_packet with this agent's id and udp_recv_agent_addr as
     *   origin, m_data_pkt_seq (post-incremented) and FRESH_DATA. Increments
     *   num_pkt_init while the already-incremented m_data_pkt_seq is
     *   <= CUT_OFF.
     *
     * coopAgent::specific_rx_pkt_handler(Packet *p)
     *   Returns -1 (after printing an error) when p->t is not PACKET_APP.
     *   Otherwise casts p to AppPacket, calls rx_pkt_wrapper, and dispatches
     *   on ap->st to the matching handle_* method. PACKET_DATA returns the
     *   result of handle_data_packet directly; an unrecognised subtype
     *   prints an error and returns -1; every other case returns 0.
     *
     * coopAgent::handle_join_query_forward(AppPacket *ap)
     *   Asserts ap->st is JOIN_QUERY or JOIN_FORWARD and reads q_lid, attach,
     *   the requesting agent, the original destination and send_time from
     *   the matching union member (for JOIN_QUERY the requester is the
     *   packet source and the original destination is this agent).
     *   required_qlid is qlid when attach is true, else qlid - 1.
     *     1. If valid_join_query_forward_packet(qlid, attach) is false, sends
     *        a JOIN_RESPONSE with accept = false and mbr_count = 0 to the
     *        requester and returns.
     *     2. If self_check(root of layers.arr[required_qlid]) is false, wraps
     *        the request in a JOIN_FORWARD packet and sends it to that root.
     *     3. If attach is false, removes (and deletes) any existing ag_list
     *        entry for the requester, then sends a JOIN_RESPONSE with
     *        accept = true carrying the layer's cluster info.
     *     4. Otherwise adds the requester as a cluster member; when
     *        AddClusterMember returns false it looks the member up with
     *        FindClusterMember and sets refresh = true. The distance is
     *        re-estimated with estimate_rtt_wrapper (port DIST_EST_PORT) when
     *        no valid dist_clock exists or it is at least
     *        DIST_CLOCK_THRESHOLD old. The cluster-refresh timer is then
     *        cancelled, a refresh is sent to the cluster, and the timer is
     *        re-armed with CONST_CLUSTER_REFRESH_MSG_TIMEOUT.
     *   NOTE(review): the FindClusterMember result in step 4 is dereferenced
     *   without a NULL check - confirm AddClusterMember can only fail for an
     *   already-present member.
     *
     * coopAgent::split_cluster_using_two_partition(LayerInfo *l)  [truncated]
     *   Builds an agent array and cost matrix for l, calls two_partition with
     *   lower_size = member count / 2, and copies the two index sets into
     *   cluster1 and cluster2. The two clusters (with sizes and roots) are
     *   swapped unless some member of cluster1 passes self_check, so that
     *   cluster1 ends up being this agent's own cluster. Members of cluster2
     *   are moved from l into a temporary LayerInfo rooted at c2_root. For
     *   its own cluster the agent calls do_internal_root_transfer when
     *   c1_root is not itself, otherwise it sends a cluster refresh and
     *   re-arms the refresh timer. It then calls
     *   send_cluster_remove(tmp_layer, true). The rest is not visible here.
     */
    target_join_q_lid = dst_join_lid;  AppPacket *p = new AppPacket(JOIN_QUERY);  init_join_query_packet(p,target_join_q_lid,/* Scheduler::my_clock() */ my_clock(),false, udp_recv_agent_addr);//sunny change3 (6lines)	p->src_agent=this->id;    	memcpy(& p->src_agent_addr, & this->udp_recv_agent_addr, sizeof(struct sockaddr_in));	p->dst_agent=bse.agent_id;    	memcpy(& p->dst_agent_addr, & bse.agent_addr, sizeof(struct sockaddr_in));//sunny   send_pkt_wrapper(p,bse.agent_id,bse.agent_addr);  /* Set the timeout to resend the request if needed */  jqt.SetTimer(CONST_JOIN_QUERY_TIMEOUT);  return;}void coopAgent::init_layer_arr (void) {  for (int i = 0; i < MAX_LAYERS; i++) {    if (layers.arr[i] != NULL) {      delete layers.arr[i];      layers.arr[i] = NULL;    }  }  return;}void coopAgent::stop (void) {  assert (started == true);//PRM-added, changed//sunny/*	if ( !m_mature && isInternal() ) {		down_internal++;		if (SHOW_ALL)			printf("internal_stop %d %f\n", id, my_clock() );	}	else if ( !m_mature )		down_leaf++;*///sunny	//Variable from sleeDisplay	//UpCount--;	g_stop++;	//if ( id == 1 && m_mature )	//	sleeDisplay();//PRM//#ifdef LOG_COOP_STOP  printf ("[coop %d ] At %8.4f stop\n", id, /* Scheduler::my_clock() */ my_clock());//#endif   jqt.CancelTimer();  cancel_higher_layer_ping_timers();//PRM	qrt.CancelTimer();	double dtemp = m_recent_up_time;	/*	if ( m_recent_up_time < 500.0 )		dtemp = 500.0;	else 		dtemp = m_recent_up_time;	*/	double down_time = my_clock();	/*	double final_time = 1.0*(CUT_OFF-1)/16 + 500 ;	if ( down_time > final_time )		down_time = final_time;	*/	if ( down_time > dtemp )		m_total_up_time += down_time - dtemp;	m_recent_up_time = LONG_TIME;	/*	if ( id <= NUM_AGENT ) {		nodes[id] = -1;		agents[id] = NULL;	}	*///PRM  delete_self_from_all_higher_layers(-1);  init_layer_arr();  Agent::stop();  return;}void coopAgent::specific_send_data_pkt (char* payload, int data_len) {  assert (started == true);//#ifdef LOG_DATA_PACKET_INIT  printf ("[coop %d ] At 
%8.4f source data-pkt : seq %d length %d\n", id, my_clock(), m_data_pkt_seq, data_len);//#endif   //PRM-changed  //forward_data_packet(payload, data_len, id, udp_recv_agent_addr, m_data_pkt_seq ++,0,-1,true);  printf("sunny:  call forward_data_packet in specific_send_data_pkt\n");  forward_data_packet(payload, data_len, id, udp_recv_agent_addr, m_data_pkt_seq ++,0,-1,true, FRESH_DATA, my_clock(),0);  if ( m_data_pkt_seq <= CUT_OFF )    num_pkt_init++;    //PRMprintf("sunny: finished well in specific_send_data_pkt\n");  return;}int coopAgent::specific_rx_pkt_handler  (Packet *p) {  if (p->t != PACKET_APP) {     printf ("[Err] COOP %d received unknown packet type\n", id);     return -1;  }  AppPacket *ap = (AppPacket *)p;  rx_pkt_wrapper(ap);  switch (ap->st) {  case JOIN_QUERY :  case JOIN_FORWARD :    handle_join_query_forward(ap);    break;  case JOIN_RESPONSE :    handle_join_response(ap);    break;  case CLUSTER_REFRESH :    handle_cluster_refresh(ap);    break;  case CLUSTER_MERGE :    handle_cluster_merge(ap);    break;  case PING_QUERY :    handle_ping_query(ap);    break;  case PING_RESPONSE :    handle_ping_response(ap);    break;  case PACKET_DATA :    return handle_data_packet(ap);  case PACKET_DATA_ACK :    handle_data_ack_packet(ap);    break;//PRM-added	case QUERY_RANDOM:		handle_query_random(ap);		break;	case RESPONSE_RANDOM:		handle_response_random(ap);		break;	case RETRANSMIT_REQUEST:		handle_retransmit_request(ap);		break;		//Adpot		//case REQUEST_ADOPT:		//handle_request_adopt(ap);		//break;//PRM  default :    printf ("[Err] COOP %d received unknown packet subtype\n", id);    return -1;  }  return 0;}void coopAgent::handle_join_query_forward (AppPacket * ap) {  assert ( (ap->st == JOIN_QUERY) || (ap->st == JOIN_FORWARD) );  int qlid;  double src_time;  bool attach;  AgentInfo src_ag;  AgentInfo original_dst;  if (ap->st == JOIN_QUERY) {    qlid = ap->u.joinq_p.q_lid;    attach = ap->u.joinq_p.attach;    src_ag.agent_id = ap->src_agent;    memcpy(& 
src_ag.agent_addr, & ap->src_agent_addr, sizeof(struct sockaddr_in));     //src_ag.agent_addr = ap->src;    original_dst.agent_id = id;    //    original_dst.node_id = n->id;    memcpy(& original_dst.agent_addr, & udp_recv_agent_addr, sizeof(struct sockaddr_in));    src_time = ap->u.joinq_p.send_time;  }  else {    qlid = ap->u.joinforward_p.q_lid;    attach = ap->u.joinforward_p.attach;    src_ag.agent_id = ap->u.joinforward_p.src_ag.agent_id;    //    src_ag.node_id = ap->u.joinforward_p.src_ag.node_id;    memcpy(& src_ag.agent_addr, & ap->u.joinforward_p.src_ag.agent_addr, sizeof(struct sockaddr_in));    original_dst.agent_id = ap->u.joinforward_p.original_dst.agent_id;    //    original_dst.node_id = ap->u.joinforward_p.original_dst.node_id;    memcpy(& original_dst.agent_addr, & ap->u.joinforward_p.original_dst.agent_addr, sizeof(struct sockaddr_in));    src_time = ap->u.joinforward_p.send_time;  }    int required_qlid;  if (attach == true)    required_qlid = qlid;  else    required_qlid = qlid - 1;    /* First check if this agent should send a response at all */  if (valid_join_query_forward_packet(qlid,attach) == false) {    AppPacket *resp_p = new AppPacket(JOIN_RESPONSE);//sunny change3 (6lines)	resp_p->src_agent=this->id;    	memcpy(& resp_p->src_agent_addr, & this->udp_recv_agent_addr, sizeof(struct sockaddr_in));	resp_p->dst_agent=src_ag.agent_id;    	memcpy(& resp_p->dst_agent_addr, & src_ag.agent_addr, sizeof(struct sockaddr_in));//sunny     resp_p->u.joinresp_p.layer_id = required_qlid;    resp_p->u.joinresp_p.accept = false;    resp_p->u.joinresp_p.mbr_count = 0;    resp_p->u.joinresp_p.exp_src.agent_id = original_dst.agent_id;    //    resp_p->u.joinresp_p.exp_src.node_id = original_dst.node_id;    memcpy(& resp_p->u.joinresp_p.exp_src.agent_addr, & original_dst.agent_addr, sizeof(struct sockaddr_in));    send_pkt_wrapper(resp_p,src_ag.agent_id, src_ag.agent_addr);    return;  }  //#ifdef LOG_COOP_JUNK  printf ("[coop %d ] At %8.4f (c-jqf) 
recvd-valid-join-qf : lid %d from < [ %d ] > exp-src < [ %d ] >\n", id, /* Scheduler::my_clock() */ my_clock(), qlid,ap->src_agent, src_ag.agent_id);//#endif     LayerInfo * this_layer = layers.arr[required_qlid];    if ( self_check(this_layer->root) == false) {    /* If I am not the cluster root, then forward this to cluster root */    AppPacket *fwd_p = new AppPacket(JOIN_FORWARD);    put_info_into_join_forward_packet (src_ag,qlid,original_dst,src_time,attach,fwd_p);/* DEBUG */    if ( (qlid < 0) || ( (qlid == 0) && (attach == false) ) ) {      printf ("[Errcheck] 1: qlid %d attach %d state %d\n", qlid, attach, state);      fflush(stdout);      assert(0);    }/* DEBUG *///sunny change3 (6lines)	fwd_p->src_agent=this->id;    	memcpy(& fwd_p->src_agent_addr, & this->udp_recv_agent_addr, sizeof(struct sockaddr_in));	fwd_p->dst_agent=this_layer->root->ag.agent_id;    	memcpy(& fwd_p->dst_agent_addr, & this_layer->root->ag.agent_addr, sizeof(struct sockaddr_in));//sunny     send_pkt_wrapper(fwd_p,this_layer->root->ag.agent_id, this_layer->root->ag.agent_addr);//#ifdef LOG_COOP_JUNK    printf ("[coop %d ] . . (c-jqf) : fwd to root < [ %d ] >\n", id, this_layer->root->ag.agent_id);//#endif     return;  }    if (attach == false) {        void * pos = this_layer->ag_list.Locate(src_ag.agent_id);    if (pos != NULL) {//#ifdef LOG_COOP_JUNK      printf ("[coop %d ] . . 
(c-jqf) : dup delete\n", id);//#endif       LayerAgentInfo * this_la = this_layer->ag_list.GetAt(pos);      assert (this_la != NULL);      this_layer->ag_list.RemoveAt(pos);      delete this_la;      log_cluster_change_info(this_layer->lid);//#ifdef LOG_COOP_CLUSTER_CHANGE      display_cluster_info(this_layer,this_layer->lid);//#endif     }        /* Create response packet */    AppPacket *resp_p = new AppPacket(JOIN_RESPONSE);    put_layer_cluster_info_into_packet(this_layer,resp_p);//sunny change3 (6lines)	resp_p->src_agent=this->id;    	memcpy(& resp_p->src_agent_addr, & this->udp_recv_agent_addr, sizeof(struct sockaddr_in));	resp_p->dst_agent=src_ag.agent_id;    	memcpy(& resp_p->dst_agent_addr, & src_ag.agent_addr, sizeof(struct sockaddr_in));//sunny     resp_p->u.joinresp_p.accept = true;    resp_p->u.joinresp_p.exp_src.agent_id = original_dst.agent_id;    //    resp_p->u.joinresp_p.exp_src.node_id = original_dst.node_id;    memcpy(& resp_p->u.joinresp_p.exp_src.agent_addr, & original_dst.agent_addr, sizeof(struct sockaddr_in));    send_pkt_wrapper(resp_p,src_ag.agent_id, src_ag.agent_addr);//#ifdef LOG_COOP_JUNK    printf ("[coop %d ] . . 
(c-jqf) : sent response\n", id);//#endif   }    else { /* Attach appropriately to the cluster */    LayerAgentInfo *la_ag = new LayerAgentInfo(src_ag.agent_id,src_ag.agent_addr);    if (this_layer->AddClusterMember(la_ag) == false) {      delete la_ag;      la_ag = this_layer->FindClusterMember(src_ag.agent_id);      la_ag->refresh = true;    }    // if (ap->st == JOIN_QUERY) { // and not JOIN_FORWARD    //      la_ag->dist = /* Scheduler::my_clock() */ my_clock() - src_time;    if ((la_ag->valid_dist_clock == false) || (my_clock() - la_ag->dist_clock >= DIST_CLOCK_THRESHOLD)) {      //  dist = MAX_RTT;      struct sockaddr_in their_addr;       memcpy(& their_addr, & la_ag->ag.agent_addr, sizeof(struct sockaddr_in));      their_addr.sin_port = htons(DIST_EST_PORT);      la_ag->dist = estimate_rtt_wrapper(their_addr);      la_ag->dist_clock = my_clock();      la_ag->valid_dist_clock = true;    }    //#ifdef LOG_COOP_JUNK    printf ("[coop %d ] . . (c-jqf) : *attach*\n", id);//#endif         // Immediate update of new member joining the cluster    this_layer->cr_msgt.CancelTimer();    send_all_cluster_refresh_packet(this_layer,false,true,NULL);    this_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);  }    return;}void coopAgent::split_cluster_using_two_partition (LayerInfo * l) {  LayerAgentInfo ** ag_arr = l->CreateLayerAgentInfoArray();  assert (ag_arr != NULL);  double * cost = create_cost_matrix(id,ag_arr,l->ag_list.GetSize());  int root1_index, root2_index;  int * index_set1 = NULL;  int * index_set2 = NULL;  int set1_size, set2_size;  // Partition the set of members  int lower_size = l->ag_list.GetSize() / 2;  two_partition(l->ag_list.GetSize(),cost,lower_size,index_set1,set1_size,root1_index,index_set2,set2_size,root2_index);  assert (index_set1 != NULL);  assert (index_set2 != NULL);  free(cost);  // After the split, cluster1 is my cluster, cluster2 is the other cluster  LayerAgentInfo ** cluster1 = (LayerAgentInfo **) safe_malloc (sizeof 
(LayerAgentInfo *) * set1_size);  bool need_to_swap = true;  for (int i = 0; i < set1_size; i++) {    cluster1[i] = ag_arr[index_set1[i]];    if (self_check(cluster1[i]) == true)      need_to_swap = false;  }  LayerAgentInfo ** cluster2 = (LayerAgentInfo **) safe_malloc (sizeof (LayerAgentInfo *) * set2_size);  for (int i = 0; i < set2_size; i++)    cluster2[i] = ag_arr[index_set2[i]];  LayerAgentInfo * c1_root = ag_arr[root1_index];  LayerAgentInfo * c2_root = ag_arr[root2_index];  if (need_to_swap == true) {    LayerAgentInfo ** tmp_cl = cluster1;    cluster1 = cluster2;    cluster2 = tmp_cl;    int tmp_cl_size = set1_size;    set1_size = set2_size;    set2_size = tmp_cl_size;    LayerAgentInfo * tmp_root = c1_root;    c1_root = c2_root;    c2_root = tmp_root;  }  free(ag_arr);  free(index_set1);  free(index_set2);  // Purge my own cluster of members that are in the other cluster  // Create a temporary layer to send out the root transfer message  LayerInfo * tmp_layer = new LayerInfo (l->lid,this);  for (int i = 0; i < set2_size; i++) {    tmp_layer->AddClusterMember(cluster2[i]);    l->DeleteClusterMember(cluster2[i]);  }  tmp_layer->root = c2_root;//#ifdef LOG_COOP_JUNK  printf ("[coop %d ] At %8.4f split-cluster-self lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), l->lid);//#endif   // Send refresh or root transfer for my cluster  if (self_check(c1_root) == false) {    do_internal_root_transfer(l,c1_root);  }  else {    l->cr_msgt.CancelTimer();    send_all_cluster_refresh_packet(l,false,true,NULL);    l->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);  }//#ifdef LOG_COOP_JUNK  printf ("[coop %d ] At %8.4f split-cluster-other lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), l->lid);//#endif   // Transfer control of the other cluster  send_cluster_remove(tmp_layer,true);  // Cleanup ...

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -