
📄 consumermanager.java

📁 Java source code for message subscription and delivery
            addToConsumerCache(key, destination, false);
        }

        return endpoint;
    }

    /**
     * Create a durable consumer with the specified well-known name.
     *
     * @param session  the owning session
     * @param topic    consumer for this topic
     * @param name     the unique subscriber name
     * @param noLocal  if true, and the destination is a topic, inhibits the
     *                 delivery of messages published by its own connection. The
     *                 behavior for <code>noLocal</code> is not specified if the
     *                 destination is a queue.
     * @param selector the message selector. May be <code>null</code>
     * @return the durable consumer endpoint
     * @throws JMSException if a durable consumer is already active with the
     *                      same <code>name</code>, or the <code>topic</code>
     *                      doesn't exist, or <code>selector</code> is an
     *                      invalid selector
     */
    public synchronized DurableConsumerEndpoint createDurableConsumerEndpoint(
            JmsServerSession session, JmsTopic topic, String name,
            boolean noLocal,
            String selector)
            throws JMSException {

        if (_log.isDebugEnabled()) {
            _log.debug("createDurableConsumerEndpoint(session=" + session
                       + ", topic=" + topic + ", name=" + name
                       + ", selector=" + selector + ", noLocal=" + noLocal
                       + ")");
        }

        // check that the durable subscriber is not already active. If an
        // endpoint is registered under this name, reject the request.
        DurableConsumerEndpoint endpoint =
                (DurableConsumerEndpoint) _endpoints.get(name);
        if (endpoint != null) {
            throw new JMSException(name + " is already registered");
        }

        // check that the destination actually exists, if the topic
        // is not a wildcard
        if (!topic.isWildCard() &&
                !DestinationManager.instance().destinationExists(topic)) {
            throw new JMSException("Cannot create a durable consumer for "
                                   + topic);
        }

        // if we get this far then we need to create the durable consumer
        long consumerId = getNextConsumerId();
        endpoint = new DurableConsumerEndpoint(consumerId, session, topic, name,
                                               selector, noLocal, _scheduler);
        _endpoints.put(endpoint.getPersistentId(), endpoint);

        return endpoint;
    }

    /**
     * Check whether there are active durable consumers for the specified
     * destination.
     *
     * @param topic the destination to check
     * @return <code>true</code> if there is at least one active consumer
     */
    public synchronized boolean hasActiveDurableConsumers(JmsDestination topic) {

        boolean result = false;
        List consumers = (List) _destToConsumerMap.get(topic);
        if (consumers != null) {
            Iterator iterator = consumers.iterator();
            while (iterator.hasNext()) {
                ConsumerEntry entry = (ConsumerEntry) iterator.next();
                if (entry.isDurable()) {
                    result = true;
                    break;
                }
            }
        }

        return result;
    }

    /**
     * Create a browser for the specified queue and selector. A browser is
     * responsible for passing all messages that reside on the queue back to
     * the client.
     *
     * @param session  the owning session
     * @param queue    the queue to browse
     * @param selector the message selector. May be <code>null</code>
     * @return the queue browser endpoint
     * @throws JMSException if the browser cannot be created, for example if
     *                      the queue is not a valid destination
     */
    public synchronized ConsumerEndpoint createQueueBrowserEndpoint(
            JmsServerSession session, JmsQueue queue, String selector)
            throws JMSException {

        // ensure that the destination is valid before proceeding
        checkDestination(queue);

        long consumerId = getNextConsumerId();

        ConsumerEndpoint consumer = new QueueBrowserEndpoint(consumerId, session,
                                                             queue, selector,
                                                             _scheduler);
        Object key = ConsumerEntry.getConsumerKey(consumer);
        _endpoints.put(key, consumer);
        addToConsumerCache(key, queue, false);

        return consumer;
    }

    /**
     * Destroy the endpoint associated with the specified durable consumer.
     *
     * @param name - name of the durable consumer
     * @throws JMSException - if it cannot complete the request, for example
     *                      if the named consumer is not a durable subscriber
     */
    public synchronized void deleteDurableConsumerEndpoint(String name)
            throws JMSException {

        if (_log.isDebugEnabled()) {
            _log.debug("deleteDurableConsumerEndpoint(name=" + name + ")");
        }

        ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name);
        if (entry != null) {
            if (entry.isDurable()) {
                deleteConsumerEndpoint((ConsumerEndpoint) _endpoints.get(name));
            } else {
                throw new JMSException(name + " is not a durable subscriber");
            }
        } else {
            // ignore since the consumer is not active
            if (_log.isDebugEnabled()) {
                _log.debug("deleteDurableConsumerEndpoint(name=" + name
                           + "): failed to locate consumer");
            }
        }
    }

    /**
     * Destroy the specified consumer.
     *
     * @param consumer the consumer to destroy
     */
    public synchronized void deleteConsumerEndpoint(ConsumerEndpoint consumer) {

        if (_log.isDebugEnabled()) {
            _log.debug("deleteConsumerEndpoint(consumer=[Id="
                       + consumer.getId() + ", destination="
                       + consumer.getDestination() + "])");
        }

        Object key = ConsumerEntry.getConsumerKey(consumer);

        // if the consumer is currently active then delete it
        ConsumerEndpoint existing = (ConsumerEndpoint) _endpoints.get(key);
        if (existing != null) {
            // remove it from the list of active endpoints
            // As a fix for bug 759752, only remove the consumer if it
            // matches the existing one
            if (consumer.getId() == existing.getId()) {
                _endpoints.remove(key);
            } else {
                if (_log.isDebugEnabled()) {
                    _log.debug("Existing endpoint doesn't match that to " +
                               "be deleted - retaining");
                }
            }

            // close the endpoint
            consumer.close();

            // remove it from the consumer cache if and only if it is a
            // non-durable subscriber
            if (!(consumer instanceof DurableConsumerEndpoint)) {
                removeFromConsumerCache(key);
            }
        }
    }

    /**
     * Return the consumer with the specified identity.
     *
     * @param consumerId the identity of the consumer
     * @return the associated consumer, or <code>null</code> if none exists
     */
    public ConsumerEndpoint getConsumerEndpoint(long consumerId) {
        return (ConsumerEndpoint) _endpoints.get(Long.valueOf(consumerId));
    }

    /**
     * Return the consumer with the specified persistent identity.
     *
     * @param persistentId the persistent identity of the consumer
     * @return the associated consumer, or <code>null</code> if none exists
     */
    public ConsumerEndpoint getConsumerEndpoint(String persistentId) {
        return (ConsumerEndpoint) _endpoints.get(persistentId);
    }

    /**
     * Check whether there is an active consumer for the specified destination.
     *
     * @param destination - the destination to check
     * @return boolean - true if at least one active consumer exists
     */
    public boolean hasActiveConsumers(JmsDestination destination)
            throws JMSException {
        boolean result = false;

        Object[] endpoints = _endpoints.values().toArray();
        for (int index = 0; index < endpoints.length; index++) {
            ConsumerEndpoint endpoint = (ConsumerEndpoint) endpoints[index];
            JmsDestination endpoint_dest = endpoint.getDestination();

            if ((destination instanceof JmsTopic) &&
                    (endpoint_dest instanceof JmsTopic) &&
                    (((JmsTopic) endpoint_dest).isWildCard())) {
                if (((JmsTopic) endpoint_dest).match((JmsTopic) destination)) {
                    result = true;
                    break;
                }
            } else {
                if (endpoint_dest.equals(destination)) {
                    result = true;
                    break;
                }
            }
        }

        return result;
    }

    /**
     * Check whether a particular durable consumer is active
     *
     * @param name - the consumer name
     * @return boolean - true if active
     */
    public boolean isDurableConsumerActive(String name) {
        return (_endpoints.get(name) != null);
    }

    /**
     * Return the destination associated with the specified durable consumer.
     *
     * @param name - consumer name
     * @return JmsDestination - the destination it is registered under, or null
     */
    public JmsDestination getDestinationForConsumerName(String name) {
        ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name);
        return (entry != null) ? entry.getDestination() : null;
    }

    /**
     * Check if the specified durable consumer exists.
     *
     * @param name - the name of the durable consumer
     * @return boolean - true if the durable consumer exists
     */
    public boolean durableConsumerExists(String name) {
        boolean result = false;
        ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name);
        if ((entry != null) &&
                (entry.isDurable())) {
            result = true;
        }

        return result;
    }

    /**
     * Check that the name-destination pair is valid and exists as a
     * durable consumer entity.
     *
     * @param topic - the name of the topic
     * @param name  - the name of the durable consumer
     * @return boolean - true if valid and false otherwise
     */
    public boolean validSubscription(String topic, String name) {

        boolean result = false;
        ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name);

        if ((entry != null) &&
                (entry._destination != null) &&
                (entry._destination.getName().equals(topic))) {
            result = true;
        }

        return result;
    }

    /**
     * Returns inactive subscription names for a persistent topic
     *
     * @param topic the topic
     * @return a list of subscription names, as <code>String</code>s
     */
    public synchronized List getInactiveSubscriptions(JmsTopic topic) {
        List result = new ArrayList();
        List consumers = (List) _destToConsumerMap.get(topic);
        if (consumers != null) {
            Iterator iterator = consumers.iterator();
            while (iterator.hasNext()) {
                ConsumerEntry entry = (ConsumerEntry) iterator.next();
                if (entry.isDurable()
                        && !_endpoints.containsKey(entry.getKey())) {
                    result.add(entry.getName());
                }
            }
        }
        return result;
    }

    /**
     * Destroy this manager. This is brutal and final.
     */
    public synchronized void destroy() {

        // clean up all the consumer endpoints
        Object[] endpoints = _endpoints.values().toArray();
        for (int index = 0; index < endpoints.length; index++) {
            deleteConsumerEndpoint((ConsumerEndpoint) endpoints[index]);
        }
        _endpoints.clear();

        // remove cache data structures
        _consumerCache.clear();
        _consumerCache = null;
        _destToConsumerMap.clear();
        _destToConsumerMap = null;
