📄 routingtable.java
字号:
package broker;import java.util.*;import com.Notification;import com.Subscription;import com.Unsubscription;/** * A routing table implementing a covering-based routing algorithm. * @author parzy * */public class RoutingTable { /** * A simple routing table entry consisting of a filter and a destination * the filter belongs to. * @author parzy */ protected static class Entry{ /** * The filter. */ protected final Filter filter; /** * Its destination. */ protected final Destination destination; /** * Creates a new routing entry. * @param filter the filter. * @param destination its destination */ Entry(Filter filter, Destination destination){ this.filter = filter; this.destination = destination; } /** * Returns the filter. * @return the filter. */ protected Filter getFilter(){ return filter; } /** * Returns the filter's destination. * @return the filter's destination. */ protected Destination getDestination(){ return destination; } } /** * The routing table containing the routing entries. */ LinkedList table = new LinkedList(); /** * The broker this routing table belongs to. */ Broker broker = null; /** * Creates an empty covering-based routing table for the specified broker. * @param broker a broker */ public RoutingTable(Broker broker){ this.broker = broker; } /** * Adds the subscription's filter to the routing table and returns * the destinations for forwarding the subscription. * @param from the sender of the subscription. * @param subscription the subscription itself. * @return a destination set with nodes to forward the subscription to. */ public DestinationSet subscribe(Destination from, Subscription subscription){ Filter filter; // the subscription's filter DestinationSet destinations; // the destinationset to return Entry entry; // the current entry of the routing table // TODO remove if(table.size()>1500) { System.out.println("Routingtable l盲uft voll at "+broker); try {Thread.sleep(2000);}catch(Exception e) {} } // get filter from subscription filter = subscription.getFilter(); // initialize the destination set destinations = new DestinationSet(subscription); destinations.addAll(broker.getNeighbors()); destinations.remove(from); // for each entry ensure idempotent operation // determine destinations for forwarding for(Iterator it = table.iterator(); it.hasNext(); ){ entry = (Entry)it.next(); // remove covered filters of the same subscriber if(from == entry.destination){ // replay of an identical subscription // ensure idempotent behaviour if(filter == entry.filter){ destinations.clear(); return destinations; } // remove the really covered filter, // if the destination is a broker if(from.isBroker() && filter.covers(entry.filter)){ it.remove(); continue; } } // nothing left to forward -- // unneccessary to look for covering filters if(destinations.isEmpty()){ continue; } // restrict set to the source of a covering filter if(entry.filter.covers(filter)){ destinations.restrictTo(entry.destination); } } // add the new filter to the routing table and return the destinations table.add(new Entry(filter,from)); return destinations; } /** * Removes the unsubscribed filter from the routing table and forwards * the unsubscriptions and also uncovered subscriptions to neighbouring * brokers. * @param from the destination (broker or client) the unsubscription was * received from * @param unsubscription the unsubscription to process * @return a destination set with mapped unsubscriptions to forward */ public DestinationSet unsubscribe(Destination from, Unsubscription unsubscription){ Filter filter; // the unsubscribed filter Filter uncoveredFilter; // an uncovered filter DestinationSet destinations; // the destination set DestinationSet uncoveredDestinations; // destination subset of a uncovered subscription Entry entry; // an entry of the routing table Entry uncoveredEntry; // an uncovered routing table entry LinkedList uncoveredEntries; // a list of uncovered entries Destination uncoveredSource; // the destination an uncovered filter is associated to boolean contained; // boolean, whether the filter was found in the routing table int type; // message type // get the filter to unsubscribe filter = unsubscription.getFilter(); // initialize destination set type = unsubscription.getType(); destinations = new DestinationSet(new Unsubscription(filter, type)); destinations.addAll(broker.getNeighbors()); destinations.remove(from); // find filter to remove and look for covering and uncovered filters contained = false; uncoveredEntries = new LinkedList(); for(Iterator it = table.iterator(); it.hasNext(); ){ entry = (Entry)it.next(); // remove unsubscribed filter if(from == entry.destination){ if(filter == entry.filter){ contained = true; it.remove(); continue; } } // nothing left to forward -- // unneccessary to look for covering und uncovered filters if(destinations.isEmpty()){ continue; } // covering or identical routing entry? if(entry.filter.covers(filter)){ destinations.restrictTo(entry.destination); // uncovered routing entry? }else if(filter.covers(entry.filter)){ uncoveredEntries.add(entry); } } // replay of an identical unsubscription // ensure idempotent behaviour if(!contained){ destinations.clear(); return destinations; } // add received uncovered subscriptions to the routing table and to // the list of uncovered entries for(Iterator it = unsubscription.getUncoveredFilters().iterator(); it.hasNext(); ){ uncoveredEntry = new Entry((Filter)it.next(), from); table.add(uncoveredEntry); uncoveredEntries.add(uncoveredEntry); } // process uncovered entries while(!uncoveredEntries.isEmpty()){ // initialize destination set for uncovered entry uncoveredEntry = (Entry)uncoveredEntries.removeFirst(); uncoveredFilter = uncoveredEntry.filter; uncoveredSource = uncoveredEntry.destination; uncoveredDestinations = new DestinationSet(filter); uncoveredDestinations.addAll(broker.getNeighbors()); uncoveredDestinations.remove(from); uncoveredDestinations.remove(uncoveredSource); // look for covering filters for our uncovered ones for(Iterator it = table.iterator(); it.hasNext(); ){ entry = (Entry)it.next(); // an entry cannot cover itself if(entry == uncoveredEntry){ continue; } // nothing left to forward -- // unneccessary to look for covering filters if(uncoveredDestinations.isEmpty()){ break; } // if a new covering filter is found if(entry.filter.covers(uncoveredFilter)){ // check whether both are identical if(uncoveredFilter.covers(entry.filter)){ // then also add its source, if it is a broker if(uncoveredSource.isBroker()){ uncoveredDestinations.add(uncoveredSource); // and remove it from the list uncoveredEntries.remove(entry); } }else{ // otherwise restrict the set to source of the covering // filter uncoveredDestinations.restrictTo(entry.destination); } } } // add uncovered filters to the unsubscription in the original // destination set destinations.addFilters(uncoveredDestinations); } return destinations; } /** * Forwards a notification to all destinations with matching filters. * @param from the destination (broker or client) it was received from. * @param notification the notification to forward. * @return a set of destinations to forward the notification */ public DestinationSet forward(Destination from, Notification notification){ DestinationSet destinations; // the destinations to forward Entry entry; // current entry of the routing table // initialize the destination set destinations = new DestinationSet(notification); // look for matching filters for(Iterator it = table.iterator(); it.hasNext(); ){ entry = (Entry)it.next(); if(entry.filter.match(notification) && from!=entry.destination){ // add the destination if we do not send it back to a // neighboring broker if(from.isClient() || from!=entry.destination){ destinations.add(entry.destination); } } } return destinations; } /** * Returns all filters associated to specified destination. * @param to the destination. * @return all filters associated to the destination to. */ public Collection getFilters(Destination to){ Entry entry; // a routing table entry LinkedList filters; // the assocated filters filters = new LinkedList(); for(Iterator it = table.iterator(); it.hasNext(); ){ entry = (Entry)it.next(); if(entry.destination == to){ filters.add(entry.filter); } } return filters; } public Collection getFilters(){ Entry entry; // a routing table entry Hashtable set; set = new Hashtable(); for(Iterator it = table.iterator(); it.hasNext(); ){ entry = (Entry)it.next(); set.put(entry.filter, entry.filter); } return set.values(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -