📄 spruce_snd.c
字号:
timeout.tv_sec = 20; timeout.tv_usec = 0; n = select(tcp_control_socket+1, &rset, &wset, NULL, &timeout); if(n < 0){ perror("bug select connect:"); exit(1); } if(n == 0){ /*treat as throughput 0*/ connect_failed=1; } if(n > 0){ int len=sizeof(e); if (getsockopt(tcp_control_socket, SOL_SOCKET, SO_ERROR, &e, &len) < 0) { connect_failed=1; } } } } if(connect_failed){ fprintf(stderr,"failed to connect control socket\n"); exit(1); } //send a NOOP to sink, wait for response. cmd = CONTROL_NOOP; write(tcp_control_socket,&cmd,sizeof(cmd)); FD_ZERO(&rset); FD_SET(tcp_control_socket,&rset); bzero(&timeout,sizeof(timeout)); timeout.tv_sec = 10; sret=select(tcp_control_socket+1,&rset,NULL,NULL,&timeout); if(sret > 0 && FD_ISSET(tcp_control_socket,&rset)){ rret=read(tcp_control_socket,&ack,sizeof(ack)); if(rret != sizeof(ack) || (ack & CONTROL_ACK) == 0){ fprintf(stderr,"bad control response... is receiver running?\n"); exit(1); } } else { fprintf(stderr,"no control response...\n"); exit(1); }}void control_exit(){ char cmd; char ack; fd_set rset; struct timeval timeout; int sret,rret; cmd = CONTROL_EXIT; write(tcp_control_socket,&cmd,sizeof(cmd)); bzero(&timeout,sizeof(timeout)); timeout.tv_sec = 20; FD_ZERO(&rset); FD_SET(tcp_control_socket,&rset); sret = select(tcp_control_socket+1,&rset,NULL,NULL,&timeout); if(sret > 0 && FD_ISSET(tcp_control_socket,&rset)){ rret=read(tcp_control_socket,&ack,sizeof(ack)); close(tcp_control_socket); }}long get_spruce_rate(){ long rate=0; //tell sink to dump all data to file now and reset counters { char cmd; cmd = CONTROL_SPRUCE_OUTPUT; write(tcp_control_socket, &cmd, sizeof(cmd)); } //read the ack from the sink { fd_set rset; struct timeval timeout; int sret, rret; char ack; long retrate; FD_ZERO(&rset); FD_SET(tcp_control_socket,&rset); bzero(&timeout,sizeof(timeout)); timeout.tv_sec = 10; sret=select(tcp_control_socket+1,&rset,NULL,NULL,&timeout); if(sret > 0 && FD_ISSET(tcp_control_socket,&rset)){ rret=read(tcp_control_socket,&ack,sizeof(ack)); if(rret != sizeof(ack) || (ack & CONTROL_ACK) == 0){ fprintf(stderr,"got a bad response. sdf.. %d\n", rret); exit(1); } FD_ZERO(&rset); FD_SET(tcp_control_socket,&rset); bzero(&timeout,sizeof(timeout)); timeout.tv_sec = 10; sret=select(tcp_control_socket+1,&rset,NULL,NULL,&timeout); rret=read(tcp_control_socket,&retrate,sizeof(retrate)); if(rret != sizeof(retrate)){ fprintf(stderr,"got a bad response324985\n"); exit(1); } rate = ntohl(retrate); } else { fprintf(stderr,"no response...\n"); exit(1); } } return rate;}void spruce_test(){ /*note these calculations will fail for anything faster than 2 Gbps*/ /*that's okay since we can't pause for less than one usec anyway*/ long long din = (packet_size * 8 * (long long) (1000000)) / path_capacity; double shortest_allowed = (packet_size+28)*2.*8.*1e6 / path_capacity / 0.05; //need to compute the average pause here int avg_pause = (shortest_allowed > inter_pair_gap) ? shortest_allowed : inter_pair_gap; int elapsed=0; int pause; struct timeval *times; int packetcnt = 0; int overflow; void *buf; int dropcnt=0; int current_gap; times = malloc(sizeof(struct timeval) * total_pairs); assert (times != NULL); buf = malloc(packet_size); assert (buf != NULL); srandom(time(NULL)); gettimeofday(&(times[0]), NULL); times[0].tv_sec+=1; /*don't start right now*/ for(packetcnt = 1 ;packetcnt < total_pairs ; packetcnt++){ pause = rint(gen_pause_exp(avg_pause)); elapsed += pause; times[packetcnt].tv_sec = times[packetcnt-1].tv_sec; times[packetcnt].tv_usec = times[packetcnt-1].tv_usec + pause; overflow = times[packetcnt].tv_usec / 1000000; times[packetcnt].tv_sec += overflow; times[packetcnt].tv_usec -= overflow * 1000000; } for(packetcnt = 0; packetcnt < total_pairs ; /*increment done if successful later*/){ struct timeval sleep_time; struct timeval now; int sret; int busy_wait = 0; long long diff; long gap; gettimeofday(&now,NULL); sleep_time.tv_sec = times[packetcnt].tv_sec - now.tv_sec; sleep_time.tv_usec = times[packetcnt].tv_usec - now.tv_usec; if(sleep_time.tv_usec < 0){ sleep_time.tv_usec += 1000000; sleep_time.tv_sec--; } diff = (times[packetcnt].tv_sec - now.tv_sec) * 1000000 + (times[packetcnt].tv_usec - now.tv_usec); if(diff < -500){ /*more than 500us late*/ /*too late, skip this*/ packetcnt++; dropcnt++; continue; } if(diff < 2 * (1000000 / HZ)){ /*we're now almost committed to sending this*/ busy_wait = 1; } assert(sleep_time.tv_usec >= 0); assert(sleep_time.tv_usec < 1000000); if(busy_wait){ do{ if(diff < (-1 * (1000000 / HZ)/2)){ /*oh well, got interrupted*/ dropcnt++; packetcnt++; continue; } diff = (times[packetcnt].tv_sec - now.tv_sec) * 1000000 + (times[packetcnt].tv_usec - now.tv_usec); gettimeofday(&now,NULL); gap = (now.tv_sec - times[packetcnt].tv_sec) * 1000000 + (now.tv_usec - times[packetcnt].tv_usec); *(((long*)buf)+4) = htonl(gap); } while(diff > 0); } else { /*yield for the minimum possible time*/ sleep_time.tv_sec=0; sleep_time.tv_usec=1; sret = select(0, NULL, NULL, NULL, &sleep_time); if (sret<0){ perror("select"); fprintf(stderr,"%ld %ld\n",sleep_time.tv_sec,sleep_time.tv_usec); exit(1); } continue; } /*now... what to send...what to send*/ { struct timeval first; struct timeval target; struct timeval current_time; register long diff; register long gap; int send_size = packet_size; *(((long*)buf)+0) = htonl(packetcnt); *(((long*)buf)+1) = htonl(1); *(((long*)buf)+3) = htonl(total_pairs); current_gap = din; *(((long*)buf)+2) = htonl(current_gap); gettimeofday(&first,NULL); if(send(spruce_socket, buf, send_size, 0) < 0){ if(errno == ENOBUFS){ packetcnt++; dropcnt++; continue; } perror("send pair1"); exit(1); } *(((long*)buf)+1) = htonl(2); /*spin for a little while, then send second packet*/ target.tv_sec = first.tv_sec; target.tv_usec = first.tv_usec + current_gap; if(target.tv_usec >= 1000000){ target.tv_sec++; target.tv_usec -= 1000000; } do{ gettimeofday(¤t_time,NULL); diff = (target.tv_sec - current_time.tv_sec) * 1000000 + (target.tv_usec - current_time.tv_usec); gap = (current_time.tv_sec - first.tv_sec) * 1000000 + (current_time.tv_usec - first.tv_usec); *(((long*)buf)+4) = htonl(gap); } while(diff > 0); if(send(spruce_socket, buf, send_size, 0) < 0){ if(errno == ENOBUFS){ packetcnt++; dropcnt++; continue; } perror("send pair2"); exit(1); } packetcnt++; } }}intmain(int argc, char *argv[]){ long abw=0; printf("sender starting up\n"); starttime = time(NULL); prepare_buffers(); process_args(argc, argv); control_connect(); prep_sockets(); spruce_test(); abw = get_spruce_rate(); fprintf(stderr, "availalble bandwidth estimate: %ld Kbps\n", abw); control_exit(); printf("sender finished\n"); exit(0);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -