📄 coop-agent.cc
字号:
/* * File: coop-agent.cc * Author: Suman Banerjee <suman@cs.umd.edu> * Date: 15th February, 2002 * Terms: GPL * * NICE Application Layer Multicast */#include <stdio.h>#include <stdlib.h>#include <assert.h>#include <errno.h>#include <sys/socket.h>#include <sys/types.h>#include <arpa/inet.h>#include <netdb.h>#include <unistd.h>//#define LOG_COOP_SEND//#define LOG_COOP_RECV//#define LOG_COOP_JUNK//#define LOG_COOP_RECV_2//#define LOG_COOP_JUNK_PING//#define LOG_COOP_JUNK_PING_SWITCH//#define LOG_COOP_START//#define LOG_COOP_STOP#define LOG_COOP_CLUSTER_CHANGE#define LOG_COOP_CLUSTER_PERIODIC// change this back to #undef//#define LOG_CLUSTER_INFO_DETAILED//PRM -change#undef LOG__CLUSTER_INFO_DETAILED//#define LOG_DATA_PACKET_INIT//#define LOG_DATA_PACKET_RECV//#define LOG_DATA_PACKET_SEND//#define LOG_DATA_PACKET_SEND_COUNT//#define TESTING_JUNK// #define CLOCK 0.00#define UDP_RECV_BSE_PORT 5000#define UDP_RECV_AGENT_PORT 5001#define UDP_RECV_BSE_ADDR "128.8.129.18" //#include "nicenode.h"//#include <scheduler.h>#include "app-packet.h"#include "coop-agent.h"#include "setpartition.h"#include "common.h"#include "o_timeout.h"#include "talker_utils.h"#define MAX_RECV_BYTES 10000//PRM - Magic numbers #define SLACK 1#define FLAG_SEND_ORPHAN 0#define TIME_START 500.0#define TIME_ACTIVE 250#define DOWN_DURATION 60.0#define ONE_HOP_DELAY 0.050#define VERY_LATE 8.0#define RANDOM_SEED 1234//PRM//PRM-the following variables may not be needed//for stats/////////////////// STAT/////////////////int *recv_per_pkt;int *deliv_per_pkt;int *up_per_pkt;double *latency_per_pkt;int UpCount;int num_pkt_init;int num_pkt_deliv;int num_rand_send;int num_rand_recv;int num_all_recv;int num_all_recv_type[3];int num_pkt_deliv_type[3];int num_rand_response;int sum_hop_count;int sum_rand_data_hop_count;int sum_rand_hop_count;int cached_type;double cached_time;int duplicate[3][3];int num_retransmit_request;int num_retransmit_total;int return_from_sent_proactive;double g_one_hop_latency_sum;double g_leaf_latency_sum;double g_nonleaf_latency_sum;int g_one_hop_cnt, g_leaf_cnt, g_nonleaf_cnt;double g_rtt;int g_rtt_cnt;double max_delay;//PRM// KCR-CHANGE#define MAX_RANDOM_DIST_DOUBLE 200000.0extern int errno;bool global_use_packet_cache = true;//PRMint g_initialized = 0 ;struct sockaddr_in nodes[NUM_AGENT+1]; //sunny//coopAgent * agents[NUM_AGENT+1]; //sunnyint down_no; int down_internal, down_leaf;int g_start, g_stop; FILE* fd;char *filename;//PRMcoopAgent::coopAgent (void) : commonAgent () { jqt.ca = this; m_hlprt.ca = this; m_hlpit.ca = this; //PRM m_agent_down = 0; //PRM dt.a = this;}coopAgent::~coopAgent (void) { if (started == true) { init_layer_arr(); jqt.CancelTimer(); cancel_higher_layer_ping_timers(); }}void coopAgent::init (int Id, int Index, int port_no, char * bsehostname, char* talker, unsigned short dport) { struct sockaddr_in bseaddr; char hostname[512]; struct hostent he;//sunny filename=(char*)malloc(sizeof(char)*20); sprintf(filename, "coop%d.dat", Id); fd=(FILE*)malloc(sizeof(FILE));//sunny Agent::init(Id,Index, talker, dport); memset( & udp_recv_agent_addr, 0, sizeof(struct sockaddr_in)); udp_recv_agent_addr.sin_family = AF_INET; // udp_recv_agent_addr.sin_addr.s_addr = htonl(INADDR_ANY); gethostname(hostname, 512); memcpy(& he, gethostbyname(hostname), sizeof(he)); udp_recv_agent_addr.sin_addr.s_addr = * ((unsigned long *) he.h_addr_list[0]); // udp_recv_agent_addr.sin_port = UDP_RECV_AGENT_PORT; udp_recv_agent_addr.sin_port = htons(port_no); if ((udp_sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) printf("socket() failed\n"); if (bind(udp_sock, (struct sockaddr *) & udp_recv_agent_addr, sizeof(struct sockaddr)) < 0) printf("bind() failed\n"); t = AGENT_APPLICATION_COOP; /* PRM: moved to coopAgent::start for (int i = 0; i < MAX_LAYERS; i++) { layers.arr[i] = NULL; } */ m_data_pkt_seq = 0;// PRM -initialization of varaibles//sunny pkt_recv_count = 0; m_sent_proactive = 0; m_pkt_deliv = 0; m_all_recv = 0; m_max_seq_recv = -1; m_dist_from_src = 1; m_src_aid = m_src_nid = -1; m_latency = 0.0; m_total_up_time = 0.0; m_recent_up_time = LONG_TIME; //The following for-loops may be only for simulator /* for ( int i = 0 ; i < 3; i++ ) { m_num_all_recv_type[i] = 0; m_num_pkt_deliv_type[i] = 0; for ( int j = 0 ; j < 3; j++ ) m_duplicate[i][j] = 0; } m_was_dead = 0;*///sunny //PRM /* PRM: moved to coopAgent::start for (int i = 0; i < MAX_LAYERS; i++) m_last_change[i] = -1.0; *///PRM - setting random seed if ( g_initialized == 0 ) { srand(RANDOM_SEED); g_initialized = 1;printf("random seed\n"); }//PRM memset( & bseaddr, 0, sizeof(struct sockaddr_in)); bseaddr.sin_family = AF_INET; memcpy(& he, gethostbyname(bsehostname), sizeof(he)); bseaddr.sin_addr.s_addr = * ((unsigned long *) he.h_addr_list[0]); // bseaddr.sin_addr.s_addr = inet_addr(UDP_RECV_BSE_ADDR); bseaddr.sin_port = htons(UDP_RECV_BSE_PORT); set_bse_agent(0, bseaddr);//sunnyprintf("coop-init finished\n");//sunny fflush(stdout); return;}void coopAgent::start (void) { char recv_buffer[MAX_RECV_BYTES]; struct sockaddr_in sender_addr; unsigned int sender_addr_len = sizeof(struct sockaddr_in); AppPacketSubType st; int ret, id;#define MAX_BYTES 128 char recvfrom_error_str[MAX_BYTES]; sprintf(recvfrom_error_str,"recvfrom < %d >:", id); started = true; if (bse.agent_id == -1) { printf ("[Err] BSE is not known to agent %d\n", id); exit (-1); } //printf("bse.agent_id is %d\n",bse.agent_id); state = INIT; Agent::start(); //PRM//sunny/* if ( id <= NUM_AGENT ) { memcpy(& (nodes[id]), &udp_recv_agent_addr, sizeof(struct sockaddr_in)); agents[id] = this; g_start++; }*///sunny //PRM//#ifdef LOG_COOP_START printf ("[coop %d ] At %8.4f start\n", id, /* Scheduler::my_clock() */ my_clock());//#endif m_ping_in_progress = false; // maybe this is the right place init_layer_arr(); init_join_query(0); init_packet_cache(); //PRM//sunny //the following 4 lines are from coopAgent::init for (int i = 0; i < MAX_LAYERS; i++) { layers.arr[i] = NULL; m_last_change[i] = -1.0; }printf("layer\n"); qrt.ca = this;printf("this\n"); qrt.SetTimer(QUERY_RANDOM_NODE_TIMEOUT);printf("timer\n"); idCache.clear();printf("cache\n");//sunny m_mature = 0; /*PRM change*/ //m_recent_up_time = Scheduler::Clock(); m_recent_up_time = my_clock(); /*variables from sleeDisplay*/ //UpCount++; m_use_packet_cache = global_use_packet_cache; // m_ping_in_progress = false; //handle_timer_now = false; disable_handle_timers();//sunnyprintf("before while\n");//sunny while(started) { int recvmsgsize, num_fds, found_fds = 0; struct timeval tv; fd_set copy_set; SocketHandlerMap::iterator i; //handling expired timers here.... handle_expired_timers(); enable_handle_timers(); //handle_timer_now = true; // make a copy of our socket set so it doesn't get changed // by select memcpy(©_set, &socket_set, sizeof(socket_set)); errno = 0; tv.tv_sec = 3; tv.tv_usec = 0; while((num_fds = select(max_socket_fd+1, ©_set, NULL, NULL, &tv)) < 0) { if(errno != EINTR) perror("error with select"); errno = 0; tv.tv_sec = 3; tv.tv_usec = 0; } disable_handle_timers(); for(i = socket_map.begin(); i!=socket_map.end(); i++) { if(FD_ISSET(i->first, ©_set)) { found_fds++; errno = 0; while((recvmsgsize = recvfrom(i->first, recv_buffer, MAX_RECV_BYTES, 0, (struct sockaddr *)&sender_addr, &sender_addr_len)) < 0) { if(errno != EINTR) perror(recvfrom_error_str); errno = 0; } // printf("Handling sender %s, recvmsgsize %d\n",inet_ntoa(sender_addr.sin_addr), recvmsgsize); assert(sender_addr_len == sizeof(struct sockaddr_in)); AppPacket *ap = (AppPacket *) this->reclaim_apppacket(recv_buffer, recvmsgsize); ret = generic_rx_pkt_handler(ap); st = ap->st; id = ap->u.data_p.seq_no; fflush(stdout); free_app_packet(ap); if(st == PACKET_DATA) { delete ap; } } // Now call back on the provided function if this is a data // packet, there is a function to call, and the packet was not // already in the cache (ie it is not a duplicate) if(i->second != NULL && st == PACKET_DATA && ret == 0) { i->second(&recv_buffer[sizeof(const_data_pkt)+sizeof(int)*3 +sizeof(struct sockaddr_in)], recvmsgsize-(sizeof(const_data_pkt)+sizeof(int)*3 +sizeof(struct sockaddr_in))); } if(found_fds == num_sockets) { // we have gone through all the sockets that we are going to find // so we might as well stop break; } } } fflush(stdout); return;}/* dst_join_lid < 0 => that attempt to join the same layer as before */void coopAgent::init_join_query (int dst_join_lid) { if (dst_join_lid >= 0) { if (state == JOIN) { if (target_join_q_lid > dst_join_lid) target_join_q_lid = dst_join_lid; return; } } else { assert (state == JOIN); } assert (m_ping_in_progress == false);//#ifdef LOG_COOP_JUNK printf ("[coop %d ] At %8.4f init-join-query lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), dst_join_lid);//#endif state = JOIN; curr_join_q_lid = -1; if (dst_join_lid >= 0)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -