📄 bse-agent.cc,v
字号:
head 1.2;access;symbols;locks rbraud:1.2; strict;comment @// @;1.2date 2002.10.07.17.12.51; author rbraud; state Exp;branches;next 1.1;1.1date 2002.07.02.19.28.13; author rbraud; state Exp;branches;next ;desc@@1.2log@Newest revision?@text@/* * File: bse-agent.cc * Author: Suman Banerjee <suman@@cs.umd.edu> * Date: 15th February, 2002 * Terms: GPL * * NICE Application Layer Multicast */#include <stdio.h>#include <stdlib.h>#include <assert.h>#include <sys/socket.h>#include <arpa/inet.h>#include <sys/types.h>#include <netinet/in.h>#include <netdb.h>#include <unistd.h>//#include "nicenode.h"//#include <scheduler.h>#include "app-packet.h"#include "bse-agent.h"#include "setpartition.h"#include "common.h"#include "layerinfo.h"#include "o_timeout.h"#include "talker_utils.h"// #define CLOCK 0.00#define LOG_BSE_START#define LOG_BSE_STOP#define LOG_BSE_SEND#define LOG_BSE_RECV#define LOG_BSE_JUNK#define LOG_BSE_PARTITION#define LOG_BSE_CLUSTER_CHANGE// change this back#define LOG_CLUSTER_INFO_DETAILED#define LOG_DATA_PACKET_SEND#define LOG_DATA_PACKET_RECV#define MAX_RECV_BYTES 10000#define UDP_RECV_AGENT_PORT 5000void bseAgent::init (int Id, int Index) { char hostname[512]; struct hostent he; Agent::init(Id,Index); memset( & udp_recv_agent_addr, 0, sizeof(struct sockaddr_in)); udp_recv_agent_addr.sin_family = AF_INET; // udp_recv_agent_addr.sin_addr.s_addr = htonl(INADDR_ANY); gethostname(hostname, 512); memcpy(& he, gethostbyname(hostname), sizeof(he)); udp_recv_agent_addr.sin_addr.s_addr = * ((unsigned long *) he.h_addr_list[0]); // udp_recv_agent_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); printf("binding to addr %s\n",inet_ntoa(udp_recv_agent_addr.sin_addr)); udp_recv_agent_addr.sin_port = htons(UDP_RECV_AGENT_PORT); t = AGENT_APPLICATION_BSE; top_layer = NULL; coop_agents_sofar = 0;fflush(0); return;}void bseAgent::start (void) { char recv_buffer[MAX_RECV_BYTES]; struct sockaddr_in sender_addr; unsigned int sender_addr_len = sizeof(struct sockaddr_in);#define MAX_BYTES 128 char recvfrom_error_str[MAX_BYTES]; sprintf(recvfrom_error_str,"recvfrom < %d >:", id); Agent::start(); reset_layer_info(0); log_cluster_change_info ();#ifdef LOG_BSE_CLUSTER_CHANGE display_top_layer_info();#endif if ((udp_sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) printf("socket() failed\n"); if (bind(udp_sock, (struct sockaddr *) & udp_recv_agent_addr, sizeof(struct sockaddr)) < 0) printf("bind() failed\n");#ifdef LOG_BSE_START printf ("[bse %d ] At %8.4f start\n", id, /* Scheduler::Clock */ my_clock());#endif //handle_timer_now = false; disable_handle_timers(); for (;;) { int recvmsgsize; //handling expired timers here.... handle_expired_timers(); //handle_timer_now = true; enable_handle_timers(); while ((recvmsgsize = recvfrom(udp_sock, recv_buffer, MAX_RECV_BYTES, 0, (struct sockaddr *) & sender_addr, & sender_addr_len)) < 0) perror(recvfrom_error_str); //handle_timer_now = false; disable_handle_timers(); // printf("Handling sender %s , recvmsgsize %d\n",inet_ntoa(sender_addr.sin_addr), recvmsgsize); /* { int i; for (i=0; i < recvmsgsize; i++) printf ("%d: %x\n", i, recv_buffer[i]); printf ("\n"); } */ assert(sender_addr_len == sizeof(struct sockaddr_in)); AppPacket *ap = (AppPacket *) reclaim_apppacket(recv_buffer, recvmsgsize); // memcpy(& ((Packet *)ap)->src_agent_addr, & sender_addr, sender_addr_len); generic_rx_pkt_handler(ap); fflush(stdout); free_app_packet(ap); } return;}void bseAgent::stop (void) {#ifdef LOG_BSE_STOP printf ("[bse %d ] At %8.4f stop\n", id, /* Scheduler::Clock */ my_clock());#endif assert (started == true); if (top_layer != NULL) delete top_layer; Agent::stop();fflush(0); return;}int bseAgent::specific_rx_pkt_handler (Packet *p) { if (p->t != PACKET_APP) { printf ("[Err] BSE received unknown packet type\n");fflush(0); return -1; } AppPacket *ap = (AppPacket *)p; rx_pkt_wrapper(ap); switch (ap->st) { case JOIN_QUERY : handle_join_query(ap); break; case CLUSTER_REFRESH : handle_cluster_refresh(ap); break; case CLUSTER_MERGE : handle_cluster_merge(ap); break; case JOIN_FORWARD : printf ("[Err] BSE received Join-forward, ignored\n"); break; case PING_QUERY : case PING_RESPONSE : printf ("[Err] BSE received ping, ignored\n"); break; case PACKET_DATA : return handle_data_packet(ap); case PACKET_DATA_ACK : handle_data_ack_packet(ap); break; default : printf ("[Err] BSE received unknown packet subtype\n"); return -1; }fflush(0); return 0;}void bseAgent::reset_layer_info (int top_lid) { if (top_layer != NULL) { top_layer->MakeEmpty(); top_layer->lid = top_lid; // delete top_layer;#ifdef LOG_BSE_CLUSTER_CHANGE printf ("[bse %d ] At %8.4f cluster-info : lid %d : deleted\n", id, /* Scheduler::Clock */ my_clock(), top_layer->lid);#endif } else top_layer = new LayerInfo(top_lid,this);#ifdef LOG_BSE_JUNK printf ("[bse %d ] At %8.4f reset-layer : lid %d\n", id, /* Scheduler::Clock */ my_clock(), top_lid);#endif LayerAgentInfo * self = new LayerAgentInfo (id,udp_recv_agent_addr); self->dist = 0.0; top_layer->AddClusterRoot(self); top_layer->me_in_layer = true; top_layer->cr_chkt.CancelTimer(); top_layer->cr_msgt.CancelTimer(); top_layer->cr_chkt.SetTimer(CONST_CLUSTER_REFRESH_CHECK_TIMEOUT); top_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT);fflush(0); return;}void bseAgent::handle_join_query (AppPacket *p) { assert (p->st == JOIN_QUERY); // || (p->st == JOIN_FORWARD) ); /* Join forward not accepted right now. To do so, changes needed: See BSE-JF change in coop-agent.cc */ int qlid; AgentInfo src_ag; AgentInfo original_dst; double src_time; if (p->st == JOIN_QUERY) { qlid = p->u.joinq_p.q_lid; src_ag.agent_id = p->src_agent; memcpy(& src_ag.agent_addr, &p->src_agent_addr, sizeof(struct sockaddr_in)); /* if (src_ag.agent_id == UNDEFINED_AGENT_ID) { src_ag.agent_id = ++ coop_agents_sofar; } memcpy (& src_ag.agent_addr, &p->u.joinq_p.reply_addr, sizeof(struct sockaddr_in)); */ original_dst.agent_id = id; memcpy (& original_dst.agent_addr, & udp_recv_agent_addr, sizeof(struct sockaddr_in)); src_time = p->u.joinq_p.send_time; } else { qlid = p->u.joinforward_p.q_lid; src_ag.agent_id = p->u.joinforward_p.src_ag.agent_id; memcpy (& src_ag.agent_addr, & p->u.joinforward_p.src_ag.agent_addr, sizeof(struct sockaddr_in)); original_dst.agent_id = p->u.joinforward_p.original_dst.agent_id; memcpy (& original_dst.agent_addr , & p->u.joinforward_p.original_dst.agent_addr, sizeof (struct sockaddr_in)); src_time = p->u.joinforward_p.send_time; } #ifdef LOG_BSE_RECV printf ("[bse %d ] At %8.4f recvd-join-query : < [ %d ] > lid %d ( top lid %d )\n", id, /* Scheduler::Clock */ my_clock(), src_ag.agent_id, qlid, top_layer->lid);#endif if (top_layer->lid < qlid) { // Someone claims to need to join to a higher layer than I know of reset_layer_info(qlid); } bool do_attach = false; LayerAgentInfo * new_member = NULL; if (top_layer->lid == qlid) { do_attach = true; new_member = new LayerAgentInfo(src_ag.agent_id, src_ag.agent_addr); bool is_new_member = top_layer->AddClusterMember(new_member); if (is_new_member == false) { delete new_member; new_member = top_layer->FindClusterMember(src_ag.agent_id); new_member->refresh = true; } else { log_cluster_change_info ();#ifdef LOG_BSE_CLUSTER_CHANGE display_top_layer_info();#endif } // new_member->dist = /* Scheduler::Clock */ my_clock() - src_time; new_member->dist = MAX_RTT; struct sockaddr_in their_addr; memcpy(& their_addr, & new_member->ag.agent_addr, sizeof(struct sockaddr_in)); their_addr.sin_port = htons(DIST_EST_PORT); new_member->dist = estimate_rtt_wrapper(their_addr); //if (is_new_member == false) // return; } /* Create response packet */ AppPacket *resp_p = new AppPacket(JOIN_RESPONSE); resp_p->u.joinresp_p.your_id = src_ag.agent_id; //send new id put_layer_cluster_info_into_packet(top_layer,resp_p); resp_p->u.joinresp_p.accept = true; resp_p->u.joinresp_p.exp_src.agent_id = original_dst.agent_id; memcpy ( & resp_p->u.joinresp_p.exp_src.agent_addr, & original_dst.agent_addr, sizeof(struct sockaddr_in)); send_pkt_wrapper(resp_p,src_ag.agent_id, src_ag.agent_addr); if (do_attach == true) {#ifdef LOG_BSE_SEND printf ("\n[bse %d ] At %8.4f : new-member < [ %d ] >\n", id, /* Scheduler::Clock */ my_clock(), new_member->ag.agent_id);#endif top_layer->cr_msgt.CancelTimer(); send_top_layer_cluster_refresh(new_member); top_layer->cr_msgt.SetTimer(CONST_CLUSTER_REFRESH_MSG_TIMEOUT); }fflush(0); return;}// Accept the merge only if this is the only member in the top layer// or if there is no membervoid bseAgent::handle_cluster_merge (AppPacket * ap) { if ( top_layer->lid != ap->u.clustermerge_p.layer_id ) return; if ( (top_layer->ag_list.GetSize() == 1) || ( (top_layer->ag_list.GetSize() == 2) && (top_layer->ag_list.Locate(ap->src_agent) != NULL) ) ) {#ifdef LOG_BSE_RECV printf ("[bse %d ] At %8.4f recvd-valid-merge : < [ %d ] > lid %d\n", id, /* Scheduler::Clock */ my_clock(), ap->src_agent, ap->u.clustermerge_p.layer_id);#endif reset_layer_info(ap->u.clustermerge_p.layer_id); top_layer->AddExtraMembersFromAltRootPacket(ap); log_cluster_change_info ();#ifdef LOG_BSE_CLUSTER_CHANGE display_top_layer_info();#endif } fflush(0); return;}void bseAgent::handle_cluster_refresh (AppPacket *ap) { if (top_layer->lid != ap->u.clusterrefresh_p.layer_id) return; void * pos = top_layer->ag_list.Locate(ap->src_agent); if (pos == NULL) return;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -