topicdestinationcache.java
来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 283 行
JAVA
283 行
/** * Redistribution and use of this software and associated documentation * ("Software"), with or without modification, are permitted provided * that the following conditions are met: * * 1. Redistributions of source code must retain copyright * statements and notices. Redistributions must also contain a * copy of this document. * * 2. Redistributions in binary form must reproduce the * above copyright notice, this list of conditions and the * following disclaimer in the documentation and/or other * materials provided with the distribution. * * 3. The name "Exolab" must not be used to endorse or promote * products derived from this Software without prior written * permission of Exoffice Technologies. For written permission, * please contact info@exolab.org. * * 4. Products derived from this Software may not be called "Exolab" * nor may "Exolab" appear in their names without prior written * permission of Exoffice Technologies. Exolab is a registered * trademark of Exoffice Technologies. * * 5. Due credit should be given to the Exolab Project * (http://www.exolab.org/). * * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED * OF THE POSSIBILITY OF SUCH DAMAGE. * * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved. * * $Id: TopicDestinationCache.java,v 1.6 2005/12/20 20:39:45 tanderson Exp $ */package org.exolab.jms.messagemgr;import java.sql.Connection;import java.util.ArrayList;import java.util.Iterator;import java.util.List;import java.util.Vector;import javax.jms.JMSException;import org.exolab.jms.client.JmsDestination;import org.exolab.jms.client.JmsTopic;import org.exolab.jms.lease.LeaseManager;import org.exolab.jms.message.MessageImpl;import org.exolab.jms.persistence.DatabaseService;import org.exolab.jms.persistence.PersistenceException;/** * A {@link DestinationCache} for topics. * * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a> * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a> * @version $Revision: 1.6 $ $Date: 2005/12/20 20:39:45 $ */class TopicDestinationCache extends AbstractDestinationCache { /** * Construct a new <code>TopicDestinationCache</code>. * * @param topic the topic to cache messages for * @param database the database service * @param leases the lease manager */ public TopicDestinationCache(JmsTopic topic, DatabaseService database, LeaseManager leases) { super(topic, database, leases); } /** * Register a consumer with this cache. * * @param consumer the message consumer for this destination * @return <code>true</code> if registered; otherwise <code>false</code> */ public boolean addConsumer(ConsumerEndpoint consumer) { boolean result = false; // check to see that the consumer can actually subscribe to // this destination JmsTopic cdest = (JmsTopic) consumer.getDestination(); JmsTopic ddest = (JmsTopic) getDestination(); if (cdest.match(ddest)) { result = super.addConsumer(consumer); } return result; } /** * Invoked when the {@link MessageMgr} receives a non-persistent message. * * @param destination the message's destination * @param message the message * @throws JMSException if the listener fails to handle the message */ public void messageAdded(JmsDestination destination, MessageImpl message) throws JMSException { boolean processed = false; MessageRef reference = new CachedMessageRef(message, false, getMessageCache()); reference.reference(); // temporary reference to ensure the message has // a non-zero reference while passing it to each // of the consumers, to avoid premature // destruction addMessage(reference, message); MessageHandle handle = new SharedMessageHandle(this, reference, message); ConsumerEndpoint[] consumers = getConsumerArray(); for (int index = 0; index < consumers.length; index++) { ConsumerEndpoint consumer = consumers[index]; processed |= consumer.messageAdded(handle, message); } // create a lease iff one is required and the message has actually // been accepted by at least one endpoint if (processed) { checkMessageExpiry(reference, message); reference.dereference(); // remove temporary reference } else { // no consumer picked up the message, so toss it reference.destroy(); // @todo - inefficient. Don't really want to add the message // just to remove it again if there are no consumers for it } } /** * Invoked when the {@link MessageMgr} receives a persistent message. * * @param destination the message's destination * @param message the message * @throws JMSException if the listener fails to handle the message * @throws PersistenceException if there is a persistence related problem */ public void persistentMessageAdded(JmsDestination destination, MessageImpl message) throws JMSException, PersistenceException { boolean processed = false; MessageRef reference = new CachedMessageRef(message, true, getMessageCache()); reference.reference(); // temporary reference to ensure the message has // a non-zero reference while passing it to each // of the consumers, to avoid premature // destruction addMessage(reference, message); SharedMessageHandle handle = new SharedMessageHandle(this, reference, message); // now send the message to all active consumers ConsumerEndpoint[] consumers = getConsumerArray(); for (int index = 0; index < consumers.length; index++) { ConsumerEndpoint consumer = consumers[index]; processed |= consumer.persistentMessageAdded(handle, message); } // for each inactive durable consumer, add a persistent handle // @todo - possible race condition between inactive subscription // becoming active again - potential for message loss?/* JmsTopic topic = (JmsTopic) getDestination(); List inactive = _consumers.getInactiveSubscriptions( topic); if (!inactive.isEmpty()) { Iterator iterator = inactive.iterator(); while (iterator.hasNext()) { String name = (String) iterator.next(); TopicConsumerMessageHandle durable = new TopicConsumerMessageHandle(handle, name); durable.add(connection); } processed = true; }*/ // create a lease iff one is required and the message has actually // been accepted by at least one endpoint if (processed) { checkMessageExpiry(reference, message); reference.dereference(); // remove temporary reference } else { // no consumer picked up the message, so toss it handle.destroy(); // @todo - inefficient. Don't really want to make the message // persistent, just to remove it again if there are no consumers // for it } } /** * Return a message handle back to the cache, to recover unsent or * unacknowledged messages. * * @param handle the message handle to return */ public void returnMessageHandle(MessageHandle handle) { long consumerId = handle.getConsumerId(); AbstractTopicConsumerEndpoint endpoint = (AbstractTopicConsumerEndpoint) getConsumerEndpoint(consumerId); // if the endpoint is still active then return the message // back to it if (endpoint != null) { endpoint.returnMessage(handle); } else { // @todo - need to destroy the handle? } } /** * Load the state of a durable consumer. * * @param name the durable subscription name * @return a list of {@link MessageHandle} instances * @throws JMSException for any JMS error */ public List getDurableMessageHandles(String name) throws JMSException, PersistenceException { DatabaseService service = DatabaseService.getInstance(); Connection connection = service.getConnection(); Vector handles = service.getAdapter().getMessageHandles( connection, getDestination(), name); List result = new ArrayList(handles.size()); MessageCache cache = getMessageCache(); Iterator iterator = handles.iterator(); while (iterator.hasNext()) { PersistentMessageHandle handle = (PersistentMessageHandle) iterator.next(); String messageId = handle.getMessageId(); MessageRef reference = cache.getMessageRef(messageId); if (reference == null) { reference = new CachedMessageRef(messageId, true, cache); } cache.addMessageRef(reference); handle.reference(reference); handle.setDestinationCache(this); result.add(handle); checkMessageExpiry(reference, handle.getExpiryTime()); } return result; } /** * Remove an expired persistent message, and notify any listeners. * * @param reference a handle to the expired message * @throws JMSException if a listener fails to handle the * expiration * @throws PersistenceException if there is a persistence related problem */ protected void persistentMessageExpired(MessageRef reference) throws JMSException, PersistenceException { String messageId = reference.getMessageId(); ConsumerEndpoint[] consumers = getConsumerArray(); for (int i = 0; i < consumers.length; ++i) { consumers[i].persistentMessageRemoved(messageId); } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?