/**
 * Redistribution and use of this software and associated documentation
 * ("Software"), with or without modification, are permitted provided
 * that the following conditions are met:
 *
 * 1. Redistributions of source code must retain copyright
 *    statements and notices.  Redistributions must also contain a
 *    copy of this document.
 *
 * 2. Redistributions in binary form must reproduce the
 *    above copyright notice, this list of conditions and the
 *    following disclaimer in the documentation and/or other
 *    materials provided with the distribution.
 *
 * 3. The name "Exolab" must not be used to endorse or promote
 *    products derived from this Software without prior written
 *    permission of Exoffice Technologies.  For written permission,
 *    please contact info@exolab.org.
 *
 * 4. Products derived from this Software may not be called "Exolab"
 *    nor may "Exolab" appear in their names without prior written
 *    permission of Exoffice Technologies. Exolab is a registered
 *    trademark of Exoffice Technologies.
 *
 * 5. Due credit should be given to the Exolab Project
 *    (http://www.exolab.org/).
 *
 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 * OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
 *
 * $Id: TopicDestinationCache.java,v 1.3 2005/05/13 12:57:02 tanderson Exp $
 */
package org.exolab.jms.messagemgr;

import java.sql.Connection;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.ArrayList;
import javax.jms.JMSException;

import org.exolab.jms.client.JmsDestination;
import org.exolab.jms.client.JmsTopic;
import org.exolab.jms.message.MessageImpl;
import org.exolab.jms.persistence.PersistenceException;
import org.exolab.jms.persistence.DatabaseService;


/**
 * A {@link DestinationCache} for topics.
 *
 * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a>
 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
 * @version $Revision: 1.3 $ $Date: 2005/05/13 12:57:02 $
 */
class TopicDestinationCache extends AbstractDestinationCache {

    /**
     * Construct a new <code>TopicDestinationCache</code> for a non-persistent
     * topic.
     *
     * @param topic the topic to cache messages for
     */
    public TopicDestinationCache(JmsTopic topic) {
        super(topic);
    }

    /**
     * Construct a new <code>TopicDestinationCache</code> for a persistent
     * topic.
     *
     * @param topic      the topic to cache messages for
     * @param connection the database connection
     * @throws JMSException         for any JMS error
     * @throws PersistenceException for any persistence error
     */
    public TopicDestinationCache(JmsTopic topic, Connection connection)
            throws JMSException, PersistenceException {
        super(topic, connection);
    }

    /**
     * Register a consumer with this cache.
     *
     * @param consumer the message consumer for this destination
     * @return <code>true</code> if registered; otherwise <code>false</code>
     */
    public boolean addConsumer(ConsumerEndpoint consumer) {

        boolean result = false;

        // check to see that the consumer can actually subscribe to
        // this destination
        JmsTopic cdest = (JmsTopic) consumer.getDestination();
        JmsTopic ddest = (JmsTopic) getDestination();

        if (cdest.match(ddest)) {
            result = super.addConsumer(consumer);
        }

        return result;
    }

    /**
     * Invoked when the {@link MessageMgr} receives a non-persistent message.
     *
     * @param destination the message's destination
     * @param message     the message
     * @throws JMSException if the listener fails to handle the message
     */
    public void messageAdded(JmsDestination destination, MessageImpl message)
            throws JMSException {
        boolean processed = false;
        MessageRef reference =
                new CachedMessageRef(message, false, getMessageCache());
        addMessage(reference, message);
        MessageHandle handle = new SharedMessageHandle(reference, message);

        ConsumerEndpoint[] consumers = getConsumerArray();
        for (int index = 0; index < consumers.length; index++) {
            ConsumerEndpoint consumer = consumers[index];
            processed |= consumer.messageAdded(handle, message);
        }

        // create a lease iff one is required and the message has actually
        // been accepted by at least one endpoint
        if (processed) {
            checkMessageExpiry(reference, message);
        } else {
            // no consumer picked up the message, so toss it
            reference.destroy();
            // @todo - inefficient. Don't really want to add the message
            // just to remove it again if there are no consumers for it
        }
    }

    /**
     * Invoked when the {@link MessageMgr} receives a persistent message.
     *
     * @param connection  the database connection
     * @param destination the message's destination
     * @param message     the message
     * @throws JMSException         if the listener fails to handle the message
     * @throws PersistenceException if there is a persistence related problem
     */
    public void persistentMessageAdded(Connection connection,
                                       JmsDestination destination,
                                       MessageImpl message)
            throws JMSException, PersistenceException {
        boolean processed = false;
        MessageRef reference =
                new CachedMessageRef(message, true, getMessageCache());
        addMessage(reference, message);
        SharedMessageHandle handle =
                new SharedMessageHandle(reference, message);

        // now send the message to all active consumers
        ConsumerEndpoint[] consumers = getConsumerArray();
        for (int index = 0; index < consumers.length; index++) {
            ConsumerEndpoint consumer = consumers[index];
            processed |= consumer.persistentMessageAdded(handle, message,
                                                         connection);
        }

        // for each inactive durable consumer, add a persistent handle
        // @todo - possible race condition between inactive subscription
        // becoming active again - potential for message loss?
        JmsTopic topic = (JmsTopic) getDestination();
        List inactive = ConsumerManager.instance().getInactiveSubscriptions(
                topic);
        if (!inactive.isEmpty()) {
            Iterator iterator = inactive.iterator();
            while (iterator.hasNext()) {
                String name = (String) iterator.next();
                TopicConsumerMessageHandle durable
                        = new TopicConsumerMessageHandle(handle, name);
                durable.add(connection);
            }
            processed = true;
        }

        // create a lease iff one is required and the message has actually
        // been accepted by at least one endpoint
        if (processed) {
            checkMessageExpiry(reference, message);
        } else {
            // no consumer picked up the message, so toss it
            handle.destroy(connection);
            // @todo - inefficient. Don't really want to make the message
            // persistent, just to remove it again if there are no consumers
            // for it
        }

    }

    /**
     * Return a message handle back to the cache, to recover unsent or
     * unacknowledged messages.
     *
     * @param handle the message handle to return
     */
    public void returnMessageHandle(MessageHandle handle) {
        long consumerId = handle.getConsumerId();
        AbstractTopicConsumerEndpoint endpoint =
                (AbstractTopicConsumerEndpoint) getConsumerEndpoint(consumerId);
        // if the endpoint is still active then return the message
        // back to it
        if (endpoint != null) {
            endpoint.returnMessage(handle);
        } else {
            // @todo - need to destroy the handle?
            // what about for inactive durable consumers -
            // could this inadvertently trash the message?
        }

    }

    /**
     * Load the state of a durable consumer.
     *
     * @param name       the durable subscription name
     * @param connection the database connection to use
     * @return a list of {@link MessageHandle} instances
     * @throws JMSException         for any JMS error
     * @throws PersistenceException for any persistence error
     */
    public List getDurableMessageHandles(String name, Connection connection)
            throws JMSException, PersistenceException {
        Vector handles = DatabaseService.getAdapter().getMessageHandles(
                connection, getDestination(), name);
        List result = new ArrayList(handles.size());

        MessageCache cache = getMessageCache();

        Iterator iterator = handles.iterator();
        while (iterator.hasNext()) {
            PersistentMessageHandle handle =
                    (PersistentMessageHandle) iterator.next();
            String messageId = handle.getMessageId();
            MessageRef reference = cache.getMessageRef(messageId);
            if (reference == null) {
                reference = new CachedMessageRef(messageId, true, cache);
            }
            cache.addMessageRef(reference);
            handle.reference(reference);
            result.add(handle);

            checkMessageExpiry(reference, handle.getExpiryTime());
        }
        return result;
    }

    /**
     * Initialise the cache from the database.
     *
     * @param connection the database connection to use
     * @throws JMSException         for any JMS error
     * @throws PersistenceException for any persistence error
     */
    protected void init(Connection connection) throws JMSException,
            PersistenceException {
        // no-op
    }

    /**
     * Remove an expired persistent message, and notify any listeners.
     *
     * @param reference  a reference to the expired message
     * @param connection the database connection
     * @throws JMSException         if a listener fails to handle the
     *                              expiration
     * @throws PersistenceException if there is a persistence related problem
     */
    protected void persistentMessageExpired(MessageRef reference,
                                            Connection connection)
            throws JMSException, PersistenceException {
        String messageId = reference.getMessageId();
        ConsumerEndpoint[] consumers = getConsumerArray();

        for (int i = 0; i < consumers.length; ++i) {
            consumers[i].persistentMessageRemoved(messageId, connection);
        }

        // since it is a persistent message, need to handle inactive
        // durable consumers subscribing to the destination
        List inactive = ConsumerManager.instance().getInactiveSubscriptions(
                (JmsTopic) getDestination());
        Iterator iterator = inactive.iterator();
        while (iterator.hasNext()) {
            String name = (String) iterator.next();
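            // @todo - destroy the persistent handle held for each inactive
            // durable subscription. The disabled code below refers to a
            // 'handle' that this method does not define, so the loop
            // currently has no effect.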
/*
            TopicConsumerMessageHandle sub = new TopicConsumerMessageHandle(handle, name);
            sub.destroy(connection);
*/
        }
    }

}
