⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 coop-agent.cc,v

📁 这是P2P流媒体方案-NICE的实现源码
💻 CC,V
📖 第 1 页 / 共 5 页
字号:
head	1.1;access;symbols;locks	rbraud:1.1; strict;comment	@// @;1.1date	2002.07.02.19.29.00;	author rbraud;	state Exp;branches;next	;desc@@1.1log@Initial revision@text@/* * File: coop-agent.cc * Author: Suman Banerjee <suman@@cs.umd.edu> * Date: 15th February, 2002 * Terms: GPL * * NICE Application Layer Multicast */#include <stdio.h>#include <stdlib.h>#include <assert.h>#include <sys/socket.h>#include <sys/types.h>#include <arpa/inet.h>#include <netdb.h>#include <unistd.h>//#define LOG_COOP_SEND//#define LOG_COOP_RECV//#define LOG_COOP_JUNK//#define LOG_COOP_RECV_2//#define LOG_COOP_JUNK_PING//#define LOG_COOP_JUNK_PING_SWITCH//#define LOG_COOP_START//#define LOG_COOP_STOP//#define LOG_COOP_CLUSTER_CHANGE//#define  LOG_COOP_CLUSTER_PERIODIC//#undef LOG_CLUSTER_INFO_DETAILED//#define LOG_DATA_PACKET_INIT//#define LOG_DATA_PACKET_RECV//#define LOG_DATA_PACKET_SEND//#define LOG_DATA_PACKET_SEND_COUNT//#define TESTING_JUNK// #define CLOCK 0.00#define UDP_RECV_BSE_PORT 5000#define UDP_RECV_AGENT_PORT 5001#define UDP_RECV_BSE_ADDR "128.8.129.18" //#include "nicenode.h"//#include <scheduler.h>#include "app-packet.h"#include "coop-agent.h"#include "setpartition.h"#include "common.h"#include "o_timeout.h"#include "talker.h"#define MAX_RECV_BYTES 10000// KCR-CHANGE#define MAX_RANDOM_DIST_DOUBLE 200000.0bool global_use_packet_cache = true;coopAgent::coopAgent (void) : commonAgent () {  jqt.ca = this;  m_hlprt.ca = this;  m_hlpit.ca = this;  dt.a = this;}coopAgent::~coopAgent (void) {  if (started == true) {    init_layer_arr();    jqt.CancelTimer();    cancel_higher_layer_ping_timers();  }}void coopAgent::init (int Id, int Index, int port_no, char * bsehostname) {  struct sockaddr_in bseaddr;  char hostname[512];  struct hostent he;  Agent::init(Id,Index);  memset( & udp_recv_agent_addr, 0, sizeof(struct sockaddr_in));  udp_recv_agent_addr.sin_family = AF_INET;  //  udp_recv_agent_addr.sin_addr.s_addr = htonl(INADDR_ANY);  gethostname(hostname, 512);  memcpy(& he, gethostbyname(hostname), sizeof(he));  udp_recv_agent_addr.sin_addr.s_addr = * ((unsigned long *) he.h_addr_list[0]);  //  udp_recv_agent_addr.sin_port = UDP_RECV_AGENT_PORT;  udp_recv_agent_addr.sin_port = htons(port_no);  if ((udp_sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)    printf("socket() failed\n");    if (bind(udp_sock, (struct sockaddr *) & udp_recv_agent_addr, sizeof(struct sockaddr)) < 0)     printf("bind() failed\n");   t = AGENT_APPLICATION_COOP;  for (int i = 0; i < MAX_LAYERS; i++) {    layers.arr[i] = NULL;  }  m_data_pkt_seq = 0;  for (int i = 0; i < MAX_LAYERS; i++)    m_last_change[i] = -1.0;  memset( & bseaddr, 0, sizeof(struct sockaddr_in));  bseaddr.sin_family = AF_INET;  memcpy(& he, gethostbyname(bsehostname), sizeof(he));  bseaddr.sin_addr.s_addr = * ((unsigned long *) he.h_addr_list[0]);  //  bseaddr.sin_addr.s_addr = inet_addr(UDP_RECV_BSE_ADDR);  bseaddr.sin_port = htons(UDP_RECV_BSE_PORT);  set_bse_agent(0, bseaddr);  fflush(stdout);  return;}void coopAgent::start (void) {  char recv_buffer[MAX_RECV_BYTES];  struct sockaddr_in sender_addr;  unsigned int sender_addr_len = sizeof(struct sockaddr_in);  AppPacketSubType st;  int ret, id;#define MAX_BYTES	128  char recvfrom_error_str[MAX_BYTES];  sprintf(recvfrom_error_str,"recvfrom < %d >:", id);  started = true;  if (bse.agent_id == -1) {    printf ("[Err] BSE is not known to agent %d\n", id);    exit (-1);  }  //  printf("bse.agent_id is %d\n",bse.agent_id);  state = INIT;  Agent::start();#ifdef LOG_COOP_START  printf ("[coop %d ] At %8.4f start\n", id, /* Scheduler::my_clock() */ my_clock());#endif     m_ping_in_progress = false; // maybe this is the right place  init_layer_arr();  init_join_query(0);  init_packet_cache();  m_use_packet_cache = global_use_packet_cache;  //  m_ping_in_progress = false;  //handle_timer_now = false;  disable_handle_timers();  for (;;) {    int recvmsgsize, num_fds, found_fds = 0;    fd_set copy_set;    SocketHandlerMap::iterator i;    //handling expired timers here....    handle_expired_timers();    enable_handle_timers();    //handle_timer_now = true;    /*while ((recvmsgsize = recvfrom(udp_sock, recv_buffer, MAX_RECV_BYTES, 0, (struct sockaddr *) & sender_addr, & sender_addr_len)) < 0) {      perror(recvfrom_error_str);      }*/    // make a copy of our socket set so it doesn't get changed    // by select    memcpy(&copy_set, &socket_set, sizeof(socket_set));    while((num_fds = select(max_socket_fd+1, &copy_set, NULL, NULL, NULL)) < 0) {      //perror("error with select");    }    disable_handle_timers();    for(i = socket_map.begin(); i!=socket_map.end(); i++) {      if(FD_ISSET(i->first, &copy_set)) {	found_fds++;	while((recvmsgsize = recvfrom(i->first, recv_buffer, MAX_RECV_BYTES, 0, (struct sockaddr *)&sender_addr, &sender_addr_len)) < 0) {	  perror(recvfrom_error_str);	}	//    printf("Handling sender %s, recvmsgsize %d\n",inet_ntoa(sender_addr.sin_addr), recvmsgsize);	assert(sender_addr_len == sizeof(struct sockaddr_in));		AppPacket *ap = (AppPacket *) this->reclaim_apppacket(recv_buffer, recvmsgsize);	ret = generic_rx_pkt_handler(ap);	st = ap->st;	id = ap->u.data_p.seq_no;	fflush(stdout);	free_app_packet(ap);      }      // Now call back on the provided function if this is a data       // packet, there is a function to call, and the packet was not      // already in the cache (ie it is not a duplicate)      if(i->second != NULL && st == PACKET_DATA && ret == 0) {	i->second(&recv_buffer[sizeof(const_data_pkt)+sizeof(int)*3		  +sizeof(struct sockaddr_in)], 		  recvmsgsize-(sizeof(const_data_pkt)+sizeof(int)*3		  +sizeof(struct sockaddr_in)));	printf("received seqno %d\n", id);      }      if(found_fds == num_sockets) {	// we have gone through all the sockets that we are going to find	// so we might as well stop	break;      }    }  }   fflush(stdout);  return;}/* dst_join_lid < 0 => that attempt to join the same layer as before */void coopAgent::init_join_query (int dst_join_lid) {  if (dst_join_lid >= 0) {    if (state == JOIN) {      if (target_join_q_lid > dst_join_lid)	target_join_q_lid = dst_join_lid;      return;    }  }  else {    assert (state == JOIN);  }  assert (m_ping_in_progress == false);#ifdef LOG_COOP_JUNK  printf ("[coop %d ] At %8.4f init-join-query lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), dst_join_lid);#endif   state = JOIN;  curr_join_q_lid = -1;  if (dst_join_lid >= 0)    target_join_q_lid = dst_join_lid;  AppPacket *p = new AppPacket(JOIN_QUERY);  init_join_query_packet(p,target_join_q_lid,/* Scheduler::my_clock() */ my_clock(),false, udp_recv_agent_addr);  send_pkt_wrapper(p,bse.agent_id,bse.agent_addr);  /* Set the timeout to resend the request if needed */  jqt.SetTimer(CONST_JOIN_QUERY_TIMEOUT);  return;}void coopAgent::init_layer_arr (void) {  for (int i = 0; i < MAX_LAYERS; i++) {    if (layers.arr[i] != NULL) {      delete layers.arr[i];      layers.arr[i] = NULL;    }  }  return;}void coopAgent::stop (void) {  assert (started == true);#ifdef LOG_COOP_STOP  printf ("[coop %d ] At %8.4f stop\n", id, /* Scheduler::my_clock() */ my_clock());#endif   jqt.CancelTimer();  cancel_higher_layer_ping_timers();  init_layer_arr();  Agent::stop();  return;}void coopAgent::specific_send_data_pkt (char* payload, int data_len) {  assert (started == true);#ifdef LOG_DATA_PACKET_INIT  printf ("[coop %d ] At %8.4f source data-pkt : seq %d\n", id, my_clock(), m_data_pkt_seq);#endif   forward_data_packet(payload, data_len, id, udp_recv_agent_addr, m_data_pkt_seq ++,0,-1,true);  return;}int coopAgent::specific_rx_pkt_handler  (Packet *p) {  if (p->t != PACKET_APP) {     printf ("[Err] COOP %d received unknown packet type\n", id);     return -1;  }  AppPacket *ap = (AppPacket *)p;  rx_pkt_wrapper(ap);  switch (ap->st) {  case JOIN_QUERY :  case JOIN_FORWARD :    handle_join_query_forward(ap);    break;  case JOIN_RESPONSE :    handle_join_response(ap);    break;  case CLUSTER_REFRESH :    handle_cluster_refresh(ap);    break;  case CLUSTER_MERGE :    handle_cluster_merge(ap);    break;  case PING_QUERY :    handle_ping_query(ap);    break;  case PING_RESPONSE :    handle_ping_response(ap);    break;  case PACKET_DATA :    return handle_data_packet(ap);  case PACKET_DATA_ACK :    handle_data_ack_packet(ap);    break;  default :    printf ("[Err] COOP %d received unknown packet subtype\n", id);    return -1;  }  return 0;}void coopAgent::handle_join_query_forward (AppPacket * ap) {  assert ( (ap->st == JOIN_QUERY) || (ap->st == JOIN_FORWARD) );  int qlid;  double src_time;  bool attach;  AgentInfo src_ag;  AgentInfo original_dst;  if (ap->st == JOIN_QUERY) {    qlid = ap->u.joinq_p.q_lid;    attach = ap->u.joinq_p.attach;    src_ag.agent_id = ap->src_agent;    memcpy(& src_ag.agent_addr, & ap->src_agent_addr, sizeof(struct sockaddr_in));     //src_ag.agent_addr = ap->src;    original_dst.agent_id = id;    //    original_dst.node_id = n->id;    memcpy(& original_dst.agent_addr, & udp_recv_agent_addr, sizeof(struct sockaddr_in));    src_time = ap->u.joinq_p.send_time;  }  else {    qlid = ap->u.joinforward_p.q_lid;    attach = ap->u.joinforward_p.attach;    src_ag.agent_id = ap->u.joinforward_p.src_ag.agent_id;    //    src_ag.node_id = ap->u.joinforward_p.src_ag.node_id;    memcpy(& src_ag.agent_addr, & ap->u.joinforward_p.src_ag.agent_addr, sizeof(struct sockaddr_in));    original_dst.agent_id = ap->u.joinforward_p.original_dst.agent_id;    //    original_dst.node_id = ap->u.joinforward_p.original_dst.node_id;    memcpy(& original_dst.agent_addr, & ap->u.joinforward_p.original_dst.agent_addr, sizeof(struct sockaddr_in));    src_time = ap->u.joinforward_p.send_time;  }    int required_qlid;  if (attach == true)    required_qlid = qlid;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -