consumermanagerimpl.java

来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 919 行 · 第 1/3 页

JAVA
919
字号
/** * Redistribution and use of this software and associated documentation * ("Software"), with or without modification, are permitted provided * that the following conditions are met: * * 1. Redistributions of source code must retain copyright *    statements and notices.  Redistributions must also contain a *    copy of this document. * * 2. Redistributions in binary form must reproduce the *    above copyright notice, this list of conditions and the *    following disclaimer in the documentation and/or other *    materials provided with the distribution. * * 3. The name "Exolab" must not be used to endorse or promote *    products derived from this Software without prior written *    permission of Exoffice Technologies.  For written permission, *    please contact info@exolab.org. * * 4. Products derived from this Software may not be called "Exolab" *    nor may "Exolab" appear in their names without prior written *    permission of Exoffice Technologies. Exolab is a registered *    trademark of Exoffice Technologies. * * 5. Due credit should be given to the Exolab Project *    (http://www.exolab.org/). * * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED * OF THE POSSIBILITY OF SUCH DAMAGE. * * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved. * * $Id: ConsumerManagerImpl.java,v 1.3 2005/12/23 12:17:25 tanderson Exp $ */package org.exolab.jms.messagemgr;import java.sql.Connection;import java.util.ArrayList;import java.util.HashMap;import java.util.Iterator;import java.util.List;import javax.jms.InvalidDestinationException;import javax.jms.InvalidSelectorException;import javax.jms.JMSException;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.exolab.jms.client.JmsDestination;import org.exolab.jms.client.JmsQueue;import org.exolab.jms.client.JmsTopic;import org.exolab.jms.persistence.DatabaseService;import org.exolab.jms.persistence.PersistenceAdapter;import org.exolab.jms.persistence.PersistenceException;import org.exolab.jms.service.Service;import org.exolab.jms.service.ServiceException;/** * The consumer manager is responsible for creating and managing the lifecycle * of consumers. The consumer manager maintains a list of all active consumers. * * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a> * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a> * @version $Revision: 1.3 $ $Date: 2005/12/23 12:17:25 $ */public class ConsumerManagerImpl extends Service implements ConsumerManager {    /**     * The destination manager.     */    private final DestinationManager _destinations;    /**     * The database service.     */    private final DatabaseService _database;    /**     * Maintains a list of all consumers, durable and non-durable. All durable     * subscribers are maintained in memory until they are removed from the     * system entirely. All non-durable subscribers are maintained in memory     * until their endpoint is removed.     */    private HashMap _consumers = new HashMap();    /**     * The set of all consumer endpoints. This is a map of {@link     * ConsumerEndpoint} instances, keyed on {@link ConsumerEndpoint#getPersistentId()}     * if non null; otherwise {@link ConsumerEndpoint#getId()}.     */    private HashMap _endpoints = new HashMap();    /**     * Maintains a mapping between destinations and consumers. A destination can     * have more than one consumer and a consumer can also be registered to more     * than one destination     */    private HashMap _destToConsumerMap = new HashMap();    /**     * The set of all wildcard consumers, represented by a map of ConsumerEntry     * -> JmsTopic instances.     */    private HashMap _wildcardConsumers = new HashMap();    /**     * The seed to allocate identifiers to new consumers.     */    private long _consumerIdSeed = 0;    /**     * The logger.     */    private static final Log _log = LogFactory.getLog(            ConsumerManagerImpl.class);    /**     * Construct a new  <code>ConsumerManager</code>.     *     * @param destinations the destination manager     * @param database     the database service     */    public ConsumerManagerImpl(DestinationManager destinations,                               DatabaseService database) {        if (destinations == null) {            throw new IllegalArgumentException(                    "Argument 'destinations' is null");        }        if (database == null) {            throw new IllegalArgumentException("Argument 'database' is null");        }        _destinations = destinations;        _database = database;    }    /**     * Create a new durable subscription.     * <p/>     * A client can change an existing durable subscription by creating a new     * subscription with the same name and a new topic. Changing a durable     * subscriber is equivalent to unsubscribing the old one and creating a new     * one.     *     * @param topic    the topic to subscribe to     * @param name     the subscription name     * @param clientID the client identifier. May be <code>null</code>     * @throws InvalidDestinationException if <code>topic</code> is not a     *                                     persistent destination, or     *                                     <code>name</code> is an invalid     *                                     subscription name     * @throws JMSException                if the durable consumer can't be     *                                     created     */    public synchronized void subscribe(JmsTopic topic, String name,                                       String clientID)            throws JMSException {        createInactiveDurableConsumer(topic, name, clientID);    }    /**     * Remove a durable subscription.     * <p/>     * A subscription may only be removed if the associated {@link     * DurableConsumerEndpoint} is inactive.     *     * @param name     the subscription name     * @param clientID the client identifier. May be <code>null</code>     * @throws InvalidDestinationException if an invalid subscription name is     *                                     specified.     * @throws JMSException                if the durable consumer is active, or     *                                     cannot be removed     */    public synchronized void unsubscribe(String name, String clientID)            throws JMSException {        if (_log.isDebugEnabled()) {            _log.debug("unsubscribe(name=" + name + ", clientID="                       + clientID + ")");        }        DurableConsumerEndpoint consumer                = (DurableConsumerEndpoint) _endpoints.remove(name);        if (consumer == null) {            throw new InvalidDestinationException("Durable consumer " + name                                                  + " is not defined.");        }        if (consumer.isActive()) {            throw new JMSException("Cannot remove durable consumer=" + name                                   + ": consumer is active");        }        consumer.close();        // remove it from the persistent store.        try {            _database.begin();            Connection connection = _database.getConnection();            _database.getAdapter().removeDurableConsumer(connection, name);            removeConsumerEntry(name);            _database.commit();        } catch (PersistenceException exception) {            String msg = "Failed to remove durable consumer, name=" + name;            rethrow(msg, exception);        }    }    /**     * Remove all durable subscriptions for a destination.     * <p/>     * Subscriptions may only be removed if the associated {@link     * ConsumerEndpoint}s are inactive.     *     * @param topic the topic to remove consumers for     * @throws JMSException if the subscriptions can't be removed     */    public synchronized void unsubscribe(JmsTopic topic) throws JMSException {        List list = (List) _destToConsumerMap.get(topic);        if (list != null) {            ConsumerEntry[] consumers                    = (ConsumerEntry[]) list.toArray(new ConsumerEntry[0]);            for (int i = 0; i < consumers.length; ++i) {                ConsumerEntry consumer = consumers[i];                if (consumer.isDurable()) {                    // remove the durable consumer. This operation                    // will fail if the consumer is active.                    unsubscribe(consumer.getName(), consumer.getClientID());                }            }        }        // remove all consumers for the specified destination        removeFromConsumerCache(topic);    }    /**     * Create a transient consumer for the specified destination.     *     * @param destination  the destination to consume messages from     * @param connectionId the identity of the connection that owns this     *                     consumer     * @param selector     the message selector. May be <code>null</code>     * @param noLocal      if true, and the destination is a topic, inhibits the     *                     delivery of messages published by its own connection.     *                     The behavior for <code>noLocal</code> is not     *                     specified if the destination is a queue.     * @return a new transient consumer     */    public synchronized ConsumerEndpoint createConsumer(            JmsDestination destination, long connectionId,            String selector,            boolean noLocal)            throws JMSException, InvalidSelectorException {        if (_log.isDebugEnabled()) {            _log.debug("createConsumerEndpoint(destination=" + destination                       + ", connectionId=" + connectionId                       + ", selector=" + selector                       + ", noLocal=" + noLocal + ")");        }        ConsumerEndpoint consumer = null;        // ensure that the destination is valid before proceeding        getDestination(destination, true);        long consumerId = getNextConsumerId();        try {            _database.begin();            // determine what type of consumer to create based on the destination            // it subscribes to.            if (destination instanceof JmsTopic) {                JmsTopic topic = (JmsTopic) destination;                consumer = new TopicConsumerEndpoint(consumerId, connectionId,                                                     topic, selector, noLocal,                                                     _destinations);            } else if (destination instanceof JmsQueue) {                QueueDestinationCache cache;                cache = (QueueDestinationCache) _destinations.getDestinationCache(                        destination);                consumer = new QueueConsumerEndpoint(consumerId, cache, selector);            }            if (consumer != null) {                // add it to the list of managed consumers. If it has a persistent                // identity, use that as the key, otherwise use its transient                // identity.                Object key = ConsumerEntry.getConsumerKey(consumer);                _endpoints.put(key, consumer);                addConsumerEntry(key, destination, null, false);            }            _database.commit();        } catch (Exception exception) {            rethrow("Failed to create consumer", exception);

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?