📄 queuedestinationcache.java
字号:
}
}
/**
 * Implementation of MessageMgr.persistentMessageAdded. Caches a newly
 * added persistent message when it is addressed to this cache's queue,
 * offers it to a consumer endpoint, informs the queue listeners and checks
 * the message expiry.
 *
 * @param connection - the connection handed on to the selected endpoint
 * @param destination - the destination the message was added to
 * @param message - the message that was added
 * @return <code>true</code> if the message was processed by this cache;
 * <code>false</code> if an argument is null, the destination is not this
 * queue, or a JMSException was caught and logged
 * @throws PersistenceException - declared by the signature; not raised
 * directly by this method
 */
public boolean persistentMessageAdded(Connection connection,
                                      JmsDestination destination,
                                      MessageImpl message)
    throws PersistenceException {
    // both a destination and a message are required
    if ((destination == null) || (message == null)) {
        return false;
    }

    if (!destination.equals(_queue)) {
        // need to notify someone or something that we are
        // dropping messages. Do we throw an exception
        return false;
    }

    try {
        // every message goes into the queue first; receivers pick
        // messages off it as required.
        MessageHandle added = MessageHandleFactory.getHandle(this, message);
        addMessage(added, message);

        // one more message received
        _publishCount++;

        // hand the message to a registered consumer if one can take it,
        // otherwise it simply stays cached.
        QueueConsumerEndpoint target = getEndpointForMessage(message);
        if (target != null) {
            target.persistentMessageAdded(connection, message);
        }

        // let browsers and other queue listeners know about the arrival
        notifyQueueListeners(message);

        // create a lease iff one is required
        checkMessageExpiry(message);

        return true;
    } catch (JMSException exception) {
        _log.error("Failed to add persistent message",
                   exception);
        return false;
    }
}
/**
 * Handles the removal of a persistent message (presumably the
 * implementation of MessageMgr.persistentMessageRemoved - confirm against
 * the interface). If the message belongs to this cache's queue its handle
 * is removed from the cache, the removal is notified and the persistent
 * handle is destroyed.
 *
 * @param connection - the connection passed to the notification and to the
 * handle factory
 * @param destination - the destination the message was removed from
 * @param message - the message that was removed
 * @throws PersistenceException - declared by the signature; not raised
 * directly by this method
 */
public synchronized void persistentMessageRemoved(
    Connection connection, JmsDestination destination,
    MessageImpl message)
    throws PersistenceException {
    // both a destination and a message are required
    if ((destination == null) || (message == null)) {
        return;
    }

    try {
        // the handle is resolved before the destination is compared
        PersistentMessageHandle stale = (PersistentMessageHandle)
            MessageHandleFactory.getHandle(this, message);

        if (!destination.equals(_queue)) {
            return;
        }

        // call remove regardless whether it exists
        removeMessage(stale);
        notifyOnRemovePersistentMessage(connection, message);
        MessageHandleFactory.destroyPersistentHandle(connection, stale);
    } catch (JMSException exception) {
        _log.error("Failed to remove persistent message", exception);
    }
}
/**
 * Pick the next {@link ConsumerEndpoint} able to consume the supplied
 * message, visiting the registered consumers in round-robin order starting
 * at the position remembered from the previous selection.
 *
 * @param message - the message to consume
 * @return the consumer who should receive this message, or null if no
 * registered consumer can take it
 */
private synchronized QueueConsumerEndpoint getEndpointForMessage(
    MessageImpl message) {
    int count = _consumers.size();
    if (count <= 0) {
        return null;
    }

    // restart from the front when the remembered position has run past
    // the end of the consumer list
    if (_lastConsumerIndex >= count) {
        _lastConsumerIndex = 0;
    }

    // visit every consumer exactly once, wrapping around the list
    int start = _lastConsumerIndex;
    for (int step = 0; step < count; step++) {
        int position = (start + step) % count;
        QueueConsumerEndpoint candidate =
            (QueueConsumerEndpoint) _consumers.get(position);
        Selector selector = candidate.getSelector();

        // a candidate qualifies when it is ready for a message (listener
        // registered or blocked waiting) and its selector, if any,
        // accepts the message
        boolean ready = candidate.hasMessageListener()
            || candidate.isWaitingForMessage();
        if (ready && ((selector == null) || selector.selects(message))) {
            // the next search starts just after the chosen consumer
            _lastConsumerIndex = position + 1;
            return candidate;
        }
    }
    return null;
}
/**
 * Remove and return a message for the given consumer. Without a selector
 * the first message of the queue is taken; with a selector the first
 * cached message that the selector accepts is taken.
 *
 * @param endpoint - the consumer who will receive the message
 * @return MessageHandle - handle to the removed message, or null if the
 * endpoint is null, the cache is empty or no message matches the selector
 */
public synchronized MessageHandle getMessage(
    QueueConsumerEndpoint endpoint) {
    // nothing to return for a null endpoint or an empty cache
    if ((endpoint == null) || (getMessageCount() <= 0)) {
        return null;
    }

    Selector selector = endpoint.getSelector();
    if (selector == null) {
        // no selector: take whatever is at the head of the queue
        MessageHandle first = removeFirstMessage();
        _consumeCount++;
        return first;
    }

    // with a selector, scan the cached handles for the first match
    Object[] candidates = toMessageArray();
    for (int i = 0; i < candidates.length; ++i) {
        MessageHandle candidate = (MessageHandle) candidates[i];
        MessageImpl message = candidate.getMessage();
        if ((message == null) || !selector.selects(message)) {
            continue;
        }
        removeMessage(candidate);
        _consumeCount++;
        return candidate;
    }
    return null;
}
/**
 * Replay every message currently held in the cache to the supplied
 * {@link QueueListener}, in the order given by the message array.
 *
 * @param listener - the queue listener; nothing is delivered if it is null
 */
public void playbackMessages(QueueListener listener) {
    // the array is fetched before the listener is examined
    Object[] handles = toMessageArray();
    if ((listener == null) || (handles.length == 0)) {
        return;
    }

    try {
        int position = 0;
        while (position < handles.length) {
            MessageHandle handle = (MessageHandle) handles[position++];
            // NOTE(review): getMessage() is null-checked elsewhere in this
            // class; here the result is forwarded unchecked - confirm that
            // listeners tolerate a null message.
            listener.onMessage(handle.getMessage());
        }
    } catch (IndexOutOfBoundsException exception) {
        // ignore the exception since the list is dynamic and may
        // be modified while it is being processed.
    }
}
/**
 * Put the given message handle back into the cache and reschedule one
 * consumer that has a message listener registered. This is called to
 * recover unsent or unacked messages.
 *
 * @param handle - handle of the message to return
 */
public synchronized void returnMessage(MessageHandle handle) {
    // put the message back into the destination cache
    addMessage(handle);

    int count = _consumers.size();
    if (count <= 0) {
        return;
    }

    // restart from the front when the remembered position has run past
    // the end of the consumer list
    if (_lastConsumerIndex >= count) {
        _lastConsumerIndex = 0;
    }

    // walk the consumers once, wrapping around, and reschedule the first
    // one that has a message listener
    int start = _lastConsumerIndex;
    for (int step = 0; step < count; step++) {
        int position = (start + step) % count;
        QueueConsumerEndpoint candidate =
            (QueueConsumerEndpoint) _consumers.get(position);
        if (candidate.hasMessageListener()) {
            candidate.schedule();
            // the next search starts just after the rescheduled consumer
            _lastConsumerIndex = position + 1;
            return;
        }
    }
}
/**
 * Deliver the message to every registered queue listener. This is ideal
 * for browsers and iterators. A {@link QueueBrowserEndpoint} only receives
 * the message if it has no selector or its selector accepts the message;
 * any other listener always receives it.
 *
 * @param message - message to deliver
 */
void notifyQueueListeners(MessageImpl message) {
    if (_queueListeners.isEmpty()) {
        return;
    }

    // deliver from an array of the listeners rather than the collection
    QueueListener[] targets =
        (QueueListener[]) _queueListeners.toArray(new QueueListener[0]);
    for (int i = 0; i < targets.length; ++i) {
        QueueListener target = targets[i];

        if (!(target instanceof QueueBrowserEndpoint)) {
            // any other type of subscriber just gets the message
            target.onMessage(message);
            continue;
        }

        // browsers are filtered through their selector, if they have one
        QueueBrowserEndpoint browser = (QueueBrowserEndpoint) target;
        Selector selector = browser.getSelector();
        if ((selector == null) || selector.selects(message)) {
            browser.onMessage(message);
        }
    }
}
/**
 * Implementation of DestinationCache.notifyOnAddMessage. This cache does
 * not examine the message and always answers <code>true</code>.
 *
 * @param message - the message being added (unused)
 * @return always <code>true</code>
 */
boolean notifyOnAddMessage(MessageImpl message) {
return true;
}
/**
 * Implementation of DestinationCache.notifyOnRemoveMessage. Intentionally
 * a no-op in this cache.
 *
 * @param message - the message being removed (unused)
 */
void notifyOnRemoveMessage(MessageImpl message) {
}
/**
 * Implementation of DestinationCache.hasActiveConsumers. The cache is
 * considered active while at least one queue listener or one consumer is
 * registered. The result is also written to the debug log when debug
 * logging is enabled.
 *
 * @return <code>true</code> if any queue listener or consumer is
 * registered, otherwise <code>false</code>
 */
boolean hasActiveConsumers() {
    boolean active = !(_queueListeners.isEmpty() && _consumers.isEmpty());
    if (_log.isDebugEnabled()) {
        _log.debug("hasActiveConsumers()[queue=" + _queue + "]=" + active);
    }
    return active;
}
/**
 * Determines if this cache can be destroyed.
 * A <code>QueueDestinationCache</code> can be destroyed if there
 * are no active consumers and:
 * <ul>
 * <li>the queue is persistent and there are no messages</li>
 * <li>
 * the queue is temporary and the corresponding connection is closed
 * </li>
 * </ul>
 * The temporary-queue rule is only evaluated when the first rule does not
 * already allow destruction.
 *
 * @return <code>true</code> if the cache can be destroyed, otherwise
 * <code>false</code>
 */
public boolean canDestroy() {
    // a cache that is still in use is never destroyed
    if (hasActiveConsumers()) {
        return false;
    }

    JmsDestination queue = getDestination();
    if (queue.getPersistent() && getMessageCount() == 0) {
        return true;
    }
    if (!queue.isTemporaryDestination()) {
        return false;
    }

    // a temporary queue lives as long as its connection: if the manager
    // no longer knows the connection, it has been closed and the cache
    // can be removed
    String connectionId =
        ((JmsTemporaryDestination) queue).getConnectionId();
    JmsServerConnectionManager manager =
        JmsServerConnectionManager.instance();
    return manager.getConnection(connectionId) == null;
}
/**
 * Destroy this object: runs the superclass destroy logic first and then
 * drops every registered queue listener.
 */
synchronized void destroy() {
super.destroy();
// forget all queue listeners once the superclass has been destroyed
_queueListeners.clear();
}
/**
 * Override of Object.toString.
 *
 * @return the string form of the queue this cache is bound to
 */
public String toString() {
return _queue.toString();
}
/**
 * Override of Object.hashCode.
 *
 * @return the hash code of the queue this cache is bound to
 */
public int hashCode() {
return _queue.hashCode();
}
} //-- QueueDestinationCache
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -