// File: topicdestinationcache.java
// (code viewer residue: font size control)
/**
* Redistribution and use of this software and associated documentation
* ("Software"), with or without modification, are permitted provided
* that the following conditions are met:
*
* 1. Redistributions of source code must retain copyright
* statements and notices. Redistributions must also contain a
* copy of this document.
*
* 2. Redistributions in binary form must reproduce the
* above copyright notice, this list of conditions and the
* following disclaimer in the documentation and/or other
* materials provided with the distribution.
*
* 3. The name "Exolab" must not be used to endorse or promote
* products derived from this Software without prior written
* permission of Exoffice Technologies. For written permission,
* please contact info@exolab.org.
*
* 4. Products derived from this Software may not be called "Exolab"
* nor may "Exolab" appear in their names without prior written
* permission of Exoffice Technologies. Exolab is a registered
* trademark of Exoffice Technologies.
*
* 5. Due credit should be given to the Exolab Project
* (http://www.exolab.org/).
*
* THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
* ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
* NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
* FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
* OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
*
* $Id: TopicDestinationCache.java,v 1.3 2005/05/13 12:57:02 tanderson Exp $
*/
package org.exolab.jms.messagemgr;
import java.sql.Connection;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.ArrayList;
import javax.jms.JMSException;
import org.exolab.jms.client.JmsDestination;
import org.exolab.jms.client.JmsTopic;
import org.exolab.jms.message.MessageImpl;
import org.exolab.jms.persistence.PersistenceException;
import org.exolab.jms.persistence.DatabaseService;
/**
 * A {@link DestinationCache} for topics.
 * <p>
 * Every message handed to this cache is wrapped in a single
 * {@link CachedMessageRef} and offered, through one shared handle, to each
 * consumer endpoint returned by <code>getConsumerArray()</code>. Persistent
 * messages are additionally recorded against every inactive durable
 * subscription reported by {@link ConsumerManager}. A message that nobody
 * accepts is destroyed again immediately.
 *
 * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a>
 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
 * @version $Revision: 1.3 $ $Date: 2005/05/13 12:57:02 $
 */
class TopicDestinationCache extends AbstractDestinationCache {

    /**
     * Construct a new <code>TopicDestinationCache</code> for a non-persistent
     * topic.
     *
     * @param topic the topic to cache messages for
     */
    public TopicDestinationCache(JmsTopic topic) {
        super(topic);
    }

    /**
     * Construct a new <code>TopicDestinationCache</code> for a persistent
     * topic.
     *
     * @param topic      the topic to cache messages for
     * @param connection the database connection
     * @throws JMSException         for any JMS error
     * @throws PersistenceException for any persistence error
     */
    public TopicDestinationCache(JmsTopic topic, Connection connection)
        throws JMSException, PersistenceException {
        super(topic, connection);
    }

    /**
     * Register a consumer with this cache.
     * <p>
     * The consumer is only passed on to the superclass when its destination
     * is a topic for which <code>match()</code> against this cache's topic
     * succeeds. A consumer whose destination is <code>null</code> or not a
     * {@link JmsTopic} is rejected rather than triggering a
     * <code>ClassCastException</code>.
     *
     * @param consumer the message consumer for this destination
     * @return <code>true</code> if registered; otherwise <code>false</code>
     */
    public boolean addConsumer(ConsumerEndpoint consumer) {
        // instanceof is false for null as well, so both a missing and a
        // non-topic destination are treated as "cannot subscribe"
        Object destination = consumer.getDestination();
        if (!(destination instanceof JmsTopic)) {
            return false;
        }

        // check to see that the consumer can actually subscribe to
        // this destination
        JmsTopic consumerTopic = (JmsTopic) destination;
        JmsTopic cachedTopic = (JmsTopic) getDestination();
        if (!consumerTopic.match(cachedTopic)) {
            return false;
        }
        return super.addConsumer(consumer);
    }

    /**
     * Invoked when the {@link MessageMgr} receives a non-persistent message.
     * <p>
     * The message is added to the cache, then offered to every consumer. If
     * no consumer accepts it, the freshly created reference is destroyed.
     *
     * @param destination the message's destination (not consulted by this
     *                    implementation)
     * @param message     the message
     * @throws JMSException if the listener fails to handle the message
     */
    public void messageAdded(JmsDestination destination, MessageImpl message)
        throws JMSException {
        // second argument mirrors the 'true' used for persistent messages
        // below - presumably the "persistent" flag; confirm against
        // CachedMessageRef
        MessageRef reference =
            new CachedMessageRef(message, false, getMessageCache());
        addMessage(reference, message);

        // a single handle is shared by all consumers of the message
        MessageHandle handle = new SharedMessageHandle(reference, message);

        boolean processed = false;
        ConsumerEndpoint[] consumers = getConsumerArray();
        for (int i = 0; i < consumers.length; i++) {
            // '|=' always evaluates its right-hand side, so every consumer
            // is offered the message even after one has accepted it
            processed |= consumers[i].messageAdded(handle, message);
        }

        // create a lease iff one is required and the message has actually
        // been accepted by at least one endpoint
        if (processed) {
            checkMessageExpiry(reference, message);
        } else {
            // no consumer picked up the message, so toss it
            // @todo - inefficient. Don't really want to add the message
            // just to remove it again if there are no consumers for it
            reference.destroy();
        }
    }

    /**
     * Invoked when the {@link MessageMgr} receives a persistent message.
     * <p>
     * The message is added to the cache and offered to every consumer. A
     * {@link TopicConsumerMessageHandle} is then added, using the supplied
     * connection, for each inactive durable subscription on this topic. If
     * neither a consumer nor an inactive subscription took the message, the
     * handle is destroyed.
     *
     * @param connection  the database connection
     * @param destination the message's destination (not consulted by this
     *                    implementation)
     * @param message     the message
     * @throws JMSException         if the listener fails to handle the
     *                              message
     * @throws PersistenceException if there is a persistence related problem
     */
    public void persistentMessageAdded(Connection connection,
                                       JmsDestination destination,
                                       MessageImpl message)
        throws JMSException, PersistenceException {
        MessageRef reference =
            new CachedMessageRef(message, true, getMessageCache());
        addMessage(reference, message);
        SharedMessageHandle handle =
            new SharedMessageHandle(reference, message);

        // now send the message to all active consumers
        boolean processed = false;
        ConsumerEndpoint[] consumers = getConsumerArray();
        for (int i = 0; i < consumers.length; i++) {
            // '|=' always evaluates its right-hand side, so every consumer
            // is offered the message even after one has accepted it
            processed |= consumers[i].persistentMessageAdded(
                handle, message, connection);
        }

        // for each inactive durable consumer, add a persistent handle
        // @todo - possible race condition between inactive subscription
        // becoming active again - potential for message loss?
        JmsTopic topic = (JmsTopic) getDestination();
        List inactive =
            ConsumerManager.instance().getInactiveSubscriptions(topic);
        if (!inactive.isEmpty()) {
            Iterator iterator = inactive.iterator();
            while (iterator.hasNext()) {
                String name = (String) iterator.next();
                TopicConsumerMessageHandle durable =
                    new TopicConsumerMessageHandle(handle, name);
                durable.add(connection);
            }
            // the message is now held on behalf of at least one durable
            // subscriber, so it must not be discarded below
            processed = true;
        }

        // create a lease iff one is required and the message has actually
        // been accepted by at least one endpoint
        if (processed) {
            checkMessageExpiry(reference, message);
        } else {
            // no consumer picked up the message, so toss it
            // @todo - inefficient. Don't really want to make the message
            // persistent, just to remove it again if there are no consumers
            // for it
            handle.destroy(connection);
        }
    }

    /**
     * Return a message handle back to the cache, to recover unsent or
     * unacknowledged messages.
     * <p>
     * The handle is passed back to the endpoint identified by its consumer
     * id. If no such endpoint is registered, the handle is currently left
     * untouched.
     *
     * @param handle the message handle to return
     */
    public void returnMessageHandle(MessageHandle handle) {
        long consumerId = handle.getConsumerId();
        AbstractTopicConsumerEndpoint endpoint =
            (AbstractTopicConsumerEndpoint) getConsumerEndpoint(consumerId);

        // if the endpoint is still active then return the message
        // back to it
        if (endpoint != null) {
            endpoint.returnMessage(handle);
        }
        // @todo - need to destroy the handle when there is no endpoint?
        // what about for inactive durable consumers -
        // could this inadvertently trash the message?
    }

    /**
     * Load the state of a durable consumer.
     * <p>
     * The persistent handles recorded for the named subscription are read
     * from the database adapter. Each handle is bound to the cached
     * {@link MessageRef} for its message id (one is created when the cache
     * has none), and the reference is checked for expiry using the handle's
     * expiry time.
     *
     * @param name       the durable subscription name
     * @param connection the database connection to use
     * @return a list of {@link MessageHandle} instances
     * @throws JMSException         for any JMS error
     * @throws PersistenceException for any persistence error
     */
    public List getDurableMessageHandles(String name, Connection connection)
        throws JMSException, PersistenceException {
        Vector handles = DatabaseService.getAdapter().getMessageHandles(
            connection, getDestination(), name);
        List result = new ArrayList(handles.size());

        MessageCache cache = getMessageCache();
        Iterator iterator = handles.iterator();
        while (iterator.hasNext()) {
            PersistentMessageHandle handle =
                (PersistentMessageHandle) iterator.next();
            String messageId = handle.getMessageId();

            // reuse the reference if the message is already known to the
            // cache, so handles for the same message share one reference
            MessageRef reference = cache.getMessageRef(messageId);
            if (reference == null) {
                reference = new CachedMessageRef(messageId, true, cache);
            }
            // NOTE(review): invoked for pre-existing references as well;
            // assumes re-adding a reference is harmless - confirm against
            // MessageCache
            cache.addMessageRef(reference);
            handle.reference(reference);
            result.add(handle);

            checkMessageExpiry(reference, handle.getExpiryTime());
        }
        return result;
    }

    /**
     * Initialise the cache from the database.
     * <p>
     * This implementation does nothing.
     *
     * @param connection the database connection to use
     * @throws JMSException         for any JMS error
     * @throws PersistenceException for any persistence error
     */
    protected void init(Connection connection) throws JMSException,
        PersistenceException {
        // no-op
    }

    /**
     * Remove an expired persistent message, and notify any listeners.
     * <p>
     * Every consumer endpoint currently registered with this cache is told
     * that the message has been removed.
     *
     * @param reference  a handle to the expired message
     * @param connection the database connection
     * @throws JMSException         if a listener fails to handle the
     *                              expiration
     * @throws PersistenceException if there is a persistence related problem
     */
    protected void persistentMessageExpired(MessageRef reference,
                                            Connection connection)
        throws JMSException, PersistenceException {
        String messageId = reference.getMessageId();
        ConsumerEndpoint[] consumers = getConsumerArray();
        for (int i = 0; i < consumers.length; ++i) {
            consumers[i].persistentMessageRemoved(messageId, connection);
        }

        // @todo - since it is a persistent message, the handles added for
        // inactive durable subscriptions in persistentMessageAdded() should
        // be destroyed here as well. That is not implemented: only the
        // MessageRef is available at this point, not the shared handle
        // needed to construct a TopicConsumerMessageHandle for each
        // subscription name.
    }
}
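/*
 * Code viewer keyboard shortcuts (web page residue, not part of the source):
 *   Copy code:          Ctrl + C
 *   Search code:        Ctrl + F
 *   Full-screen mode:   F11
 *   Toggle theme:       Ctrl + Shift + D
 *   Show shortcuts:     ?
 *   Increase font size: Ctrl + =
 *   Decrease font size: Ctrl + -
 */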