⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 routingtable.java

📁 发布/订阅系统路由重配算法,可应用于ad hoc环境
💻 JAVA
字号:
package broker;import java.util.*;import com.Notification;import com.Subscription;import com.Unsubscription;/** * A routing table implementing a covering-based routing algorithm. * @author parzy * */public class RoutingTable {	/**	 * A simple routing table entry consisting of a filter and a destination	 * the filter belongs to.	 * @author parzy	 */	protected static class Entry{		/**		 * The filter.		 */		protected final Filter filter;		/**		 * Its destination.		 */		protected final Destination destination;				/**		 * Creates a new routing entry.		 * @param filter the filter.		 * @param destination its destination		 */		Entry(Filter filter, Destination destination){			this.filter = filter;			this.destination = destination;		}				/**		 * Returns the filter.		 * @return the filter.		 */		protected Filter getFilter(){			return filter;		}				/**		 * Returns the filter's destination.		 * @return the filter's destination.		 */		protected Destination getDestination(){			return destination;		}	}	/**	 * The routing table containing the routing entries.	 */	LinkedList table = new LinkedList();		/**	 * The broker this routing table belongs to.	 */	Broker broker = null;		/**	 * Creates an empty covering-based routing table for the specified broker. 	 * @param broker a broker	 */	public RoutingTable(Broker broker){		this.broker = broker;	}	/**	 * Adds the subscription's filter to the routing table and returns	 * the destinations for forwarding the subscription.	 * @param from the sender of the subscription.	 * @param subscription the subscription itself.	 * @return a destination set with nodes to forward the subscription to.	 */	public DestinationSet subscribe(Destination from, Subscription subscription){		Filter filter;               // the subscription's filter		DestinationSet destinations; // the destinationset to return		Entry entry;                 // the current entry of the routing table				// TODO remove		if(table.size()>1500) {			System.out.println("Routingtable l盲uft voll at "+broker);			try {Thread.sleep(2000);}catch(Exception e) {}		}				// get filter from subscription		filter = subscription.getFilter();				// initialize the destination set		destinations = new DestinationSet(subscription);		destinations.addAll(broker.getNeighbors());		destinations.remove(from);		// for each entry ensure idempotent operation		// determine destinations for forwarding		for(Iterator it = table.iterator(); it.hasNext(); ){			entry = (Entry)it.next();			// remove covered filters of the same subscriber			if(from == entry.destination){								// replay of an identical subscription				// ensure idempotent behaviour				if(filter == entry.filter){					destinations.clear();				    return destinations;				}								// remove the really covered filter, 				// if the destination is a broker				if(from.isBroker() && filter.covers(entry.filter)){					it.remove();					continue;				}			}			// nothing left to forward -- 			// unneccessary to look for covering filters			if(destinations.isEmpty()){				continue;			}						// restrict set to the source of a covering filter 			if(entry.filter.covers(filter)){				destinations.restrictTo(entry.destination);			}		}		// add the new filter to the routing table and return the destinations		table.add(new Entry(filter,from));		return destinations;	}		/**	 * Removes the unsubscribed filter from the routing table and forwards	 * the unsubscriptions and also uncovered subscriptions to neighbouring	 * brokers. 	 * @param from the destination (broker or client) the unsubscription was	 *        received from	 * @param unsubscription the unsubscription to process	 * @return a destination set with mapped unsubscriptions to forward	 */	public DestinationSet unsubscribe(Destination from, Unsubscription unsubscription){		Filter filter;                        // the unsubscribed filter		Filter uncoveredFilter;               // an uncovered filter		DestinationSet destinations;          // the destination set		DestinationSet uncoveredDestinations; // destination subset of a uncovered subscription		Entry entry;                          // an entry of the routing table		Entry uncoveredEntry;                 // an uncovered routing table entry		LinkedList uncoveredEntries;          // a list of uncovered entries		Destination uncoveredSource;          // the destination an uncovered filter is associated to		boolean contained;                    // boolean, whether the filter was found in the routing table				int type;                             // message type 		// get the filter to unsubscribe		filter = unsubscription.getFilter();		// initialize destination set		type = unsubscription.getType();		destinations = new DestinationSet(new Unsubscription(filter, type));		destinations.addAll(broker.getNeighbors());		destinations.remove(from);				// find filter to remove and look for covering and uncovered filters		contained = false;		uncoveredEntries = new LinkedList();		for(Iterator it = table.iterator(); it.hasNext(); ){			entry = (Entry)it.next();			// remove unsubscribed filter			if(from == entry.destination){				if(filter == entry.filter){					contained = true;					it.remove();					continue;				}			}			        // nothing left to forward -- 			// unneccessary to look for covering und uncovered filters			if(destinations.isEmpty()){				continue;			}					// covering or identical routing entry?			if(entry.filter.covers(filter)){				destinations.restrictTo(entry.destination);		    // uncovered routing entry?			}else if(filter.covers(entry.filter)){				uncoveredEntries.add(entry);			}		}		// replay of an identical unsubscription		// ensure idempotent behaviour		if(!contained){			destinations.clear();			return destinations;		}		// add received uncovered subscriptions to the routing table and to		// the list of uncovered entries		for(Iterator it = unsubscription.getUncoveredFilters().iterator(); it.hasNext(); ){			uncoveredEntry = new Entry((Filter)it.next(), from);			table.add(uncoveredEntry);			uncoveredEntries.add(uncoveredEntry);		}		    // process uncovered entries		while(!uncoveredEntries.isEmpty()){						// initialize destination set for uncovered entry			uncoveredEntry = (Entry)uncoveredEntries.removeFirst();			uncoveredFilter = uncoveredEntry.filter;			uncoveredSource = uncoveredEntry.destination;			uncoveredDestinations = new DestinationSet(filter);			uncoveredDestinations.addAll(broker.getNeighbors());			uncoveredDestinations.remove(from);			uncoveredDestinations.remove(uncoveredSource);	    			// look for covering filters for our uncovered ones			for(Iterator it = table.iterator(); it.hasNext(); ){				entry = (Entry)it.next();								// an entry cannot cover itself				if(entry == uncoveredEntry){					continue;				}				// nothing left to forward -- 				// unneccessary to look for covering filters				if(uncoveredDestinations.isEmpty()){					break;				}				// if a new covering filter is found				if(entry.filter.covers(uncoveredFilter)){					// check whether both are identical					if(uncoveredFilter.covers(entry.filter)){						// then also add its source, if it is a broker						if(uncoveredSource.isBroker()){							uncoveredDestinations.add(uncoveredSource);							// and remove it from the list							uncoveredEntries.remove(entry);						}					}else{						// otherwise restrict the set to source of the covering						// filter						uncoveredDestinations.restrictTo(entry.destination);					}				}			}								// add uncovered filters to the unsubscription in the original			// destination set			destinations.addFilters(uncoveredDestinations);					}		return destinations;	}		/**	 * Forwards a notification to all destinations with matching filters.	 * @param from the destination (broker or client) it was received from.	 * @param notification the notification to forward.	 * @return a set of destinations to forward the notification	 */	public DestinationSet forward(Destination from, Notification notification){		DestinationSet destinations; // the destinations to forward		Entry entry;                 // current entry of the routing table				// initialize the destination set		destinations = new DestinationSet(notification);				// look for matching filters		for(Iterator it = table.iterator(); it.hasNext(); ){			entry = (Entry)it.next();			if(entry.filter.match(notification) && from!=entry.destination){				// add the destination if we do not send it back to a				// neighboring broker				if(from.isClient() || from!=entry.destination){					destinations.add(entry.destination);				}			}		}				return destinations;	}	/**	 * Returns all filters associated to specified destination.	 * @param to the destination.	 * @return all filters associated to the destination to.	 */	public Collection getFilters(Destination to){		Entry entry;        // a routing table entry		LinkedList filters; // the assocated filters		filters = new LinkedList();		for(Iterator it = table.iterator(); it.hasNext(); ){			entry = (Entry)it.next();			if(entry.destination == to){				filters.add(entry.filter);			}		}		return filters;	}		public Collection getFilters(){		Entry entry; // a routing table entry		Hashtable set;				set = new Hashtable();				for(Iterator it = table.iterator(); it.hasNext(); ){			entry = (Entry)it.next();			set.put(entry.filter, entry.filter);		}		return set.values();	}}	

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -