defaultmessagelistenercontainer.java
来自「有关此类编程有心德的高手 希望能够多多给予指教」· Java 代码 · 共 914 行 · 第 1/3 页
JAVA
914 行
* note that any ordering guarantees are lost once multiple consumers are
* registered. In general, stick with 1 consumer for low-volume queues.
* <p><b>Do not raise the number of concurrent consumers for a topic.</b>
* This would lead to concurrent consumption of the same message,
* which is hardly ever desirable.
* <p><b>This setting can be modified at runtime, for example through JMX.</b>
* @see #setConcurrentConsumers
*/
public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
    Assert.isTrue(maxConcurrentConsumers > 0, "'maxConcurrentConsumers' value must be at least 1 (one)");
    synchronized (this.activeInvokerMonitor) {
        // The upper bound is never allowed to drop below the configured
        // "concurrentConsumers" value: the larger of the two wins.
        this.maxConcurrentConsumers = Math.max(maxConcurrentConsumers, this.concurrentConsumers);
    }
}
/**
 * Return the "maxConcurrentConsumers" setting.
 * <p>This is the currently configured upper limit only; the number of
 * consumers that are actually scheduled or active at the moment may differ.
 * The value is read while holding the active invoker monitor.
 * @see #getScheduledConsumerCount()
 * @see #getActiveConsumerCount()
 */
public final int getMaxConcurrentConsumers() {
    final int configuredMax;
    synchronized (this.activeInvokerMonitor) {
        configuredMax = this.maxConcurrentConsumers;
    }
    return configuredMax;
}
/**
 * Specify the maximum number of messages to process in one task.
 * <p>Strictly speaking, this caps the number of message reception
 * <i>attempts</i> per task: a receive iteration that runs into its timeout
 * without picking up a message (see the "receiveTimeout" property) counts
 * towards the limit as well.
 * <p>Default is unlimited (-1) in case of a standard TaskExecutor, and 1 in
 * case of a SchedulingTaskExecutor that indicates a preference for
 * short-lived tasks. A value between 10 and 100 is a reasonable compromise
 * between extremely long-lived and extremely short-lived tasks.
 * <p>Long-lived tasks stick with the same thread all the way through and
 * thereby avoid frequent thread context switches; short-lived tasks hand
 * scheduling control back to the thread pool, which is why thread pools
 * will usually prefer them.
 * <p>The only value rejected by this setter is 0.
 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
 * @param maxMessagesPerTask the per-task limit (must not be 0)
 * @see #setTaskExecutor
 * @see #setReceiveTimeout
 * @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks()
 */
public void setMaxMessagesPerTask(int maxMessagesPerTask) {
    boolean nonZero = (maxMessagesPerTask != 0);
    Assert.isTrue(nonZero, "'maxMessagesPerTask' must not be 0");
    synchronized (this.activeInvokerMonitor) {
        this.maxMessagesPerTask = maxMessagesPerTask;
    }
}
/**
 * Return the maximum number of messages to process in one task,
 * as currently stored in this container (read under the active
 * invoker monitor).
 * @see #setMaxMessagesPerTask
 */
public int getMaxMessagesPerTask() {
    final int limit;
    synchronized (this.activeInvokerMonitor) {
        limit = this.maxMessagesPerTask;
    }
    return limit;
}
/**
 * Specify how many consecutive idle executions a receive task may go
 * through, i.e. executions in which it did not receive any message.
 * Once this limit is reached, the task shuts down and leaves receiving to
 * the other executing tasks (in case of dynamic scheduling; see the
 * "maxConcurrentConsumers" setting). Default is 1.
 * <p>Within each task execution, a number of message reception attempts
 * (according to the "maxMessagesPerTask" setting) will each wait for an
 * incoming message (according to the "receiveTimeout" setting). A task
 * execution is considered idle if all of those attempts returned without a
 * message. An idle task may still be rescheduled; it only shuts down once
 * it has reached the specified "idleTaskExecutionLimit" (in case of dynamic
 * scaling).
 * <p>Raise this limit if you observe too frequent scaling up and down: a
 * higher limit keeps an idle consumer around for longer, which avoids
 * restarting a consumer when the next load of messages comes in.
 * Alternatively, specify a higher "maxMessagesPerTask" and/or
 * "receiveTimeout" value; this keeps idle consumers around longer as well,
 * while also increasing the average execution time of each scheduled task.
 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
 * @param idleTaskExecutionLimit the idle execution limit (must be 1 or higher)
 * @see #setMaxMessagesPerTask
 * @see #setReceiveTimeout
 */
public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
    boolean positive = (idleTaskExecutionLimit > 0);
    Assert.isTrue(positive, "'idleTaskExecutionLimit' must be 1 or higher");
    synchronized (this.activeInvokerMonitor) {
        this.idleTaskExecutionLimit = idleTaskExecutionLimit;
    }
}
/**
 * Return the limit for idle executions of a receive task, as currently
 * stored in this container (read under the active invoker monitor).
 * @see #setIdleTaskExecutionLimit
 */
public int getIdleTaskExecutionLimit() {
    final int limit;
    synchronized (this.activeInvokerMonitor) {
        limit = this.idleTaskExecutionLimit;
    }
    return limit;
}
/**
 * Validate this container's configuration on top of the superclass checks.
 * <p>A durable subscription is only accepted in combination with exactly
 * one concurrent consumer; any other "concurrentConsumers" value is
 * rejected with an IllegalArgumentException.
 */
protected void validateConfiguration() {
    super.validateConfiguration();
    synchronized (this.activeInvokerMonitor) {
        if (isSubscriptionDurable()) {
            if (this.concurrentConsumers != 1) {
                throw new IllegalArgumentException("Only 1 concurrent consumer supported for durable subscription");
            }
        }
    }
}
//-------------------------------------------------------------------------
// Implementation of AbstractMessageListenerContainer's template methods
//-------------------------------------------------------------------------
/**
 * Initialize this container: fill in defaults that have not been
 * configured explicitly, then delegate to the superclass initialization.
 * <p>Defaults applied here:
 * <ul>
 * <li>cache level: CACHE_NONE if a transaction manager is set,
 * CACHE_CONSUMER otherwise (only if no cache level has been set);
 * <li>task executor: the result of {@link #createDefaultTaskExecutor()}
 * (only if no executor has been set);
 * <li>maxMessagesPerTask: 1 if the configured executor is a
 * SchedulingTaskExecutor that prefers short-lived tasks and the value is
 * still at its Integer.MIN_VALUE marker.
 * </ul>
 */
public void initialize() {
    // Adapt default cache level, depending on whether a transaction manager is present.
    boolean transactionManagerSet = (getTransactionManager() != null);
    if (this.cacheLevel == null) {
        this.cacheLevel = new Integer(transactionManagerSet ? CACHE_NONE : CACHE_CONSUMER);
    }

    // Prepare taskExecutor and maxMessagesPerTask.
    synchronized (this.activeInvokerMonitor) {
        if (this.taskExecutor == null) {
            this.taskExecutor = createDefaultTaskExecutor();
        }
        else if (this.taskExecutor instanceof SchedulingTaskExecutor) {
            SchedulingTaskExecutor schedulingExecutor = (SchedulingTaskExecutor) this.taskExecutor;
            // The executor indicated a preference for short-lived tasks: as per the
            // setMaxMessagesPerTask javadoc, use 1 message per task in that case,
            // unless a custom value has been specified.
            if (schedulingExecutor.prefersShortLivedTasks() && this.maxMessagesPerTask == Integer.MIN_VALUE) {
                this.maxMessagesPerTask = 1;
            }
        }
    }

    // Proceed with actual listener initialization.
    super.initialize();
}
/**
 * Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified.
 * <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
 * whose thread name prefix is the bean name followed by "-", or
 * DEFAULT_THREAD_NAME_PREFIX if no bean name is available.
 * @return the newly created executor
 * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
 */
protected TaskExecutor createDefaultTaskExecutor() {
    String beanName = getBeanName();
    String threadNamePrefix;
    if (beanName == null) {
        threadNamePrefix = DEFAULT_THREAD_NAME_PREFIX;
    }
    else {
        threadNamePrefix = beanName + "-";
    }
    return new SimpleAsyncTaskExecutor(threadNamePrefix);
}
/**
 * Decide whether a shared JMS Connection is used, based on the
 * "cacheLevel" setting: sharing is enabled for any cache level of
 * CACHE_CONNECTION or higher.
 * @return <code>true</code> if the cache level is at least CACHE_CONNECTION
 * @see #setCacheLevel
 * @see #CACHE_CONNECTION
 */
protected final boolean sharedConnectionEnabled() {
    int configuredLevel = getCacheLevel();
    return (configuredLevel >= CACHE_CONNECTION);
}
/**
 * Creates the specified number of concurrent consumers,
 * in the form of a JMS Session plus associated MessageConsumer
 * running in a separate thread.
 * <p>Concretely, one invoker is scheduled per configured concurrent
 * consumer, all while holding the active invoker monitor.
 * @throws JMSException declared for the template method contract
 * @see #scheduleNewInvoker
 * @see #setTaskExecutor
 */
protected void doInitialize() throws JMSException {
    synchronized (this.activeInvokerMonitor) {
        int requested = 0;
        while (requested < this.concurrentConsumers) {
            scheduleNewInvoker();
            requested++;
        }
    }
}
/**
 * Re-executes the given task via this listener container's TaskExecutor.
 * @param task the task to run again; it is cast to {@link Runnable}
 * before being handed to the executor
 * @see #setTaskExecutor
 */
protected void doRescheduleTask(Object task) {
    Runnable runnableTask = (Runnable) task;
    this.taskExecutor.execute(runnableTask);
}
/**
 * Reacts to a received message by checking whether another invoker
 * should be scheduled. Neither argument is used by this implementation;
 * it simply delegates to {@link #scheduleNewInvokerIfAppropriate()}.
 * @param message the received message (unused here)
 * @param session the JMS Session passed along with the message (unused here)
 * @see #scheduleNewInvokerIfAppropriate()
 */
protected void messageReceived(Message message, Session session) {
scheduleNewInvokerIfAppropriate();
}
/**
 * Schedule a new invoker, increasing the total number of scheduled
 * invokers for this listener container - but only if all of the
 * following hold:
 * <ul>
 * <li>this container is running;
 * <li>the number of scheduled invokers is still below the
 * "maxConcurrentConsumers" limit;
 * <li>none of the scheduled invokers is currently idle, that is,
 * already waiting for new messages.
 * </ul>
 * <p>Called once a message has been received, to scale up while
 * processing the message in the invoker that originally received it.
 * @see #setTaskExecutor
 * @see #getMaxConcurrentConsumers()
 */
protected void scheduleNewInvokerIfAppropriate() {
    if (!isRunning()) {
        return;
    }
    synchronized (this.activeInvokerMonitor) {
        boolean belowMaximum = (this.scheduledInvokers.size() < this.maxConcurrentConsumers);
        // Idle invokers are only inspected if there is room for another invoker at all.
        if (belowMaximum && !hasIdleInvokers()) {
            scheduleNewInvoker();
            if (logger.isDebugEnabled()) {
                logger.debug("Raised scheduled invoker count: " + this.scheduledInvokers.size());
            }
        }
    }
}
/**
 * Schedule a new invoker, increasing the total number of scheduled
 * invokers for this listener container.
 * <p>The new invoker is only registered among the scheduled invokers
 * if <code>rescheduleTaskIfNecessary</code> reports success for it.
 */
private void scheduleNewInvoker() {
    AsyncMessageListenerInvoker newInvoker = new AsyncMessageListenerInvoker();
    boolean accepted = rescheduleTaskIfNecessary(newInvoker);
    // Expected to be true in practice, since this is only called while active.
    if (accepted) {
        this.scheduledInvokers.add(newInvoker);
    }
}
/**
 * Determine whether this listener container currently has any
 * idle instances among its scheduled invokers.
 * @return <code>true</code> as soon as one scheduled invoker reports
 * itself as idle; <code>false</code> if none does
 */
private boolean hasIdleInvokers() {
    Iterator candidates = this.scheduledInvokers.iterator();
    while (candidates.hasNext()) {
        AsyncMessageListenerInvoker candidate = (AsyncMessageListenerInvoker) candidates.next();
        if (candidate.isIdle()) {
            return true;
        }
    }
    return false;
}
/**
 * Determine whether the current invoker should be rescheduled,
 * given that it might not have received a message in a while.
 * <p>An invoker that has reached the "idleTaskExecutionLimit" is only
 * rescheduled while the number of scheduled invokers does not exceed
 * "concurrentConsumers"; any other invoker is rescheduled while that
 * number does not exceed "maxConcurrentConsumers".
 * @param idleTaskExecutionCount the number of idle executions
 * that this invoker task has already accumulated (in a row)
 * @return whether the invoker should be rescheduled
 */
private boolean shouldRescheduleInvoker(int idleTaskExecutionCount) {
    synchronized (this.activeInvokerMonitor) {
        int allowedInvokers;
        if (idleTaskExecutionCount >= this.idleTaskExecutionLimit) {
            // Idle for too long: only keep it within the core consumer count.
            allowedInvokers = this.concurrentConsumers;
        }
        else {
            allowedInvokers = this.maxConcurrentConsumers;
        }
        return (this.scheduledInvokers.size() <= allowedInvokers);
    }
}
/**
 * Return the number of currently scheduled consumers, that is, the
 * current size of the set of scheduled invokers.
 * <p>This number is expected to lie between "concurrentConsumers" and
 * "maxConcurrentConsumers", but might be higher than "activeConsumerCount"
 * (in case of some consumers being scheduled but not executed at the moment).
 * @see #getConcurrentConsumers()
 * @see #getMaxConcurrentConsumers()
 * @see #getActiveConsumerCount()
 */
public final int getScheduledConsumerCount() {
    final int scheduledCount;
    synchronized (this.activeInvokerMonitor) {
        scheduledCount = this.scheduledInvokers.size();
    }
    return scheduledCount;
}
/**
 * Return the number of currently active consumers, as tracked by this
 * container's active invoker count.
 * <p>This number might be lower than "scheduledConsumerCount"
 * (in case of some consumers being scheduled but not executed at the moment).
 * @see #getConcurrentConsumers()
 * @see #getMaxConcurrentConsumers()
 * @see #getScheduledConsumerCount()
 */
public final int getActiveConsumerCount() {
    final int activeCount;
    synchronized (this.activeInvokerMonitor) {
        activeCount = this.activeInvokerCount;
    }
    return activeCount;
}
/**
 * Overridden to tolerate a failure in the initial setup: a JMSException
 * raised by the superclass implementation is logged at debug level and
 * not propagated, leaving it up to the asynchronous invokers to
 * establish the shared Connection on first access.
 * @see #refreshConnectionUntilSuccessful()
 */
protected void establishSharedConnection() {
    try {
        super.establishSharedConnection();
    }
    catch (JMSException ex) {
        String message = "Could not establish shared JMS Connection - " +
                "leaving it up to asynchronous invokers to establish a Connection as soon as possible";
        logger.debug(message, ex);
    }
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?