📄 coop-agent.cc,v
字号:
head 1.1;access;symbols;locks rbraud:1.1; strict;comment @// @;1.1date 2002.07.02.19.29.00; author rbraud; state Exp;branches;next ;desc@@1.1log@Initial revision@text@/* * File: coop-agent.cc * Author: Suman Banerjee <suman@@cs.umd.edu> * Date: 15th February, 2002 * Terms: GPL * * NICE Application Layer Multicast */#include <stdio.h>#include <stdlib.h>#include <assert.h>#include <sys/socket.h>#include <sys/types.h>#include <arpa/inet.h>#include <netdb.h>#include <unistd.h>//#define LOG_COOP_SEND//#define LOG_COOP_RECV//#define LOG_COOP_JUNK//#define LOG_COOP_RECV_2//#define LOG_COOP_JUNK_PING//#define LOG_COOP_JUNK_PING_SWITCH//#define LOG_COOP_START//#define LOG_COOP_STOP//#define LOG_COOP_CLUSTER_CHANGE//#define LOG_COOP_CLUSTER_PERIODIC//#undef LOG_CLUSTER_INFO_DETAILED//#define LOG_DATA_PACKET_INIT//#define LOG_DATA_PACKET_RECV//#define LOG_DATA_PACKET_SEND//#define LOG_DATA_PACKET_SEND_COUNT//#define TESTING_JUNK// #define CLOCK 0.00#define UDP_RECV_BSE_PORT 5000#define UDP_RECV_AGENT_PORT 5001#define UDP_RECV_BSE_ADDR "128.8.129.18" //#include "nicenode.h"//#include <scheduler.h>#include "app-packet.h"#include "coop-agent.h"#include "setpartition.h"#include "common.h"#include "o_timeout.h"#include "talker.h"#define MAX_RECV_BYTES 10000// KCR-CHANGE#define MAX_RANDOM_DIST_DOUBLE 200000.0bool global_use_packet_cache = true;coopAgent::coopAgent (void) : commonAgent () { jqt.ca = this; m_hlprt.ca = this; m_hlpit.ca = this; dt.a = this;}coopAgent::~coopAgent (void) { if (started == true) { init_layer_arr(); jqt.CancelTimer(); cancel_higher_layer_ping_timers(); }}void coopAgent::init (int Id, int Index, int port_no, char * bsehostname) { struct sockaddr_in bseaddr; char hostname[512]; struct hostent he; Agent::init(Id,Index); memset( & udp_recv_agent_addr, 0, sizeof(struct sockaddr_in)); udp_recv_agent_addr.sin_family = AF_INET; // udp_recv_agent_addr.sin_addr.s_addr = htonl(INADDR_ANY); gethostname(hostname, 512); memcpy(& he, gethostbyname(hostname), sizeof(he)); udp_recv_agent_addr.sin_addr.s_addr = * ((unsigned long *) he.h_addr_list[0]); // udp_recv_agent_addr.sin_port = UDP_RECV_AGENT_PORT; udp_recv_agent_addr.sin_port = htons(port_no); if ((udp_sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) printf("socket() failed\n"); if (bind(udp_sock, (struct sockaddr *) & udp_recv_agent_addr, sizeof(struct sockaddr)) < 0) printf("bind() failed\n"); t = AGENT_APPLICATION_COOP; for (int i = 0; i < MAX_LAYERS; i++) { layers.arr[i] = NULL; } m_data_pkt_seq = 0; for (int i = 0; i < MAX_LAYERS; i++) m_last_change[i] = -1.0; memset( & bseaddr, 0, sizeof(struct sockaddr_in)); bseaddr.sin_family = AF_INET; memcpy(& he, gethostbyname(bsehostname), sizeof(he)); bseaddr.sin_addr.s_addr = * ((unsigned long *) he.h_addr_list[0]); // bseaddr.sin_addr.s_addr = inet_addr(UDP_RECV_BSE_ADDR); bseaddr.sin_port = htons(UDP_RECV_BSE_PORT); set_bse_agent(0, bseaddr); fflush(stdout); return;}void coopAgent::start (void) { char recv_buffer[MAX_RECV_BYTES]; struct sockaddr_in sender_addr; unsigned int sender_addr_len = sizeof(struct sockaddr_in); AppPacketSubType st; int ret, id;#define MAX_BYTES 128 char recvfrom_error_str[MAX_BYTES]; sprintf(recvfrom_error_str,"recvfrom < %d >:", id); started = true; if (bse.agent_id == -1) { printf ("[Err] BSE is not known to agent %d\n", id); exit (-1); } // printf("bse.agent_id is %d\n",bse.agent_id); state = INIT; Agent::start();#ifdef LOG_COOP_START printf ("[coop %d ] At %8.4f start\n", id, /* Scheduler::my_clock() */ my_clock());#endif m_ping_in_progress = false; // maybe this is the right place init_layer_arr(); init_join_query(0); init_packet_cache(); m_use_packet_cache = global_use_packet_cache; // m_ping_in_progress = false; //handle_timer_now = false; disable_handle_timers(); for (;;) { int recvmsgsize, num_fds, found_fds = 0; fd_set copy_set; SocketHandlerMap::iterator i; //handling expired timers here.... handle_expired_timers(); enable_handle_timers(); //handle_timer_now = true; /*while ((recvmsgsize = recvfrom(udp_sock, recv_buffer, MAX_RECV_BYTES, 0, (struct sockaddr *) & sender_addr, & sender_addr_len)) < 0) { perror(recvfrom_error_str); }*/ // make a copy of our socket set so it doesn't get changed // by select memcpy(©_set, &socket_set, sizeof(socket_set)); while((num_fds = select(max_socket_fd+1, ©_set, NULL, NULL, NULL)) < 0) { //perror("error with select"); } disable_handle_timers(); for(i = socket_map.begin(); i!=socket_map.end(); i++) { if(FD_ISSET(i->first, ©_set)) { found_fds++; while((recvmsgsize = recvfrom(i->first, recv_buffer, MAX_RECV_BYTES, 0, (struct sockaddr *)&sender_addr, &sender_addr_len)) < 0) { perror(recvfrom_error_str); } // printf("Handling sender %s, recvmsgsize %d\n",inet_ntoa(sender_addr.sin_addr), recvmsgsize); assert(sender_addr_len == sizeof(struct sockaddr_in)); AppPacket *ap = (AppPacket *) this->reclaim_apppacket(recv_buffer, recvmsgsize); ret = generic_rx_pkt_handler(ap); st = ap->st; id = ap->u.data_p.seq_no; fflush(stdout); free_app_packet(ap); } // Now call back on the provided function if this is a data // packet, there is a function to call, and the packet was not // already in the cache (ie it is not a duplicate) if(i->second != NULL && st == PACKET_DATA && ret == 0) { i->second(&recv_buffer[sizeof(const_data_pkt)+sizeof(int)*3 +sizeof(struct sockaddr_in)], recvmsgsize-(sizeof(const_data_pkt)+sizeof(int)*3 +sizeof(struct sockaddr_in))); printf("received seqno %d\n", id); } if(found_fds == num_sockets) { // we have gone through all the sockets that we are going to find // so we might as well stop break; } } } fflush(stdout); return;}/* dst_join_lid < 0 => that attempt to join the same layer as before */void coopAgent::init_join_query (int dst_join_lid) { if (dst_join_lid >= 0) { if (state == JOIN) { if (target_join_q_lid > dst_join_lid) target_join_q_lid = dst_join_lid; return; } } else { assert (state == JOIN); } assert (m_ping_in_progress == false);#ifdef LOG_COOP_JUNK printf ("[coop %d ] At %8.4f init-join-query lid %d\n", id, /* Scheduler::my_clock() */ my_clock(), dst_join_lid);#endif state = JOIN; curr_join_q_lid = -1; if (dst_join_lid >= 0) target_join_q_lid = dst_join_lid; AppPacket *p = new AppPacket(JOIN_QUERY); init_join_query_packet(p,target_join_q_lid,/* Scheduler::my_clock() */ my_clock(),false, udp_recv_agent_addr); send_pkt_wrapper(p,bse.agent_id,bse.agent_addr); /* Set the timeout to resend the request if needed */ jqt.SetTimer(CONST_JOIN_QUERY_TIMEOUT); return;}void coopAgent::init_layer_arr (void) { for (int i = 0; i < MAX_LAYERS; i++) { if (layers.arr[i] != NULL) { delete layers.arr[i]; layers.arr[i] = NULL; } } return;}void coopAgent::stop (void) { assert (started == true);#ifdef LOG_COOP_STOP printf ("[coop %d ] At %8.4f stop\n", id, /* Scheduler::my_clock() */ my_clock());#endif jqt.CancelTimer(); cancel_higher_layer_ping_timers(); init_layer_arr(); Agent::stop(); return;}void coopAgent::specific_send_data_pkt (char* payload, int data_len) { assert (started == true);#ifdef LOG_DATA_PACKET_INIT printf ("[coop %d ] At %8.4f source data-pkt : seq %d\n", id, my_clock(), m_data_pkt_seq);#endif forward_data_packet(payload, data_len, id, udp_recv_agent_addr, m_data_pkt_seq ++,0,-1,true); return;}int coopAgent::specific_rx_pkt_handler (Packet *p) { if (p->t != PACKET_APP) { printf ("[Err] COOP %d received unknown packet type\n", id); return -1; } AppPacket *ap = (AppPacket *)p; rx_pkt_wrapper(ap); switch (ap->st) { case JOIN_QUERY : case JOIN_FORWARD : handle_join_query_forward(ap); break; case JOIN_RESPONSE : handle_join_response(ap); break; case CLUSTER_REFRESH : handle_cluster_refresh(ap); break; case CLUSTER_MERGE : handle_cluster_merge(ap); break; case PING_QUERY : handle_ping_query(ap); break; case PING_RESPONSE : handle_ping_response(ap); break; case PACKET_DATA : return handle_data_packet(ap); case PACKET_DATA_ACK : handle_data_ack_packet(ap); break; default : printf ("[Err] COOP %d received unknown packet subtype\n", id); return -1; } return 0;}void coopAgent::handle_join_query_forward (AppPacket * ap) { assert ( (ap->st == JOIN_QUERY) || (ap->st == JOIN_FORWARD) ); int qlid; double src_time; bool attach; AgentInfo src_ag; AgentInfo original_dst; if (ap->st == JOIN_QUERY) { qlid = ap->u.joinq_p.q_lid; attach = ap->u.joinq_p.attach; src_ag.agent_id = ap->src_agent; memcpy(& src_ag.agent_addr, & ap->src_agent_addr, sizeof(struct sockaddr_in)); //src_ag.agent_addr = ap->src; original_dst.agent_id = id; // original_dst.node_id = n->id; memcpy(& original_dst.agent_addr, & udp_recv_agent_addr, sizeof(struct sockaddr_in)); src_time = ap->u.joinq_p.send_time; } else { qlid = ap->u.joinforward_p.q_lid; attach = ap->u.joinforward_p.attach; src_ag.agent_id = ap->u.joinforward_p.src_ag.agent_id; // src_ag.node_id = ap->u.joinforward_p.src_ag.node_id; memcpy(& src_ag.agent_addr, & ap->u.joinforward_p.src_ag.agent_addr, sizeof(struct sockaddr_in)); original_dst.agent_id = ap->u.joinforward_p.original_dst.agent_id; // original_dst.node_id = ap->u.joinforward_p.original_dst.node_id; memcpy(& original_dst.agent_addr, & ap->u.joinforward_p.original_dst.agent_addr, sizeof(struct sockaddr_in)); src_time = ap->u.joinforward_p.send_time; } int required_qlid; if (attach == true) required_qlid = qlid;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -