defaultmessagelistenercontainer.java
来自「有关此类编程有心德的高手 希望能够多多给予指教」· Java 代码 · 共 914 行 · 第 1/3 页
JAVA
914 行
/**
* This implementations proceeds even after an exception thrown from
* <code>Connection.start()</code>, relying on listeners to perform
* appropriate recovery.
*/
protected void startSharedConnection() {
try {
super.startSharedConnection();
}
catch (JMSException ex) {
logger.debug("Connection start failed - relying on listeners to perform recovery", ex);
}
}
/**
* This implementations proceeds even after an exception thrown from
* <code>Connection.stop()</code>, relying on listeners to perform
* appropriate recovery after a restart.
*/
protected void stopSharedConnection() {
try {
super.stopSharedConnection();
}
catch (JMSException ex) {
logger.debug("Connection stop failed - relying on listeners to perform recovery after restart", ex);
}
}
/**
* Handle the given exception that arose during setup of a listener.
* Called for every such exception in every concurrent listener.
* <p>The default implementation logs the exception at error level
* if not recovered yet, and at debug level if already recovered.
* Can be overridden in subclasses.
* @param ex the exception to handle
* @param alreadyRecovered whether a previously executing listener
* already recovered from the present listener setup failure
* (this usually indicates a follow-up failure than be ignored
* other than for debug log purposes)
* @see #recoverAfterListenerSetupFailure()
*/
protected void handleListenerSetupFailure(Throwable ex, boolean alreadyRecovered) {
if (ex instanceof JMSException) {
invokeExceptionListener((JMSException) ex);
}
if (ex instanceof SharedConnectionNotInitializedException) {
if (!alreadyRecovered) {
logger.debug("JMS message listener invoker needs to establish shared Connection");
}
}
else {
if (alreadyRecovered) {
logger.debug("Setup of JMS message listener invoker failed - already recovered by other invoker", ex);
}
else {
logger.error("Setup of JMS message listener invoker failed - trying to recover", ex);
}
}
}
/**
* Recover this listener container after a listener failed to set itself up,
* for example reestablishing the underlying Connection.
* <p>The default implementation delegates to DefaultMessageListenerContainer's
* recovery-capable {@link #refreshConnectionUntilSuccessful()} method, which will
* try to re-establish a Connection to the JMS provider both for the shared
* and the non-shared Connection case.
* @see #refreshConnectionUntilSuccessful()
* @see #refreshDestination()
*/
protected void recoverAfterListenerSetupFailure() {
refreshConnectionUntilSuccessful();
refreshDestination();
}
/**
* Refresh the underlying Connection, not returning before an attempt has been
* successful. Called in case of a shared Connection as well as without shared
* Connection, so either needs to operate on the shared Connection or on a
* temporary Connection that just gets established for validation purposes.
* <p>The default implementation retries until it successfully established a
* Connection, for as long as this message listener container is active.
* Applies the specified recovery interval between retries.
* @see #setRecoveryInterval
*/
protected void refreshConnectionUntilSuccessful() {
while (isRunning()) {
try {
if (sharedConnectionEnabled()) {
refreshSharedConnection();
startSharedConnection();
}
else {
Connection con = createConnection();
JmsUtils.closeConnection(con);
}
logger.info("Successfully refreshed JMS Connection");
break;
}
catch (Exception ex) {
if (logger.isInfoEnabled()) {
logger.info("Could not refresh JMS Connection - retrying in " + this.recoveryInterval + " ms", ex);
}
}
sleepInbetweenRecoveryAttempts();
}
}
/**
* Refresh the JMS destination that this listener container operates on.
* <p>Called after listener setup failure, assuming that a cached Destination
* object might have become invalid (a typical case on WebLogic JMS).
* <p>The default implementation removes the destination from a
* DestinationResolver's cache, in case of a CachingDestinationResolver.
* @see #setDestinationName
* @see org.springframework.jms.support.destination.CachingDestinationResolver
*/
protected void refreshDestination() {
String destName = getDestinationName();
if (destName != null) {
DestinationResolver destResolver = getDestinationResolver();
if (destResolver instanceof CachingDestinationResolver) {
((CachingDestinationResolver) destResolver).removeFromCache(destName);
}
}
}
/**
* Sleep according to the specified recovery interval.
* Called inbetween recovery attempts.
*/
protected void sleepInbetweenRecoveryAttempts() {
if (this.recoveryInterval > 0) {
try {
Thread.sleep(this.recoveryInterval);
}
catch (InterruptedException interEx) {
// Re-interrupt current thread, to allow other threads to react.
Thread.currentThread().interrupt();
}
}
}
/**
* Destroy the registered JMS Sessions and associated MessageConsumers.
*/
protected void doShutdown() throws JMSException {
logger.debug("Waiting for shutdown of message listener invokers");
synchronized (this.activeInvokerMonitor) {
while (this.activeInvokerCount > 0) {
if (logger.isDebugEnabled()) {
logger.debug("Still waiting for shutdown of " + this.activeInvokerCount +
" message listener invokers");
}
try {
this.activeInvokerMonitor.wait();
}
catch (InterruptedException interEx) {
// Re-interrupt current thread, to allow other threads to react.
Thread.currentThread().interrupt();
}
}
}
}
//-------------------------------------------------------------------------
// Inner classes used as internal adapters
//-------------------------------------------------------------------------
/**
* Runnable that performs looped <code>MessageConsumer.receive()</code> calls.
*/
private class AsyncMessageListenerInvoker implements SchedulingAwareRunnable {
private Session session;
private MessageConsumer consumer;
private Object lastRecoveryMarker;
private boolean lastMessageSucceeded;
private int idleTaskExecutionCount = 0;
private volatile boolean idle = true;
public void run() {
synchronized (activeInvokerMonitor) {
activeInvokerCount++;
activeInvokerMonitor.notifyAll();
}
boolean messageReceived = false;
try {
if (maxMessagesPerTask < 0) {
while (isActive()) {
waitWhileNotRunning();
if (isActive()) {
messageReceived = invokeListener();
}
}
}
else {
int messageCount = 0;
while (isRunning() && messageCount < maxMessagesPerTask) {
messageReceived = (invokeListener() || messageReceived);
messageCount++;
}
}
}
catch (Throwable ex) {
clearResources();
if (!this.lastMessageSucceeded) {
// We failed more than once in a row - sleep for recovery interval
// even before first recovery attempt.
sleepInbetweenRecoveryAttempts();
}
this.lastMessageSucceeded = false;
boolean alreadyRecovered = false;
synchronized (recoveryMonitor) {
if (this.lastRecoveryMarker == currentRecoveryMarker) {
handleListenerSetupFailure(ex, false);
recoverAfterListenerSetupFailure();
currentRecoveryMarker = new Object();
}
else {
alreadyRecovered = true;
}
}
if (alreadyRecovered) {
handleListenerSetupFailure(ex, true);
}
}
synchronized (activeInvokerMonitor) {
activeInvokerCount--;
activeInvokerMonitor.notifyAll();
}
if (!messageReceived) {
this.idleTaskExecutionCount++;
}
else {
this.idleTaskExecutionCount = 0;
}
if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
// We're shutting down completely.
synchronized (activeInvokerMonitor) {
scheduledInvokers.remove(this);
if (logger.isDebugEnabled()) {
logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
}
activeInvokerMonitor.notifyAll();
}
clearResources();
}
}
private boolean invokeListener() throws JMSException {
initResourcesIfNecessary();
boolean messageReceived = receiveAndExecute(this.session, this.consumer);
this.lastMessageSucceeded = true;
this.idle = !messageReceived;
return messageReceived;
}
private void initResourcesIfNecessary() throws JMSException {
if (getCacheLevel() <= CACHE_CONNECTION) {
updateRecoveryMarker();
}
else {
if (this.session == null && getCacheLevel() >= CACHE_SESSION) {
updateRecoveryMarker();
this.session = createSession(getSharedConnection());
}
if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
this.consumer = createListenerConsumer(this.session);
}
}
}
private void updateRecoveryMarker() {
synchronized (recoveryMonitor) {
this.lastRecoveryMarker = currentRecoveryMarker;
}
}
private void clearResources() {
JmsUtils.closeMessageConsumer(this.consumer);
JmsUtils.closeSession(this.session);
this.consumer = null;
this.session = null;
}
public boolean isLongLived() {
return (maxMessagesPerTask < 0);
}
public boolean isIdle() {
return this.idle;
}
}
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?