📄 abstractpollingmessagelistenercontainer.java
字号:
throw ex;
}
catch (RuntimeException ex) {
rollbackOnException(status, ex);
throw ex;
}
catch (Error err) {
rollbackOnException(status, err);
throw err;
}
this.transactionManager.commit(status);
return messageReceived;
}
else {
// Execute receive outside of transaction.
return doReceiveAndExecute(session, consumer, null);
}
}
/**
 * Actually execute the listener for a message received from the given consumer,
 * fetching all required resources and invoking the listener.
 * <p>Resources are resolved in this order: the given Session if non-null, else a
 * transactional Session obtained through <code>ConnectionFactoryUtils</code>, else
 * a freshly created Session (on the shared Connection if enabled, otherwise on a
 * newly created and started Connection). Only resources created by this method
 * are closed/released again in the <code>finally</code> block.
 * <p>An exception thrown by the listener is not propagated: the given transaction
 * (if any) is marked rollback-only and the exception is passed on to
 * <code>handleListenerException</code>. The method still returns <code>true</code>
 * in that case, since a message has been received.
 * @param session the JMS Session to work on (may be <code>null</code>)
 * @param consumer the MessageConsumer to work on (may be <code>null</code>)
 * @param status the TransactionStatus (may be <code>null</code>)
 * @return whether a message has been received
 * @throws JMSException if thrown by JMS methods
 * @see #doExecuteListener(javax.jms.Session, javax.jms.Message)
 */
protected boolean doReceiveAndExecute(Session session, MessageConsumer consumer, TransactionStatus status)
		throws JMSException {

	// Track only the resources created locally, so that the finally block
	// never closes anything that was handed in by the caller.
	Connection conToClose = null;
	Session sessionToClose = null;
	MessageConsumer consumerToClose = null;
	try {
		Session sessionToUse = session;
		boolean transactional = false;
		if (sessionToUse == null) {
			// Try to pick up a Session that participates in an ongoing transaction.
			sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
					getConnectionFactory(), this.transactionalResourceFactory);
			transactional = (sessionToUse != null);
		}
		if (sessionToUse == null) {
			// No transactional Session available: create a local one.
			Connection conToUse = null;
			if (sharedConnectionEnabled()) {
				conToUse = getSharedConnection();
			}
			else {
				conToUse = createConnection();
				conToClose = conToUse;
				conToUse.start();
			}
			sessionToUse = createSession(conToUse);
			sessionToClose = sessionToUse;
		}
		MessageConsumer consumerToUse = consumer;
		if (consumerToUse == null) {
			consumerToUse = createListenerConsumer(sessionToUse);
			consumerToClose = consumerToUse;
		}
		Message message = receiveMessage(consumerToUse);
		if (message != null) {
			if (logger.isDebugEnabled()) {
				logger.debug("Received message of type [" + message.getClass() + "] from consumer [" +
						consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" +
						sessionToUse + "]");
			}
			// Hand over the Session the message was actually received on. The 'session'
			// argument is null whenever the Session was obtained or created above.
			messageReceived(message, sessionToUse);
			try {
				doExecuteListener(sessionToUse, message);
			}
			catch (Throwable ex) {
				if (status != null) {
					if (logger.isDebugEnabled()) {
						logger.debug("Rolling back transaction because of listener exception thrown: " + ex);
					}
					status.setRollbackOnly();
				}
				handleListenerException(ex);
			}
			return true;
		}
		else {
			if (logger.isDebugEnabled()) {
				logger.debug("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") +
						"session [" + sessionToUse + "] did not receive a message");
			}
			return false;
		}
	}
	finally {
		// Each of these variables is null unless the resource was created above.
		JmsUtils.closeMessageConsumer(consumerToClose);
		JmsUtils.closeSession(sessionToClose);
		ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
	}
}
/**
 * Determine whether the given Session is locally transacted.
 * <p>This implementation first consults the superclass check and then
 * additionally requires that the Session is <i>not</i> reported as transactional
 * by <code>ConnectionFactoryUtils.isSessionTransactional</code> for this
 * container's ConnectionFactory. A Session that is externally synchronized is
 * therefore not treated as locally transacted, despite the listener container's
 * "sessionTransacted" flag being set to "true".
 * @param session the Session to check
 * @return <code>true</code> only if the superclass considers the Session locally
 * transacted and it is not externally synchronized
 * @see org.springframework.jms.connection.ConnectionFactoryUtils#isSessionTransactional
 */
protected boolean isSessionLocallyTransacted(Session session) {
	if (!super.isSessionLocallyTransacted(session)) {
		return false;
	}
	boolean externallySynchronized =
			ConnectionFactoryUtils.isSessionTransactional(session, getConnectionFactory());
	return !externallySynchronized;
}
/**
 * Roll back the given transaction in reaction to an application exception.
 * <p>If the rollback itself fails with a RuntimeException or Error, the
 * original application exception is logged at error level (so that it is
 * not lost) and the rollback failure is rethrown in its place.
 * @param status object representing the transaction
 * @param ex the thrown application exception or error
 */
private void rollbackOnException(TransactionStatus status, Throwable ex) {
	logger.debug("Initiating transaction rollback on application exception", ex);
	try {
		this.transactionManager.rollback(status);
	}
	catch (RuntimeException rollbackEx) {
		// Log the application exception, since the rollback exception supersedes it.
		logger.error("Application exception overridden by rollback exception", ex);
		throw rollbackEx;
	}
	catch (Error rollbackErr) {
		// Same as above, for errors raised during rollback.
		logger.error("Application exception overridden by rollback error", ex);
		throw rollbackErr;
	}
}
/**
 * Receive a message from the given consumer.
 * <p>A negative "receiveTimeout" results in a call to the no-arg
 * <code>receive()</code>; any other value is passed through to
 * <code>receive(long)</code> as-is.
 * @param consumer the MessageConsumer to use
 * @return the Message, or <code>null</code> if none
 * @throws JMSException if thrown by JMS methods
 */
protected Message receiveMessage(MessageConsumer consumer) throws JMSException {
	if (this.receiveTimeout >= 0) {
		return consumer.receive(this.receiveTimeout);
	}
	return consumer.receive();
}
/**
 * Template method that gets called right when a new message has been received,
 * before attempting to process it. Allows subclasses to react to the event
 * of an actual incoming message, for example adapting their consumer count.
 * <p>The default implementation is empty.
 * @param message the received message
 * @param session the JMS Session as handed over by the caller
 * (NOTE(review): confirm whether callers may pass <code>null</code> here)
 */
protected void messageReceived(Message message, Session session) {
	// Intentionally empty: hook for subclasses.
}
//-------------------------------------------------------------------------
// JMS 1.1 factory methods, potentially overridden for JMS 1.0.2
//-------------------------------------------------------------------------
/**
 * Fetch an appropriate Connection from the given JmsResourceHolder.
 * <p>This implementation simply returns whatever the holder's
 * <code>getConnection()</code> yields, i.e. it accepts any JMS 1.1 Connection.
 * @param holder the JmsResourceHolder
 * @return an appropriate Connection fetched from the holder,
 * or <code>null</code> if none found
 */
protected Connection getConnection(JmsResourceHolder holder) {
	Connection heldConnection = holder.getConnection();
	return heldConnection;
}
/**
 * Fetch an appropriate Session from the given JmsResourceHolder.
 * <p>This implementation simply returns whatever the holder's
 * <code>getSession()</code> yields, i.e. it accepts any JMS 1.1 Session.
 * @param holder the JmsResourceHolder
 * @return an appropriate Session fetched from the holder,
 * or <code>null</code> if none found
 */
protected Session getSession(JmsResourceHolder holder) {
	Session heldSession = holder.getSession();
	return heldSession;
}
/**
 * Create a JMS MessageConsumer for the given Session and Destination.
 * <p>This implementation uses JMS 1.1 API. Outside of the pub-sub domain a plain
 * consumer with the configured message selector is created. Within the pub-sub
 * domain, a durable subscriber is created if the subscription is durable and the
 * destination is a Topic; otherwise a consumer with selector and NoLocal flag.
 * @param session the JMS Session to create a MessageConsumer for
 * @param destination the JMS Destination to create a MessageConsumer for
 * @return the new JMS MessageConsumer
 * @throws javax.jms.JMSException if thrown by JMS API methods
 */
protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
	// Only pass in the NoLocal flag in case of a Topic:
	// Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
	// in case of the NoLocal flag being specified for a Queue.
	if (!isPubSubDomain()) {
		return session.createConsumer(destination, getMessageSelector());
	}
	if (isSubscriptionDurable() && destination instanceof Topic) {
		Topic topic = (Topic) destination;
		return session.createDurableSubscriber(
				topic, getDurableSubscriptionName(), getMessageSelector(), isPubSubNoLocal());
	}
	return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal());
}
/**
 * ResourceFactory implementation that delegates to this listener container's
 * protected callback methods.
 * <p>All calls are explicitly qualified with the enclosing instance, since this
 * class declares methods with the same names as the container methods it calls.
 */
private class MessageListenerContainerResourceFactory implements ConnectionFactoryUtils.ResourceFactory {

	/** Delegate to the container's <code>getConnection(JmsResourceHolder)</code>. */
	public Connection getConnection(JmsResourceHolder holder) {
		return AbstractPollingMessageListenerContainer.this.getConnection(holder);
	}

	/** Delegate to the container's <code>getSession(JmsResourceHolder)</code>. */
	public Session getSession(JmsResourceHolder holder) {
		return AbstractPollingMessageListenerContainer.this.getSession(holder);
	}

	/**
	 * Return the container's shared Connection if shared connections are
	 * enabled, else have the container create a new Connection.
	 */
	public Connection createConnection() throws JMSException {
		return (AbstractPollingMessageListenerContainer.this.sharedConnectionEnabled() ?
				AbstractPollingMessageListenerContainer.this.getSharedConnection() :
				AbstractPollingMessageListenerContainer.this.createConnection());
	}

	/** Delegate to the container's <code>createSession(Connection)</code>. */
	public Session createSession(Connection con) throws JMSException {
		return AbstractPollingMessageListenerContainer.this.createSession(con);
	}

	/** Synchronized local transactions are allowed iff the container's "sessionTransacted" flag is set. */
	public boolean isSynchedLocalTransactionAllowed() {
		return AbstractPollingMessageListenerContainer.this.isSessionTransacted();
	}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -