📄 consumerendpoint.java
字号:
*/
public JmsServerSession getSession() {
return _session;
}
/**
* Deliver messages in the cache to the consumer
*
* @return <code>true</code> if the endpoint should be rescheduled
*/
public abstract boolean deliverMessages();
/**
* The run method is used to asynchronously deliver the messages in the
* cache to the consumer, by invoking {@link #deliverMessages}.
* <p>
* It is scheduled by the {@link Scheduler}.
*/
public void run() {
synchronized (_lock) {
if (!_closed) {
boolean reschedule = deliverMessages();
_scheduled = false;
if (reschedule) {
schedule();
}
}
}
}
// implementation of DestinationCacheEventListener.messageAdded
public synchronized boolean messageAdded(MessageImpl message) {
boolean added = false;
// create a message handle
try {
// if the nolocal indicator is set and the message arrived on
// the same connection, add this consumer then mark it as
// received, but do not add it to the queue
if (getNoLocal()
&& message.getConnectionId() == getConnectionId()) {
// inform them that we have processed the message
return true;
}
MessageHandle handle =
MessageHandleFactory.getHandle(this, message);
if (!_cache.containsHandle(handle)) {
// if the message is not already in the cache then add it
// and flag that we have added the message to the cache
addMessage(handle, message);
added = true;
schedule();
}
} catch (JMSException exception) {
_log.error("Failed to add message to endpoint", exception);
}
return added;
}
// implementation of DestinationCacheEventListener.messageRemoved
public synchronized boolean messageRemoved(MessageImpl message) {
boolean removed = false;
try {
//retrieve the message handle
MessageHandle handle =
MessageHandleFactory.getHandle(this, message);
if (_cache.containsHandle(handle)) {
// call remove regardless whether it exists
removeMessage(handle);
removed = true;
}
} catch (JMSException exception) {
_log.error("Failed to remove message from endpoint", exception);
}
return removed;
}
// implementation of DestinationCacheEventListener.persistentMessageAdded
public synchronized boolean persistentMessageAdded(Connection connection,
MessageImpl message)
throws PersistenceException {
return messageAdded(message);
}
// implementation of DestinationCacheEventListener.persistentMessageRemoved
public synchronized boolean persistentMessageRemoved(Connection connection,
MessageImpl message)
throws PersistenceException {
return messageRemoved(message);
}
/**
* Stop/start message delivery
*
* @param stop if <code>true</code> to stop message delivery, otherwise
* start it
*/
public synchronized void setStopped(boolean stop) {
if (stop) {
_stopped = true;
} else {
_stopped = false;
// schedule message delivery if needed
schedule();
}
}
/**
* This message will return all unacked messages to the queue and allow
* them to be resent to the consumer with the redelivery flag on.
*/
public synchronized void recover() {
// default behaviour is to do nothing
}
/**
* Close this endpoint.
* <p>
* This synchronizes with {@link #deliverMessages} before invoking
* @link {#doClose}
*/
public final void close() {
_stopped = true;
synchronized (_lock) {
// synchronize with deliverMessages()
_scheduler.remove(this); // remove this, if it is scheduled
_scheduled = false;
}
synchronized (this) {
doClose();
// clear all messages in the cache
if (_cache != null) {
_cache.clear();
}
_closed = true;
}
}
/**
* Set the message listener for this consmer. If a message listener is set
* then messages will be scheduled to be sent to it when they are available
* <p>
* Each consumer cache can only have a single message listener. To remove
* the message listener call this method with null argument
*
* @param listener - the message listener to add.
*/
public synchronized void setMessageListener(
InternalMessageListener listener) {
_listener = listener;
if (listener == null) {
// remove this from the scheduler
_scheduler.remove(this);
_scheduled = false;
} else {
// scheduler for it to run
schedule();
}
}
/**
* Return the specified message to the cache.
*
* @param handle - handle to return
*/
public synchronized void returnMessage(MessageHandle handle) {
if (_cache != null) {
addMessage(handle);
schedule();
}
}
/**
* Return the next message to the client. This will also mark the message as
* sent and move it to the sent queue
*
* @param wait - the number of milliseconds to wait
* @return MessageHandle - handle to the next message in the list
*/
abstract public MessageHandle receiveMessage(long wait);
// implementation of GarbageCollectable.collectGarbage
public void collectGarbage(boolean aggressive) {
if (aggressive) {
// clear all persistent messages in the cache
_cache.clearPersistentMessages();
if (_log.isDebugEnabled()) {
_log.debug("Evicted all persistent messages from dest "
+ getDestination().getName() + " and name "
+ getId());
}
}
if (_log.isDebugEnabled()) {
_log.debug("ENDPOINT- " + getDestination().getName() + ":"
+ getPersistentId() + " Messages: P["
+ _cache.getPersistentCount() + "] T["
+ _cache.getTransientCount() + "] Handles: ["
+ _cache.getHandleCount() + "]");
}
}
/**
* Closes the endpoint
*/
protected abstract void doClose();
/**
* Schedule asynchronouse message delivery
*/
protected void schedule() {
if (!_stopped && !_closed && _listener != null && !_scheduled) {
_scheduled = true;
_scheduler.add(this);
}
}
/**
* Clear all messages in the cache, regardless of whether they are
* persistent or non-persistent
*/
protected void clearMessages() {
_cache.clear();
}
/**
* Check whether the vector of handles contains one or more persistent
* handles
*
* @param handles - collection of {@link MessageHandle} objects
* @return true if there is one or more persistent handles
*/
protected boolean collectionHasPersistentHandles(Vector collection) {
boolean result = false;
Enumeration enum = collection.elements();
while (enum.hasMoreElements()) {
if (enum.nextElement() instanceof PersistentMessageHandle) {
result = true;
break;
}
}
return result;
}
/**
* Add the handle to the cache
*
* @param handle - the message handle to add
*/
protected void addMessage(MessageHandle handle) {
handle.setConsumerName(getPersistentId());
_cache.addHandle(handle);
// check to see whether the consumer is waiting to
// be notified
if (isWaitingForMessage()) {
notifyMessageAvailable();
}
}
/**
* Cache a handle and its corresponding message
*
* @param handle the handle to cache
* @param message the corresponding message to cache
*/
protected void addMessage(MessageHandle handle, MessageImpl message) {
handle.setConsumerName(getPersistentId());
_cache.addMessage(handle, message);
// check to see whether the consumer is waiting to
// be notified
if (isWaitingForMessage()) {
notifyMessageAvailable();
}
}
/**
* Return the message for the specified handle
*
* @param handle - the handle
* @return MessageImpl - the associated message
*/
protected MessageImpl getMessage(MessageHandle handle) {
return _cache.getMessage(handle);
}
/**
* Remove the handle from the cache
*
* @param handle the handle to remove
* @return <code>true</code> if the message was removed
*/
protected boolean removeMessage(MessageHandle handle) {
return _cache.removeHandle(handle);
}
/**
* Determines if a message handle is already cached
*
* @return <code>true</code> if it is cached
*/
protected boolean containsMessage(MessageHandle handle) {
return _cache.containsHandle(handle);
}
/**
* Return the first message handle in the cache
*
* @return the first message or null if cache is empty
*/
protected MessageHandle removeFirstMessage() {
return _cache.removeFirstHandle();
}
/**
* Delete the message with the specified handle from the cache
*
* @param handle the handle
*/
protected void deleteMessage(MessageHandle handle) {
_cache.removeMessage(handle.getMessageId());
}
/**
* Determines if this endpoint has been stopped
*
* @return <code>true</code> if this endpoint has been stopped
*/
protected final boolean isStopped() {
return _stopped;
}
/**
* Check if the consumer is waiting for a message. If it is then
* notify it that a message has arrived
*/
protected void notifyMessageAvailable() {
// if we need to notify then send out the request
if (isWaitingForMessage()) {
clearWaitingForMessage();
try {
_session.onMessageAvailable(getClientId());
} catch (Exception exception) {
//getLogger().logError("Error in notifyMessageAvailable " +
// getDestination().getName() + " " + exception.toString());
}
}
}
/**
* Check whether the endpoint is waiting for a message
*
* @return boolean
*/
protected final boolean isWaitingForMessage() {
return _waitingForMessage;
}
/**
* Set the waiting for message flag
*/
protected final void setWaitingForMessage() {
_waitingForMessage = true;
}
/**
* Clear the waiting for message flag
*/
protected final void clearWaitingForMessage() {
_waitingForMessage = false;
}
/**
* Helper for {@link #deliverMessages} implementations, to determines if
* asynchronous message delivery should be stopped
*
* @return <code>true</code> if asynchronous message delivery should be
* stopped
*/
protected boolean stopDelivery() {
return (_stopped || getMessageCount() == 0 || _listener == null);
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -