⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 coop-agent.cc

📁 这是P2P流媒体方案-NICE的实现源码
💻 CC
📖 第 1 页 / 共 5 页
字号:
/* * File: coop-agent.cc * Author: Suman Banerjee <suman@cs.umd.edu> * Date: 15th February, 2002 * Terms: GPL * * NICE Application Layer Multicast */#include <stdio.h>#include <stdlib.h>#include <assert.h>#include <errno.h>#include <sys/socket.h>#include <sys/types.h>#include <arpa/inet.h>#include <netdb.h>#include <unistd.h>//#define LOG_COOP_SEND//#define LOG_COOP_RECV//#define LOG_COOP_JUNK//#define LOG_COOP_RECV_2//#define LOG_COOP_JUNK_PING//#define LOG_COOP_JUNK_PING_SWITCH//#define LOG_COOP_START//#define LOG_COOP_STOP#define LOG_COOP_CLUSTER_CHANGE#define  LOG_COOP_CLUSTER_PERIODIC// change this back to #undef//#define LOG_CLUSTER_INFO_DETAILED//PRM -change#undef LOG__CLUSTER_INFO_DETAILED//#define LOG_DATA_PACKET_INIT//#define LOG_DATA_PACKET_RECV//#define LOG_DATA_PACKET_SEND//#define LOG_DATA_PACKET_SEND_COUNT//#define TESTING_JUNK// #define CLOCK 0.00#define UDP_RECV_BSE_PORT 5000#define UDP_RECV_AGENT_PORT 5001#define UDP_RECV_BSE_ADDR "128.8.129.18" //#include "nicenode.h"//#include <scheduler.h>#include "app-packet.h"#include "coop-agent.h"#include "setpartition.h"#include "common.h"#include "o_timeout.h"#include "talker_utils.h"#define MAX_RECV_BYTES 10000//PRM - Magic numbers #define		SLACK		1#define		FLAG_SEND_ORPHAN	0#define		TIME_START		500.0#define		TIME_ACTIVE		250#define		DOWN_DURATION		60.0#define		ONE_HOP_DELAY		0.050#define		VERY_LATE		8.0#define		RANDOM_SEED		1234//PRM//PRM-the following variables may not be needed//for stats///////////////////	STAT/////////////////int *recv_per_pkt;int *deliv_per_pkt;int *up_per_pkt;double *latency_per_pkt;int UpCount;int num_pkt_init;int num_pkt_deliv;int num_rand_send;int num_rand_recv;int num_all_recv;int num_all_recv_type[3];int num_pkt_deliv_type[3];int num_rand_response;int sum_hop_count;int sum_rand_data_hop_count;int sum_rand_hop_count;int cached_type;double cached_time;int duplicate[3][3];int num_retransmit_request;int num_retransmit_total;int return_from_sent_proactive;double g_one_hop_latency_sum;double g_leaf_latency_sum;double g_nonleaf_latency_sum;int g_one_hop_cnt, g_leaf_cnt, g_nonleaf_cnt;double g_rtt;int g_rtt_cnt;double max_delay;//PRM// KCR-CHANGE#define MAX_RANDOM_DIST_DOUBLE 200000.0extern int errno;bool global_use_packet_cache = true;//PRMint g_initialized = 0 ;struct sockaddr_in nodes[NUM_AGENT+1]; //sunny//coopAgent * agents[NUM_AGENT+1]; //sunnyint down_no; int down_internal, down_leaf;int g_start, g_stop; FILE* fd;char *filename;//PRMcoopAgent::coopAgent (void) : commonAgent () {  jqt.ca = this;  m_hlprt.ca = this;  m_hlpit.ca = this;  //PRM  m_agent_down = 0;  //PRM  dt.a = this;}coopAgent::~coopAgent (void) {  if (started == true) {    init_layer_arr();    jqt.CancelTimer();    cancel_higher_layer_ping_timers();  }}void coopAgent::init (int Id, int Index, int port_no, char * bsehostname,		      char* talker, unsigned short dport) {  struct sockaddr_in bseaddr;  char hostname[512];  struct hostent he;//sunny	filename=(char*)malloc(sizeof(char)*20);	sprintf(filename, "coop%d.dat", Id);	fd=(FILE*)malloc(sizeof(FILE));//sunny  Agent::init(Id,Index, talker, dport);  memset( & udp_recv_agent_addr, 0, sizeof(struct sockaddr_in));  udp_recv_agent_addr.sin_family = AF_INET;  //  udp_recv_agent_addr.sin_addr.s_addr = htonl(INADDR_ANY);  gethostname(hostname, 512);  memcpy(& he, gethostbyname(hostname), sizeof(he));  udp_recv_agent_addr.sin_addr.s_addr = * ((unsigned long *) he.h_addr_list[0]);  //  udp_recv_agent_addr.sin_port = UDP_RECV_AGENT_PORT;  udp_recv_agent_addr.sin_port = htons(port_no);  if ((udp_sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)    printf("socket() failed\n");    if (bind(udp_sock, (struct sockaddr *) & udp_recv_agent_addr, sizeof(struct sockaddr)) < 0)     printf("bind() failed\n");   t = AGENT_APPLICATION_COOP;  /* PRM: moved to coopAgent::start  for (int i = 0; i < MAX_LAYERS; i++) {    layers.arr[i] = NULL;  }  */  m_data_pkt_seq = 0;// PRM -initialization of varaibles//sunny  	pkt_recv_count = 0;	m_sent_proactive = 0;	m_pkt_deliv = 0;	m_all_recv = 0;	m_max_seq_recv = -1;	m_dist_from_src = 1;	m_src_aid = m_src_nid = -1;	m_latency = 0.0;	m_total_up_time = 0.0;	m_recent_up_time = LONG_TIME;	//The following for-loops may be only for simulator /*	for ( int i = 0 ; i < 3; i++ ) {		m_num_all_recv_type[i] = 0;		m_num_pkt_deliv_type[i] = 0;		for ( int j = 0 ; j < 3; j++ )			m_duplicate[i][j] = 0;	}	m_was_dead = 0;*///sunny	//PRM	  /* PRM: moved to coopAgent::start  for (int i = 0; i < MAX_LAYERS; i++)    m_last_change[i] = -1.0;  *///PRM  - setting random seed         	if ( g_initialized == 0 ) {	  srand(RANDOM_SEED);	  g_initialized = 1;printf("random seed\n");	}//PRM    memset( & bseaddr, 0, sizeof(struct sockaddr_in));  bseaddr.sin_family = AF_INET;  memcpy(& he, gethostbyname(bsehostname), sizeof(he));  bseaddr.sin_addr.s_addr = * ((unsigned long *) he.h_addr_list[0]);  //  bseaddr.sin_addr.s_addr = inet_addr(UDP_RECV_BSE_ADDR);  bseaddr.sin_port = htons(UDP_RECV_BSE_PORT);  set_bse_agent(0, bseaddr);//sunnyprintf("coop-init finished\n");//sunny  fflush(stdout);  return;}void coopAgent::start (void) {  char recv_buffer[MAX_RECV_BYTES];  struct sockaddr_in sender_addr;  unsigned int sender_addr_len = sizeof(struct sockaddr_in);  AppPacketSubType st;  int ret, id;#define MAX_BYTES	128  char recvfrom_error_str[MAX_BYTES];  sprintf(recvfrom_error_str,"recvfrom < %d >:", id);    started = true;  if (bse.agent_id == -1) {    printf ("[Err] BSE is not known to agent %d\n", id);    exit (-1);  }  //printf("bse.agent_id is %d\n",bse.agent_id);  state = INIT;  Agent::start();  //PRM//sunny/*  if ( id <= NUM_AGENT ) {    memcpy(& (nodes[id]), &udp_recv_agent_addr, sizeof(struct sockaddr_in));     agents[id] = this;    g_start++;  }*///sunny  //PRM//#ifdef LOG_COOP_START  printf ("[coop %d ] At %8.4f start\n", id, /* Scheduler::my_clock() */ my_clock());//#endif   m_ping_in_progress = false; // maybe this is the right place  init_layer_arr();  init_join_query(0);  init_packet_cache();  //PRM//sunny  	//the following 4 lines are from coopAgent::init	for (int i = 0; i < MAX_LAYERS; i++) {		layers.arr[i] = NULL;		m_last_change[i] = -1.0;	}printf("layer\n");	qrt.ca = this;printf("this\n");	qrt.SetTimer(QUERY_RANDOM_NODE_TIMEOUT);printf("timer\n");	idCache.clear();printf("cache\n");//sunny	m_mature = 0;       	/*PRM change*/	//m_recent_up_time = Scheduler::Clock();	m_recent_up_time = my_clock();	/*variables from sleeDisplay*/	//UpCount++;  m_use_packet_cache = global_use_packet_cache;  //  m_ping_in_progress = false;  //handle_timer_now = false;  disable_handle_timers();//sunnyprintf("before while\n");//sunny  while(started) {    int recvmsgsize, num_fds, found_fds = 0;    struct timeval tv;    fd_set copy_set;    SocketHandlerMap::iterator i;    //handling expired timers here....    handle_expired_timers();    enable_handle_timers();    //handle_timer_now = true;    // make a copy of our socket set so it doesn't get changed    // by select    memcpy(&copy_set, &socket_set, sizeof(socket_set));    errno = 0;    tv.tv_sec = 3;    tv.tv_usec = 0;    while((num_fds = select(max_socket_fd+1, &copy_set, NULL, NULL, &tv)) < 0) {      if(errno != EINTR)	perror("error with select");      errno = 0;      tv.tv_sec = 3;      tv.tv_usec = 0;    }    disable_handle_timers();    for(i = socket_map.begin(); i!=socket_map.end(); i++) {      if(FD_ISSET(i->first, &copy_set)) {	found_fds++;	errno = 0;	while((recvmsgsize = recvfrom(i->first, recv_buffer, MAX_RECV_BYTES, 0, (struct sockaddr *)&sender_addr, &sender_addr_len)) < 0) {	  if(errno != EINTR)	    perror(recvfrom_error_str);	  errno = 0;	}	//    printf("Handling sender %s, recvmsgsize %d\n",inet_ntoa(sender_addr.sin_addr), recvmsgsize);	assert(sender_addr_len == sizeof(struct sockaddr_in));		AppPacket *ap = (AppPacket *) this->reclaim_apppacket(recv_buffer, recvmsgsize);	ret = generic_rx_pkt_handler(ap);	st = ap->st;	id = ap->u.data_p.seq_no;	fflush(stdout);	free_app_packet(ap);	if(st == PACKET_DATA) {	  delete ap;	}      }      // Now call back on the provided function if this is a data       // packet, there is a function to call, and the packet was not      // already in the cache (ie it is not a duplicate)      if(i->second != NULL && st == PACKET_DATA && ret == 0) {	i->second(&recv_buffer[sizeof(const_data_pkt)+sizeof(int)*3		  +sizeof(struct sockaddr_in)], 		  recvmsgsize-(sizeof(const_data_pkt)+sizeof(int)*3		  +sizeof(struct sockaddr_in)));      }      if(found_fds == num_sockets) {	// we have gone through all the sockets that we are going to find	// so we might as well stop	break;      }    }  }   fflush(stdout);  return;}/* dst_join_lid < 0 => that attempt to join the same layer as before */void coopAgent::init_join_query (int dst_join_lid) {  if (dst_join_lid >= 0) {    if (state == JOIN) {      if (target_join_q_lid > dst_join_lid)	target_join_q_lid = dst_join_lid;      return;    }  }  else {    assert (state == JOIN);  }  assert (m_ping_in_progress == false);//#ifdef LOG_COOP_JUNK  printf ("[coop %d ] At %8.4f init-join-query lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), dst_join_lid);//#endif   state = JOIN;  curr_join_q_lid = -1;  if (dst_join_lid >= 0)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -