queuedestinationcache.java

来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 523 行 · 第 1/2 页

JAVA
523
字号
            MessageHandle handle = handles[i];            MessageImpl message = handle.getMessage();            if (message != null) {                browser.messageAdded(handle, message);            }        }    }    /**     * Return a message handle back to the cache, to recover unsent or     * unacknowledged messages.     *     * @param handle the message handle to return     */    public void returnMessageHandle(MessageHandle handle) {        // add the message to the destination cache        _handles.add(handle);        try {            MessageImpl message = handle.getMessage();            if (message != null) {                // if there are any registered consumers, notify one of them                // that a message has arrived                ConsumerEndpoint consumer = getConsumerForMessage(message);                if (consumer != null) {                    consumer.messageAdded(handle, message);                }            }        } catch (JMSException exception) {            _log.debug(exception, exception);        }    }    /**     * Determines if there are any registered consumers.     *     * @return <code>true</code> if there are registered consumers     */    public boolean hasConsumers() {        boolean active = super.hasConsumers();        if (!active && !_browsers.isEmpty()) {            active = true;        }        if (_log.isDebugEnabled()) {            _log.debug("hasActiveConsumers()[queue=" + getDestination() + "]="                    + active);        }        return active;    }    /**     * Returns the number of messages in the cache.     *     * @return the number of messages in the cache     */    public int getMessageCount() {        return _handles.size();    }    /**     * Determines if this cache can be destroyed. A <code>QueueDestinationCache</code>     * can be destroyed if there are no active consumers and: <ul> <li>the queue     * is persistent and there are no messages</li> <li> the queue is temporary     * and the corresponding connection is closed </li> </ul>     *     * @return <code>true</code> if the cache can be destroyed, otherwise     *         <code>false</code>     */    public boolean canDestroy() {        boolean destroy = false;        if (!hasConsumers()) {            JmsDestination queue = getDestination();            if (queue.getPersistent() && getMessageCount() == 0) {                destroy = true;            } else if (queue.isTemporaryDestination()) {                // check if there is a corresponding connection. If                // not, it has been closed, and the cache can be removed                long connectionId =                        ((JmsTemporaryDestination) queue).getConnectionId();                if (_connections.getConnection(connectionId) == null) {                    destroy = true;                }            }        }        return destroy;    }    /**     * Destroy this object.     */    public void destroy() {        super.destroy();        _browsers.clear();    }    /**     * Initialise the cache. This removes all the expired messages, and then     * retrieves all unacked messages from the database and stores them     * locally.     *     * @throws JMSException if the cache can't be initialised     */    protected void init() throws JMSException {        JmsDestination queue = getDestination();        List handles;        DatabaseService service = null;        try {            service = DatabaseService.getInstance();            Connection connection = service.getConnection();            service.getAdapter().removeExpiredMessageHandles(connection,                    queue.getName());            handles = service.getAdapter().getMessageHandles(connection, queue,                    queue.getName());        } catch (PersistenceException exception) {            _log.error(exception, exception);            try {                if (service != null) {                    service.rollback();                }            } catch (PersistenceException error) {                _log.error(error, error);            }            throw new JMSException(exception.getMessage());        }        Iterator iterator = handles.iterator();        DefaultMessageCache cache = getMessageCache();        while (iterator.hasNext()) {            PersistentMessageHandle handle = (PersistentMessageHandle) iterator.next();            String messageId = handle.getMessageId();            MessageRef reference = cache.getMessageRef(messageId);            if (reference == null) {                reference = new CachedMessageRef(messageId, true, cache);            }            cache.addMessageRef(reference);            handle.reference(reference);            handle.setDestinationCache(this);            _handles.add(new QueueConsumerMessageHandle(handle));            checkMessageExpiry(reference, handle.getExpiryTime());        }    }    /**     * Add a message, and notify any listeners.     *     * @param reference a reference to the message     * @param message   the message     * @param handle    the handle to add     * @throws JMSException for any error     */    protected void addMessage(MessageRef reference, MessageImpl message,                              MessageHandle handle) throws JMSException {        addMessage(reference, message);        _handles.add(handle);        // notify any queue listeners that a message has arrived        notifyQueueListeners(handle, message);        // create a lease iff one is required        checkMessageExpiry(reference, message);    }    /**     * Notify queue browsers that a message has arrived.     *     * @param handle  a handle to the message     * @param message the message     * @throws JMSException if a browser fails to handle the message     */    protected void notifyQueueListeners(MessageHandle handle,                                        MessageImpl message)            throws JMSException {        QueueBrowserEndpoint[] browsers =                (QueueBrowserEndpoint[]) _browsers.toArray(                        new QueueBrowserEndpoint[0]);        for (int index = 0; index < browsers.length; ++index) {            QueueBrowserEndpoint browser = browsers[index];            browser.messageAdded(handle, message);        }    }    /**     * Remove an expired non-peristent message, and notify any listeners.     *     * @param reference the reference to the expired message     * @throws JMSException for any error     */    protected void messageExpired(MessageRef reference) throws JMSException {        _handles.remove(reference.getMessageId());        // @todo - notify browser        super.messageExpired(reference);    }    /**     * Remove an expired persistent message, and notify any listeners.     *     * @param reference the reference to the expired message     * @throws JMSException         if a listener fails to handle the     *                              expiration     * @throws PersistenceException if there is a persistence related problem     */    protected void persistentMessageExpired(MessageRef reference)            throws JMSException, PersistenceException {        _handles.remove(reference.getMessageId());        // @todo - notify browsers        super.messageExpired(reference);    }    /**     * Return the next QueueConsumerEndpoint that can consume the specified     * message or null if there is none.     *     * @param message - the message to consume     * @return the consumer who should receive this message, or null     */    private ConsumerEndpoint getConsumerForMessage(MessageImpl message) {        ConsumerEndpoint result = null;        ConsumerEndpoint[] consumers = getConsumerArray();        final int size = consumers.length;        if (size > 0) {            synchronized (_lock) {                // roll over the consumer index if it is greater                // than the number of registered consumers                if ((_lastConsumerIndex + 1) > size) {                    _lastConsumerIndex = 0;                }                // look over the list of consumers and return the                // first endpoint that can process this message                int index = _lastConsumerIndex;                do {                    ConsumerEndpoint consumer = consumers[index];                    // if the endpoint has a message listener registered                    // or the endpoint is waiting for a message and the                    // message satisfies the selector then return it to                    // the client.                    if ((consumer.isAsynchronous()                            || consumer.isWaitingForMessage())                            && consumer.selects(message)) {                        _lastConsumerIndex = ++index;                        result = consumer;                        break;                    }                    // advance to the next consumer                    if (++index >= size) {                        index = 0;                    }                } while (index != _lastConsumerIndex);            }        }        return result;    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?