📄 pathload_rcv_func.c
字号:
if ( FD_ISSET(sock_tcp,&readset) ) { /* check the control connection.*/ if (( ret_val = recv_ctr_mesg(sock_tcp,ctr_buff)) != -1 ) { if ( (((ret_val & CTR_CODE) >> 31) == 1) && ((ret_val & 0x7fffffff) == FINISHED_STREAM ) ) { while((ret_val = recv_ctr_mesg(sock_tcp,ctr_buff ))== -1) ; if ( ret_val == stream_cnt ) { break ; } } } } } // end of select else { perror("select"); exit(0); }#endif } for (j=0; j < stream_len; j++ ) { snd_tv[j].tv_sec = ntohl(snd_tv[j].tv_sec); snd_tv[j].tv_usec = ntohl(snd_tv[j].tv_usec); snd_tm[j]= snd_tv[j].tv_sec * 1000000.0 + snd_tv[j].tv_usec ; arrv_tm[j] = arrv_tv[j].tv_sec * 1000000.0 + arrv_tv[j].tv_usec ; owd[j] = arrv_tm[j] - snd_tm[j] ; } total_pkt_rcvd += pkt_rcvd ; finished_stream = 0 ; pkt_lost += stream_len - exp_pkt_id ; pkt_loss_rate = (double )pkt_lost * 100. / stream_len ; if(Verbose) printf(":%.1f",pkt_loss_rate ) ; fprintf(pathload_fp,":%.1f",pkt_loss_rate ) ; exp_pkt_id = 0 ; stream_cnt++ ; num_bursts=0; if ( interrupt_coalescence ) ic_flag=check_intr_coalescence(arrv_tv,pkt_rcvd,&num_bursts); if ( pkt_loss_rate < HIGH_LOSS_RATE && pkt_loss_rate >= MEDIUM_LOSS_RATE ) lossy_stream++ ; if ( pkt_loss_rate >= HIGH_LOSS_RATE || ( stream_cnt >= num_stream && lossy_stream*100./stream_cnt >= MAX_LOSSY_STREAM_FRACTION )) { if ( increase_stream_len ) { increase_stream_len=0; lower_bound=1; } if(Verbose) printf("\n Fleet aborted due to high lossrate"); fprintf(pathload_fp,"\n Fleet aborted due to high lossrate"); abort_fleet=1; break ; } else { /* analyze trend in stream */ num += get_sndr_time_interval(snd_tm,&snd_time_interval) ; adjust_offset_to_zero(owd, stream_len); num_substream = eliminate_sndr_side_CS(snd_tm,substream); num_sndr_cs[stream_cnt-1] = num_substream ; substream[num_substream++]=stream_len-1; low=0; num_rcvr_cs[stream_cnt-1]=0; tmp_b2b=0; for (j=0;j<num_substream;j++) { high=substream[j]; if ( ic_flag ) { if ( num_bursts < 2 ) { if ( ++repeat_1 == 3) { repeat_1=0; /* Abort fleet and try to find lower bound */ abort_fleet=1; lower_bound=1; increase_stream_len=0; break ; } } else if ( num_bursts <= 5 ) { if ( ++repeat_2 == 3) { repeat_2=0; /* Abort fleet and retry with longer stream length */ abort_fleet=1; increase_stream_len=1; break ; } } else { increase_stream_len=0; len=eliminate_b2b_pkt_ic(arrv_tm,owd,owdfortd,low,high,&num_rcvr_cs[stream_cnt-1],&tmp_b2b); /* for(p=0;p<len;p++) printf("%d %f\n",p,owdfortd[p]); */ pct_metric[trend_idx]= pairwise_comparision_test(owdfortd , 0 , len ); pdt_metric[trend_idx]= pairwise_diff_test(owdfortd , 0, len ); trend_idx+=1; } } else { len=eliminate_rcvr_side_CS(arrv_tm,owd,owdfortd,low,high,&num_rcvr_cs[stream_cnt-1],&tmp_b2b); if ( len > MIN_STREAM_LEN ) { get_trend(owdfortd,len); } } low=high+1; } if ( abort_fleet ) break; else { b2b_pkt_per_stream[stream_cnt-1] = tmp_b2b ; ctr_code = CONTINUE_STREAM | CTR_CODE; send_ctr_mesg(ctr_buff, ctr_code); } } pkt_rcvd = 0 ; /* A hack for slow links */ stream_duration = stream_len * time_interval ; if ( stream_duration >= 500000 ) { slow=1; break ; } } /*end of while (stream_cnt < num_stream ). */ if ( Verbose ) printf("\n"); fprintf(pathload_fp,"\n"); if ( abort_fleet ) { printf("\tAborting fleet. Stream_cnt %d\n",stream_cnt); ctr_code = ABORT_FLEET | CTR_CODE; send_ctr_mesg(ctr_buff , ctr_code ) ; return_val = -1 ; } else print_contextswitch_info(num_sndr_cs,num_rcvr_cs,b2b_pkt_per_stream,stream_cnt); exp_fleet_id++ ; free(pkt_buf); return return_val ;}void print_contextswitch_info(l_int32 num_sndr_cs[], l_int32 num_rcvr_cs[],l_int32 discard[],l_int32 stream_cnt){ l_int32 j; if (Verbose) printf(" # of CS @ sndr :: "); fprintf(pathload_fp," # of CS @ sndr :: "); for(j=0;j<stream_cnt-1;j++) { if (Verbose) printf(":%2d",num_sndr_cs[j]); fprintf(pathload_fp,":%2d",num_sndr_cs[j]); } if ( Verbose ) printf("\n"); fprintf(pathload_fp,"\n"); if (Verbose) printf(" # of CS @ rcvr :: "); fprintf(pathload_fp," # of CS @ rcvr :: "); for(j=0;j<stream_cnt-1;j++) { if (Verbose) printf(":%2d",num_rcvr_cs[j]); fprintf(pathload_fp,":%2d",num_rcvr_cs[j]); } if ( Verbose ) printf("\n"); fprintf(pathload_fp,"\n"); if (Verbose) printf(" # of DS @ rcvr :: "); fprintf(pathload_fp," # of DS @ rcvr :: "); for(j=0;j<stream_cnt-1;j++) { if (Verbose) printf(":%2d",discard[j]); fprintf(pathload_fp,":%2d",discard[j]); } if ( Verbose ) printf("\n"); fprintf(pathload_fp,"\n");}void sig_sigusr1(){ return;}void sig_alrm(){ terminate_gracefully(exp_start_time); exit(0);}void *ctrl_listen(void *arg){ struct timeval select_tv; fd_set readset; l_int32 ret_val ; char ctr_buff[8]; #ifdef THRLIB FD_ZERO(&readset); FD_SET(sock_tcp,&readset); select_tv.tv_sec=100;select_tv.tv_usec=0; if (select(sock_tcp+1,&readset,NULL,NULL,&select_tv) > 0 ) { /* check ... mesg received */ if ( FD_ISSET(sock_tcp,&readset) ) { /* check the control connection.*/ if (( ret_val = recv_ctr_mesg(sock_tcp,ctr_buff)) != -1 ) { if ( (((ret_val & CTR_CODE) >> 31) == 1) && ((ret_val & 0x7fffffff) == FINISHED_STREAM ) ) { while((ret_val = recv_ctr_mesg(sock_tcp,ctr_buff ))== -1) ; if ( ret_val == ((thr_arg *)arg)->stream_cnt ) { ((thr_arg *)arg)->finished_stream =1 ; pthread_kill(((thr_arg *)arg)->ptid,SIGUSR1); pthread_exit(NULL); } } else if ( (((ret_val & CTR_CODE) >> 31) == 1) && ((ret_val & 0x7fffffff) == FINISHED_TRAIN ) ) { select_tv.tv_usec = 2000 ; select_tv.tv_sec = 0 ; select(1,NULL,NULL,NULL,&select_tv); ((thr_arg *)arg)->finished_stream =1 ; pthread_kill(((thr_arg *)arg)->ptid,SIGUSR1); pthread_exit(NULL); } } } }#endif return NULL;}void get_trend(double owdfortd[],l_int32 pkt_cnt ){ double median_owd[MAX_STREAM_LEN]; l_int32 median_owd_len=0; double ordered[MAX_STREAM_LEN]; l_int32 j,count,pkt_per_min; //pkt_per_min = 5 ; pkt_per_min = (int)floor(sqrt((double)pkt_cnt)); count = 0 ; for ( j = 0 ; j < pkt_cnt ; j=j+pkt_per_min ) { if ( j+pkt_per_min >= pkt_cnt ) count = pkt_cnt - j ; else count = pkt_per_min; order_dbl(owdfortd , ordered ,j,count ) ; if ( count % 2 == 0 ) median_owd[median_owd_len++] = ( ordered[(int)(count*.5) -1] + ordered[(int)(count*0.5)] )/2 ; else median_owd[median_owd_len++] = ordered[(int)(count*0.5)] ; } pct_metric[trend_idx]= pairwise_comparision_test(median_owd , 0 , median_owd_len ); pdt_metric[trend_idx]= pairwise_diff_test(median_owd , 0, median_owd_len ); trend_idx+=1;}/* Order an array of doubles using bubblesort */void order_dbl(double unord_arr[], double ord_arr[],l_int32 start, l_int32 num_elems){ l_int32 i,j,k; double temp; for (i=start,k=0;i<start+num_elems;i++,k++) ord_arr[k]=unord_arr[i]; for (i=1;i<num_elems;i++) { for (j=i-1;j>=0;j--) if (ord_arr[j+1] < ord_arr[j]) { temp=ord_arr[j]; ord_arr[j]=ord_arr[j+1]; ord_arr[j+1]=temp; } else break; }}/* Order an array of float using bubblesort */void order_float(float unord_arr[], float ord_arr[],l_int32 start, l_int32 num_elems){ l_int32 i,j,k; double temp; for (i=start,k=0;i<start+num_elems;i++,k++) ord_arr[k]=unord_arr[i]; for (i=1;i<num_elems;i++) { for (j=i-1;j>=0;j--) if (ord_arr[j+1] < ord_arr[j]) { temp=ord_arr[j]; ord_arr[j]=ord_arr[j+1]; ord_arr[j+1]=temp; } else break; }}/* Order an array of l_int32 using bubblesort */void order_int(l_int32 unord_arr[], l_int32 ord_arr[], l_int32 num_elems){ l_int32 i,j; l_int32 temp; for (i=0;i<num_elems;i++) ord_arr[i]=unord_arr[i]; for (i=1;i<num_elems;i++) { for (j=i-1;j>=0;j--) if (ord_arr[j+1] < ord_arr[j]) { temp=ord_arr[j]; ord_arr[j]=ord_arr[j+1]; ord_arr[j+1]=temp; } else break; }}/* Send a message through the control stream*/void send_ctr_mesg(char *ctr_buff, l_int32 ctr_code){ l_int32 ctr_code_n = htonl(ctr_code); memcpy((void*)ctr_buff, &ctr_code_n, sizeof(l_int32)); if (write(sock_tcp, ctr_buff, sizeof(l_int32)) != sizeof(l_int32)) { fprintf(stderr, "send control message failed:\n"); exit(-1); }}/* Receive message from the control stream*/l_int32 recv_ctr_mesg(l_int32 ctr_strm, char *ctr_buff){ l_int32 ctr_code; gettimeofday(&first_time,0); if (read(ctr_strm, ctr_buff, sizeof(l_int32)) != sizeof(l_int32)) return(-1); gettimeofday(&second_time,0); memcpy(&ctr_code, ctr_buff, sizeof(l_int32)); return(ntohl(ctr_code));}/* Compute the time difference in microseconds between two timeval measurements*/double time_to_us_delta(struct timeval tv1, struct timeval tv2){ double time_us; time_us = (double) ((tv2.tv_sec-tv1.tv_sec)*1000000+(tv2.tv_usec-tv1.tv_usec)); return time_us;}/* Compute the average of the set of measurements <data>.*/double get_avg(double data[], l_int32 num_values){ l_int32 i; double sum_; sum_ = 0; for (i=0; i<num_values; i++) sum_ += data[i]; return (sum_ / (double)num_values);}/* PCT test to detect increasing trend in stream*/double pairwise_comparision_test (double array[] ,l_int32 start , l_int32 end){ l_int32 improvement = 0 ,i ; double total ; if ( ( end - start ) >= MIN_PARTITIONED_STREAM_LEN ) { for ( i = start ; i < end - 1 ; i++ ) { if ( array[i] < array[i+1] ) improvement += 1 ; } total = ( end - start ) ; return ( (double)improvement/total ) ; } else return -1 ;}/* PDT test to detect increasing trend in stream*/double pairwise_diff_test(double array[] ,l_int32 start , l_int32 end){ double y = 0 , y_abs = 0 ; l_int32 i ; if ( ( end - start ) >= MIN_PARTITIONED_STREAM_LEN ) { for ( i = start+1 ; i < end ; i++ ) { y += array[i] - array[i-1] ; y_abs += fabs(array[i] - array[i-1]) ; } return y/y_abs ; } else return 2. ;}double grey_bw_resolution() { if ( adr ) return (.05*adr<12?.05*adr:12) ; else return min_rate ;}/*
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -