abstracttopicconsumerendpoint.java

来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 472 行 · 第 1/2 页

JAVA
472
字号
     * @return <code>true</code> if the listener accepted the message;     * @throws JMSException         if the listener fails to handle the message     * @throws PersistenceException if there is a persistence related problem     */    public boolean persistentMessageAdded(MessageHandle handle,                                          MessageImpl message)            throws JMSException, PersistenceException {        boolean accepted = true;        // if the 'noLocal' indicator is set, and the message arrived on        // the same connection, ignore the message        if (getNoLocal() && message.getConnectionId() == getConnectionId()) {            accepted = false;        } else {            // create a message handle for this consumer            handle = new TopicConsumerMessageHandle(handle, this);            if (isPersistent()) {                // and make it persistent if this is a durable consumer                handle.add();            }            if (!_handles.contains(handle)) {                // if the message is not already in the cache then add it                addMessage(handle);            } else {                accepted = false;                _log.warn("Endpoint=" + this + " already has message cached: " +                          handle);            }        }        return accepted;    }    /**     * This event is called when a message is removed from the     * <code>DestinationCache</code>.     *     * @param messageId the identifier of the removed message     * @throws JMSException         if the listener fails to handle the message     * @throws PersistenceException if there is a persistence related problem     */    public void persistentMessageRemoved(String messageId)            throws JMSException, PersistenceException {        MessageHandle handle = _handles.remove(messageId);        if (handle != null) {            handle.destroy();        }    }    /**     * Invoked when a destination is created.     *     * @param destination the destination that was added     */    public void destinationAdded(JmsDestination destination) {        // no-op    }    /**     * Invoked when a destination is removed.     *     * @param destination the destination that was removed     */    public void destinationRemoved(JmsDestination destination) {        // no-op    }    /**     * Invoked when a message cache is created.     *     * @param destination the destination that messages are being cached for     * @param cache       the corresponding cache     */    public void cacheAdded(JmsDestination destination,                           DestinationCache cache) {        if (destination instanceof JmsTopic) {            JmsTopic myTopic = (JmsTopic) getDestination();            JmsTopic topic = (JmsTopic) destination;            if (myTopic.match(topic) && !_caches.containsKey(topic)) {                _caches.put(topic, cache);                cache.addConsumer(this);            }        }    }    /**     * Invoked when a message cache is removed.     *     * @param destination the destination that messages are no longer being     *                    cached for     * @param cache       the corresponding cache     */    public void cacheRemoved(JmsDestination destination,                             DestinationCache cache) {        if (destination instanceof JmsTopic) {            _caches.remove(destination);        }    }    /**     * Registers this with the associated {@link DestinationCache}s. The     * consumer may receive messages immediately.     *     * @throws JMSException for any JMS error     */    protected void init() throws JMSException {        JmsTopic topic = (JmsTopic) getDestination();        // register the endpoint with the destination        if (topic.isWildCard()) {            // if the topic is a wild card then we need to retrieve a            // set of matching destination caches.            _caches = _destinations.getTopicDestinationCaches(topic);            // for each cache register this endpoint as a consumer of            // it's messages. Before doing so register as a destination            // event listener with the DestinationManager            _destinations.addDestinationEventListener(this);            DestinationCache[] caches = getDestinationCaches();            for (int i = 0; i < caches.length; ++i) {                caches[i].addConsumer(this);            }        } else {            // if the topic is not a wildcard then we need to get the            // destination cache. If one does not exist then we need to            // create it.            DestinationCache cache = _destinations.getDestinationCache(topic);            _caches.put(topic, cache);            cache.addConsumer(this);        }    }    /**     * Set the connection identifier.     *     * @param connectionId the identity of the connection that owns this     *                     consumer     * @see #getConnectionId     */    protected void setConnectionId(long connectionId) {        _connectionId = connectionId;    }    /**     * Add the handle to the cache.     *     * @param handle the message handle to add     */    protected void addMessage(MessageHandle handle) {        _handles.add(handle);        notifyMessageAvailable();    }    /**     * Return the next available message to the client.     *     * @return the next message, or <code>null</code> if none is available     * @throws JMSException for any error     * @param cancel     */    protected MessageHandle doReceive(Condition cancel) throws JMSException {        MessageHandle result = null;        MessageHandle handle;        while (!cancel.get() && (handle = _handles.removeFirst()) != null) {            if (_log.isDebugEnabled()) {                _log.debug("doReceive() - next available=" + handle.getMessageId());            }            // ensure that the message still exists            MessageImpl message = handle.getMessage();            if (message != null) {                if (selects(message)) {                    // got a message which is applicable for the endpoint                    result = handle;                    break;                } else {                    // message has been filtered out so destroy the handle.                    handle.destroy();                }            }        }        if (_log.isDebugEnabled()) {            _log.debug("doReceive() - result=" + (result != null ? result.getMessageId() : null));        }        return result;    }    /**     * Closes this endpoint.     */    protected void doClose() {        // unregister as a destination event listener        _destinations.removeDestinationEventListener(this);        // unregister from the destination before continuing        DestinationCache[] caches = getDestinationCaches();        for (int i = 0; i < caches.length; ++i) {            caches[i].removeConsumer(this);        }        _caches.clear();        if (!isPersistent()) {            // for non-persistent consumers, destroy all outstanding message            // handles            MessageHandle[] handles = _handles.toArray();            for (int i = 0; i < handles.length; ++i) {                MessageHandle handle = handles[i];                try {                    handle.destroy();                } catch (JMSException exception) {                    _log.error(exception, exception);                }            }        }    }    /**     * Returns the destination manager.     *     * @return the destination manager     */    protected DestinationManager getDestinationManager() {        return _destinations;    }    /**     * Returns the destination caches.     *     * @return the destination caches     */    protected DestinationCache[] getDestinationCaches() {        return (DestinationCache[]) _caches.values().toArray(                new DestinationCache[0]);    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?