📄 abstractmessagelistenercontainer.java
字号:
* either immediately if this listener container is currently running,
* or later once this listener container has been restarted.
* <p>If this listener container has already been shut down,
* the task will not get rescheduled at all.
* @param task the task object to reschedule
* @return whether the task has been rescheduled
* (either immediately or for a restart of this container)
* @see #doRescheduleTask
*/
protected final boolean rescheduleTaskIfNecessary(Object task) {
Assert.notNull(task, "Task object must bot be null");
synchronized (this.lifecycleMonitor) {
if (this.running) {
doRescheduleTask(task);
return true;
}
else if (this.active) {
this.pausedTasks.add(task);
return true;
}
else {
return false;
}
}
}
/**
* Reschedule the given task object immediately.
* To be implemented by subclasses if they ever call
* <code>rescheduleTaskIfNecessary</code>.
* <p>This implementation throws an UnsupportedOperationException.
* @param task the task object to reschedule
* @see #rescheduleTaskIfNecessary
*/
protected void doRescheduleTask(Object task) {
throw new UnsupportedOperationException(
ClassUtils.getShortName(getClass()) + " does not support rescheduling of tasks");
}
//-------------------------------------------------------------------------
// Template methods for listener execution
//-------------------------------------------------------------------------
/**
* Execute the specified listener,
* committing or rolling back the transaction afterwards (if necessary).
* @param session the JMS Session to operate on
* @param message the received JMS Message
* @see #invokeListener
* @see #commitIfNecessary
* @see #rollbackOnExceptionIfNecessary
* @see #handleListenerException
*/
protected void executeListener(Session session, Message message) {
try {
doExecuteListener(session, message);
}
catch (Throwable ex) {
handleListenerException(ex);
}
}
/**
* Execute the specified listener,
* committing or rolling back the transaction afterwards (if necessary).
* @param session the JMS Session to operate on
* @param message the received JMS Message
* @throws JMSException if thrown by JMS API methods
* @see #invokeListener
* @see #commitIfNecessary
* @see #rollbackOnExceptionIfNecessary
* @see #convertJmsAccessException
*/
protected void doExecuteListener(Session session, Message message) throws JMSException {
try {
invokeListener(session, message);
}
catch (JMSException ex) {
rollbackOnExceptionIfNecessary(session, ex);
throw ex;
}
catch (RuntimeException ex) {
rollbackOnExceptionIfNecessary(session, ex);
throw ex;
}
catch (Error err) {
rollbackOnExceptionIfNecessary(session, err);
throw err;
}
commitIfNecessary(session, message);
}
/**
* Invoke the specified listener: either as standard JMS MessageListener
* or (preferably) as Spring SessionAwareMessageListener.
* @param session the JMS Session to operate on
* @param message the received JMS Message
* @throws JMSException if thrown by JMS API methods
* @see #setMessageListener
*/
protected void invokeListener(Session session, Message message) throws JMSException {
if (getMessageListener() instanceof SessionAwareMessageListener) {
doInvokeListener((SessionAwareMessageListener) getMessageListener(), session, message);
}
else if (getMessageListener() instanceof MessageListener) {
doInvokeListener((MessageListener) getMessageListener(), message);
}
else {
throw new IllegalArgumentException("Only MessageListener and SessionAwareMessageListener supported");
}
}
/**
* Invoke the specified listener as Spring SessionAwareMessageListener,
* exposing a new JMS Session (potentially with its own transaction)
* to the listener if demanded.
* @param listener the Spring SessionAwareMessageListener to invoke
* @param session the JMS Session to operate on
* @param message the received JMS Message
* @throws JMSException if thrown by JMS API methods
* @see SessionAwareMessageListener
* @see #setExposeListenerSession
*/
protected void doInvokeListener(SessionAwareMessageListener listener, Session session, Message message)
throws JMSException {
Connection conToClose = null;
Session sessionToClose = null;
try {
Session sessionToUse = session;
if (!isExposeListenerSession()) {
// We need to expose a separate Session.
conToClose = createConnection();
sessionToClose = createSession(conToClose);
sessionToUse = sessionToClose;
}
// Actually invoke the message listener...
if (logger.isDebugEnabled()) {
logger.debug("Invoking listener with message of type [" + message.getClass() +
"] and session [" + sessionToUse + "]");
}
listener.onMessage(message, sessionToUse);
// Clean up specially exposed Session, if any.
if (sessionToUse != session) {
if (sessionToUse.getTransacted() && isSessionTransacted()) {
// Transacted session created by this container -> commit.
JmsUtils.commitIfNecessary(sessionToUse);
}
}
}
finally {
JmsUtils.closeSession(sessionToClose);
JmsUtils.closeConnection(conToClose);
}
}
/**
* Invoke the specified listener as standard JMS MessageListener.
* <p>Default implementation performs a plain invocation of the
* <code>onMessage</code> method.
* @param listener the JMS MessageListener to invoke
* @param message the received JMS Message
* @throws JMSException if thrown by JMS API methods
* @see javax.jms.MessageListener#onMessage
*/
protected void doInvokeListener(MessageListener listener, Message message) throws JMSException {
listener.onMessage(message);
}
/**
* Perform a commit or message acknowledgement, as appropriate.
* @param session the JMS Session to commit
* @param message the Message to acknowledge
* @throws javax.jms.JMSException in case of commit failure
*/
protected void commitIfNecessary(Session session, Message message) throws JMSException {
// Commit session or acknowledge message.
if (session.getTransacted()) {
// Commit necessary - but avoid commit call within a JTA transaction.
if (isSessionTransacted()) {
// Transacted session created by this container -> commit.
JmsUtils.commitIfNecessary(session);
}
}
else if (isClientAcknowledge(session)) {
message.acknowledge();
}
}
/**
* Perform a rollback, handling rollback exceptions properly.
* @param session the JMS Session to rollback
* @param ex the thrown application exception or error
* @throws javax.jms.JMSException in case of a rollback error
*/
protected void rollbackOnExceptionIfNecessary(Session session, Throwable ex) throws JMSException {
try {
if (session.getTransacted() && isSessionTransacted()) {
// Transacted session created by this container -> rollback.
if (logger.isDebugEnabled()) {
logger.debug("Initiating transaction rollback on application exception", ex);
}
JmsUtils.rollbackIfNecessary(session);
}
}
catch (IllegalStateException ex2) {
logger.debug("Could not roll back because Session already closed", ex2);
}
catch (JMSException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
catch (RuntimeException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
catch (Error err) {
logger.error("Application exception overridden by rollback error", ex);
throw err;
}
}
/**
* Handle the given exception that arose during listener execution.
* <p>The default implementation logs the exception at error level,
* not propagating it to the JMS provider - assuming that all handling of
* acknowledgement and/or transactions is done by this listener container.
* This can be overridden in subclasses.
* @param ex the exception to handle
*/
protected void handleListenerException(Throwable ex) {
if (ex instanceof JMSException) {
invokeExceptionListener((JMSException) ex);
}
if (isActive()) {
// Regular case: failed while active.
// Log at error level.
logger.error("Execution of JMS message listener failed", ex);
}
else {
// Rare case: listener thread failed after container shutdown.
// Log at debug level, to avoid spamming the shutdown log.
logger.debug("Listener exception after container shutdown", ex);
}
}
/**
* Invoke the registered JMS ExceptionListener, if any.
* @param ex the exception that arose during JMS processing
* @see #setExceptionListener
*/
protected void invokeExceptionListener(JMSException ex) {
ExceptionListener exceptionListener = getExceptionListener();
if (exceptionListener != null) {
exceptionListener.onException(ex);
}
}
//-------------------------------------------------------------------------
// Template methods to be implemented by subclasses
//-------------------------------------------------------------------------
/**
* Return whether a shared JMS Connection should be maintained
* by this listener container base class.
* @see #getSharedConnection()
*/
protected abstract boolean sharedConnectionEnabled();
/**
* Register the specified listener on the underlying JMS Connection.
* <p>Subclasses need to implement this method for their specific
* listener management process.
* @throws JMSException if registration failed
* @see #getMessageListener()
* @see #getSharedConnection()
*/
protected abstract void registerListener() throws JMSException;
/**
* Destroy the registered listener.
* The JMS Connection will automatically be closed <i>afterwards</i>
* <p>Subclasses need to implement this method for their specific
* listener management process.
* @throws JMSException if destruction failed
*/
protected abstract void destroyListener() throws JMSException;
//-------------------------------------------------------------------------
// JMS 1.1 factory methods, potentially overridden for JMS 1.0.2
//-------------------------------------------------------------------------
/**
* Create a JMS Connection via this template's ConnectionFactory.
* <p>This implementation uses JMS 1.1 API.
* @return the new JMS Connection
* @throws javax.jms.JMSException if thrown by JMS API methods
*/
protected Connection createConnection() throws JMSException {
return getConnectionFactory().createConnection();
}
/**
* Create a JMS Session for the given Connection.
* <p>This implementation uses JMS 1.1 API.
* @param con the JMS Connection to create a Session for
* @return the new JMS Session
* @throws javax.jms.JMSException if thrown by JMS API methods
*/
protected Session createSession(Connection con) throws JMSException {
return con.createSession(isSessionTransacted(), getSessionAcknowledgeMode());
}
/**
* Return whether the Session is in client acknowledge mode.
* <p>This implementation uses JMS 1.1 API.
* @param session the JMS Session to check
* @throws javax.jms.JMSException if thrown by JMS API methods
*/
protected boolean isClientAcknowledge(Session session) throws JMSException {
return (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE);
}
/**
* Exception that indicates that the initial setup of this listener container's
* shared JMS Connection failed. This is indicating to invokers that they need
* to establish the shared Connection themselves on first access.
*/
protected static class SharedConnectionNotInitializedException extends RuntimeException {
public SharedConnectionNotInitializedException(String msg) {
super(msg);
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -