📄 queuedestinationcache.java
字号:
/**
* Redistribution and use of this software and associated documentation
* ("Software"), with or without modification, are permitted provided
* that the following conditions are met:
*
* 1. Redistributions of source code must retain copyright
* statements and notices. Redistributions must also contain a
* copy of this document.
*
* 2. Redistributions in binary form must reproduce the
* above copyright notice, this list of conditions and the
* following disclaimer in the documentation and/or other
* materials provided with the distribution.
*
* 3. The name "Exolab" must not be used to endorse or promote
* products derived from this Software without prior written
* permission of Exoffice Technologies. For written permission,
* please contact info@exolab.org.
*
* 4. Products derived from this Software may not be called "Exolab"
* nor may "Exolab" appear in their names without prior written
* permission of Exoffice Technologies. Exolab is a registered
* trademark of Exoffice Technologies.
*
* 5. Due credit should be given to the Exolab Project
* (http://www.exolab.org/).
*
* THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
* ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
* NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
* FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
* OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Copyright 2001-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
*
* $Id: QueueDestinationCache.java,v 1.34.2.1 2004/05/01 12:05:26 tanderson Exp $
*
* Date Author Changes
* 3/1/2001 jima Created
*/
package org.exolab.jms.messagemgr;
import java.sql.Connection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Vector;
import javax.jms.JMSException;
import javax.transaction.TransactionManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.exolab.jms.client.JmsDestination;
import org.exolab.jms.client.JmsQueue;
import org.exolab.jms.client.JmsTemporaryDestination;
import org.exolab.jms.message.MessageHandle;
import org.exolab.jms.message.MessageImpl;
import org.exolab.jms.persistence.DatabaseService;
import org.exolab.jms.persistence.PersistenceException;
import org.exolab.jms.selector.Selector;
import org.exolab.jms.server.JmsServerConnectionManager;
/**
* A DestinationCache for Queues
*
* @version $Revision: 1.34.2.1 $ $Date: 2004/05/01 12:05:26 $
* @author <a href="mailto:jima@exoffice.com">Jim Alateras</a>
*/
public class QueueDestinationCache
extends DestinationCache {
/**
 * Maintains the list of queue listeners registered with this cache.
 * The list is wrapped by Collections.synchronizedList, so each individual
 * call on it is synchronized; compound operations and iteration still
 * require the caller to synchronize on the list itself.
 */
protected List _queueListeners =
Collections.synchronizedList(new LinkedList());
/**
 * Underlying destination. Assigned by the constructors and returned by
 * getDestination().
 */
private JmsQueue _queue = null;
/**
 * Index of the last listener that received a message from this
 * destination. If multiple listeners are attached to this queue then
 * messages will be sent to each in a round robin fashion.
 */
private int _lastConsumerIndex = 0;
/**
 * Tracks the number of messages added to the destination cache.
 * Incremented by messageAdded() once a message has been cached.
 */
private long _publishCount;
/**
 * Tracks the number of messages consumed from the destination cache.
 */
private long _consumeCount;
/**
 * The logger shared by all instances of this class.
 */
private static final Log _log = LogFactory.getLog(
QueueDestinationCache.class);
/**
 * Construct a message cache for a queue destination. This cache will
 * receive all messages published under the specified destination.
 * <p>
 * If the destination is an administered destination, the constructor also
 * obtains a database connection, loads any persistent message handles for
 * the queue and commits the work. The connection is rolled back on any
 * failure and is always closed before the constructor returns.
 *
 * @param destination the queue
 * @throws FailedToInitializeException if the persistent state of the
 *         queue cannot be loaded or committed
 */
QueueDestinationCache(JmsQueue destination)
    throws FailedToInitializeException {
    _queue = destination;

    // call init on the base class
    init();

    if (DestinationManager.instance().isAdministeredDestination(destination.getName())) {
        // call the persistence adapter to determine if we have
        // unacknowledged messages for this queue. The persistent handles
        // are keyed on the queue name
        Connection connection = null;
        try {
            connection = DatabaseService.getConnection();

            // initialise the cache
            init(destination, connection);

            // commit the work
            connection.commit();
        } catch (Exception exception) {
            // Roll back on every failure, not only PersistenceException:
            // a failed commit (SQLException) or a runtime error must not
            // leave a half-completed transaction on the connection.
            if (connection != null) {
                try {
                    connection.rollback();
                } catch (Exception nested) {
                    // best effort only; the original failure is reported
                    _log.warn("Failed to roll back after init failure",
                              nested);
                }
            }
            // rethrow as a FailedToInitializeException
            throw new FailedToInitializeException(
                "QueueDestinationCache init failed " + exception);
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception nested) {
                    // best effort only; nothing more can be done here
                    _log.warn("Failed to close connection", nested);
                }
            }
        }
    }
}
/**
* Construct a message cache for a queue destination using the specified
* database connection. This cache will receive all messages published
* under the destination.
* <p>
* the constructor will also attempt to load any persistent messages
* from the database using the specified connection
* <p>
* If there is any problem during construction a FailedToInitializeException
* will be raised.
*
* @param destination - the queue
* @throws FialedToInitializeException
*/
QueueDestinationCache(Connection connection, JmsQueue destination)
throws FailedToInitializeException {
super();
_queue = destination;
// call init on the base class
init(connection);
if (DestinationManager.instance().isAdministeredDestination(destination.getName())) {
// call the persistence adapter to determine if we have unacknowledged
// messages for this queue. The persistent handles are keyed on the
// queue name
try {
// initialise the cache
init(destination, connection);
} catch (Exception exception) {
// rethrow as a JMSException
throw new FailedToInitializeException(
"QueueDestinationCache init failed " + exception);
}
}
}
/**
 * Helper used by the constructors to populate the cache from the database.
 * Expired message handles of the queue are removed first; every handle
 * subsequently returned by the persistence adapter is then added to this
 * cache.
 *
 * @param destination the queue whose handles are loaded
 * @param connection the database connection to use
 * @throws PersistenceException if there is a database related problem
 */
void init(JmsQueue destination, Connection connection)
    throws PersistenceException {
    // purge expired handles before loading what is left
    DatabaseService.getAdapter().removeExpiredMessageHandles(
        connection, destination.getName());

    Vector handles = DatabaseService.getAdapter().getMessageHandles(
        connection, destination, destination.getName());
    if (handles == null) {
        return;
    }

    for (int index = 0; index < handles.size(); index++) {
        addMessage((MessageHandle) handles.elementAt(index));
    }
}
/**
 * Implementation of DestinationCache.getDestination.
 *
 * @return the queue this cache was constructed for
 */
public JmsDestination getDestination() {
    return this._queue;
}
/**
 * A queue can also have queue listeners, which simply get informed of all
 * messages that arrive at this destination. Registering a listener that is
 * already present has no effect.
 *
 * @param listener - queue listener
 */
public void addQueueListener(QueueListener listener) {
    // The synchronized wrapper only makes single calls atomic. Hold the
    // list's monitor across the check and the add so that two threads
    // cannot both pass the check and register the same listener twice.
    synchronized (_queueListeners) {
        if (!_queueListeners.contains(listener)) {
            _queueListeners.add(listener);
        }
    }
}
/**
 * Remove the queue listener associated with this cache. Removing a
 * listener that is not registered has no effect.
 *
 * @param listener - queue listener to remove
 */
public void removeQueueListener(QueueListener listener) {
    // List.remove(Object) leaves the list unchanged when the element is
    // absent, so no separate contains() check is needed.
    _queueListeners.remove(listener);
}
/**
 * Implementation of MessageMgr.messageAdded.
 * <p>
 * A message addressed to this queue is wrapped in a handle and added to
 * the cache, offered to a consumer endpoint if one is returned for it,
 * announced to the queue listeners and finally checked for expiry.
 *
 * @param destination the destination the message was published to
 * @param message the published message
 * @return <code>true</code> if all of the above completed;
 *         <code>false</code> if either argument is <code>null</code>, the
 *         destination is not this queue, or a JMSException was raised
 */
public boolean messageAdded(JmsDestination destination,
                            MessageImpl message) {
    if ((destination == null) || (message == null)) {
        return false;
    }

    if (!destination.equals(_queue)) {
        // the message is not for this queue: it is dropped, and the only
        // notification is this log entry
        _log.error("Dropping message " + message.getMessageId()
                   + " for destination " + destination.getName());
        return false;
    }

    try {
        // every message is cached under a handle; receivers pick
        // messages off the cache as required
        MessageHandle handle =
            MessageHandleFactory.createHandle(this, message);
        addMessage(handle, message);
        _publishCount++;

        // hand the message to a registered consumer, if there is one;
        // otherwise it simply stays cached
        QueueConsumerEndpoint consumer = getEndpointForMessage(message);
        if (consumer != null) {
            consumer.messageAdded(message);
        }

        // tell the queue listeners that a message has arrived
        notifyQueueListeners(message);

        // create a lease iff one is required
        checkMessageExpiry(message);

        return true;
    } catch (JMSException exception) {
        _log.error("Failed to add message", exception);
        return false;
    }
}
/**
* This method is called when the {@link MessageMgr} removes a message
* from the cache.
*
* @param destination the message destination
* @param message the message removed from cache
*/
public void messageRemoved(JmsDestination destination,
MessageImpl message) {
if ((destination != null) &&
(message != null)) {
try {
MessageHandle handle =
MessageHandleFactory.getHandle(this, message);
// call remove regardless whether it exists
if (destination.equals(_queue)) {
removeMessage(handle);
notifyOnRemoveMessage(message);
handle.destroy();
}
} catch (JMSException exception) {
_log.error("Failed to remove message", exception);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -