📄 abstracttopicconsumerendpoint.java
字号:
/**
* This event is called when a message is removed from the
* <code>DestinationCache</code>.
*
* @param messageId the identifier of the removed message
* @throws JMSException if the listener fails to handle the message
*/
public void messageRemoved(String messageId) throws JMSException {
MessageHandle handle = _handles.remove(messageId);
if (handle != null) {
handle.destroy();
}
}
/**
* This event is called when a persistent message is added to the
* <code>DestinationCache</code>.
*
* @param handle a handle to the added message
* @param message the added message
* @param connection the database connection
* @return <code>true</code> if the listener accepted the message;
* @throws JMSException if the listener fails to handle the message
* @throws PersistenceException if there is a persistence related problem
*/
public boolean persistentMessageAdded(MessageHandle handle,
MessageImpl message,
Connection connection)
throws JMSException, PersistenceException {
boolean accepted = true;
// if the 'noLocal' indicator is set, and the message arrived on
// the same connection, ignore the message
if (getNoLocal() && message.getConnectionId() == getConnectionId()) {
accepted = false;
} else {
// create a message handle for this consumer
handle = new TopicConsumerMessageHandle(handle, this);
if (isPersistent()) {
// and make it persistent if this is a durable consumer
handle.add(connection);
}
accepted = _handles.add(handle);
if (accepted) {
addMessage(handle);
schedule();
} else {
accepted = false;
_log.warn("Endpoint=" + this + " already has message cached: " +
handle);
}
}
return accepted;
}
/**
* This event is called when a message is removed from the
* <code>DestinationCache</code>.
*
* @param messageId the identifier of the removed message
* @param connection the database connection
* @throws JMSException if the listener fails to handle the message
* @throws PersistenceException if there is a persistence related problem
*/
public void persistentMessageRemoved(String messageId,
Connection connection)
throws JMSException, PersistenceException {
MessageHandle handle = _handles.remove(messageId);
if (handle != null) {
handle.destroy(connection);
}
}
/**
* This method is called when a new destination is added to the {@link
* DestinationManager}.
*
* @param destination the destination that was added
* @param cache the corresponding cache
*/
public void destinationAdded(JmsDestination destination,
DestinationCache cache) {
if (destination instanceof JmsTopic) {
JmsTopic myTopic = (JmsTopic) getDestination();
JmsTopic topic = (JmsTopic) destination;
if (myTopic.match(topic) && !_caches.containsKey(topic)) {
_caches.put(topic, cache);
cache.addConsumer(this);
}
}
}
/**
* This method is called when a destination is removed from the {@link
* DestinationManager}.
*
* @param destination the destination that was removed
* @param cache the corresponding cache
*/
public void destinationRemoved(JmsDestination destination,
DestinationCache cache) {
if (destination instanceof JmsTopic) {
_caches.remove(destination);
}
}
/**
* Deliver messages in the cache to the consumer.
*
* @return <code>true</code> if the endpoint should be rescheduled
*/
protected boolean deliverMessages() {
boolean reschedule = true;
for (int index = 0; index < MAX_MESSAGES;) {
// check if we should exit the loop
if (stopDelivery()) {
reschedule = false;
break;
}
// Process the first message on the list.
MessageHandle handle = _handles.removeFirst();
try {
Selector selector = getSelector();
if (selector != null) {
MessageImpl m = handle.getMessage();
if ((m != null) && selector.selects(m)) {
// this message has been selected by the selector
_listener.onMessage(handle);
index++;
} else {
// this message has not been selected
handle.destroy();
}
} else {
// send the message to the consumer
_listener.onMessage(handle);
index++;
}
} catch (RemoteException exception) {
_listener = null;
returnMessage(handle);
} catch (JMSException exception) {
_log.error(exception, exception);
returnMessage(handle);
} catch (Exception exception) {
_log.error(exception, exception);
returnMessage(handle);
}
}
return reschedule;
}
/**
* Registers this with the associated {@link DestinationCache}s The consumer
* may receive messages immediately.
*
* @throws JMSException for any JMS error
*/
protected void init() throws JMSException {
JmsTopic topic = (JmsTopic) getDestination();
// register the endpoint with the destination
DestinationManager destmgr = DestinationManager.instance();
if (topic.isWildCard()) {
// if the topic is a wild card then we need to retrieve a
// set of matching destination caches.
_caches = destmgr.getTopicDestinationCaches(topic);
// for each cache register this endpoint as a consumer of
// it's messages. Before doing so register as a destination
// event listener with the DestinationManager
destmgr.addDestinationEventListener(this);
Iterator iterator = _caches.values().iterator();
while (iterator.hasNext()) {
DestinationCache cache = (DestinationCache) iterator.next();
cache.addConsumer(this);
}
} else {
// if the topic is not a wildcard then we need to get the
// destination cache. If one does not exist then we need to
// create it.
DestinationCache cache = destmgr.getDestinationCache(topic);
_caches.put(topic, cache);
cache.addConsumer(this);
}
}
/**
* Add the handle to the cache.
*
* @param handle the message handle to add
*/
protected void addMessage(MessageHandle handle) {
_handles.add(handle);
notifyMessageAvailable();
}
/**
* Closes this endpoint.
*/
protected void doClose() {
// unregister as a destination event listener
DestinationManager.instance().removeDestinationEventListener(this);
// unregister from the destination before continuing
DestinationCache[] caches = (DestinationCache[])
_caches.values().toArray(new DestinationCache[0]);
for (int i = 0; i < caches.length; ++i) {
caches[i].removeConsumer(this);
}
_caches.clear();
if (!isPersistent()) {
// for non-persistent consumers, destroy all outstanding message
// handles
MessageHandle[] handles = _handles.toArray();
for (int i = 0; i < handles.length; ++i) {
MessageHandle handle = handles[i];
try {
handle.destroy();
} catch (JMSException exception) {
_log.error(exception, exception);
}
}
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -