📄 abstractconsumerendpoint.java
字号:
/**
* Redistribution and use of this software and associated documentation
* ("Software"), with or without modification, are permitted provided
* that the following conditions are met:
*
* 1. Redistributions of source code must retain copyright
* statements and notices. Redistributions must also contain a
* copy of this document.
*
* 2. Redistributions in binary form must reproduce the
* above copyright notice, this list of conditions and the
* following disclaimer in the documentation and/or other
* materials provided with the distribution.
*
* 3. The name "Exolab" must not be used to endorse or promote
* products derived from this Software without prior written
* permission of Exoffice Technologies. For written permission,
* please contact info@exolab.org.
*
* 4. Products derived from this Software may not be called "Exolab"
* nor may "Exolab" appear in their names without prior written
* permission of Exoffice Technologies. Exolab is a registered
* trademark of Exoffice Technologies.
*
* 5. Due credit should be given to the Exolab Project
* (http://www.exolab.org/).
*
* THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
* ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
* NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
* FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
* OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
*/
package org.exolab.jms.messagemgr;
import javax.jms.InvalidSelectorException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.exolab.jms.client.JmsDestination;
import org.exolab.jms.message.MessageImpl;
import org.exolab.jms.scheduler.Scheduler;
import org.exolab.jms.selector.Selector;
import org.exolab.jms.server.ServerConnection;
import org.exolab.jms.server.JmsServerSession;
/**
* Abstract implementation of the {@link ConsumerEndpoint} interface.
*
* @author <a href="mailto:jima@exoffice.com">Jim Alateras</a>
* @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
* @version $Revision: 1.1 $ $Date: 2005/03/18 03:58:38 $
*/
public abstract class AbstractConsumerEndpoint implements ConsumerEndpoint {

    // Threading notes (derived from the code in this class):
    //  - setStopped() and setMessageListener() synchronize on 'this'.
    //  - run() and the first half of close() synchronize on '_lock'.
    //  - schedule() is therefore reached under either monitor, so the state
    //    it reads/writes (_stopped, _closed, _listener, _scheduled) is
    //    declared volatile to make writes made under one monitor visible to
    //    readers holding the other.
    //  NOTE(review): volatile provides visibility only. The check-then-set of
    //  _scheduled in schedule() is still not atomic across the two monitors;
    //  making it so would require a single lock and a review of the lock
    //  ordering used by subclasses' deliverMessages()/doClose().

    /**
     * The listener notified when a message becomes available for a waiting
     * consumer. This is the session passed to the constructor.
     */
    private final ConsumerEndpointListener _session;

    /**
     * The identity of this consumer.
     */
    private final long _id;

    /**
     * The identity of the connection that owns this consumer.
     */
    private final long _connectionId;

    /**
     * The destination the consumer is accessing.
     */
    private final JmsDestination _destination;

    /**
     * The message selector associated with this consumer. May be
     * <code>null</code>.
     */
    private final Selector _selector;

    /**
     * If true, and the destination is a topic, inhibits the delivery of
     * messages published by its own connection.
     */
    private final boolean _noLocal;

    /**
     * Determines whether message delivery to the registered listener is
     * enabled or disabled. Endpoints start out stopped.
     */
    private volatile boolean _stopped = true;

    /**
     * Identifies this endpoint as being closed.
     */
    private volatile boolean _closed = false;

    /**
     * Indicates whether this endpoint has been scheduled with the scheduler
     * for asynchronous message delivery.
     * <p/>
     * Volatile because it is written under both <code>this</code> (see
     * {@link #setMessageListener}) and <code>_lock</code> (see {@link #run}
     * and {@link #close}).
     */
    private volatile boolean _scheduled = false;

    /**
     * Synchronization helper for close() and deliverMessages().
     */
    private final Object _lock = new Object();

    /**
     * Flag to indicate that the consumer is waiting for a message.
     * <p/>
     * Volatile because the accessors for this flag are unsynchronized.
     */
    private volatile boolean _waitingForMessage = false;

    /**
     * Holds the consumer's message listener. When non-null, messages are
     * pushed to it via the scheduler.
     * <p/>
     * Volatile because it is written while holding <code>this</code> but
     * read by {@link #schedule} and {@link #stopDelivery}, which may be
     * invoked while holding <code>_lock</code> instead.
     */
    protected volatile ConsumerEndpointListener _listener = null;

    /**
     * Maintains the maximum size of this cache. Not referenced by this
     * class; presumably consulted by subclasses.
     */
    protected int _size = 1000;

