⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 talker.cc,v

📁 这是P2P流媒体方案-NICE的实现源码
💻 CC,V
字号:
head	1.3;access;symbols;locks; strict;comment	@// @;1.3date	2002.08.30.16.50.49;	author rbraud;	state Exp;branches;next	1.2;1.2date	2002.07.26.19.41.42;	author rbraud;	state Exp;branches;next	1.1;1.1date	2002.07.25.20.44.28;	author rbraud;	state Exp;branches;next	;desc@@1.3log@changed addr and port to unsigned@text@#include <stdio.h>#include <stdlib.h>#include <errno.h>#include <string.h>#include <sys/types.h>#include <sys/time.h>#include <unistd.h>#include <netinet/in.h>#include <netdb.h>#include <sys/socket.h>#include <sys/wait.h>#include <arpa/inet.h>#include <pthread.h>#include <list>#include "talker.h"#define MAXBUF 10#define MINIMUM_WAIT_TIME	150000#define MAXIMUM_WAIT_TIME	700000#define REMOVE_TIMER            10int confidence(long *, int, float, float *, float *);static host_info_node_t* data_list = NULL;pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;host_info_node_t* get_ping_list() {  static host_info_node_t* head = 0x0;  host_info_node_t* curr, *newnode, *last;  static struct timeval last_created = {0, 0};  struct timeval now;    gettimeofday(&now, 0x0);  if(now.tv_sec - last_created.tv_sec >= REMOVE_TIMER) {    last_created = now;    delete_ping_list(head);    head = 0x0;    pthread_mutex_lock(&mutex);    curr = data_list;    while(curr != 0x0) {      if(curr->data->refreshed) {	// Add this node to our list	newnode = new host_info_node_t;	newnode->data = curr->data;	newnode->next = 0x0;	// See if this is our first node	if(head == 0x0)	  head = newnode;	else	  last->next = newnode;	last = newnode;	curr->data->refreshed = 0;      }      curr = curr->next;    }    pthread_mutex_unlock(&mutex);  }    return head;}void delete_ping_list(host_info_node_t* ping_list) {  host_info_node_t* curr = ping_list, *next;  while(curr != 0x0) {    // Don't delete curr->data because we are using that in     // the data_list    next = curr->next;    delete curr;    curr = next;  }}void* ping_thread_func(void* data) {  // Constantly loop through our list pinging the addresses  host_info_node_t* ping_list = 0x0;  int ping_time = *((int*)data);    while(1) {    ping_list = get_ping_list();    send_pings(ping_list, ping_time);  }}void send_pings(host_info_node_t* ping_list, int ping_time) {  // This function should not return until at least <ping_time>   // milliseconds have passed  struct timeval now, last, tv;  host_info_node_t* curr;  float ping;  long diff;  cout<<"send pings called"<<endl;  curr = ping_list;  gettimeofday(&last, 0x0);  while(curr != 0x0) {    cout<<"pinging "<<curr->data->addr<<endl;    ping = estimate_rtt(curr->data->host_addr);    pthread_mutex_lock(&mutex);    {      curr->data->ping_time = ping;    }    pthread_mutex_unlock(&mutex);    gettimeofday(&curr->data->time_of_ping, 0x0);    curr = curr->next;  }  gettimeofday(&now, 0x0);  diff = ping_time * 1000 - ((now.tv_sec - last.tv_sec) * 1000000L 			     + (now.tv_usec - last.tv_usec));  if(diff > 0) {    // We took less time pinging all the hosts than what was requested    // so we need to wait for the remaining time    tv.tv_sec = diff / 1000000;    tv.tv_usec = diff % 1000000;    select(0, NULL, NULL, NULL, &tv);  }}int accept_connection(int accept_sock) {  struct sockaddr_in client_addr;  unsigned client_len;  int new_sock;    client_len = sizeof(client_addr);  memset(&client_addr, 0, client_len);  if((new_sock = accept(accept_sock, (struct sockaddr*)&client_addr, &client_len)) < 0) {    printf("Talker: Error accepting connection\n");    return -1;  }  else    return new_sock;}int handle_request(int sock, char* buf, host_info_node_t*& head) {  unsigned addr;  unsigned short port;  host_info_node_t* curr = head, *newest = 0x0;  float ping;  memcpy(&addr, buf, sizeof(unsigned));  memcpy(&port, &buf[4], sizeof(unsigned short));  cout<<"got a request for addr "<<addr<<" on port "<<ntohs(port)<<endl;  while(curr != 0x0) {    if(curr->data->addr == addr && curr->data->port == port) {      pthread_mutex_lock(&mutex);       { 	curr->data->refreshed = 1;	ping = curr->data->ping_time;      }      pthread_mutex_unlock(&mutex);      return send_response(sock, ping);    }    else      curr = curr->next;  }  cout<<"didn't find it in database"<<endl;  newest = new host_info_node_t;  newest->data = new host_info_t;  newest->data->addr = addr;  newest->data->port = port;  newest->data->refreshed = 1;  newest->data->ping_time = 40000;  memset(&newest->data->host_addr, 0, sizeof(struct sockaddr_in));  newest->data->host_addr.sin_family = AF_INET;  newest->data->host_addr.sin_addr.s_addr = addr;  newest->data->host_addr.sin_port = port;  // Haven't pinged it yet  newest->data->time_of_ping.tv_sec = 0;  newest->data->time_of_ping.tv_usec = 0;  // Insert this node into the beginning of the list  pthread_mutex_lock(&mutex);  {    newest->next = head;    head = newest;    ping = newest->data->ping_time;  }  pthread_mutex_unlock(&mutex);  return send_response(sock, ping);}int send_response(int sock, float ping_time) {  char buf[4];  long val = htonl((long)ping_time);  cout<<"sending back response "<<ping_time<<endl;  memcpy(buf, &val, sizeof(long));  return send(sock, buf, 4, 0);}double estimate_rtt(struct sockaddr_in their_addr) { // host is IP addr.   int sockfd, i=0;  //  struct sockaddr_in my_addr;    /* my address information */  //  struct sockaddr_in their_addr,their_addr1; /* connector's address information */  //  struct hostent *he;  struct timeval start_time, end_time, tv;  long total_time_in_musecs = 0;  int cnt = 0, numbytes, selectRet;  long confArr[100] = { 0 };  float avg, delta;  char message[]="hello";  char buf[MAXBUF];  fd_set rset;  //  int MYPORT;  avg = 0;  FD_ZERO(&rset);  gettimeofday(&start_time,NULL);  if ((sockfd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {    perror("socket");    exit(1);  }  FD_SET(sockfd,&rset);  //their_addr.sin_family = AF_INET;         /* host byte order */    while ((i++<10) && (total_time_in_musecs < MAXIMUM_WAIT_TIME))  {    sprintf(message,"%d",i);    gettimeofday(&start_time,NULL);    if ((numbytes=sendto(sockfd,message,strlen(message),0,			 (struct sockaddr *)&their_addr, 			 sizeof(struct sockaddr))) == -1) {      perror("sending problem");      printf("Continuing..  ");      continue;    }        if(FD_ISSET(sockfd,&rset)) {      tv.tv_sec = 0;      tv.tv_usec = MINIMUM_WAIT_TIME;      do {	sprintf(buf,"%d",i);	//printf("ENTER - select\n");	fflush(0);	//  do {	if((selectRet = select(sockfd+1,&rset,NULL,NULL,&tv)) > 0 ) {	  numbytes = recvfrom(sockfd,buf,MAXBUF,0,NULL,NULL);	  if (numbytes<0) {	    perror("Problem in recvfrom");	    continue;	  }	  //printf("DONE - recvfrom\n");	  fflush(0);	  if(atoi(buf) == i) {	    gettimeofday(&end_time,NULL);	    confArr[cnt++] = 1000000L*(end_time.tv_sec-start_time.tv_sec)+	      (end_time.tv_usec -start_time.tv_usec);	  }	} 	gettimeofday(&end_time,NULL);	total_time_in_musecs += 1000000L*(end_time.tv_sec-start_time.tv_sec)+	  (end_time.tv_usec -start_time.tv_usec);        fflush(0);      } while (atoi(buf)<i  && total_time_in_musecs < MAXIMUM_WAIT_TIME);      FD_SET(sockfd,&rset);    }    if(cnt >= 3) {      confidence(confArr, cnt, 0.95, &avg, &delta);      if(delta <= 0.1 * avg)	break;    }  }//end of while   close(sockfd);  //      printf("CNT = %d  AVG = %.0f  DELTA = %.0f\n NUMPACKETS LOST = %d", cnt, avg, delta, i - cnt);  if(avg == 0)    return 200000;  return avg;}double estimate_rtt_wrapper(struct sockaddr_in their_addr) {  double dist = estimate_rtt(their_addr);  if (dist < 1000.0) return 500.0;  if (dist < 5000.0) return 3000.0;  if (dist < 10000.0) return 7500.0;  if (dist < 20000.0) return 15000.0;  if (dist < 40000.0) return 30000.0;  if (dist < 80000.0) return 60000.0;  if (dist < 160000.0) return 120000.0;  return 200000.0;}int main(int argc, char ** argv) {  pthread_t ping_thread;  struct sockaddr_in my_addr;  list<int> sock_vec;  list<int>::iterator i;  fd_set sock_set;  fd_set copy_set;  int accept_sock, max_desc, new_sock, num_fds;  int ping_time;  const int MAX_OUTSTANDING = 5;  if (argc != 3) {    fprintf(stderr,"usage: %s <port> <time between pings (ms)>\n", argv[0]);    exit(-1);  }  ping_time = atoi(argv[2]);  if((accept_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {    printf("Talker: Could not create accept socket\n");    exit(-1);  }  memset(&my_addr, 0, sizeof(my_addr));  my_addr.sin_family = AF_INET;  my_addr.sin_addr.s_addr = htonl(INADDR_ANY);  my_addr.sin_port = htons(atoi(argv[1]));  if(bind(accept_sock, (struct sockaddr*)&my_addr, sizeof(my_addr)) < 0) {    printf("Talker: Could not bind socket to port %d\n", atoi(argv[1]));    exit(-1);  }  if(listen(accept_sock, MAX_OUTSTANDING) < 0) {    printf("Talker: Could not listen on socket\n");    exit(-1);  }  FD_ZERO(&sock_set);  FD_SET(accept_sock, &sock_set);  max_desc = accept_sock;  if(pthread_create(&ping_thread, NULL, ping_thread_func, &ping_time) != 0) {    printf("Talker: Could not create ping thread\n");    exit(-1);  }  if(pthread_detach(ping_thread) != 0) {    printf("Talker: Could not detach ping thread\n");    exit(-1);  }  while(1) {    // Handle new connections and build our list of hosts to query    memcpy(&copy_set, &sock_set, sizeof(fd_set));    if((num_fds = select(max_desc+1, &copy_set, NULL, NULL, NULL)) < 0) {      printf("Talker: Error with select\n");      continue;    }    else {      if(FD_ISSET(accept_sock, &copy_set)) {	new_sock = accept_connection(accept_sock);	if(new_sock >= 0) {	  FD_SET(new_sock, &sock_set);	  if(new_sock > max_desc)	    max_desc = new_sock;	  sock_vec.push_back(new_sock);	}      }      for(i=sock_vec.begin(); i!=sock_vec.end(); i++) {	int num_bytes;	char buf[6] = {0};	if(FD_ISSET(*i, &copy_set)) {	  if((num_bytes = recv(*i, buf, 6, 0)) < 0) {	    printf("Talker: Error receiving\n");	    return -1;	  }	  if(!strcmp(buf, "bye")) {	    list<int>::iterator j;	    printf("got bye msg\n");	    close(*i);	    FD_CLR(*i, &sock_set);	    j = i;	    i++;	    sock_vec.erase(j);	  }	  else	    handle_request(*i, buf, data_list);	}      }    }  }    //printf(" %8.4f is the delay \n", estimate_rtt(their_addr));  return 0; }@1.2log@Now handles the bye message to close connections to clients who don't want toping us anymore.@text@d135 2a136 2  int addr;  short port;d140 2a141 2  memcpy(&addr, buf, sizeof(int));  memcpy(&port, &buf[4], sizeof(short));@1.1log@Initial revision@text@d14 1a14 1#include <vector>d134 2a135 2int handle_request(int sock, host_info_node_t*& head) {  int num_bytes, addr;a136 1  char buf[6];a139 4  if((num_bytes = recv(sock, buf, 6, 0)) < 0) {    printf("Talker: Error receiving\n");    return -1;  }d288 2a289 2  vector<int> sock_vec;  vector<int>::iterator i;d346 3d350 15a364 1	  handle_request(*i, data_list);@

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -