📄 jmsconnector.java
字号:
int priority = extractPriority(properties); long timeToLive = extractTimeToLive(properties); if(properties != null && !properties.isEmpty()) setProperties(properties, jmsMessage); send(endpoint.getDestination(m_session), jmsMessage, deliveryMode, priority, timeToLive); } void cleanup() { try{m_producer.close();}catch(Throwable t){} try{m_session.close();}catch(Throwable t){} } byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, HashMap properties) throws Exception { Destination reply = createTemporaryDestination(); MessageConsumer subscriber = createConsumer(reply); BytesMessage jmsMessage = m_session.createBytesMessage(); jmsMessage.writeBytes(message); jmsMessage.setJMSReplyTo(reply); int deliveryMode = extractDeliveryMode(properties); int priority = extractPriority(properties); long timeToLive = extractTimeToLive(properties); if(properties != null && !properties.isEmpty()) setProperties(properties, jmsMessage); send(endpoint.getDestination(m_session), jmsMessage, deliveryMode, priority, timeToLive); BytesMessage response = null; try { response = (BytesMessage)subscriber.receive(timeout); } catch (ClassCastException cce) { throw new InvokeException ("Error: unexpected message type received - expected BytesMessage"); } byte[] respBytes = null; if(response != null) { byte[] buffer = new byte[8 * 1024]; ByteArrayOutputStream out = new ByteArrayOutputStream(); for(int bytesRead = response.readBytes(buffer); bytesRead != -1; bytesRead = response.readBytes(buffer)) { out.write(buffer, 0, bytesRead); } respBytes = out.toByteArray(); } subscriber.close(); deleteTemporaryDestination(reply); return respBytes; } private int extractPriority(HashMap properties) { return MapUtils.removeIntProperty(properties, JMSConstants.PRIORITY, JMSConstants.DEFAULT_PRIORITY); } private int extractDeliveryMode(HashMap properties) { return MapUtils.removeIntProperty(properties, JMSConstants.DELIVERY_MODE, JMSConstants.DEFAULT_DELIVERY_MODE); } private long extractTimeToLive(HashMap properties) { return MapUtils.removeLongProperty(properties, JMSConstants.TIME_TO_LIVE, JMSConstants.DEFAULT_TIME_TO_LIVE); } private void setProperties(HashMap properties, Message message) throws JMSException { Iterator propertyIter = properties.entrySet().iterator(); while(propertyIter.hasNext()) { Map.Entry property = (Map.Entry)propertyIter.next(); setProperty((String)property.getKey(), property.getValue(), message); } } private void setProperty(String property, Object value, Message message) throws JMSException { if(property == null) return; if(property.equals(JMSConstants.JMS_CORRELATION_ID)) message.setJMSCorrelationID((String)value); else if(property.equals(JMSConstants.JMS_CORRELATION_ID_AS_BYTES)) message.setJMSCorrelationIDAsBytes((byte[])value); else if(property.equals(JMSConstants.JMS_TYPE)) message.setJMSType((String)value); else message.setObjectProperty(property, value); } } } AsyncConnection getReceiveConnection() { return m_receiveConnection; } protected abstract AsyncConnection createAsyncConnection(ConnectionFactory factory, javax.jms.Connection connection, String threadName, String clientID, String username, String password) throws JMSException; protected abstract class AsyncConnection extends Connection { HashMap m_subscriptions; Object m_subscriptionLock; protected AsyncConnection(ConnectionFactory connectionFactory, javax.jms.Connection connection, String threadName, String clientID, String username, String password) throws JMSException { super(connectionFactory, connection, threadName, clientID, username, password); m_subscriptions = new HashMap(); m_subscriptionLock = new Object(); } protected abstract ListenerSession createListenerSession( javax.jms.Connection connection, Subscription subscription) throws Exception; protected void onShutdown() { synchronized(m_subscriptionLock) { Iterator subscriptions = m_subscriptions.keySet().iterator(); while(subscriptions.hasNext()) { Subscription subscription = (Subscription)subscriptions.next(); ListenerSession session = (ListenerSession) m_subscriptions.get(subscription); if(session != null) { session.cleanup(); } } m_subscriptions.clear(); } } /** * @todo add in security exception propagation * @param subscription */ void subscribe(Subscription subscription) throws Exception { long timeoutTime = System.currentTimeMillis() + m_timeoutTime; synchronized(m_subscriptionLock) { if(m_subscriptions.containsKey(subscription)) return; while(true) { if(System.currentTimeMillis() > timeoutTime) { throw new InvokeTimeoutException("Cannot subscribe listener"); } try { ListenerSession session = createListenerSession(m_connection, subscription); m_subscriptions.put(subscription, session); break; } catch(JMSException jmse) { if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SUBSCRIBE_ACTION)) { throw jmse; } try{m_subscriptionLock.wait(m_interactRetryInterval);} catch(InterruptedException ignore){} //give reconnect a chance Thread.yield(); continue; } catch(NullPointerException jmse) { //we ARE reconnecting try{m_subscriptionLock.wait(m_interactRetryInterval);} catch(InterruptedException ignore){} //give reconnect a chance Thread.yield(); continue; } } } } void unsubscribe(Subscription subscription) { long timeoutTime = System.currentTimeMillis() + m_timeoutTime; synchronized(m_subscriptionLock) { if(!m_subscriptions.containsKey(subscription)) return; while(true) { if(System.currentTimeMillis() > timeoutTime) { throw new InvokeTimeoutException("Cannot unsubscribe listener"); } //give reconnect a chance Thread.yield(); try { ListenerSession session = (ListenerSession) m_subscriptions.get(subscription); session.cleanup(); m_subscriptions.remove(subscription); break; } catch(NullPointerException jmse) { //we are reconnecting try{m_subscriptionLock.wait(m_interactRetryInterval);} catch(InterruptedException ignore){} continue; } } } } protected void onConnect() throws Exception { synchronized(m_subscriptionLock) { Iterator subscriptions = m_subscriptions.keySet().iterator(); while(subscriptions.hasNext()) { Subscription subscription = (Subscription)subscriptions.next(); if(m_subscriptions.get(subscription) == null) { m_subscriptions.put(subscription, createListenerSession(m_connection, subscription)); } } m_subscriptionLock.notifyAll(); } } protected void onException() { synchronized(m_subscriptionLock) { Iterator subscriptions = m_subscriptions.keySet().iterator(); while(subscriptions.hasNext()) { Subscription subscription = (Subscription)subscriptions.next(); m_subscriptions.put(subscription, null); } } } protected class ListenerSession extends ConnectorSession { protected MessageConsumer m_consumer; protected Subscription m_subscription; ListenerSession(Session session, MessageConsumer consumer, Subscription subscription) throws Exception { super(session); m_subscription = subscription; m_consumer = consumer; Destination destination = subscription.m_endpoint.getDestination(m_session); m_consumer.setMessageListener(subscription.m_listener); } void cleanup() { try{m_consumer.close();}catch(Exception ignore){} try{m_session.close();}catch(Exception ignore){} } } } private abstract class ConnectorSession { Session m_session; ConnectorSession(Session session) throws JMSException { m_session = session; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -