📄 jmsconnector.java
字号:
/* * Copyright 2001, 2002,2004 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.axis.transport.jms;import org.apache.axis.components.jms.JMSVendorAdapter;import javax.jms.BytesMessage;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.ExceptionListener;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageProducer;import javax.jms.Session;import java.io.ByteArrayOutputStream;import java.util.HashMap;import java.util.Iterator;import java.util.LinkedList;import java.util.Map;// No vendor dependent exception classes//import progress.message.client.EUserAlreadyConnected;//import progress.message.jclient.ErrorCodes;/** * JMSConnector is an abstract class that encapsulates the work of connecting * to JMS destinations. Its subclasses are TopicConnector and QueueConnector * which further specialize connections to the pub-sub and the ptp domains. * It also implements the capability to retry connections in the event of * failures. * * @author Jaime Meritt (jmeritt@sonicsoftware.com) * @author Richard Chung (rchung@sonicsoftware.com) * @author Dave Chappell (chappell@sonicsoftware.com) * @author Ray Chun (rchun@sonicsoftware.com) */public abstract class JMSConnector{ protected int m_numRetries; protected long m_connectRetryInterval; protected long m_interactRetryInterval; protected long m_timeoutTime; protected long m_poolTimeout; protected AsyncConnection m_receiveConnection; protected SyncConnection m_sendConnection; protected int m_numSessions; protected boolean m_allowReceive; protected JMSVendorAdapter m_adapter; protected JMSURLHelper m_jmsurl; public JMSConnector(ConnectionFactory connectionFactory, int numRetries, int numSessions, long connectRetryInterval, long interactRetryInterval, long timeoutTime, boolean allowReceive, String clientID, String username, String password, JMSVendorAdapter adapter, JMSURLHelper jmsurl) throws JMSException { m_numRetries = numRetries; m_connectRetryInterval = connectRetryInterval; m_interactRetryInterval = interactRetryInterval; m_timeoutTime = timeoutTime; m_poolTimeout = timeoutTime/(long)numRetries; m_numSessions = numSessions; m_allowReceive = allowReceive; m_adapter = adapter; m_jmsurl = jmsurl; // try to connect initially so we can fail fast // in the case of irrecoverable errors. // If we fail in a recoverable fashion we will retry javax.jms.Connection sendConnection = createConnectionWithRetry( connectionFactory, username, password); m_sendConnection = createSyncConnection(connectionFactory, sendConnection, m_numSessions, "SendThread", clientID, username, password); m_sendConnection.start(); if(m_allowReceive) { javax.jms.Connection receiveConnection = createConnectionWithRetry( connectionFactory, username, password); m_receiveConnection = createAsyncConnection(connectionFactory, receiveConnection, "ReceiveThread", clientID, username, password); m_receiveConnection.start(); } } public int getNumRetries() { return m_numRetries; } public int numSessions() { return m_numSessions; } public ConnectionFactory getConnectionFactory() { // there is always a send connection return getSendConnection().getConnectionFactory(); } public String getClientID() { return getSendConnection().getClientID(); } public String getUsername() { return getSendConnection().getUsername(); } public String getPassword() { return getSendConnection().getPassword(); } public JMSVendorAdapter getVendorAdapter() { return m_adapter; } public JMSURLHelper getJMSURL() { return m_jmsurl; } protected javax.jms.Connection createConnectionWithRetry( ConnectionFactory connectionFactory, String username, String password) throws JMSException { javax.jms.Connection connection = null; for(int numTries = 1; connection == null; numTries++) { try { connection = internalConnect(connectionFactory, username, password); } catch(JMSException jmse) { if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.CONNECT_ACTION) || numTries == m_numRetries) throw jmse; else try{Thread.sleep(m_connectRetryInterval);}catch(InterruptedException ie){}; } } return connection; } public void stop() { JMSConnectorManager.getInstance().removeConnectorFromPool(this); m_sendConnection.stopConnection(); if(m_allowReceive) m_receiveConnection.stopConnection(); } public void start() { m_sendConnection.startConnection(); if(m_allowReceive) m_receiveConnection.startConnection(); JMSConnectorManager.getInstance().addConnectorToPool(this); } public void shutdown() { m_sendConnection.shutdown(); if(m_allowReceive) m_receiveConnection.shutdown(); } public abstract JMSEndpoint createEndpoint(String destinationName) throws JMSException; public abstract JMSEndpoint createEndpoint(Destination destination) throws JMSException; protected abstract javax.jms.Connection internalConnect( ConnectionFactory connectionFactory, String username, String password) throws JMSException; private abstract class Connection extends Thread implements ExceptionListener { private ConnectionFactory m_connectionFactory; protected javax.jms.Connection m_connection; protected boolean m_isActive; private boolean m_needsToConnect; private boolean m_startConnection; private String m_clientID; private String m_username; private String m_password; private Object m_jmsLock; private Object m_lifecycleLock; protected Connection(ConnectionFactory connectionFactory, javax.jms.Connection connection, String threadName, String clientID, String username, String password) throws JMSException { super(threadName); m_connectionFactory = connectionFactory; m_clientID = clientID; m_username = username; m_password = password; m_jmsLock = new Object(); m_lifecycleLock = new Object(); if (connection != null) { m_needsToConnect = false; m_connection = connection; m_connection.setExceptionListener(this); if(m_clientID != null) m_connection.setClientID(m_clientID); } else { m_needsToConnect = true; } m_isActive = true; } public ConnectionFactory getConnectionFactory() { return m_connectionFactory; } public String getClientID() { return m_clientID; } public String getUsername() { return m_username; } public String getPassword() { return m_password; } /** * @todo handle non-recoverable errors */ public void run() { // loop until a connection is made and when a connection is made (re)establish // any subscriptions while (m_isActive) { if (m_needsToConnect) { m_connection = null; try { m_connection = internalConnect(m_connectionFactory, m_username, m_password); m_connection.setExceptionListener(this); if(m_clientID != null) m_connection.setClientID(m_clientID); } catch(JMSException e) { // simply backoff for a while and then retry try { Thread.sleep(m_connectRetryInterval); } catch(InterruptedException ie) { } continue; } } else m_needsToConnect = true; // When we'll get to the "if (needsToConnect)" statement the next time it will be because // we lost the connection // we now have a valid connection so establish some context try { internalOnConnect();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -