📄 defaultmessagelistenercontainer.java
字号:
else {
if (alreadyRecovered) {
logger.debug("Setup of JMS message listener invoker failed - already recovered by other invoker", ex);
}
else {
logger.error("Setup of JMS message listener invoker failed - trying to recover", ex);
}
}
}
/**
 * Bring this listener container back into a usable state after one of its
 * listener invokers failed to set itself up, for example by reestablishing
 * the underlying Connection.
 * <p>This default implementation performs two steps in order: it first calls
 * <code>refreshConnectionUntilSuccessful</code>, and once that has returned
 * it calls <code>refreshDestination</code>.
 * @see #refreshConnectionUntilSuccessful()
 * @see #refreshDestination()
 */
protected void recoverAfterListenerSetupFailure() {
    // Step 1: connection first; this call does not return until it is done retrying.
    this.refreshConnectionUntilSuccessful();
    // Step 2: drop a possibly stale cached destination.
    this.refreshDestination();
}
/**
 * Keep trying to obtain a working JMS Connection until one attempt succeeds
 * or this container is no longer active.
 * <p>With a shared Connection enabled, the shared Connection is refreshed and,
 * if the container is currently running, started again. Without a shared
 * Connection, a temporary Connection is created and closed right away, purely
 * to validate that the provider can be reached.
 * <p>A <code>JMSException</code> raised by an attempt is logged at info level
 * and followed by a pause (see <code>sleepInbetweenRecoveryAttempts</code>)
 * before the next attempt.
 * @see #setRecoveryInterval
 */
protected void refreshConnectionUntilSuccessful() {
    while (isActive()) {
        try {
            if (!sharedConnectionEnabled()) {
                // Validation only: open a throwaway Connection and close it again.
                Connection probe = createConnection();
                JmsUtils.closeConnection(probe);
            }
            else {
                refreshSharedConnection();
                if (isRunning()) {
                    startSharedConnection();
                }
            }
            logger.info("Successfully refreshed JMS Connection");
            // Success: nothing left to do.
            return;
        }
        catch (JMSException ex) {
            if (logger.isInfoEnabled()) {
                logger.info("Could not refresh JMS Connection - retrying in " + this.recoveryInterval + " ms", ex);
            }
        }
        // Only reached after a failed attempt.
        sleepInbetweenRecoveryAttempts();
    }
}
/**
 * Discard a possibly stale cached Destination for this container's
 * destination name.
 * <p>Invoked as part of recovery after a listener setup failure, on the
 * assumption that a previously cached Destination object may no longer be
 * valid (a typical case on WebLogic JMS).
 * <p>Does nothing if no destination name is set, or if the configured
 * DestinationResolver is not a CachingDestinationResolver. Otherwise the
 * destination name is removed from the resolver's cache.
 * @see #setDestinationName
 * @see org.springframework.jms.support.destination.CachingDestinationResolver
 */
protected void refreshDestination() {
    String name = getDestinationName();
    if (name == null) {
        // No name-based destination configured: nothing could have been cached.
        return;
    }
    DestinationResolver resolver = getDestinationResolver();
    if (resolver instanceof CachingDestinationResolver) {
        CachingDestinationResolver cachingResolver = (CachingDestinationResolver) resolver;
        cachingResolver.removeFromCache(name);
    }
}
/**
 * Pause the current thread for the configured recovery interval.
 * <p>Returns immediately if the recovery interval is not positive. If the
 * thread is interrupted while sleeping, the sleep ends early and the
 * thread's interrupt status is set again before returning.
 */
protected void sleepInbetweenRecoveryAttempts() {
    if (this.recoveryInterval <= 0) {
        // No pause configured.
        return;
    }
    try {
        Thread.sleep(this.recoveryInterval);
    }
    catch (InterruptedException interEx) {
        // Re-interrupt current thread, to allow other threads to react.
        Thread.currentThread().interrupt();
    }
}
/**
 * Wait for the message listener invokers of this container to shut down.
 * <p>Blocks on the active-invoker monitor until the active invoker count
 * has dropped to zero; each invoker notifies that monitor when it changes
 * the count.
 * <p>If the waiting thread is interrupted, the wait is abandoned: the
 * thread's interrupt status is restored and this method returns, even if
 * invokers are still active. Retrying <code>wait()</code> with the interrupt
 * status set would throw again immediately without ever releasing the
 * monitor, which would keep the invokers from updating the count.
 * @throws JMSException declared as part of the method contract; not thrown
 * by the code in this implementation itself
 */
protected void destroyListener() throws JMSException {
    logger.debug("Waiting for shutdown of message listener invokers");
    try {
        synchronized (this.activeInvokerMonitor) {
            while (this.activeInvokerCount > 0) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Still waiting for shutdown of " + this.activeInvokerCount +
                            " message listener invokers");
                }
                // Releases the monitor while waiting, so invokers can decrement the count.
                this.activeInvokerMonitor.wait();
            }
        }
    }
    catch (InterruptedException interEx) {
        // Stop waiting and re-interrupt current thread, to allow other threads to react.
        Thread.currentThread().interrupt();
    }
}
//-------------------------------------------------------------------------
// JMS 1.1 factory methods, potentially overridden for JMS 1.0.2
//-------------------------------------------------------------------------
/**
 * Obtain a suitable Connection from the given JmsResourceHolder.
 * <p>This implementation accepts any JMS 1.1 Connection and simply
 * returns what the holder's <code>getConnection()</code> provides.
 * @param holder the JmsResourceHolder to look into
 * @return the Connection obtained from the holder,
 * or <code>null</code> if none found
 */
protected Connection getConnection(JmsResourceHolder holder) {
    Connection heldConnection = holder.getConnection();
    return heldConnection;
}
/**
 * Obtain a suitable Session from the given JmsResourceHolder.
 * <p>This implementation accepts any JMS 1.1 Session and simply
 * returns what the holder's <code>getSession()</code> provides.
 * @param holder the JmsResourceHolder to look into
 * @return the Session obtained from the holder,
 * or <code>null</code> if none found
 */
protected Session getSession(JmsResourceHolder holder) {
    Session heldSession = holder.getSession();
    return heldSession;
}
/**
 * Build a JMS MessageConsumer for the given Session and Destination,
 * using the JMS 1.1 API.
 * <p>Three cases are distinguished:
 * <ul>
 * <li>not a Topic: a plain consumer with the message selector only;
 * <li>Topic, non-durable: a consumer with message selector and NoLocal flag;
 * <li>Topic, durable: a durable subscriber with the configured subscription
 * name, message selector and NoLocal flag.
 * </ul>
 * @param session the JMS Session to create a MessageConsumer for
 * @param destination the JMS Destination to create a MessageConsumer for
 * @return the new JMS MessageConsumer
 * @throws javax.jms.JMSException if thrown by JMS API methods
 */
protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
    // The NoLocal flag is only ever passed for a Topic: some JMS providers,
    // such as WebSphere MQ 6.0, throw IllegalStateException when the flag
    // is specified for a Queue.
    if (!(destination instanceof Topic)) {
        return session.createConsumer(destination, getMessageSelector());
    }
    if (!isSubscriptionDurable()) {
        return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal());
    }
    Topic topic = (Topic) destination;
    return session.createDurableSubscriber(
            topic, getDurableSubscriptionName(), getMessageSelector(), isPubSubNoLocal());
}
//-------------------------------------------------------------------------
// Inner classes used as internal adapters
//-------------------------------------------------------------------------
/**
 * Runnable that repeatedly invokes the container's receive-and-execute
 * logic (looped <code>MessageConsumer.receive()</code> calls).
 * <p>Each run registers itself in the container's active invoker count on
 * entry and always unregisters on exit, so that a thread waiting on the
 * active-invoker monitor is never left waiting for an invoker that has
 * already terminated. Any Throwable raised while processing triggers the
 * recovery logic of the enclosing container; recovery is coordinated between
 * invokers through a shared recovery marker object.
 */
private class AsyncMessageListenerInvoker implements SchedulingAwareRunnable {

    /** Session held by this invoker; created lazily, reset by clearResources(). */
    private Session session;

    /** Consumer held by this invoker; created lazily, reset by clearResources(). */
    private MessageConsumer consumer;

    /** The container's recovery marker as last recorded by updateRecoveryMarker(). */
    private Object lastRecoveryMarker;

    /** Whether the most recent invokeListener() call completed without throwing. */
    private boolean lastMessageSucceeded;

    /**
     * Execute the receive loop, handle failures via the container's recovery
     * callbacks, and finally try to reschedule this task.
     */
    public void run() {
        synchronized (activeInvokerMonitor) {
            activeInvokerCount++;
            activeInvokerMonitor.notifyAll();
        }
        try {
            if (maxMessagesPerTask < 0) {
                // Unlimited: keep going for as long as the container is active.
                while (isActive()) {
                    waitWhileNotRunning();
                    if (isActive()) {
                        invokeListener();
                    }
                }
            }
            else {
                // Limited: process at most maxMessagesPerTask invocations in this run.
                int messageCount = 0;
                while (isRunning() && messageCount < maxMessagesPerTask) {
                    invokeListener();
                    messageCount++;
                }
            }
        }
        catch (Throwable ex) {
            clearResources();
            if (!this.lastMessageSucceeded) {
                // We failed more than once in a row - sleep for recovery interval
                // even before first recovery attempt.
                sleepInbetweenRecoveryAttempts();
            }
            this.lastMessageSucceeded = false;
            boolean alreadyRecovered = false;
            synchronized (recoveryMonitor) {
                if (this.lastRecoveryMarker == currentRecoveryMarker) {
                    // Marker unchanged since this invoker recorded it: no other
                    // invoker has recovered in the meantime, so do it here and
                    // publish a fresh marker.
                    handleListenerSetupFailure(ex, false);
                    recoverAfterListenerSetupFailure();
                    currentRecoveryMarker = new Object();
                }
                else {
                    alreadyRecovered = true;
                }
            }
            if (alreadyRecovered) {
                // Reported outside the recovery monitor.
                handleListenerSetupFailure(ex, true);
            }
        }
        finally {
            // Always unregister, even if the recovery code above throws;
            // otherwise the active invoker count would never reach zero again.
            synchronized (activeInvokerMonitor) {
                activeInvokerCount--;
                activeInvokerMonitor.notifyAll();
            }
        }
        if (!rescheduleTaskIfNecessary(this)) {
            // We're shutting down completely.
            clearResources();
        }
    }

    /**
     * Perform one receive-and-execute cycle, initializing the Session and
     * consumer first where required, and record the success.
     * @throws JMSException if thrown by resource setup or message processing
     */
    private void invokeListener() throws JMSException {
        initResourcesIfNecessary();
        receiveAndExecute(this.session, this.consumer);
        this.lastMessageSucceeded = true;
    }

    /**
     * Record the current recovery marker and lazily create the Session and
     * MessageConsumer, depending on the container's cache level.
     * @throws JMSException if creating the Session or consumer fails
     */
    private void initResourcesIfNecessary() throws JMSException {
        if (getCacheLevel() <= CACHE_CONNECTION) {
            // Nothing is held by this invoker at this cache level;
            // just record the marker on every call.
            updateRecoveryMarker();
        }
        else {
            if (this.session == null && getCacheLevel() >= CACHE_SESSION) {
                // Record the marker at the point where the held Session is (re)created.
                updateRecoveryMarker();
                this.session = createSession(getSharedConnection());
            }
            if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
                this.consumer = createListenerConsumer(this.session);
            }
        }
    }

    /**
     * Remember the container's current recovery marker, so that a later
     * failure can tell whether another invoker has recovered in between.
     */
    private void updateRecoveryMarker() {
        synchronized (recoveryMonitor) {
            this.lastRecoveryMarker = currentRecoveryMarker;
        }
    }

    /**
     * Close the held consumer and Session via JmsUtils and drop the references.
     */
    private void clearResources() {
        JmsUtils.closeMessageConsumer(this.consumer);
        JmsUtils.closeSession(this.session);
        this.consumer = null;
        this.session = null;
    }

    /**
     * Report this task as long-lived when no per-task message limit is set
     * (<code>maxMessagesPerTask</code> is negative).
     */
    public boolean isLongLived() {
        return (maxMessagesPerTask < 0);
    }
}
/**
 * ConnectionFactoryUtils.ResourceFactory adapter that forwards every call
 * to the corresponding method of the enclosing listener container.
 * <p>All calls are explicitly qualified with the outer instance, since
 * several methods here carry the same name as the container method they
 * delegate to.
 */
private class MessageListenerContainerResourceFactory implements ConnectionFactoryUtils.ResourceFactory {

    /** Delegates to the container's <code>getConnection(holder)</code>. */
    public Connection getConnection(JmsResourceHolder holder) {
        return DefaultMessageListenerContainer.this.getConnection(holder);
    }

    /** Delegates to the container's <code>getSession(holder)</code>. */
    public Session getSession(JmsResourceHolder holder) {
        return DefaultMessageListenerContainer.this.getSession(holder);
    }

    /**
     * Hands out the container's shared Connection if shared Connections are
     * enabled; otherwise asks the container to create a new Connection.
     */
    public Connection createConnection() throws JMSException {
        return (DefaultMessageListenerContainer.this.sharedConnectionEnabled() ?
                DefaultMessageListenerContainer.this.getSharedConnection() :
                DefaultMessageListenerContainer.this.createConnection());
    }

    /** Delegates to the container's <code>createSession(con)</code>. */
    public Session createSession(Connection con) throws JMSException {
        return DefaultMessageListenerContainer.this.createSession(con);
    }

    /** Reports the container's <code>isSessionTransacted()</code> setting. */
    public boolean isSynchedLocalTransactionAllowed() {
        return DefaultMessageListenerContainer.this.isSessionTransacted();
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -