defaultmessagelistenercontainer.java

来自「有关此类编程有心德的高手 希望能够多多给予指教」· Java 代码 · 共 914 行 · 第 1/3 页

JAVA
914
字号
	 * note that any ordering guarantees are lost once multiple consumers are
	 * registered. In general, stick with 1 consumer for low-volume queues.
	 * <p><b>Do not raise the number of concurrent consumers for a topic.</b>
	 * This would lead to concurrent consumption of the same message,
	 * which is hardly ever desirable.
	 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
	 * @see #setConcurrentConsumers
	 */
	public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
		Assert.isTrue(maxConcurrentConsumers > 0, "'maxConcurrentConsumers' value must be at least 1 (one)");
		synchronized (this.activeInvokerMonitor) {
			// The stored maximum is clamped so that it never ends up below the
			// currently configured "concurrentConsumers" value.
			if (maxConcurrentConsumers > this.concurrentConsumers) {
				this.maxConcurrentConsumers = maxConcurrentConsumers;
			}
			else {
				this.maxConcurrentConsumers = this.concurrentConsumers;
			}
		}
	}

	/**
	 * Return the "maxConcurrentConsumers" setting.
	 * <p>This returns the currently configured "maxConcurrentConsumers" value;
	 * the number of currently scheduled/active consumers might differ.
	 * @see #getScheduledConsumerCount()
	 * @see #getActiveConsumerCount()
	 */
	public final int getMaxConcurrentConsumers() {
		int configuredMaximum;
		synchronized (this.activeInvokerMonitor) {
			// Take the snapshot while holding the invoker monitor.
			configuredMaximum = this.maxConcurrentConsumers;
		}
		return configuredMaximum;
	}

	/**
	 * Specify the maximum number of messages to process in one task.
	 * More concretely, this limits the number of message reception attempts per
	 * task, which includes receive iterations that did not actually pick up a
	 * message until they hit their timeout (see "receiveTimeout" property).
	 * <p>Default is unlimited (-1) in case of a standard TaskExecutor,
	 * and 1 in case of a SchedulingTaskExecutor that indicates a preference for
	 * short-lived tasks. Specify a number of 10 to 100 messages to balance
	 * between extremely long-lived and extremely short-lived tasks here.
	 * <p>Long-lived tasks avoid frequent thread context switches through
	 * sticking with the same thread all the way through, while short-lived
	 * tasks allow thread pools to control the scheduling. Hence, thread
	 * pools will usually prefer short-lived tasks.
	 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
	 * @see #setTaskExecutor
	 * @see #setReceiveTimeout
	 * @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks()
	 */
	public void setMaxMessagesPerTask(int maxMessagesPerTask) {
		// Zero is the only rejected value; negative values are accepted as-is.
		boolean nonZero = (maxMessagesPerTask != 0);
		Assert.isTrue(nonZero, "'maxMessagesPerTask' must not be 0");
		synchronized (this.activeInvokerMonitor) {
			this.maxMessagesPerTask = maxMessagesPerTask;
		}
	}

	/**
	 * Return the maximum number of messages to process in one task.
	 */
	public int getMaxMessagesPerTask() {
		int configuredLimit;
		synchronized (this.activeInvokerMonitor) {
			// Take the snapshot while holding the invoker monitor.
			configuredLimit = this.maxMessagesPerTask;
		}
		return configuredLimit;
	}

	/**
	 * Specify the limit for idle executions of a receive task, not having
	 * received any message within its execution. If this limit is reached,
	 * the task will shut down and leave receiving to other executing tasks
	 * (in case of dynamic scheduling; see the "maxConcurrentConsumers" setting).
	 * Default is 1.
	 * <p>Within each task execution, a number of message reception attempts
	 * (according to the "maxMessagesPerTask" setting) will each wait for an incoming
	 * message (according to the "receiveTimeout" setting). If all of those receive
	 * attempts in a given task return without a message, the task is considered
	 * idle with respect to received messages. Such a task may still be rescheduled;
	 * however, once it reached the specified "idleTaskExecutionLimit", it will
	 * shut down (in case of dynamic scaling).
	 * <p>Raise this limit if you encounter too frequent scaling up and down.
	 * With this limit being higher, an idle consumer will be kept around longer,
	 * avoiding the restart of a consumer once a new load of messages comes in.
	 * Alternatively, specify a higher "maxMessagesPerTask" and/or "receiveTimeout" value,
	 * which will also lead to idle consumers being kept around for a longer time
	 * (while also increasing the average execution time of each scheduled task).
	 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
	 * @see #setMaxMessagesPerTask
	 * @see #setReceiveTimeout
	 */
	public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
		// Reject zero and negative limits before touching shared state.
		boolean atLeastOne = (idleTaskExecutionLimit >= 1);
		Assert.isTrue(atLeastOne, "'idleTaskExecutionLimit' must be 1 or higher");
		synchronized (this.activeInvokerMonitor) {
			this.idleTaskExecutionLimit = idleTaskExecutionLimit;
		}
	}

	/**
	 * Return the limit for idle executions of a receive task.
	 */
	public int getIdleTaskExecutionLimit() {
		int configuredLimit;
		synchronized (this.activeInvokerMonitor) {
			// Take the snapshot while holding the invoker monitor.
			configuredLimit = this.idleTaskExecutionLimit;
		}
		return configuredLimit;
	}

	protected void validateConfiguration() {
		// Run the inherited checks first, then the check specific to this container.
		super.validateConfiguration();
		synchronized (this.activeInvokerMonitor) {
			if (isSubscriptionDurable()) {
				// A durable subscription is rejected unless exactly one
				// concurrent consumer is configured.
				if (this.concurrentConsumers != 1) {
					throw new IllegalArgumentException("Only 1 concurrent consumer supported for durable subscription");
				}
			}
		}
	}


	//-------------------------------------------------------------------------
	// Implementation of AbstractMessageListenerContainer's template methods
	//-------------------------------------------------------------------------

	public void initialize() {
		// Adapt default cache level: an explicitly configured level is left alone;
		// otherwise default to CACHE_NONE when a transaction manager is set and
		// to CACHE_CONSUMER when there is none.
		boolean transactionManagerSet = (getTransactionManager() != null);
		if (this.cacheLevel == null) {
			int defaultLevel = (transactionManagerSet ? CACHE_NONE : CACHE_CONSUMER);
			this.cacheLevel = new Integer(defaultLevel);
		}

		// Prepare taskExecutor and maxMessagesPerTask.
		synchronized (this.activeInvokerMonitor) {
			if (this.taskExecutor == null) {
				this.taskExecutor = createDefaultTaskExecutor();
			}
			else {
				boolean prefersShortLivedTasks = (this.taskExecutor instanceof SchedulingTaskExecutor &&
						((SchedulingTaskExecutor) this.taskExecutor).prefersShortLivedTasks());
				// Integer.MIN_VALUE is the value this check treats as "no custom value
				// specified": only then is the executor's preference for short-lived
				// tasks honored by switching to 1 message per task, as described in
				// the setMaxMessagesPerTask javadoc.
				if (prefersShortLivedTasks && this.maxMessagesPerTask == Integer.MIN_VALUE) {
					this.maxMessagesPerTask = 1;
				}
			}
		}

		// Proceed with actual listener initialization.
		super.initialize();
	}

	/**
	 * Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified.
	 * <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
	 * with the specified bean name (or the class name, if no bean name specified) as thread name prefix.
	 * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
	 */
	protected TaskExecutor createDefaultTaskExecutor() {
		String beanName = getBeanName();
		String threadNamePrefix;
		if (beanName == null) {
			// No bean name available: fall back to the default prefix constant.
			threadNamePrefix = DEFAULT_THREAD_NAME_PREFIX;
		}
		else {
			threadNamePrefix = beanName + "-";
		}
		return new SimpleAsyncTaskExecutor(threadNamePrefix);
	}

	/**
	 * Use a shared JMS Connection depending on the "cacheLevel" setting.
	 * @see #setCacheLevel
	 * @see #CACHE_CONNECTION
	 */
	protected final boolean sharedConnectionEnabled() {
		// Any cache level at or above CACHE_CONNECTION counts as "shared".
		int effectiveCacheLevel = getCacheLevel();
		return (effectiveCacheLevel >= CACHE_CONNECTION);
	}

	/**
	 * Creates the specified number of concurrent consumers,
	 * in the form of a JMS Session plus associated MessageConsumer
	 * running in a separate thread.
	 * @see #scheduleNewInvoker
	 * @see #setTaskExecutor
	 */
	protected void doInitialize() throws JMSException {
		synchronized (this.activeInvokerMonitor) {
			// Schedule one invoker per configured concurrent consumer; the bound
			// is re-read from the field on every pass.
			int scheduled = 0;
			while (scheduled < this.concurrentConsumers) {
				scheduleNewInvoker();
				scheduled++;
			}
		}
	}

	/**
	 * Re-executes the given task via this listener container's TaskExecutor.
	 * @see #setTaskExecutor
	 */
	protected void doRescheduleTask(Object task) {
		// The task arrives untyped; it must be a Runnable to be handed to the executor.
		Runnable runnableTask = (Runnable) task;
		this.taskExecutor.execute(runnableTask);
	}

	protected void messageReceived(Message message, Session session) {
		// Neither argument is inspected here: the call itself is merely the
		// trigger for considering whether one more invoker should be scheduled.
		scheduleNewInvokerIfAppropriate();
	}

	/**
	 * Schedule a new invoker, increasing the total number of scheduled
	 * invokers for this listener container, but only if the specified
	 * "maxConcurrentConsumers" limit has not been reached yet, and only
	 * if this listener container does not currently have idle invokers
	 * that are waiting for new messages already.
	 * <p>Called once a message has been received, to scale up while
	 * processing the message in the invoker that originally received it.
	 * @see #setTaskExecutor
	 * @see #getMaxConcurrentConsumers()
	 */
	protected void scheduleNewInvokerIfAppropriate() {
		if (!isRunning()) {
			// Nothing to scale up while the container is not running.
			return;
		}
		synchronized (this.activeInvokerMonitor) {
			boolean belowMaximum = (this.scheduledInvokers.size() < this.maxConcurrentConsumers);
			// The idle check is only evaluated when there is still headroom.
			if (belowMaximum && !hasIdleInvokers()) {
				scheduleNewInvoker();
				if (logger.isDebugEnabled()) {
					logger.debug("Raised scheduled invoker count: " + scheduledInvokers.size());
				}
			}
		}
	}

	/**
	 * Schedule a new invoker, increasing the total number of scheduled
	 * invokers for this listener container.
	 */
	private void scheduleNewInvoker() {
		AsyncMessageListenerInvoker newInvoker = new AsyncMessageListenerInvoker();
		boolean rescheduled = rescheduleTaskIfNecessary(newInvoker);
		if (!rescheduled) {
			// Not expected to happen, since this is only called when active.
			return;
		}
		this.scheduledInvokers.add(newInvoker);
	}

	/**
	 * Determine whether this listener container currently has any
	 * idle instances among its scheduled invokers.
	 */
	private boolean hasIdleInvokers() {
		Iterator invokers = this.scheduledInvokers.iterator();
		while (invokers.hasNext()) {
			AsyncMessageListenerInvoker candidate = (AsyncMessageListenerInvoker) invokers.next();
			// A single idle invoker is enough; stop scanning at the first hit.
			if (candidate.isIdle()) {
				return true;
			}
		}
		return false;
	}

	/**
	 * Determine whether the current invoker should be rescheduled,
	 * given that it might not have received a message in a while.
	 * @param idleTaskExecutionCount the number of idle executions
	 * that this invoker task has already accumulated (in a row)
	 */
	private boolean shouldRescheduleInvoker(int idleTaskExecutionCount) {
		synchronized (this.activeInvokerMonitor) {
			// An invoker that has reached the idle limit is measured against the
			// "concurrentConsumers" bound; any other invoker against the
			// "maxConcurrentConsumers" bound.
			int allowedInvokers;
			if (idleTaskExecutionCount >= this.idleTaskExecutionLimit) {
				allowedInvokers = this.concurrentConsumers;
			}
			else {
				allowedInvokers = this.maxConcurrentConsumers;
			}
			return (this.scheduledInvokers.size() <= allowedInvokers);
		}
	}

	/**
	 * Return the number of currently scheduled consumers.
	 * <p>This number will always be in between "concurrentConsumers" and
	 * "maxConcurrentConsumers", but might be higher than "activeConsumerCount"
	 * (in case of some consumers being scheduled but not executed at the moment).
	 * @see #getConcurrentConsumers()
	 * @see #getMaxConcurrentConsumers()
	 * @see #getActiveConsumerCount()
	 */
	public final int getScheduledConsumerCount() {
		int scheduledCount;
		synchronized (this.activeInvokerMonitor) {
			// The count is the current size of the scheduled-invoker collection.
			scheduledCount = this.scheduledInvokers.size();
		}
		return scheduledCount;
	}

	/**
	 * Return the number of currently active consumers.
	 * <p>This number will always be in between "concurrentConsumers" and
	 * "maxConcurrentConsumers", but might be lower than "scheduledConsumerCount"
	 * (in case of some consumers being scheduled but not executed at the moment).
	 * @see #getConcurrentConsumers()
	 * @see #getMaxConcurrentConsumers()
	 * @see #getScheduledConsumerCount()
	 */
	public final int getActiveConsumerCount() {
		int activeCount;
		synchronized (this.activeInvokerMonitor) {
			// Take the snapshot while holding the invoker monitor.
			activeCount = this.activeInvokerCount;
		}
		return activeCount;
	}


	/**
	 * Overridden to accept a failure in the initial setup - leaving it up to the
	 * asynchronous invokers to establish the shared Connection on first access.
	 * @see #refreshConnectionUntilSuccessful()
	 */
	protected void establishSharedConnection() {
		try {
			super.establishSharedConnection();
		}
		catch (JMSException ex) {
			// Deliberately non-fatal: the failure is only logged at debug level
			// and not rethrown, so the caller proceeds as if setup had succeeded.
			String message = "Could not establish shared JMS Connection - " +
					"leaving it up to asynchronous invokers to establish a Connection as soon as possible";
			logger.debug(message, ex);
		}
	}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?