⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 abstracttopicconsumerendpoint.java

📁 一个java方面的消息订阅发送的源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    /**
     * Invoked when a message is removed from the
     * <code>DestinationCache</code>.
     * <p>
     * Drops the handle registered under <code>messageId</code> from this
     * endpoint's handle cache and, if there was one, destroys it.
     *
     * @param messageId the identifier of the removed message
     * @throws JMSException if the listener fails to handle the message
     */
    public void messageRemoved(String messageId) throws JMSException {
        MessageHandle removed = _handles.remove(messageId);
        if (removed == null) {
            // nothing was cached for this message
            return;
        }
        removed.destroy();
    }

    /**
     * Invoked when a persistent message is added to the
     * <code>DestinationCache</code>.
     * <p>
     * The message is rejected when the 'noLocal' indicator is set and the
     * message originated from this endpoint's own connection, or when the
     * handle cache refuses the consumer-specific handle. Otherwise the handle
     * is queued and the endpoint is scheduled.
     *
     * @param handle     a handle to the added message
     * @param message    the added message
     * @param connection the database connection
     * @return <code>true</code> if the listener accepted the message,
     *         otherwise <code>false</code>
     * @throws JMSException         if the listener fails to handle the message
     * @throws PersistenceException if there is a persistence related problem
     */
    public boolean persistentMessageAdded(MessageHandle handle,
                                          MessageImpl message,
                                          Connection connection)
            throws JMSException, PersistenceException {
        // 'noLocal' consumers never see messages published on their own
        // connection
        if (getNoLocal() && message.getConnectionId() == getConnectionId()) {
            return false;
        }

        // wrap the supplied handle in one that belongs to this consumer
        MessageHandle consumerHandle =
                new TopicConsumerMessageHandle(handle, this);
        if (isPersistent()) {
            // durable consumers also record the handle via the connection
            consumerHandle.add(connection);
        }

        if (!_handles.add(consumerHandle)) {
            _log.warn("Endpoint=" + this + " already has message cached: " +
                      consumerHandle);
            return false;
        }

        addMessage(consumerHandle);
        schedule();
        return true;
    }

    /**
     * Invoked when a persistent message is removed from the
     * <code>DestinationCache</code>.
     * <p>
     * Drops the handle registered under <code>messageId</code> from this
     * endpoint's handle cache and, if there was one, destroys it using the
     * supplied connection.
     *
     * @param messageId  the identifier of the removed message
     * @param connection the database connection
     * @throws JMSException         if the listener fails to handle the message
     * @throws PersistenceException if there is a persistence related problem
     */
    public void persistentMessageRemoved(String messageId,
                                         Connection connection)
            throws JMSException, PersistenceException {
        MessageHandle removed = _handles.remove(messageId);
        if (removed == null) {
            // nothing was cached for this message
            return;
        }
        removed.destroy(connection);
    }

    /**
     * Invoked when a new destination is added to the {@link
     * DestinationManager}.
     * <p>
     * If the new destination is a topic that matches this endpoint's topic
     * and is not already tracked, its cache is recorded and this endpoint is
     * registered with it as a consumer.
     *
     * @param destination the destination that was added
     * @param cache       the corresponding cache
     */
    public void destinationAdded(JmsDestination destination,
                                 DestinationCache cache) {
        if (!(destination instanceof JmsTopic)) {
            // only topics are of interest to a topic consumer
            return;
        }

        JmsTopic subscribed = (JmsTopic) getDestination();
        JmsTopic added = (JmsTopic) destination;
        if (!subscribed.match(added)) {
            return;
        }
        if (_caches.containsKey(added)) {
            // already consuming from this topic's cache
            return;
        }

        _caches.put(added, cache);
        cache.addConsumer(this);
    }

    /**
     * Invoked when a destination is removed from the {@link
     * DestinationManager}.
     * <p>
     * Topic destinations are dropped from the set of caches tracked by this
     * endpoint; any other kind of destination is ignored.
     *
     * @param destination the destination that was removed
     * @param cache       the corresponding cache
     */
    public void destinationRemoved(JmsDestination destination,
                                   DestinationCache cache) {
        if (!(destination instanceof JmsTopic)) {
            return;
        }
        _caches.remove(destination);
    }

    /**
     * Delivers cached messages to the consumer, passing at most
     * <code>MAX_MESSAGES</code> handles to the listener per invocation.
     * <p>
     * Handles are taken from the head of the handle cache one at a time. When
     * a selector is set, a handle whose message is unavailable or is not
     * selected is destroyed and does not count towards the limit. If delivery
     * of a handle fails, the handle is given to <code>returnMessage</code>; a
     * <code>RemoteException</code> additionally clears the listener.
     *
     * @return <code>true</code> if the endpoint should be rescheduled
     */
    protected boolean deliverMessages() {
        int delivered = 0;

        while (delivered < MAX_MESSAGES) {
            // stop, and don't reschedule, when delivery must cease
            if (stopDelivery()) {
                return false;
            }

            // take the handle at the head of the cache
            MessageHandle handle = _handles.removeFirst();
            try {
                // with no selector every message is deliverable; otherwise
                // the message must exist and be selected
                boolean deliverable = true;
                Selector selector = getSelector();
                if (selector != null) {
                    MessageImpl message = handle.getMessage();
                    deliverable = (message != null)
                            && selector.selects(message);
                }

                if (deliverable) {
                    _listener.onMessage(handle);
                    ++delivered;
                } else {
                    // rejected by the selector: discard the handle
                    handle.destroy();
                }
            } catch (RemoteException exception) {
                // the listener is dropped, and the handle handed back
                _listener = null;
                returnMessage(handle);
            } catch (Exception exception) {
                // covers JMSException and anything unexpected
                _log.error(exception, exception);
                returnMessage(handle);
            }
        }
        return true;
    }

    /**
     * Registers this endpoint with the associated {@link DestinationCache}s.
     * The consumer may receive messages immediately.
     *
     * @throws JMSException for any JMS error
     */
    protected void init() throws JMSException {
        JmsTopic topic = (JmsTopic) getDestination();
        DestinationManager manager = DestinationManager.instance();

        if (!topic.isWildCard()) {
            // a concrete topic maps to a single destination cache, obtained
            // from the manager
            DestinationCache cache = manager.getDestinationCache(topic);
            _caches.put(topic, cache);
            cache.addConsumer(this);
        } else {
            // a wildcard topic may match many destinations: start from the
            // caches that match right now
            _caches = manager.getTopicDestinationCaches(topic);

            // listen for destination events before registering with the
            // individual caches
            manager.addDestinationEventListener(this);

            for (Iterator i = _caches.values().iterator(); i.hasNext();) {
                DestinationCache matching = (DestinationCache) i.next();
                matching.addConsumer(this);
            }
        }
    }

    /**
     * Adds the handle to the handle cache and then signals that a message is
     * available.
     *
     * @param handle the message handle to add
     */
    protected void addMessage(MessageHandle handle) {
        // queue the handle first so it is present when availability is
        // signalled
        _handles.add(handle);
        notifyMessageAvailable();
    }

    /**
     * Closes this endpoint.
     * <p>
     * The endpoint stops listening for destination events and unregisters
     * from every destination cache it was consuming from. Non-persistent
     * consumers also destroy all message handles still held in the cache.
     */
    protected void doClose() {
        // stop receiving destination add/remove notifications
        DestinationManager.instance().removeDestinationEventListener(this);

        // detach from each destination cache, working from a snapshot of the
        // current values
        DestinationCache[] registered = (DestinationCache[])
                _caches.values().toArray(new DestinationCache[0]);
        for (int c = 0; c < registered.length; c++) {
            registered[c].removeConsumer(this);
        }
        _caches.clear();

        if (isPersistent()) {
            // handles of persistent consumers are left untouched
            return;
        }

        // non-persistent consumer: destroy every outstanding handle, logging
        // failures and carrying on with the rest
        MessageHandle[] outstanding = _handles.toArray();
        for (int h = 0; h < outstanding.length; h++) {
            try {
                outstanding[h].destroy();
            } catch (JMSException exception) {
                _log.error(exception, exception);
            }
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -