📄 messages.java
字号:
/**
* Redistribution and use of this software and associated documentation
* ("Software"), with or without modification, are permitted provided
* that the following conditions are met:
*
* 1. Redistributions of source code must retain copyright
* statements and notices. Redistributions must also contain a
* copy of this document.
*
* 2. Redistributions in binary form must reproduce the
* above copyright notice, this list of conditions and the
* following disclaimer in the documentation and/or other
* materials provided with the distribution.
*
* 3. The name "Exolab" must not be used to endorse or promote
* products derived from this Software without prior written
* permission of Exoffice Technologies. For written permission,
* please contact info@exolab.org.
*
* 4. Products derived from this Software may not be called "Exolab"
* nor may "Exolab" appear in their names without prior written
* permission of Exoffice Technologies. Exolab is a registered
* trademark of Exoffice Technologies.
*
* 5. Due credit should be given to the Exolab Project
* (http://www.exolab.org/).
*
* THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
* ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
* NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
* FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
* OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
*/
package org.exolab.jms.persistence;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Vector;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.sql.DataSource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.exolab.jms.client.JmsDestination;
import org.exolab.jms.client.JmsTopic;
import org.exolab.jms.message.MessageId;
import org.exolab.jms.message.MessageImpl;
import org.exolab.jms.messagemgr.PersistentMessageHandle;
/**
* This class manages the persistence of message objects. It maintains
* all the SQL for achieving this.
*
* @version $Revision: 1.24 $ $Date: 2004/01/08 05:55:07 $
* @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
* @see org.exolab.jms.message.MessageImpl
* @see org.exolab.jms.persistence.RDBMSAdapter
*/
public class Messages {
/**
 * Caches the singleton instance of this class; <code>null</code> until
 * {@link #initialise} has created it.
 * <p>
 * The field is <code>volatile</code> because {@link #initialise} tests it
 * outside the <code>_block</code> monitor (double-checked locking) and
 * {@link #instance} reads it without any locking. Without
 * <code>volatile</code> another thread could observe a non-null reference
 * to an instance whose construction is not yet visible to it.
 */
private static volatile Messages _instance;
/**
 * Cached transaction retry count; defaults to 10.
 * <p>
 * NOTE(review): not referenced in the visible part of this class -
 * presumably the number of times a failed transaction is retried;
 * confirm against its users.
 */
private static int _retryCount = 10;
/**
 * Cached retry interval in milliseconds; defaults to 50.
 * <p>
 * NOTE(review): not referenced in the visible part of this class -
 * presumably the pause between transaction retries; confirm against
 * its users.
 */
private static long _retryInterval = 50;
/**
 * Monitor that {@link #initialise} synchronizes on while creating the
 * singleton instance.
 */
private static final Object _block = new Object();
/**
 * The commons-logging logger for this class.
 */
private static final Log _log = LogFactory.getLog(Messages.class);
/**
 * Return the cached singleton instance.
 * <p>
 * This method never creates the instance itself; call
 * {@link #initialise} first, otherwise the cached reference may still
 * be <code>null</code>.
 *
 * @return Messages the singleton instance, or <code>null</code> if none
 *         has been cached yet
 */
public static Messages instance() {
    return Messages._instance;
}
/**
 * Create and initialise the singleton instance of this class, if it
 * does not exist yet.
 * <p>
 * The cached reference is checked once without locking and, if it is
 * still unset, checked again while holding the <code>_block</code>
 * monitor so that at most one instance is ever constructed.
 *
 * @return Messages the singleton instance
 */
public static Messages initialise() {
    // fast path: already created, no locking required
    if (_instance != null) {
        return _instance;
    }
    synchronized (_block) {
        // re-check under the monitor; another thread may have won the race
        if (_instance == null) {
            _instance = new Messages();
        }
    }
    return _instance;
}
/**
 * Insert a message into the <code>messages</code> table using the
 * supplied connection.
 * <p>
 * The name of the message's destination is mapped to an id through
 * {@link Destinations}. An id of <code>0</code> is treated as "destination
 * not registered" and the message is rejected before any SQL is issued.
 * The message itself is stored in serialized form as the seventh column.
 *
 * @param connection - execute on this connection
 * @param message - the message to add
 * @throws PersistenceException - if the destination cannot be obtained
 * from the message, if the destination does not exist, if the insert does
 * not affect exactly one row, or if any other error occurs while
 * populating or executing the insert
 */
public void add(Connection connection, MessageImpl message)
    throws PersistenceException {
    // the message identity is the key of the row
    String id = message.getMessageId().getId();

    // resolve the destination name carried by the message
    String destinationName;
    try {
        JmsDestination destination =
            (JmsDestination) message.getJMSDestination();
        destinationName = destination.getName();
    } catch (JMSException exception) {
        throw new PersistenceException(
            "Failed to get destination for message=" +
            message.getMessageId(), exception);
    }

    // the destination must be registered: map its name to an id
    long destinationId = Destinations.instance().getId(destinationName);
    if (destinationId == 0) {
        throw new PersistenceException(
            "Cannot add message=" + message.getMessageId() +
            ", destination=" + destinationName + " (" + destinationId +
            "): destination does not exist");
    }

    PreparedStatement statement = null;
    try {
        statement = connection.prepareStatement(
            "insert into messages values (?,?,?,?,?,?,?)");
        statement.setString(1, id);
        statement.setLong(2, destinationId);
        statement.setInt(3, message.getJMSPriority());
        statement.setLong(4, message.getAcceptedTime());
        statement.setLong(5, message.getJMSExpiration());

        // the processed flag is persisted as 1 (true) or 0 (false)
        int processed = message.getProcessed() ? 1 : 0;
        statement.setInt(6, processed);

        // the serialized message is streamed into the last column
        byte[] blob = serialize(message);
        statement.setBinaryStream(7, new ByteArrayInputStream(blob),
                                  blob.length);

        // exactly one row must have been inserted
        int inserted = statement.executeUpdate();
        if (inserted != 1) {
            throw new PersistenceException(
                "Failed to add message=" + message.getMessageId() +
                ", destination=" + destinationName +
                " (" + destinationId + ")");
        }
    } catch (PersistenceException exception) {
        // already the right type: propagate without wrapping
        throw exception;
    } catch (Exception exception) {
        throw new PersistenceException(
            "Failed to add message=" + message.getMessageId() +
            ", destination=" + destinationName +
            " (" + destinationId + ")",
            exception);
    } finally {
        SQLHelper.close(statement);
    }
}
/**
 * Persist the current processed flag of a message.
 * <p>
 * Only the <code>processed</code> column of the row matching the message
 * id is written. If the update does not affect exactly one row, an error
 * is logged and the method returns normally.
 *
 * @param connection - execute on this connection
 * @param message - the message to update
 * @throws PersistenceException - if an SQL error occurs while preparing
 * or executing the update
 */
public void update(Connection connection, MessageImpl message)
    throws PersistenceException {
    // the message identity selects the row to update
    String id = message.getMessageId().getId();

    PreparedStatement statement = null;
    try {
        statement = connection.prepareStatement(
            "update messages set processed=? where messageId=?");

        // the processed flag is persisted as 1 (true) or 0 (false)
        int processed;
        if (message.getProcessed()) {
            processed = 1;
        } else {
            processed = 0;
        }
        statement.setInt(1, processed);
        statement.setString(2, id);

        // an unexpected row count is logged, not raised
        int updated = statement.executeUpdate();
        if (updated != 1) {
            _log.error("Cannot update message=" + id);
        }
    } catch (SQLException exception) {
        throw new PersistenceException(
            "Failed to update message, id=" + id, exception);
    } finally {
        SQLHelper.close(statement);
    }
}
/**
 * Delete the row of the message with the given identity.
 * <p>
 * If the delete does not affect exactly one row, an error is logged and
 * the method returns normally.
 *
 * @param connection - execute on this connection
 * @param messageId - the message id of the message to remove
 * @throws PersistenceException - if an SQL error occurs while preparing
 * or executing the delete
 */
public void remove(Connection connection, String messageId)
    throws PersistenceException {
    PreparedStatement statement = null;
    try {
        statement = connection.prepareStatement(
            "delete from messages where messageId=?");
        statement.setString(1, messageId);

        // an unexpected row count is logged, not raised
        int deleted = statement.executeUpdate();
        if (deleted != 1) {
            _log.error("Cannot remove message=" + messageId);
        }
    } catch (SQLException exception) {
        throw new PersistenceException(
            "Failed to remove message, id=" + messageId, exception);
    } finally {
        SQLHelper.close(statement);
    }
}
/**
 * Load the message with the given identity.
 * <p>
 * The message is rebuilt from the <code>messageBlob</code> column and its
 * processed flag is then set from the <code>processed</code> column (a
 * stored value of 1 means processed).
 *
 * @param connection - execute on this connection
 * @param messageId - id of message to retrieve
 * @return MessageImpl - the associated message, or <code>null</code> if
 * no row matches the id
 * @throws PersistenceException - if an SQL error occurs while querying
 */
public MessageImpl get(Connection connection, String messageId)
    throws PersistenceException {
    PreparedStatement statement = null;
    ResultSet rows = null;
    try {
        statement = connection.prepareStatement(
            "select messageBlob, processed from messages where messageId=?");
        statement.setString(1, messageId);
        rows = statement.executeQuery();

        // no matching row: there is nothing to return
        if (!rows.next()) {
            return null;
        }

        // columns are read in select order: blob first, then the flag
        MessageImpl message = deserialize(rows.getBytes(1));
        message.setProcessed(rows.getInt(2) == 1);
        return message;
    } catch (SQLException exception) {
        throw new PersistenceException(
            "Failed to retrieve message, id=" + messageId, exception);
    } finally {
        // release the result set before the statement that produced it
        SQLHelper.close(rows);
        SQLHelper.close(statement);
    }
}
/**
* Delete all messages for the given destination
*
* @param connection - execute on this connection
* @param destination - name of the destination whose messages are deleted
* @return int - the number of messages purged
* @throws PersistenceException - an sql related error
*/
public int removeMessages(Connection connection, String destination)
throws PersistenceException {
int result = 0;
PreparedStatement delete = null;
// map the destination name to an id
long destinationId = Destinations.instance().getId(destination);
if (destinationId == 0) {
throw new PersistenceException("Cannot delete messages for " +
"destination=" + destination +
": destination does not exist");
}
try {
delete = connection.prepareStatement(
"delete from messages where destinationId = ?");
delete.setLong(1, destinationId);
result = delete.executeUpdate();
} catch (SQLException exception) {
throw new PersistenceException(
"Failed to remove messages for destination=" + destination,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -