defaultmessagelistenercontainer.java

来自「有关此类编程有心德的高手 希望能够多多给予指教」· Java 代码 · 共 914 行 · 第 1/3 页

JAVA
914
字号
	/**
	 * This implementations proceeds even after an exception thrown from
	 * <code>Connection.start()</code>, relying on listeners to perform
	 * appropriate recovery.
	 */
	protected void startSharedConnection() {
		try {
			super.startSharedConnection();
		}
		catch (JMSException ex) {
			logger.debug("Connection start failed - relying on listeners to perform recovery", ex);
		}
	}

	/**
	 * This implementations proceeds even after an exception thrown from
	 * <code>Connection.stop()</code>, relying on listeners to perform
	 * appropriate recovery after a restart.
	 */
	protected void stopSharedConnection() {
		try {
			super.stopSharedConnection();
		}
		catch (JMSException ex) {
			logger.debug("Connection stop failed - relying on listeners to perform recovery after restart", ex);
		}
	}

	/**
	 * Handle the given exception that arose during setup of a listener.
	 * Called for every such exception in every concurrent listener.
	 * <p>The default implementation logs the exception at error level
	 * if not recovered yet, and at debug level if already recovered.
	 * Can be overridden in subclasses.
	 * @param ex the exception to handle
	 * @param alreadyRecovered whether a previously executing listener
	 * already recovered from the present listener setup failure
	 * (this usually indicates a follow-up failure than be ignored
	 * other than for debug log purposes)
	 * @see #recoverAfterListenerSetupFailure()
	 */
	protected void handleListenerSetupFailure(Throwable ex, boolean alreadyRecovered) {
		if (ex instanceof JMSException) {
			invokeExceptionListener((JMSException) ex);
		}
		if (ex instanceof SharedConnectionNotInitializedException) {
			if (!alreadyRecovered) {
				logger.debug("JMS message listener invoker needs to establish shared Connection");
			}
		}
		else {
			if (alreadyRecovered) {
				logger.debug("Setup of JMS message listener invoker failed - already recovered by other invoker", ex);
			}
			else {
				logger.error("Setup of JMS message listener invoker failed - trying to recover", ex);
			}
		}
	}

	/**
	 * Recover this listener container after a listener failed to set itself up,
	 * for example reestablishing the underlying Connection.
	 * <p>The default implementation delegates to DefaultMessageListenerContainer's
	 * recovery-capable {@link #refreshConnectionUntilSuccessful()} method, which will
	 * try to re-establish a Connection to the JMS provider both for the shared
	 * and the non-shared Connection case.
	 * @see #refreshConnectionUntilSuccessful()
	 * @see #refreshDestination()
	 */
	protected void recoverAfterListenerSetupFailure() {
		refreshConnectionUntilSuccessful();
		refreshDestination();
	}

	/**
	 * Refresh the underlying Connection, not returning before an attempt has been
	 * successful. Called in case of a shared Connection as well as without shared
	 * Connection, so either needs to operate on the shared Connection or on a
	 * temporary Connection that just gets established for validation purposes.
	 * <p>The default implementation retries until it successfully established a
	 * Connection, for as long as this message listener container is active.
	 * Applies the specified recovery interval between retries.
	 * @see #setRecoveryInterval
	 */
	protected void refreshConnectionUntilSuccessful() {
		while (isRunning()) {
			try {
				if (sharedConnectionEnabled()) {
					refreshSharedConnection();
					startSharedConnection();
				}
				else {
					Connection con = createConnection();
					JmsUtils.closeConnection(con);
				}
				logger.info("Successfully refreshed JMS Connection");
				break;
			}
			catch (Exception ex) {
				if (logger.isInfoEnabled()) {
					logger.info("Could not refresh JMS Connection - retrying in " + this.recoveryInterval + " ms", ex);
				}
			}
			sleepInbetweenRecoveryAttempts();
		}
	}

	/**
	 * Refresh the JMS destination that this listener container operates on.
	 * <p>Called after listener setup failure, assuming that a cached Destination
	 * object might have become invalid (a typical case on WebLogic JMS).
	 * <p>The default implementation removes the destination from a
	 * DestinationResolver's cache, in case of a CachingDestinationResolver.
	 * @see #setDestinationName
	 * @see org.springframework.jms.support.destination.CachingDestinationResolver
	 */
	protected void refreshDestination() {
		String destName = getDestinationName();
		if (destName != null) {
			DestinationResolver destResolver = getDestinationResolver();
			if (destResolver instanceof CachingDestinationResolver) {
				((CachingDestinationResolver) destResolver).removeFromCache(destName);
			}
		}
	}

	/**
	 * Sleep according to the specified recovery interval.
	 * Called inbetween recovery attempts.
	 */
	protected void sleepInbetweenRecoveryAttempts() {
		if (this.recoveryInterval > 0) {
			try {
				Thread.sleep(this.recoveryInterval);
			}
			catch (InterruptedException interEx) {
				// Re-interrupt current thread, to allow other threads to react.
				Thread.currentThread().interrupt();
			}
		}
	}


	/**
	 * Destroy the registered JMS Sessions and associated MessageConsumers.
	 */
	protected void doShutdown() throws JMSException {
		logger.debug("Waiting for shutdown of message listener invokers");
		synchronized (this.activeInvokerMonitor) {
			while (this.activeInvokerCount > 0) {
				if (logger.isDebugEnabled()) {
					logger.debug("Still waiting for shutdown of " + this.activeInvokerCount +
							" message listener invokers");
				}
				try {
					this.activeInvokerMonitor.wait();
				}
				catch (InterruptedException interEx) {
					// Re-interrupt current thread, to allow other threads to react.
					Thread.currentThread().interrupt();
				}
			}
		}
	}


	//-------------------------------------------------------------------------
	// Inner classes used as internal adapters
	//-------------------------------------------------------------------------

	/**
	 * Runnable that performs looped <code>MessageConsumer.receive()</code> calls.
	 */
	private class AsyncMessageListenerInvoker implements SchedulingAwareRunnable {

		private Session session;

		private MessageConsumer consumer;

		private Object lastRecoveryMarker;

		private boolean lastMessageSucceeded;

		private int idleTaskExecutionCount = 0;

		private volatile boolean idle = true;

		public void run() {
			synchronized (activeInvokerMonitor) {
				activeInvokerCount++;
				activeInvokerMonitor.notifyAll();
			}
			boolean messageReceived = false;
			try {
				if (maxMessagesPerTask < 0) {
					while (isActive()) {
						waitWhileNotRunning();
						if (isActive()) {
							messageReceived = invokeListener();
						}
					}
				}
				else {
					int messageCount = 0;
					while (isRunning() && messageCount < maxMessagesPerTask) {
						messageReceived = (invokeListener() || messageReceived);
						messageCount++;
					}
				}
			}
			catch (Throwable ex) {
				clearResources();
				if (!this.lastMessageSucceeded) {
					// We failed more than once in a row - sleep for recovery interval
					// even before first recovery attempt.
					sleepInbetweenRecoveryAttempts();
				}
				this.lastMessageSucceeded = false;
				boolean alreadyRecovered = false;
				synchronized (recoveryMonitor) {
					if (this.lastRecoveryMarker == currentRecoveryMarker) {
						handleListenerSetupFailure(ex, false);
						recoverAfterListenerSetupFailure();
						currentRecoveryMarker = new Object();
					}
					else {
						alreadyRecovered = true;
					}
				}
				if (alreadyRecovered) {
					handleListenerSetupFailure(ex, true);
				}
			}
			synchronized (activeInvokerMonitor) {
				activeInvokerCount--;
				activeInvokerMonitor.notifyAll();
			}
			if (!messageReceived) {
				this.idleTaskExecutionCount++;
			}
			else {
				this.idleTaskExecutionCount = 0;
			}
			if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
				// We're shutting down completely.
				synchronized (activeInvokerMonitor) {
					scheduledInvokers.remove(this);
					if (logger.isDebugEnabled()) {
						logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
					}
					activeInvokerMonitor.notifyAll();
				}
				clearResources();
			}
		}

		private boolean invokeListener() throws JMSException {
			initResourcesIfNecessary();
			boolean messageReceived = receiveAndExecute(this.session, this.consumer);
			this.lastMessageSucceeded = true;
			this.idle = !messageReceived;
			return messageReceived;
		}

		private void initResourcesIfNecessary() throws JMSException {
			if (getCacheLevel() <= CACHE_CONNECTION) {
				updateRecoveryMarker();
			}
			else {
				if (this.session == null && getCacheLevel() >= CACHE_SESSION) {
					updateRecoveryMarker();
					this.session = createSession(getSharedConnection());
				}
				if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
					this.consumer = createListenerConsumer(this.session);
				}
			}
		}

		private void updateRecoveryMarker() {
			synchronized (recoveryMonitor) {
				this.lastRecoveryMarker = currentRecoveryMarker;
			}
		}

		private void clearResources() {
			JmsUtils.closeMessageConsumer(this.consumer);
			JmsUtils.closeSession(this.session);
			this.consumer = null;
			this.session = null;
		}

		public boolean isLongLived() {
			return (maxMessagesPerTask < 0);
		}

		public boolean isIdle() {
			return this.idle;
		}
	}

}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?