📄 coop-agent.cc
字号:
//PRM change int this_lid = ap->u.data_p.lid; // additional changes/* if ( ap->u.data_p.seq_no < CUT_OFF) {*/ num_all_recv++; m_all_recv++; if (this_lid==-1) { num_rand_recv++; }//sunny change4fd=fopen(filename, "w");fprintf(fd, "id:%d, num_all_recv:%d, pk_duplicate:%d", this->id, num_all_recv, pk_duplicate);fclose(fd);/* } else m_mature=1;*/ // end of add //PRM data_pkt *dptr = &ap->u.data_p; //double now = Scheduler::Clock(); double now = my_clock(); if (max_delay < now - dptr->orig_time ) max_delay = now - dptr->orig_time; if ( dptr->seq_no > m_max_seq_recv ) { m_max_seq_recv = dptr->seq_no; if ( dptr->type == FRESH_DATA ) { m_dist_from_src = dptr->overlay_hop + 1; m_parent = ap->src_agent; } }//sunny if (m_use_packet_cache == true) {printf("sunny: before add_to _packet_cache in handle_data_pkt \n"); //sunny change// addtional change place changed if (add_to_packet_cache(ap->u.data_p.original_src.agent_id, /* ap->u.data_p.original_src.node_id, */ ap->u.data_p.seq_no, ap->u.data_p.type, ap->u.data_p.orig_time, ap->u.data_p.data_len, ap->u.data_p.payload) == false) { pk_duplicate++;//sunny change4fd=fopen(filename, "w");fprintf(fd, "id:%d, num_all_recv:%d, pk_duplicate:%d", this->id, num_all_recv, pk_duplicate);fclose(fd); return 1; } else if ( now- dptr->orig_time > VERY_LATE + m_dist_from_src * ONE_HOP_DELAY ) return 1;}printf("sunny: after add_to_packet_cache in handle_data_packet\n"); if ( SHOW_ALL ) { if ( dptr->seq_no < CUT_OFF && dptr->orig_time > m_recent_up_time ) printf("pkt_deliv ag %d s %d at %8.4f lat %8.4f %d %d\n", id, dptr->seq_no, now, now-dptr->orig_time, dptr->type, m_parent); /* printf ("[coop %d %d ] At %8.4f recv data-pkt : < [ %d %d ] > seq %d : ffrom < [ %d %d ] > type: %d\n", id, n->id, Scheduler::Clock(), ap->u.data_p.original_src.agent_id, ap->u.data_p.original_src.node_id, ap->u.data_p.seq_no, ap->src_agent, ap->src, ap->u.data_p.type); */ } //PRM//RPM ---- handle_data_pkt///* pkt_recv_count++; int unit = BLOCK_SIZE + FEC_SIZE; int start = dptr->seq_no - dptr->seq_no % unit; if ( dptr->seq_no < CUT_OFF && dptr->orig_time > m_recent_up_time ) { if ( m_src_aid < 0 ) { m_src_aid = dptr->original_src.agent_id; //m_src_nid should not be need(->fillbitmap->lookup_packet_cache) //m_src_nid = dptr->original_src.node_id; } num_pkt_deliv++; m_pkt_deliv++; num_pkt_deliv_type[dptr->type]++; m_num_pkt_deliv_type[dptr->type]++; m_latency += now - dptr->orig_time; deliv_per_pkt[dptr->seq_no]++; latency_per_pkt[dptr->seq_no] += now - dptr->orig_time; } g_one_hop_latency_sum += now - ap->send_time; g_one_hop_cnt++; if ( isInternal() ) { g_nonleaf_latency_sum += now - ap->send_time; g_nonleaf_cnt++;*/ /* if ( dptr->seq_no % 1000 == 1 && ap->src_agent == 0 && dptr->type == FRESH_DATA ) fprintf(stderr, "one-hop latency from source = %f\n", now - ap->send_time ); *//* } if ( !isInternal() && dptr->type == FRESH_DATA ) { g_leaf_latency_sum += now - dptr->orig_time; g_leaf_cnt++; }*/ //PRM //LayerInfo *l = layers.arr[this_lid]; //PRM LayerInfo *l; if ( this_lid >= 0 ) l = layers.arr[this_lid]; else l = NULL;//PRM /* The 3 if conditions below enforce strict checking of packet source */ if (l != NULL) { if (l->me_in_layer == true) { // Somewhat looser checking of data packet forwarding // if (l->ag_list.Locate(ap->src_agent) != NULL) {printf(" forward data packet in handle_Data pkt\n"); forward_data_packet(ap->u.data_p.payload, ap->u.data_p.data_len, ap->u.data_p.original_src.agent_id, ap->u.data_p.original_src.agent_addr, ap->u.data_p.seq_no, ap->u.data_p.base_lid, ap->src_agent, false,dptr->type, dptr->orig_time, dptr->overlay_hop); // } } } else if (this_lid < 0) {printf("sunny: retransmitted pkt: this_lis is less than 0 in handle_data_pkt\n"); forward_data_packet(ap->u.data_p.payload, ap->u.data_p.data_len, ap->u.data_p.original_src.agent_id, ap->u.data_p.original_src.agent_addr, ap->u.data_p.seq_no, ap->u.data_p.base_lid, ap->src_agent, false,dptr->type, dptr->orig_time, dptr->overlay_hop); } //proactive retransmit request requestProactive(ap); //PRM return 0;}//modifiedvoid coopAgent::forward_data_packet (char* payload, int data_len, int src_aid, struct sockaddr_in src_agent_addr, int src_seqno, int base_lid, int pred_aid, bool is_source, int type, double ot, int overlay_hop) { int count_pkts = 0; for (int i = 0; i < MAX_LAYERS; i++) { LayerInfo * l = layers.arr[i]; if (l != NULL) { if (l->me_in_layer == true) { // Forward to this layer only if predecessor not in this layer if (l->ag_list.Locate(pred_aid) == NULL) { if (l->lid >= base_lid) count_pkts += send_data_pkt_to_layer(payload, data_len, l,src_aid,base_lid,src_agent_addr,src_seqno, type, ot, overlay_hop); } } } } if (state == JOIN) { if ( (curr_join_q_lid < MAX_LAYERS) && (curr_join_q_lid > 0) ) { LayerInfo * q_l = layers.arr[curr_join_q_lid]; assert (q_l != NULL); assert (q_l->me_in_layer == false); if (q_l->lid >= base_lid) count_pkts += send_data_pkt_to_layer(payload, data_len, q_l,src_aid,base_lid,src_agent_addr,src_seqno,type,ot,overlay_hop); } if (curr_join_q_lid < 0) { // Starting join query at BSE // For join to higher layers, we probably already have the higher // target layer from the previous leader which left the cluster or // did a root xfer to me. Can use that as a fall back option. // This should not be for layer 0. LayerInfo * target_layer = layers.arr[target_join_q_lid]; if (target_layer != NULL) { assert (target_layer->me_in_layer == false); if (target_layer->lid >= base_lid) count_pkts += send_data_pkt_to_layer(payload, data_len, target_layer,src_aid,base_lid,src_agent_addr,src_seqno,type,ot,overlay_hop); } } } //PRM//random send //call random send if rand() > r if ( type != PROACTIVE_DATA && rand() < RAND_MAX * PROB_RAND_SEND ) { /* int ra_id; struct sockaddr_in random_target_addr; int lid = RANDOM_SEND_LID; int repeat; int num_sent = 0; int arr[RAND_NUM_AT_A_TIME]; */ /*char* payload, int data_len, int src_aid, struct sockaddr_in src_agent_addr, int src_seqno, int base_lid, int pred_aid, bool is_source, int type, double ot, int overlay_hop */ count_pkts += randomSend(payload, data_len,src_aid, src_agent_addr, src_seqno, ot, -1, RANDOM_SEND_LID); /* repeat = idCache.getSize(); printf("*******repeat : cache size %d\n", repeat); if ( repeat > RAND_NUM_AT_A_TIME ) repeat = RAND_NUM_AT_A_TIME; */ } //#endif return;}//PRM - random Send//int coopAgent::randomSend( int src_aid, int src_nid, int src_seq, double ot, int aid, int nid, int type ) {int coopAgent::randomSend(char * payload, int data_len, int src_aid, struct sockaddr_in src_agent_addr, int src_seq , double ot, int aid, int type) { //koo printf("^^^^^^^^^^^^^^^^^^^^^^^\n"); printf("Random Send is called\n"); int ra_id; struct sockaddr_in random_target_addr; int lid = RANDOM_SEND_LID; int repeat; int num_sent = 0; int arr[RAND_NUM_AT_A_TIME]; //if ( type == ADOPT_LID ) { // lid = ADOPT_LID; // repeat = 1; //} //else { repeat = idCache.getSize(); printf("*******repeat : cache size %d\n", repeat); if ( repeat > RAND_NUM_AT_A_TIME ) repeat = RAND_NUM_AT_A_TIME; //} //do some bitmap stuff here int i, base; char bitmap[MAX_BITMAP]; for ( i = 0, base = m_max_seq_recv - BITMAP_SIZE; i < BITMAP_SIZE; i++ ) { if ( base+i < 0 ) continue; //lookup packet cache stores a list of packets that have been received if ( lookup_packet_cache( src_aid, /*src_nid,*/ base+i ) ) bitmap[i] = 1; else bitmap[i] = 0; } //repeat = beta for ( i = 0 ; i < repeat; i++ ) { int itemp = 20; int flag = 1; while (--itemp && flag) { if ( aid < 0 ) { ra_id = getRandomAgent(MISS_BASED_CORR); //rn_id = getRandomNode(ra_id); //getRandomNode(ra_id, &random_target_addr); printf("@@@@@@@@@@@@!!!!!!RA_ID get %d\n",ra_id); } else { ra_id = aid; } int j; for ( j = 0 ; j < num_sent; j++ ) { if ( ra_id == arr[j] ) break; } if ( j == num_sent ) flag = 0; } if ( flag == 0 ) arr[num_sent] = ra_id; else ra_id = -1; printf("@@@@@@@@@@@@!!!!!!RA_ID get %d, and ****** my id %d\n",ra_id, id); if ( ra_id >= 0 && ra_id != id ) { AppPacket *ap = new AppPacket(PACKET_DATA); ap->u.data_p.original_src.agent_id = src_aid; //ap->u.data_p.original_src.node_id = src_nid; memcpy(&ap->u.data_p.original_src.agent_addr, &src_agent_addr, sizeof(struct sockaddr_in)); //char* payload, int data_len, int src_aid, struct sockaddr_in src_agent_addr, int src_seqno, int base_lid, int pred_aid, //bool is_source, int type, double ot, int overlay_hop ap->u.data_p.seq_no = src_seq; ap->u.data_p.lid = lid; ap->u.data_p.type = RAND_SEND_DATA; ap->u.data_p.orig_time = ot; ap->u.data_p.data_len = data_len; printf("----------------------------\n"); printf("PPPPPPPPP rinting the contents of payload before : %s\n", payload); ap->u.data_p.payload = (char *) malloc (sizeof(char) * data_len); memcpy(ap->u.data_p.payload, payload, data_len); printf("SSSSSSSSS after memory copy %s\n",ap->u.data_p.payload); memcpy( ap->u.data_p.bitmap, bitmap, BITMAP_SIZE *sizeof (char) ); ap->u.data_p.type = type; getRandomNode(ra_id, &random_target_addr);//sunny change3 (6lines) ap->src_agent=this->id; memcpy(& ap->src_agent_addr, & this->udp_recv_agent_addr, sizeof(struct sockaddr_in)); ap->dst_agent=ra_id; memcpy(& ap->dst_agent_addr, & random_target_addr, sizeof(struct sockaddr_in));//sunny send_pkt_wrapper(ap, ra_id, random_target_addr); if ( ap->u.data_p.seq_no < CUT_OFF ) num_rand_send++; num_sent++;printf("ra_id : %d\n", ra_id); } }printf("randomsend\n"); return num_sent;}//PRM -random send//PRM-changedint coopAgent::send_data_pkt_to_layer (char* payload, int data_len, LayerInfo * l, int src_aid, int base_lid, struct sockaddr_in src_agent_addr, int src_seqno ,int type, double ot, int overlay_hop) { int count_pkts = 0; //PRM //do some bitmap stuff here int i, base; char bitmap[MAX_BITMAP]; //for ( i = 0, base = src_seqno - BITMAP_SIZE; i < BITMAP_SIZE; i++ ) { for ( i = 0, base = m_max_seq_recv - BITMAP_SIZE; type != PROACTIVE_DATA && i < BITMAP_SIZE; i++ ) { if ( base+i < 0 ) continue; //we are not keeping fixed size of data packet if ( base+i < m_max_seq_recv - BITMAP_SIZE ) bitmap[i] = 0; else if ( lookup_packet_cache( src_aid, /*src_nid,*/ base+i ) ) bitmap[i] = 1; else bitmap[i] = 0; }printf("sunny :: bitmap made in send_data_pkt_to_layer\n"); //PRM AppPacket *ap = new AppPacket(PACKET_DATA); ap->u.data_p.original_src.agent_id = src_aid; memcpy(& ap->u.data_p.original_src.agent_addr, & src_agent_addr, sizeof(struct sockaddr_in)); ap->u.data_p.seq_no = src_seqno; ap->u.data_p.lid = l->lid; ap->u.data_p.base_lid = base_lid; ap->u.data_p.data_len = data_len; //ap->u.data_p.payload = (char *)malloc(data_len); ap->u.data_p.payload = payload; //memcpy(ap->u.data_p.payload, payload, data_len);//PRM ap->u.data_p.type = type; ap->u.data_p.orig_time = ot; //fprintf(stderr, "\t\t %d\n", ap->u.data_p.hop_count ); memcpy( ap->u.data_p.bitmap, bitmap, BITMAP_SIZE * sizeof (char) ); ap->u.data_p.overlay_hop = overlay_hop +1; //fprintf(stderr, "\t\t %d\n", ap->u.data_p.hop_count );printf("sunny: appPacket in send_data_pkt_to_layer\n");//PRM for (void * pos = l->ag_list.GetHeadPosition(); pos != NULL; l->ag_list.GetNext(pos) ) { LayerAgentInfo * this_la = l->ag_list.GetAt(pos); if (self_check(this_la) == false) { if ( ( (bse.agent_id != this_la->ag.agent_id) /* || (bse.node_id != this_la->ag.node_id) */) && ( ( (this_la->ag.agent_id != src_aid) /*|| (this_la->ag.node_id != src_nid) */) ) ) {#ifdef LOG_DATA_PACKET_SEND printf ("[coop %d ] At %8.4f send data-pkt : < [ %d ] > seq %d : to < [ %d ] > lid %d dist_est %f len %d\n", id,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -