📄 jmsconnector.java
字号:
if (connection != null) { connection.setExceptionListener(this); } if (clientId != null) { connection.setClientID(getClientId()); } return connection; } public void onException(JMSException jmsException) { final JmsConnector jmsConnector = JmsConnector.this; Map receivers = jmsConnector.getReceivers(); boolean isMultiConsumerReceiver = false; if (!receivers.isEmpty()) { Map.Entry entry = (Map.Entry) receivers.entrySet().iterator().next(); if (entry.getValue() instanceof MultiConsumerJmsMessageReceiver) { isMultiConsumerReceiver = true; } } int expectedReceiverCount = isMultiConsumerReceiver ? 1 : (jmsConnector.getReceivers().size() * jmsConnector.getNumberOfConcurrentTransactedReceivers()); if (logger.isDebugEnabled()) { logger.debug("About to recycle myself due to remote JMS connection shutdown but need " + "to wait for all active receivers to report connection loss. Receiver count: " + (receiverReportedExceptionCount.get() + 1) + '/' + expectedReceiverCount); } if (receiverReportedExceptionCount.incrementAndGet() >= expectedReceiverCount) { receiverReportedExceptionCount.set(0); handleException(new ConnectException(jmsException, this)); } } protected void doConnect() throws ConnectException { try { if (jmsSupport == null) { if (JmsConstants.JMS_SPECIFICATION_102B.equals(specification)) { jmsSupport = new Jms102bSupport(this); } else { jmsSupport = new Jms11Support(this); } } } catch (Exception e) { throw new ConnectException(CoreMessages.failedToCreate("Jms Connector"), e, this); } try { connection = createConnection(); if (started.get()) { connection.start(); } } catch (Exception e) { throw new ConnectException(e, this); } } protected void doDisconnect() throws ConnectException { try { if (connection != null) { connection.close(); } } catch (Exception e) { throw new ConnectException(e, this); } finally { // connectionFactory = null; connection = null; } } public MessageAdapter getMessageAdapter(Object message) throws MessagingException { JmsMessageAdapter adapter = (JmsMessageAdapter) super.getMessageAdapter(message); adapter.setSpecification(this.getSpecification()); return adapter; } protected Object getReceiverKey(Service service, InboundEndpoint endpoint) { return service.getName() + "~" + endpoint.getEndpointURI().getAddress(); } public Session getSessionFromTransaction() { Transaction tx = TransactionCoordination.getInstance().getTransaction(); if (tx != null) { if (tx.hasResource(connection)) { if (logger.isDebugEnabled()) { logger.debug("Retrieving jms session from current transaction " + tx); } Session session = (Session) tx.getResource(connection); if (logger.isDebugEnabled()) { logger.debug("Using " + session + " bound to transaction " + tx); } return session; } } return null; } public Session getSession(ImmutableEndpoint endpoint) throws JMSException { final boolean topic = getTopicResolver().isTopic(endpoint); return getSession(endpoint.getTransactionConfig().isTransacted(), topic); } public Session getSession(boolean transacted, boolean topic) throws JMSException { if (!isConnected()) { throw new JMSException("Not connected"); } Session session = getSessionFromTransaction(); if (session != null) { return session; } Transaction tx = TransactionCoordination.getInstance().getTransaction(); if (logger.isDebugEnabled()) { logger.debug(MessageFormat.format( "Retrieving new jms session from connection: " + "topic={0}, transacted={1}, ack mode={2}, nolocal={3}", new Object[]{Boolean.valueOf(topic), Boolean.valueOf(transacted), new Integer(acknowledgementMode), Boolean.valueOf(noLocal)})); } session = jmsSupport.createSession(connection, topic, transacted, acknowledgementMode, noLocal); if (tx != null) { logger.debug("Binding session " + session + " to current transaction " + tx); try { tx.bindResource(connection, session); } catch (TransactionException e) { closeQuietly(session); throw new RuntimeException("Could not bind session to current transaction", e); } } return session; } protected void doStart() throws MuleException { if (connection != null) { try { connection.start(); } catch (JMSException e) { throw new StartException(CoreMessages.failedToStart("Jms Connection"), e, this); } } } protected void doStop() throws MuleException { // template method } public ReplyToHandler getReplyToHandler() { return new JmsReplyToHandler(this, getDefaultResponseTransformers()); } public void onNotification(ServerNotification notification) { if (notification.getAction() == ConnectionNotification.CONNECTION_DISCONNECTED || notification.getAction() == ConnectionNotification.CONNECTION_FAILED) { // Remove all dispatchers as any cached session will be invalidated disposeDispatchers(); // TODO should we dispose receivers here as well (in case they are // transactional) // gives a harmless NPE at // AbstractConnector.connect(AbstractConnector.java:927) // disposeReceivers(); } } /** * This method may be overridden in case a certain JMS implementation does not * support all the standard JMS properties. */ public boolean supportsProperty(String property) { return true; } /** * This method may be overridden in order to apply pre-processing to the message * as soon as it arrives. * * @param message - the incoming message * @param session - the JMS session * @return the preprocessed message */ public javax.jms.Message preProcessMessage(javax.jms.Message message, Session session) throws Exception { return message; } /** * Closes the MessageProducer * * @param producer * @throws JMSException */ public void close(MessageProducer producer) throws JMSException { if (producer != null) { producer.close(); } } /** * Closes the MessageProducer without throwing an exception (an error message is * logged instead). * * @param producer */ public void closeQuietly(MessageProducer producer) { try { close(producer); } catch (JMSException e) { logger.error("Failed to close jms message producer", e); } } /** * Closes the MessageConsumer * * @param consumer * @throws JMSException */ public void close(MessageConsumer consumer) throws JMSException { if (consumer != null) { consumer.close(); } } /** * Closes the MessageConsumer without throwing an exception (an error message is * logged instead). * * @param consumer */ public void closeQuietly(MessageConsumer consumer) { try { close(consumer); } catch (JMSException e) { logger.error("Failed to close jms message consumer", e); } } /** * Closes the MuleSession * * @param session * @throws JMSException */ public void close(Session session) throws JMSException { if (session != null) { session.close(); } } /** * Closes the MuleSession without throwing an exception (an error message is logged * instead). * * @param session */ public void closeQuietly(Session session) { try { close(session); } catch (JMSException e) { logger.error("Failed to close jms session consumer", e); } } /** * Closes the TemporaryQueue * * @param tempQueue * @throws JMSException */ public void close(TemporaryQueue tempQueue) throws JMSException { if (tempQueue != null) { tempQueue.delete(); } } /** * Closes the TemporaryQueue without throwing an exception (an error message is * logged instead). * * @param tempQueue */ public void closeQuietly(TemporaryQueue tempQueue) { try { close(tempQueue); } catch (JMSException e) { if (logger.isErrorEnabled()) { String queueName = ""; try { queueName = tempQueue.getQueueName(); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -