queuedestinationcache.java
来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 523 行 · 第 1/2 页
JAVA
523 行
MessageHandle handle = handles[i]; MessageImpl message = handle.getMessage(); if (message != null) { browser.messageAdded(handle, message); } } }

    /**
     * Return a message handle back to the cache, to recover unsent or
     * unacknowledged messages.
     * <p>
     * The handle is re-added to the cache before anything else. If its
     * message can be resolved, at most one eligible consumer is notified.
     * A <code>JMSException</code> raised while resolving or dispatching the
     * message is logged at debug level and not propagated; the handle
     * remains in the cache in that case.
     *
     * @param handle the message handle to return
     */
    public void returnMessageHandle(MessageHandle handle) {
        // add the message to the destination cache
        _handles.add(handle);

        try {
            MessageImpl message = handle.getMessage();
            if (message != null) {
                // if there are any registered consumers, notify one of them
                // that a message has arrived
                ConsumerEndpoint consumer = getConsumerForMessage(message);
                if (consumer != null) {
                    consumer.messageAdded(handle, message);
                }
            }
        } catch (JMSException exception) {
            _log.debug(exception, exception);
        }
    }

    /**
     * Determines if there are any registered consumers.
     * <p>
     * Registered queue browsers count as consumers: the result is
     * <code>true</code> if the superclass reports consumers, or if at least
     * one browser is registered.
     *
     * @return <code>true</code> if there are registered consumers
     */
    public boolean hasConsumers() {
        boolean active = super.hasConsumers();
        if (!active && !_browsers.isEmpty()) {
            active = true;
        }
        if (_log.isDebugEnabled()) {
            _log.debug("hasActiveConsumers()[queue=" + getDestination() + "]="
                       + active);
        }
        return active;
    }

    /**
     * Returns the number of messages in the cache.
     *
     * @return the number of messages in the cache
     */
    public int getMessageCount() {
        return _handles.size();
    }

    /**
     * Determines if this cache can be destroyed.
     * A <code>QueueDestinationCache</code> can be destroyed if there are no
     * active consumers and:
     * <ul>
     * <li>the queue is persistent and there are no messages</li>
     * <li>the queue is temporary and the corresponding connection is
     * closed</li>
     * </ul>
     *
     * @return <code>true</code> if the cache can be destroyed, otherwise
     *         <code>false</code>
     */
    public boolean canDestroy() {
        boolean destroy = false;
        if (!hasConsumers()) {
            JmsDestination queue = getDestination();
            if (queue.getPersistent() && getMessageCount() == 0) {
                destroy = true;
            } else if (queue.isTemporaryDestination()) {
                // check if there is a corresponding connection. If
                // not, it has been closed, and the cache can be removed
                long connectionId =
                        ((JmsTemporaryDestination) queue).getConnectionId();
                if (_connections.getConnection(connectionId) == null) {
                    destroy = true;
                }
            }
        }
        return destroy;
    }

    /**
     * Destroy this object.
     * <p>
     * Delegates to the superclass and then drops all registered browsers.
     */
    public void destroy() {
        super.destroy();
        _browsers.clear();
    }

    /**
     * Initialise the cache. This removes all the expired messages, and then
     * retrieves all unacked messages from the database and stores them
     * locally.
     * <p>
     * If the database access fails, the error is logged, a rollback is
     * attempted (a failure of the rollback itself is only logged), and a
     * <code>JMSException</code> carrying the original message and the
     * original exception as its linked exception is thrown.
     *
     * @throws JMSException if the cache can't be initialised
     */
    protected void init() throws JMSException {
        JmsDestination queue = getDestination();
        List handles;
        DatabaseService service = null;
        try {
            service = DatabaseService.getInstance();
            Connection connection = service.getConnection();
            service.getAdapter().removeExpiredMessageHandles(connection,
                                                             queue.getName());
            handles = service.getAdapter().getMessageHandles(connection, queue,
                                                             queue.getName());
        } catch (PersistenceException exception) {
            _log.error(exception, exception);
            try {
                if (service != null) {
                    service.rollback();
                }
            } catch (PersistenceException error) {
                _log.error(error, error);
            }
            // keep the underlying persistence failure reachable for callers
            // instead of reducing it to its message text
            JMSException failure = new JMSException(exception.getMessage());
            failure.setLinkedException(exception);
            throw failure;
        }

        Iterator iterator = handles.iterator();
        DefaultMessageCache cache = getMessageCache();

        while (iterator.hasNext()) {
            PersistentMessageHandle handle =
                    (PersistentMessageHandle) iterator.next();
            String messageId = handle.getMessageId();

            // reuse the cached reference for this message id if there is
            // one, otherwise create a new one
            MessageRef reference = cache.getMessageRef(messageId);
            if (reference == null) {
                reference = new CachedMessageRef(messageId, true, cache);
            }
            cache.addMessageRef(reference);
            handle.reference(reference);
            handle.setDestinationCache(this);

            _handles.add(new QueueConsumerMessageHandle(handle));

            checkMessageExpiry(reference, handle.getExpiryTime());
        }
    }

    /**
     * Add a message, and notify any listeners.
     *
     * @param reference a reference to the message
     * @param message   the message
     * @param handle    the handle to add
     * @throws JMSException for any error
     */
    protected void addMessage(MessageRef reference, MessageImpl message,
                              MessageHandle handle) throws JMSException {
        addMessage(reference, message);
        _handles.add(handle);

        // notify any queue listeners that a message has arrived
        notifyQueueListeners(handle, message);

        // create a lease iff one is required
        checkMessageExpiry(reference, message);
    }

    /**
     * Notify queue browsers that a message has arrived.
     * <p>
     * The browsers are copied into an array first, and the notification
     * loop runs over that copy.
     *
     * @param handle  a handle to the message
     * @param message the message
     * @throws JMSException if a browser fails to handle the message
     */
    protected void notifyQueueListeners(MessageHandle handle,
                                        MessageImpl message)
            throws JMSException {
        QueueBrowserEndpoint[] browsers =
                (QueueBrowserEndpoint[]) _browsers.toArray(
                        new QueueBrowserEndpoint[0]);

        for (int index = 0; index < browsers.length; ++index) {
            QueueBrowserEndpoint browser = browsers[index];
            browser.messageAdded(handle, message);
        }
    }

    /**
     * Remove an expired non-persistent message, and notify any listeners.
     *
     * @param reference the reference to the expired message
     * @throws JMSException for any error
     */
    protected void messageExpired(MessageRef reference)
            throws JMSException {
        _handles.remove(reference.getMessageId());
        // @todo - notify browser
        super.messageExpired(reference);
    }

    /**
     * Remove an expired persistent message, and notify any listeners.
     *
     * @param reference the reference to the expired message
     * @throws JMSException         if a listener fails to handle the
     *                              expiration
     * @throws PersistenceException if there is a persistence related problem
     */
    protected void persistentMessageExpired(MessageRef reference)
            throws JMSException, PersistenceException {
        _handles.remove(reference.getMessageId());
        // @todo - notify browsers
        // NOTE(review): this delegates to super.messageExpired(), the same
        // call used for non-persistent messages - confirm that this is
        // intended rather than a persistent-specific superclass method
        super.messageExpired(reference);
    }

    /**
     * Return the next QueueConsumerEndpoint that can consume the specified
     * message or null if there is none.
     * <p>
     * The search starts at the position following the consumer selected by
     * the previous successful call and wraps around, visiting each
     * registered consumer at most once.
     *
     * @param message - the message to consume
     * @return the consumer who should receive this message, or null
     */
    private ConsumerEndpoint getConsumerForMessage(MessageImpl message) {
        ConsumerEndpoint result = null;
        ConsumerEndpoint[] consumers = getConsumerArray();
        final int size = consumers.length;
        if (size > 0) {
            synchronized (_lock) {
                // roll over the consumer index if it is greater
                // than the number of registered consumers
                if ((_lastConsumerIndex + 1) > size) {
                    _lastConsumerIndex = 0;
                }

                // look over the list of consumers and return the
                // first endpoint that can process this message
                int index = _lastConsumerIndex;
                do {
                    ConsumerEndpoint consumer = consumers[index];

                    // if the endpoint has a message listener registered
                    // or the endpoint is waiting for a message and the
                    // message satisfies the selector then return it to
                    // the client.
                    if ((consumer.isAsynchronous()
                            || consumer.isWaitingForMessage())
                            && consumer.selects(message)) {
                        // remember where the next search should begin
                        _lastConsumerIndex = ++index;
                        result = consumer;
                        break;
                    }

                    // advance to the next consumer
                    if (++index >= size) {
                        index = 0;
                    }
                } while (index != _lastConsumerIndex);
            }
        }

        return result;
    }
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?