    /**
     * The scheduler used to deliver messages if a consumer has a registered
     * listener.
     */
    protected transient Scheduler _scheduler = null;

    /**
     * The logger.
     */
    private static final Log _log = LogFactory.getLog(
            AbstractConsumerEndpoint.class);


    /**
     * Construct a new <code>ConsumerEndpoint</code>.
     * <p/>
     * The destination and selector determine where it will be sourcing its
     * messages from, and scheduler is used to asynchronously deliver messages
     * to the consumer.
     *
     * @param consumerId  the identity of this consumer
     * @param session     the owning session. Its connection identifier is
     *                    recorded, and it is notified when a message becomes
     *                    available for a waiting consumer
     * @param destination the destination to access
     * @param selector    the message selector. May be <code>null</code>
     * @param noLocal     if true, and the destination is a topic, inhibits the
     *                    delivery of messages published by its own connection.
     * @param scheduler   used to schedule asynchronous message delivery.
     * @throws IllegalArgumentException if <code>session</code>,
     *                                  <code>destination</code> or
     *                                  <code>scheduler</code> is null
     * @throws InvalidSelectorException if the selector is not well formed
     */
    public AbstractConsumerEndpoint(long consumerId, JmsServerSession session,
                                    JmsDestination destination,
                                    String selector, boolean noLocal,
                                    Scheduler scheduler)
            throws InvalidSelectorException {
        if (session == null) {
            throw new IllegalArgumentException("Argument 'session' is null");
        }
        if (destination == null) {
            throw new IllegalArgumentException(
                    "Argument 'destination' is null");
        }
        if (scheduler == null) {
            throw new IllegalArgumentException("Argument 'scheduler' is null");
        }

        _id = consumerId;
        _connectionId = session.getConnectionId();
        _destination = destination;
        // a null selector means "select everything" - see selects()
        _selector = (selector != null) ? new Selector(selector) : null;
        _noLocal = noLocal;
        _session = session;
        _scheduler = scheduler;
    }

    /**
     * Returns the identity of this consumer.
     *
     * @return the identity of this consumer
     */
    public long getId() {
        return _id;
    }

    /**
     * Determines if this is a persistent or non-persistent consumer.
     * <p/>
     * If persistent, then the consumer is persistent across subscriptions and
     * server restarts, and {@link #getPersistentId} returns a non-null value.
     *
     * @return <code>false</code>
     */
    public boolean isPersistent() {
        return false;
    }

    /**
     * Returns the persistent identifier for this consumer. This is the identity
     * of the consumer which is persistent across subscriptions and server
     * restarts.
     *
     * @return <code>null</code>
     */
    public String getPersistentId() {
        return null;
    }

    /**
     * Return the destination that this consumer is accessing.
     *
     * @return the destination that this consumer is accessing
     */
    public JmsDestination getDestination() {
        return _destination;
    }

    /**
     * Returns the identity of the connection that owns this consumer.
     *
     * @return the identity of the connection
     * @see ServerConnection#getConnectionId
     */
    public long getConnectionId() {
        return _connectionId;
    }

    /**
     * Returns the message selector.
     *
     * @return the message selector, or <code>null</code> if none was specified
     *         by the client
     */
    public Selector getSelector() {
        return _selector;
    }

    /**
     * Returns if locally produced messages are being inhibited.
     *
     * @return <code>true</code> if locally published messages are being
     *         inhibited.
     */
    public boolean getNoLocal() {
        return _noLocal;
    }

    /**
     * Stop/start message delivery.
     *
     * @param stop if <code>true</code> to stop message delivery, otherwise
     *             start it
     */
    public synchronized void setStopped(boolean stop) {
        _stopped = stop;
        if (!stop) {
            // delivery has been (re)enabled - schedule it if needed
            schedule();
        }
    }

    /**
     * Set the message listener for this consumer. If a message listener is set
     * then messages will be scheduled to be sent to it when they are
     * available.
     * <p/>
     * Passing <code>null</code> removes this endpoint from the scheduler.
     *
     * @param listener the message listener to add, or <code>null</code> to
     *                 remove an existing listener
     */
    public synchronized void setMessageListener(
            ConsumerEndpointListener listener) {
        _listener = listener;
        if (listener == null) {
            // no listener to push to - remove this from the scheduler
            _scheduler.remove(this);
            _scheduled = false;
        } else {
            // schedule it to run
            schedule();
        }
    }

    /**
     * The run method is used to asynchronously deliver the messages in the
     * cache to the consumer, by invoking {@link #deliverMessages}.
     * <p/>
     * It is scheduled by the {@link Scheduler}. Delivery is performed while
     * holding the internal lock that {@link #close} also acquires; nothing is
     * delivered once the endpoint is closed.
     */
    public void run() {
        synchronized (_lock) {
            if (!_closed) {
                boolean reschedule = deliverMessages();
                // this run is complete; clear the flag so that schedule()
                // is able to queue the endpoint again
                _scheduled = false;
                if (reschedule) {
                    schedule();
                }
            }
        }
    }

    /**
     * Close this endpoint.
     * <p/>
     * This synchronizes with {@link #deliverMessages} before invoking
     * {@link #doClose}. The endpoint is marked closed even if
     * {@link #doClose} throws, so that it cannot subsequently be rescheduled.
     */
    public final void close() {
        // set first so that any concurrent delivery sees it via stopDelivery()
        _stopped = true;

        synchronized (_lock) {
            // synchronize with deliverMessages()
            _scheduler.remove(this); // remove this, if it is scheduled
            _scheduled = false;
        }

        synchronized (this) {
            try {
                doClose();
            } finally {
                // always flag the endpoint as closed, even if doClose()
                // fails; otherwise setStopped(false) could reschedule a
                // partially closed endpoint
                _closed = true;
            }
        }
    }

    /**
     * Returns a stringified version of the consumer, in the form
     * <code>id:destination</code>.
     *
     * @return a stringified version of the consumer
     */
    public String toString() {
        return _id + ":" + getDestination();
    }

    /**
     * Deliver messages in the cache to the consumer.
     * <p/>
     * Invoked by {@link #run} while holding the internal delivery lock.
     *
     * @return <code>true</code> if the endpoint should be rescheduled
     */
    protected abstract boolean deliverMessages();

    /**
     * Closes the endpoint.
     * <p/>
     * Invoked by {@link #close} while synchronized on this endpoint, after it
     * has been removed from the scheduler.
     */
    protected abstract void doClose();

    /**
     * Schedule asynchronous message delivery.
     * <p/>
     * The endpoint is only added to the scheduler if it is neither stopped
     * nor closed, has a listener, and is not already scheduled.
     */
    protected void schedule() {
        if (!_stopped && !_closed && _listener != null && !_scheduled) {
            _scheduled = true;
            _scheduler.add(this);
        }
    }

    /**
     * Determines if this endpoint has been stopped.
     *
     * @return <code>true</code> if this endpoint has been stopped
     */
    protected final boolean isStopped() {
        return _stopped;
    }

    /**
     * Check if the consumer is waiting for a message. If it is then notify it
     * that a message has arrived.
     * <p/>
     * The waiting flag is cleared before the session is notified. Failures
     * to notify are logged at debug level and otherwise ignored.
     */
    protected void notifyMessageAvailable() {
        // if we need to notify then send out the request
        if (isWaitingForMessage()) {
            clearWaitingForMessage();

            try {
                _session.onMessageAvailable(getId());
            } catch (Exception exception) {
                // best effort: a failed notification is not propagated to
                // the caller
                if (_log.isDebugEnabled()) {
                    _log.debug("Failed to notify consumer of available message",
                            exception);
                }
            }
        }
    }

    /**
     * Determines if the endpoint is waiting for a message.
     *
     * @return <code>true</code> if the endpoint is waiting
     */
    protected final boolean isWaitingForMessage() {
        return _waitingForMessage;
    }

    /**
     * Set the waiting for message flag.
     */
    protected final void setWaitingForMessage() {
        _waitingForMessage = true;
    }

    /**
     * Clear the waiting for message flag.
     */
    protected final void clearWaitingForMessage() {
        _waitingForMessage = false;
    }

    /**
     * Helper for {@link #deliverMessages} implementations, to determine if
     * asynchronous message delivery should be stopped.
     * <p/>
     * Delivery should stop if the endpoint is stopped, there are no messages
     * to deliver, or there is no listener.
     *
     * @return <code>true</code> if asynchronous message delivery should be
     *         stopped
     */
    protected boolean stopDelivery() {
        return (_stopped || getMessageCount() == 0 || _listener == null);
    }

    /**
     * Determines if a message is selected by the consumer.
     * <p/>
     * All messages are selected when the consumer has no selector.
     *
     * @param message the message to check
     * @return <code>true</code> if the message is selected; otherwise
     *         <code>false</code>
     */
    protected boolean selects(MessageImpl message) {
        return (_selector == null || _selector.selects(message));
    }

}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -