abstracttopicconsumerendpoint.java
来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 472 行 · 第 1/2 页
JAVA
472 行
* @return <code>true</code> if the listener accepted the message; * @throws JMSException if the listener fails to handle the message * @throws PersistenceException if there is a persistence related problem */ public boolean persistentMessageAdded(MessageHandle handle, MessageImpl message) throws JMSException, PersistenceException { boolean accepted = true; // if the 'noLocal' indicator is set, and the message arrived on // the same connection, ignore the message if (getNoLocal() && message.getConnectionId() == getConnectionId()) { accepted = false; } else { // create a message handle for this consumer handle = new TopicConsumerMessageHandle(handle, this); if (isPersistent()) { // and make it persistent if this is a durable consumer handle.add(); } if (!_handles.contains(handle)) { // if the message is not already in the cache then add it addMessage(handle); } else { accepted = false; _log.warn("Endpoint=" + this + " already has message cached: " + handle); } } return accepted; } /** * This event is called when a message is removed from the * <code>DestinationCache</code>. * * @param messageId the identifier of the removed message * @throws JMSException if the listener fails to handle the message * @throws PersistenceException if there is a persistence related problem */ public void persistentMessageRemoved(String messageId) throws JMSException, PersistenceException { MessageHandle handle = _handles.remove(messageId); if (handle != null) { handle.destroy(); } } /** * Invoked when a destination is created. * * @param destination the destination that was added */ public void destinationAdded(JmsDestination destination) { // no-op } /** * Invoked when a destination is removed. * * @param destination the destination that was removed */ public void destinationRemoved(JmsDestination destination) { // no-op } /** * Invoked when a message cache is created. 
* * @param destination the destination that messages are being cached for * @param cache the corresponding cache */ public void cacheAdded(JmsDestination destination, DestinationCache cache) { if (destination instanceof JmsTopic) { JmsTopic myTopic = (JmsTopic) getDestination(); JmsTopic topic = (JmsTopic) destination; if (myTopic.match(topic) && !_caches.containsKey(topic)) { _caches.put(topic, cache); cache.addConsumer(this); } } } /** * Invoked when a message cache is removed. * * @param destination the destination that messages are no longer being * cached for * @param cache the corresponding cache */ public void cacheRemoved(JmsDestination destination, DestinationCache cache) { if (destination instanceof JmsTopic) { _caches.remove(destination); } } /** * Registers this with the associated {@link DestinationCache}s. The * consumer may receive messages immediately. * * @throws JMSException for any JMS error */ protected void init() throws JMSException { JmsTopic topic = (JmsTopic) getDestination(); // register the endpoint with the destination if (topic.isWildCard()) { // if the topic is a wild card then we need to retrieve a // set of matching destination caches. _caches = _destinations.getTopicDestinationCaches(topic); // for each cache register this endpoint as a consumer of // it's messages. Before doing so register as a destination // event listener with the DestinationManager _destinations.addDestinationEventListener(this); DestinationCache[] caches = getDestinationCaches(); for (int i = 0; i < caches.length; ++i) { caches[i].addConsumer(this); } } else { // if the topic is not a wildcard then we need to get the // destination cache. If one does not exist then we need to // create it. DestinationCache cache = _destinations.getDestinationCache(topic); _caches.put(topic, cache); cache.addConsumer(this); } } /** * Set the connection identifier. 
* * @param connectionId the identity of the connection that owns this * consumer * @see #getConnectionId */ protected void setConnectionId(long connectionId) { _connectionId = connectionId; } /** * Add the handle to the cache. * * @param handle the message handle to add */ protected void addMessage(MessageHandle handle) { _handles.add(handle); notifyMessageAvailable(); } /** * Return the next available message to the client. * * @return the next message, or <code>null</code> if none is available * @throws JMSException for any error * @param cancel */ protected MessageHandle doReceive(Condition cancel) throws JMSException { MessageHandle result = null; MessageHandle handle; while (!cancel.get() && (handle = _handles.removeFirst()) != null) { if (_log.isDebugEnabled()) { _log.debug("doReceive() - next available=" + handle.getMessageId()); } // ensure that the message still exists MessageImpl message = handle.getMessage(); if (message != null) { if (selects(message)) { // got a message which is applicable for the endpoint result = handle; break; } else { // message has been filtered out so destroy the handle. handle.destroy(); } } } if (_log.isDebugEnabled()) { _log.debug("doReceive() - result=" + (result != null ? result.getMessageId() : null)); } return result; } /** * Closes this endpoint. */ protected void doClose() { // unregister as a destination event listener _destinations.removeDestinationEventListener(this); // unregister from the destination before continuing DestinationCache[] caches = getDestinationCaches(); for (int i = 0; i < caches.length; ++i) { caches[i].removeConsumer(this); } _caches.clear(); if (!isPersistent()) { // for non-persistent consumers, destroy all outstanding message // handles MessageHandle[] handles = _handles.toArray(); for (int i = 0; i < handles.length; ++i) { MessageHandle handle = handles[i]; try { handle.destroy(); } catch (JMSException exception) { _log.error(exception, exception); } } } } /** * Returns the destination manager. 
*
     * @return the destination manager
     */
    protected DestinationManager getDestinationManager() {
        return _destinations;
    }

    /**
     * Returns the destination caches currently held by this endpoint, as a
     * newly built array.
     *
     * @return the destination caches
     */
    protected DestinationCache[] getDestinationCaches() {
        // the zero-length array only conveys the element type to toArray
        DestinationCache[] template = new DestinationCache[0];
        return (DestinationCache[]) _caches.values().toArray(template);
    }
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?