⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 consumermanager.java

📁 一个java方面的消息订阅发送的源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
        _wildcardConsumers.clear();
        _wildcardConsumers = null;

        // reset the singleton
        _instance = null;
    }

    /**
     * Collects the names of the durable consumers that are subscribed to the
     * specified topic.
     * <p/>
     * Consumers registered directly against <code>dest</code> are included,
     * as are durable wildcard consumers whose topic matches it.
     *
     * @param dest the topic to query
     * @return a list of consumer names; empty if there are no durable
     *         consumers for the topic
     */
    public synchronized List getDurableConsumersForDest(JmsTopic dest) {
        List result = new ArrayList();

        // consumers subscribed directly to the destination
        List direct = (List) _destToConsumerMap.get(dest);
        if (direct != null) {
            for (Iterator i = direct.iterator(); i.hasNext();) {
                ConsumerEntry candidate = (ConsumerEntry) i.next();
                if (candidate.isDurable()) {
                    result.add(candidate.getName());
                }
            }
        }

        // wildcard consumers are cached separately, keyed on their entry;
        // include those whose wildcard topic matches the destination
        Iterator wild = _wildcardConsumers.keySet().iterator();
        while (wild.hasNext()) {
            ConsumerEntry candidate = (ConsumerEntry) wild.next();
            if (!candidate.isDurable()) {
                continue;
            }
            JmsDestination subscribed = candidate.getDestination();
            if (subscribed instanceof JmsTopic
                    && ((JmsTopic) subscribed).match(dest)) {
                result.add(candidate.getName());
            }
        }

        return result;
    }

    /**
     * Finds every registered {@link ConsumerEndpoint} whose destination
     * equals the supplied one.
     *
     * @param dest the destination to query
     * @return the matching endpoints; empty if none match
     */
    public synchronized List getEndpointsForDest(JmsDestination dest) {
        LinkedList matches = new LinkedList();

        for (Iterator i = _endpoints.values().iterator(); i.hasNext();) {
            ConsumerEndpoint candidate = (ConsumerEndpoint) i.next();
            if (dest.equals(candidate.getDestination())) {
                matches.add(candidate);
            }
        }

        return matches;
    }

    /**
     * Caches a consumer, unless one is already cached under the same key.
     * <p/>
     * Consumers of wildcard topics are recorded in the wildcard cache; all
     * others are appended to the list of consumers for their destination.
     *
     * @param key     a key to identify the consumer
     * @param dest    the destination it is subscribed to. It can be a wildcard
     * @param durable indicates whether it is a durable subscription
     */
    private synchronized void addToConsumerCache(Object key, JmsDestination dest,
                                                 boolean durable) {
        if (_log.isDebugEnabled()) {
            _log.debug("addToConsumerCache(key=" + key + ", dest=" + dest
                       + ", durable=" + durable + ")");
        }

        if (_consumerCache.containsKey(key)) {
            // already cached - nothing to do
            return;
        }

        ConsumerEntry entry = new ConsumerEntry(key, dest, durable);
        _consumerCache.put(key, entry);

        boolean wildcard =
                dest instanceof JmsTopic && ((JmsTopic) dest).isWildCard();
        if (wildcard) {
            // wildcard consumers are kept apart from the per-destination
            // lists
            _wildcardConsumers.put(entry, dest);
            return;
        }

        // maintain the reverse (destination -> consumers) mapping, creating
        // the list on first use
        List consumers = (List) _destToConsumerMap.get(dest);
        if (consumers == null) {
            consumers = new ArrayList();
            _destToConsumerMap.put(dest, consumers);
        }
        consumers.add(entry);
    }

    /**
     * Removes the consumer identified by <code>key</code> from the cache,
     * along with its wildcard or per-destination mapping. Does nothing,
     * other than debug logging, if no such consumer is cached.
     *
     * @param key the consumer key
     */
    private synchronized void removeFromConsumerCache(Object key) {
        if (_log.isDebugEnabled()) {
            _log.debug("removeFromConsumerCache(key=" + key + ")");
        }

        ConsumerEntry removed = (ConsumerEntry) _consumerCache.remove(key);
        if (removed == null) {
            if (_log.isDebugEnabled()) {
                _log.debug("removeFromConsumerCache(key=" + key +
                           "): consumer not found");
            }
            return;
        }

        JmsDestination subscribed = removed.getDestination();
        if (subscribed instanceof JmsTopic
                && ((JmsTopic) subscribed).isWildCard()) {
            // wildcard consumers only live in the wildcard cache
            _wildcardConsumers.remove(removed);
            return;
        }

        List consumers = (List) _destToConsumerMap.get(subscribed);
        if (consumers == null) {
            return;
        }

        consumers.remove(removed);
        if (consumers.isEmpty()) {
            // drop the mapping once its last consumer has gone
            _destToConsumerMap.remove(subscribed);
        }
    }

    /**
     * Drops the destination-to-consumers mapping for the specified
     * destination, if there is one.
     *
     * @param destination the destination whose consumer mapping is removed
     */
    private synchronized void removeFromConsumerCache(
            JmsDestination destination) {
        // NOTE(review): only the destination mapping is dropped here; the
        // matching _consumerCache entries are not touched by this method -
        // confirm that callers deal with them
        _destToConsumerMap.remove(destination);
    }

    /**
     * Initialises the consumer manager.
     * <p/>
     * Obtains the scheduler, then loads the durable consumers known to the
     * persistence adapter and caches each consumer-to-destination mapping via
     * <code>addToConsumerCache</code>. A consumer is only cached if its
     * destination name resolves to a topic registered with
     * <code>DestinationManager</code>, or denotes a wildcard topic; any other
     * consumer is logged as an error and skipped.
     * <p/>
     * On failure the connection is passed to <code>SQLHelper.rollback</code>
     * before the exception is propagated. The connection is always passed to
     * <code>SQLHelper.close</code> on exit.
     *
     * @throws ServiceException if the manager can't be initialised
     */
    private void init() throws ServiceException {
        _scheduler = Scheduler.instance();

        Connection connection = null;
        try {
            connection = DatabaseService.getConnection();

            PersistenceAdapter adapter = DatabaseService.getAdapter();
            // NOTE(review): this commit is issued before anything has been
            // done on the connection, and nothing in this method commits
            // after the read below - confirm the ordering is intentional
            connection.commit();

            // map of durable consumer name -> destination name (both are
            // cast to String below)
            HashMap map = adapter.getAllDurableConsumers(connection);
            Iterator iter = map.keySet().iterator();

            // cache the destination mapping of each durable consumer
            while (iter.hasNext()) {
                String consumer = (String) iter.next();
                String deststr = (String) map.get(consumer);

                JmsDestination dest =
                        DestinationManager.instance().getDestination(deststr);
                if (dest == null) {
                    // not a registered destination; it may still be a
                    // wildcard subscription, in which case a topic is
                    // created for it. Anything else is left as null
                    dest = new JmsTopic(deststr);
                    if (!((JmsTopic) dest).isWildCard()) {
                        dest = null;
                    }
                }

                if (consumer != null && dest != null &&
                        dest instanceof JmsTopic) {
                    // cache the consumer-destination mapping in memory.
                    addToConsumerCache(consumer, dest, true);
                } else {
                    // unresolved or non-topic destination: the consumer is
                    // logged and skipped; initialisation carries on
                    _log.error("Failure in ConsumerManager.init : " + consumer +
                               ":" + dest);
                }
            }
        } catch (ServiceException exception) {
            SQLHelper.rollback(connection);
            throw exception;
        } catch (Exception exception) {
            // wrap anything else, preserving the cause
            SQLHelper.rollback(connection);
            throw new ServiceException("Failed to initialise ConsumerManager",
                                       exception);
        } finally {
            SQLHelper.close(connection);
        }
    }

    /**
     * Advances the consumer identifier seed and returns its new value.
     *
     * @return a unique identifier for a consumer
     */
    private synchronized long getNextConsumerId() {
        _consumerIdSeed += 1;
        return _consumerIdSeed;
    }

    /**
     * Verifies that a destination is valid.
     * <p/>
     * A destination is valid if it is persistent and is registered with {@link
     * DestinationManager}, or non-persistent. If it is non-persistent and not
     * yet registered, it will be registered. When a destination with the same
     * name is already registered, its persistence must match that of the
     * supplied destination.
     *
     * @param destination the destination to check
     * @throws InvalidDestinationException if the destination is persistent
     *                                     but not registered, or if its
     *                                     persistence differs from that of
     *                                     the registered destination with
     *                                     the same name
     */
    private void checkDestination(JmsDestination destination)
            throws InvalidDestinationException {
        final DestinationManager manager = DestinationManager.instance();
        final String name = destination.getName();
        final JmsDestination existing = manager.getDestination(name);

        if (existing == null) {
            if (destination.getPersistent()) {
                throw new InvalidDestinationException(
                        "No persistent destination with name=" + name
                        + " exists");
            }
            // non-persistent destinations can be registered dynamically
            manager.createDestination(destination);
        } else {
            // make sure the supplied destination has the same properties
            // as the existing one
            if (existing.getPersistent() != destination.getPersistent()) {
                // note the space before "with": the two literals are
                // concatenated into a single message
                throw new InvalidDestinationException(
                        "Mismatched destination properties for destination"
                        + " with name=" + name);
            }
        }
    }

    /**
     * Helper class used to maintain consumer information.
     * <p/>
     * Two entries are equal if their keys are equal; the destination and
     * durable flag take no part in the comparison. {@link #hashCode} is
     * consistent with {@link #equals}, so entries may be used as keys in
     * hash-based collections.
     */
    private static final class ConsumerEntry {

        /**
         * An identifier for the consumer.
         */
        private final Object _key;

        /**
         * Indicates whether this entry is for a durable subscriber
         */
        private final boolean _durable;

        /**
         * The destination that the consumer is actually subscribed to
         */
        private final JmsDestination _destination;

        /**
         * Construct an instance of this class using the specified key,
         * destination and durable subscriber indicator
         *
         * @param key         an identifier for the consumer
         * @param destination the destination consumer is subscribed to
         * @param durable     indicates whether it is a durable subscription
         */
        public ConsumerEntry(Object key, JmsDestination destination,
                             boolean durable) {
            _key = key;
            _destination = destination;
            _durable = durable;
        }

        /**
         * Determines if this entry is equal to another object.
         *
         * @param obj the object to compare with
         * @return <code>true</code> if <code>obj</code> is a
         *         <code>ConsumerEntry</code> with an equal key
         */
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof ConsumerEntry)) {
                // also covers obj == null
                return false;
            }

            Object otherKey = ((ConsumerEntry) obj)._key;
            if (otherKey == null) {
                return _key == null;
            }
            return otherKey.equals(_key);
        }

        /**
         * Returns a hash code derived from the key, so that equal entries
         * have equal hash codes.
         *
         * @return the hash code of the key, or <code>0</code> if the key is
         *         <code>null</code>
         */
        public int hashCode() {
            return (_key == null) ? 0 : _key.hashCode();
        }

        /**
         * Returns the consumer's identifier.
         *
         * @return the key supplied at construction
         */
        public Object getKey() {
            return _key;
        }

        /**
         * Returns the consumer's name.
         *
         * @return the key, if it is a <code>String</code>; otherwise
         *         <code>null</code>
         */
        public String getName() {
            return (_key instanceof String) ? (String) _key : null;
        }

        /**
         * Returns the destination that the consumer is subscribed to.
         *
         * @return the destination supplied at construction
         */
        public JmsDestination getDestination() {
            return _destination;
        }

        /**
         * Determines if this entry is for a durable subscriber.
         *
         * @return <code>true</code> if the subscription is durable
         */
        public boolean isDurable() {
            return _durable;
        }

        /**
         * Helper to return a key for identifying {@link ConsumerEndpoint}
         * instances. This returns the consumers persistent identifier if it has
         * one; if not, it returns its transient identifier.
         *
         * @param consumer the consumer
         * @return a key for identifying <code>consumer</code>
         */
        public static Object getConsumerKey(ConsumerEndpoint consumer) {
            String id = consumer.getPersistentId();
            if (id != null) {
                return id;
            }
            // no persistent identifier: fall back to the boxed transient one
            return new Long(consumer.getId());
        }
    }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -