⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 queuedestinationcache.java

📁 实现了Jms的服务器源码,支持多种适配器,DB,FTP,支持多种数据库
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        }
    }

    // implementation of MessageMgr.persistentMessageAdded
    public boolean persistentMessageAdded(Connection connection,
                                          JmsDestination destination,
                                          MessageImpl message)
        throws PersistenceException {

        boolean processed = false;

        if ((destination != null) &&
            (message != null)) {

            // check that it is not already present before adding it.
            if (destination.equals(_queue)) {

                // create a handle for the message
                try {

                    // all messages are added to this queue. Receivers will
                    // then pick messages of it as required.
                    MessageHandle handle =
                        MessageHandleFactory.getHandle(this, message);
                    addMessage(handle, message);

                    // increment the number of messages received
                    _publishCount++;

                    // if we have any registered consumers then we need to
                    // send the message to one of them first. If none are
                    // registered then cache it.
                    QueueConsumerEndpoint endpoint =
                        getEndpointForMessage(message);
                    if (endpoint != null) {
                        endpoint.persistentMessageAdded(connection, message);
                    }

                    // notify any queue listeners that a message has arrived
                    notifyQueueListeners(message);

                    // create a lease iff one is required
                    checkMessageExpiry(message);

                    // check the message as processed
                    processed = true;
                } catch (JMSException exception) {
                    _log.error("Failed to add persistent message",
                        exception);
                }
            } else {
                // need to notify someone or something that we are
                // dropping messages. Do we throw an exception
            }
        }

        return processed;
    }

    // implementation of MessageMgr.persistentMessageAdded
    public synchronized void persistentMessageRemoved(
        Connection connection, JmsDestination destination,
        MessageImpl message)
        throws PersistenceException {

        if ((destination != null) &&
            (message != null)) {

            try {
                PersistentMessageHandle handle = (PersistentMessageHandle)
                    MessageHandleFactory.getHandle(this, message);

                // call remove regardless whether it exists
                if (destination.equals(_queue)) {
                    removeMessage(handle);
                    notifyOnRemovePersistentMessage(connection, message);
                    MessageHandleFactory.destroyPersistentHandle(connection,
                        handle);
                }
            } catch (JMSException exception) {
                _log.error("Failed to remove persistent message", exception);
            }
        }
    }

    /**
     * Return the next {@link ConsumerEndpoint} that can consume the specified
     * message or null if there is none.
     *
     * @param  message - the message to consume
     * @return the consumer who should receive this message or null
     */
    private synchronized QueueConsumerEndpoint getEndpointForMessage(
        MessageImpl message) {
        QueueConsumerEndpoint selectedEndpoint = null;

        if (_consumers.size() > 0) {
            // roll over the consumer index if it is greater
            // than the number of registered consumers
            if ((_lastConsumerIndex + 1) > _consumers.size()) {
                _lastConsumerIndex = 0;
            }

            // look over the list of consumers and return the
            // first endpoint that can process this message
            int index = _lastConsumerIndex;
            do {
                QueueConsumerEndpoint endpoint =
                    (QueueConsumerEndpoint) _consumers.get(index);
                Selector selector = endpoint.getSelector();

                // if the endpoint has a message listener registered
                // or the endpoint is waiting for a message and the
                // message satisfies the selector then return it to
                // the client.
                if (((endpoint.hasMessageListener()) ||
                    (endpoint.isWaitingForMessage())) &&
                    ((selector == null) ||
                    (selector.selects(message)))) {
                    _lastConsumerIndex = ++index;
                    selectedEndpoint = endpoint;
                    break;
                }

                // advance to the next consumer
                if (++index >= _consumers.size()) {
                    index = 0;
                }
            } while (index != _lastConsumerIndex);
        }

        return selectedEndpoint;
    }

    /**
     * Return the first message of the queue or null if there are no messages
     * in the cache
     *
     * @param QueueConsumerEndpoint - the consumer who will receive the message
     * @return MessageHandle - handle to the first message
     */
    public synchronized MessageHandle getMessage(
        QueueConsumerEndpoint endpoint) {
        MessageHandle handle = null;
        // do not return a message is the endpoint is null;
        if ((endpoint != null) &&
            (getMessageCount() > 0)) {
            Selector selector = endpoint.getSelector();
            if (selector == null) {
                // if no selector has been specified then remove and return
                // the first message
                handle = removeFirstMessage();
                _consumeCount++;
            } else {
                // for non null selector we must find the first matching
                Object[] handles = toMessageArray();
                for (int i = 0; i < handles.length; ++i) {
                    MessageHandle hdl = (MessageHandle) handles[i];
                    MessageImpl message = hdl.getMessage();
                    if (message != null && selector.selects(message)) {
                        handle = hdl;
                        removeMessage(hdl);
                        _consumeCount++;
                        break;
                    }
                }
            }
        }

        return handle;
    }

    /**
     * Playback all the messages in the cache to the specified
     * {@link QueueListener}
     *
     * @param listener - the queue listener
     */
    public void playbackMessages(QueueListener listener) {

        Object[] messages = toMessageArray();
        if ((listener != null) &&
            (messages.length > 0)) {
            try {
                for (int index = 0; index < messages.length; index++) {
                    listener.onMessage(((MessageHandle) messages[index]).getMessage());
                }
            } catch (IndexOutOfBoundsException exception) {
                // ignore the exception since the list is dynamic and may
                // be modified while it is being processed.
            }
        }
    }

    /**
     * Return the specified message to top of the queue. This is called to
     * recover unsent or unacked messages
     *
     * @param message - message to return
     */
    public synchronized void returnMessage(MessageHandle handle) {

        // add the message to the destination cache
        addMessage(handle);

        // if there are registered consumers then check whether
        // any of them have registered message listeners
        if (_consumers.size() > 0) {
            // roll over the consumer index if it is greater
            // than the number of registered consumers
            if ((_lastConsumerIndex + 1) > _consumers.size()) {
                _lastConsumerIndex = 0;
            }

            int index =
                (_lastConsumerIndex >= _consumers.size()) ?
                0 : _lastConsumerIndex;

            do {

                QueueConsumerEndpoint endpoint =
                    (QueueConsumerEndpoint) _consumers.get(index);

                // if we find an endpoint with a listener then
                // we should reschedule it.
                if (endpoint.hasMessageListener()) {
                    endpoint.schedule();
                    _lastConsumerIndex = ++index;
                    break;
                }

                // advance to the next consumer
                if (++index >= _consumers.size()) {
                    index = 0;
                }
            } while (index != _lastConsumerIndex);
        }
    }

    /**
     * Notify all the queue listeners, that this message has arrived. This is
     * ideal for browsers and iterators
     *
     * @param message - message to deliver
     */
    void notifyQueueListeners(MessageImpl message) {
        if (!_queueListeners.isEmpty()) {
            QueueListener[] listeners =
                (QueueListener[]) _queueListeners.toArray(
                    new QueueListener[0]);

            int size = listeners.length;
            for (int index = 0; index < size; ++index) {
                QueueListener listener = listeners[index];
                if (listener instanceof QueueBrowserEndpoint) {
                    QueueBrowserEndpoint browser =
                        (QueueBrowserEndpoint) listener;
                    Selector selector = browser.getSelector();

                    // if a selector has been specified then apply the filter
                    // before sending down the message
                    if ((selector == null) ||
                        (selector.selects(message))) {
                        browser.onMessage(message);
                    }
                } else {
                    // if there is any other type of subscriber then just
                    // send the message to it.
                    listener.onMessage(message);
                }
            }
        }
    }

    // implementation of DestinationCache.notifyOnAddMessage
    boolean notifyOnAddMessage(MessageImpl message) {
        return true;
    }

    // implementation of DestinationCache.notifyOnRemoveMessage
    void notifyOnRemoveMessage(MessageImpl message) {
    }

    // implementation of DestinationCache.hasActiveConsumers
    boolean hasActiveConsumers() {
        boolean active = true;
        if (_queueListeners.isEmpty() && _consumers.isEmpty()) {
            active = false;
        }
        if (_log.isDebugEnabled()) {
            _log.debug("hasActiveConsumers()[queue=" + _queue + "]=" + active);
        }
        return active;
    }

    /**
     * Determines if this cache can be destroyed.
     * A <code>QueueDestinationCache</code> can be destroyed if there
     * are no active consumers and:
     * <ul>
     *   <li>the queue is persistent and there are no messages</li>
     *   <li>
     *     the queue is temporary and the corresponding connection is closed
     *   </li>
     * </ul>
     *
     * @return <code>true</code> if the cache can be destroyed, otherwise
     * <code>false</code>
     */
    public boolean canDestroy() {
        boolean destroy = false;
        if (!hasActiveConsumers()) {
            JmsDestination queue = getDestination();
            if (queue.getPersistent() && getMessageCount() == 0) {
                destroy = true;
            } else if (queue.isTemporaryDestination()) {
                // check if there is a corresponding connection. If
                // not, it has been closed, and the cache can be removed
                String connectionId =
                    ((JmsTemporaryDestination) queue).getConnectionId();
                JmsServerConnectionManager manager =
                    JmsServerConnectionManager.instance();
                if (manager.getConnection(connectionId) == null) {
                    destroy = true;
                }
            }
        }
        return destroy;
    }

    /**
     * Destroy this object
     */
    synchronized void destroy() {
        super.destroy();
        _queueListeners.clear();
    }

    // override Object.toString
    public String toString() {
        return _queue.toString();
    }

    // override Object.hashCode
    public int hashCode() {
        return _queue.hashCode();
    }

} //-- QueueDestinationCache

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -