📄 abstractdestinationcache.java
字号:
// remove the consumers
_consumers.clear();
// unregister itself from the message manager
MessageMgr.instance().removeEventListener(getDestination(), this);
// remove the leases
MessageLease[] leases = null;
synchronized (_leases) {
leases = (MessageLease[]) _leases.values().toArray(
new MessageLease[0]);
_leases.clear();
}
for (int i = 0; i < leases.length; ++i) {
MessageLease lease = leases[i];
LeaseManager.instance().removeLease(lease);
}
}
/**
 * Invoked when a message lease has expired.
 * <p>
 * The lease recorded for the message is dropped from the lease table, and
 * the expiry is then processed according to the message's persistence:
 * <ul>
 * <li>persistent: {@code persistentMessageExpired} is invoked and the
 * reference is destroyed using a database connection that is committed on
 * success and rolled back on failure;</li>
 * <li>non-persistent: {@code messageExpired} is invoked and the reference
 * is destroyed.</li>
 * </ul>
 * Any failure is logged exactly once; no exception is propagated to the
 * caller.
 *
 * @param object an instance of {@link MessageRef}
 */
public void onLeaseExpired(Object object) {
    MessageRef reference = (MessageRef) object;
    String messageId = reference.getMessageId();
    synchronized (_leases) {
        _leases.remove(messageId);
    }

    // determine whether the message is persistent or not and take
    // the corresponding action
    try {
        if (reference.isPersistent()) {
            Connection connection = null;
            try {
                connection = DatabaseService.getConnection();
                persistentMessageExpired(reference, connection);
                reference.destroy(connection);
                connection.commit();
            } catch (JMSException exception) {
                SQLHelper.rollback(connection);
                // reported by the enclosing JMSException handler
                throw exception;
            } catch (Exception exception) {
                SQLHelper.rollback(connection);
                // Log the original exception, with its stack trace, here
                // and stop. Re-wrapping it in a message-only JMSException
                // would discard the cause and make the enclosing handler
                // log the same failure a second time.
                _log.error("Failed to expire message", exception);
            } finally {
                SQLHelper.close(connection);
            }
        } else {
            messageExpired(reference);
            reference.destroy();
        }
    } catch (JMSException exception) {
        _log.error("Failed to expire message", exception);
    }
}
/**
 * Optionally evicts persistent messages from the cache and, when debug
 * logging is enabled, reports the current cache statistics.
 *
 * @param aggressive if <code>true</code>, all persistent messages are
 *                   cleared from the cache
 */
public void collectGarbage(boolean aggressive) {
    if (aggressive) {
        // an aggressive pass drops every persistent message held in memory
        _cache.clearPersistentMessages();
        if (_log.isDebugEnabled()) {
            String evictedFrom = getDestination().getName();
            _log.debug("Evicted all persistent messages from cache "
                       + evictedFrom);
        }
    }

    if (!_log.isDebugEnabled()) {
        return;
    }

    // summarise the persistent, transient and total message counts
    String name = getDestination().getName();
    String counts = " Messages: P[" + _cache.getPersistentCount()
            + "] T[" + _cache.getTransientCount() + "] Total: ["
            + _cache.getMessageCount() + "]";
    _log.debug("DESTCACHE -" + name + counts);
}
/**
* Initialise the cache from the database.
* <p>
* This method is abstract; each concrete cache supplies its own
* initialisation logic.
*
* @param connection the database connection to use
* @throws JMSException for any JMS error
* @throws PersistenceException for any persistence error
*/
protected abstract void init(Connection connection) throws JMSException,
PersistenceException;
/**
* Adds a message reference and its corresponding message to the cache.
* <p>
* The call is delegated directly to the underlying message cache.
*
* @param reference the reference to the message
* @param message the message
*/
protected void addMessage(MessageRef reference, MessageImpl message) {
_cache.addMessage(reference, message);
}
/**
* Returns the message cache used by this destination cache.
*
* @return the message cache
*/
protected DefaultMessageCache getMessageCache() {
return _cache;
}
/**
* Determines if there are any registered consumers.
*
* @return <code>true</code> if at least one consumer is registered;
* <code>false</code> if the set of consumers is empty
*/
protected boolean hasActiveConsumers() {
return !_consumers.isEmpty();
}
/**
 * Looks up a registered consumer endpoint by its identifier.
 *
 * @param consumerId the consumer identity
 * @return the consumer corresponding to <code>consumerId</code>, or
 *         <code>null</code> if none is registered
 */
protected ConsumerEndpoint getConsumerEndpoint(long consumerId) {
    // consumers are keyed on the boxed identifier
    Long key = new Long(consumerId);
    Object endpoint = _consumers.get(key);
    return (ConsumerEndpoint) endpoint;
}
/**
 * Produces an array holding the consumers currently registered with
 * this cache.
 *
 * @return the consumers of this cache
 */
protected ConsumerEndpoint[] getConsumerArray() {
    return (ConsumerEndpoint[]) _consumers.values().toArray(
            new ConsumerEndpoint[0]);
}
/**
 * Handles the expiry of a non-persistent message by informing every
 * registered consumer that the message has been removed.
 *
 * @param reference the reference to the expired message
 * @throws JMSException for any error
 */
protected void messageExpired(MessageRef reference)
        throws JMSException {
    String expiredId = reference.getMessageId();

    // work on an array copy of the registered consumers
    ConsumerEndpoint[] endpoints = getConsumerArray();
    int index = 0;
    while (index < endpoints.length) {
        endpoints[index].messageRemoved(expiredId);
        index++;
    }
}
/**
 * Handles the expiry of a persistent message by informing every
 * registered consumer that the message has been removed, passing along
 * the supplied database connection.
 *
 * @param reference  the reference to the expired message
 * @param connection the database connection to use
 * @throws JMSException         if a listener fails to handle the
 *                              expiration
 * @throws PersistenceException if there is a persistence related problem
 */
protected void persistentMessageExpired(MessageRef reference,
                                        Connection connection)
        throws JMSException, PersistenceException {
    String expiredId = reference.getMessageId();

    // work on an array copy of the registered consumers
    ConsumerEndpoint[] endpoints = getConsumerArray();
    int index = 0;
    while (index < endpoints.length) {
        endpoints[index].persistentMessageRemoved(expiredId, connection);
        index++;
    }
}
/**
 * Sets up a lease for the message if it has a time-to-live, using the
 * message's JMSExpiration value as the expiry time. An expiry time of 0
 * means that the message never expires.
 *
 * @param reference a reference to the message
 * @param message   the message
 * @throws JMSException if the JMSExpiration property can't be accessed
 */
protected void checkMessageExpiry(MessageRef reference,
                                  MessageImpl message) throws JMSException {
    long expiryTime = message.getJMSExpiration();
    checkMessageExpiry(reference, expiryTime);
}
/**
 * Sets up a lease for the message if it has a time-to-live and no lease
 * is registered for it yet. An expiry time of 0 means that the message
 * never expires, in which case nothing is done.
 *
 * @param reference  a reference to the message
 * @param expiryTime the time when the message expires
 */
protected void checkMessageExpiry(MessageRef reference,
                                  long expiryTime) {
    if (expiryTime == 0) {
        // no expiry set
        return;
    }

    synchronized (_leases) {
        String messageId = reference.getMessageId();
        if (_leases.containsKey(messageId)) {
            // a lease for this message already exists
            return;
        }

        // an expiry time that has already passed still gets a lease,
        // with the duration clamped to 1
        long remaining = Math.max(
                expiryTime - System.currentTimeMillis(), 1L);
        MessageLease lease = new MessageLease(reference, remaining, this);
        LeaseManager.instance().addLease(lease);
        _leases.put(messageId, lease);
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -