/**
 * Redistribution and use of this software and associated documentation
 * ("Software"), with or without modification, are permitted provided
 * that the following conditions are met:
 *
 * 1. Redistributions of source code must retain copyright
 *    statements and notices.  Redistributions must also contain a
 *    copy of this document.
 *
 * 2. Redistributions in binary form must reproduce the
 *    above copyright notice, this list of conditions and the
 *    following disclaimer in the documentation and/or other
 *    materials provided with the distribution.
 *
 * 3. The name "Exolab" must not be used to endorse or promote
 *    products derived from this Software without prior written
 *    permission of Exoffice Technologies.  For written permission,
 *    please contact info@exolab.org.
 *
 * 4. Products derived from this Software may not be called "Exolab"
 *    nor may "Exolab" appear in their names without prior written
 *    permission of Exoffice Technologies. Exolab is a registered
 *    trademark of Exoffice Technologies.
 *
 * 5. Due credit should be given to the Exolab Project
 *    (http://www.exolab.org/).
 *
 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 * OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
 */
package org.exolab.jms.messagemgr;

import javax.jms.InvalidSelectorException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.exolab.jms.client.JmsDestination;
import org.exolab.jms.message.MessageImpl;
import org.exolab.jms.scheduler.Scheduler;
import org.exolab.jms.selector.Selector;
import org.exolab.jms.server.ServerConnection;
import org.exolab.jms.server.JmsServerSession;


/**
 * Abstract implementation of the {@link ConsumerEndpoint} interface.
 *
 * @author <a href="mailto:jima@exoffice.com">Jim Alateras</a>
 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
 * @version $Revision: 1.1 $ $Date: 2005/03/18 03:58:38 $
 */
public abstract class AbstractConsumerEndpoint implements ConsumerEndpoint {

    /**
     * The listener for messages.
     */
    private final ConsumerEndpointListener _session;

    /**
     * The identity of this consumer.
     */
    private final long _id;

    /**
     * The identity of the connection that owns this consumer.
     */
    private final long _connectionId;

    /**
     * The destination the consumer is accessing.
     */
    private final JmsDestination _destination;

    /**
     * The message selector associated with this consumer. May be
     * <code>null</code>.
     */
    private final Selector _selector;

    /**
     * If true, and the destination is a topic, inhibits the delivery of
     * messages published by its own connection.
     */
    private final boolean _noLocal;

    /**
     * This determines whether message delivery to the registered listener is
     * enabled or disabled.
     */
    private volatile boolean _stopped = true;

    /**
     * Identifies this endpoint as being closed.
     */
    private volatile boolean _closed = false;

    /**
     * Indicates whether this cache has been scheduled with the dispatcher
     * for asynchronous message delivery.
     */
    private boolean _scheduled = false;

    /**
     * Synchronization helper for close() and deliverMessages().
     */
    private final Object _lock = new Object();

    /**
     * Flag to indicate that the consumer is waiting for a message.
     */
    private boolean _waitingForMessage = false;

    /**
     * Holds the consumer's message listener. This means that messages will be
     * pushed down.
     */
    protected ConsumerEndpointListener _listener = null;

    /**
     * Maintains the maximum size of this cache.
     */
    protected int _size = 1000;

    /**
     * This is the scheduler that is used to deliver messages if a consumer has
     * a registered listener.
     */
    protected transient Scheduler _scheduler = null;

    /**
     * The logger.
     */
    private static final Log _log = LogFactory.getLog(
            AbstractConsumerEndpoint.class);


    /**
     * Construct a new <code>ConsumerEndpoint</code>.
     * <p/>
     * The destination and selector determine where it will be sourcing its
     * messages from, and scheduler is used to asynchronously deliver messages
     * to the consumer.
     *
     * @param consumerId  the identity of this consumer
     * @param session     the owning session
     * @param destination the destination to access
     * @param selector    the message selector. May be <code>null</code>
     * @param noLocal     if true, and the destination is a topic, inhibits the
     *                    delivery of messages published by its own connection.
     * @param scheduler   used to schedule asynchronous message delivery.
     * @throws InvalidSelectorException if the selector is not well formed
     */
    public AbstractConsumerEndpoint(long consumerId, JmsServerSession session,
                                    JmsDestination destination,
                                    String selector, boolean noLocal,
                                    Scheduler scheduler)
            throws InvalidSelectorException {
        if (session == null) {
            throw new IllegalArgumentException("Argument 'session' is null");
        }
        if (destination == null) {
            throw new IllegalArgumentException(
                    "Argument 'destination' is null");
        }
        if (scheduler == null) {
            throw new IllegalArgumentException("Argument 'scheduler' is null");
        }

        _id = consumerId;
        _connectionId = session.getConnectionId();
        _destination = destination;
        _selector = (selector != null) ? new Selector(selector) : null;
        _noLocal = noLocal;
        _session = session;
        _scheduler = scheduler;
    }

    /**
     * Returns the identity of this consumer.
     *
     * @return the identity of this consumer
     */
    public long getId() {
        return _id;
    }

    /**
     * Determines if this is a persistent or non-persistent consumer.
     * <p/>
     * If persistent, then the consumer is persistent across subscriptions and
     * server restarts, and {@link #getPersistentId} returns a non-null value.
     *
     * @return <code>false</code>
     */
    public boolean isPersistent() {
        return false;
    }

    /**
     * Returns the persistent identifier for this consumer. This is the identity
     * of the consumer which is persistent across subscriptions and server
     * restarts.
     *
     * @return <code>null</code>
     */
    public String getPersistentId() {
        return null;
    }

    /**
     * Return the destination that this consumer is accessing.
     *
     * @return the destination that this consumer is accessing
     */
    public JmsDestination getDestination() {
        return _destination;
    }

    /**
     * Returns the identity of the connection that owns this consumer.
     *
     * @return the identity of the connection
     * @see ServerConnection#getConnectionId
     */
    public long getConnectionId() {
        return _connectionId;
    }

    /**
     * Returns the message selector.
     *
     * @return the message selector, or <code>null</code> if none was specified
     *         by the client
     */
    public Selector getSelector() {
        return _selector;
    }


    /**
     * Determines if locally produced messages are being inhibited.
     *
     * @return <code>true</code> if locally published messages are being
     *         inhibited.
     */
    public boolean getNoLocal() {
        return _noLocal;
    }

    /**
     * Stop/start message delivery.
     *
     * @param stop if <code>true</code>, stop message delivery; otherwise
     *             start it
     */
    public synchronized void setStopped(boolean stop) {
        if (stop) {
            _stopped = true;
        } else {
            _stopped = false;
            // schedule message delivery if needed
            schedule();
        }
    }

    /**
     * Set the message listener for this consumer. If a message listener is set
     * then messages will be scheduled to be sent to it when they are available.
     * <p/>
     *
     * @param listener the message listener to add, or <code>null</code> to
     *                 remove an existing listener
     */
    public synchronized void setMessageListener(
            ConsumerEndpointListener listener) {
        _listener = listener;
        if (listener == null) {
            // remove this from the scheduler
            _scheduler.remove(this);
            _scheduled = false;
        } else {
            // schedule it to run
            schedule();
        }
    }

    /**
     * The run method is used to asynchronously deliver the messages in the
     * cache to the consumer, by invoking {@link #deliverMessages}.
     * <p/>
     * It is scheduled by the {@link Scheduler}.
     */
    public void run() {
        synchronized (_lock) {
            if (!_closed) {
                boolean reschedule = deliverMessages();
                _scheduled = false;
                if (reschedule) {
                    schedule();
                }
            }
        }
    }


    /**
     * Close this endpoint.
     * <p/>
     * This synchronizes with {@link #deliverMessages} before invoking
     * {@link #doClose}.
     */
    public final void close() {
        _stopped = true;

        synchronized (_lock) {
            // synchronize with deliverMessages()
            _scheduler.remove(this); // remove this, if it is scheduled
            _scheduled = false;

        }

        synchronized (this) {
            doClose();
            _closed = true;
        }
    }

    /**
     * Returns a stringified version of the consumer.
     *
     * @return a stringified version of the consumer
     */
    public String toString() {
        return _id + ":" + getDestination();
    }


    /**
     * Deliver messages in the cache to the consumer.
     *
     * @return <code>true</code> if the endpoint should be rescheduled
     */
    protected abstract boolean deliverMessages();

    /**
     * Closes the endpoint.
     */
    protected abstract void doClose();

    /**
     * Schedule asynchronous message delivery.
     */
    protected void schedule() {
        if (!_stopped && !_closed && _listener != null && !_scheduled) {
            _scheduled = true;
            _scheduler.add(this);
        }
    }

    /**
     * Determines if this endpoint has been stopped.
     *
     * @return <code>true</code> if this endpoint has been stopped
     */
    protected final boolean isStopped() {
        return _stopped;
    }

    /**
     * Check if the consumer is waiting for a message. If it is then notify it
     * that a message has arrived.
     */
    protected void notifyMessageAvailable() {
        // if we need to notify then send out the request
        if (isWaitingForMessage()) {
            clearWaitingForMessage();

            try {
                _session.onMessageAvailable(getId());
            } catch (Exception exception) {
                if (_log.isDebugEnabled()) {
                    _log.debug("Failed to notify consumer of available message",
                               exception);
                }
            }
        }
    }

    /**
     * Determines if the endpoint is waiting for a message.
     *
     * @return <code>true</code> if the endpoint is waiting
     */
    protected final boolean isWaitingForMessage() {
        return _waitingForMessage;
    }

    /**
     * Set the waiting for message flag.
     */
    protected final void setWaitingForMessage() {
        _waitingForMessage = true;
    }

    /**
     * Clear the waiting for message flag.
     */
    protected final void clearWaitingForMessage() {
        _waitingForMessage = false;
    }

    /**
     * Helper for {@link #deliverMessages} implementations, to determine if
     * asynchronous message delivery should be stopped.
     *
     * @return <code>true</code> if asynchronous message delivery should be
     *         stopped
     */
    protected boolean stopDelivery() {
        return (_stopped || getMessageCount() == 0 || _listener == null);
    }

    /**
     * Determines if a message is selected by the consumer.
     *
     * @param message the message to check
     * @return <code>true</code> if the message is selected; otherwise
     *         <code>false</code>
     */
    protected boolean selects(MessageImpl message) {
        return (_selector == null || _selector.selects(message));
    }

}
