⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 bse-agent.cc,v

📁 这是P2P流媒体方案-NICE的实现源码
💻 CC,V
📖 第 1 页 / 共 2 页
字号:
head	1.2;access;symbols;locks	rbraud:1.2; strict;comment	@// @;1.2date	2002.10.07.17.12.51;	author rbraud;	state Exp;branches;next	1.1;1.1date	2002.07.02.19.28.13;	author rbraud;	state Exp;branches;next	;desc@@1.2log@Newest revision?@text@/* * File: bse-agent.cc * Author: Suman Banerjee <suman@@cs.umd.edu> * Date: 15th February, 2002 * Terms: GPL * * NICE Application Layer Multicast */#include <stdio.h>#include <stdlib.h>#include <assert.h>#include <sys/socket.h>#include <arpa/inet.h>#include <sys/types.h>#include <netinet/in.h>#include <netdb.h>#include <unistd.h>//#include "nicenode.h"//#include <scheduler.h>#include "app-packet.h"#include "bse-agent.h"#include "setpartition.h"#include "common.h"#include "layerinfo.h"#include "o_timeout.h"#include "talker_utils.h"// #define CLOCK 0.00#define LOG_BSE_START#define LOG_BSE_STOP#define LOG_BSE_SEND#define LOG_BSE_RECV#define LOG_BSE_JUNK#define LOG_BSE_PARTITION#define LOG_BSE_CLUSTER_CHANGE// change this back#define LOG_CLUSTER_INFO_DETAILED#define LOG_DATA_PACKET_SEND#define LOG_DATA_PACKET_RECV#define MAX_RECV_BYTES 10000#define UDP_RECV_AGENT_PORT 5000void bseAgent::init (int Id, int Index) {  char hostname[512];  struct hostent he;  Agent::init(Id,Index);  memset( & udp_recv_agent_addr, 0, sizeof(struct sockaddr_in));  udp_recv_agent_addr.sin_family = AF_INET;  //  udp_recv_agent_addr.sin_addr.s_addr = htonl(INADDR_ANY);  gethostname(hostname, 512);  memcpy(& he, gethostbyname(hostname), sizeof(he));  udp_recv_agent_addr.sin_addr.s_addr = * ((unsigned long *) he.h_addr_list[0]);  //  udp_recv_agent_addr.sin_addr.s_addr = inet_addr("127.0.0.1");  printf("binding to addr %s\n",inet_ntoa(udp_recv_agent_addr.sin_addr));  udp_recv_agent_addr.sin_port = htons(UDP_RECV_AGENT_PORT);  t = AGENT_APPLICATION_BSE;  top_layer = NULL;  coop_agents_sofar = 0;fflush(0);  return;}void bseAgent::start (void) {  char recv_buffer[MAX_RECV_BYTES];  struct sockaddr_in sender_addr;  unsigned int sender_addr_len = sizeof(struct sockaddr_in);#define MAX_BYTES       128  char recvfrom_error_str[MAX_BYTES];  sprintf(recvfrom_error_str,"recvfrom < %d >:", id);  Agent::start();  reset_layer_info(0);  log_cluster_change_info ();#ifdef LOG_BSE_CLUSTER_CHANGE  display_top_layer_info();#endif   if ((udp_sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)    printf("socket() failed\n");  if (bind(udp_sock, (struct sockaddr *) & udp_recv_agent_addr, sizeof(struct sockaddr)) < 0)     printf("bind() failed\n");#ifdef LOG_BSE_START  printf ("[bse %d ] At %8.4f start\n", id, /* Scheduler::Clock */ my_clock());#endif   //handle_timer_now = false;  disable_handle_timers();  for (;;) {    int recvmsgsize;    //handling expired timers here....    handle_expired_timers();        //handle_timer_now = true;    enable_handle_timers();    while ((recvmsgsize = recvfrom(udp_sock, recv_buffer, MAX_RECV_BYTES, 0, (struct sockaddr *) & sender_addr, & sender_addr_len)) < 0)      perror(recvfrom_error_str);    //handle_timer_now = false;    disable_handle_timers();    //    printf("Handling sender %s , recvmsgsize %d\n",inet_ntoa(sender_addr.sin_addr), recvmsgsize);    /*     { int i;    for (i=0; i < recvmsgsize; i++)      printf ("%d: %x\n", i, recv_buffer[i]);    printf ("\n");    }    */        assert(sender_addr_len == sizeof(struct sockaddr_in));    AppPacket *ap = (AppPacket *) reclaim_apppacket(recv_buffer, recvmsgsize);    //    memcpy(& ((Packet *)ap)->src_agent_addr, & sender_addr, sender_addr_len);    generic_rx_pkt_handler(ap);    fflush(stdout);    free_app_packet(ap);  }  return;}void bseAgent::stop (void) {#ifdef LOG_BSE_STOP  printf ("[bse %d ] At %8.4f stop\n", id, /* Scheduler::Clock */ my_clock());#endif   assert (started == true);  if (top_layer != NULL)    delete top_layer;  Agent::stop();fflush(0);  return;}int bseAgent::specific_rx_pkt_handler  (Packet *p) {  if (p->t != PACKET_APP) {     printf ("[Err] BSE received unknown packet type\n");fflush(0);     return -1;  }  AppPacket *ap = (AppPacket *)p;  rx_pkt_wrapper(ap);  switch (ap->st) {      case JOIN_QUERY :        handle_join_query(ap);    break;  case CLUSTER_REFRESH :    handle_cluster_refresh(ap);    break;      case CLUSTER_MERGE :    handle_cluster_merge(ap);    break;      case JOIN_FORWARD :    printf ("[Err] BSE received Join-forward, ignored\n");    break;  case PING_QUERY :  case PING_RESPONSE :    printf ("[Err] BSE received ping, ignored\n");    break;  case PACKET_DATA :    return handle_data_packet(ap);  case PACKET_DATA_ACK :    handle_data_ack_packet(ap);    break;  default :    printf ("[Err] BSE received unknown packet subtype\n");    return -1;  }fflush(0);  return 0;}void bseAgent::reset_layer_info (int top_lid) {  if (top_layer != NULL) {    top_layer->MakeEmpty();    top_layer->lid = top_lid;    // delete top_layer;#ifdef LOG_BSE_CLUSTER_CHANGE  printf ("[bse %d ] At %8.4f cluster-info : lid %d : deleted\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif   }  else    top_layer = new LayerInfo(top_lid,this);#ifdef LOG_BSE_JUNK  printf ("[bse %d ] At %8.4f reset-layer : lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_lid);#endif   LayerAgentInfo * self = new LayerAgentInfo (id,udp_recv_agent_addr);  self->dist = 0.0;  top_layer->AddClusterRoot(self);  top_layer->me_in_layer = true;  top_layer->cr_chkt.CancelTimer();  top_layer->cr_msgt.CancelTimer();  top_layer->cr_chkt.SetTimer(CONST_CLUSTER_REFRESH_CHECK_TIMEOUT);  top_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);fflush(0);  return;}void bseAgent::handle_join_query (AppPacket *p) {  assert (p->st == JOIN_QUERY); // || (p->st == JOIN_FORWARD) );  /* Join forward not accepted right now. To do so, changes needed:     See BSE-JF change in coop-agent.cc  */  int qlid;  AgentInfo src_ag;  AgentInfo original_dst;  double src_time;  if (p->st == JOIN_QUERY) {    qlid = p->u.joinq_p.q_lid;    src_ag.agent_id = p->src_agent;    memcpy(& src_ag.agent_addr, &p->src_agent_addr, sizeof(struct sockaddr_in));    /*    if (src_ag.agent_id == UNDEFINED_AGENT_ID) {      src_ag.agent_id = ++ coop_agents_sofar;    }     memcpy (& src_ag.agent_addr, &p->u.joinq_p.reply_addr, sizeof(struct sockaddr_in));    */    original_dst.agent_id = id;    memcpy (& original_dst.agent_addr, & udp_recv_agent_addr, sizeof(struct sockaddr_in));    src_time = p->u.joinq_p.send_time;  }  else {    qlid = p->u.joinforward_p.q_lid;    src_ag.agent_id = p->u.joinforward_p.src_ag.agent_id;    memcpy (& src_ag.agent_addr, & p->u.joinforward_p.src_ag.agent_addr, sizeof(struct sockaddr_in));    original_dst.agent_id = p->u.joinforward_p.original_dst.agent_id;    memcpy (& original_dst.agent_addr , & p->u.joinforward_p.original_dst.agent_addr, sizeof (struct sockaddr_in));    src_time = p->u.joinforward_p.send_time;  }    #ifdef LOG_BSE_RECV  printf ("[bse %d ] At %8.4f recvd-join-query : < [ %d ] > lid %d ( top lid %d )\n", id, /* Scheduler::Clock */ my_clock(), src_ag.agent_id, qlid, top_layer->lid);#endif   if (top_layer->lid < qlid) {    // Someone claims to need to join to a higher layer than I know of    reset_layer_info(qlid);  }  bool do_attach = false;  LayerAgentInfo * new_member = NULL;  if (top_layer->lid == qlid) {    do_attach = true;    new_member = new LayerAgentInfo(src_ag.agent_id, src_ag.agent_addr);    bool is_new_member = top_layer->AddClusterMember(new_member);    if (is_new_member == false) {      delete new_member;      new_member = top_layer->FindClusterMember(src_ag.agent_id);      new_member->refresh = true;    }    else {      log_cluster_change_info ();#ifdef LOG_BSE_CLUSTER_CHANGE      display_top_layer_info();#endif     }    //    new_member->dist = /* Scheduler::Clock */ my_clock() - src_time;    new_member->dist = MAX_RTT;    struct sockaddr_in their_addr;     memcpy(& their_addr, & new_member->ag.agent_addr, sizeof(struct sockaddr_in));    their_addr.sin_port = htons(DIST_EST_PORT);    new_member->dist = estimate_rtt_wrapper(their_addr);    //if (is_new_member == false)    //  return;  }  /* Create response packet */  AppPacket *resp_p = new AppPacket(JOIN_RESPONSE);  resp_p->u.joinresp_p.your_id = src_ag.agent_id; //send new id  put_layer_cluster_info_into_packet(top_layer,resp_p);  resp_p->u.joinresp_p.accept = true;  resp_p->u.joinresp_p.exp_src.agent_id = original_dst.agent_id;  memcpy ( & resp_p->u.joinresp_p.exp_src.agent_addr, & original_dst.agent_addr, sizeof(struct sockaddr_in));  send_pkt_wrapper(resp_p,src_ag.agent_id, src_ag.agent_addr);  if (do_attach == true) {#ifdef LOG_BSE_SEND    printf ("\n[bse %d ] At %8.4f : new-member < [ %d ] >\n", id, /* Scheduler::Clock */ my_clock(), new_member->ag.agent_id);#endif     top_layer->cr_msgt.CancelTimer();    send_top_layer_cluster_refresh(new_member);    top_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);  }fflush(0);  return;}// Accept the merge only if this is the only member in the top layer// or if there is no membervoid bseAgent::handle_cluster_merge (AppPacket * ap) {  if ( top_layer->lid != ap->u.clustermerge_p.layer_id )    return;  if ( (top_layer->ag_list.GetSize() == 1) ||       ( (top_layer->ag_list.GetSize() == 2) && (top_layer->ag_list.Locate(ap->src_agent) != NULL) ) ) {#ifdef LOG_BSE_RECV    printf ("[bse %d ] At %8.4f recvd-valid-merge : < [ %d ] > lid %d\n", id, /* Scheduler::Clock */ my_clock(), ap->src_agent, ap->u.clustermerge_p.layer_id);#endif     reset_layer_info(ap->u.clustermerge_p.layer_id);    top_layer->AddExtraMembersFromAltRootPacket(ap);    log_cluster_change_info ();#ifdef LOG_BSE_CLUSTER_CHANGE    display_top_layer_info();#endif   }  fflush(0);  return;}void bseAgent::handle_cluster_refresh (AppPacket *ap) {  if (top_layer->lid != ap->u.clusterrefresh_p.layer_id)    return;  void * pos = top_layer->ag_list.Locate(ap->src_agent);  if (pos == NULL)    return;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -