📄 defaultmessagelistenercontainer.java
字号:
* cache level.
* @see #CACHE_NONE
* @see #CACHE_CONNECTION
* @see #CACHE_SESSION
* @see #CACHE_CONSUMER
* @see #setTransactionManager
*/
public void setCacheLevel(int cacheLevel) {
    // Kept as a boxed Integer so that "never set" (null) stays distinguishable
    // from an explicitly chosen level.
    Integer boxedLevel = new Integer(cacheLevel);
    this.cacheLevel = boxedLevel;
}
/**
 * Return the cache level currently in effect for this listener container.
 * @return the configured cache level, or {@link #CACHE_NONE} if no level
 * has been set so far
 */
public int getCacheLevel() {
    if (this.cacheLevel == null) {
        // Nothing configured (yet): report the most conservative level.
        return CACHE_NONE;
    }
    return this.cacheLevel.intValue();
}
/**
 * Check the consumer-related settings of this container, then hand over
 * to the superclass validation.
 * @throws IllegalArgumentException if fewer than one concurrent consumer is
 * configured, or if a durable subscription is combined with a concurrent
 * consumer count other than one
 */
public void afterPropertiesSet() {
    if (this.concurrentConsumers < 1) {
        throw new IllegalArgumentException("concurrentConsumers value must be at least 1 (one)");
    }
    if (isSubscriptionDurable()) {
        // A durable subscription is only accepted with exactly one consumer.
        if (this.concurrentConsumers != 1) {
            throw new IllegalArgumentException("Only 1 concurrent consumer supported for durable subscription");
        }
    }
    super.afterPropertiesSet();
}
//-------------------------------------------------------------------------
// Implementation of AbstractMessageListenerContainer's template methods
//-------------------------------------------------------------------------
/**
 * Fill in defaults for the cache level, the TaskExecutor and the
 * "maxMessagesPerTask" setting, then continue with the superclass
 * initialization.
 */
public void initialize() {
    // Default cache level, only applied when none was set explicitly:
    // CACHE_NONE with an external transaction manager, CACHE_CONSUMER without.
    if (this.cacheLevel == null) {
        int defaultLevel = (this.transactionManager != null ? CACHE_NONE : CACHE_CONSUMER);
        this.cacheLevel = new Integer(defaultLevel);
    }

    if (this.taskExecutor == null) {
        // No executor configured: fall back to a simple asynchronous one.
        this.taskExecutor = new SimpleAsyncTaskExecutor(DEFAULT_THREAD_NAME_PREFIX);
    }
    else if (this.taskExecutor instanceof SchedulingTaskExecutor) {
        SchedulingTaskExecutor scheduler = (SchedulingTaskExecutor) this.taskExecutor;
        // The executor asks for short-lived tasks and maxMessagesPerTask still
        // holds its Integer.MIN_VALUE marker (no custom value specified):
        // process a single message per task, as described in the
        // setMaxMessagesPerTask javadoc.
        if (scheduler.prefersShortLivedTasks() && this.maxMessagesPerTask == Integer.MIN_VALUE) {
            this.maxMessagesPerTask = 1;
        }
    }

    // Defaults are in place - run the actual listener initialization.
    super.initialize();
}
/**
 * Decide from the "cacheLevel" setting whether a shared JMS Connection is used.
 * @return <code>true</code> if the cache level is {@link #CACHE_CONNECTION}
 * or higher
 * @see #setCacheLevel
 * @see #CACHE_CONNECTION
 */
protected final boolean sharedConnectionEnabled() {
    int level = getCacheLevel();
    return !(level < CACHE_CONNECTION);
}
/**
 * Start the configured number of concurrent consumers by handing one
 * AsyncMessageListenerInvoker per consumer to the TaskExecutor. Each
 * consumer takes the form of a JMS Session plus associated MessageConsumer
 * running in a separate thread.
 * @see #setTaskExecutor
 */
protected void registerListener() throws JMSException {
    int scheduled = 0;
    while (scheduled < this.concurrentConsumers) {
        AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();
        this.taskExecutor.execute(invoker);
        scheduled++;
    }
}
/**
 * Hand the given task to this listener container's TaskExecutor.
 * @param task the task to run; it is cast to <code>Runnable</code>
 * @see #setTaskExecutor
 */
protected void doRescheduleTask(Object task) {
    Runnable runnable = (Runnable) task;
    this.taskExecutor.execute(runnable);
}
/**
 * Create a MessageConsumer for the given JMS Session. The configured
 * Destination is used if there is one; otherwise the configured destination
 * name is resolved against the given Session.
 * @param session the JMS Session to work on
 * @return the MessageConsumer
 * @throws JMSException if thrown by JMS methods
 * @see #receiveAndExecute
 */
protected MessageConsumer createListenerConsumer(Session session) throws JMSException {
    Destination target = getDestination();
    if (target != null) {
        return createConsumer(session, target);
    }
    // No Destination object configured: resolve it by name.
    Destination resolved = resolveDestinationName(session, getDestinationName());
    return createConsumer(session, resolved);
}
/**
 * Receive a message from the given consumer and run the listener for it.
 * If a transaction manager is configured, the whole receive-and-execute
 * step runs inside a transaction obtained from it: the transaction is
 * rolled back when a JMSException, RuntimeException or Error escapes, and
 * committed otherwise.
 * @param session the JMS Session to work on
 * @param consumer the MessageConsumer to work on
 * @throws JMSException if thrown by JMS methods
 * @see #doReceiveAndExecute
 */
protected void receiveAndExecute(Session session, MessageConsumer consumer) throws JMSException {
    if (this.transactionManager == null) {
        // No external transaction management: plain receive.
        doReceiveAndExecute(session, consumer, null);
        return;
    }

    // Receive within a transaction.
    TransactionStatus txStatus = this.transactionManager.getTransaction(this.transactionDefinition);
    try {
        doReceiveAndExecute(session, consumer, txStatus);
    }
    catch (JMSException jmsFailure) {
        rollbackOnException(txStatus, jmsFailure);
        throw jmsFailure;
    }
    catch (RuntimeException runtimeFailure) {
        rollbackOnException(txStatus, runtimeFailure);
        throw runtimeFailure;
    }
    catch (Error fatal) {
        rollbackOnException(txStatus, fatal);
        throw fatal;
    }
    this.transactionManager.commit(txStatus);
}
/**
 * Actually execute the listener for a message received from the given consumer,
 * fetching all required resources and invoking the listener.
 * <p>Any Connection, Session or MessageConsumer that this method opens itself
 * is closed again before it returns. A Session or MessageConsumer passed in
 * by the caller, a transactional Session and the shared Connection are not
 * closed here.
 * @param session the JMS Session to work on (may be <code>null</code>)
 * @param consumer the MessageConsumer to work on (may be <code>null</code>)
 * @param status the TransactionStatus (may be <code>null</code>)
 * @throws JMSException if thrown by JMS methods
 * @see #doExecuteListener(javax.jms.Session, javax.jms.Message)
 */
protected void doReceiveAndExecute(Session session, MessageConsumer consumer, TransactionStatus status)
        throws JMSException {

    // Resources opened by this invocation; released in the finally block.
    Connection ownedConnection = null;
    Session ownedSession = null;
    MessageConsumer ownedConsumer = null;
    try {
        Session activeSession = session;
        boolean transactional = false;
        if (activeSession == null) {
            // First choice: a Session bound to the current transaction.
            activeSession = ConnectionFactoryUtils.doGetTransactionalSession(
                    getConnectionFactory(), this.transactionalResourceFactory);
            if (activeSession != null) {
                transactional = true;
            }
            else {
                // Otherwise open a Session, on the shared Connection if enabled.
                Connection connection;
                if (sharedConnectionEnabled()) {
                    connection = getSharedConnection();
                }
                else {
                    connection = createConnection();
                    // Record ownership before start() so a failing start still closes it.
                    ownedConnection = connection;
                    connection.start();
                }
                activeSession = createSession(connection);
                ownedSession = activeSession;
            }
        }

        MessageConsumer activeConsumer = consumer;
        if (activeConsumer == null) {
            activeConsumer = createListenerConsumer(activeSession);
            ownedConsumer = activeConsumer;
        }

        Message message = receiveMessage(activeConsumer);
        if (message == null) {
            // Nothing received; the finally block still releases owned resources.
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Received message of type [" + message.getClass() + "] from consumer [" +
                    activeConsumer + "] of " + (transactional ? "transactional " : "") + "session [" +
                    activeSession + "]");
        }
        try {
            doExecuteListener(activeSession, message);
        }
        catch (Throwable listenerFailure) {
            // Mark a surrounding transaction for rollback before delegating the failure.
            if (status != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Rolling back transaction because of listener exception thrown: " + listenerFailure);
                }
                status.setRollbackOnly();
            }
            handleListenerException(listenerFailure);
        }
    }
    finally {
        // Release in reverse order of acquisition.
        JmsUtils.closeMessageConsumer(ownedConsumer);
        JmsUtils.closeSession(ownedSession);
        JmsUtils.closeConnection(ownedConnection, true);
    }
}
/**
 * Roll back the given transaction in response to an application failure.
 * If the rollback itself fails with a RuntimeException or Error, the
 * original application exception is logged at error level and the
 * rollback failure is propagated in its place.
 * @param status object representing the transaction
 * @param ex the thrown application exception or error
 */
private void rollbackOnException(TransactionStatus status, Throwable ex) {
    logger.debug("Initiating transaction rollback on application exception", ex);
    try {
        this.transactionManager.rollback(status);
    }
    catch (RuntimeException rollbackFailure) {
        // Log the application exception here: the caller will only see the rollback failure.
        logger.error("Application exception overridden by rollback exception", ex);
        throw rollbackFailure;
    }
    catch (Error rollbackError) {
        // Same as above, for errors raised by the rollback.
        logger.error("Application exception overridden by rollback error", ex);
        throw rollbackError;
    }
}
/**
 * Receive a message from the given consumer. A negative "receiveTimeout"
 * selects the no-argument <code>receive()</code> call; any other value is
 * passed to <code>receive(long)</code> as the timeout.
 * @param consumer the MessageConsumer to use
 * @return the Message, or <code>null</code> if none
 * @throws JMSException if thrown by JMS methods
 */
protected Message receiveMessage(MessageConsumer consumer) throws JMSException {
    if (this.receiveTimeout < 0) {
        return consumer.receive();
    }
    return consumer.receive(this.receiveTimeout);
}
/**
 * Try to set up the shared Connection, tolerating a failure of this initial
 * attempt: a JMSException is only logged at debug level, leaving it to the
 * asynchronous invokers to establish the shared Connection on first access.
 * @see #refreshConnectionUntilSuccessful()
 */
protected void establishSharedConnection() {
    try {
        refreshSharedConnection();
    }
    catch (JMSException setupFailure) {
        String note = "Could not establish shared JMS Connection - " +
                "leaving it up to asynchronous invokers to establish a Connection as soon as possible";
        logger.debug(note, setupFailure);
    }
}
/**
 * This implementation proceeds even after an exception thrown from
 * <code>Connection.start()</code>: the failure is logged at debug level
 * and recovery is left to the listeners.
 */
protected void startSharedConnection() {
    try {
        super.startSharedConnection();
    }
    catch (JMSException startFailure) {
        // Deliberately not rethrown.
        logger.debug("Connection start failed - relying on listeners to perform recovery", startFailure);
    }
}
/**
 * This implementation proceeds even after an exception thrown from
 * <code>Connection.stop()</code>: the failure is logged at debug level
 * and recovery is left to the listeners after a restart.
 */
protected void stopSharedConnection() {
    try {
        super.stopSharedConnection();
    }
    catch (JMSException stopFailure) {
        // Deliberately not rethrown.
        logger.debug("Connection stop failed - relying on listeners to perform recovery after restart", stopFailure);
    }
}
/**
* Handle the given exception that arose during setup of a listener.
* Called for every such exception in every concurrent listener.
* <p>The default implementation logs the exception at error level
* if not recovered yet, and at debug level if already recovered.
* Can be overridden in subclasses.
* @param ex the exception to handle
* @param alreadyRecovered whether a previously executing listener
* already recovered from the present listener setup failure
* (this usually indicates a follow-up failure that can be ignored
* other than for debug log purposes)
* @see #recoverAfterListenerSetupFailure()
*/
protected void handleListenerSetupFailure(Throwable ex, boolean alreadyRecovered) {
if (ex instanceof JMSException) {
invokeExceptionListener((JMSException) ex);
}
if (ex instanceof SharedConnectionNotInitializedException) {
if (!alreadyRecovered) {
logger.debug("JMS message listener invoker needs to establish shared Connection");
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -