📄 diffusion.cc
字号:
// // diffusion.cc : Main Diffusion program// authors : Chalermek Intanagonwiwat and Fabio Silva//// Copyright (C) 2000-2002 by the University of Southern California// $Id: diffusion.cc,v 1.6 2002/09/16 17:57:27 haldar Exp $//// This program is free software; you can redistribute it and/or// modify it under the terms of the GNU General Public License,// version 2, as published by the Free Software Foundation.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU General Public License for more details.//// You should have received a copy of the GNU General Public License along// with this program; if not, write to the Free Software Foundation, Inc.,// 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.////#include "diffusion.hh"#ifndef NS_DIFFUSIONDiffusionCoreAgent *agent;#endif // !NS_DIFFUSIONclass HashEntry {public: bool dummy; HashEntry() { dummy = false; }};class NeighborEntry {public: int32_t id; struct timeval tmv; NeighborEntry(int _id) : id(_id) { GetTime(&tmv); }};int NeighborsTimeoutTimer::expire(){ agent_->neighborsTimeout(); return 0;}int FilterTimeoutTimer::expire(){ agent_->filterTimeout(); return 0;}int DiffusionStopTimer::expire(){ agent_->timeToStop();#ifndef NS_DIFFUSION exit(0);#endif //NS_DIFFUSION // Never gets here ! return 0;}void DiffusionCoreAgent::timeToStop(){ FILE *outfile = NULL; if (global_debug_level > DEBUG_SOME_DETAILS){#ifdef NS_DIFFUSION outfile = fopen("/tmp/diffusion.out", "a");#else outfile = fopen("/tmp/diffusion.out", "w");#endif // NS_DIFFUSION if (outfile == NULL){ DiffPrint(DEBUG_ALWAYS ,"Diffusion Error: Can't create /tmp/diffusion.out\n"); return; } }#ifdef STATS stats_->printStats(stdout); if (outfile) stats_->printStats(outfile);# ifndef WIRED# ifdef USE_RPC rpcstats_->printStats(stdout); if (outfile) rpcstats_->printStats(outfile);# endif // USE_RPC# endif // WIRED#endif // STATS if (outfile) fclose(outfile);}#ifndef NS_DIFFUSIONvoid signal_handler(int p){ agent->timeToStop(); exit(0);}void DiffusionCoreAgent::usage(){ DiffPrint(DEBUG_ALWAYS, "Usage: diffusion [-d debug] [-f filename] [-t stoptime] [-v] [-h] [-p port]\n\n"); DiffPrint(DEBUG_ALWAYS, "\t-d - Sets debug level (0-10)\n"); DiffPrint(DEBUG_ALWAYS, "\t-t - Schedule diffusion to exit after stoptime seconds\n"); DiffPrint(DEBUG_ALWAYS, "\t-f - Uses filename as the config file\n"); DiffPrint(DEBUG_ALWAYS, "\t-v - Prints diffusion version\n"); DiffPrint(DEBUG_ALWAYS, "\t-h - Prints this information\n"); DiffPrint(DEBUG_ALWAYS, "\t-p - Sets diffusion port to port\n"); DiffPrint(DEBUG_ALWAYS, "\n"); exit(0);}void DiffusionCoreAgent::run(){ DeviceList::iterator device_itr; DiffPacket in_pkt; fd_set fds; // DiffusionEvent *e; bool flag; int status, max_sock, fd; struct timeval tv; // Main Select Loop while (1){ // Wait for incoming packets FD_ZERO(&fds); max_sock = 0; // Figure out how much time to wait timers_manager_->nextTimerTime(&tv); if (tv.tv_sec == 0 && tv.tv_usec == 0){ // Timer has expired ! timers_manager_->executeAllExpiredTimers(); continue; } for (device_itr = in_devices_.begin(); device_itr != in_devices_.end(); ++device_itr){ (*device_itr)->addInFDS(&fds, &max_sock); } status = select(max_sock+1, &fds, NULL, NULL, &tv); if (status == 0){ // We process all expired timers timers_manager_->executeAllExpiredTimers(); } // Check for new packets if (status > 0){ do{ flag = false; for (device_itr = in_devices_.begin(); device_itr != in_devices_.end(); ++device_itr){ fd = (*device_itr)->checkInFDS(&fds); if (fd != 0){ // Message waiting in_pkt = (*device_itr)->recvPacket(fd); if (in_pkt) recvPacket(in_pkt); // Clear this fd FD_CLR(fd, &fds); status--; flag = true; } } } while ((status > 0) && (flag == true)); } // This should not happen if (status < 0){ DiffPrint(DEBUG_IMPORTANT, "Select returned %d\n", status); } }}#endif // !NS_DIFFUSIONvoid DiffusionCoreAgent::neighborsTimeout(){ struct timeval tmv; NeighborEntry *neighbor_entry; NeighborList::iterator neighbor_itr; DiffPrint(DEBUG_MORE_DETAILS, "Neighbors Timeout !\n"); GetTime(&tmv); neighbor_itr = neighbor_list_.begin(); while(neighbor_itr != neighbor_list_.end()){ neighbor_entry = *neighbor_itr; if (tmv.tv_sec > neighbor_entry->tmv.tv_sec + NEIGHBORS_TIMEOUT){ // This neighbor expired neighbor_itr = neighbor_list_.erase(neighbor_itr); delete neighbor_entry; } else{ neighbor_itr++; } }}void DiffusionCoreAgent::filterTimeout(){ struct timeval tmv; FilterEntry *filter_entry; FilterList::iterator filter_itr; DiffPrint(DEBUG_MORE_DETAILS, "Filter Timeout !\n"); GetTime(&tmv); filter_itr = filter_list_.begin(); while(filter_itr != filter_list_.end()){ filter_entry = *filter_itr; if (tmv.tv_sec > filter_entry->tmv_.tv_sec + FILTER_TIMEOUT){ // This filter expired DiffPrint(DEBUG_NO_DETAILS, "Filter %d, %d, %d timed out !\n", filter_entry->agent_, filter_entry->handle_, filter_entry->priority_); filter_itr = filter_list_.erase(filter_itr); delete filter_entry; } else{ filter_itr++; } }}void DiffusionCoreAgent::sendMessage(Message *msg){ Tcl_HashEntry *tcl_hash_entry; unsigned int key[2]; Message *send_message; send_message = new Message(DIFFUSION_VERSION, msg->msg_type_, diffusion_port_, 0, 0, msg->pkt_num_, msg->rdm_id_, msg->next_hop_, 0); send_message->msg_attr_vec_ = CopyAttrs(msg->msg_attr_vec_); send_message->num_attr_ = send_message->msg_attr_vec_->size(); send_message->data_len_ = CalculateSize(send_message->msg_attr_vec_); // Adjust message size for logging and check hash key[0] = msg->pkt_num_; key[1] = msg->rdm_id_; tcl_hash_entry = Tcl_FindHashEntry(&htable_, (char *) key); if (tcl_hash_entry) msg->new_message_ = 0; else msg->new_message_ = 1; send_message->new_message_ = msg->new_message_; // Check if message goes to an agent or the network if (msg->next_port_){ // Message goes to an agent send_message->last_hop_ = LOCALHOST_ADDR; // If it's a local message, it has to go to a local agent if (send_message->next_hop_ != LOCALHOST_ADDR){ DiffPrint(DEBUG_ALWAYS, "Error: Message destination is a local agent but next_hop != LOCALHOST_ADDR !\n"); delete send_message; return; } // Send the message to the agent specified sendMessageToLibrary(send_message, msg->next_port_); } else{ // Message goes to the network send_message->last_hop_ = my_id_;#ifdef STATS stats_->logOutgoingMessage(send_message);#endif // STATS // Add message to the hash table if (tcl_hash_entry == NULL) putHash(key[0], key[1]); else DiffPrint(DEBUG_DETAILS, "Message being sent is an old message !\n"); // Send Message sendMessageToNetwork(send_message); } delete send_message;}void DiffusionCoreAgent::forwardMessage(Message *msg, FilterEntry *filter_entry){ RedirectMessage *original_hdr; NRAttribute *original_header_attr; Message *send_message; // Create an attribute with the original header original_hdr = new RedirectMessage(msg->new_message_, msg->msg_type_, msg->source_port_, msg->data_len_, msg->num_attr_, msg->rdm_id_, msg->pkt_num_, msg->next_hop_, msg->last_hop_, filter_entry->handle_, msg->next_port_); original_header_attr = OriginalHdrAttr.make(NRAttribute::IS, (void *)original_hdr, sizeof(RedirectMessage)); send_message = new Message(DIFFUSION_VERSION, REDIRECT, diffusion_port_, 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR, my_id_); // Increment pkt_counter pkt_count_++; // Duplicate the message's attributes send_message->msg_attr_vec_ = CopyAttrs(msg->msg_attr_vec_); // Add the extra attribute send_message->msg_attr_vec_->push_back(original_header_attr); send_message->num_attr_ = send_message->msg_attr_vec_->size(); send_message->data_len_ = CalculateSize(send_message->msg_attr_vec_); sendMessageToLibrary(send_message, filter_entry->agent_); delete send_message; delete original_hdr;}#ifndef NS_DIFFUSIONvoid DiffusionCoreAgent::sendMessageToLibrary(Message *msg, u_int16_t agent_id){ DiffPacket out_pkt = NULL; struct hdr_diff *dfh; int len; char *pos; out_pkt = AllocateBuffer(msg->msg_attr_vec_); dfh = HDR_DIFF(out_pkt); pos = (char *) out_pkt; pos = pos + sizeof(struct hdr_diff); len = PackAttrs(msg->msg_attr_vec_, pos); LAST_HOP(dfh) = htonl(msg->last_hop_); NEXT_HOP(dfh) = htonl(msg->next_hop_); DIFF_VER(dfh) = msg->version_; MSG_TYPE(dfh) = msg->msg_type_; DATA_LEN(dfh) = htons(len); PKT_NUM(dfh) = htonl(msg->pkt_num_); RDM_ID(dfh) = htonl(msg->rdm_id_); NUM_ATTR(dfh) = htons(msg->num_attr_); SRC_PORT(dfh) = htons(msg->source_port_); sendPacketToLibrary(out_pkt, sizeof(struct hdr_diff) + len, agent_id); delete [] out_pkt;}#elsevoid DiffusionCoreAgent::sendMessageToLibrary(Message *msg, u_int16_t agent_id){ Message *send_message; DeviceList::iterator device_itr; int len; send_message = CopyMessage(msg); len = CalculateSize(send_message->msg_attr_vec_); len = len + sizeof(struct hdr_diff); for (device_itr = local_out_devices_.begin(); device_itr != local_out_devices_.end(); ++device_itr){ (*device_itr)->sendPacket((DiffPacket) send_message, len, agent_id); }}#endif // !NS_DIFFUSION#ifndef NS_DIFFUSIONvoid DiffusionCoreAgent::sendMessageToNetwork(Message *msg){ DiffPacket out_pkt = NULL; struct hdr_diff *dfh; int len; char *pos; out_pkt = AllocateBuffer(msg->msg_attr_vec_); dfh = HDR_DIFF(out_pkt); pos = (char *) out_pkt; pos = pos + sizeof(struct hdr_diff); len = PackAttrs(msg->msg_attr_vec_, pos); LAST_HOP(dfh) = htonl(msg->last_hop_); NEXT_HOP(dfh) = htonl(msg->next_hop_); DIFF_VER(dfh) = msg->version_; MSG_TYPE(dfh) = msg->msg_type_; DATA_LEN(dfh) = htons(len); PKT_NUM(dfh) = htonl(msg->pkt_num_); RDM_ID(dfh) = htonl(msg->rdm_id_); NUM_ATTR(dfh) = htons(msg->num_attr_); SRC_PORT(dfh) = htons(msg->source_port_); sendPacketToNetwork(out_pkt, sizeof(struct hdr_diff) + len, msg->next_hop_); delete [] out_pkt;}#elsevoid DiffusionCoreAgent::sendMessageToNetwork(Message *msg)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -