defaultmessagelistenercontainer.java

来自「有关此类编程有心德的高手 希望能够多多给予指教」· Java 代码 · 共 914 行 · 第 1/3 页

JAVA
914
字号
/*
 * Copyright 2002-2007 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.jms.listener;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.springframework.core.Constants;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.support.JmsUtils;
import org.springframework.jms.support.destination.CachingDestinationResolver;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/**
 * Message listener container variant that uses plain JMS client API, specifically
 * a loop of <code>MessageConsumer.receive()</code> calls that also allow for
 * transactional reception of messages (registering them with XA transactions).
 * Designed to work in a native JMS environment as well as in a J2EE environment,
 * with only minimal differences in configuration.
 *
 * <p><b>NOTE:</b> This class requires a JMS 1.1+ provider, because it builds on
 * the domain-independent API. <b>Use the {@link DefaultMessageListenerContainer102}
 * subclass for a JMS 1.0.2 provider, e.g. when running on a J2EE 1.3 server.</b>
 *
 * <p>This is a simple but nevertheless powerful form of message listener container.
 * On startup, it obtains a fixed number of JMS Sessions to invoke the listener,
 * and optionally allows for dynamic adaptation at runtime (up until a maximum number).
 * Like {@link SimpleMessageListenerContainer}, its main advantage is its low level
 * of runtime complexity, in particular the minimal requirements on the JMS provider:
 * Not even the JMS ServerSessionPool facility is required. Beyond that, it is
 * fully self-recovering in case of the broker being temporarily unavailable,
 * and allows for stops/restarts as well as runtime changes to its configuration.
 *
 * <p>Actual MessageListener execution happens in asynchronous work units which are
 * created through Spring's {@link org.springframework.core.task.TaskExecutor}
 * abstraction. By default, the specified number of invoker tasks will be created
 * on startup, according to the {@link #setConcurrentConsumers "concurrentConsumers"}
 * setting. Specify an alternative TaskExecutor to integrate with an existing
 * thread pool facility (such as a J2EE server's), for example using a
 * {@link org.springframework.scheduling.commonj.WorkManagerTaskExecutor CommonJ WorkManager}.
 * With a native JMS setup, each of those listener threads is going to use a
 * cached JMS Session and MessageConsumer (only refreshed in case of failure),
 * using the JMS provider's resources as efficiently as possible.
 *
 * <p>Message reception and listener execution can automatically be wrapped
 * in transactions through passing a Spring
 * {@link org.springframework.transaction.PlatformTransactionManager} into the
 * {@link #setTransactionManager "transactionManager"} property. This will usually
 * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
 * J2EE environment, in combination with a JTA-aware JMS ConnectionFactory obtained
 * from JNDI (check your J2EE server's documentation). Note that this listener
 * container will automatically reobtain all JMS handles for each transaction
 * in case of an external transaction manager specified, for compatibility with
 * all J2EE servers (in particular JBoss). This non-caching behavior can be
 * overridden through the {@link #setCacheLevel "cacheLevel"} /
 * {@link #setCacheLevelName "cacheLevelName"} property, enforcing caching
 * of the Connection (or also Session and MessageConsumer) even in case of
 * an external transaction manager being involved.
 *
 * <p>Dynamic scaling of the number of concurrent invokers can be activated
 * through specifying a {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"}
 * value that is higher than the {@link #setConcurrentConsumers "concurrentConsumers"}
 * value. Since the latter's default is 1, you can also simply specify a
 * "maxConcurrentConsumers" of e.g. 5, which will lead to dynamic scaling up to
 * 5 concurrent consumers in case of increasing message load, as well as dynamic
 * shrinking back to the standard number of consumers once the load decreases.
 * Consider adapting the {@link #setIdleTaskExecutionLimit "idleTaskExecutionLimit"}
 * setting to control the lifespan of each new task, to avoid frequent scaling up
 * and down, in particular if the ConnectionFactory does not pool JMS Sessions
 * and/or the TaskExecutor does not pool threads (check your configuration!).
 * Note that dynamic scaling only really makes sense for a queue in the first
 * place; for a topic, you will typically stick with the default number of 1
 * consumer, else you'd receive the same message multiple times on the same node.
 *
 * <p><b>It is strongly recommended to either set {@link #setSessionTransacted
 * "sessionTransacted"} to "true" or specify an external {@link #setTransactionManager
 * "transactionManager"}.</b> See the {@link AbstractMessageListenerContainer}
 * javadoc for details on acknowledge modes and native transaction options,
 * as well as the {@link AbstractPollingMessageListenerContainer} javadoc
 * for details on configuring an external transaction manager.
 *
 * @author Juergen Hoeller
 * @since 2.0
 * @see #setTransactionManager
 * @see #setCacheLevel
 * @see javax.jms.MessageConsumer#receive(long)
 * @see SimpleMessageListenerContainer
 * @see org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer
 */
public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer {

	/**
	 * Default thread name prefix: "DefaultMessageListenerContainer-".
	 * <p>Built from the short (unqualified) name of this class plus a trailing dash.
	 */
	public static final String DEFAULT_THREAD_NAME_PREFIX =
			ClassUtils.getShortName(DefaultMessageListenerContainer.class) + "-";

	/**
	 * The default recovery interval: 5000 ms = 5 seconds.
	 * <p>Used as the initial value of the "recoveryInterval" property.
	 * @see #setRecoveryInterval
	 */
	public static final long DEFAULT_RECOVERY_INTERVAL = 5000;


	/**
	 * Constant that indicates to cache no JMS resources at all.
	 * <p>This is also the value that {@link #getCacheLevel()} reports
	 * while no cache level has been assigned.
	 * @see #setCacheLevel
	 * @see #setCacheLevelName
	 */
	public static final int CACHE_NONE = 0;

	/**
	 * Constant that indicates to cache a shared JMS Connection.
	 * @see #setCacheLevel
	 * @see #setCacheLevelName
	 */
	public static final int CACHE_CONNECTION = 1;

	/**
	 * Constant that indicates to cache a shared JMS Connection
	 * and a JMS Session for each listener thread.
	 * @see #setCacheLevel
	 * @see #setCacheLevelName
	 */
	public static final int CACHE_SESSION = 2;

	/**
	 * Constant that indicates to cache a shared JMS Connection
	 * and a JMS Session for each listener thread, as well as
	 * a JMS MessageConsumer for each listener thread.
	 * @see #setCacheLevel
	 * @see #setCacheLevelName
	 */
	public static final int CACHE_CONSUMER = 3;


	/**
	 * Constants lookup for this class; {@link #setCacheLevelName} resolves
	 * a constant name to its numeric value through it.
	 */
	private static final Constants constants = new Constants(DefaultMessageListenerContainer.class);


	/**
	 * Executor assigned through {@link #setTaskExecutor}. No value is assigned
	 * at declaration; NOTE(review): the documented default executor is presumably
	 * created during initialization further down the file - confirm.
	 */
	private TaskExecutor taskExecutor;

	/** Interval between recovery attempts, in milliseconds. */
	private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;

	/**
	 * Configured cache level, boxed so that <code>null</code> can stand for
	 * "not assigned"; {@link #getCacheLevel()} reports CACHE_NONE in that case.
	 */
	private Integer cacheLevel;

	/** Configured number of concurrent consumers; accessed under activeInvokerMonitor. */
	private int concurrentConsumers = 1;

	/**
	 * Configured maximum number of concurrent consumers; raised under
	 * activeInvokerMonitor whenever "concurrentConsumers" is set above it.
	 */
	private int maxConcurrentConsumers = 1;

	/**
	 * NOTE(review): Integer.MIN_VALUE looks like a "not configured" sentinel;
	 * the setter and usage are outside this excerpt - confirm there.
	 */
	private int maxMessagesPerTask = Integer.MIN_VALUE;

	/**
	 * The "idleTaskExecutionLimit" setting, which the class-level javadoc describes
	 * as controlling the lifespan of each new task.
	 */
	private int idleTaskExecutionLimit = 1;

	/**
	 * Raw (pre-generics) set; NOTE(review): presumably holds the currently
	 * scheduled invoker tasks - element type and usage are not visible here.
	 */
	private final Set scheduledInvokers = new HashSet();

	/** NOTE(review): presumably the number of invokers currently active - confirm against usage. */
	private int activeInvokerCount = 0;

	/**
	 * Lock object guarding the consumer-count settings in the accessors above;
	 * NOTE(review): presumably also guards the invoker bookkeeping fields - confirm.
	 */
	private final Object activeInvokerMonitor = new Object();

	/** NOTE(review): non-final marker object, presumably replaced per recovery attempt - usage not visible here. */
	private Object currentRecoveryMarker = new Object();

	/** NOTE(review): presumably the lock for recovery handling - usage not visible here. */
	private final Object recoveryMonitor = new Object();


	/**
	 * Set the Spring TaskExecutor to use for running the listener threads.
	 * <p>Default is a {@link org.springframework.core.task.SimpleAsyncTaskExecutor},
	 * starting up a number of new threads, according to the specified number
	 * of concurrent consumers.
	 * <p>Specify an alternative TaskExecutor for integration with an existing
	 * thread pool. Note that this really only adds value if the threads are
	 * managed in a specific fashion, for example within a J2EE environment.
	 * A plain thread pool does not add much value, as this listener container
	 * will occupy a number of threads for its entire lifetime.
	 * @see #setConcurrentConsumers
	 * @see org.springframework.core.task.SimpleAsyncTaskExecutor
	 * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
	 */
	public void setTaskExecutor(TaskExecutor taskExecutor) {
		// Stored as given; this setter performs no null check or other validation.
		this.taskExecutor = taskExecutor;
	}

	/**
	 * Specify the interval between recovery attempts, in <b>milliseconds</b>.
	 * The default is 5000 ms, that is, 5 seconds.
	 * @see #handleListenerSetupFailure
	 */
	public void setRecoveryInterval(long recoveryInterval) {
		// Replaces the DEFAULT_RECOVERY_INTERVAL initial value. The argument is
		// stored unchecked, so zero or negative values are not rejected here.
		this.recoveryInterval = recoveryInterval;
	}

	/**
	 * Specify the level of caching that this listener container is allowed to apply,
	 * in the form of the name of the corresponding constant: e.g. "CACHE_CONNECTION".
	 * @see #setCacheLevel
	 */
	public void setCacheLevelName(String constantName) throws IllegalArgumentException {
		// Only names carrying the "CACHE_" prefix are accepted, so that other
		// constants of this class (e.g. DEFAULT_RECOVERY_INTERVAL) are rejected
		// before any lookup takes place.
		boolean cacheConstant = (constantName != null && constantName.startsWith("CACHE_"));
		if (!cacheConstant) {
			throw new IllegalArgumentException("Only cache constants allowed");
		}
		// Resolve the name to its numeric value and delegate to the int-based setter.
		int level = constants.asNumber(constantName).intValue();
		setCacheLevel(level);
	}

	/**
	 * Specify the level of caching that this listener container is allowed to apply.
	 * <p>Default is CACHE_NONE if an external transaction manager has been specified
	 * (to reobtain all resources freshly within the scope of the external transaction),
	 * and CACHE_CONSUMER else (operating with local JMS resources).
	 * <p>Some J2EE servers only register their JMS resources with an ongoing XA
	 * transaction in case of a freshly obtained JMS Connection and Session,
	 * which is why this listener container does by default not cache any of those.
	 * However, if you want to optimize for a specific server, consider switching
	 * this setting to at least CACHE_CONNECTION or CACHE_SESSION even in
	 * conjunction with an external transaction manager.
	 * <p>Currently known servers that absolutely require CACHE_NONE for XA
	 * transaction processing: JBoss 4. For any others, consider raising the
	 * cache level.
	 * @see #CACHE_NONE
	 * @see #CACHE_CONNECTION
	 * @see #CACHE_SESSION
	 * @see #CACHE_CONSUMER
	 * @see #setCacheLevelName
	 * @see #setTransactionManager
	 */
	public void setCacheLevel(int cacheLevel) {
		// Boxed on purpose: the field stays null until a level is assigned,
		// and getCacheLevel() maps that null to CACHE_NONE.
		Integer boxedLevel = new Integer(cacheLevel);
		this.cacheLevel = boxedLevel;
	}

	/**
	 * Return the level of caching that this listener container is allowed to apply.
	 */
	public int getCacheLevel() {
		// No level assigned yet: report the "cache nothing" level.
		if (this.cacheLevel == null) {
			return CACHE_NONE;
		}
		return this.cacheLevel.intValue();
	}


	/**
	 * Specify the number of concurrent consumers to create. Default is 1.
	 * <p>Specifying a higher value for this setting will increase the standard
	 * level of scheduled concurrent consumers at runtime: This is effectively
	 * the minimum number of concurrent consumers which will be scheduled
	 * at any given time. This is a static setting; for dynamic scaling,
	 * consider specifying the "maxConcurrentConsumers" setting instead.
	 * <p>Raising the number of concurrent consumers is recommendable in order
	 * to scale the consumption of messages coming in from a queue. However,
	 * note that any ordering guarantees are lost once multiple consumers are
	 * registered. In general, stick with 1 consumer for low-volume queues.
	 * <p><b>Do not raise the number of concurrent consumers for a topic.</b>
	 * This would lead to concurrent consumption of the same message,
	 * which is hardly ever desirable.
	 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
	 * @see #setMaxConcurrentConsumers
	 */
	public void setConcurrentConsumers(int concurrentConsumers) {
		// Reject non-positive values before touching any state.
		Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
		synchronized (this.activeInvokerMonitor) {
			this.concurrentConsumers = concurrentConsumers;
			// The maximum must never fall below the standard level:
			// lift it to the new value if it is currently smaller.
			this.maxConcurrentConsumers = Math.max(this.maxConcurrentConsumers, concurrentConsumers);
		}
	}

	/**
	 * Return the "concurrentConsumers" setting.
	 * <p>This returns the currently configured "concurrentConsumers" value;
	 * the number of currently scheduled/active consumers might differ.
	 * @see #getScheduledConsumerCount()
	 * @see #getActiveConsumerCount()
	 */
	public final int getConcurrentConsumers() {
		int configured;
		// Read under the same monitor that the setter writes under.
		synchronized (this.activeInvokerMonitor) {
			configured = this.concurrentConsumers;
		}
		return configured;
	}

	/**
	 * Specify the maximum number of concurrent consumers to create. Default is 1.
	 * <p>If this setting is higher than "concurrentConsumers", the listener container
	 * will dynamically schedule new consumers at runtime, provided that enough
	 * incoming messages are encountered. Once the load goes down again, the number of
	 * consumers will be reduced to the standard level ("concurrentConsumers") again.
	 * <p>Raising the number of concurrent consumers is recommendable in order
	 * to scale the consumption of messages coming in from a queue. However,

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?