consumermanagerimpl.java

来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 919 行 · 第 1/3 页

JAVA
919
字号
        }        return consumer;    }    /**     * Create a durable consumer.     *     * @param topic        the topic to subscribe to     * @param name         the subscription name     * @param clientID     the client identifier. May be <code>null</code>.     * @param connectionId the identity of the connection that owns this     *                     consumer     * @param noLocal      if true, and the destination is a topic, inhibits the     *                     delivery of messages published by its own     *                     connection.     * @param selector     the message selector. May be <code>null</code>     * @return the durable consumer endpoint     * @throws InvalidDestinationException if <code>topic</code> is not a     *                                     persistent destination     * @throws InvalidSelectorException    if the selector is not well formed     * @throws JMSException                if a durable consumer is already     *                                     active with the same <code>name</code>     */    public synchronized DurableConsumerEndpoint createDurableConsumer(            JmsTopic topic, String name, String clientID, long connectionId,            boolean noLocal,            String selector)            throws JMSException {        if (_log.isDebugEnabled()) {            _log.debug("createDurableConsumer(topic=" + topic                       + ", name=" + name + ", connectionId=" + connectionId                       + ", selector=" + selector + ", noLocal=" + noLocal                       + ")");        }        DurableConsumerEndpoint consumer                = createInactiveDurableConsumer(topic, name, clientID);        consumer.activate(connectionId, selector, noLocal);        return consumer;    }    /**     * Create a browser for the specified destination and the selector. A     * browser is responsible for passing all messages back to the client that     * reside on the queue     *     * @param queue    the queue to browse     * @param selector the message selector. May be <code>null</code>     * @return the queue browser endpoint     * @throws JMSException             if the browser can't be created     */    public synchronized ConsumerEndpoint createQueueBrowser(JmsQueue queue,                                                            String selector)            throws JMSException {        // ensure that the destination is valid before proceeding        getDestination(queue, true);        long consumerId = getNextConsumerId();        ConsumerEndpoint consumer = null;        try {            _database.begin();            QueueDestinationCache cache;            cache = (QueueDestinationCache) _destinations.getDestinationCache(                    queue);            consumer = new QueueBrowserEndpoint(consumerId, cache, selector);            Object key = ConsumerEntry.getConsumerKey(consumer);            _endpoints.put(key, consumer);            addConsumerEntry(key, queue, null, false);            _database.commit();        } catch (Exception exception) {            rethrow("Failed to create browser", exception);        }        return consumer;    }    /**     * Close a consumer.     *     * @param consumer the consumer to close     */    public synchronized void closeConsumer(ConsumerEndpoint consumer) {        if (_log.isDebugEnabled()) {            _log.debug("closeConsumerEndpoint(consumer=[Id="                       + consumer.getId() + ", destination="                       + consumer.getDestination() + ")");        }        Object key = ConsumerEntry.getConsumerKey(consumer);        ConsumerEndpoint existing = (ConsumerEndpoint) _endpoints.get(key);        if (existing != null) {            try {                 _database.begin();                if (consumer.getId() != existing.getId()) {                    // As a fix for bug 759752, only remove the consumer if it                    // matches the existing one.                    // @todo - not sure if this situation can arise any longer                    _log.error("Existing endpoint doesn't match that to be closed "                               + "- retaining");                } else if (existing instanceof DurableConsumerEndpoint) {                    DurableConsumerEndpoint durable                            = (DurableConsumerEndpoint) existing;                    if (durable.isActive()) {                        try {                            durable.deactivate();                        } catch (JMSException exception) {                            _log.error("Failed to deactivate durable consumer="                                       + durable, exception);                        }                    }                } else {                    _endpoints.remove(key);                    consumer.close();                    removeConsumerEntry(key);                }                _database.commit();            } catch (PersistenceException exception) {                _log.error("Failed to close consumer=" + consumer, exception);                rollback();            }        }    }    /**     * Return the consumer with the specified identity.     *     * @param consumerId the identity of the consumer     * @return the associated consumer, or <code>null</code> if none exists     */    public synchronized ConsumerEndpoint getConsumerEndpoint(long consumerId) {        return (ConsumerEndpoint) _endpoints.get(new Long(consumerId));    }    /**     * Return the consumer with the specified persistent identity.     *     * @param persistentId the persistent identity of the consumer     * @return the associated consumer, or <code>null</code> if none exists     */    public synchronized ConsumerEndpoint getConsumerEndpoint(            String persistentId) {        return (ConsumerEndpoint) _endpoints.get(persistentId);    }    /**     * Determines if there are any active consumers for a destination.     *     * @param destination the destination     * @return <code>true</code> if there is at least one consumer     */    public synchronized boolean hasActiveConsumers(JmsDestination destination) {        boolean result = false;        ConsumerEndpoint[] consumers = getConsumers();        for (int i = 0; i < consumers.length; ++i) {            if (consumers[i].canConsume(destination)) {                result = true;                break;            }        }        return result;    }    /**     * Start the service.     *     * @throws ServiceException if the service fails to start     */    protected void doStart() throws ServiceException {        try {            _database.begin();            Connection connection = _database.getConnection();            PersistenceAdapter adapter = _database.getAdapter();            // return a list of JmsDestination objects.            HashMap map = adapter.getAllDurableConsumers(connection);            Iterator iter = map.keySet().iterator();            // Create an endpoint for each durable consumer            while (iter.hasNext()) {                String consumer = (String) iter.next();                String deststr = (String) map.get(consumer);                JmsDestination dest = _destinations.getDestination(deststr);                if (dest == null) {                    // this maybe a wildcard subscription                    dest = new JmsTopic(deststr);                    if (!((JmsTopic) dest).isWildCard()) {                        dest = null;                    }                }                if (consumer != null && dest != null &&                        dest instanceof JmsTopic) {                    // cache the consumer-destination mapping in memory.                    addDurableConsumer((JmsTopic) dest, consumer, null);                } else {                    // @todo                    _log.error("Failure in ConsumerManager.init : " + consumer +                               ":" + dest);                }            }            _database.commit();        } catch (Exception exception) {            rollback();            throw new ServiceException("Failed to initialise ConsumerManager",                                       exception);        }    }    /**     * Stop the service.     */    protected synchronized void doStop() {        // clean up all the destinations        Object[] endpoints = _endpoints.values().toArray();        for (int index = 0; index < endpoints.length; index++) {            closeConsumer((ConsumerEndpoint) endpoints[index]);        }        _endpoints.clear();        // remove cache data structures        _consumers.clear();        _destToConsumerMap.clear();        _wildcardConsumers.clear();    }    /**     * Create an inactive durable consumer.     * <p/>     * If the consumer doesn't exist, it will created in the persistent store.     * If it does exist, and is inactive, it will be recreated. If it does     * exist, but is active, an exception will be raised.     *     * @param topic    the topic to subscribe to     * @param name     the subscription name     * @param clientID the client identifier. May be <code>null</code>.     * @return the durable consumer     * @throws InvalidDestinationException if <code>topic</code> is not a     *                                     persistent destination     * @throws JMSException                if a durable consumer is already     *                                     active with the same <code>name</code>,     *                                     or the consumer can't be created     */    private DurableConsumerEndpoint createInactiveDurableConsumer(            JmsTopic topic, String name, String clientID)            throws JMSException {        DurableConsumerEndpoint endpoint;        if (_log.isDebugEnabled()) {            _log.debug("createInactiveDurableConsumer(topic=" + topic                       + ", name=" + name + ", clientID=" + clientID + ")");        }        // check that the destination exists, if the topic is not a wildcard        if (!topic.isWildCard()) {            topic = (JmsTopic) getDestination(topic, false);        }        if (name == null || name.length() == 0) {            throw new InvalidDestinationException(                    "Invalid subscription name: " + name);        }        endpoint = (DurableConsumerEndpoint) _endpoints.get(name);        if (endpoint != null) {            if (endpoint.isActive()) {                throw new JMSException(                        "Durable subscriber already exists with name: " + name);            }            if (!endpoint.getDestination().equals(topic)) {                // subscribing to a different topic. Need to re-subscribe.                unsubscribe(name, clientID);                endpoint = null;            }        }        if (endpoint == null) {            try {                _database.begin();                PersistenceAdapter adapter = _database.getAdapter();                Connection connection = _database.getConnection();                adapter.addDurableConsumer(connection, topic.getName(), name);                endpoint = addDurableConsumer(topic, name, clientID);                _database.commit();            } catch (Exception exception) {                String msg = "Failed to create durable consumer, name=" + name                        + ", for topic=" + topic.getName();                rethrow(msg, exception);            }        }        return endpoint;    }    /**     * Register an inactive durable consumer.     *     * @param topic    the topic to subscribe to     * @param name     the subscription name     * @param clientID the client identifier. May be <code>null</code>.     * @return the durable consumer

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?