⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 agent.cc

📁 这是P2P流媒体方案-NICE的实现源码
💻 CC
字号:
/* * File: agent.cc * Author: Suman Banerjee <suman@cs.umd.edu> * Date: 15th February, 2002 * Terms: GPL * * NICE Application Layer Multicast */#include <stdio.h>#include <stdlib.h>#include <assert.h>// #include <node.h>#include "agent.h"#include "timer.h"#include "app-packet.h"#include <sys/socket.h>#include <arpa/inet.h>#include <netinet/in.h>#include <unistd.h>#include <netdb.h>#include <sys/time.h>#define STATE_SIZE	256#define MAX_RECV_BYTES 10000#define MAX_PAYLOAD 10000char rand_seq_state[STATE_SIZE];struct timeval start_time; float start_time_usec_float;Agent::Agent (int Id, int Index) {  t = AGENT_NONE;  id = Id;  index = Index;  //  n = N;  started = false;  //n->a = this;  FD_ZERO(&socket_set);  num_sockets = 0;  max_socket_fd = 0;  drawer_sock = -1;}void Agent::init (int Id, int Index, char* drawer, unsigned short dport) {  struct hostent* dr;  id = Id;  index = Index;  //  n = N;  started = false;  FD_ZERO(&socket_set);  num_sockets = 0;  max_socket_fd = 0;  drawer_sock = -1;  // Set up the stuff for visualizing the multicast group. --Ryan  if(drawer != NULL) {    dr = gethostbyname(drawer);    if(dr == NULL) {      printf("Warning: address \"%s\" not known.\n)", drawer);      printf("Not sending packets to drawer\n");    }    else {      drawer_addr.sin_family = AF_INET;      drawer_addr.sin_port = htons(dport);      drawer_addr.sin_addr.s_addr = *((unsigned long*)dr->h_addr_list[0]);      if((drawer_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)	printf("Error creating socket\n");      if(connect(drawer_sock, (struct sockaddr*)&drawer_addr, sizeof(drawer_addr)) < 0)	printf("Error connecting to drawer\n");    }  }  //  n->a = this;  //  memcpy(& agent_addr, &sa, sizeof(struct sockaddr_in));  /*  memset( & udp_recv_agent_addr, 0, sizeof(struct sockaddr_in));  udp_recv_agent_addr.sin_family = AF_INET;  //  udp_recv_agent_addr.sin_addr.s_addr = htonl(INADDR_ANY);  udp_recv_agent_addr.sin_addr.s_addr = inet_addr("127.0.0.1");  udp_recv_agent_addr.sin_port = UDP_RECV_AGENT_PORT;  */}void Agent::start (void) {  //  assert (! started);    started = true;    gettimeofday(& start_time, NULL);    start_time_usec_float = ((float)start_time.tv_usec)/1000000.0;    return;}void Agent::stop (void) {  assert (started);  started = false;  assert(close(udp_sock) == 0);  return;}int Agent::struct_pkt_reset() {  pkt_info.reset();  return 0;}int Agent::struct_pkt_set_size(int size) {  pkt_info.set_size(size);  return 0;}int Agent::struct_pkt_add_member(int id) {  pkt_info.add_member(id);  return 0;}int Agent::struct_pkt_set_lid(int lid) {  pkt_info.lid = lid;  return 0;}int Agent::struct_pkt_set_ldr(int ldr) {  pkt_info.leader = ldr;  return 0;}int Agent::send_struct_pkt() {  if(drawer_sock < 0)    return 0;  assert(pkt_info.gr_size <= UPPER_3K);  // This should afford us a little extra room, just in case;  int pkt_size = UPPER_3K+sizeof(struct StructPktInfo);  char pkt[pkt_size];  memset(pkt, 0, pkt_size);  put_in_pkt(pkt, htonl(pkt_info.my_id), 0);  put_in_pkt(pkt, htonl(pkt_info.gr_size), 4);  put_in_pkt(pkt, htonl(pkt_info.lid), 8);  put_in_pkt(pkt, htonl(pkt_info.leader), 12);  for(int x = 0; x < pkt_info.gr_size; x++) {    put_in_pkt(pkt, htonl(pkt_info.members[x]), 16+4*x);  }  if(send(drawer_sock, pkt, 16+4*pkt_info.gr_size, 0) < 0) {    printf("Error sending packet to drawer\n");    return -1;  }  return 0;}void Agent::put_in_pkt(char* pkt, int value, int index) {  memcpy(&pkt[index], &value, sizeof(int));}void Agent::add_listening_fd(int fd, SOCKETHANDLER callback) {  FD_SET(fd, &socket_set);  // callback might be NULL, but that's ok.  Just means we don't  // want a callback for this fd.  socket_map[fd] = callback;  num_sockets++;  if(fd > max_socket_fd)    max_socket_fd = fd;}void Agent::remove_listening_fd(int fd) {  FD_CLR(fd, &socket_set);  socket_map.erase(fd);  num_sockets--;  if(fd == max_socket_fd) {    // we need to compute the new max value    SocketHandlerMap::const_iterator i;    max_socket_fd = 0;    for(i=socket_map.begin(); i!=socket_map.end(); i++) {      if(i->first > max_socket_fd)	max_socket_fd = i->first;    }  }}/*// flatten out the app packet into the payload and send it to dst_ag_addr. I tried to maintain the same byte ordering as defined in AppPacket.h  void Agent::send_pkt (Packet *p, int dst_ag, struct sockaddr_in dst_ag_addr) {  char payload[MAX_PAYLOAD];  AppPacket *ap = (AppPacket *) p;    assert (dst_ag != id);  assert (p);  switch (p->t) {  case (PACKET_APP) :      int len = flatten_apppacket(payload, p, dst_ag); { int i;      printf ("len %d\n", len);    for (i=0; i < len; i++)      printf ("%x\n", payload[i]);    printf ("\n");    }  if (sendto(udp_sock, payload, len, 0, (struct sockaddr *) & dst_ag_addr, sizeof(struct sockaddr_in)) != len) {    printf("%s, %d, couldn't send msg properly\n", __FILE__, __LINE__);    return;     }  }}*/void Agent::send_pkt (char * payload, int len, int dst_ag, struct sockaddr_in dst_ag_addr) {  //  char payload[MAX_PAYLOAD];  //  AppPacket *ap = (AppPacket *) p;    assert (dst_ag != id);  // assert (p);  //switch (p->t) {  //case (PACKET_APP) :      //int len = flatten_apppacket(payload, p, dst_ag);  /*    { int i;      printf ("len %d\n", len);    for (i=0; i < len; i++)      printf ("%x\n", payload[i]);    printf ("\n");    }  */  dst_ag_addr.sin_family = AF_INET;  if (sendto(udp_sock, payload, len, 0, (struct sockaddr *) &	     dst_ag_addr, sizeof(struct sockaddr_in)) != len) {    perror("send");    printf("couldn't send msg properly with len = %d\n",len);    printf(" host %s port %d\n",inet_ntoa(dst_ag_addr.sin_addr), ntohs(dst_ag_addr.sin_port));    // }  }  //  else printf("send msg size is %d\n",len);  return; }/*void Agent::generic_send_data_pkt (SourceDistributionType sdt, int burst_size, double burst_gap) {  if (burst_size > 0) {    specific_send_data_pkt();    if (burst_size == 1)      return;    AgentEvent * ag_e =  new AgentEvent(AGENT_SOURCE);    ag_e->sdt = sdt;    ag_e->source_burst = burst_size - 1;    ag_e->source_gap = burst_gap;     EventInfo * sim_ev = new EventInfo(EVENT_AGENT,(void*)this,(void*)ag_e);    double actual_gap;    switch (sdt) {    case DIST_CONSTANT:      actual_gap = burst_gap;      break;    case DIST_UNIFORM:      actual_gap = burst_gap * ( ((double)(get_pkt_source_random())) / ((double)RAND_MAX) );      break;    default:      assert(0);    }    Scheduler::AddRelativeEvent(actual_gap,sim_ev);  }  return;}*/void Agent::init_data_traffic_params(SourceDistributionType sdt, int burst_size, double burst_gap, long ticks_from_now) {    data_traffic_params.t = AGENT_SOURCE;    data_traffic_params.sdt = sdt;    data_traffic_params.source_burst = burst_size;    data_traffic_params.source_gap = burst_gap;    assert(dt.a != NULL);    dt.SetTimer(ticks_from_now);    return; }void Agent::generic_send_data_pkt (char* payload, int data_len) {  if (data_traffic_params.source_burst > 0) {    specific_send_data_pkt(payload, data_len);    if (data_traffic_params.source_burst == 1)      return;        data_traffic_params.source_burst --;    double actual_gap;    switch (data_traffic_params.sdt) {    case DIST_CONSTANT:      actual_gap = data_traffic_params.source_gap;      break;    case DIST_UNIFORM:      actual_gap = data_traffic_params.source_gap * ( ((double)(get_pkt_source_random())) / ((double)RAND_MAX) );      break;    default:      assert(0);    }    //    Scheduler::AddRelativeEvent(actual_gap,sim_ev);    dt.SetTimer(actual_gap);  }  return;}void Agent::specific_send_data_pkt (char* payload, int data_len) {    printf("in agent send data\n");  return;}int Agent::generic_rx_pkt_handler (Packet *p) {  //printf ("the dest agent id is ----- %d\n",p->dst_agent);  //printf("and the id i am checking is  ---------- %d\n", id);  //printf("AAAAAAAAAAAAAAAAAAAAAA\n");  //printf("The packet send time %f\n", p->send_time);  //printf("The packet seq no %d\n", p->seq_no);  if (p->dst_agent != id) { /* Incorrect forwarding of the packet */    printf ("[Err] Packet %d incorrectly forwarded to < ag %d > correct is < ag %d > source is < ag %d > \n", p->id, id, p->dst_agent, p->src_agent);    return -1;  }  /* Do app level packet handling here */  return specific_rx_pkt_handler(p);}int Agent::specific_rx_pkt_handler (Packet *p) {  return 0;}void Agent::EventHandler (AgentEvent *ae) {  switch (ae->t) {    case AGENT_START :       start ();      /* send_pkt(new Packet,2,8); */      break;    case AGENT_STOP :      stop ();      break;    case AGENT_SOURCE :      //      generic_send_data_pkt(ae->sdt,ae->source_burst,ae->source_gap);      break;    default :    printf ("[Err] Illegal agent event\n");    exit(-1);  }  delete ae;  return;}void init_random_for_pkt_sources (unsigned int seed) {  //  char * old_state =   initstate(seed,rand_seq_state,STATE_SIZE);  // setstate(old_state);  return;}long int get_pkt_source_random (void) {  //  char * old_state =   setstate(rand_seq_state);  // assert (old_state != NULL);  long int ret_val = random();  // setstate(old_state);  return ret_val;}void DataTimer::EventHandler(bool from_handle_expired_timers) {    a->generic_send_data_pkt("testing", 7);  return; }double my_clock() {  struct timeval time1;  gettimeofday(&time1, NULL);  double d_time;  d_time = (time1.tv_sec + (((float)time1.tv_usec) / 1000000.0));  d_time -= (start_time.tv_sec + start_time_usec_float);  return d_time;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -