📄 abstractpollingmessagelistenercontainer.java
字号:
/*
* Copyright 2002-2007 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.jms.listener;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.jms.connection.ConnectionFactoryUtils;
import org.springframework.jms.connection.JmsResourceHolder;
import org.springframework.jms.support.JmsUtils;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.ResourceTransactionManager;
/**
* Base class for listener container implementations which are based on polling.
* Provides support for listener handling based on {@link javax.jms.MessageConsumer},
* optionally participating in externally managed transactions.
*
* <p>This listener container variant is built for repeated polling attempts,
* each invoking the {@link #receiveAndExecute} method. The MessageConsumer used
* may be reobtained for each attempt or cached in between attempts; this is up
* to the concrete implementation. The receive timeout for each attempt can be
* configured through the {@link #setReceiveTimeout "receiveTimeout"} property.
*
* <p>The underlying mechanism is based on standard JMS MessageConsumer handling,
* which is perfectly compatible with both native JMS and JMS in a J2EE environment.
* Neither the JMS <code>MessageConsumer.setMessageListener</code> facility
* nor the JMS ServerSessionPool facility is required. A further advantage
* of this approach is full control over the listening process, allowing for
* custom scaling and throttling of concurrent message processing
* (which is up to concrete subclasses).
*
* <p>Message reception and listener execution can automatically be wrapped
* in transactions through passing a Spring
* {@link org.springframework.transaction.PlatformTransactionManager} into the
* {@link #setTransactionManager "transactionManager"} property. This will usually
* be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
* J2EE environment, in combination with a JTA-aware JMS ConnectionFactory obtained
* from JNDI (check your J2EE server's documentation).
*
* <p>This base class does not assume any specific mechanism for asynchronous
* execution of polling invokers. Check out {@link DefaultMessageListenerContainer}
* for a concrete implementation which is based on Spring's
* {@link org.springframework.core.task.TaskExecutor} abstraction,
* including dynamic scaling of concurrent consumers and automatic self recovery.
*
* @author Juergen Hoeller
* @since 2.0.3
* @see #createListenerConsumer(javax.jms.Session)
* @see #receiveAndExecute(javax.jms.Session, javax.jms.MessageConsumer)
* @see #setTransactionManager
*/
public abstract class AbstractPollingMessageListenerContainer extends AbstractMessageListenerContainer
implements BeanNameAware {
/**
* The default receive timeout: 1000 ms = 1 second.
*/
public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
// Resource factory instance created together with each container instance;
// its class is not declared in the part of the file shown above this point.
private final MessageListenerContainerResourceFactory transactionalResourceFactory =
new MessageListenerContainerResourceFactory();
// Records whether setSessionTransacted was called explicitly;
// initialize() only applies its own "sessionTransacted" default when it was not.
private boolean sessionTransactedCalled = false;
// Whether to inhibit the delivery of messages published by this container's own connection.
private boolean pubSubNoLocal = false;
// Optional transaction manager; when non-null, receiveAndExecute obtains a transaction from it.
private PlatformTransactionManager transactionManager;
// Transaction definition (name, timeout) passed to the transaction manager when obtaining a transaction.
private DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
// Timeout for receive calls, in milliseconds.
private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
/**
* Set the "sessionTransacted" flag by delegating to the superclass setter,
* and record that the flag has been set explicitly.
* <p>{@link #initialize()} consults that record: it only switches
* "sessionTransacted" on by itself when this setter has never been called.
* @param sessionTransacted the value handed on to the superclass setter
*/
public void setSessionTransacted(boolean sessionTransacted) {
super.setSessionTransacted(sessionTransacted);
// Remember the explicit call so that initialize() does not override the caller's choice.
this.sessionTransactedCalled = true;
}
/**
* Set whether to inhibit the delivery of messages published by its own connection.
* <p>Default is "false".
* @param pubSubNoLocal <code>true</code> to inhibit the delivery of such messages,
* <code>false</code> otherwise
* @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, String, boolean)
*/
public void setPubSubNoLocal(boolean pubSubNoLocal) {
this.pubSubNoLocal = pubSubNoLocal;
}
/**
* Return whether to inhibit the delivery of messages published by its own connection.
* @return the current "pubSubNoLocal" setting ("false" unless changed)
* @see #setPubSubNoLocal
*/
protected boolean isPubSubNoLocal() {
return this.pubSubNoLocal;
}
/**
* Specify the Spring {@link org.springframework.transaction.PlatformTransactionManager}
* to use for transactional wrapping of message reception plus listener execution.
* <p>Default is none, not performing any transactional wrapping.
* If specified, this will usually be a Spring
* {@link org.springframework.transaction.jta.JtaTransactionManager} or one
* of its subclasses, in combination with a JTA-aware ConnectionFactory that
* this message listener container obtains its Connections from.
* <p><b>Note: Consider the use of local JMS transactions instead.</b>
* Simply switch the {@link #setSessionTransacted "sessionTransacted"} flag
* to "true" in order to use a locally transacted JMS Session for the entire
* receive processing, including any Session operations performed by a
* {@link SessionAwareMessageListener} (e.g. sending a response message).
* Alternatively, a {@link org.springframework.jms.connection.JmsTransactionManager}
* may be used for fully synchronized Spring transactions based on local JMS
* transactions. Check {@link AbstractMessageListenerContainer}'s javadoc for
* a discussion of transaction choices and message redelivery scenarios.
* @param transactionManager the transaction manager to use
* (leaving it unset means no transactional wrapping)
* @see org.springframework.transaction.jta.JtaTransactionManager
* @see org.springframework.jms.connection.JmsTransactionManager
*/
public void setTransactionManager(PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
/**
* Return the Spring PlatformTransactionManager to use for transactional
* wrapping of message reception plus listener execution.
* @return the configured transaction manager, or <code>null</code>
* if none has been set
* @see #setTransactionManager
*/
protected final PlatformTransactionManager getTransactionManager() {
return this.transactionManager;
}
/**
* Specify the transaction name to use for transactional wrapping.
* <p>Default is the bean name of this listener container, if any;
* that default is applied by {@link #initialize()} when no name
* has been set by then.
* @param transactionName the name to store in the transaction definition
* @see org.springframework.transaction.TransactionDefinition#getName()
*/
public void setTransactionName(String transactionName) {
this.transactionDefinition.setName(transactionName);
}
/**
* Specify the transaction timeout to use for transactional wrapping, in <b>seconds</b>.
* <p>Default is none, using the transaction manager's default timeout.
* @param transactionTimeout the timeout in seconds, stored in the
* transaction definition
* @see org.springframework.transaction.TransactionDefinition#getTimeout()
* @see #setReceiveTimeout
*/
public void setTransactionTimeout(int transactionTimeout) {
this.transactionDefinition.setTimeout(transactionTimeout);
}
/**
* Set the timeout to use for receive calls, in <b>milliseconds</b>.
* The default is 1000 ms, that is, 1 second
* (see {@link #DEFAULT_RECEIVE_TIMEOUT}).
* <p><b>NOTE:</b> This value needs to be smaller than the transaction
* timeout used by the transaction manager (in the appropriate unit,
* of course). -1 indicates no timeout at all; however, this is only
* feasible if not running within a transaction manager.
* @param receiveTimeout the receive timeout in milliseconds
* @see javax.jms.MessageConsumer#receive(long)
* @see javax.jms.MessageConsumer#receive()
* @see #setTransactionTimeout
*/
public void setReceiveTimeout(long receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}
/**
* Apply this container's defaults, then run the superclass initialization.
* <p>Two defaults are applied before delegating:
* <ul>
* <li>If "sessionTransacted" was never set explicitly and the configured
* transaction manager is a {@link ResourceTransactionManager} whose resource
* factory is not this container's ConnectionFactory (that is, a non-JTA
* transaction manager), "sessionTransacted" is switched to "true".
* <li>If the transaction definition has no name yet, the bean name of this
* container is used as transaction name.
* </ul>
*/
public void initialize() {
	// Only derive a "sessionTransacted" default if the user did not choose a value.
	if (!this.sessionTransactedCalled) {
		PlatformTransactionManager configuredManager = this.transactionManager;
		if (configuredManager instanceof ResourceTransactionManager) {
			Object managedFactory = ((ResourceTransactionManager) configuredManager).getResourceFactory();
			// Identity comparison: the manager is bound to some other resource factory.
			if (managedFactory != getConnectionFactory()) {
				super.setSessionTransacted(true);
			}
		}
	}

	// Fall back to the bean name when no transaction name has been given.
	String configuredName = this.transactionDefinition.getName();
	if (configuredName == null) {
		this.transactionDefinition.setName(getBeanName());
	}

	// Hand over to the superclass for the remaining initialization.
	super.initialize();
}
/**
* Create a MessageConsumer for the given JMS Session.
* <p>The consumer is created for this container's configured Destination;
* if no Destination object is set, the configured destination name is
* resolved against the given Session first.
* @param session the JMS Session to work on
* @return the MessageConsumer
* @throws javax.jms.JMSException if thrown by JMS methods
* @see #receiveAndExecute
*/
protected MessageConsumer createListenerConsumer(Session session) throws JMSException {
	Destination configured = getDestination();
	// Resolve by name only when no Destination object has been configured.
	Destination target =
			(configured != null ? configured : resolveDestinationName(session, getDestinationName()));
	return createConsumer(session, target);
}
/**
* Execute the listener for a message received from the given consumer,
* wrapping the entire operation in an external transaction if demanded.
* @param session the JMS Session to work on
* @param consumer the MessageConsumer to work on
* @return whether a message has been received
* @throws JMSException if thrown by JMS methods
* @see #doReceiveAndExecute
*/
protected boolean receiveAndExecute(Session session, MessageConsumer consumer) throws JMSException {
if (this.transactionManager != null) {
// Execute receive within transaction.
TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
boolean messageReceived = true;
try {
messageReceived = doReceiveAndExecute(session, consumer, status);
}
catch (JMSException ex) {
rollbackOnException(status, ex);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -