📄 abstractmessagelistenercontainer.java
字号:
protected ExceptionListener getExceptionListener() {
    // Plain accessor: hands back whatever listener is currently stored on this container.
    final ExceptionListener configuredListener = this.exceptionListener;
    return configuredListener;
}
/**
 * Configure which JMS {@link Session} a registered
 * {@link SessionAwareMessageListener} gets to see.
 * <p>With the default of "true", the listener is handed the
 * listener's own {@link Session}. With "false", a fresh JMS Session
 * fetched from the same underlying JMS {@link Connection} is exposed
 * instead, which might be necessary on some JMS providers.
 * @param exposeListenerSession whether to expose the listener JMS Session
 * @see SessionAwareMessageListener
 */
public void setExposeListenerSession(boolean exposeListenerSession) {
    this.exposeListenerSession = exposeListenerSession;
}
/**
 * Tell whether the listener JMS {@link Session} is exposed to a
 * registered {@link SessionAwareMessageListener}.
 * @return the current value of the "exposeListenerSession" flag
 */
protected boolean isExposeListenerSession() {
    return this.exposeListenerSession;
}
/**
 * Configure whether the listener is started automatically once
 * initialization has completed.
 * <p>Default is "true"; specify "false" if the container is
 * meant to be started manually.
 * @param autoStartup whether to start automatically after initialization
 */
public void setAutoStartup(boolean autoStartup) {
    this.autoStartup = autoStartup;
}
/**
 * Check the configuration of this container and then delegate
 * to {@link #initialize()}.
 * <p>The checks run in a fixed order and the first violated one wins:
 * a destination must be set, a message listener must be set, and a
 * durable subscription is only accepted in the pub-sub domain.
 * @throws IllegalArgumentException if one of the checks fails
 * @see #initialize()
 */
public void afterPropertiesSet() {
    super.afterPropertiesSet();

    // Determine the first configuration problem, if any; later checks
    // are only evaluated when all earlier ones have passed.
    String problem = null;
    if (this.destination == null) {
        problem = "destination or destinationName is required";
    }
    else if (this.messageListener == null) {
        problem = "messageListener is required";
    }
    else if (isSubscriptionDurable() && !isPubSubDomain()) {
        problem = "A durable subscription requires a topic (pub-sub domain)";
    }
    if (problem != null) {
        throw new IllegalArgumentException(problem);
    }

    initialize();
}
/**
 * Initialize this message listener container.
 * <p>Marks the container as active, establishes the shared JMS
 * {@link Connection} (if a shared Connection is enabled), starts the
 * container (if {@link #setAutoStartup(boolean) "autoStartup"} hasn't
 * been turned off) and finally registers the
 * {@link #setMessageListener(Object) given listener object}.
 * <p>If any of these steps fails with a JMSException, the shared
 * Connection is closed and its reference is cleared, so that a later
 * {@link #getSharedConnection()} call reports an uninitialized
 * Connection instead of handing out a closed one.
 * @throws JmsException if startup failed
 */
public void initialize() throws JmsException {
    try {
        synchronized (this.lifecycleMonitor) {
            this.active = true;
            // Wake up any thread waiting on a lifecycle state change.
            this.lifecycleMonitor.notifyAll();
        }
        if (sharedConnectionEnabled()) {
            establishSharedConnection();
        }
        if (this.autoStartup) {
            doStart();
        }
        registerListener();
    }
    catch (JMSException ex) {
        synchronized (this.sharedConnectionMonitor) {
            JmsUtils.closeConnection(this.sharedConnection);
            // Do not keep a reference to the Connection we just closed:
            // otherwise callers would silently receive a dead Connection.
            this.sharedConnection = null;
        }
        throw convertJmsAccessException(ex);
    }
}
/**
 * Set up the shared Connection of this message listener container.
 * <p>This default implementation simply calls
 * <code>refreshSharedConnection</code>, i.e. it makes a single
 * immediate attempt and lets the exception propagate if that fails.
 * Subclasses can override this to have a recovery process in place,
 * retrying until a Connection can be successfully established.
 * @throws JMSException if thrown by JMS API methods
 * @see #refreshSharedConnection()
 */
protected void establishSharedConnection() throws JMSException {
    // Single attempt, no retry.
    refreshSharedConnection();
}
/**
 * Refresh the shared Connection that this listener container holds.
 * <p>Called on startup and also after an infrastructure exception
 * that occurred during listener setup and/or execution.
 * <p>The previous Connection (if any) is closed and its reference is
 * cleared before a replacement is created. If creating or preparing
 * the replacement fails, the container is therefore left without a
 * shared Connection rather than with a reference to a closed one.
 * @throws JMSException if thrown by JMS API methods
 */
protected final void refreshSharedConnection() throws JMSException {
    // Read the running state before taking the connection monitor;
    // isRunning() synchronizes on the lifecycle monitor.
    boolean running = isRunning();
    synchronized (this.sharedConnectionMonitor) {
        JmsUtils.closeConnection(this.sharedConnection, running);
        // Forget the closed Connection right away, in case the
        // replacement cannot be obtained below.
        this.sharedConnection = null;
        Connection con = createConnection();
        try {
            prepareSharedConnection(con);
        }
        catch (JMSException ex) {
            // A Connection that could not be prepared must not leak.
            JmsUtils.closeConnection(con);
            throw ex;
        }
        this.sharedConnection = con;
    }
}
/**
 * Apply container-specific settings to the given Connection before
 * it gets registered as the shared Connection of this container.
 * <p>This default implementation sets the configured client id on
 * the Connection, if one has been specified. Subclasses can override
 * this to apply further settings.
 * @param connection the Connection to prepare
 * @throws JMSException if the preparation efforts failed
 * @see #setClientId
 */
protected void prepareSharedConnection(Connection connection) throws JMSException {
    if (getClientId() == null) {
        // No client id configured: nothing to apply.
        return;
    }
    connection.setClientID(getClientId());
}
/**
 * Obtain the shared JMS Connection that this message listener
 * container maintains. Available after initialization.
 * @return the shared Connection (never <code>null</code>)
 * @throws IllegalStateException if this listener container does not
 * maintain a shared Connection
 * @throws SharedConnectionNotInitializedException if the shared
 * Connection hasn't been initialized yet
 * @see #sharedConnectionEnabled()
 */
protected final Connection getSharedConnection() {
    if (sharedConnectionEnabled()) {
        synchronized (this.sharedConnectionMonitor) {
            Connection shared = this.sharedConnection;
            if (shared != null) {
                return shared;
            }
            throw new SharedConnectionNotInitializedException(
                    "This message listener container's shared Connection has not been initialized yet");
        }
    }
    throw new IllegalStateException(
            "This message listener container does not maintain a shared Connection");
}
/**
 * Destruction callback: invoked when the BeanFactory destroys the
 * message listener container instance. Simply delegates to
 * <code>shutdown</code>.
 * @see #shutdown()
 */
public void destroy() {
    // Destruction and explicit shutdown share the same code path.
    shutdown();
}
/**
 * Shut down the registered listeners and close this
 * listener container.
 * <p>Marks the container as neither running nor active, destroys the
 * listener and then closes the shared Connection in any case, even
 * if destroying the listener failed. The reference to the closed
 * Connection is cleared afterwards, so that a later
 * {@link #getSharedConnection()} call reports an uninitialized
 * Connection instead of handing out a closed one.
 * @throws JmsException if shutdown failed
 * @see #destroyListener()
 */
public void shutdown() throws JmsException {
    logger.debug("Shutting down message listener container");
    boolean wasRunning;
    synchronized (this.lifecycleMonitor) {
        // Remember the previous state: it is passed on when closing the Connection.
        wasRunning = this.running;
        this.running = false;
        this.active = false;
        // Wake up any thread waiting on a lifecycle state change.
        this.lifecycleMonitor.notifyAll();
    }
    try {
        destroyListener();
    }
    catch (JMSException ex) {
        throw convertJmsAccessException(ex);
    }
    finally {
        synchronized (this.sharedConnectionMonitor) {
            JmsUtils.closeConnection(this.sharedConnection, wasRunning);
            // Do not keep a reference to the Connection we just closed.
            this.sharedConnection = null;
        }
    }
}
/**
 * Tell whether this listener container is currently active,
 * i.e. whether it has been set up but not shut down yet.
 * <p>The flag is read while holding the lifecycle monitor.
 * @return the current value of the "active" flag
 */
public final boolean isActive() {
    final boolean currentlyActive;
    synchronized (this.lifecycleMonitor) {
        currentlyActive = this.active;
    }
    return currentlyActive;
}
//-------------------------------------------------------------------------
// Lifecycle methods for dynamically starting and stopping the listener
//-------------------------------------------------------------------------
/**
 * Start this listener container.
 * <p>Delegates to {@link #doStart} and converts any JMSException
 * thrown there through <code>convertJmsAccessException</code>.
 * @throws JmsException if starting failed
 * @see #doStart
 */
public void start() throws JmsException {
    try {
        doStart();
    }
    catch (JMSException startFailure) {
        // Callers of the public API only see the converted exception.
        throw convertJmsAccessException(startFailure);
    }
}
/**
 * Switch this container to running state, notify all listener tasks,
 * reschedule the paused tasks and start the shared Connection, if any.
 * @throws JMSException if thrown by JMS API methods
 * @see #startSharedConnection
 */
protected void doStart() throws JMSException {
    synchronized (this.lifecycleMonitor) {
        this.running = true;
        // Wake up any thread waiting on a lifecycle state change.
        this.lifecycleMonitor.notifyAll();

        // Hand every paused task back for rescheduling and drop it
        // from the paused list once that call has returned.
        Iterator paused = this.pausedTasks.iterator();
        while (paused.hasNext()) {
            Object task = paused.next();
            doRescheduleTask(task);
            paused.remove();
        }
    }

    if (sharedConnectionEnabled()) {
        startSharedConnection();
    }
}
/**
 * Start the shared Connection, if one is currently held.
 * <p>A <code>javax.jms.IllegalStateException</code> raised by the
 * start call is logged at debug level and otherwise ignored, on the
 * assumption that the Connection has already been started.
 * @throws JMSException if thrown by JMS API methods
 * @see javax.jms.Connection#start()
 */
protected void startSharedConnection() throws JMSException {
    synchronized (this.sharedConnectionMonitor) {
        if (this.sharedConnection == null) {
            // Nothing to start.
            return;
        }
        try {
            this.sharedConnection.start();
        }
        catch (javax.jms.IllegalStateException alreadyStarted) {
            logger.debug("Ignoring Connection start exception - assuming already started", alreadyStarted);
        }
    }
}
/**
 * Stop this listener container.
 * <p>Delegates to {@link #doStop} and converts any JMSException
 * thrown there through <code>convertJmsAccessException</code>.
 * @throws JmsException if stopping failed
 * @see #doStop
 */
public void stop() throws JmsException {
    try {
        doStop();
    }
    catch (JMSException stopFailure) {
        // Callers of the public API only see the converted exception.
        throw convertJmsAccessException(stopFailure);
    }
}
/**
 * Switch this container to stopped state, notify all listener tasks
 * and stop the shared Connection, if any.
 * @throws JMSException if thrown by JMS API methods
 * @see #stopSharedConnection
 */
protected void doStop() throws JMSException {
    synchronized (this.lifecycleMonitor) {
        this.running = false;
        // Wake up any thread waiting on a lifecycle state change.
        this.lifecycleMonitor.notifyAll();
    }

    // The Connection is stopped outside of the lifecycle monitor.
    if (sharedConnectionEnabled()) {
        stopSharedConnection();
    }
}
/**
 * Stop the shared Connection, if one is currently held.
 * <p>A <code>javax.jms.IllegalStateException</code> raised by the
 * stop call is logged at debug level and otherwise ignored, on the
 * assumption that the Connection has already been stopped.
 * @throws JMSException if thrown by JMS API methods
 * @see javax.jms.Connection#stop()
 */
protected void stopSharedConnection() throws JMSException {
    synchronized (this.sharedConnectionMonitor) {
        if (this.sharedConnection == null) {
            // Nothing to stop.
            return;
        }
        try {
            this.sharedConnection.stop();
        }
        catch (javax.jms.IllegalStateException alreadyStopped) {
            logger.debug("Ignoring Connection stop exception - assuming already stopped", alreadyStopped);
        }
    }
}
/**
 * Tell whether this listener container is currently running,
 * i.e. whether it has been started and not stopped yet.
 * <p>The flag is read while holding the lifecycle monitor.
 * @return the current value of the "running" flag
 */
public final boolean isRunning() {
    final boolean currentlyRunning;
    synchronized (this.lifecycleMonitor) {
        currentlyRunning = this.running;
    }
    return currentlyRunning;
}
/**
 * Wait while this listener container is not running.
 * <p>To be called by asynchronous tasks that want to block
 * while the listener container is in stopped state. Returns as soon
 * as the container is running or no longer active.
 * <p>An interruption of the calling thread does not end the wait.
 * It is remembered and the thread's interrupt status is restored
 * right before this method returns, so that callers can still
 * react to it.
 */
protected final void waitWhileNotRunning() {
    synchronized (this.lifecycleMonitor) {
        boolean interrupted = false;
        try {
            while (this.active && !this.running) {
                try {
                    this.lifecycleMonitor.wait();
                }
                catch (InterruptedException ex) {
                    // Do NOT re-interrupt here: with the interrupt status set,
                    // the next wait() would throw immediately, turning this loop
                    // into a busy spin that keeps hold of the lifecycle monitor
                    // and thereby blocks the very state change we are waiting for.
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                // Re-interrupt current thread, to allow other threads to react.
                Thread.currentThread().interrupt();
            }
        }
    }
}
/**
* Take the given task object and reschedule it,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -