⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 queuedestinationcache.java

📁 一个java方面的消息订阅发送的源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
     * @param handle the message handle to return
     */
    public void returnMessageHandle(MessageHandle handle) {
        // Put the handle back into this queue's handle collection so that it
        // can be handed out again.
        _handles.add(handle);

        // Nothing to wake up if no consumers are attached.
        ConsumerEndpoint[] consumers = getConsumerArray();
        final int size = consumers.length;
        if (size == 0) {
            return;
        }

        // The stored index is "last served + 1", so it may legitimately be
        // one past the end of the array (or stale if consumers went away).
        // Roll it over before using it.
        if (_lastConsumerIndex >= size) {
            _lastConsumerIndex = 0;
        }

        // Visit every consumer exactly once, starting from the stored
        // position. The start position is captured in a local so that the
        // scan is bounded by the array length and cannot be extended (or made
        // endless) by another thread updating _lastConsumerIndex meanwhile.
        final int start = _lastConsumerIndex;
        for (int offset = 0; offset < size; ++offset) {
            final int index = (start + offset) % size;
            QueueConsumerEndpoint endpoint
                    = (QueueConsumerEndpoint) consumers[index];

            if (endpoint.hasMessageListener()) {
                // an endpoint with a listener is rescheduled
                endpoint.schedule();
                _lastConsumerIndex = index + 1;
                return;
            }
            if (endpoint.isWaitingForMessage()) {
                // an endpoint waiting for a message is told one is available
                endpoint.notifyMessageAvailable();
                _lastConsumerIndex = index + 1;
                return;
            }
        }
        // no consumer had a listener or was waiting: the stored index is
        // left as it was
    }

    /**
     * Determines if there are any registered consumers or queue browsers.
     *
     * @return <code>true</code> if there are registered consumers or
     *         browsers
     */
    public boolean hasActiveConsumers() {
        // the cache counts as active when the superclass reports active
        // consumers, or failing that, when at least one browser is held
        final boolean result =
                super.hasActiveConsumers() || !_browsers.isEmpty();

        if (_log.isDebugEnabled()) {
            _log.debug("hasActiveConsumers()[queue=" + getDestination() + "]="
                       + result);
        }
        return result;
    }

    /**
     * Determines if this cache can be destroyed.
     * A <code>QueueDestinationCache</code> can be destroyed if there are no
     * active consumers and:
     * <ul>
     *   <li>the queue is persistent and there are no messages</li>
     *   <li> the queue is temporary and the corresponding connection is closed
     *   </li>
     * </ul>
     *
     * @return <code>true</code> if the cache can be destroyed, otherwise
     *         <code>false</code>
     */
    public boolean canDestroy() {
        // an active consumer (or browser) always keeps the cache alive
        if (hasActiveConsumers()) {
            return false;
        }

        JmsDestination destination = getDestination();

        // persistent queue holding no messages
        if (destination.getPersistent() && getMessageCount() == 0) {
            return true;
        }

        // anything else must be a temporary queue to qualify
        if (!destination.isTemporaryDestination()) {
            return false;
        }

        // a temporary queue can go once the manager no longer knows the
        // connection it belongs to
        long ownerId = ((JmsTemporaryDestination) destination).getConnectionId();
        JmsServerConnectionManager connections =
                JmsServerConnectionManager.instance();
        return connections.getConnection(ownerId) == null;
    }

    /**
     * Destroy this object
     */
    public synchronized void destroy() {
        // run the superclass teardown first
        super.destroy();
        // then empty the collection of QueueBrowserEndpoint instances held
        // by this cache
        _browsers.clear();
    }

    /**
     * Initialise the cache. This removes all the expired messages, and then
     * retrieves all unacked messages from the database and stores them
     * locally.
     *
     * @param connection the database connection
     * @throws JMSException for any JMS error
     * @throws PersistenceException for any persistence error
     */
    protected void init(Connection connection) throws JMSException, PersistenceException {
        // start from an empty handle queue
        _handles = new MessageQueue();

        JmsDestination destination = getDestination();

        // purge handles of expired messages before anything is loaded
        DatabaseService.getAdapter().removeExpiredMessageHandles(
                connection, destination.getName());

        DefaultMessageCache messages = getMessageCache();
        List persisted = DatabaseService.getAdapter().getMessageHandles(
                connection, destination, destination.getName());

        // rebuild the in-memory state for every handle read from the database
        for (Iterator cursor = persisted.iterator(); cursor.hasNext();) {
            PersistentMessageHandle stored =
                    (PersistentMessageHandle) cursor.next();
            String id = stored.getMessageId();

            // reuse the reference already known to the message cache,
            // otherwise create a new one for this id
            MessageRef known = messages.getMessageRef(id);
            MessageRef ref = (known != null)
                    ? known
                    : new CachedMessageRef(id, true, messages);

            messages.addMessageRef(ref);
            stored.reference(ref);
            _handles.add(new QueueConsumerMessageHandle(stored));

            checkMessageExpiry(ref, stored.getExpiryTime());
        }
    }

    /**
     * Add a message, and notify any listeners.
     *
     * @param reference a reference to the message
     * @param message the message
     * @param handle the handle to add
     * @throws JMSException for any error
     */
    protected void addMessage(MessageRef reference, MessageImpl message,
                              MessageHandle handle) throws JMSException {
        // delegate to the two-argument addMessage overload, which is not
        // declared in this part of the file
        addMessage(reference, message);
        // record the handle in this queue's handle collection
        _handles.add(handle);

        // notify any queue browsers that a message has arrived
        notifyQueueListeners(handle, message);

        // hand expiry tracking to checkMessageExpiry; per the original
        // author's note it creates a lease iff one is required
        checkMessageExpiry(reference, message);
    }


    /**
     * Notify queue browsers that a message has arrived.
     *
     * @param handle a handle to the message
     * @param message the message
     * @throws JMSException if a browser fails to handle the message
     */
    protected void notifyQueueListeners(MessageHandle handle,
                                        MessageImpl message)
            throws JMSException {
        // work on a snapshot array rather than on _browsers itself
        QueueBrowserEndpoint[] snapshot = (QueueBrowserEndpoint[])
                _browsers.toArray(new QueueBrowserEndpoint[0]);

        // tell each browser, in array order, about the new message
        final int count = snapshot.length;
        int position = 0;
        while (position < count) {
            snapshot[position].messageAdded(handle, message);
            ++position;
        }
    }

    /**
     * Remove an expired non-persistent message, and notify any listeners.
     *
     * @param reference the reference to the expired message
     * @throws JMSException for any error
     */
    protected void messageExpired(MessageRef reference) throws JMSException {
        // drop the entry for the expired message from this queue's handles
        final String expiredId = reference.getMessageId();
        _handles.remove(expiredId);
        // @todo - notify browser
        // then let the superclass run its own expiry handling
        super.messageExpired(reference);
    }

    /**
     * Remove an expired persistent message, and notify any listeners.
     *
     * @param reference  the reference to the expired message
     * @param connection the database connection to use
     * @throws JMSException         if a listener fails to handle the
     *                              expiration
     * @throws PersistenceException if there is a persistence related problem
     */
    protected void persistentMessageExpired(MessageRef reference,
                                            Connection connection)
            throws JMSException, PersistenceException {
        // drop the entry for the expired message from this queue's handles
        final String expiredId = reference.getMessageId();
        _handles.remove(expiredId);
        // @todo - notify browsers
        // NOTE(review): this delegates to super.messageExpired and never
        // uses 'connection' - confirm that the non-persistent superclass
        // path is really what is intended here
        super.messageExpired(reference);
    }

    /**
     * Return the next QueueConsumerEndpoint that can consume the
     * specified message or null if there is none.
     *
     * @param message - the message to consume
     * @return the consumer who should receive this message, or null
     */
    private synchronized QueueConsumerEndpoint getEndpointForMessage(
            MessageImpl message) {
        // no consumers, no candidate
        ConsumerEndpoint[] consumers = getConsumerArray();
        final int size = consumers.length;
        if (size == 0) {
            return null;
        }

        // The stored index is "last served + 1", so it may be one past the
        // end of the array; roll it over before using it.
        if (_lastConsumerIndex >= size) {
            _lastConsumerIndex = 0;
        }

        // Visit every consumer exactly once, round-robin from the stored
        // position. The start is captured in a local because
        // returnMessageHandle updates _lastConsumerIndex without holding
        // this object's monitor; comparing against the field could keep the
        // scan from ever terminating.
        final int start = _lastConsumerIndex;
        for (int offset = 0; offset < size; ++offset) {
            final int index = (start + offset) % size;
            QueueConsumerEndpoint endpoint =
                    (QueueConsumerEndpoint) consumers[index];
            Selector selector = endpoint.getSelector();

            // the endpoint qualifies when it can take a message right now
            // (listener registered, or blocked waiting for one) and the
            // message passes its selector, if it has one
            boolean ready = endpoint.hasMessageListener()
                    || endpoint.isWaitingForMessage();
            if (ready && (selector == null || selector.selects(message))) {
                _lastConsumerIndex = index + 1;
                return endpoint;
            }
        }

        // nobody can consume this message at the moment
        return null;
    }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -