⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 broker.java

📁 发布/订阅系统路由重配算法,可应用于ad hoc环境
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * Created on May 4, 2005 * * To change the template for this generated file go to * Window>Preferences>Java>Code Generation>Code and Comments */package broker;import java.util.*;import com.Begin;import com.Broadcast;import com.End;import com.Lock;import com.Message;import com.Notification;import com.Request;import com.Separator;import com.Subscription;import com.Unlock;import com.Unsubscription;import graph.Vertex;import sim.*;/** * @author parzy * * To change the template for this generated type comment go to * Window>Preferences>Java>Code Generation>Code and Comments */public class Broker extends Vertex implements Destination{		/**	 * Color of a normal broker.	 */	protected static final int BLACK = 0;		/**	 * Color of a broker when the new routing entires are established during	 * a reconfiguration pahse.	 */	protected static final int GRAY = 1;		/**	 * Direction left.	 */	protected static final int LEFT = 0;		/**	 * Direction right.	 */	protected static final int RIGHT = 1;		/**	 * Constant associated with no message order.	 */	protected static final int UNORDERED = 0;		/**	 * Constant associated with a FIFO message order.	 */	protected static final int FIFO = 1;		/**	 * Constant associated with a causal message order.	 */	protected static final int CAUSAL = 2;		public static final int COSTS_AND_INTERESTS = 0;		public static final int COSTS = 1;		public static final int INTERESTS = 2;	/**	 * The update interval of an local environment.	 * The interval after an environment entry gets stale and is removed.	 */	public static double updateInterval = 400;		/**	 * The interval between two consecutive broadcasts of a broker.	 */	public static double broadcastInterval = 250;		/**	 * The requested message order.	 */	protected static int order = UNORDERED;		/**	 * The capacity of a broker's cache.	 */	protected static int cacheSize = 8192;		/**	 * The capacity of a broadcast's Bloom filter.	 */	protected static int filterSize = 100000;		/**	 * The number of hash functions a Bloom filter uses. 	 */	protected static int numberOfHashs = 5;		protected static int environmentSize = 5;	public static int heuristic = COSTS_AND_INTERESTS;		protected static long brokerSeed = 5902120016761263358L;		protected static Random rand = new Random(brokerSeed);		/**	 * Initializes all fields of the broker class.	 * The values are stored as Strings in the properties object.	 * The keys are the names of the fields. 	 * @param properties properties containing name/value pairs.	 */	public static void initialize(Properties properties){		// set the update interval		updateInterval = properties.getProperty("updateInterval") != null ?				         Double.parseDouble(properties.getProperty("updateInterval")) :				         updateInterval;	    // set the broadcast interval        broadcastInterval = properties.getProperty("broadcastInterval") != null ?        		            Double.parseDouble(properties.getProperty("broadcastInterval")) :        					broadcastInterval;        // parse the requested message order        if(properties.getProperty("order") != null){        	if(properties.getProperty("order").equalsIgnoreCase("CAUSAL")){        		order = CAUSAL;        	}else if(properties.getProperty("order").equalsIgnoreCase("FIFO")) {        		order = FIFO;        	}        }        // set the broker's cache size		cacheSize = properties.getProperty("cacheSize") != null ?			        Integer.parseInt(properties.getProperty("cacheSize")) :			        cacheSize;		// set the size of a Bloom filter	    filterSize = properties.getProperty("filterSize") != null ?	    		     Integer.parseInt(properties.getProperty("filterSize")) :	    		     filterSize;	    // set the number of used hash functions	    numberOfHashs = properties.getProperty("numberOfHashs") != null ?	    		        Integer.parseInt(properties.getProperty("numberOfHashs")) :	    		        numberOfHashs;        // set the size of the local environment	    environmentSize = properties.getProperty("environmentSize") != null ?	    			      Integer.parseInt(properties.getProperty("environmentSize")) :	    			      environmentSize;	    // parse the requested heuristic	    if(properties.getProperty("heuristic") != null){	    	if(properties.getProperty("heuristic").equalsIgnoreCase("COSTS")){	    		heuristic = COSTS;	    	}else if(properties.getProperty("heuristic").equalsIgnoreCase("INTERESTS")) {	    		heuristic = INTERESTS;	    	}	    }	    		        	    		        	    // initialize other classes	    BloomFilter.initialize(properties);	}		// TODO: instanziierung implementieren	private Simulation sim;	private Network net;		// parts of a broker	private RoutingTable routingTable= new RoutingTable(this);	private Cache cache = new Cache(cacheSize);	private LocalEnvironment environment;	// the broker's queues	private MessageQueue qIn = new MessageQueue();	private MessageQueue qSub = new MessageQueue();	private MessageQueue qNot = new MessageQueue();		private LinkedList neighbors = new LinkedList();		private Event broadcastEvent = null;			// reconfiguration stuff	private boolean locked = false;	private boolean relaying = false;	private int side = LEFT;	private int color = BLACK;		private Broker bLeft = null;	private Broker bRight = null;	private Broker bNew = null;	private Broker bOld = null;	private Broker bRelay = null;		private LinkedList bGray = new LinkedList();	private LinkedList bQueued = new LinkedList();				public Broker(Simulation sim, Network net) {		this.sim = sim;		this.net = net;		this.environment = new LocalEnvironment(sim, net, this);	}	    /**     * Returns true.     */	public boolean isBroker(){		return true;	}		/**	 * Returns false.	 */	public boolean isClient(){		return false;	}			/**	 * Returns the neighbouring brokers.	 * @return the neighbouring brokers.	 */	public Collection getNeighbors(){		return neighbors;	}	public void setNeighbors(Collection neighbors) {		this.neighbors.clear();		this.neighbors.addAll(neighbors);	}					/**	 * Receives a physical message.	 * @param sender the sender of the message.	 * @param message the message itself.	 */	public void receive(Destination sender, Message message){		// Add a new message to the broker's internal queue		qIn.enqueue(sender,this,message);		// TODO remove		if(sender == null){			throw new NullPointerException();		}				// When it is the first message in the queue, schedule its processing.		if(qIn.size()==1) {			sim.scheduleEventIn(net.getProcessingDelay(this), new Event(){				public void handle(){					Broker.this.processMessage();				}			});		}	}		/**	 * Processes the next queued message.	 */	private void processMessage() {				Message m;		//System.out.println(queueIn.size());		// check if a message is available		if(qIn.isEmpty()) {			return;		}				// process the first message		m = qIn.dequeue();				// TODO: insert statistics here 		sim.collectStatisticsProcessing(this,m);		receiveMessage(qIn.getSender(),m);		// schedule the processing of the next message, if one ios available		if(qIn.isEmpty()) {			return;		}				sim.scheduleEventIn(net.getProcessingDelay(this), new Event(){			public void handle(){				Broker.this.processMessage();			}		});	}		/**	 * Receives a message, when it is dequeued.	 * @param sender the sender of the message.	 * @param m the message itself.	 */	private void receiveMessage(Destination sender, Message message) {		sender.isBroker();		// queueing of gray notifications and (un)subscriptions		if(sender == bNew && !bQueued.isEmpty() && message instanceof Notification){			qNot.enqueue(sender, this, message);			sim.collectStatisticsEnqueue(this,message);			return;		}		if (sender == bNew && bRelay != null && 		   (message instanceof Subscription || message instanceof Unsubscription) ){						qSub.enqueue(sender, this, message);			return;		}				// relaying of black (un)subscriptions		if (sender == bOld && relaying && 		   (message instanceof Subscription || message instanceof Unsubscription) ){						message.setRelayed();			message.colorGray();			sendMessage(bRelay, message);			return;		}				if(message.isRelayed()){						// TODO remove			if(bNew == null){				System.out.println("I am "+this+" Message is "+message);				sender.isBroker();			}						message.unsetRelayed();			sender = bNew;		}				// coloring messages		if(color == GRAY && sender != bLeft && sender != bRight){			message.colorGray();		}						handleMessage(sender, message);	}		private void handleMessage(Destination sender, Message message) {		DestinationSet destinations;		DestinationSet.Entry destination;		boolean containsClient = false;		boolean containsNeighbor = false;				// Is the sender a client or a neighbor?		if(sender.isClient()) {			containsClient = true;		} else {			containsNeighbor = true;		}				// message dispatching		if(message instanceof Unsubscription){			destinations = handleUnsubscription(sender,(Unsubscription)message);		}		else if (message instanceof Subscription){			destinations = handleSubscription(sender,(Subscription)message);		}		else if (message instanceof Notification){			destinations = handleNotification(sender, (Notification)message);		}		else if (message instanceof Broadcast){			destinations = handleBroadcast(sender, (Broadcast)message);		}		else if (message instanceof Request) {			destinations = handleRequest(sender, (Request)message);		}		else if (message instanceof Lock) {			destinations = handleLock(sender, (Lock)message);		}		else if (message instanceof Begin) {			destinations = handleBegin(sender, (Begin)message);		}		else if (message instanceof Separator) {			destinations = handleSeparator(sender, (Separator)message);		}		else if (message instanceof End) {			destinations = handleEnd(sender, (End)message);		}		else if (message instanceof Unlock) {			destinations = handleUnlock(sender, (Unlock)message);		}		else{			destinations = new DestinationSet();		}				// message forwarding		for(Iterator it = destinations.iterator(); it.hasNext(); ){			destination = (DestinationSet.Entry)it.next();			sendMessage(destination.destination, destination.message);			// Does the set contains any clients or neighbors?			if(destination.destination.isClient()){				containsClient = true;			}else{				containsNeighbor = true;			}		}				// caching of a notification, iff a client and a neighbor is contained		if(message instanceof Notification && containsClient && containsNeighbor){//			if(cache.isFull()){//				broadcast();//			}			cache.add((Notification)message);			sim.collectStatisticsCorrectProcessing();		}			if(message instanceof Notification && !containsClient) {			sim.collectStatisticsPureForwarding();		}	}		private void sendMessage(Destination receiver, Message message){		// forwarding colored messages over the new and old edge		if ( !(message instanceof Lock  || message instanceof Unlock ||			   message instanceof Begin || message instanceof End) ) {						if( (receiver == bNew && message.isBlack()) ||					(receiver == bOld && message.isGray()) ){				return;			}		}				// uncolor messages when leaving the circuit		if (receiver != this && receiver != bLeft && receiver != bRight &&		    receiver != bNew && receiver != bOld && receiver != bRelay) {						message.colorBlack();		}				// sending messages to myself		if(receiver == this){			receiveMessage(this, message);		}else{			net.send(this,receiver,message);		}	}			// handle subscriptions, unsubscriptions and notifications	private DestinationSet handleSubscription(Destination from, Subscription s){		return routingTable.subscribe(from, s);	}	private DestinationSet handleUnsubscription(Destination from, Unsubscription u){		return routingTable.unsubscribe(from, u);	}		private DestinationSet handleNotification(Destination from, Notification n){		return routingTable.forward(from, n);	}	/**	 * Starts all broker's functions, inclusive the broadcasting.	 */	public void start(){		startBroadcasting();	}		/**	 * Starts the broker's broadcasting to announce his presence and his	 * interests.  	 */	public void startBroadcasting(){		if(broadcastEvent == null){			broadcastEvent = new Event(){				public void handle(){					Broker.this.broadcast();				}			};			sim.scheduleEventIn(broadcastInterval*rand.nextDouble(), broadcastEvent);		}		throw new UnsupportedOperationException();	}		/**	 * Starts the broker's broadcasting to announce his presence and his	 * interests.  	 */	public void startHeuristic(){		if(broadcastEvent == null){			broadcastEvent = new Event(){				public void handle(){					Broker.this.broadcast();				}			};			sim.scheduleEventIn(broadcastInterval*rand.nextDouble(), broadcastEvent);		}	}		public void stopHeuristic() {		if (broadcastEvent != null) {			sim.removeEvent(broadcastEvent);			broadcastEvent = null;		}	}		// handle a broadcast	private void broadcast(){			// create a new Broadcast Message //		if(net.getNumber(this)==68) {//			System.out.println("Cache belegt ist: "+cache.getFillState());//		}		handleMessage(this, new Broadcast(cache.getBloomFilter(filterSize, 				                                               numberOfHashs)));				// schedule a new Broadcast		broadcastEvent = new Event(){			public void handle(){				Broker.this.broadcast();			}		};		sim.scheduleEventIn(broadcastInterval, broadcastEvent);	}		private DestinationSet handleBroadcast(Destination from, Broadcast broadcast){		DestinationSet destinations;		Broker[] path;		Broker source;		//double time;		BloomFilter bloomFilter;		double interest;		//Request request;		// TODO remove		//System.out.println("Broadcast received at"+this);				// prepare the path		path = extendPath(broadcast.getPath());		source = path[0];		bloomFilter = broadcast.getBloomFilter();				if(source != this){			// determine interests			interest = cache.countHits(bloomFilter);						//			if(path[path.length-2]==source){//				//System.out.println("Bloom Filter Hits: "+interest+" correct: "+" Cache Compare: "+cache.countHits(source.cache)+" size: "+bloomFilter.size());//				interest = interest == 0 ? 1:interest;//			}							// update the environment			environment.setInterest(source, interest);	//			env.refresh(path, time); // not needed anymore?//			env.update(time);			//			// evaluate the path			try{

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -