⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 consumermanager.java

📁 实现了Jms的服务器源码,支持多种适配器,DB,FTP,支持多种数据库
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                                                         MessageImpl message)
        throws PersistenceException {
        try {

            JmsDestination dest = (JmsDestination) message.getJMSDestination();

            // check to ensure that the message is not of type queue. If it
            // is then we don't want to process it here.
            if (dest instanceof JmsQueue) {
                return false;
            }

            Vector consumers = (Vector) _destToConsumerMap.get(dest);
            if (consumers != null) {
                // now search through the list of inactive durable consumers
                // and remove it from their cache
                Enumeration entries = consumers.elements();
                while (entries.hasMoreElements()) {
                    ConsumerEntry entry = (ConsumerEntry) entries.nextElement();
                    if ((entry._durable) &&
                        (!_endpoints.containsKey(entry._name))) {
                        MessageHandleFactory.destroyPersistentHandle(connection,
                            dest, entry._name, message);
                    }
                }
            }
        } catch (PersistenceException exception) {
            throw exception;
        } catch (Exception exception) {
            // rethrow as a PersistenceException...but does it make sense
            throw new PersistenceException(
                "Exception in ConsumerManager.persistentMessageRemoved " +
                exception.toString());
        }

        return true;
    }

    /**
     * Destroy this manager. This is brutal and final
     */
    public synchronized void destroy() {

        // clean up all the destinations
        Object[] endpoints = _endpoints.values().toArray();
        for (int index = 0; index < endpoints.length; index++) {
            deleteConsumerEndpoint((ConsumerEndpoint) endpoints[index]);
        }
        _endpoints.clear();

        // remove cache data structures
        _consumerCache.clear();
        _consumerCache = null;
        _destToConsumerMap.clear();
        _destToConsumerMap = null;
        _wildcardConsumers.clear();
        _wildcardConsumers = null;

        // reset the singleton
        _instance = null;
    }

    /**
     * Return a list of durable subscribers for the specified destination
     *
     * @return Vector - a vector of strings, which denote the name
     */
    public synchronized Vector getDurableConsumersForDest(JmsTopic dest) {
        Vector names = new Vector();

        Vector consumers = (Vector) _destToConsumerMap.get(dest);
        if (consumers != null) {
            Enumeration entries = consumers.elements();
            while (entries.hasMoreElements()) {
                ConsumerEntry entry = (ConsumerEntry) entries.nextElement();
                if (entry._durable) {
                    names.add(entry._name);
                }
            }
        }

        // if the destination is a topic and part is also a wildcard then
        // check the wildcardConsumers for additional consumers
        Iterator wildconsumers = _wildcardConsumers.keySet().iterator();
        while (wildconsumers.hasNext()) {
            ConsumerEntry entry = (ConsumerEntry) wildconsumers.next();
            JmsDestination adest = entry._destination;
            if ((entry._durable) &&
                (adest instanceof JmsTopic) &&
                (((JmsTopic) adest).match((JmsTopic) dest))) {
                names.add(entry._name);
            }
        }

        return names;
    }

    /**
     * Clean all the unreferenced endpoints for a specified destination.
     * Unreference endpoints usually occur when the client abnormally
     * terminates leaving some dangling endpoints on the server.
     *
     * @param dest- the destination to query
     */
    public void cleanUnreferencedEndpoints(JmsDestination dest) {
        Object[] endpoints = _endpoints.values().toArray();

        for (int index = 0; index < endpoints.length; index++) {
            ConsumerEndpoint endpoint = (ConsumerEndpoint) endpoints[index];
            if (dest.equals(endpoint.getDestination())) {
                if (!endpoint.getSession().isClientEndpointActive()) {
                    try {
                        endpoint.getSession().close();
                    } catch (Exception ignore) {
                        // ignore this exception
                    }
                }
            }
        }
    }

    /**
     * Return a list of {@link ConsumerEndpoint} objects attached to the
     * specified destination
     *
     * @param dest the destination to query
     * @return list of endpoints
     */
    public synchronized LinkedList getEndpointsForDest(JmsDestination dest) {
        LinkedList endpoints = new LinkedList();
        Iterator iter = _endpoints.values().iterator();

        while (iter.hasNext()) {
            ConsumerEndpoint endpoint = (ConsumerEndpoint) iter.next();
            if (dest.equals(endpoint.getDestination())) {
                endpoints.add(endpoint);
            }
        }

        return endpoints;
    }

    // implement of GarbageCollectable.collectGarbage
    public synchronized void collectGarbage(boolean aggressive) {
        if (aggressive) {
            Object[] endpoints = _endpoints.values().toArray();
            int count = endpoints.length;

            for (int index = 0; index < count; index++) {
                ((ConsumerEndpoint) endpoints[index]).collectGarbage(aggressive);
            }
        }
    }

    /**
     * Add the specified consumer to the cache.
     *
     * @param name - the name of the consumer
     * @param dest - the destination it is subscribed to. It can be a wildcard
     * @param durable - indicates whether it is a durable subscription
     */
    synchronized void addToConsumerCache(String name, JmsDestination dest,
                                         boolean durable)
        throws JMSException {

        if (_log.isDebugEnabled()) {
            _log.debug("addToConsumerCache(name=" + name + ", dest=" + dest
                + ", durable=" + durable + ")");
        }

        if (!_consumerCache.containsKey(name)) {
            ConsumerEntry entry = new ConsumerEntry(name, dest, durable);
            _consumerCache.put(name, entry);

            // if the specified destination is a JmsTopic and also a wildcard
            // then we need to add it to all matching desitnations
            if ((dest instanceof JmsTopic) &&
                (((JmsTopic) dest).isWildCard())) {
                // store wild card consumers in a separate array.
                _wildcardConsumers.put(new ConsumerEntry(name, dest, durable),
                    dest);
            } else {
                // we also need to add the reverse mapping
                Vector consumers = (Vector) _destToConsumerMap.get(dest);
                if (consumers == null) {
                    consumers = new Vector();
                    _destToConsumerMap.put(dest, consumers);
                }

                // add the mapping
                consumers.add(entry);
            }
        }
    }

    /**
     * Remove the specified consumer from the cache and make all necessary
     * adjustments
     *
     * @param name - name of consumer to remove
     */
    synchronized void removeFromConsumerCache(String name)
        throws JMSException {

        if (_log.isDebugEnabled()) {
            _log.debug("removeFromConsumerCache(name=" + name + ")");
        }

        if (_consumerCache.containsKey(name)) {
            ConsumerEntry entry = (ConsumerEntry) _consumerCache.remove(name);
            JmsDestination dest = entry._destination;


            if ((dest instanceof JmsTopic) &&
                (((JmsTopic) dest).isWildCard())) {
                // remove it from the wildcard cache.
                _wildcardConsumers.remove(name);
            } else {
                // remove it from the specified destination
                Vector consumers = (Vector) _destToConsumerMap.get(dest);
                if (consumers != null) {
                    consumers.remove(entry);

                    // if consumers is of size 0 then remove it
                    if (consumers.size() == 0) {
                        _destToConsumerMap.remove(dest);
                    }
                }
            }
        } else {
            if (_log.isDebugEnabled()) {
                _log.debug("removeFromConsumerCache(name=" + name +
                    "): consumer not found");
            }
        }
    }

    /**
     * Remove all the consumers for the specified destination from the
     * cache.
     *
     * @param destination - destination to remove.
     */
    synchronized void removeFromConsumerCache(JmsDestination destination) {
        if (_destToConsumerMap.containsKey(destination)) {
            _destToConsumerMap.remove(destination);

            // i am not too sure whether we need more house keeping
        }
    }

    /**
     * Return the number of active consumer endpoints
     *
     * @return int - number of active consumer endpoints
     */
    int getConsumerEndpointCount() {
        return _endpoints.size();
    }

    /**
     * Initialises the consumer manager
     *
     * @throws ServiceException if the manager can't be initialised
     */
    private void init() throws ServiceException {
        _scheduler = Scheduler.instance();

        Connection connection = null;
        try {
            connection = DatabaseService.getConnection();

            PersistenceAdapter adapter = DatabaseService.getAdapter();
            connection.commit();

            // return a list of JmsDestination objects.
            HashMap map = adapter.getAllDurableConsumers(connection);
            Iterator iter = map.keySet().iterator();

            // Create an endpoint for each durable consumer
            while (iter.hasNext()) {
                // for each destination, create the destination cache
                String consumer = (String) iter.next();
                String deststr = (String) map.get(consumer);

                JmsDestination dest =
                    DestinationManager.instance().destinationFromString(
                        deststr);
                if (dest == null) {
                    // this maybe a wildcard subscription
                    dest = new JmsTopic(deststr);
                    if (!((JmsTopic) dest).isWildCard()) {
                        dest = null;
                    }
                }

                if (consumer != null && dest != null &&
                    dest instanceof JmsTopic) {
                    // cache the consumer-destination mapping in memory.
                    addToConsumerCache(consumer, dest, true);
                } else {
                    // what should we do about this stage
                    _log.error(
                        "Failure in ConsumerManager.init : " + consumer +
                        ":" + dest);
                }
            }
        } catch (ServiceException exception) {
            SQLHelper.rollback(connection);
            throw exception;
        } catch (Exception exception) {
            SQLHelper.rollback(connection);
            throw new ServiceException("Failed to initialise ConsumerManager",
                exception);
        } finally {
            SQLHelper.close(connection);
        }
    }

    /**
     * This private static class is used to maintain consumer information
     */
    private static class ConsumerEntry {

        /**
         * The name of the consumer. This name is either the durable
         * subscriber name or a uniquely generated name.
         */
        String _name = null;

        /**
         * Indicated whether this entry is for a durable subscriber
         */
        boolean _durable = false;

        /**
         * The destination that the consumer is actually subscribed too
         */
        JmsDestination _destination = null;

        /**
         * Construct an instance of this class using the specified
         * name and durable subscriber indicator
         *
         * @param name - the name of the consumer
         * @param destination - the destination consumer is subscribed too
         * @param durable - indicates whether it is a durable subscription
         */
        ConsumerEntry(String name, JmsDestination destination,
                      boolean durable) {
            _name = name;
            _destination = destination;
            _durable = durable;
        }

        // override Object.equals
        public boolean equals(Object obj) {

            boolean result = false;
            if ((obj != null) &&
                (obj instanceof ConsumerEntry) &&
                (((ConsumerEntry) obj)._name.equals(_name))) {
                result = true;
            }

            return result;
        }

    } //-- ConsumerEntry

} //-- ConsumerManager

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -