📄 one_phase_pull.hh
字号:
//// one_phase_pull.hh : One-Phase Pull Include File// author : Fabio Silva//// Copyright (C) 2000-2003 by the University of Southern California// $Id: one_phase_pull.hh,v 1.3 2004/01/08 23:05:53 haldar Exp $//// This program is free software; you can redistribute it and/or// modify it under the terms of the GNU General Public License,// version 2, as published by the Free Software Foundation.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU General Public License for more details.//// You should have received a copy of the GNU General Public License along// with this program; if not, write to the Free Software Foundation, Inc.,// 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.////#ifndef _ONE_PHASE_PULL_HH_#define _ONE_PHASE_PULL_HH_#ifdef HAVE_CONFIG_H#include "config.h"#endif // HAVE_CONFIG_H#include <algorithm>#include "diffapp.hh"#ifdef NS_DIFFUSION#include <tcl.h>#include "diffagent.h"#else#include "main/hashutils.hh"#endif // NS_DIFFUSION#define ONE_PHASE_PULL_FILTER_PRIORITY 80class OPPGradientEntry {public: OPPGradientEntry(int32_t node_id) : node_id_(node_id) { GetTime(&tv_); }; int32_t node_id_; struct timeval tv_;};typedef list<OPPGradientEntry *> GradientList;class SinkEntry {public: SinkEntry(u_int16_t port) : port_(port) { GetTime(&tv_); }; u_int16_t port_; struct timeval tv_;};typedef list<SinkEntry *> SinkList;class OPPDataNeighborEntry {public: OPPDataNeighborEntry(int32_t node_id) : node_id_(node_id) { messages_ = 1; }; int32_t node_id_; int messages_; bool new_messages_;};typedef list<OPPDataNeighborEntry *> DataNeighborList;class SubscriptionEntry {public: SubscriptionEntry(NRAttrVec *attrs) : attrs_(attrs) { GetTime(&tv_); }; ~SubscriptionEntry() { ClearAttrs(attrs_); delete attrs_; }; struct timeval tv_; NRAttrVec *attrs_;};typedef list<SubscriptionEntry *> SubscriptionList;typedef list<int> FlowIdList;class RoundIdEntry {public: RoundIdEntry(int32_t round_id) : round_id_(round_id) { GetTime(&tv_); }; ~RoundIdEntry() { GradientList::iterator gradient_itr; SinkList::iterator sink_itr; // Clear the gradient list for (gradient_itr = gradients_.begin(); gradient_itr != gradients_.end(); gradient_itr++){ delete (*gradient_itr); } gradients_.clear(); // Clear the local sink list for (sink_itr = sinks_.begin(); sink_itr != sinks_.end(); sink_itr++){ delete (*sink_itr); } sinks_.clear(); }; OPPGradientEntry * findGradient(int32_t node_id); void deleteGradient(int32_t node_id); void addGradient(int32_t node_id); void updateSink(u_int16_t sink_id); void deleteExpiredSinks(); void deleteExpiredGradients(); int32_t round_id_; struct timeval tv_; GradientList gradients_; SinkList sinks_;};typedef list<RoundIdEntry *> RoundIdList;class RoutingEntry {public: RoutingEntry() { GetTime(&tv_); }; ~RoutingEntry() { DataNeighborList::iterator data_neighbor_itr; RoundIdList::iterator round_id_itr; SubscriptionList::iterator subscription_itr; // Clear Attributes ClearAttrs(attrs_); delete attrs_; // Clear the attribute list for (subscription_itr = subscription_list_.begin(); subscription_itr != subscription_list_.end(); subscription_itr++){ delete (*subscription_itr); } subscription_list_.clear(); // Clear the round_ids list for (round_id_itr = round_ids_.begin(); round_id_itr != round_ids_.end(); round_id_itr++){ delete (*round_id_itr); } round_ids_.clear(); // Clear the data neighbor's list for (data_neighbor_itr = data_neighbors_.begin(); data_neighbor_itr != data_neighbors_.end(); data_neighbor_itr++){ delete (*data_neighbor_itr); } data_neighbors_.clear(); }; RoundIdEntry * findRoundIdEntry(int32_t round_id); RoundIdEntry * addRoundIdEntry(int32_t round_id); void updateNeighborDataInfo(int32_t node_id, bool new_message); void addGradient(int32_t last_hop, int32_t round_id, bool new_gradient); void updateSink(u_int16_t sink_id, int32_t round_id); void deleteExpiredRoundIds(); void getSinksFromList(FlowIdList *msg_list, FlowIdList *sink_list); void getFlowsFromList(FlowIdList *msg_list, FlowIdList *flow_list); int32_t getNeighborFromFlow(int32_t flow_id); struct timeval tv_; NRAttrVec *attrs_; RoundIdList round_ids_; SubscriptionList subscription_list_; DataNeighborList data_neighbors_;};typedef list<RoutingEntry *> RoutingTable;class OnePhasePullFilter;class OnePhasePullFilterReceive : public FilterCallback {public: OnePhasePullFilterReceive(OnePhasePullFilter *filter) : filter_(filter) {}; void recv(Message *msg, handle h); OnePhasePullFilter *filter_;};class DataForwardingHistory {public: DataForwardingHistory() { data_reinforced_ = false; }; ~DataForwardingHistory() { node_list_.clear(); sink_list_.clear(); }; bool alreadyForwardedToNetwork(int32_t node_id) { list<int32_t>::iterator list_itr; list_itr = find(node_list_.begin(), node_list_.end(), node_id); if (list_itr == node_list_.end()) return false; return true; }; bool alreadyForwardedToLibrary(u_int16_t sink_id) { list<u_int16_t>::iterator list_itr; list_itr = find(sink_list_.begin(), sink_list_.end(), sink_id); if (list_itr == sink_list_.end()) return false; return true; }; bool alreadyReinforced() { return data_reinforced_; }; void sendingReinforcement() { data_reinforced_ = true; }; void forwardingToNetwork(int32_t node_id) { node_list_.push_back(node_id); }; void forwardingToLibrary(u_int16_t sink_id) { sink_list_.push_back(sink_id); };private: list<int32_t> node_list_; list<u_int16_t> sink_list_; bool data_reinforced_;};class OnePhasePullFilter : public DiffApp {public:#ifdef NS_DIFFUSION OnePhasePullFilter(const char *dr); int command(int argc, const char*const* argv); void run() {}#else OnePhasePullFilter(int argc, char **argv); void run();#endif // NS_DIFFUSION virtual ~OnePhasePullFilter() { // Nothing to do }; void recv(Message *msg, handle h); // Timers void messageTimeout(Message *msg); void interestTimeout(Message *msg); void gradientTimeout(); void reinforcementTimeout(); int subscriptionTimeout(NRAttrVec *attrs);protected: // General Variables handle filter_handle_; int pkt_count_; int random_id_; // Receive Callback for the filter OnePhasePullFilterReceive *filter_callback_; // List of all known datatypes RoutingTable routing_list_; // Setup the filter handle setupFilter(); // Matching functions RoutingEntry * findRoutingEntry(NRAttrVec *attrs); void deleteRoutingEntry(RoutingEntry *routing_entry); RoutingEntry * matchRoutingEntry(NRAttrVec *attrs, RoutingTable::iterator start, RoutingTable::iterator *place); SubscriptionEntry * findMatchingSubscription(RoutingEntry *routing_entry, NRAttrVec *attrs); // Message forwarding functions void sendInterest(NRAttrVec *attrs, RoutingEntry *routing_entry); void sendDisinterest(NRAttrVec *attrs, RoutingEntry *routing_entry); void forwardData(Message *msg, RoutingEntry *routing_entry, DataForwardingHistory *forwarding_history); // Message Processing functions void processOldMessage(Message *msg); void processNewMessage(Message *msg); // Flow Ids Processing functions void addLocalFlowsToMessage(Message *msg); void readFlowsFromList(int number_of_flows, FlowIdList *flow_list, void *source_blob); int * writeFlowsToList(FlowIdList *flow_list); bool removeFlowFromList(FlowIdList *flow_list, int32_t flow);};class OppGradientExpirationCheckTimer : public TimerCallback {public: OppGradientExpirationCheckTimer(OnePhasePullFilter *agent) : agent_(agent) {}; ~OppGradientExpirationCheckTimer() {}; int expire(); OnePhasePullFilter *agent_;};class OppReinforcementCheckTimer : public TimerCallback {public: OppReinforcementCheckTimer(OnePhasePullFilter *agent) : agent_(agent) {}; ~OppReinforcementCheckTimer() {}; int expire(); OnePhasePullFilter *agent_;};class OppMessageSendTimer : public TimerCallback {public: OppMessageSendTimer(OnePhasePullFilter *agent, Message *msg) : agent_(agent), msg_(msg) {}; ~OppMessageSendTimer() { delete msg_; }; int expire(); OnePhasePullFilter *agent_; Message *msg_;};class OppInterestForwardTimer : public TimerCallback {public: OppInterestForwardTimer(OnePhasePullFilter *agent, Message *msg) : agent_(agent), msg_(msg) {}; ~OppInterestForwardTimer() { delete msg_; }; int expire(); OnePhasePullFilter *agent_; Message *msg_;};class OppSubscriptionExpirationTimer : public TimerCallback {public: OppSubscriptionExpirationTimer(OnePhasePullFilter *agent, NRAttrVec *attrs) : agent_(agent), attrs_(attrs) {}; ~OppSubscriptionExpirationTimer() { ClearAttrs(attrs_); delete attrs_; }; int expire(); OnePhasePullFilter *agent_; NRAttrVec *attrs_;};#endif // !_ONE_PHASE_PULL_HH_
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -