messagehandles.java
来自「实现了Jms的服务器源码,支持多种适配器,DB,FTP,支持多种数据库」· Java 代码 · 共 770 行 · 第 1/2 页
JAVA
770 行
public void updateMessageHandle(Connection connection,
PersistentMessageHandle handle)
throws PersistenceException {
PreparedStatement update = null;
if (_log.isDebugEnabled()) {
_log.debug("updateMessageHandle(handle=[consumer="
+ handle.getConsumerName()
+ ", destination=" + handle.getDestination()
+ ", id=" + handle.getMessageId().getId() + "])");
}
try {
// get the message id
String id = handle.getMessageId().getId();
// map the destination name to an actual identity
long destinationId = Destinations.instance().getId(
handle.getDestination().getName());
if (destinationId == 0) {
throw new PersistenceException(
"Cannot update message handle id=" +
handle.getMessageId() + " for destination=" +
handle.getDestination().getName() + " and consumer=" +
handle.getConsumerName() +
" since the destination cannot be mapped to an id");
}
// map the consumer name to an identity
long consumerId = Consumers.instance().getConsumerId(
handle.getConsumerName());
if (consumerId == 0) {
throw new PersistenceException(
"Cannot update message handle id=" +
handle.getMessageId() + " for destination=" +
handle.getDestination().getName() + " and consumer=" +
handle.getConsumerName() +
" since the consumer cannot be mapped to an id");
}
update = connection.prepareStatement(UPDATE_MSG_HANDLE_STMT);
update.setInt(1, handle.getDelivered() ? 1 : 0);
update.setString(2, id);
update.setLong(3, destinationId);
update.setLong(4, consumerId);
// execute the delete
if (update.executeUpdate() != 1 && !handle.hasExpired()) {
// only log if the message hasn't been garbage collected
_log.error(
"Failed to execute updateMessageHandle for handle=" +
id + ", destination id=" + destinationId +
", consumer id=" + consumerId);
}
} catch (SQLException exception) {
throw new PersistenceException("Failed to update message handle=" +
handle, exception);
} finally {
SQLHelper.close(update);
}
}
/**
* Remove all the message handles associated with the specified destination
*
* @param connection - the connection to use
* @param String name of destination
* @throws PersistenceException - sql releated exception
*/
public void removeMessageHandles(Connection connection, String destination)
throws PersistenceException {
PreparedStatement delete = null;
try {
// map the destination name to an actual identity
long destinationId = Destinations.instance().getId(destination);
if (destinationId == 0) {
throw new PersistenceException(
"Cannot remove message handles for destination=" +
destination + " since the destination cannot be " +
"mapped to an id");
}
delete = connection.prepareStatement(DELETE_MSG_HANDLES_FOR_DEST);
delete.setLong(1, destinationId);
delete.executeUpdate();
} catch (SQLException exception) {
throw new PersistenceException(
"Failed to remove message handles for destination=" +
destination, exception);
} finally {
SQLHelper.close(delete);
}
}
/**
* Remove all the message handles for the specified messageid
*
* @param connection - the connection to use
* @param id - message identity
* @throws PersistenceException - sql releated exception
*/
public void removeMessageHandles(Connection connection, long messageId)
throws PersistenceException {
PreparedStatement delete = null;
try {
delete = connection.prepareStatement(DELETE_MSG_HANDLES_STMT);
delete.setLong(1, messageId);
delete.executeUpdate();
} catch (SQLException exception) {
throw new PersistenceException(
"Failed to remove message handles for message id=" + messageId,
exception);
} finally {
SQLHelper.close(delete);
}
}
/**
* Retrieve the message handle for the specified desitation and consumer
* name
*
* @param connection - the connection to use
* @param destination - destination name
* @param name - consumer name
* @return Vector - collection of PersistentMessageHandle objects
* @throws PersistenceException - sql releated exception
*/
public Vector getMessageHandles(Connection connection, String destination,
String name)
throws PersistenceException {
Vector result = new Vector();
PreparedStatement select = null;
ResultSet set = null;
// if the consumer and/or destination cannot be mapped then
// return an empty vector
long destinationId = Destinations.instance().getId(destination);
long consumerId = Consumers.instance().getConsumerId(name);
if ((consumerId == 0) ||
(destinationId == 0)) {
return result;
}
// all preprequisites have been met so continue processing the
// request.
try {
select = connection.prepareStatement(GET_MSG_HANDLES_FOR_DEST);
select.setLong(1, consumerId);
// iterate through the result set and construct the corresponding
// PersistentMessageHandles
set = select.executeQuery();
while (set.next()) {
// Attempt to retrieve the corresponding destination
JmsDestination dest = Destinations.instance().get(
set.getLong(2));
if (dest == null) {
throw new PersistenceException(
"Cannot create persistent handle, because " +
"destination mapping failed for " + set.getLong(2));
}
String consumer = Consumers.instance().getConsumerName(
set.getLong(3));
if (name == null) {
throw new PersistenceException(
"Cannot create persistent handle because " +
"consumer mapping failed for " + set.getLong(3));
}
PersistentMessageHandle handle = new PersistentMessageHandle();
handle.setMessageId(new MessageId(set.getString(1)));
handle.setDestination(dest);
handle.setConsumerName(consumer);
handle.setPriority(set.getInt(4));
handle.setAcceptedTime(set.getLong(5));
handle.setSequenceNumber(set.getLong(6));
handle.setExpiryTime(set.getLong(7));
handle.setDelivered((set.getInt(8) == 0) ? false : true);
result.add(handle);
}
} catch (SQLException exception) {
throw new PersistenceException(
"Failed to get message handles for destination=" +
destination + ", consumer=" + name, exception);
} finally {
SQLHelper.close(set);
SQLHelper.close(select);
}
return result;
}
/**
* Retrieve a distint list of message ids, in this table, between the min
* and max times inclusive.
*
* @param connection - the connection to use
* @param min - the minimum time in milliseconds
* @param max - the maximum time in milliseconds
* @return Vector - collection of String objects
* @throws PersistenceException - sql related exception
*/
public Vector getMessageIds(Connection connection, long min, long max)
throws PersistenceException {
Vector result = new Vector();
PreparedStatement select = null;
ResultSet set = null;
try {
select = connection.prepareStatement(GET_MESSAGE_HANDLES_IN_RANGE);
select.setLong(1, min);
select.setLong(2, max);
// iterate through the result set and construct the corresponding
// PersistentMessageHandles
set = select.executeQuery();
while (set.next()) {
result.add(set.getString(1));
}
} catch (SQLException exception) {
throw new PersistenceException("Failed to retrieve message ids",
exception);
} finally {
SQLHelper.close(set);
SQLHelper.close(select);
}
return result;
}
/**
* Check if a message with the specified messageId exists in the
* table
*
* @param connection - the connection to use
* @param id - message Id
* @return Vector - collection of PersistentMessageHandle objects
* @throws PersistenceException - sql releated exception
*/
public boolean messageExists(Connection connection, long messageId)
throws PersistenceException {
boolean result = false;
PreparedStatement select = null;
ResultSet set = null;
try {
select = connection.prepareStatement(GET_MESSAGE_HANDLE_WITH_ID);
select.setLong(1, messageId);
set = select.executeQuery();
if (set.next()) {
result = true;
}
} catch (SQLException exception) {
throw new PersistenceException(
"Failed to determine if message exists, id=" + messageId,
exception);
} finally {
SQLHelper.close(set);
SQLHelper.close(select);
}
return result;
}
/**
* Returns the number of messages for the specified destination and
* consumer
*
* @param connection - the connection to use
* @param destination - destination name
* @param name - consumer name
* @return Vector - collection of PersistentMessageHandle objects
* @throws PersistenceException - sql releated exception
*/
public int getMessageCount(Connection connection, String destination,
String name)
throws PersistenceException {
int result = -1;
boolean destinationIsWildCard = false;
// map the destination name to an actual identity
long destinationId = Destinations.instance().getId(destination);
if (destinationId == 0) {
if (JmsTopic.isWildCard(destination)) {
destinationIsWildCard = true;
} else {
throw new PersistenceException(
"Cannot get message handle count for destination=" +
destination + " and consumer=" + name +
" since the destination cannot be mapped to an id");
}
}
// map the consumer name to an identity
long consumerId = Consumers.instance().getConsumerId(name);
if (consumerId == 0) {
throw new PersistenceException(
"Cannot get message handle count for destination=" +
destination + " and consumer=" + name +
" since the consumer cannot be mapped to an id");
}
PreparedStatement select = null;
ResultSet set = null;
try {
if (destinationIsWildCard) {
select = connection.prepareStatement(
GET_MSG_HANDLE_COUNT_FOR_DEST_AND_CONSUMER);
select.setLong(1, destinationId);
select.setLong(2, consumerId);
} else {
select = connection.prepareStatement(
GET_MSG_HANDLE_COUNT_FOR_CONSUMER);
select.setLong(1, consumerId);
}
set = select.executeQuery();
if (set.next()) {
result = set.getInt(1);
}
} catch (SQLException exception) {
throw new PersistenceException(
"Failed to count messages for destination=" + destination +
", consumer=" + name, exception);
} finally {
SQLHelper.close(set);
SQLHelper.close(select);
}
return result;
}
/**
* Remove all expired handles for the specified consumer
*
* @param connection - the connection to use
* @param consumer - consumer name
* @throws PersistenceException - sql releated exception
*/
public void removeExpiredMessageHandles(Connection connection,
String consumer)
throws PersistenceException {
PreparedStatement delete = null;
// map the consumer name ot an identity
long consumerId = Consumers.instance().getConsumerId(consumer);
if (consumerId != 0) {
try {
delete = connection.prepareStatement(DELETE_EXPIRED_MESSAGES);
delete.setLong(1, consumerId);
delete.setLong(2, System.currentTimeMillis());
delete.executeUpdate();
} catch (SQLException exception) {
throw new PersistenceException(
"Failed to remove expired message handles",
exception);
} finally {
SQLHelper.close(delete);
}
}
}
/**
* Deallocates resources owned or referenced by the instance
*/
public void close() {
_instance = null;
}
} //-- MessageHandles
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?