📄 messagehandles.java
字号:
/**
* Redistribution and use of this software and associated documentation
* ("Software"), with or without modification, are permitted provided
* that the following conditions are met:
*
* 1. Redistributions of source code must retain copyright
* statements and notices. Redistributions must also contain a
* copy of this document.
*
* 2. Redistributions in binary form must reproduce the
* above copyright notice, this list of conditions and the
* following disclaimer in the documentation and/or other
* materials provided with the distribution.
*
* 3. The name "Exolab" must not be used to endorse or promote
* products derived from this Software without prior written
* permission of Exoffice Technologies. For written permission,
* please contact info@exolab.org.
*
* 4. Products derived from this Software may not be called "Exolab"
* nor may "Exolab" appear in their names without prior written
* permission of Exoffice Technologies. Exolab is a registered
* trademark of Exoffice Technologies.
*
* 5. Due credit should be given to the Exolab Project
* (http://www.exolab.org/).
*
* THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
* ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
* NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
* FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
* OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
*/
package org.exolab.jms.persistence;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.exolab.jms.client.JmsDestination;
import org.exolab.jms.client.JmsTopic;
import org.exolab.jms.message.MessageId;
import org.exolab.jms.messagemgr.PersistentMessageHandle;
/**
* This class provides persistency for PersistentMessageHandle objects
* in an RDBMS database
*
* @version $Revision: 1.30 $ $Date: 2004/01/08 05:55:07 $
* @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
* @see org.exolab.jms.persistence.RDBMSAdapter
*/
class MessageHandles {
/**
* SQL for inserting a row into the message_handles table. No column list
* is given, so the eight positional parameters follow the table's column
* order; addMessageHandle binds them as message id, destination id,
* consumer id, priority, accepted time, sequence number, expiry time and
* delivered flag.
*/
private static final String INSERT_MSG_HANDLE_STMT =
"insert into message_handles values " + "(?,?,?,?,?,?,?,?)";
/**
* SQL for deleting a message handle identified by message id and consumer
* id only. removeMessageHandle uses this form when the destination cannot
* be mapped to an id.
*/
private static final String DELETE_MSG_HANDLE_STMT1 =
"delete from message_handles where messageId=? and consumerId=?";
/**
* SQL for deleting a message handle identified by message id, destination
* id and consumer id.
*/
private static final String DELETE_MSG_HANDLE_STMT2 =
"delete from message_handles where messageId=? and destinationId=? " +
"and consumerId=?";
/**
* SQL for deleting all message handles with the specified message id
*/
private static final String DELETE_MSG_HANDLES_STMT =
"delete from message_handles where messageId=?";
/**
* SQL for determining the number of handles with the specified message
* identity
*/
private static final String GET_MSG_HANDLE_COUNT =
"select count(messageId) from message_handles where messageId=?";
/**
* SQL for deleting a message from the messages table
*/
private static final String DELETE_MESSAGE =
"delete from messages where messageId=?";
/**
* SQL for updating the delivered flag of the row in the message_handles
* table matching a message id, destination id and consumer id
*/
private static final String UPDATE_MSG_HANDLE_STMT =
"update message_handles set delivered=? where messageId=? and " +
"destinationId=? and consumerId=?";
/**
* SQL for deleting all message handles for a destination
*/
private static final String DELETE_MSG_HANDLES_FOR_DEST =
"delete from message_handles where destinationId=?";
/**
* SQL for retrieving all message handles for a particular consumer, in
* ascending order of accepted time.
* NOTE(review): the constant name says DEST but the query filters on
* consumerId - confirm against the caller.
*/
private static final String GET_MSG_HANDLES_FOR_DEST =
"select * from message_handles where consumerId=? order by " +
"acceptedTime asc";
/**
* SQL for retrieving the distinct message ids of the handles whose
* accepted time lies between the two specified times (both inclusive)
*/
private static final String GET_MESSAGE_HANDLES_IN_RANGE =
"select distinct messageId from message_handles where " +
" acceptedTime >= ? and acceptedTime <=?";
/**
* SQL for retrieving the distinct message id of the handles with the
* specified message id
*/
private static final String GET_MESSAGE_HANDLE_WITH_ID =
"select distinct messageId from message_handles where messageId=?";
/**
* SQL for returning the number of message handles for a specified
* destination and consumer
*/
private static final String GET_MSG_HANDLE_COUNT_FOR_DEST_AND_CONSUMER =
"select count(messageId) from message_handles where destinationId=? " +
"and consumerId=?";
/**
* SQL for returning the number of message handles for a specified consumer
*/
private static final String GET_MSG_HANDLE_COUNT_FOR_CONSUMER =
"select count(messageId) from message_handles where consumerId=?";
/**
* SQL for deleting a consumer's message handles that have a non-zero
* expiry time earlier than the specified time
*/
private static final String DELETE_EXPIRED_MESSAGES =
"delete from message_handles where consumerId=? and expiryTime != 0 " +
"and expiryTime<?";
/**
* The singleton instance of this class, created by initialise()
*/
private static MessageHandles _instance;
/**
* Cache the transaction retry count
*/
private static int _retryCount = 10;
/**
* Cache the retry interval in milliseconds
*/
private static long _retryInterval = 50;
/**
* Lock object that initialise() synchronises on, so that only one thread
* creates the singleton
*/
private static final Object _block = new Object();
/**
* The logger
*/
private static final Log _log = LogFactory.getLog(MessageHandles.class);
/**
 * Obtain the shared MessageHandles instance.
 *
 * This accessor never creates the instance itself; it simply hands back
 * the current value of the singleton field, which is null if the
 * instance has not been created yet (see initialise()).
 *
 * @return the singleton instance, or null if it has not been created
 */
public static MessageHandles instance() {
    return MessageHandles._instance;
}
/**
 * Construct a new instance. The constructor is not public; the shared
 * instance is created through initialise().
 */
protected MessageHandles() {
    super();
}
/**
 * Initialise the singleton instance, creating it on the first call and
 * returning the existing instance on every later call.
 *
 * Both the null check and the assignment are performed while holding the
 * class-wide lock. The lock is always taken, rather than being skipped
 * via an unsynchronised fast-path check, because the singleton field is
 * not declared volatile and an unsynchronised read of it is therefore not
 * guaranteed to observe a fully published instance.
 *
 * @return the singleton MessageHandles instance, never null
 */
public static MessageHandles initialise() {
    synchronized (_block) {
        if (_instance == null) {
            _instance = new MessageHandles();
        }
        return _instance;
    }
}
/**
 * Persist the specified message handle as a new row in the
 * message_handles table.
 *
 * The handle's destination name and consumer name are first mapped to
 * their identities; an identity of 0 is treated as "cannot be mapped" and
 * aborts the operation. An insert that does not report exactly one
 * affected row is logged as an error but does not raise an exception.
 *
 * @param connection - the connection to use
 * @param handle - message handle to add
 * @throws PersistenceException - if the destination or consumer cannot
 * be mapped to an id, or if an SQL error occurs
 */
public void addMessageHandle(Connection connection,
                             PersistentMessageHandle handle)
    throws PersistenceException {

    if (_log.isDebugEnabled()) {
        _log.debug("addMessageHandle(handle=[consumer="
                   + handle.getConsumerName()
                   + ", destination=" + handle.getDestination()
                   + ", id=" + handle.getMessageId().getId() + "])");
    }

    PreparedStatement statement = null;
    try {
        // resolve the destination name to its identity
        final long destinationId = Destinations.instance().getId(
            handle.getDestination().getName());
        if (destinationId == 0) {
            final String reason =
                "Cannot add message handle id=" + handle.getMessageId() +
                " for destination=" + handle.getDestination().getName() +
                " and consumer=" + handle.getConsumerName() +
                " since the destination cannot be mapped to an id";
            throw new PersistenceException(reason);
        }

        // resolve the consumer name to its identity
        final long consumerId = Consumers.instance().getConsumerId(
            handle.getConsumerName());
        if (consumerId == 0) {
            final String reason =
                "Cannot add message handle id=" + handle.getMessageId() +
                " for destination=" + handle.getDestination().getName() +
                " and consumer=" + handle.getConsumerName() +
                " since the consumer cannot be mapped to an id";
            throw new PersistenceException(reason);
        }

        // bind the eight positional parameters of the insert
        statement = connection.prepareStatement(INSERT_MSG_HANDLE_STMT);
        statement.setString(1, handle.getMessageId().getId());
        statement.setLong(2, destinationId);
        statement.setLong(3, consumerId);
        statement.setInt(4, handle.getPriority());
        statement.setLong(5, handle.getAcceptedTime());
        statement.setLong(6, handle.getSequenceNumber());
        statement.setLong(7, handle.getExpiryTime());

        // the delivered flag is stored as an integer: 1 for true, 0 for false
        int deliveredFlag = 0;
        if (handle.getDelivered()) {
            deliveredFlag = 1;
        }
        statement.setInt(8, deliveredFlag);

        // exactly one row is expected to be inserted
        final int inserted = statement.executeUpdate();
        if (inserted != 1) {
            _log.error(
                "Failed to execute addMessageHandle for handle="
                + handle.getMessageId().getId() + ", destination Id="
                + destinationId);
        }
    } catch (SQLException exception) {
        throw new PersistenceException("Failed to add message handle=" +
                                       handle, exception);
    } finally {
        SQLHelper.close(statement);
    }
}
/**
 * Remove the specified message handle from the database, and then remove
 * the message itself from the messages table if no handle references it
 * any longer.
 *
 * Nothing is done when the handle's consumer maps to the identity 0.
 * When the destination maps to 0 the handle is deleted by message id and
 * consumer id only, so that orphaned handles can still be removed;
 * otherwise the destination id is part of the match as well.
 *
 * @param connection - the connection to use
 * @param handle - the handle to remove
 * @throws PersistenceException - if an SQL error occurs
 */
public void removeMessageHandle(Connection connection,
                                PersistentMessageHandle handle)
    throws PersistenceException {

    if (_log.isDebugEnabled()) {
        _log.debug("removeMessageHandle(handle=[consumer="
                   + handle.getConsumerName()
                   + ", destination=" + handle.getDestination()
                   + ", id=" + handle.getMessageId().getId() + "])");
    }

    PreparedStatement delete = null;
    PreparedStatement count = null;
    ResultSet results = null;
    try {
        // only proceed when the consumer maps to a non-zero identity
        final long consumerId = Consumers.instance().getConsumerId(
            handle.getConsumerName());
        if (consumerId == 0) {
            return;
        }

        final String id = handle.getMessageId().getId();

        // a destination id of 0 means the destination does not currently
        // exist, but orphaned handles may still need to be deleted
        final long destinationId = Destinations.instance().getId(
            handle.getDestination().getName());
        final boolean destinationKnown = (destinationId != 0);

        delete = connection.prepareStatement(
            destinationKnown ? DELETE_MSG_HANDLE_STMT2
                             : DELETE_MSG_HANDLE_STMT1);
        int index = 1;
        delete.setString(index++, id);
        if (destinationKnown) {
            delete.setLong(index++, destinationId);
        }
        delete.setLong(index, consumerId);

        // delete the handle
        final int removed = delete.executeUpdate();
        if (removed != 1) {
            // only log if the message hasn't been garbage collected
            if (!handle.hasExpired()) {
                _log.error("Failed to execute removeMessageHandle for "
                           + "handle=" + id + " destination id="
                           + destinationId + " consumer id=" + consumerId);
            }
        }

        // count the handles that still reference the message
        count = connection.prepareStatement(GET_MSG_HANDLE_COUNT);
        count.setString(1, id);
        results = count.executeQuery();

        final boolean unreferenced =
            results.next() && (results.getInt(1) == 0);
        if (unreferenced) {
            // no handles left: remove the message from the messages table
            delete.close();
            delete = connection.prepareStatement(DELETE_MESSAGE);
            delete.setString(1, id);
            final int deleted = delete.executeUpdate();
            if (deleted != 1) {
                if (!handle.hasExpired()) {
                    // can get 2 durable consumers trying to do this
                    // simultaneously, so don't log an error
                    // See bug 819212.
                    if (_log.isDebugEnabled()) {
                        _log.debug(
                            "Failed to delete the message with id=" + id
                            + " in a call to removeMessageHandle.");
                    }
                }
            }
        }
    } catch (SQLException exception) {
        throw new PersistenceException("Failed to remove message handle=" +
                                       handle, exception);
    } finally {
        SQLHelper.close(results);
        SQLHelper.close(delete);
        SQLHelper.close(count);
    }
}
/**
* Update the specified message handle from the database
*
* @param connection - the connection to use
* @param handle - the handle to update
* @throws PersistenceException - sql related exception
*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -