📄 messagehandles.java
字号:
handle.getMessageId() + " for destination=" +
handle.getDestination().getName() + " and consumer=" +
handle.getConsumerPersistentId() +
" since the destination cannot be mapped to an id");
}
// map the consumer name to an identity
long consumerId = Consumers.instance().getConsumerId(
handle.getConsumerPersistentId());
if (consumerId == 0) {
throw new PersistenceException(
"Cannot update message handle id=" +
handle.getMessageId() + " for destination=" +
handle.getDestination().getName() + " and consumer=" +
handle.getConsumerPersistentId() +
" since the consumer cannot be mapped to an id");
}
update = connection.prepareStatement(UPDATE_MSG_HANDLE_STMT);
update.setInt(1, handle.getDelivered() ? 1 : 0);
update.setString(2, id);
update.setLong(3, destinationId);
update.setLong(4, consumerId);
// execute the update
if (update.executeUpdate() != 1 && !handle.hasExpired()) {
// only log if the message hasn't been garbage collected
_log.error(
"Failed to execute updateMessageHandle for handle=" +
id + ", destination id=" + destinationId +
", consumer id=" + consumerId);
}
} catch (SQLException exception) {
throw new PersistenceException("Failed to update message handle=" +
handle, exception);
} finally {
SQLHelper.close(update);
}
}
/**
* Remove all the message handles associated with the specified destination
*
* @param connection - the connection to use
* @param destination the name of the destination
* @throws PersistenceException - sql related exception
*/
public void removeMessageHandles(Connection connection, String destination)
    throws PersistenceException {
    PreparedStatement statement = null;
    try {
        // resolve the destination name to its identity; an id of 0 is
        // treated as "no mapping available"
        final long id = Destinations.instance().getId(destination);
        if (id != 0) {
            // delete every handle recorded against the destination id
            statement = connection.prepareStatement(
                DELETE_MSG_HANDLES_FOR_DEST);
            statement.setLong(1, id);
            statement.executeUpdate();
        } else {
            final String reason =
                "Cannot remove message handles for destination="
                + destination
                + " since the destination cannot be mapped to an id";
            throw new PersistenceException(reason);
        }
    } catch (SQLException cause) {
        // wrap the SQL failure, keeping the original exception as the cause
        final String reason =
            "Failed to remove message handles for destination="
            + destination;
        throw new PersistenceException(reason, cause);
    } finally {
        // runs on every path, including when no statement was prepared
        SQLHelper.close(statement);
    }
}
/**
* Remove all the message handles for the specified messageid
*
* @param connection - the connection to use
* @param messageId the message identity
* @throws PersistenceException - sql related exception
*/
public void removeMessageHandles(Connection connection, long messageId)
    throws PersistenceException {
    PreparedStatement statement = null;
    try {
        // the message id is the statement's only parameter
        statement = connection.prepareStatement(DELETE_MSG_HANDLES_STMT);
        statement.setLong(1, messageId);
        statement.executeUpdate();
    } catch (SQLException cause) {
        // wrap the SQL failure, keeping the original exception as the cause
        final String reason =
            "Failed to remove message handles for message id=" + messageId;
        throw new PersistenceException(reason, cause);
    } finally {
        SQLHelper.close(statement);
    }
}
/**
* Retrieve the message handles for the specified destination and consumer
* name
*
* @param connection - the connection to use
* @param destination - destination name
* @param name - consumer name
* @return Vector - collection of MessageHandle objects
* @throws PersistenceException - sql related exception
*/
public Vector getMessageHandles(Connection connection, String destination,
                                String name)
    throws PersistenceException {
    Vector result = new Vector();
    PreparedStatement select = null;
    ResultSet set = null;

    // if the consumer and/or destination cannot be mapped to an id (an id
    // of 0) there is nothing to look up, so return the empty vector
    long destinationId = Destinations.instance().getId(destination);
    long consumerId = Consumers.instance().getConsumerId(name);
    if ((consumerId == 0) || (destinationId == 0)) {
        return result;
    }

    // all prerequisites have been met so continue processing the request
    try {
        // NOTE(review): only the consumer id is bound to the statement; the
        // destination id is used solely by the guard above. Confirm that
        // GET_MSG_HANDLES_FOR_DEST is not expected to filter by destination.
        select = connection.prepareStatement(GET_MSG_HANDLES_FOR_DEST);
        select.setLong(1, consumerId);

        // iterate through the result set and construct the corresponding
        // MessageHandles
        set = select.executeQuery();
        while (set.next()) {
            // column 2 holds the destination id; map it back to a
            // destination object
            long rowDestinationId = set.getLong(2);
            JmsDestination dest =
                Destinations.instance().get(rowDestinationId);
            if (dest == null) {
                throw new PersistenceException(
                    "Cannot create persistent handle, because " +
                    "destination mapping failed for " + rowDestinationId);
            }

            // column 3 holds the consumer id; map it back to a consumer
            // name. The looked-up value must be checked here: testing the
            // 'name' argument instead would never detect a failed mapping.
            long rowConsumerId = set.getLong(3);
            String consumer =
                Consumers.instance().getConsumerName(rowConsumerId);
            if (consumer == null) {
                throw new PersistenceException(
                    "Cannot create persistent handle because " +
                    "consumer mapping failed for " + rowConsumerId);
            }

            String messageId = set.getString(1);
            int priority = set.getInt(4);
            long acceptedTime = set.getLong(5);
            long sequenceNumber = set.getLong(6);
            long expiryTime = set.getLong(7);
            // the delivered flag is stored as an integer; non-zero is true
            boolean delivered = set.getInt(8) != 0;

            MessageHandle handle = new PersistentMessageHandle(
                messageId, priority, acceptedTime, sequenceNumber,
                expiryTime, dest, consumer);
            handle.setDelivered(delivered);
            result.add(handle);
        }
    } catch (SQLException exception) {
        throw new PersistenceException(
            "Failed to get message handles for destination=" +
            destination + ", consumer=" + name, exception);
    } finally {
        // release the result set before the statement that produced it
        SQLHelper.close(set);
        SQLHelper.close(select);
    }
    return result;
}
/**
* Retrieve a distinct list of message ids, in this table, between the min
* and max times inclusive.
*
* @param connection - the connection to use
* @param min - the minimum time in milliseconds
* @param max - the maximum time in milliseconds
* @return Vector - collection of String objects
* @throws PersistenceException - sql related exception
*/
public Vector getMessageIds(Connection connection, long min, long max)
    throws PersistenceException {
    Vector ids = new Vector();
    PreparedStatement query = null;
    ResultSet rows = null;
    try {
        // bind the lower and upper time bounds, in that order
        query = connection.prepareStatement(GET_MESSAGE_HANDLES_IN_RANGE);
        query.setLong(1, min);
        query.setLong(2, max);

        // collect the first column of every returned row as a String
        rows = query.executeQuery();
        while (rows.next()) {
            String messageId = rows.getString(1);
            ids.add(messageId);
        }
    } catch (SQLException cause) {
        throw new PersistenceException("Failed to retrieve message ids",
                                       cause);
    } finally {
        // release the result set before the statement that produced it
        SQLHelper.close(rows);
        SQLHelper.close(query);
    }
    return ids;
}
/**
* Check if a message with the specified messageId exists in the
* table
*
* @param connection - the connection to use
* @param messageId the message Identifier
* @return boolean - true if at least one row exists for the message id
* @throws PersistenceException - sql related exception
*/
public boolean messageExists(Connection connection, long messageId)
    throws PersistenceException {
    boolean exists = false;
    PreparedStatement query = null;
    ResultSet rows = null;
    try {
        query = connection.prepareStatement(GET_MESSAGE_HANDLE_WITH_ID);
        query.setLong(1, messageId);
        rows = query.executeQuery();
        // the message exists when the query yields at least one row
        exists = rows.next();
    } catch (SQLException cause) {
        final String reason =
            "Failed to determine if message exists, id=" + messageId;
        throw new PersistenceException(reason, cause);
    } finally {
        // release the result set before the statement that produced it
        SQLHelper.close(rows);
        SQLHelper.close(query);
    }
    return exists;
}
/**
* Returns the number of messages for the specified destination and
* consumer
*
* @param connection - the connection to use
* @param destination - destination name
* @param name - consumer name
* @return int - the message handle count, or -1 if the query returns no row
* @throws PersistenceException - sql related exception
*/
public int getMessageCount(Connection connection, String destination,
                           String name)
    throws PersistenceException {
    // -1 is returned if the count query yields no row
    int result = -1;
    boolean destinationIsWildCard = false;

    // map the destination name to an actual identity; an id of 0 is only
    // acceptable when the name is a wildcard topic
    long destinationId = Destinations.instance().getId(destination);
    if (destinationId == 0) {
        if (JmsTopic.isWildCard(destination)) {
            destinationIsWildCard = true;
        } else {
            throw new PersistenceException(
                "Cannot get message handle count for destination=" +
                destination + " and consumer=" + name +
                " since the destination cannot be mapped to an id");
        }
    }

    // map the consumer name to an identity
    long consumerId = Consumers.instance().getConsumerId(name);
    if (consumerId == 0) {
        throw new PersistenceException(
            "Cannot get message handle count for destination=" +
            destination + " and consumer=" + name +
            " since the consumer cannot be mapped to an id");
    }

    PreparedStatement select = null;
    ResultSet set = null;
    try {
        if (destinationIsWildCard) {
            // a wildcard has no destination id of its own (it is 0 here),
            // so count by consumer only
            select = connection.prepareStatement(
                GET_MSG_HANDLE_COUNT_FOR_CONSUMER);
            select.setLong(1, consumerId);
        } else {
            // a concrete destination: restrict the count to that
            // destination and consumer
            select = connection.prepareStatement(
                GET_MSG_HANDLE_COUNT_FOR_DEST_AND_CONSUMER);
            select.setLong(1, destinationId);
            select.setLong(2, consumerId);
        }
        set = select.executeQuery();
        if (set.next()) {
            result = set.getInt(1);
        }
    } catch (SQLException exception) {
        throw new PersistenceException(
            "Failed to count messages for destination=" + destination +
            ", consumer=" + name, exception);
    } finally {
        // release the result set before the statement that produced it
        SQLHelper.close(set);
        SQLHelper.close(select);
    }
    return result;
}
/**
* Remove all expired handles for the specified consumer
*
* @param connection - the connection to use
* @param consumer - consumer name
* @throws PersistenceException - sql related exception
*/
public void removeExpiredMessageHandles(Connection connection,
                                        String consumer)
    throws PersistenceException {
    // map the consumer name to an identity; when it cannot be mapped
    // (id of 0) the call does nothing
    final long id = Consumers.instance().getConsumerId(consumer);
    if (id == 0) {
        return;
    }

    PreparedStatement statement = null;
    try {
        statement = connection.prepareStatement(DELETE_EXPIRED_MESSAGES);
        statement.setLong(1, id);
        // the second parameter is the current time in milliseconds
        statement.setLong(2, System.currentTimeMillis());
        statement.executeUpdate();
    } catch (SQLException cause) {
        throw new PersistenceException(
            "Failed to remove expired message handles", cause);
    } finally {
        SQLHelper.close(statement);
    }
}
/**
* Deallocates resources owned or referenced by the instance
*/
public void close() {
// Clear the _instance reference (declared outside this view; presumably
// the shared instance of this class - TODO confirm). Nothing else is
// released by this method.
_instance = null;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -