⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 consumermanager.java

📁 实现了Jms的服务器源码,支持多种适配器,DB,FTP,支持多种数据库
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
     *
     * @param session the owning session
     * @param topic consumer for this topic
     * @param name the unique subscriber name
     * @param clientId the remote client identity
     * @param selector the message selector. May be <code>null</code>
     * @return the durable consumer endpoint
     * @throws JMSException if a durable consumer is already active with
     * the same <code>name</code>, or the <code>topic</code> doesn't exist,
     * or <code>selector</code> is an invalid selector
     */
    public synchronized DurableConsumerEndpoint createDurableConsumerEndpoint(
        JmsServerSession session, JmsTopic topic, String name,
        long clientId, String selector)
        throws JMSException {

        if (_log.isDebugEnabled()) {
            // the trailing ")" balances the opening "(" so the trace has the
            // same shape as the other endpoint operations in this class
            _log.debug("createDurableConsumerEndpoint(session=[sessionId="
                + session.getSessionId() + "], topic=" + topic
                + ", name=" + name + ", clientId=" + clientId
                + ", selector=" + selector + ")");
        }

        // validate the topic before touching any state: a null topic is
        // reported as a JMSException, the only failure type this method
        // declares, instead of surfacing later as a NullPointerException
        if (topic == null) {
            throw new JMSException(
                "Cannot create a durable consumer for a null topic");
        }

        // check that the durable subscriber is not already registered. If it
        // is registered then check to see whether the client is still active.
        DurableConsumerEndpoint endpoint =
            (DurableConsumerEndpoint) _endpoints.get(name);
        if (endpoint != null) {
            if (endpoint.getSession().isClientEndpointActive()) {
                // a live client still owns this subscription name
                throw new JMSException(name + " is already registered");
            } else {
                // client endpoint must have been lost, so close the session
                // that still holds the stale endpoint before replacing it
                if (_log.isDebugEnabled()) {
                    _log.debug("Closing session for inactive durable " +
                        "consumer [name=" + name + "]");
                }
                endpoint.getSession().close();
            }
        }

        // check that the destination actually exists, if the topic
        // is not a wildcard
        if (!topic.isWildCard() &&
            !DestinationManager.instance().destinationExists(topic)) {
            throw new JMSException("Cannot create a durable consumer for "
                                   + topic);
        }

        // if we get this far then we need to create the durable consumer
        // and register it under its persistent id
        endpoint = new DurableConsumerEndpoint(session, clientId, topic, name,
            selector, _scheduler);
        _endpoints.put(endpoint.getPersistentId(), endpoint);

        return endpoint;
    }

    /**
     * Check whether there are active durable consumers for the specified
     * destination.
     *
     * @param topic the destination to check
     * @return <code>true</code> if there is at least one active consumer
     */
    public synchronized boolean hasActiveDurableConsumers(
        JmsDestination topic) {

        // no consumers recorded for this destination at all
        Vector registered = (Vector) _destToConsumerMap.get(topic);
        if (registered == null) {
            return false;
        }

        // a single entry flagged as durable is enough to answer the question
        for (int position = 0; position < registered.size(); position++) {
            ConsumerEntry candidate =
                (ConsumerEntry) registered.elementAt(position);
            if (candidate._durable) {
                return true;
            }
        }

        return false;
    }

    /**
     * Create a browser for the specified destination and the selector. A
     * browser is responsible for passing all messages back to the client
     * that reside on the queue
     *
     * @param session the owning session
     * @param clientId the remote client identity, which is session scoped
     * @param queue the queue destination cache
     * @param selector optional filter
     * @return the queue browser endpoint
     */
    public synchronized ConsumerEndpoint createQueueBrowserEndpoint(
        JmsServerSession session, long clientId, JmsQueue queue,
        String selector)
        throws JMSException {

        // a browser cannot be created without a queue to browse
        if (queue == null) {
            throw new JMSException("Cannot create a browser for a null queue");
        }

        ConsumerEndpoint browser = new QueueBrowserEndpoint(session, clientId,
            queue, selector, _scheduler);

        // register the browser under its persistent id, both as an active
        // endpoint and in the consumer cache (the 'false' flag presumably
        // marks the cache entry as non-durable - confirm against
        // addToConsumerCache)
        String browserId = browser.getPersistentId();
        _endpoints.put(browserId, browser);
        addToConsumerCache(browserId, queue, false);

        return browser;
    }

    /**
     * Destroy the endpoint associated with the specified durable consumer
     *
     * @param name - name of the durable consumer
     * @exception JMSException - if it cannot complete the request
     */
    public synchronized void deleteDurableConsumerEndpoint(String name)
        throws JMSException {

        if (_log.isDebugEnabled()) {
            _log.debug("deleteDurableConsumerEndpoint(name=" + name + ")");
        }

        ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name);
        if (entry == null) {
            // ignore since the consumer is not active
            if (_log.isDebugEnabled()) {
                _log.debug("deleteDurableConsumerEndpoint(name=" + name
                    + "): failed to locate consumer");
            }
            return;
        }

        // only durable subscribers may be destroyed through this method
        if (!entry._durable) {
            throw new JMSException(name + " is not a durable subscriber");
        }

        // A durable entry stays in the consumer cache after its endpoint has
        // been removed from the active endpoints, so the lookup can
        // legitimately return null. Passing that null on would end in a
        // NullPointerException; there is simply nothing to destroy.
        ConsumerEndpoint endpoint = (ConsumerEndpoint) _endpoints.get(name);
        if (endpoint != null) {
            deleteConsumerEndpoint(endpoint);
        } else if (_log.isDebugEnabled()) {
            _log.debug("deleteDurableConsumerEndpoint(name=" + name
                + "): consumer has no active endpoint");
        }
    }

    /**
     * Destroy the specified consumer
     *
     * @param consumer the consumer to destroy
     */
    public synchronized void deleteConsumerEndpoint(
        ConsumerEndpoint consumer) {

        // Callers may hand over the result of an endpoint lookup that found
        // nothing. Treat that as "nothing to destroy" rather than failing
        // with a NullPointerException on the first dereference below.
        if (consumer == null) {
            if (_log.isDebugEnabled()) {
                _log.debug("deleteConsumerEndpoint(consumer=null): ignored");
            }
            return;
        }

        if (_log.isDebugEnabled()) {
            _log.debug("deleteConsumerEndpoint(consumer=[clientId="
                + consumer.getClientId() + ", destination="
                + consumer.getDestination() + ", session=[sessionId="
                + consumer.getSession().getSessionId() + "]])");
        }

        String id = consumer.getPersistentId();

        // if the consumer is currently active then delete it
        ConsumerEndpoint existing = (ConsumerEndpoint) _endpoints.get(id);
        if (existing != null) {
            // unregister itself from all the caches
            consumer.unregister();

            // remove it from the list of active endpoints
            // As a fix for bug 759752, only remove the consumer if it
            // matches the existing one
            if (consumer.getId().equals(existing.getId())) {
                _endpoints.remove(id);
            } else {
                if (_log.isDebugEnabled()) {
                    _log.debug("Existing endpoint doesn't match that to " +
                        "be deleted - retaining");
                }
            }

            // close the endpoint
            consumer.close();

            // remove it from the consumer cache if and only if it is a
            // non-durable subscriber
            if (!(consumer instanceof DurableConsumerEndpoint)) {
                try {
                    removeFromConsumerCache(id);
                } catch (JMSException exception) {
                    // best effort: a failed cache removal is only logged
                    // and does not abort the deletion
                    _log.debug("Failed to remove " + id + " from the cache",
                        exception);
                }
            }
        }
    }

    /**
     * Return the consumer with the specified identity
     *
     * @param id - identity of the consumer
     * @return Consumer - associated consumer object or null
     */
    public ConsumerEndpoint getConsumerEndpoint(String id) {
        // look the endpoint up among the active endpoints by its identity
        ConsumerEndpoint registered = (ConsumerEndpoint) _endpoints.get(id);
        return registered;
    }

    /**
     * Return a list of {@link ConsumerEndpoint} objects, both transient and
     * durable that are currently active.
     *
     * @return Iterator - iterator of {@link ConsumerEndpoint} objects
     */
    public Iterator consumerEndpoints() {
        // NOTE(review): the iterator is taken directly from the endpoint
        // collection, not from a copy - confirm callers cope with endpoints
        // being added or removed while they iterate
        Iterator active = _endpoints.values().iterator();
        return active;
    }

    /**
     * Return the ids of the consumers currently active in the
     * consumer manager.
     *
     * @return Iterator - iterator of {@link String} ids
     */
    public Iterator consumerIds() {
        // the keys of the endpoint collection are the consumer ids
        Iterator ids = _endpoints.keySet().iterator();
        return ids;
    }

    /**
     * Check whether a consumer, with the specified identity actually
     * exists.
     *
     * @param id - identity of the consumer
     * @return boolean - true if one exists
     */
    public boolean exists(String id) {
        // a consumer exists when an endpoint is registered under its id
        ConsumerEndpoint found = getConsumerEndpoint(id);
        return found != null;
    }

    /**
     * Check whether there is an active consumer for the specified
     * destination
     *
     * @param destination - the destination to check
     * @return boolean - true if it exists
     */
    public boolean hasActiveConsumers(JmsDestination destination)
        throws JMSException {
        // iterate over an array copy of the currently registered endpoints
        Object[] snapshot = _endpoints.values().toArray();
        boolean topicRequested = (destination instanceof JmsTopic);

        for (int i = 0; i < snapshot.length; i++) {
            JmsDestination candidate =
                ((ConsumerEndpoint) snapshot[i]).getDestination();

            // a wildcard topic consumer is compared by pattern match only;
            // every other consumer must be registered on an equal destination
            boolean wildcardConsumer = topicRequested
                && (candidate instanceof JmsTopic)
                && ((JmsTopic) candidate).isWildCard();

            boolean found;
            if (wildcardConsumer) {
                found = ((JmsTopic) candidate).match((JmsTopic) destination);
            } else {
                found = candidate.equals(destination);
            }

            if (found) {
                return true;
            }
        }

        return false;
    }

    /**
     * Check whether a particular durable consumer is active
     *
     * @param name - the consumer name
     * @return boolean - true if active
     */
    public boolean isDurableConsumerActive(String name) {
        // active means an endpoint is currently registered under the name
        Object registered = _endpoints.get(name);
        return registered != null;
    }

    /**
     * Return the destination associated with the specified durable
     * consumer.
     *
     * @param name - consumer name
     * @return JmsDestination - the destination it is registered under, or null
     */
    public JmsDestination getDestinationForConsumerName(String name) {
        ConsumerEntry cached = (ConsumerEntry) _consumerCache.get(name);

        // unknown consumer names have no destination
        if (cached == null) {
            return null;
        }
        return cached._destination;
    }

    /**
     * Check if the specified durable consumer exists
     *
     * @param name - the name of the durable consumer
     * @return boolean - true if successful
     */
    public boolean durableConsumerExists(String name) {
        // the consumer must be cached and flagged as durable
        ConsumerEntry cached = (ConsumerEntry) _consumerCache.get(name);
        return (cached != null) && cached._durable;
    }

    /**
     * This method will check that the name-destination pair actually
     * are valid and exist as a DurableConsumer  entity
     *
     * @param topic - the name of the topic
     * @param name - the name of the durable consumer
     * @return boolean - true if valid and false otherwise
     */
    public boolean validSubscription(String topic, String name) {
        ConsumerEntry cached = (ConsumerEntry) _consumerCache.get(name);

        // without a cached entry, or without a destination on that entry,
        // there is no subscription to validate
        if ((cached == null) || (cached._destination == null)) {
            return false;
        }

        // NOTE(review): only the destination name is compared; the entry's
        // _durable flag is not consulted here - confirm this is intended
        return cached._destination.getName().equals(topic);
    }

    // implementation of DestinationCacheEventListener.messageAdded
    synchronized public boolean messageAdded(MessageImpl message) {
        // this listener does nothing with the message and always
        // answers false
        final boolean handled = false;
        return handled;
    }

    // implementation of DestinationCacheEventListener.messageRemoved
    synchronized public boolean messageRemoved(MessageImpl message) {
        // this listener does nothing with the message and always
        // answers false
        final boolean handled = false;
        return handled;
    }

    // implementation of DestinationCacheEventListener.persistentMessageAdded
    synchronized public boolean persistentMessageAdded(Connection connection,
                                                       MessageImpl message)
        throws PersistenceException {
        try {
            JmsDestination target =
                (JmsDestination) message.getJMSDestination();

            // Queue messages are deliberately not processed here: all the
            // processing for non-resident queues is done at the cache level.
            if (!(target instanceof JmsQueue)) {
                // every durable consumer of the topic gets its own
                // persistent handle; the vector is drained from the front
                // as the handles are created
                Vector subscribers =
                    getDurableConsumersForDest((JmsTopic) target);
                while (!subscribers.isEmpty()) {
                    String subscriber = (String) subscribers.remove(0);
                    MessageHandleFactory.createPersistentHandle(connection,
                        target, subscriber, message);
                }
            }
        } catch (JMSException exception) {
            // surface JMS failures through the exception type declared by
            // this method, keeping the original exception as the cause
            throw new PersistenceException(
                "Failed to create persistent handle", exception);
        }

        // return false, even though we processed it, to force the
        // message manager not to keep the message in memory.
        return false;
    }

    // implementation of DestinationCacheEventListener.persistentMessageRemoved
    synchronized public boolean persistentMessageRemoved(Connection connection,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -