📄 broker.java
字号:
/* * Created on May 4, 2005 * * To change the template for this generated file go to * Window>Preferences>Java>Code Generation>Code and Comments */package broker;import java.util.*;import com.Begin;import com.Broadcast;import com.End;import com.Lock;import com.Message;import com.Notification;import com.Request;import com.Separator;import com.Subscription;import com.Unlock;import com.Unsubscription;import graph.Vertex;import sim.*;/** * @author parzy * * To change the template for this generated type comment go to * Window>Preferences>Java>Code Generation>Code and Comments */public class Broker extends Vertex implements Destination{ /** * Color of a normal broker. */ protected static final int BLACK = 0; /** * Color of a broker when the new routing entires are established during * a reconfiguration pahse. */ protected static final int GRAY = 1; /** * Direction left. */ protected static final int LEFT = 0; /** * Direction right. */ protected static final int RIGHT = 1; /** * Constant associated with no message order. */ protected static final int UNORDERED = 0; /** * Constant associated with a FIFO message order. */ protected static final int FIFO = 1; /** * Constant associated with a causal message order. */ protected static final int CAUSAL = 2; public static final int COSTS_AND_INTERESTS = 0; public static final int COSTS = 1; public static final int INTERESTS = 2; /** * The update interval of an local environment. * The interval after an environment entry gets stale and is removed. */ public static double updateInterval = 400; /** * The interval between two consecutive broadcasts of a broker. */ public static double broadcastInterval = 250; /** * The requested message order. */ protected static int order = UNORDERED; /** * The capacity of a broker's cache. */ protected static int cacheSize = 8192; /** * The capacity of a broadcast's Bloom filter. */ protected static int filterSize = 100000; /** * The number of hash functions a Bloom filter uses. */ protected static int numberOfHashs = 5; protected static int environmentSize = 5; public static int heuristic = COSTS_AND_INTERESTS; protected static long brokerSeed = 5902120016761263358L; protected static Random rand = new Random(brokerSeed); /** * Initializes all fields of the broker class. * The values are stored as Strings in the properties object. * The keys are the names of the fields. * @param properties properties containing name/value pairs. */ public static void initialize(Properties properties){ // set the update interval updateInterval = properties.getProperty("updateInterval") != null ? Double.parseDouble(properties.getProperty("updateInterval")) : updateInterval; // set the broadcast interval broadcastInterval = properties.getProperty("broadcastInterval") != null ? Double.parseDouble(properties.getProperty("broadcastInterval")) : broadcastInterval; // parse the requested message order if(properties.getProperty("order") != null){ if(properties.getProperty("order").equalsIgnoreCase("CAUSAL")){ order = CAUSAL; }else if(properties.getProperty("order").equalsIgnoreCase("FIFO")) { order = FIFO; } } // set the broker's cache size cacheSize = properties.getProperty("cacheSize") != null ? Integer.parseInt(properties.getProperty("cacheSize")) : cacheSize; // set the size of a Bloom filter filterSize = properties.getProperty("filterSize") != null ? Integer.parseInt(properties.getProperty("filterSize")) : filterSize; // set the number of used hash functions numberOfHashs = properties.getProperty("numberOfHashs") != null ? Integer.parseInt(properties.getProperty("numberOfHashs")) : numberOfHashs; // set the size of the local environment environmentSize = properties.getProperty("environmentSize") != null ? Integer.parseInt(properties.getProperty("environmentSize")) : environmentSize; // parse the requested heuristic if(properties.getProperty("heuristic") != null){ if(properties.getProperty("heuristic").equalsIgnoreCase("COSTS")){ heuristic = COSTS; }else if(properties.getProperty("heuristic").equalsIgnoreCase("INTERESTS")) { heuristic = INTERESTS; } } // initialize other classes BloomFilter.initialize(properties); } // TODO: instanziierung implementieren private Simulation sim; private Network net; // parts of a broker private RoutingTable routingTable= new RoutingTable(this); private Cache cache = new Cache(cacheSize); private LocalEnvironment environment; // the broker's queues private MessageQueue qIn = new MessageQueue(); private MessageQueue qSub = new MessageQueue(); private MessageQueue qNot = new MessageQueue(); private LinkedList neighbors = new LinkedList(); private Event broadcastEvent = null; // reconfiguration stuff private boolean locked = false; private boolean relaying = false; private int side = LEFT; private int color = BLACK; private Broker bLeft = null; private Broker bRight = null; private Broker bNew = null; private Broker bOld = null; private Broker bRelay = null; private LinkedList bGray = new LinkedList(); private LinkedList bQueued = new LinkedList(); public Broker(Simulation sim, Network net) { this.sim = sim; this.net = net; this.environment = new LocalEnvironment(sim, net, this); } /** * Returns true. */ public boolean isBroker(){ return true; } /** * Returns false. */ public boolean isClient(){ return false; } /** * Returns the neighbouring brokers. * @return the neighbouring brokers. */ public Collection getNeighbors(){ return neighbors; } public void setNeighbors(Collection neighbors) { this.neighbors.clear(); this.neighbors.addAll(neighbors); } /** * Receives a physical message. * @param sender the sender of the message. * @param message the message itself. */ public void receive(Destination sender, Message message){ // Add a new message to the broker's internal queue qIn.enqueue(sender,this,message); // TODO remove if(sender == null){ throw new NullPointerException(); } // When it is the first message in the queue, schedule its processing. if(qIn.size()==1) { sim.scheduleEventIn(net.getProcessingDelay(this), new Event(){ public void handle(){ Broker.this.processMessage(); } }); } } /** * Processes the next queued message. */ private void processMessage() { Message m; //System.out.println(queueIn.size()); // check if a message is available if(qIn.isEmpty()) { return; } // process the first message m = qIn.dequeue(); // TODO: insert statistics here sim.collectStatisticsProcessing(this,m); receiveMessage(qIn.getSender(),m); // schedule the processing of the next message, if one ios available if(qIn.isEmpty()) { return; } sim.scheduleEventIn(net.getProcessingDelay(this), new Event(){ public void handle(){ Broker.this.processMessage(); } }); } /** * Receives a message, when it is dequeued. * @param sender the sender of the message. * @param m the message itself. */ private void receiveMessage(Destination sender, Message message) { sender.isBroker(); // queueing of gray notifications and (un)subscriptions if(sender == bNew && !bQueued.isEmpty() && message instanceof Notification){ qNot.enqueue(sender, this, message); sim.collectStatisticsEnqueue(this,message); return; } if (sender == bNew && bRelay != null && (message instanceof Subscription || message instanceof Unsubscription) ){ qSub.enqueue(sender, this, message); return; } // relaying of black (un)subscriptions if (sender == bOld && relaying && (message instanceof Subscription || message instanceof Unsubscription) ){ message.setRelayed(); message.colorGray(); sendMessage(bRelay, message); return; } if(message.isRelayed()){ // TODO remove if(bNew == null){ System.out.println("I am "+this+" Message is "+message); sender.isBroker(); } message.unsetRelayed(); sender = bNew; } // coloring messages if(color == GRAY && sender != bLeft && sender != bRight){ message.colorGray(); } handleMessage(sender, message); } private void handleMessage(Destination sender, Message message) { DestinationSet destinations; DestinationSet.Entry destination; boolean containsClient = false; boolean containsNeighbor = false; // Is the sender a client or a neighbor? if(sender.isClient()) { containsClient = true; } else { containsNeighbor = true; } // message dispatching if(message instanceof Unsubscription){ destinations = handleUnsubscription(sender,(Unsubscription)message); } else if (message instanceof Subscription){ destinations = handleSubscription(sender,(Subscription)message); } else if (message instanceof Notification){ destinations = handleNotification(sender, (Notification)message); } else if (message instanceof Broadcast){ destinations = handleBroadcast(sender, (Broadcast)message); } else if (message instanceof Request) { destinations = handleRequest(sender, (Request)message); } else if (message instanceof Lock) { destinations = handleLock(sender, (Lock)message); } else if (message instanceof Begin) { destinations = handleBegin(sender, (Begin)message); } else if (message instanceof Separator) { destinations = handleSeparator(sender, (Separator)message); } else if (message instanceof End) { destinations = handleEnd(sender, (End)message); } else if (message instanceof Unlock) { destinations = handleUnlock(sender, (Unlock)message); } else{ destinations = new DestinationSet(); } // message forwarding for(Iterator it = destinations.iterator(); it.hasNext(); ){ destination = (DestinationSet.Entry)it.next(); sendMessage(destination.destination, destination.message); // Does the set contains any clients or neighbors? if(destination.destination.isClient()){ containsClient = true; }else{ containsNeighbor = true; } } // caching of a notification, iff a client and a neighbor is contained if(message instanceof Notification && containsClient && containsNeighbor){// if(cache.isFull()){// broadcast();// } cache.add((Notification)message); sim.collectStatisticsCorrectProcessing(); } if(message instanceof Notification && !containsClient) { sim.collectStatisticsPureForwarding(); } } private void sendMessage(Destination receiver, Message message){ // forwarding colored messages over the new and old edge if ( !(message instanceof Lock || message instanceof Unlock || message instanceof Begin || message instanceof End) ) { if( (receiver == bNew && message.isBlack()) || (receiver == bOld && message.isGray()) ){ return; } } // uncolor messages when leaving the circuit if (receiver != this && receiver != bLeft && receiver != bRight && receiver != bNew && receiver != bOld && receiver != bRelay) { message.colorBlack(); } // sending messages to myself if(receiver == this){ receiveMessage(this, message); }else{ net.send(this,receiver,message); } } // handle subscriptions, unsubscriptions and notifications private DestinationSet handleSubscription(Destination from, Subscription s){ return routingTable.subscribe(from, s); } private DestinationSet handleUnsubscription(Destination from, Unsubscription u){ return routingTable.unsubscribe(from, u); } private DestinationSet handleNotification(Destination from, Notification n){ return routingTable.forward(from, n); } /** * Starts all broker's functions, inclusive the broadcasting. */ public void start(){ startBroadcasting(); } /** * Starts the broker's broadcasting to announce his presence and his * interests. */ public void startBroadcasting(){ if(broadcastEvent == null){ broadcastEvent = new Event(){ public void handle(){ Broker.this.broadcast(); } }; sim.scheduleEventIn(broadcastInterval*rand.nextDouble(), broadcastEvent); } throw new UnsupportedOperationException(); } /** * Starts the broker's broadcasting to announce his presence and his * interests. */ public void startHeuristic(){ if(broadcastEvent == null){ broadcastEvent = new Event(){ public void handle(){ Broker.this.broadcast(); } }; sim.scheduleEventIn(broadcastInterval*rand.nextDouble(), broadcastEvent); } } public void stopHeuristic() { if (broadcastEvent != null) { sim.removeEvent(broadcastEvent); broadcastEvent = null; } } // handle a broadcast private void broadcast(){ // create a new Broadcast Message // if(net.getNumber(this)==68) {// System.out.println("Cache belegt ist: "+cache.getFillState());// } handleMessage(this, new Broadcast(cache.getBloomFilter(filterSize, numberOfHashs))); // schedule a new Broadcast broadcastEvent = new Event(){ public void handle(){ Broker.this.broadcast(); } }; sim.scheduleEventIn(broadcastInterval, broadcastEvent); } private DestinationSet handleBroadcast(Destination from, Broadcast broadcast){ DestinationSet destinations; Broker[] path; Broker source; //double time; BloomFilter bloomFilter; double interest; //Request request; // TODO remove //System.out.println("Broadcast received at"+this); // prepare the path path = extendPath(broadcast.getPath()); source = path[0]; bloomFilter = broadcast.getBloomFilter(); if(source != this){ // determine interests interest = cache.countHits(bloomFilter); // if(path[path.length-2]==source){// //System.out.println("Bloom Filter Hits: "+interest+" correct: "+" Cache Compare: "+cache.countHits(source.cache)+" size: "+bloomFilter.size());// interest = interest == 0 ? 1:interest;// } // update the environment environment.setInterest(source, interest); // env.refresh(path, time); // not needed anymore?// env.update(time); // // evaluate the path try{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -