📄 destinationcache.java
字号:
*/
public Iterator getConsumers() {
    // hand the caller the iterator obtained from the consumer collection
    final Iterator consumers = this._consumers.iterator();
    return consumers;
}
/**
 * Return the registered consumers as an array.
 * <p>
 * The array is whatever {@code _consumers.toArray()} yields, so callers
 * walk an array rather than an iterator over the consumer collection.
 * This is intended to avoid concurrent modification exceptions.
 *
 * @return the consumers; the elements are expected to be
 *         {@link ConsumerEndpoint} objects
 */
Object[] getConsumersByArray() {
    final Object[] consumers = this._consumers.toArray();
    return consumers;
}
/**
 * Callback invoked when the {@link MessageMgr} adds a message for this
 * destination to the cache.
 *
 * @param destination the destination passed along with the message
 *                    (NOTE(review): presumably the destination the
 *                    message was sent to - confirm against callers)
 * @param message     the message that was added to the cache
 * @return a flag whose meaning is defined by the implementing subclass;
 *         it is not specified at this level
 */
public abstract boolean messageAdded(
    JmsDestination destination, MessageImpl message);
/**
 * Callback invoked when the {@link MessageMgr} removes a message from
 * the cache.
 *
 * @param destination the destination passed along with the message
 *                    (NOTE(review): presumably the destination the
 *                    message belonged to - confirm against callers)
 * @param message     the message that was removed from the cache
 */
public abstract void messageRemoved(
    JmsDestination destination, MessageImpl message);
/**
 * Report the number of messages currently active for this destination.
 * <p>
 * The value is the handle count reported by the underlying cache.
 *
 * @return the number of active messages
 */
public int getMessageCount() {
    final int handles = this._cache.getHandleCount();
    return handles;
}
/**
 * Notify the listeners that a non-persistent message has been added to the
 * cache.
 *
 * @param message - message that was added
 * @return boolean - true if at least one listener has processed it
 */
abstract boolean notifyOnAddMessage(MessageImpl message);
/**
 * Notify the listeners that a non-persistent message has been removed from
 * the cache.
 *
 * @param message - message that was removed
 */
abstract void notifyOnRemoveMessage(MessageImpl message);
/**
 * Notify the listeners that a persistent message has been added to the
 * cache.
 * <p>
 * This default implementation ignores both arguments and always returns
 * {@code true}.
 *
 * @param connection the persistent connection to use
 * @param message    the message that was added
 * @return {@code true} if at least one listener has processed the
 *         message; the default implementation always returns
 *         {@code true}
 * @throws PersistenceException if there is a persistence related error
 *                              (never thrown by the default
 *                              implementation)
 */
boolean notifyOnAddPersistentMessage(
    Connection connection, MessageImpl message)
    throws PersistenceException {
    // default: nothing to notify, report the message as processed
    return true;
}
/**
 * Notify the listeners that a persistent message has been removed from
 * the cache.
 * <p>
 * This default implementation does nothing.
 *
 * @param connection the persistent connection to use
 * @param message    the message that was removed
 * @throws PersistenceException if there is a persistence related error
 *                              (never thrown by the default
 *                              implementation)
 */
void notifyOnRemovePersistentMessage(
    Connection connection, MessageImpl message)
    throws PersistenceException {
    // default: intentionally a no-op
}
/**
 * Check whether any consumers are attached to this cache.
 * <p>
 * What counts as an attached (active) consumer is decided by the
 * implementing subclass.
 *
 * @return boolean - true if there are attached consumers
 */
abstract boolean hasActiveConsumers();
/**
 * Callback invoked whenever a lease expires; it receives the object whose
 * lease has expired.
 * <p>
 * A {@code null} argument is ignored. Otherwise the argument is cast to
 * {@link MessageHandle} and the message is looked up through
 * {@code resolveExpiredMessage}. For a {@link PersistentMessageHandle} the
 * removal is reported via {@code persistentMessageRemoved} on a database
 * connection that is committed on success; on any exception the
 * connection is rolled back and the failure is logged rather than
 * propagated. The connection is closed in all cases. For any other
 * handle the removal is reported via {@code messageRemoved}.
 *
 * @param leasedObject reference to the leased object; when non-null it
 *                     must be a {@link MessageHandle}
 */
public void onLeaseExpired(Object leasedObject) {
    if (leasedObject == null) {
        return;
    }

    final MessageHandle expired = (MessageHandle) leasedObject;
    // the resolved message may be null; it is forwarded unchanged
    final MessageImpl message = resolveExpiredMessage(expired);

    if (!(expired instanceof PersistentMessageHandle)) {
        // non-persistent message: tell the listeners it has been removed
        messageRemoved(getDestination(), message);
        return;
    }

    // persistent message: perform the removal inside a transaction
    Connection connection = null;
    try {
        connection = DatabaseService.getConnection();
        persistentMessageRemoved(connection, getDestination(), message);
        connection.commit();
    } catch (Exception failure) {
        SQLHelper.rollback(connection);
        _log.error("Failure in onLeaseExpired", failure);
    } finally {
        SQLHelper.close(connection);
    }
}
/**
 * Determines if this cache can be destroyed.
 * <p>
 * The criteria are defined by the implementing subclass.
 *
 * @return <code>true</code> if the cache can be destroyed, otherwise
 * <code>false</code>
 */
public abstract boolean canDestroy();
/**
 * Set up a lease for the message so that it can expire.
 * <p>
 * A {@code null} message is ignored; any other message is handed to the
 * lease helper. NOTE(review): the TTL inspection (an expiry time of 0
 * meaning the message never expires) is not performed in this method and
 * presumably happens inside the lease helper - confirm.
 *
 * @param message the message to set up a lease for; may be null
 */
void checkMessageExpiry(MessageImpl message) {
    if (message == null) {
        return;
    }
    this._leaseHelper.addLease(message);
}
/**
 * Destroy this cache.
 * <p>
 * While holding this object's monitor, clears the message cache and the
 * consumer collection, removes this cache as an event listener for its
 * destination from the {@link MessageMgr}, and clears the lease helper.
 */
synchronized void destroy() {
// clear the cache
_cache.clear();
// remove the consumers
_consumers.clear();
// unregister itself from the message manager so that it is no longer
// registered as a listener for this destination
MessageMgr.instance().removeEventListener(getDestination(), this);
// clear the lease helper
_leaseHelper.clear();
}
/**
 * Close the cache by delegating to {@code destroy()}.
 * <p>
 * Once the cache is closed it will no longer receive messages for this
 * destination. NOTE(review): earlier documentation stated that all
 * DestinationCacheLifecycleListeners are notified; no such notification
 * is visible in this method - confirm where, if anywhere, it happens.
 */
public void shutdown() {
    this.destroy();
}
/**
 * Insert the specified handle into the handle cache.
 * <p>
 * Before it is cached, the handle's consumer name is set to the name of
 * this cache's destination.
 *
 * @param handle the handle to add
 */
void addMessage(MessageHandle handle) {
    final JmsDestination owner = getDestination();
    handle.setConsumerName(owner.getName());
    this._cache.addHandle(handle);
}
/**
 * Add the handle and its corresponding message to their respective
 * caches.
 * <p>
 * Before it is cached, the handle's consumer name is set to the name of
 * this cache's destination.
 *
 * @param handle  the handle to add
 * @param message the message that corresponds to the handle
 */
void addMessage(MessageHandle handle, MessageImpl message) {
    final JmsDestination owner = getDestination();
    handle.setConsumerName(owner.getName());
    this._cache.addMessage(handle, message);
}
/**
 * Look up the message associated with the specified handle.
 *
 * @param handle the handle identifying the message
 * @return the message the cache returns for the handle
 */
MessageImpl getMessage(MessageHandle handle) {
    final MessageImpl cached = this._cache.getMessage(handle);
    return cached;
}
/**
 * Remove the message handle from the cache, if it exists.
 *
 * @param handle the handle to remove
 * @return {@code true} if the handle was removed
 */
boolean removeMessage(MessageHandle handle) {
    final boolean removed = this._cache.removeHandle(handle);
    return removed;
}
/**
 * Remove the first message handle from the cache and return it.
 *
 * @return the first handle, or null if the cache is empty
 */
final MessageHandle removeFirstMessage() {
    final MessageHandle first = this._cache.removeFirstHandle();
    return first;
}
/**
 * Return the message handles held in the cache as an array.
 *
 * @return an array of message handles
 */
final Object[] toMessageArray() {
    final Object[] handles = this._cache.getHandleArray();
    return handles;
}
/**
 * Delete the message referenced by the specified handle from the cache.
 * <p>
 * The message is looked up by the message id carried by the handle.
 *
 * @param handle the handle of the message to delete; must not be null
 */
void deleteMessage(MessageHandle handle) {
    this._cache.removeMessage(handle.getMessageId());
}
// Identifiable.getId: returns the identifier stored for this cache
public String getId() {
    return this._id;
}
// GarbageCollectable.collectGarbage: on an aggressive pass, evicts all
// persistent messages from the cache; when debug logging is enabled it
// also logs the cache's message and handle counts
public void collectGarbage(boolean aggressive) {
    if (aggressive) {
        // drop every persistent message held in the cache
        this._cache.clearPersistentMessages();
        if (_log.isDebugEnabled()) {
            final String evicted =
                "Evicted all persistent messages from cache "
                + getDestination().getName();
            _log.debug(evicted);
        }
    }

    if (_log.isDebugEnabled()) {
        // P = persistent message count, T = transient message count
        final String stats =
            "DESTCACHE -" + getDestination().getName()
            + " Messages: P[" + _cache.getPersistentCount()
            + "] T[" + _cache.getTransientCount()
            + "] Handles: [" + _cache.getHandleCount() + "]";
        _log.debug(stats);
    }
}
/**
 * Resolve an expired message through its handle.
 * <p>
 * This implementation asks the handle itself for the message.
 *
 * @param handle the expired message's handle; must not be null
 * @return the expired message. May be null.
 */
protected MessageImpl resolveExpiredMessage(MessageHandle handle) {
    final MessageImpl expired = handle.getMessage();
    return expired;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -