📄 jmsconnector.java
字号:
} catch(Exception e) { // insert code to handle non recoverable errors // simply retry continue; } synchronized(m_jmsLock) { try { m_jmsLock.wait(); } catch(InterruptedException ie) { } // until notified due to some change in status } } // no longer staying connected, so see what we can cleanup internalOnShutdown(); } void startConnection() { synchronized(m_lifecycleLock) { if(m_startConnection) return; m_startConnection = true; try {m_connection.start();}catch(Throwable e) { } // ignore } } void stopConnection() { synchronized(m_lifecycleLock) { if(!m_startConnection) return; m_startConnection = false; try {m_connection.stop();}catch(Throwable e) { } // ignore } } void shutdown() { m_isActive = false; synchronized(m_jmsLock) { m_jmsLock.notifyAll(); } } public void onException(JMSException exception) { if(m_adapter.isRecoverable(exception, JMSVendorAdapter.ON_EXCEPTION_ACTION)) return; onException(); synchronized(m_jmsLock) { m_jmsLock.notifyAll(); } } private final void internalOnConnect() throws Exception { onConnect(); synchronized(m_lifecycleLock) { if(m_startConnection) { try {m_connection.start();}catch(Throwable e) { } // ignore } } } private final void internalOnShutdown() { stopConnection(); onShutdown(); try { m_connection.close(); } catch(Throwable e) { } // ignore } protected abstract void onConnect()throws Exception; protected abstract void onShutdown(); protected abstract void onException(); } protected abstract SyncConnection createSyncConnection(ConnectionFactory factory, javax.jms.Connection connection, int numSessions, String threadName, String clientID, String username, String password) throws JMSException; SyncConnection getSendConnection() { return m_sendConnection; } protected abstract class SyncConnection extends Connection { LinkedList m_senders; int m_numSessions; Object m_senderLock; SyncConnection(ConnectionFactory connectionFactory, javax.jms.Connection connection, int numSessions, String threadName, String clientID, String username, String password) throws JMSException { super(connectionFactory, connection, threadName, clientID, username, password); m_senders = new LinkedList(); m_numSessions = numSessions; m_senderLock = new Object(); } protected abstract SendSession createSendSession(javax.jms.Connection connection) throws JMSException; protected void onConnect() throws JMSException { synchronized(m_senderLock) { for(int i = 0; i < m_numSessions; i++) { m_senders.add(createSendSession(m_connection)); } m_senderLock.notifyAll(); } } byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, HashMap properties) throws Exception { long timeoutTime = System.currentTimeMillis() + timeout; while(true) { if(System.currentTimeMillis() > timeoutTime) { throw new InvokeTimeoutException("Unable to complete call in time allotted"); } SendSession sendSession = null; try { sendSession = getSessionFromPool(m_poolTimeout); byte[] response = sendSession.call(endpoint, message, timeoutTime - System.currentTimeMillis(), properties); returnSessionToPool(sendSession); if(response == null) { throw new InvokeTimeoutException("Unable to complete call in time allotted"); } return response; } catch(JMSException jmse) { if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION)) { //this we cannot recover from //but it does not invalidate the session returnSessionToPool(sendSession); throw jmse; } //for now we will assume this is a reconnect related issue //and let the sender be collected //give the reconnect thread a chance to fill the pool Thread.yield(); continue; } catch(NullPointerException npe) { Thread.yield(); continue; } } } /** @todo add in handling for security exceptions * @todo add support for timeouts */ void send(JMSEndpoint endpoint, byte[] message, HashMap properties) throws Exception { long timeoutTime = System.currentTimeMillis() + m_timeoutTime; while(true) { if(System.currentTimeMillis() > timeoutTime) { throw new InvokeTimeoutException("Cannot complete send in time allotted"); } SendSession sendSession = null; try { sendSession = getSessionFromPool(m_poolTimeout); sendSession.send(endpoint, message, properties); returnSessionToPool(sendSession); } catch(JMSException jmse) { if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION)) { //this we cannot recover from //but it does not invalidate the session returnSessionToPool(sendSession); throw jmse; } //for now we will assume this is a reconnect related issue //and let the sender be collected //give the reconnect thread a chance to fill the pool Thread.yield(); continue; } catch(NullPointerException npe) { //give the reconnect thread a chance to fill the pool Thread.yield(); continue; } break; } } protected void onException() { synchronized(m_senderLock) { m_senders.clear(); } } protected void onShutdown() { synchronized(m_senderLock) { Iterator senders = m_senders.iterator(); while(senders.hasNext()) { SendSession session = (SendSession)senders.next(); session.cleanup(); } m_senders.clear(); } } private SendSession getSessionFromPool(long timeout) { synchronized(m_senderLock) { while(m_senders.size() == 0) { try { m_senderLock.wait(timeout); if(m_senders.size() == 0) { return null; } } catch(InterruptedException ignore) { return null; } } return (SendSession)m_senders.removeFirst(); } } private void returnSessionToPool(SendSession sendSession) { synchronized(m_senderLock) { m_senders.addLast(sendSession); m_senderLock.notifyAll(); } } protected abstract class SendSession extends ConnectorSession { MessageProducer m_producer; SendSession(Session session, MessageProducer producer) throws JMSException { super(session); m_producer = producer; } protected abstract Destination createTemporaryDestination() throws JMSException; protected abstract void deleteTemporaryDestination(Destination destination) throws JMSException; protected abstract MessageConsumer createConsumer(Destination destination) throws JMSException; protected abstract void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException; void send(JMSEndpoint endpoint, byte[] message, HashMap properties) throws Exception { BytesMessage jmsMessage = m_session.createBytesMessage(); jmsMessage.writeBytes(message); int deliveryMode = extractDeliveryMode(properties);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -