📄 talker.cc,v
字号:
head 1.3;access;symbols;locks; strict;comment @// @;1.3date 2002.08.30.16.50.49; author rbraud; state Exp;branches;next 1.2;1.2date 2002.07.26.19.41.42; author rbraud; state Exp;branches;next 1.1;1.1date 2002.07.25.20.44.28; author rbraud; state Exp;branches;next ;desc@@1.3log@changed addr and port to unsigned@text@#include <stdio.h>#include <stdlib.h>#include <errno.h>#include <string.h>#include <sys/types.h>#include <sys/time.h>#include <unistd.h>#include <netinet/in.h>#include <netdb.h>#include <sys/socket.h>#include <sys/wait.h>#include <arpa/inet.h>#include <pthread.h>#include <list>#include "talker.h"#define MAXBUF 10#define MINIMUM_WAIT_TIME 150000#define MAXIMUM_WAIT_TIME 700000#define REMOVE_TIMER 10int confidence(long *, int, float, float *, float *);static host_info_node_t* data_list = NULL;pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;host_info_node_t* get_ping_list() { static host_info_node_t* head = 0x0; host_info_node_t* curr, *newnode, *last; static struct timeval last_created = {0, 0}; struct timeval now; gettimeofday(&now, 0x0); if(now.tv_sec - last_created.tv_sec >= REMOVE_TIMER) { last_created = now; delete_ping_list(head); head = 0x0; pthread_mutex_lock(&mutex); curr = data_list; while(curr != 0x0) { if(curr->data->refreshed) { // Add this node to our list newnode = new host_info_node_t; newnode->data = curr->data; newnode->next = 0x0; // See if this is our first node if(head == 0x0) head = newnode; else last->next = newnode; last = newnode; curr->data->refreshed = 0; } curr = curr->next; } pthread_mutex_unlock(&mutex); } return head;}void delete_ping_list(host_info_node_t* ping_list) { host_info_node_t* curr = ping_list, *next; while(curr != 0x0) { // Don't delete curr->data because we are using that in // the data_list next = curr->next; delete curr; curr = next; }}void* ping_thread_func(void* data) { // Constantly loop through our list pinging the addresses host_info_node_t* ping_list = 0x0; int ping_time = *((int*)data); while(1) { ping_list = get_ping_list(); send_pings(ping_list, ping_time); }}void send_pings(host_info_node_t* ping_list, int ping_time) { // This function should not return until at least <ping_time> // milliseconds have passed struct timeval now, last, tv; host_info_node_t* curr; float ping; long diff; cout<<"send pings called"<<endl; curr = ping_list; gettimeofday(&last, 0x0); while(curr != 0x0) { cout<<"pinging "<<curr->data->addr<<endl; ping = estimate_rtt(curr->data->host_addr); pthread_mutex_lock(&mutex); { curr->data->ping_time = ping; } pthread_mutex_unlock(&mutex); gettimeofday(&curr->data->time_of_ping, 0x0); curr = curr->next; } gettimeofday(&now, 0x0); diff = ping_time * 1000 - ((now.tv_sec - last.tv_sec) * 1000000L + (now.tv_usec - last.tv_usec)); if(diff > 0) { // We took less time pinging all the hosts than what was requested // so we need to wait for the remaining time tv.tv_sec = diff / 1000000; tv.tv_usec = diff % 1000000; select(0, NULL, NULL, NULL, &tv); }}int accept_connection(int accept_sock) { struct sockaddr_in client_addr; unsigned client_len; int new_sock; client_len = sizeof(client_addr); memset(&client_addr, 0, client_len); if((new_sock = accept(accept_sock, (struct sockaddr*)&client_addr, &client_len)) < 0) { printf("Talker: Error accepting connection\n"); return -1; } else return new_sock;}int handle_request(int sock, char* buf, host_info_node_t*& head) { unsigned addr; unsigned short port; host_info_node_t* curr = head, *newest = 0x0; float ping; memcpy(&addr, buf, sizeof(unsigned)); memcpy(&port, &buf[4], sizeof(unsigned short)); cout<<"got a request for addr "<<addr<<" on port "<<ntohs(port)<<endl; while(curr != 0x0) { if(curr->data->addr == addr && curr->data->port == port) { pthread_mutex_lock(&mutex); { curr->data->refreshed = 1; ping = curr->data->ping_time; } pthread_mutex_unlock(&mutex); return send_response(sock, ping); } else curr = curr->next; } cout<<"didn't find it in database"<<endl; newest = new host_info_node_t; newest->data = new host_info_t; newest->data->addr = addr; newest->data->port = port; newest->data->refreshed = 1; newest->data->ping_time = 40000; memset(&newest->data->host_addr, 0, sizeof(struct sockaddr_in)); newest->data->host_addr.sin_family = AF_INET; newest->data->host_addr.sin_addr.s_addr = addr; newest->data->host_addr.sin_port = port; // Haven't pinged it yet newest->data->time_of_ping.tv_sec = 0; newest->data->time_of_ping.tv_usec = 0; // Insert this node into the beginning of the list pthread_mutex_lock(&mutex); { newest->next = head; head = newest; ping = newest->data->ping_time; } pthread_mutex_unlock(&mutex); return send_response(sock, ping);}int send_response(int sock, float ping_time) { char buf[4]; long val = htonl((long)ping_time); cout<<"sending back response "<<ping_time<<endl; memcpy(buf, &val, sizeof(long)); return send(sock, buf, 4, 0);}double estimate_rtt(struct sockaddr_in their_addr) { // host is IP addr. int sockfd, i=0; // struct sockaddr_in my_addr; /* my address information */ // struct sockaddr_in their_addr,their_addr1; /* connector's address information */ // struct hostent *he; struct timeval start_time, end_time, tv; long total_time_in_musecs = 0; int cnt = 0, numbytes, selectRet; long confArr[100] = { 0 }; float avg, delta; char message[]="hello"; char buf[MAXBUF]; fd_set rset; // int MYPORT; avg = 0; FD_ZERO(&rset); gettimeofday(&start_time,NULL); if ((sockfd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { perror("socket"); exit(1); } FD_SET(sockfd,&rset); //their_addr.sin_family = AF_INET; /* host byte order */ while ((i++<10) && (total_time_in_musecs < MAXIMUM_WAIT_TIME)) { sprintf(message,"%d",i); gettimeofday(&start_time,NULL); if ((numbytes=sendto(sockfd,message,strlen(message),0, (struct sockaddr *)&their_addr, sizeof(struct sockaddr))) == -1) { perror("sending problem"); printf("Continuing.. "); continue; } if(FD_ISSET(sockfd,&rset)) { tv.tv_sec = 0; tv.tv_usec = MINIMUM_WAIT_TIME; do { sprintf(buf,"%d",i); //printf("ENTER - select\n"); fflush(0); // do { if((selectRet = select(sockfd+1,&rset,NULL,NULL,&tv)) > 0 ) { numbytes = recvfrom(sockfd,buf,MAXBUF,0,NULL,NULL); if (numbytes<0) { perror("Problem in recvfrom"); continue; } //printf("DONE - recvfrom\n"); fflush(0); if(atoi(buf) == i) { gettimeofday(&end_time,NULL); confArr[cnt++] = 1000000L*(end_time.tv_sec-start_time.tv_sec)+ (end_time.tv_usec -start_time.tv_usec); } } gettimeofday(&end_time,NULL); total_time_in_musecs += 1000000L*(end_time.tv_sec-start_time.tv_sec)+ (end_time.tv_usec -start_time.tv_usec); fflush(0); } while (atoi(buf)<i && total_time_in_musecs < MAXIMUM_WAIT_TIME); FD_SET(sockfd,&rset); } if(cnt >= 3) { confidence(confArr, cnt, 0.95, &avg, &delta); if(delta <= 0.1 * avg) break; } }//end of while close(sockfd); // printf("CNT = %d AVG = %.0f DELTA = %.0f\n NUMPACKETS LOST = %d", cnt, avg, delta, i - cnt); if(avg == 0) return 200000; return avg;}double estimate_rtt_wrapper(struct sockaddr_in their_addr) { double dist = estimate_rtt(their_addr); if (dist < 1000.0) return 500.0; if (dist < 5000.0) return 3000.0; if (dist < 10000.0) return 7500.0; if (dist < 20000.0) return 15000.0; if (dist < 40000.0) return 30000.0; if (dist < 80000.0) return 60000.0; if (dist < 160000.0) return 120000.0; return 200000.0;}int main(int argc, char ** argv) { pthread_t ping_thread; struct sockaddr_in my_addr; list<int> sock_vec; list<int>::iterator i; fd_set sock_set; fd_set copy_set; int accept_sock, max_desc, new_sock, num_fds; int ping_time; const int MAX_OUTSTANDING = 5; if (argc != 3) { fprintf(stderr,"usage: %s <port> <time between pings (ms)>\n", argv[0]); exit(-1); } ping_time = atoi(argv[2]); if((accept_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { printf("Talker: Could not create accept socket\n"); exit(-1); } memset(&my_addr, 0, sizeof(my_addr)); my_addr.sin_family = AF_INET; my_addr.sin_addr.s_addr = htonl(INADDR_ANY); my_addr.sin_port = htons(atoi(argv[1])); if(bind(accept_sock, (struct sockaddr*)&my_addr, sizeof(my_addr)) < 0) { printf("Talker: Could not bind socket to port %d\n", atoi(argv[1])); exit(-1); } if(listen(accept_sock, MAX_OUTSTANDING) < 0) { printf("Talker: Could not listen on socket\n"); exit(-1); } FD_ZERO(&sock_set); FD_SET(accept_sock, &sock_set); max_desc = accept_sock; if(pthread_create(&ping_thread, NULL, ping_thread_func, &ping_time) != 0) { printf("Talker: Could not create ping thread\n"); exit(-1); } if(pthread_detach(ping_thread) != 0) { printf("Talker: Could not detach ping thread\n"); exit(-1); } while(1) { // Handle new connections and build our list of hosts to query memcpy(©_set, &sock_set, sizeof(fd_set)); if((num_fds = select(max_desc+1, ©_set, NULL, NULL, NULL)) < 0) { printf("Talker: Error with select\n"); continue; } else { if(FD_ISSET(accept_sock, ©_set)) { new_sock = accept_connection(accept_sock); if(new_sock >= 0) { FD_SET(new_sock, &sock_set); if(new_sock > max_desc) max_desc = new_sock; sock_vec.push_back(new_sock); } } for(i=sock_vec.begin(); i!=sock_vec.end(); i++) { int num_bytes; char buf[6] = {0}; if(FD_ISSET(*i, ©_set)) { if((num_bytes = recv(*i, buf, 6, 0)) < 0) { printf("Talker: Error receiving\n"); return -1; } if(!strcmp(buf, "bye")) { list<int>::iterator j; printf("got bye msg\n"); close(*i); FD_CLR(*i, &sock_set); j = i; i++; sock_vec.erase(j); } else handle_request(*i, buf, data_list); } } } } //printf(" %8.4f is the delay \n", estimate_rtt(their_addr)); return 0; }@1.2log@Now handles the bye message to close connections to clients who don't want toping us anymore.@text@d135 2a136 2 int addr; short port;d140 2a141 2 memcpy(&addr, buf, sizeof(int)); memcpy(&port, &buf[4], sizeof(short));@1.1log@Initial revision@text@d14 1a14 1#include <vector>d134 2a135 2int handle_request(int sock, host_info_node_t*& head) { int num_bytes, addr;a136 1 char buf[6];a139 4 if((num_bytes = recv(sock, buf, 6, 0)) < 0) { printf("Talker: Error receiving\n"); return -1; }d288 2a289 2 vector<int> sock_vec; vector<int>::iterator i;d346 3d350 15a364 1 handle_request(*i, data_list);@
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -