📄 messageleasehelper.java
字号:
/**
* Redistribution and use of this software and associated documentation
* ("Software"), with or without modification, are permitted provided
* that the following conditions are met:
*
* 1. Redistributions of source code must retain copyright
* statements and notices. Redistributions must also contain a
* copy of this document.
*
* 2. Redistributions in binary form must reproduce the
* above copyright notice, this list of conditions and the
* following disclaimer in the documentation and/or other
* materials provided with the distribution.
*
* 3. The name "Exolab" must not be used to endorse or promote
* products derived from this Software without prior written
* permission of Exoffice Technologies. For written permission,
* please contact info@exolab.org.
*
* 4. Products derived from this Software may not be called "Exolab"
* nor may "Exolab" appear in their names without prior written
* permission of Exoffice Technologies. Exolab is a registered
* trademark of Exoffice Technologies.
*
* 5. Due credit should be given to the Exolab Project
* (http://www.exolab.org/).
*
* THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
* ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
* NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
* FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
* OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Copyright 2000-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
*/
package org.exolab.jms.messagemgr;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Vector;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.transaction.TransactionManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.exolab.jms.lease.BaseLease;
import org.exolab.jms.lease.LeaseEventListenerIfc;
import org.exolab.jms.lease.LeaseManager;
import org.exolab.jms.lease.MessageLease;
import org.exolab.jms.message.MessageHandle;
import org.exolab.jms.message.MessageImpl;
import org.exolab.jms.persistence.DatabaseService;
import org.exolab.jms.persistence.PersistenceException;
import org.exolab.jms.persistence.SQLHelper;
/**
 * Helper for registering message leases with the {@link LeaseManager}.
 * <br>
 * A lease is derived from the expiry time of a message (its JMSExpiration
 * property, or the expiry time recorded on its {@link MessageHandle}). <br>
 * When a lease expires, this helper drops its own record of the lease and
 * forwards the expired {@link MessageHandle} to the listener supplied at
 * construction, via the listener's onLeaseExpired() method. <br>
 * If the expiry time is 0, no lease is registered. <br>
 * <p>
 * Access to the internal lease map is guarded by synchronizing on the map
 * itself.
 *
 * @version     $Revision: 1.16 $ $Date: 2003/08/17 01:32:24 $
 * @author      <a href="mailto:tima@intalio.com">Tim Anderson</a>
 * @see         MessageHandle
 * @see         LeaseManager
 * @see         LeaseEventListenerIfc
 */
public class MessageLeaseHelper implements LeaseEventListenerIfc {

    /**
     * The listener to notify when a lease expires
     */
    private final LeaseEventListenerIfc _listener;

    /**
     * A map of MessageId -> MessageLease objects, representing
     * the active leases. All access must synchronize on this map.
     */
    private final HashMap _leases = new HashMap();

    /**
     * A reference to the LeaseManager
     */
    private final LeaseManager _leaseMgr;

    /**
     * The logger
     */
    private static final Log _log =
        LogFactory.getLog(MessageLeaseHelper.class);


    /**
     * Construct a helper for the specified destination cache.
     * <p>
     * A connection is obtained from {@link DatabaseService}, used to
     * initialise the helper, committed, and then closed. On failure the
     * connection is rolled back before the exception is propagated.
     *
     * @param listener the object to notify when a lease expires
     * @throws PersistenceException if initialisation fails, or if the
     * connection cannot be committed
     */
    public MessageLeaseHelper(DestinationCache listener)
        throws PersistenceException {

        _listener = listener;
        _leaseMgr = LeaseManager.instance();

        Connection connection = null;
        try {
            connection = DatabaseService.getConnection();

            // initialise with the specified connection
            init(listener, connection);

            // commit the transaction
            connection.commit();
        } catch (PersistenceException exception) {
            SQLHelper.rollback(connection);
            throw exception;
        } catch (SQLException exception) {
            SQLHelper.rollback(connection);
            throw new PersistenceException(exception);
        } finally {
            SQLHelper.close(connection);
        }
    }

    /**
     * Construct a helper for the specified destination cache, using a
     * connection supplied by the caller.
     * <p>
     * This constructor neither commits nor closes the connection; that is
     * left to the caller.
     *
     * @param connection the connection to use to retrieve persistent messages
     * with leases
     * @param listener the object to notify when a lease expires
     * @throws PersistenceException if initialisation fails
     */
    public MessageLeaseHelper(Connection connection, DestinationCache listener)
        throws PersistenceException {

        _listener = listener;
        _leaseMgr = LeaseManager.instance();
        init(listener, connection);
    }

    /**
     * Add a lease for a message, so that the listener is notified when the
     * message expires.
     * <p>
     * The lease is based on the JMSExpiration property of the message. If
     * this is not greater than zero, or a lease is already held for the
     * message, no lease is registered. Otherwise a {@link MessageHandle} is
     * created for the message and registered with the LeaseManager; the
     * listener is notified with that handle when the lease expires.
     * <p>
     * A failure to read the expiration or create the handle is logged and
     * otherwise ignored.
     *
     * @param message the message to add a lease for
     */
    public void addLease(MessageImpl message) {
        synchronized (_leases) {
            // ensure that a lease for this message does not already exist
            if (_leases.containsKey(message.getMessageId())) {
                return;
            }

            try {
                long expiry = message.getJMSExpiration();
                if (expiry > 0) {
                    // create an associated message handle and use it to
                    // create a lease for the message
                    MessageHandle handle =
                        MessageHandleFactory.getHandle(message);
                    registerLease(handle, expiry);
                }
            } catch (JMSException exception) {
                _log.error("Failed to create lease", exception);
            }
        }
    }

    /**
     * Add a lease for a message handle, so that the listener is notified
     * when the corresponding message expires.
     * <p>
     * The lease is based on the expiry time of the handle. If this is zero,
     * or a lease is already held for the handle's message, no lease is
     * registered. Note that, unlike {@link #addLease(MessageImpl)}, any
     * non-zero expiry time (including a negative one) results in a lease.
     *
     * @param handle message handle to add
     */
    public void addLease(MessageHandle handle) {
        synchronized (_leases) {
            // ensure that a lease for this message does not already exist.
            if (_leases.containsKey(handle.getMessageId())) {
                return;
            }

            long expiry = handle.getExpiryTime();
            if (expiry != 0) {
                registerLease(handle, expiry);
            }
        }
    }

    /**
     * Remove the lease for a message, if one is held.
     * <p>
     * The lease is removed both from this helper and from the LeaseManager,
     * so that it can no longer fire for a message the caller has finished
     * with.
     *
     * @param message the message to remove the lease for
     */
    public void removeLease(MessageImpl message) {
        BaseLease lease;
        synchronized (_leases) {
            // leases are keyed on message id (see addLease), so there is no
            // need to construct a handle just to look one up
            lease = (BaseLease) _leases.remove(message.getMessageId());
        }

        // deregister outside the lock, as clear() does
        if (lease != null) {
            _leaseMgr.removeLease(lease);
        }
    }

    /**
     * Clears all leases, removing each from the LeaseManager.
     */
    public void clear() {
        Object[] leases = null;

        // snapshot and empty the map under the lock, then deregister the
        // leases from the LeaseManager without holding it
        synchronized (_leases) {
            leases = _leases.values().toArray();
            _leases.clear();
        }

        for (int i = 0; i < leases.length; ++i) {
            BaseLease lease = (BaseLease) leases[i];
            _leaseMgr.removeLease(lease);
        }
    }

    /**
     * Invoked when a lease has expired.
     * <br>
     * It removes the local lease, and notifies the listener.
     *
     * @param handle An instance of MessageHandle
     */
    public void onLeaseExpired(Object handle) {
        synchronized (_leases) {
            _leases.remove(((MessageHandle) handle).getMessageId());
        }

        // notify outside the lock
        _listener.onLeaseExpired(handle);
    }

    /**
     * Common initialisation code. If the persistence adapter reports that
     * the listener's destination exists, the handles of its non-expired
     * messages are retrieved and a lease is added for each.
     *
     * @param listener the cache listening for expired leases.
     * @param connection the persistent connection to use
     * @throws PersistenceException if the destination check or the
     * retrieval of non-expired messages fails
     */
    protected void init(DestinationCache listener, Connection connection)
        throws PersistenceException {

        // if the destination is administered then retrieve a list of
        // non-expired messages for this destination and add them to the
        // lease manager
        if (DatabaseService.getAdapter().checkDestination(
            connection, listener.getDestinationByName())) {
            Vector handles =
                DatabaseService.getAdapter().getNonExpiredMessages(
                    connection, listener.getDestination());
            if (handles != null) {
                Enumeration iter = handles.elements();
                while (iter.hasMoreElements()) {
                    PersistentMessageHandle handle =
                        (PersistentMessageHandle) iter.nextElement();
                    addLease(handle);
                }
            }
        }
    }

    /**
     * Create a lease for the handle with the LeaseManager and record it.
     * The caller must hold the lock on the lease map.
     *
     * @param handle the handle to lease
     * @param expiry the absolute expiry time, in milliseconds
     */
    private void registerLease(MessageHandle handle, long expiry) {
        long duration = expiry - System.currentTimeMillis();
        if (duration <= 0) {
            // already past its expiry time: use the shortest duration so
            // that the lease fires as soon as possible
            duration = 1;
        }
        MessageLease lease =
            _leaseMgr.createMessageLease(handle, duration, this);
        _leases.put(handle.getMessageId(), lease);
    }

} //-- MessageLeaseHelper
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -