📄 jmsconnector.java
字号:
/* * $Id: JmsConnector.java 13044 2008-10-10 18:50:09Z tcarlson $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.transport.jms;import org.mule.api.MessagingException;import org.mule.api.MuleException;import org.mule.api.MuleRuntimeException;import org.mule.api.context.notification.ConnectionNotificationListener;import org.mule.api.context.notification.ServerNotification;import org.mule.api.endpoint.ImmutableEndpoint;import org.mule.api.endpoint.InboundEndpoint;import org.mule.api.lifecycle.InitialisationException;import org.mule.api.lifecycle.StartException;import org.mule.api.service.Service;import org.mule.api.transaction.Transaction;import org.mule.api.transaction.TransactionException;import org.mule.api.transport.MessageAdapter;import org.mule.api.transport.ReplyToHandler;import org.mule.config.ExceptionHelper;import org.mule.config.i18n.CoreMessages;import org.mule.context.notification.ConnectionNotification;import org.mule.context.notification.NotificationException;import org.mule.transaction.TransactionCoordination;import org.mule.transport.AbstractConnector;import org.mule.transport.ConnectException;import org.mule.transport.jms.i18n.JmsMessages;import org.mule.transport.jms.xa.ConnectionFactoryWrapper;import java.text.MessageFormat;import java.util.Hashtable;import java.util.Map;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.JMSException;import javax.jms.MessageConsumer;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TemporaryQueue;import javax.jms.TemporaryTopic;import javax.jms.XAConnectionFactory;import javax.naming.CommunicationException;import javax.naming.Context;import javax.naming.InitialContext;import javax.naming.NamingException;import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;/** * <code>JmsConnector</code> is a JMS 1.0.2b compliant connector that can be used * by a Mule endpoint. The connector supports all JMS functionality including topics * and queues, durable subscribers, acknowledgement modes and local transactions. */public class JmsConnector extends AbstractConnector implements ConnectionNotificationListener, javax.jms.ExceptionListener{ public static final String JMS = "jms"; private AtomicInteger receiverReportedExceptionCount = new AtomicInteger(); //////////////////////////////////////////////////////////////////////// // Properties //////////////////////////////////////////////////////////////////////// private int acknowledgementMode = Session.AUTO_ACKNOWLEDGE; private String clientId; private boolean durable; private boolean noLocal; private boolean persistentDelivery; private boolean honorQosHeaders; private int maxRedelivery = 0; private boolean cacheJmsSessions = false; /** Whether to create a consumer on connect. */ private boolean eagerConsumer = true; /** * @deprecated This attribute is no longer relevant for the new retry policies. * @see MULE-3812 */ private boolean recoverJmsConnections; //////////////////////////////////////////////////////////////////////// // JMS Connection //////////////////////////////////////////////////////////////////////// /** * JMS Connection, not settable by the user. */ private Connection connection; private ConnectionFactory connectionFactory; public String username = null; public String password = null; //////////////////////////////////////////////////////////////////////// // JNDI Connection //////////////////////////////////////////////////////////////////////// private Context jndiContext = null; /** * This object guards all access to the jndiContext */ private Object jndiLock = new Object(); private String jndiProviderUrl; private String jndiInitialFactory; private Map jndiProviderProperties; private String connectionFactoryJndiName; private boolean jndiDestinations = false; private boolean forceJndiDestinations = false; //////////////////////////////////////////////////////////////////////// // Strategy classes //////////////////////////////////////////////////////////////////////// private String specification = JmsConstants.JMS_SPECIFICATION_102B; private JmsSupport jmsSupport; private JmsTopicResolver topicResolver; private RedeliveryHandlerFactory redeliveryHandlerFactory; //////////////////////////////////////////////////////////////////////// // Methods //////////////////////////////////////////////////////////////////////// /* Register the Jms Exception reader if this class gets loaded */ static { ExceptionHelper.registerExceptionReader(new JmsExceptionReader()); } public String getProtocol() { return JMS; } protected void doInitialise() throws InitialisationException { try { connectionFactory = this.createConnectionFactory(); } catch (NamingException ne) { throw new InitialisationException(JmsMessages.errorCreatingConnectionFactory(), this); } if (topicResolver == null) { topicResolver = new DefaultJmsTopicResolver(this); } if (redeliveryHandlerFactory == null) { redeliveryHandlerFactory = new DefaultRedeliveryHandlerFactory(); } try { muleContext.registerListener(this, getName()); } catch (NotificationException nex) { throw new InitialisationException(nex, this); } } protected ConnectionFactory createConnectionFactory() throws InitialisationException, NamingException { // if an initial factory class was configured that takes precedence over the // spring-configured connection factory or the one that our subclasses may provide if (jndiInitialFactory != null) { this.initJndiContext(); Object temp = jndiContext.lookup(connectionFactoryJndiName); if (temp instanceof ConnectionFactory) { return (ConnectionFactory)temp; } else { throw new InitialisationException( JmsMessages.invalidResourceType(ConnectionFactory.class, temp), this); } } else { // don't look up objects from JNDI in any case jndiDestinations = false; forceJndiDestinations = false; // don't use JNDI. Use the spring-configured connection factory if that's provided if (connectionFactory != null) { return connectionFactory; } // no spring-configured connection factory. See if there is a default one (e.g. from // subclass) ConnectionFactory factory = this.getDefaultConnectionFactory(); if (factory != null) { return factory; } // no connection factory ... give up throw new InitialisationException(JmsMessages.noConnectionFactoryConfigured(), this); } } /** * Override this method to provide a default ConnectionFactory for a vendor-specific JMS Connector. */ protected ConnectionFactory getDefaultConnectionFactory() { return null; } protected void doDispose() { if (connection != null) { try { connection.close(); } catch (JMSException e) { logger.error("Jms connector failed to dispose properly: ", e); } connection = null; } if (jndiContext != null) { try { jndiContext.close(); } catch (NamingException ne) { logger.error("Jms connector failed to dispose properly: ", ne); } finally { jndiContext = null; } } } protected void initJndiContext() throws NamingException, InitialisationException { synchronized (jndiLock) { Hashtable<String, Object> props = new Hashtable<String, Object>(); if (jndiInitialFactory != null) { props.put(Context.INITIAL_CONTEXT_FACTORY, jndiInitialFactory); } else if (jndiProviderProperties == null || !jndiProviderProperties.containsKey(Context.INITIAL_CONTEXT_FACTORY)) { throw new InitialisationException(CoreMessages.objectIsNull("jndiInitialFactory"), this); } if (jndiProviderUrl != null) { props.put(Context.PROVIDER_URL, jndiProviderUrl); } if (jndiProviderProperties != null) { props.putAll(jndiProviderProperties); } jndiContext = new InitialContext(props); } } protected Object lookupFromJndi(String jndiName) throws NamingException { synchronized (jndiLock) { try { return jndiContext.lookup(jndiName); } catch (CommunicationException ce) { logger.warn("JNDI communication error", ce); // Our connection to JNDI failed. Make a single attempt to reconnect to JNDI. try { /* Uncomment for manual testing ... this gives you time to restart the JNDI server try { logger.info("sleep for 20 secs before JNDI retry"); Thread.sleep(20000); logger.info("done sleeping"); } catch (InterruptedException e) { throw new RuntimeException(e); } */ // re-connect to JNDI this.initJndiContext(); // now retry the lookup. return jndiContext.lookup(jndiName); } catch (InitialisationException ie) { // this may actually never happen as we were connected to JNDI before throw new MuleRuntimeException(JmsMessages.errorInitializingJndi(), ie); } } } } protected Connection createConnection() throws NamingException, JMSException, InitialisationException { ConnectionFactory cf = this.connectionFactory; Connection connection; try { if (cf instanceof XAConnectionFactory && muleContext.getTransactionManager() != null) { cf = new ConnectionFactoryWrapper(cf); } } catch (Exception e) { throw new InitialisationException(e, this); } if (username != null) { connection = jmsSupport.createConnection(cf, username, password); } else { connection = jmsSupport.createConnection(cf); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -