📄 abstractpollingmessagelistenercontainer.java

/*
 * Copyright 2002-2007 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.jms.listener;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;

import org.springframework.beans.factory.BeanNameAware;
import org.springframework.jms.connection.ConnectionFactoryUtils;
import org.springframework.jms.connection.JmsResourceHolder;
import org.springframework.jms.support.JmsUtils;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.ResourceTransactionManager;

/**
 * Base class for listener container implementations which are based on polling.
 * Provides support for listener handling based on {@link javax.jms.MessageConsumer},
 * optionally participating in externally managed transactions.
 *
 * <p>This listener container variant is built for repeated polling attempts,
 * each invoking the {@link #receiveAndExecute} method. The MessageConsumer used
 * may be reobtained for each attempt or cached in between attempts; this is up
 * to the concrete implementation. The receive timeout for each attempt can be
 * configured through the {@link #setReceiveTimeout "receiveTimeout"} property.
 *
 * <p>The underlying mechanism is based on standard JMS MessageConsumer handling,
 * which is perfectly compatible with both native JMS and JMS in a J2EE environment.
 * Neither the JMS <code>MessageConsumer.setMessageListener</code> facility
 * nor the JMS ServerSessionPool facility is required. A further advantage
 * of this approach is full control over the listening process, allowing for
 * custom scaling and throttling of concurrent message processing
 * (which is up to concrete subclasses).
 *
 * <p>Message reception and listener execution can automatically be wrapped
 * in transactions through passing a Spring
 * {@link org.springframework.transaction.PlatformTransactionManager} into the
 * {@link #setTransactionManager "transactionManager"} property. This will usually
 * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
 * J2EE environment, in combination with a JTA-aware JMS ConnectionFactory obtained
 * from JNDI (check your J2EE server's documentation).
 *
 * <p>This base class does not assume any specific mechanism for asynchronous
 * execution of polling invokers. Check out {@link DefaultMessageListenerContainer}
 * for a concrete implementation which is based on Spring's
 * {@link org.springframework.core.task.TaskExecutor} abstraction,
 * including dynamic scaling of concurrent consumers and automatic self recovery.
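 *
 * <p>As an illustrative sketch only (not part of the original documentation),
 * a transactional setup through {@link DefaultMessageListenerContainer} might
 * look roughly as follows. The <code>connectionFactory</code>,
 * <code>jtaTransactionManager</code> and <code>myListener</code> references and
 * the destination name are placeholders assumed to be supplied by the application:
 *
 * <pre class="code">
 * DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
 * container.setConnectionFactory(connectionFactory);        // assumed: JTA-aware ConnectionFactory from JNDI
 * container.setDestinationName("example.queue");            // hypothetical destination name
 * container.setMessageListener(myListener);                 // assumed: application-provided listener
 * container.setTransactionManager(jtaTransactionManager);   // wraps receive plus listener execution</pre>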
 *
 * @author Juergen Hoeller
 * @since 2.0.3
 * @see #createListenerConsumer(javax.jms.Session)
 * @see #receiveAndExecute(javax.jms.Session, javax.jms.MessageConsumer)
 * @see #setTransactionManager
 */
public abstract class AbstractPollingMessageListenerContainer extends AbstractMessageListenerContainer
		implements BeanNameAware {

	/**
	 * The default receive timeout: 1000 ms = 1 second.
	 */
	public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;


	private final MessageListenerContainerResourceFactory transactionalResourceFactory =
			new MessageListenerContainerResourceFactory();

	private boolean sessionTransactedCalled = false;

	private boolean pubSubNoLocal = false;

	private PlatformTransactionManager transactionManager;

	private DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();

	private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;


	public void setSessionTransacted(boolean sessionTransacted) {
		super.setSessionTransacted(sessionTransacted);
		this.sessionTransactedCalled = true;
	}

	/**
	 * Set whether to inhibit the delivery of messages published by its own connection.
	 * Default is "false".
	 * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, String, boolean)
	 */
	public void setPubSubNoLocal(boolean pubSubNoLocal) {
		this.pubSubNoLocal = pubSubNoLocal;
	}

	/**
	 * Return whether to inhibit the delivery of messages published by its own connection.
	 */
	protected boolean isPubSubNoLocal() {
		return this.pubSubNoLocal;
	}

	/**
	 * Specify the Spring {@link org.springframework.transaction.PlatformTransactionManager}
	 * to use for transactional wrapping of message reception plus listener execution.
	 * <p>Default is none, not performing any transactional wrapping.
	 * If specified, this will usually be a Spring
	 * {@link org.springframework.transaction.jta.JtaTransactionManager} or one
	 * of its subclasses, in combination with a JTA-aware ConnectionFactory that
	 * this message listener container obtains its Connections from.
	 * <p><b>Note: Consider the use of local JMS transactions instead.</b>
	 * Simply switch the {@link #setSessionTransacted "sessionTransacted"} flag
	 * to "true" in order to use a locally transacted JMS Session for the entire
	 * receive processing, including any Session operations performed by a
	 * {@link SessionAwareMessageListener} (e.g. sending a response message).
	 * Alternatively, a {@link org.springframework.jms.connection.JmsTransactionManager}
	 * may be used for fully synchronized Spring transactions based on local JMS
	 * transactions. Check {@link AbstractMessageListenerContainer}'s javadoc for
	 * a discussion of transaction choices and message redelivery scenarios.
	 * @see org.springframework.transaction.jta.JtaTransactionManager
	 * @see org.springframework.jms.connection.JmsTransactionManager
	 */
	public void setTransactionManager(PlatformTransactionManager transactionManager) {
		this.transactionManager = transactionManager;
	}

	/**
	 * Return the Spring PlatformTransactionManager to use for transactional
	 * wrapping of message reception plus listener execution.
	 */
	protected final PlatformTransactionManager getTransactionManager() {
		return this.transactionManager;
	}

	/**
	 * Specify the transaction name to use for transactional wrapping.
	 * Default is the bean name of this listener container, if any.
	 * @see org.springframework.transaction.TransactionDefinition#getName()
	 */
	public void setTransactionName(String transactionName) {
		this.transactionDefinition.setName(transactionName);
	}

	/**
	 * Specify the transaction timeout to use for transactional wrapping, in <b>seconds</b>.
	 * Default is none, using the transaction manager's default timeout.
	 * @see org.springframework.transaction.TransactionDefinition#getTimeout()
	 * @see #setReceiveTimeout
	 */
	public void setTransactionTimeout(int transactionTimeout) {
		this.transactionDefinition.setTimeout(transactionTimeout);
	}

	/**
	 * Set the timeout to use for receive calls, in <b>milliseconds</b>.
	 * The default is 1000 ms, that is, 1 second.
	 * <p><b>NOTE:</b> This value needs to be smaller than the transaction
	 * timeout used by the transaction manager (in the appropriate unit,
	 * of course). -1 indicates no timeout at all; however, this is only
	 * feasible if not running within a transaction manager.
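	 * <p>A sketch of the unit relationship, with arbitrarily chosen values
	 * (assumptions, not recommendations) on a hypothetical <code>container</code>:
	 * <pre class="code">
	 * container.setTransactionTimeout(30);  // seconds, i.e. 30000 ms
	 * container.setReceiveTimeout(5000);    // milliseconds, well below 30000 ms</pre>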
	 * @see javax.jms.MessageConsumer#receive(long)
	 * @see javax.jms.MessageConsumer#receive()
	 * @see #setTransactionTimeout
	 */
	public void setReceiveTimeout(long receiveTimeout) {
		this.receiveTimeout = receiveTimeout;
	}


	public void initialize() {
		// Set sessionTransacted=true in case of a non-JTA transaction manager.
		if (!this.sessionTransactedCalled && this.transactionManager instanceof ResourceTransactionManager &&
				((ResourceTransactionManager) this.transactionManager).getResourceFactory() != getConnectionFactory()) {
			super.setSessionTransacted(true);
		}

		// Use bean name as default transaction name.
		if (this.transactionDefinition.getName() == null) {
			this.transactionDefinition.setName(getBeanName());
		}

		// Proceed with superclass initialization.
		super.initialize();
	}


	/**
	 * Create a MessageConsumer for the given JMS Session,
	 * registering a MessageListener for the specified listener.
	 * @param session the JMS Session to work on
	 * @return the MessageConsumer
	 * @throws javax.jms.JMSException if thrown by JMS methods
	 * @see #receiveAndExecute
	 */
	protected MessageConsumer createListenerConsumer(Session session) throws JMSException {
		Destination destination = getDestination();
		if (destination == null) {
			destination = resolveDestinationName(session, getDestinationName());
		}
		return createConsumer(session, destination);
	}

	/**
	 * Execute the listener for a message received from the given consumer,
	 * wrapping the entire operation in an external transaction if demanded.
	 * @param session the JMS Session to work on
	 * @param consumer the MessageConsumer to work on
	 * @return whether a message has been received
	 * @throws JMSException if thrown by JMS methods
	 * @see #doReceiveAndExecute
	 */
	protected boolean receiveAndExecute(Session session, MessageConsumer consumer) throws JMSException {
		if (this.transactionManager != null) {
			// Execute receive within transaction.
			TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
			boolean messageReceived = true;
			try {
				messageReceived = doReceiveAndExecute(session, consumer, status);
			}
			catch (JMSException ex) {
				rollbackOnException(status, ex);
