📄 defaultmessagelistenercontainer.java
字号:
/*
* Copyright 2002-2006 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.jms.listener;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import org.springframework.core.Constants;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.connection.ConnectionFactoryUtils;
import org.springframework.jms.connection.JmsResourceHolder;
import org.springframework.jms.support.JmsUtils;
import org.springframework.jms.support.destination.CachingDestinationResolver;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
/**
* Message listener container that uses plain JMS client API, specifically
* a loop of <code>MessageConsumer.receive()</code> calls that also allow for
* transactional reception of messages (registering them with XA transactions).
* Designed to work in a native JMS environment as well as in a J2EE environment,
* with only minimal differences in configuration.
*
* <p>This is a simple but nevertheless powerful form of a message listener container.
* It creates a fixed number of JMS Sessions to invoke the listener, not allowing
* for dynamic adaptation to runtime demands. Like SimpleMessageListenerContainer,
* its main advantage is its low level of complexity and the minimum requirements
* on the JMS provider: Not even the ServerSessionPool facility is required.
*
* <p>Actual MessageListener execution happens in separate threads that are
* created through Spring's TaskExecutor abstraction. By default, the appropriate
* number of threads are created on startup, according to the "concurrentConsumers"
* setting. Specify an alternative TaskExecutor to integrate with an existing
* thread pool facility, for example. With a native JMS setup, each of those
* listener threads will use a cached JMS Session and MessageConsumer
* (only refreshed in case of failure), using the JMS provider's resources
* as efficiently as possible.
*
* <p>Message reception and listener execution can automatically be wrapped
* in transactions through passing a Spring PlatformTransactionManager into the
* "transactionManager" property. This will usually be a JtaTransactionManager
* in a J2EE environment, in combination with a JTA-aware JMS ConnectionFactory
* that this message listener container fetches its Connections from (check
* your J2EE server's documentation). Note that this listener container will
* automatically reobtain all JMS handles for each transaction in case of an
* external transaction manager specified, for compatibility with all J2EE servers
* (in particular JBoss). This non-caching behavior can be overridden through the
* "cacheLevel"/"cacheLevelName" property, enforcing caching of the Connection (or
* also Session and MessageConsumer) even in case of an external transaction manager.
*
* <p>See the {@link AbstractMessageListenerContainer AbstractMessageListenerContainer}
* javadoc for details on acknowledge modes and transaction options.
*
* <p>This class requires a JMS 1.1+ provider, because it builds on the
* domain-independent API. <b>Use the {@link DefaultMessageListenerContainer102
* DefaultMessageListenerContainer102} subclass for JMS 1.0.2 providers.</b>
*
* <p>For dynamic adaptation of the active number of Sessions, consider using
* ServerSessionMessageListenerContainer.
*
* @author Juergen Hoeller
* @since 2.0
* @see #setTransactionManager
* @see #setCacheLevel
* @see #setCacheLevelName
* @see org.springframework.transaction.jta.JtaTransactionManager
* @see javax.jms.MessageConsumer#receive(long)
* @see SimpleMessageListenerContainer
* @see org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer
* @see DefaultMessageListenerContainer102
*/
public class DefaultMessageListenerContainer extends AbstractMessageListenerContainer {
/**
* Default thread name prefix: "DefaultMessageListenerContainer-"
* (the short name of this class, followed by a dash).
*/
public static final String DEFAULT_THREAD_NAME_PREFIX =
ClassUtils.getShortName(DefaultMessageListenerContainer.class) + "-";
/**
* The default receive timeout: 1000 ms = 1 second.
* @see #setReceiveTimeout
*/
public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
/**
* The default recovery interval: 5000 ms = 5 seconds.
* @see #setRecoveryInterval
*/
public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
/**
* Constant that indicates to cache no JMS resources at all.
* @see #setCacheLevel
*/
public static final int CACHE_NONE = 0;
/**
* Constant that indicates to cache a shared JMS Connection.
* @see #setCacheLevel
*/
public static final int CACHE_CONNECTION = 1;
/**
* Constant that indicates to cache a shared JMS Connection
* and a JMS Session for each listener thread.
* @see #setCacheLevel
*/
public static final int CACHE_SESSION = 2;
/**
* Constant that indicates to cache a shared JMS Connection
* and a JMS Session for each listener thread, as well as
* a JMS MessageConsumer for each listener thread.
* @see #setCacheLevel
*/
public static final int CACHE_CONSUMER = 3;
// Constants helper built over this class; setCacheLevelName uses it to
// translate a "CACHE_..." constant name into its numeric value.
private static final Constants constants = new Constants(DefaultMessageListenerContainer.class);
// NOTE(review): MessageListenerContainerResourceFactory and the use of this
// instance are declared outside this excerpt - presumably supplies the
// transactional JMS resources; confirm against the rest of the file.
private final MessageListenerContainerResourceFactory transactionalResourceFactory =
new MessageListenerContainerResourceFactory();
// Whether delivery of messages published by the container's own connection is inhibited.
private boolean pubSubNoLocal = false;
// Executor for the listener threads; has no initial value here and is assigned
// through setTaskExecutor (presumably defaulted elsewhere when left unset - confirm).
private TaskExecutor taskExecutor;
// Number of concurrent consumers to create; setConcurrentConsumers only accepts positive values.
private int concurrentConsumers = 1;
// Maximum number of messages per task. NOTE(review): Integer.MIN_VALUE looks like a
// "not explicitly configured" marker resolved elsewhere - confirm. The setter rejects 0.
private int maxMessagesPerTask = Integer.MIN_VALUE;
// Optional transaction manager for transactional wrapping; unset by default.
private PlatformTransactionManager transactionManager;
// Transaction definition whose timeout is configured through setTransactionTimeout.
private DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
// Timeout for receive calls, in milliseconds.
private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
// Interval between recovery attempts, in milliseconds.
private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
// Configured cache level; an Integer with no initial value here.
// NOTE(review): presumably resolved to a default elsewhere when left unset - confirm.
private Integer cacheLevel;
// NOTE(review): the four fields below are not used within this excerpt; from their
// names they appear to coordinate recovery and to track the number of active
// listener invokers - confirm against the code that reads them.
private Object currentRecoveryMarker = new Object();
private final Object recoveryMonitor = new Object();
private int activeInvokerCount = 0;
private final Object activeInvokerMonitor = new Object();
/**
* Set whether to inhibit the delivery of messages published by its own connection.
* Default is "false".
* @param pubSubNoLocal <code>true</code> to inhibit delivery of messages
* published by this container's own connection, <code>false</code> to allow it
* @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, String, boolean)
*/
public void setPubSubNoLocal(boolean pubSubNoLocal) {
this.pubSubNoLocal = pubSubNoLocal;
}
/**
 * Report whether delivery of messages published by this container's
 * own connection is inhibited.
 * @return the current value of the "pubSubNoLocal" flag
 * @see #setPubSubNoLocal
 */
protected boolean isPubSubNoLocal() {
    return this.pubSubNoLocal;
}
/**
* Set the Spring TaskExecutor to use for running the listener threads.
* Default is SimpleAsyncTaskExecutor, starting up a number of new threads,
* according to the specified number of concurrent consumers.
* <p>Specify an alternative TaskExecutor for integration with an existing
* thread pool. Note that this really only adds value if the threads are
* managed in a specific fashion, for example within a J2EE environment.
* A plain thread pool does not add much value, as this listener container
* will occupy a number of threads for its entire lifetime.
* <p>The given executor is stored as-is; no validation happens in this setter.
* @param taskExecutor the TaskExecutor that should run the listener threads
* @see #setConcurrentConsumers
* @see org.springframework.core.task.SimpleAsyncTaskExecutor
* @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
*/
public void setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
/**
 * Configure how many concurrent consumers this container creates.
 * The default is a single consumer.
 * @param concurrentConsumers the number of consumers; must be greater than zero
 * @throws IllegalArgumentException if the given number is zero or negative
 */
public void setConcurrentConsumers(int concurrentConsumers) {
    // Reject non-positive values up front, leaving the current setting untouched.
    if (concurrentConsumers <= 0) {
        throw new IllegalArgumentException("concurrentConsumers must be positive");
    }
    this.concurrentConsumers = concurrentConsumers;
}
/**
 * Limit the number of messages that a single task may process.
 * Strictly speaking, the limit applies to message reception attempts:
 * a receive iteration that runs into its timeout without picking up a
 * message counts as well (see the "receiveTimeout" property).
 * <p>The default is unlimited (-1) for a standard TaskExecutor, and 1 for
 * a SchedulingTaskExecutor that indicates a preference for short-lived
 * tasks. A value between 10 and 100 is a reasonable compromise between
 * extremely long-lived and extremely short-lived tasks.
 * <p>Long-lived tasks stay on the same thread and thereby avoid frequent
 * thread context switches; short-lived tasks leave the scheduling to the
 * thread pool, which is why thread pools usually prefer them.
 * @param maxMessagesPerTask the maximum number of messages per task; must not be 0
 * @throws IllegalArgumentException if the given value is 0
 * @see #setTaskExecutor
 * @see #setReceiveTimeout
 * @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks()
 */
public void setMaxMessagesPerTask(int maxMessagesPerTask) {
    // Zero is the only rejected value; negative values are accepted as given.
    if (maxMessagesPerTask == 0) {
        throw new IllegalArgumentException("maxMessagesPerTask must not be 0");
    }
    this.maxMessagesPerTask = maxMessagesPerTask;
}
/**
* Specify the Spring PlatformTransactionManager to use for transactional
* wrapping of message reception plus listener execution.
* Default is none, not performing any transactional wrapping.
* <p>If specified, this will usually be a Spring JtaTransactionManager,
* in combination with a JTA-aware ConnectionFactory that this message
* listener container fetches its Connections from.
* <p>Alternatively, pass in a fully configured Spring TransactionTemplate
* into the "transactionTemplate" property.
* @param transactionManager the transaction manager to use for transactional wrapping
*/
public void setTransactionManager(PlatformTransactionManager transactionManager) {
// NOTE(review): the javadoc above mentions a "transactionTemplate" property, but no
// such property is visible in this part of the file - confirm the reference is current.
this.transactionManager = transactionManager;
}
/**
* Specify the transaction timeout to use for transactional wrapping, in <b>seconds</b>.
* Default is none, using the transaction manager's default timeout.
* <p>The value is applied to this container's internal transaction definition.
* @param transactionTimeout the transaction timeout, in seconds
* @see org.springframework.transaction.TransactionDefinition#getTimeout()
* @see #setReceiveTimeout
*/
public void setTransactionTimeout(int transactionTimeout) {
this.transactionDefinition.setTimeout(transactionTimeout);
}
/**
* Set the timeout to use for receive calls, in <b>milliseconds</b>.
* The default is 1000 ms, that is, 1 second.
* <p><b>NOTE:</b> This value needs to be smaller than the transaction
* timeout used by the transaction manager (in the appropriate unit,
* of course). -1 indicates no timeout at all; however, this is only
* feasible if not running within a transaction manager.
* <p>The value is stored without validation; the relation to the
* transaction timeout is not checked by this setter.
* @param receiveTimeout the receive timeout, in milliseconds
* @see javax.jms.MessageConsumer#receive(long)
* @see javax.jms.MessageConsumer#receive
* @see #setTransactionTimeout
*/
public void setReceiveTimeout(long receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}
/**
* Specify the interval between recovery attempts, in <b>milliseconds</b>.
* The default is 5000 ms, that is, 5 seconds.
* <p>The value is stored without validation.
* @param recoveryInterval the interval between recovery attempts, in milliseconds
* @see #handleListenerSetupFailure
*/
public void setRecoveryInterval(long recoveryInterval) {
this.recoveryInterval = recoveryInterval;
}
/**
 * Choose the caching level that this listener container may apply, given
 * as the name of the corresponding constant: for example "CACHE_CONNECTION".
 * The name is resolved to its numeric value and handed to
 * {@link #setCacheLevel}; failures raised by that lookup propagate unchanged.
 * @param constantName the name of the cache constant; must start with "CACHE_"
 * @throws IllegalArgumentException if the name is <code>null</code>
 * or does not start with "CACHE_"
 * @see #setCacheLevel
 */
public void setCacheLevelName(String constantName) throws IllegalArgumentException {
    // Only names from the CACHE_ family are accepted; null counts as invalid.
    boolean isCacheConstant = (constantName != null && constantName.startsWith("CACHE_"));
    if (!isCacheConstant) {
        throw new IllegalArgumentException("Only cache constants allowed");
    }
    Number level = constants.asNumber(constantName);
    setCacheLevel(level.intValue());
}
/**
* Specify the level of caching that this listener container is allowed to apply.
* <p>Default is CACHE_NONE if an external transaction manager has been specified
* (to reobtain all resources freshly within the scope of the external transaction),
* and CACHE_CONSUMER else (operating with local JMS resources).
* <p>Some J2EE servers only register their JMS resources with an ongoing XA
* transaction in case of a freshly obtained JMS Connection and Session,
* which is why this listener container does by default not cache any of those.
* However, if you want to optimize for a specific server, consider switching
* this setting to at least CACHE_CONNECTION or CACHE_SESSION even in
* conjunction with an external transaction manager.
* <p>Currently known servers that absolutely require CACHE_NONE for XA
* transaction processing: JBoss 4. For any others, consider raising the
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -