📄 filter_core.cc
字号:
#ifdef USE_EMSIM char *sim_id = getenv("SIM_ID"); char *sim_group = getenv("SIM_GROUP"); int32_t group_id;#endif // USE_EMSIM long stop_time; struct timeval tv;#ifdef IO_LOG IOLog *pseudo_io_device; bool use_io_log = false;#endif // IO_LOG#ifdef STATS bool use_io_stats = true; int stats_warm_up_time = 0;#endif // STATS bool node_id_configured = false; opterr = 0; config_file_ = NULL; stop_time = 0; node_id_env = getenv("node_addr"); diffusion_port_ = DEFAULT_DIFFUSION_PORT;#ifndef NS_DIFFUSION // Parse command line options while (1){ opt = getopt(argc, argv, COMMAND_LINE_ARGS); switch(opt){ case 'p': diffusion_port_ = (u_int16_t) atoi(optarg); if ((diffusion_port_ < 1024) || (diffusion_port_ >= 65535)){ DiffPrint(DEBUG_ALWAYS, "Diffusion Error: Port must be between 1024 and 65535 !\n"); exit(-1); } break; case 't': stop_time = atol(optarg); if (stop_time <= 0){ DiffPrint(DEBUG_ALWAYS, "Diffusion Error: stop time must be > 0\n"); exit(-1); } else{ DiffPrint(DEBUG_ALWAYS, "%s will stop after %ld seconds\n", PROGRAM, stop_time); } break;#ifdef IO_LOG case 'l': use_io_log = true; break;#endif // IO_LOG#ifdef STATS case 's': use_io_stats = false; break; case 'i': stats_warm_up_time = atoi(optarg); if (stats_warm_up_time < 0){ DiffPrint(DEBUG_ALWAYS, "Diffusion Error: warm_up_time must be > 0\n"); exit(-1); } break;#endif // STATS case 'h': usage(argv[0]); break; case 'v': DiffPrint(DEBUG_ALWAYS, "\n%s %s\n", PROGRAM, RELEASE); exit(0); break; case 'd': debug_level = atoi(optarg); if (debug_level < 1 || debug_level > 10){ DiffPrint(DEBUG_ALWAYS, "Error: Debug level outside range or missing !\n"); usage(argv[0]); } global_debug_level = debug_level; break; case 'f': if (!strncasecmp(optarg, "-", 1)){ DiffPrint(DEBUG_ALWAYS, "Error: Parameter is missing !\n"); usage(argv[0]); } config_file_ = strdup(optarg); break; case '?': DiffPrint(DEBUG_ALWAYS, "Error: %c isn't a valid option or its parameter is missing !\n", optopt); usage(argv[0]); break; case ':': DiffPrint(DEBUG_ALWAYS, "Parameter missing !\n"); usage(argv[0]); break; } if (opt == -1) break; } if (!config_file_) config_file_ = strdup(DEFAULT_CONFIG_FILE); // Get diffusion ID if (!node_id_configured){ // Try to get id from environment variable if (node_id_env != NULL){ my_id_ = atoi(node_id_env); node_id_configured = true; } }#ifdef USE_EMSIM if (!node_id_configured){ // Try to read groups and node id from emsim environment variables if (sim_id && sim_group){ my_id_ = atoi(sim_id); group_id = atoi(sim_group); diffusion_port_ = diffusion_port_ + my_id_ + (100 * group_id); node_id_configured = true; } }#endif // USE_EMSIM // Use random node id if user has not specified it if (!node_id_configured){ DiffPrint(DEBUG_ALWAYS, "Diffusion : node_addr not set. Using random id.\n"); // Generate random ID do{ GetTime(&tv); SetSeed(&tv); my_id_ = GetRand(); } while(my_id_ == LOCALHOST_ADDR || my_id_ == BROADCAST_ADDR); }#else my_id_ = nodeid;#endif // !NS_DIFFUSION // Initialize variables lon_ = 0.0; lat_ = 0.0;#ifdef STATS if (use_io_stats) stats_ = new DiffusionStats(my_id_, stats_warm_up_time); else stats_ = NULL;#endif // STATS GetTime(&tv); SetSeed(&tv); pkt_count_ = GetRand(); random_id_ = GetRand(); Tcl_InitHashTable(&htable_, 2); // Initialize EventQueue timers_manager_ = new TimerManager; // Create regular timers callback = new NeighborsTimeoutTimer(this); timers_manager_->addTimer(NEIGHBORS_DELAY, callback); callback = new FilterTimeoutTimer(this); timers_manager_->addTimer(FILTER_DELAY, callback); if (stop_time > 0){ callback = new DiffusionStopTimer(this); timers_manager_->addTimer((stop_time * 1000), callback); } GetTime(&tv); // Print Initialization message DiffPrint(DEBUG_ALWAYS, "Diffusion : starting at time %ld:%ld\n", tv.tv_sec, tv.tv_usec); DiffPrint(DEBUG_ALWAYS, "Diffusion : Node id = %d\n", my_id_); // Initialize diffusion io devices#ifdef IO_LOG if (use_io_log){ pseudo_io_device = new IOLog(my_id_); in_devices_.push_back(pseudo_io_device); out_devices_.push_back(pseudo_io_device); in_devices = &(pseudo_io_device->in_devices_); out_devices = &(pseudo_io_device->out_devices_); local_out_devices = &(local_out_devices_); } else{ in_devices = &(in_devices_); out_devices = &(out_devices_); local_out_devices = &(local_out_devices_); }#else in_devices = &(in_devices_); out_devices = &(out_devices_); local_out_devices = &(local_out_devices_);#endif // IO_LOG#ifdef NS_DIFFUSION device = new LocalApp(diffrtg); local_out_devices->push_back(device); device = new LinkLayerAbs(diffrtg); out_devices->push_back(device);#endif // NS_DIFFUSION#ifdef UDP device = new UDPLocal(&diffusion_port_); in_devices->push_back(device); local_out_devices->push_back(device);#ifdef WIRED device = new UDPWired(config_file_); out_devices->push_back(device);#endif // WIRED#endif // UDP#ifdef USE_RPC device = new RPCIO(); in_devices->push_back(device); out_devices->push_back(device);#endif // USE_RPC#ifdef USE_MOTE_NIC device = new MOTEIO(); in_devices->push_back(device); out_devices->push_back(device);#endif // USE_MOTE_NIC#ifdef USE_SMAC device = new SMAC(); in_devices->push_back(device); out_devices->push_back(device);#endif // USE_SMAC#ifdef USE_EMSTAR#ifdef USE_EMSIM device = new Emstar(my_id_, group_id, true);#else device = new Emstar();#endif // USE_EMSIM in_devices->push_back(device); out_devices->push_back(device);#endif // USE_EMSTAR#ifdef USE_WINSNG2 device = new WINSNG2(); in_devices->push_back(device); out_devices->push_back(device);#endif // USE_WINSNG2}HashEntry * DiffusionCoreAgent::getHash(unsigned int pkt_num, unsigned int rdm_id){ unsigned int key[2]; key[0] = pkt_num; key[1] = rdm_id; Tcl_HashEntry *entryPtr = Tcl_FindHashEntry(&htable_, (char *)key); if (entryPtr == NULL) return NULL; return (HashEntry *)Tcl_GetHashValue(entryPtr);}void DiffusionCoreAgent::putHash(unsigned int pkt_num, unsigned int rdm_id){ Tcl_HashEntry *tcl_hash_entry; HashEntry *hash_entry; HashList::iterator hash_itr; unsigned int key[2]; int new_hash_key; if (hash_list_.size() == HASH_TABLE_MAX_SIZE){ // Hash table reached maximum size for (int i = 0; ((i < HASH_TABLE_REMOVE_AT_ONCE) && (hash_list_.size() > 0)); i++){ hash_itr = hash_list_.begin(); tcl_hash_entry = *hash_itr; hash_entry = (HashEntry *) Tcl_GetHashValue(tcl_hash_entry); delete hash_entry; hash_list_.erase(hash_itr); Tcl_DeleteHashEntry(tcl_hash_entry); } } key[0] = pkt_num; key[1] = rdm_id; tcl_hash_entry = Tcl_CreateHashEntry(&htable_, (char *)key, &new_hash_key); if (new_hash_key == 0){ DiffPrint(DEBUG_IMPORTANT, "Key already exists in hash !\n"); return; } hash_entry = new HashEntry; Tcl_SetHashValue(tcl_hash_entry, hash_entry); hash_list_.push_back(tcl_hash_entry);}#ifndef NS_DIFFUSIONvoid DiffusionCoreAgent::recvPacket(DiffPacket pkt){ struct hdr_diff *dfh = HDR_DIFF(pkt); Message *rcv_message = NULL; int8_t version, msg_type; u_int16_t data_len, num_attr, source_port; int32_t rdm_id, pkt_num, next_hop, last_hop; // Read header version = DIFF_VER(dfh); msg_type = MSG_TYPE(dfh); source_port = ntohs(SRC_PORT(dfh)); pkt_num = ntohl(PKT_NUM(dfh)); rdm_id = ntohl(RDM_ID(dfh)); num_attr = ntohs(NUM_ATTR(dfh)); next_hop = ntohl(NEXT_HOP(dfh)); last_hop = ntohl(LAST_HOP(dfh)); data_len = ntohs(DATA_LEN(dfh)); // Packet is good, create a message rcv_message = new Message(version, msg_type, source_port, data_len, num_attr, pkt_num, rdm_id, next_hop, last_hop); // Read all attributes into the Message structure rcv_message->msg_attr_vec_ = UnpackAttrs(pkt, num_attr); // Process the incoming message recvMessage(rcv_message); // Don't forget to message when we're done delete rcv_message; delete [] pkt;}#endif // !NS_DIFFUSIONvoid DiffusionCoreAgent::recvMessage(Message *msg){ BlackList::iterator black_list_itr; Tcl_HashEntry *tcl_hash_entry; unsigned int key[2]; // Check version if (msg->version_ != DIFFUSION_VERSION) return; // Check for ID conflict if (msg->last_hop_ == my_id_){ DiffPrint(DEBUG_ALWAYS, "Error: A diffusion ID conflict has been detected !\n"); exit(-1); } // Address filtering if ((msg->next_hop_ != BROADCAST_ADDR) && (msg->next_hop_ != LOCALHOST_ADDR) && (msg->next_hop_ != my_id_)) return; // Blacklist filtering black_list_itr = black_list_.begin(); while (black_list_itr != black_list_.end()){ if (*black_list_itr == msg->last_hop_){ DiffPrint(DEBUG_DETAILS, "Ignoring message from blacklisted node %d !\n", msg->last_hop_); return; } black_list_itr++; } // Control Messages are unique and don't go to the hash if (msg->msg_type_ != CONTROL){ // Hash table keeps info about packets key[0] = msg->pkt_num_; key[1] = msg->rdm_id_; tcl_hash_entry = Tcl_FindHashEntry(&htable_, (char *) key); if (tcl_hash_entry != NULL){ DiffPrint(DEBUG_DETAILS, "Node%d: Received old message !\n", my_id_); msg->new_message_ = 0; } else{ // Add message to the hash table putHash(key[0], key[1]); msg->new_message_ = 1; } }#ifdef STATS if (stats_) stats_->logIncomingMessage(msg);#endif // STATS // Check if it's a control of a regular message if (msg->msg_type_ == CONTROL) processControlMessage(msg); else processMessage(msg);}#ifndef USE_SINGLE_ADDRESS_SPACEint main(int argc, char **argv){ agent = new DiffusionCoreAgent(argc, argv); signal(SIGINT, signal_handler); agent->run(); return 0;}#endif // !USE_SINGLE_ADDRESS_SPACE
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -