📄 pathload_rcv_func.c
字号:
/* This file is part of pathload. pathload is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. pathload is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with pathload; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA*//*------------------------------------------------- pathload : an end-to-end available bandwidth estimation tool Author : Manish Jain ( jain@cc.gatech.edu ) Constantinos Dovrolis (dovrolis@cc.gatech.edu ) Release : Ver 1.3.2 Support : This work was supported by the SciDAC program of the US department --------------------------------------------------*//* * $Header: /net/cvs/bwtest/pathload/pathload_rcv_func.c,v 1.294 2006/05/19 20:21:15 jain Exp $ */#include "pathload_gbls.h"#include "pathload_rcv.h"l_int32 recvfrom_latency(struct sockaddr_in rcv_udp_addr){ char *random_data; float min_OSdelta[50], ord_min_OSdelta[50]; l_int32 j ; struct timeval current_time, first_time ; if ( (random_data = malloc(max_pkt_sz*sizeof(char)) ) == NULL ) { printf("ERROR : unable to malloc %ld bytes \n",max_pkt_sz); exit(-1); } srandom(getpid()); /* Create random payload; does it matter? */ for (j=0; j<max_pkt_sz-1; j++) random_data[j]=(char)(random()&0x000000ff); for (j=0; j<50; j++) { if ( sendto(sock_udp, random_data, max_pkt_sz, 0, (struct sockaddr*)&rcv_udp_addr,sizeof(rcv_udp_addr)) == -1) perror("recvfrom_latency"); gettimeofday(&first_time, NULL); recvfrom(sock_udp, random_data, max_pkt_sz, 0, NULL, NULL); gettimeofday(¤t_time, NULL); min_OSdelta[j]= time_to_us_delta(first_time, current_time); } /* Use median of measured latencies to avoid outliers */ order_float(min_OSdelta, ord_min_OSdelta,0,50); free(random_data); return((l_int32)ord_min_OSdelta[25]); }double get_adr() { struct timeval select_tv,arrv_tv[MAX_STREAM_LEN] ; double delta ; double bw_msr = 0; double bad_bw_msr[10] ; int num_bad_train=0 ; int first = 1 ; double sum =0 ; l_int32 exp_train_id ; l_int32 bad_train = 1; l_int32 retry = 0 ; l_int32 ctr_code ; l_int32 ctr_msg_rcvd ; l_int32 train_len=0; l_int32 last=0,i; l_int32 spacecnt=24 ; char ctr_buff[8]; l_int32 num_burst; if (Verbose) printf(" ADR ["); fflush(stdout); fprintf(pathload_fp," ADR ["); fflush(pathload_fp); ctr_code = SEND_TRAIN | CTR_CODE ; send_ctr_mesg(ctr_buff, ctr_code); exp_train_id = 0 ; for(i=0;i<100;i++) { arrv_tv[i].tv_sec=0; arrv_tv[i].tv_usec=0; } while ( retry < MAX_TRAIN && bad_train ) { if ( train_len == 5) train_len = 3; else train_len = TRAIN_LEN - exp_train_id*15; if (Verbose) printf("."); fflush(stdout); fprintf(pathload_fp,"."); spacecnt--; ctr_msg_rcvd = 0 ; bad_train = recv_train(exp_train_id, arrv_tv, train_len); /* Compute dispersion and bandwidth measurement */ if (!bad_train) { num_burst=0; interrupt_coalescence=check_intr_coalescence(arrv_tv,train_len,&num_burst); last=train_len; while(!arrv_tv[last].tv_sec) --last; delta = time_to_us_delta(arrv_tv[1], arrv_tv[last]); bw_msr = ((28+max_pkt_sz) << 3) * (last-1) / delta; /* tell sender that it was agood train.*/ ctr_code = GOOD_TRAIN | CTR_CODE ; send_ctr_mesg(ctr_buff, ctr_code ) ; } else { retry++ ; /* wait for atleast 10msec before requesting another train */ last=train_len; while(!arrv_tv[last].tv_sec) --last; first=1 ; while(!arrv_tv[first].tv_sec) ++first ; delta = time_to_us_delta(arrv_tv[first], arrv_tv[last]); bad_bw_msr[num_bad_train++] = ((28+max_pkt_sz) << 3) * (last-first-1) / delta; select_tv.tv_sec=0;select_tv.tv_usec=10000; select(0,NULL,NULL,NULL,&select_tv); ctr_code = BAD_TRAIN | CTR_CODE ; send_ctr_mesg(ctr_buff, ctr_code ) ; exp_train_id++ ; } } if (Verbose) { i = spacecnt; putchar(']'); while(--i>0)putchar(' '); printf(":: "); } fputc(']',pathload_fp); while(--spacecnt>0)fputc(' ',pathload_fp); fprintf(pathload_fp,":: "); if ( !bad_train) { if(Verbose) printf("%.2fMbps\n", bw_msr ) ; fprintf(pathload_fp,"%.2fMbps\n", bw_msr ) ; } else { for ( i=0;i<num_bad_train;i++) if ( finite(bad_bw_msr[i])) sum += bad_bw_msr[i] ; bw_msr = sum/num_bad_train ; if(Verbose) printf("%.2fMbps (I)\n", bw_msr ) ; fprintf(pathload_fp,"%.2fMbps (I)\n", bw_msr ) ; } return bw_msr ;}/* Receive a complete packet train from the sender */l_int32 recv_train( l_int32 exp_train_id, struct timeval *time,l_int32 train_len){ struct sigaction sigstruct ; struct timeval current_time; struct timeval select_tv; fd_set readset; l_int32 ret_val ; l_int32 pack_id , exp_pack_id ; l_int32 bad_train = 0 ; l_int32 train_id ; l_int32 rcvd=0; char *pack_buf ; char ctr_buff[8];#ifdef THRLIB thr_arg arg ; pthread_t tid; pthread_attr_t attr ;#endif exp_pack_id=0; if ( ( pack_buf = malloc(max_pkt_sz*sizeof(char))) == NULL ) { printf("ERROR : unable to malloc %ld bytes \n",max_pkt_sz); exit(-1); } sigstruct.sa_handler = sig_sigusr1 ; sigemptyset(&sigstruct.sa_mask); sigstruct.sa_flags = 0 ; #ifdef SA_INTERRUPT sigstruct.sa_flags |= SA_INTERRUPT ; #endif sigaction(SIGUSR1 , &sigstruct,NULL );#ifdef THRLIB arg.finished_stream=0; arg.ptid = pthread_self() ; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); if (pthread_create(&tid,&attr,ctrl_listen, &arg ) != 0 ) { perror("recv_train::pthread_create"); fprintf(stdout,"Failed to create thread. exiting...\n"); fprintf(pathload_fp,"Failed to create thread. exiting...\n"); exit(-1); }#endif do {#ifndef THRLIB FD_ZERO(&readset); FD_SET(sock_tcp,&readset); FD_SET(sock_udp,&readset); select_tv.tv_sec=1000;select_tv.tv_usec=0; if (select(sock_tcp+1,&readset,NULL,NULL,&select_tv) > 0 ) { if (FD_ISSET(sock_udp,&readset) ) {#endif if (recvfrom(sock_udp, pack_buf, max_pkt_sz, 0, NULL, NULL) != -1) { gettimeofday(¤t_time, NULL); memcpy(&train_id, pack_buf, sizeof(l_int32)); train_id = ntohl(train_id) ; memcpy(&pack_id, pack_buf+sizeof(l_int32), sizeof(l_int32)); pack_id=ntohl(pack_id); if (train_id == exp_train_id && pack_id==exp_pack_id ) { rcvd++; time[pack_id] = current_time ; exp_pack_id++; } else bad_train=1; }#ifndef THRLIB } // end of FD_ISSET if ( FD_ISSET(sock_tcp,&readset) ) { /* check the control connection.*/ if (( ret_val = recv_ctr_mesg(sock_tcp,ctr_buff)) != -1 ) { if ( (((ret_val & CTR_CODE) >> 31) == 1) && ((ret_val & 0x7fffffff) == FINISHED_TRAIN ) ) { break; } } } } // end of select } while (1);#else } while (!arg.finished_stream);#endif if ( rcvd != train_len+1 ) bad_train=1; gettimeofday(&time[pack_id+1], NULL); sigstruct.sa_handler = SIG_DFL ; sigemptyset(&sigstruct.sa_mask); sigstruct.sa_flags = 0 ; sigaction(SIGUSR1 , &sigstruct,NULL ); free(pack_buf); return bad_train ; }l_int32 check_intr_coalescence(struct timeval time[],l_int32 len, l_int32 *burst){ double delta[MAX_STREAM_LEN]; l_int32 b2b=0,tmp=0; l_int32 i; l_int32 min_gap; min_gap = MIN_TIME_INTERVAL > 3*rcv_latency ? MIN_TIME_INTERVAL : 3*rcv_latency ; //printf("---%d\n",len); for (i=2;i<len;i++) { delta[i] = time_to_us_delta(time[i-1],time[i]); if ( delta[i] <= min_gap ) { b2b++ ; tmp++; } else { if ( tmp >=3 ) { (*burst)++; tmp=0; } } } //fprintf(stderr,"\tNumber of b2b %d, Number of burst %d\n",b2b,*burst); if ( b2b > .6*len ) { return 1; } else return 0;}/* Receive N streams . After each stream, compute the loss rate. Mark a stream "lossy" , if losss rate in that stream is more than a threshold.*/l_int32 recv_fleet(){ struct sigaction sigstruct ; struct timeval snd_tv[MAX_STREAM_LEN], arrv_tv[MAX_STREAM_LEN]; struct timeval current_time, first_time; double pkt_loss_rate ; double owd[MAX_STREAM_LEN] ; double snd_tm[MAX_STREAM_LEN] ; double arrv_tm[MAX_STREAM_LEN]; l_int32 ctr_code ; l_int32 pkt_lost = 0 ; l_int32 stream_id_n , stream_id=0 ; l_int32 total_pkt_rcvd=0 ,pkt_rcvd = 0 ; l_int32 pkt_id = 0 ; l_int32 pkt_id_n = 0 ; l_int32 exp_pkt_id = 0 ; l_int32 stream_cnt = 0 ; /* 0->n*/ l_int32 fleet_id , fleet_id_n = 0 ; l_int32 lossy_stream = 0 ; l_int32 return_val = 0 ; l_int32 finished_stream = 0 ; l_int32 stream_duration ; l_int32 num_sndr_cs[20],num_rcvr_cs[20]; char ctr_buff[8]; char *pkt_buf ; double owdfortd[MAX_STREAM_LEN]; l_int32 num_substream,substream[MAX_STREAM_LEN]; l_int32 low,high,len,j; l_int32 b2b_pkt_per_stream[20]; l_int32 tmp_b2b;#ifdef THRLIB pthread_t tid ; thr_arg arg ; pthread_attr_t attr ;#endif l_int32 num_bursts; l_int32 abort_fleet=0; l_int32 p=0; struct timeval select_tv; fd_set readset; l_int32 ret_val ; if ( (pkt_buf = malloc(cur_pkt_sz*sizeof(char)) ) == NULL ) { printf("ERROR : unable to malloc %ld bytes \n",cur_pkt_sz); exit(-1); } trend_idx=0; ic_flag = 0; if(verbose&&!Verbose) printf("Receiving Fleet %ld, Rate %.2fMbps\n",exp_fleet_id,tr); if(Verbose) { printf("\nReceiving Fleet %ld\n",exp_fleet_id); printf(" Fleet Parameter(req) :: R=%.2fMbps, L=%ldB, K=%ldpackets, \T=%ldusec\n",tr, cur_pkt_sz , stream_len,time_interval) ; } fprintf(pathload_fp,"\nReceiving Fleet %ld\n",exp_fleet_id); fprintf(pathload_fp," Fleet Parameter(req) :: R=%.2fMbps, L=%ldB, \K=%ldpackets, T=%ldusec\n",tr, cur_pkt_sz , stream_len,time_interval); if(Verbose) printf(" Lossrate per stream :: "); fprintf(pathload_fp," Lossrate per stream :: "); sigstruct.sa_handler = sig_sigusr1 ; sigemptyset(&sigstruct.sa_mask); sigstruct.sa_flags = 0 ; #ifdef SA_INTERRUPT sigstruct.sa_flags |= SA_INTERRUPT ; #endif sigaction(SIGUSR1 , &sigstruct,NULL ); while ( stream_cnt < num_stream ) {#ifdef THRLIB arg.finished_stream=0; arg.ptid = pthread_self() ; arg.stream_cnt = stream_cnt ; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); if (pthread_create(&tid,&attr,ctrl_listen, &arg ) != 0 ) { perror("recv_fleet::pthread_create"); exit(-1); }#endif pkt_lost = 0 ; first_time.tv_sec = 0 ; for (j=0; j < stream_len; j++ ) { snd_tv[j].tv_sec=0 ; snd_tv[j].tv_usec=0 ; arrv_tv[j].tv_sec=0; arrv_tv[j].tv_usec=0; } /* Receive K packets of ith stream */#ifdef THRLIB while(!arg.finished_stream) #else while(1)#endif {#ifndef THRLIB FD_ZERO(&readset); FD_SET(sock_tcp,&readset); FD_SET(sock_udp,&readset); select_tv.tv_sec=1000;select_tv.tv_usec=0; if (select(sock_tcp+1,&readset,NULL,NULL,&select_tv) > 0 ) { if (FD_ISSET(sock_udp,&readset) ) {#endif if( recvfrom(sock_udp,pkt_buf,cur_pkt_sz,0,NULL,NULL) > 0 ) { gettimeofday(¤t_time,NULL); memcpy(&fleet_id_n,pkt_buf , sizeof(l_int32)); fleet_id = ntohl(fleet_id_n) ; memcpy(&stream_id_n,pkt_buf+sizeof(l_int32) , sizeof(l_int32)); stream_id = ntohl(stream_id_n) ; memcpy(&pkt_id_n, pkt_buf+2*sizeof(l_int32), sizeof(l_int32)); pkt_id = ntohl(pkt_id_n) ; if ( fleet_id == exp_fleet_id && stream_id == stream_cnt && pkt_id >= exp_pkt_id ) { if ( first_time.tv_sec == 0 ) first_time = current_time; arrv_tv[pkt_id] = current_time ; memcpy(&(snd_tv[pkt_id].tv_sec) , pkt_buf+3*sizeof(l_int32), sizeof(l_int32)); memcpy(&(snd_tv[pkt_id].tv_usec), pkt_buf+4*sizeof(l_int32), sizeof(l_int32)); if ( pkt_id > exp_pkt_id ) /* reordered are considered as lost */ { pkt_lost += ( pkt_id - exp_pkt_id ) ; exp_pkt_id = pkt_id ; } ++exp_pkt_id ; ++pkt_rcvd; } } // end of recvfrom#ifndef THRLIB } // end of FD_ISSET
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -