⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jmsconnector.java

📁 提供ESB 应用mule源代码 提供ESB 应用mule源代码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/* * $Id: JmsConnector.java 13044 2008-10-10 18:50:09Z tcarlson $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc.  All rights reserved.  http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.transport.jms;import org.mule.api.MessagingException;import org.mule.api.MuleException;import org.mule.api.MuleRuntimeException;import org.mule.api.context.notification.ConnectionNotificationListener;import org.mule.api.context.notification.ServerNotification;import org.mule.api.endpoint.ImmutableEndpoint;import org.mule.api.endpoint.InboundEndpoint;import org.mule.api.lifecycle.InitialisationException;import org.mule.api.lifecycle.StartException;import org.mule.api.service.Service;import org.mule.api.transaction.Transaction;import org.mule.api.transaction.TransactionException;import org.mule.api.transport.MessageAdapter;import org.mule.api.transport.ReplyToHandler;import org.mule.config.ExceptionHelper;import org.mule.config.i18n.CoreMessages;import org.mule.context.notification.ConnectionNotification;import org.mule.context.notification.NotificationException;import org.mule.transaction.TransactionCoordination;import org.mule.transport.AbstractConnector;import org.mule.transport.ConnectException;import org.mule.transport.jms.i18n.JmsMessages;import org.mule.transport.jms.xa.ConnectionFactoryWrapper;import java.text.MessageFormat;import java.util.Hashtable;import java.util.Map;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.JMSException;import javax.jms.MessageConsumer;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TemporaryQueue;import javax.jms.TemporaryTopic;import javax.jms.XAConnectionFactory;import javax.naming.CommunicationException;import javax.naming.Context;import javax.naming.InitialContext;import javax.naming.NamingException;import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;/** * <code>JmsConnector</code> is a JMS 1.0.2b compliant connector that can be used * by a Mule endpoint. The connector supports all JMS functionality including topics * and queues, durable subscribers, acknowledgement modes and local transactions. */public class JmsConnector extends AbstractConnector implements ConnectionNotificationListener, javax.jms.ExceptionListener{    public static final String JMS = "jms";    private AtomicInteger receiverReportedExceptionCount = new AtomicInteger();        ////////////////////////////////////////////////////////////////////////    // Properties    ////////////////////////////////////////////////////////////////////////    private int acknowledgementMode = Session.AUTO_ACKNOWLEDGE;    private String clientId;    private boolean durable;    private boolean noLocal;    private boolean persistentDelivery;    private boolean honorQosHeaders;    private int maxRedelivery = 0;    private boolean cacheJmsSessions = false;    /** Whether to create a consumer on connect. */    private boolean eagerConsumer = true;    /**     * @deprecated This attribute is no longer relevant for the new retry policies.       * @see MULE-3812     */    private boolean recoverJmsConnections;        ////////////////////////////////////////////////////////////////////////    // JMS Connection    ////////////////////////////////////////////////////////////////////////    /**     * JMS Connection, not settable by the user.     */    private Connection connection;    private ConnectionFactory connectionFactory;    public String username = null;    public String password = null;    ////////////////////////////////////////////////////////////////////////    // JNDI Connection    ////////////////////////////////////////////////////////////////////////        private Context jndiContext = null;    /**     * This object guards all access to the jndiContext     */    private Object jndiLock = new Object();    private String jndiProviderUrl;    private String jndiInitialFactory;    private Map jndiProviderProperties;    private String connectionFactoryJndiName;    private boolean jndiDestinations = false;    private boolean forceJndiDestinations = false;    ////////////////////////////////////////////////////////////////////////    // Strategy classes    ////////////////////////////////////////////////////////////////////////    private String specification = JmsConstants.JMS_SPECIFICATION_102B;    private JmsSupport jmsSupport;    private JmsTopicResolver topicResolver;    private RedeliveryHandlerFactory redeliveryHandlerFactory;    ////////////////////////////////////////////////////////////////////////    // Methods    ////////////////////////////////////////////////////////////////////////    /* Register the Jms Exception reader if this class gets loaded */    static    {        ExceptionHelper.registerExceptionReader(new JmsExceptionReader());    }    public String getProtocol()    {        return JMS;    }    protected void doInitialise() throws InitialisationException    {        try        {            connectionFactory = this.createConnectionFactory();        }        catch (NamingException ne)        {            throw new InitialisationException(JmsMessages.errorCreatingConnectionFactory(), this);        }                if (topicResolver == null)        {            topicResolver = new DefaultJmsTopicResolver(this);        }        if (redeliveryHandlerFactory == null)        {            redeliveryHandlerFactory = new DefaultRedeliveryHandlerFactory();        }        try        {            muleContext.registerListener(this, getName());        }        catch (NotificationException nex)        {            throw new InitialisationException(nex, this);        }    }    protected ConnectionFactory createConnectionFactory() throws InitialisationException, NamingException    {        // if an initial factory class was configured that takes precedence over the         // spring-configured connection factory or the one that our subclasses may provide        if (jndiInitialFactory != null)        {            this.initJndiContext();            Object temp = jndiContext.lookup(connectionFactoryJndiName);            if (temp instanceof ConnectionFactory)            {                return (ConnectionFactory)temp;            }            else            {                throw new InitialisationException(                    JmsMessages.invalidResourceType(ConnectionFactory.class, temp), this);            }        }        else        {            // don't look up objects from JNDI in any case            jndiDestinations = false;            forceJndiDestinations = false;            // don't use JNDI. Use the spring-configured connection factory if that's provided            if (connectionFactory != null)            {                return connectionFactory;            }                        // no spring-configured connection factory. See if there is a default one (e.g. from            // subclass)            ConnectionFactory factory = this.getDefaultConnectionFactory();            if (factory != null)            {                return factory;            }                        // no connection factory ... give up            throw new InitialisationException(JmsMessages.noConnectionFactoryConfigured(), this);        }    }        /**      * Override this method to provide a default ConnectionFactory for a vendor-specific JMS Connector.      */    protected ConnectionFactory getDefaultConnectionFactory()    {        return null;    }    protected void doDispose()    {        if (connection != null)        {            try            {                connection.close();            }            catch (JMSException e)            {                logger.error("Jms connector failed to dispose properly: ", e);            }            connection = null;        }                if (jndiContext != null)        {            try            {                jndiContext.close();            }            catch (NamingException ne)            {                logger.error("Jms connector failed to dispose properly: ", ne);            }            finally            {                jndiContext = null;            }        }    }    protected void initJndiContext() throws NamingException, InitialisationException    {        synchronized (jndiLock)        {            Hashtable<String, Object> props = new Hashtable<String, Object>();            if (jndiInitialFactory != null)            {                props.put(Context.INITIAL_CONTEXT_FACTORY, jndiInitialFactory);            }            else if (jndiProviderProperties == null                     || !jndiProviderProperties.containsKey(Context.INITIAL_CONTEXT_FACTORY))            {                throw new InitialisationException(CoreMessages.objectIsNull("jndiInitialFactory"), this);            }            if (jndiProviderUrl != null)            {                props.put(Context.PROVIDER_URL, jndiProviderUrl);            }            if (jndiProviderProperties != null)            {                props.putAll(jndiProviderProperties);            }                        jndiContext = new InitialContext(props);        }    }    protected Object lookupFromJndi(String jndiName) throws NamingException    {        synchronized (jndiLock)        {            try            {                return jndiContext.lookup(jndiName);            }            catch (CommunicationException ce)            {                logger.warn("JNDI communication error", ce);                                // Our connection to JNDI failed. Make a single attempt to reconnect to JNDI.                try                {                    /*                     Uncomment for manual testing ... this gives you time to restart the JNDI                     server                    try                    {                        logger.info("sleep for 20 secs before JNDI retry");                        Thread.sleep(20000);                        logger.info("done sleeping");                    }                    catch (InterruptedException e)                    {                        throw new RuntimeException(e);                    }                    */                    // re-connect to JNDI                    this.initJndiContext();                                        // now retry the lookup.                    return jndiContext.lookup(jndiName);                }                catch (InitialisationException ie)                {                    // this may actually never happen as we were connected to JNDI before                    throw new MuleRuntimeException(JmsMessages.errorInitializingJndi(), ie);                }            }        }    }    protected Connection createConnection() throws NamingException, JMSException, InitialisationException    {        ConnectionFactory cf = this.connectionFactory;        Connection connection;        try        {            if (cf instanceof XAConnectionFactory && muleContext.getTransactionManager() != null)            {                cf = new ConnectionFactoryWrapper(cf);            }        }        catch (Exception e)        {            throw new InitialisationException(e, this);        }        if (username != null)        {            connection = jmsSupport.createConnection(cf, username, password);        }        else        {            connection = jmsSupport.createConnection(cf);        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -