📄 messages.java
字号:
exception);
} finally {
SQLHelper.close(delete);
}
return result;
}
/**
 * Loads a batch of persisted messages for a destination, starting at a
 * given creation time.
 * <p>
 * Rows are read in ascending <code>createTime</code> order. The first 200
 * rows are always taken; after that, reading continues only while the
 * creation time stays equal to that of the 200th row. The first row that
 * carries a newer creation time is still added to the batch before reading
 * stops, so the result holds "around 200 or so" messages.
 * <p>
 * For a wildcard topic the query filters on priority and creation time
 * only; for any other destination it additionally filters on the
 * destination's identifier.
 *
 * @param connection  the connection to run the query on
 * @param destination the name of the destination
 * @param priority    the priority the messages must have
 * @param time        the minimum creation time (inclusive)
 * @return a Vector of MessageImpl objects; empty if no row matches
 * @throws PersistenceException if the destination is unknown, a message
 *                              blob cannot be de-serialized, or an SQL
 *                              error occurs
 */
public Vector getMessages(Connection connection, String destination,
                          int priority, long time)
    throws PersistenceException {
    Vector batch = new Vector();
    PreparedStatement query = null;
    ResultSet rows = null;

    try {
        JmsDestination target = Destinations.instance().get(destination);
        if (target == null) {
            throw new PersistenceException(
                "Cannot getMessages for destination=" + destination
                + ": destination does not exist");
        }

        long targetId = Destinations.instance().getId(destination);
        if (targetId == 0) {
            throw new PersistenceException(
                "Cannot getMessages for destination=" + destination
                + ": destination does not exist");
        }

        boolean wildcard = (target instanceof JmsTopic)
            && ((JmsTopic) target).isWildCard();

        int index = 1;
        if (wildcard) {
            // a wildcard topic has no single destination id to match, so
            // the query can only narrow by priority and creation time
            query = connection.prepareStatement(
                "select * from messages where priority=? and createTime>=? order by createTime asc");
        } else {
            // a concrete destination allows the narrower query that also
            // matches on the destination id
            query = connection.prepareStatement(
                "select * from messages where destinationId=? and priority=? and createTime>=? order by createTime asc");
            query.setLong(index++, targetId);
        }
        query.setInt(index++, priority);
        query.setLong(index, time);

        rows = query.executeQuery();

        int fetched = 0;
        long boundary = time;
        while (rows.next()) {
            MessageImpl message = deserialize(rows.getBytes("messageBlob"));
            message.setProcessed(rows.getInt("processed") == 1);
            batch.add(message);
            fetched++;

            long created = rows.getLong("createTime");
            if (fetched <= 200) {
                // still inside the guaranteed part of the batch: remember
                // the creation time of the most recent row
                boundary = created;
            } else if (created > boundary) {
                // past 200 rows and the creation time has moved on; the
                // row that revealed this is already part of the batch
                break;
            }
        }
    } catch (SQLException exception) {
        throw new PersistenceException(
            "Failed to retrieve messages", exception);
    } finally {
        SQLHelper.close(rows);
        SQLHelper.close(query);
    }

    return batch;
}
/**
 * Collects message identifiers, together with their creation times, for
 * messages created strictly after the given time.
 * <p>
 * Rows are read in ascending <code>createTime</code> order. The first
 * <code>hint</code> rows are always taken; after that, reading continues
 * only while the creation time stays equal to that of the last of those
 * rows. The first row that carries a newer creation time is still stored
 * before reading stops, so <code>hint</code> is only an indication of the
 * size of the result.
 *
 * @param connection the connection to run the query on
 * @param time       only messages created after this time are considered
 * @param hint       an indication of the number of entries to return
 * @return a HashMap from message id (String) to creation time (Long);
 *         empty if no row matches
 * @throws PersistenceException if an SQL error occurs
 */
public HashMap getMessageIds(Connection connection, long time, int hint)
    throws PersistenceException {
    HashMap ids = new HashMap();
    PreparedStatement query = null;
    ResultSet rows = null;

    try {
        query = connection.prepareStatement(
            "select messageId,createTime from messages where createTime>? order by createTime asc");
        query.setLong(1, time);
        rows = query.executeQuery();

        int fetched = 0;
        long boundary = time;
        while (rows.next()) {
            String id = rows.getString("messageId");
            long created = rows.getLong("createTime");
            ids.put(id, new Long(created));
            fetched++;

            if (fetched <= hint) {
                // within the requested amount: track the latest creation
                // time seen so far
                boundary = created;
            } else if (created > boundary) {
                // beyond the hint and the creation time has moved on; the
                // row that revealed this has already been stored.
                // NOTE(review): the query is strictly greater-than, so a
                // caller resuming from the newest returned time would not
                // see other rows sharing it - confirm against callers.
                break;
            }
        }
    } catch (SQLException exception) {
        throw new PersistenceException(
            "Failed to retrieve message identifiers", exception);
    } finally {
        SQLHelper.close(rows);
        SQLHelper.close(query);
    }

    return ids;
}
/**
 * Loads every message whose <code>processed</code> column is 0, i.e. a
 * message that has been accepted by the system but not yet processed.
 * Each returned message has its processed flag explicitly set to false.
 *
 * @param connection the connection to run the query on
 * @return a Vector of MessageImpl objects; empty if no row matches
 * @throws PersistenceException if a message blob cannot be de-serialized
 *                              or an SQL error occurs
 */
public Vector getUnprocessedMessages(Connection connection)
    throws PersistenceException {
    Vector pending = new Vector();
    PreparedStatement query = null;
    ResultSet rows = null;

    try {
        query = connection.prepareStatement(
            "select * from messages where processed=0");
        rows = query.executeQuery();

        while (rows.next()) {
            byte[] blob = rows.getBytes("messageBlob");
            MessageImpl message = deserialize(blob);
            message.setProcessed(false);
            pending.add(message);
        }
    } catch (SQLException exception) {
        throw new PersistenceException(
            "Failed to retrieve unprocessed messages", exception);
    } finally {
        SQLHelper.close(rows);
        SQLHelper.close(query);
    }

    return pending;
}
/**
 * Retrieve a message handle for every message of the given destination
 * that carries an expiry time, ordered by ascending expiry time.
 * <p>
 * The query selects rows with <code>expiryTime&gt;0</code>; it does not
 * compare the expiry time against the current time, so the result may
 * include messages whose expiry time has already passed.
 *
 * @param connection  execute on this connection
 * @param destination the destination in question; must not be null
 * @return Vector of PersistentMessageHandle objects; empty if no row
 *         matches
 * @throws PersistenceException if the destination is null or unknown, or
 *                              if an SQL error occurs
 */
public Vector getNonExpiredMessages(Connection connection,
                                    JmsDestination destination)
    throws PersistenceException {
    // fail with a descriptive persistence error rather than a bare
    // NullPointerException from destination.getName()
    if (destination == null) {
        throw new PersistenceException(
            "Cannot getNonExpiredMessages: destination is null");
    }

    Vector result = new Vector();
    PreparedStatement select = null;
    ResultSet set = null;
    try {
        String name = destination.getName();
        long destinationId = Destinations.instance().getId(name);
        if (destinationId == 0) {
            // report the operation that actually failed, and the
            // destination's name rather than the object itself
            throw new PersistenceException(
                "Cannot getNonExpiredMessages for destination=" + name
                + ": destination does not exist");
        }

        select = connection.prepareStatement(
            "select messageId,destinationId,priority,createTime,expiryTime" +
            " from messages where expiryTime>0 and destinationId=? order by expiryTime asc");
        select.setLong(1, destinationId);
        set = select.executeQuery();

        while (set.next()) {
            // column indexes follow the select list: 1=messageId,
            // 3=priority, 4=createTime, 5=expiryTime. Column 2
            // (destinationId) is not read; the caller's destination
            // object is attached to the handle instead.
            PersistentMessageHandle handle = new PersistentMessageHandle();
            handle.setMessageId(new MessageId(set.getString(1)));
            handle.setDestination(destination);
            handle.setPriority(set.getInt(3));
            handle.setAcceptedTime(set.getLong(4));
            handle.setExpiryTime(set.getLong(5));
            result.add(handle);
        }
    } catch (SQLException exception) {
        throw new PersistenceException(
            "Failed to retrieve non-expired messages", exception);
    } finally {
        SQLHelper.close(set);
        SQLHelper.close(select);
    }
    return result;
}
/**
 * Deletes expired rows from the <code>messages</code> table and then from
 * the <code>message_handles</code> table.
 * <p>
 * A row counts as expired when its <code>expiryTime</code> is greater
 * than zero and less than the current system time. The current time is
 * sampled once and the same value is used for both deletes.
 *
 * @param connection the connection to run the deletes on
 * @throws PersistenceException if an SQL error occurs
 */
public void removeExpiredMessages(Connection connection)
    throws PersistenceException {
    PreparedStatement purge = null;
    try {
        long now = System.currentTimeMillis();

        // executed in this order: messages first, then their handles
        String[] statements = {
            "delete from messages where expiryTime > 0 and expiryTime < ?",
            "delete from message_handles where expiryTime > 0 and expiryTime < ?"
        };

        for (int i = 0; i < statements.length; i++) {
            if (purge != null) {
                // release the previous statement before preparing the
                // next one; the last statement is closed in finally
                purge.close();
            }
            purge = connection.prepareStatement(statements[i]);
            purge.setLong(1, now);
            purge.executeUpdate();
        }
    } catch (SQLException exception) {
        throw new PersistenceException(
            "Failed to remove expired messages", exception);
    } finally {
        SQLHelper.close(purge);
    }
}
/**
 * Reset the instance by clearing the <code>_instance</code> reference.
 * We need to deprecate this method since this class does not contain
 * state information.
 * <p>
 * NOTE(review): <code>_instance</code> is declared outside this section;
 * presumably it is the shared singleton reference - confirm.
 */
public void close() {
// drop the reference; nothing else is released here
_instance = null;
}
/**
 * Default constructor. It performs no initialisation at the moment and
 * is declared protected, so it is not part of the public API.
 */
protected Messages() {
}
/**
 * Converts a message into a byte array using Java object serialization.
 *
 * @param message the message to serialize
 * @return byte[] the serialized form of the message
 * @throws PersistenceException if the message cannot be serialized; the
 *                              underlying exception is attached as the
 *                              cause
 */
public byte[] serialize(MessageImpl message)
    throws PersistenceException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try {
        ObjectOutputStream out = new ObjectOutputStream(buffer);
        out.writeObject(message);
        // closing flushes any buffered object data into the byte buffer
        out.close();
        return buffer.toByteArray();
    } catch (Exception exception) {
        throw new PersistenceException("Failed to serialize message",
                                       exception);
    }
}
/**
 * Rebuilds a message from its serialized byte form using Java object
 * serialization.
 *
 * @param blob the serialized message; must not be null
 * @return the re-constructed message
 * @throws PersistenceException if the blob is null, or if it cannot be
 *                              read back as a MessageImpl; in the latter
 *                              case the underlying exception is attached
 *                              as the cause
 */
public MessageImpl deserialize(byte[] blob) throws PersistenceException {
    if (blob == null) {
        throw new PersistenceException(
            "Cannot de-serialize null message blob");
    }

    try {
        ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(blob));
        MessageImpl message = (MessageImpl) in.readObject();
        in.close();
        return message;
    } catch (Exception exception) {
        // covers I/O errors, unknown classes and a blob holding an
        // object of another type
        throw new PersistenceException(
            "Failed to de-serialize message", exception);
    }
}
} //-- Messages
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -