⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 vmconnector.java

📁 提供ESB 应用mule源代码 提供ESB 应用mule源代码
💻 JAVA
字号:
/* * $Id: VMConnector.java 11968 2008-06-06 04:06:18Z dfeist $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc.  All rights reserved.  http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.transport.vm;import org.mule.DefaultMuleMessage;import org.mule.api.MessagingException;import org.mule.api.MuleException;import org.mule.api.endpoint.EndpointException;import org.mule.api.endpoint.EndpointURI;import org.mule.api.endpoint.InboundEndpoint;import org.mule.api.lifecycle.InitialisationException;import org.mule.api.service.Service;import org.mule.api.transaction.Transaction;import org.mule.api.transaction.TransactionException;import org.mule.api.transport.MessageAdapter;import org.mule.api.transport.MessageReceiver;import org.mule.config.QueueProfile;import org.mule.config.i18n.CoreMessages;import org.mule.endpoint.DynamicURIInboundEndpoint;import org.mule.endpoint.MuleEndpointURI;import org.mule.routing.filters.WildcardFilter;import org.mule.transaction.TransactionCoordination;import org.mule.transport.AbstractConnector;import org.mule.util.queue.QueueManager;import org.mule.util.queue.QueueSession;import java.util.Iterator;/** * <code>VMConnector</code> A simple endpoint wrapper to allow a Mule service to * <p/> be accessed from an endpoint *  */public class VMConnector extends AbstractConnector{    public static final String VM = "vm";    private boolean queueEvents = false;    private QueueProfile queueProfile;    private int queueTimeout = 1000;    /** The queue manager to use for vm queues only */    private QueueManager queueManager;    protected void doInitialise() throws InitialisationException    {        if (queueEvents)        {            if (queueManager == null)            {                queueManager = getMuleContext().getQueueManager();            }            if (queueProfile == null)            {                //create a default QueueProfile                queueProfile = new QueueProfile();                if(logger.isDebugEnabled())                {                    logger.debug("created default QueueProfile for VM connector: " + queueProfile);                }            }        }    }    protected void doDispose()    {        // template method    }    protected void doConnect() throws Exception    {        // template method    }    protected void doDisconnect() throws Exception    {        // template method    }    protected void doStart() throws MuleException    {        // template method    }    protected void doStop() throws MuleException    {        // template method    }    public MessageReceiver createReceiver(Service service, InboundEndpoint endpoint) throws Exception    {        if (queueEvents)        {            queueProfile.configureQueue(endpoint.getEndpointURI().getAddress(), queueManager);        }        return serviceDescriptor.createMessageReceiver(this, service, endpoint);    }    public MessageAdapter getMessageAdapter(Object message) throws MessagingException    {        if (message == null)        {            throw new NullPointerException(CoreMessages.objectIsNull("message").getMessage());        }        else if (message instanceof DefaultMuleMessage)        {            return ((DefaultMuleMessage)message).getAdapter();        }        else if (message instanceof MessageAdapter)        {            return (MessageAdapter)message;        }        else        {            throw new MessagingException(CoreMessages.objectNotOfCorrectType(message.getClass(), MessageAdapter.class), null);        }    }    public String getProtocol()    {        return "VM";    }    public boolean isQueueEvents()    {        return queueEvents;    }    public void setQueueEvents(boolean queueEvents)    {        this.queueEvents = queueEvents;    }    public QueueProfile getQueueProfile()    {        return queueProfile;    }    public void setQueueProfile(QueueProfile queueProfile)    {        this.queueProfile = queueProfile;    }    VMMessageReceiver getReceiver(EndpointURI endpointUri) throws EndpointException    {        return (VMMessageReceiver)getReceiverByEndpoint(endpointUri);    }    QueueSession getQueueSession() throws InitialisationException    {        Transaction tx = TransactionCoordination.getInstance().getTransaction();        if (tx != null)        {            if (tx.hasResource(queueManager))            {                final QueueSession queueSession = (QueueSession) tx.getResource(queueManager);                if (logger.isDebugEnabled())                {                    logger.debug("Retrieved VM queue session " + queueSession + " from current transaction " + tx);                }                return queueSession;            }        }        if (logger.isDebugEnabled())        {            logger.debug("Retrieving new VM queue session from queue manager");        }        QueueSession session = queueManager.getQueueSession();        if (tx != null)        {            if (logger.isDebugEnabled())            {                logger.debug("Binding VM queue session " + session + " to current transaction " + tx);            }            try            {                tx.bindResource(queueManager, session);            }            catch (TransactionException e)            {                throw new RuntimeException("Could not bind queue session to current transaction", e);            }        }        return session;    }    protected MessageReceiver getReceiverByEndpoint(EndpointURI endpointUri) throws EndpointException    {        if (logger.isDebugEnabled())        {            logger.debug("Looking up vm receiver for address: " + endpointUri.toString());        }        MessageReceiver receiver;        // If we have an exact match, use it        receiver = (MessageReceiver)receivers.get(endpointUri.getAddress());        if (receiver != null)        {            if (logger.isDebugEnabled())            {                logger.debug("Found exact receiver match on endpointUri: " + endpointUri);            }            return receiver;        }        // otherwise check each one against a wildcard match        for (Iterator iterator = receivers.values().iterator(); iterator.hasNext();)        {            receiver = (MessageReceiver)iterator.next();            String filterAddress = receiver.getEndpointURI().getAddress();            WildcardFilter filter = new WildcardFilter(filterAddress);            if (filter.accept(endpointUri.getAddress()))            {                InboundEndpoint endpoint = receiver.getEndpoint();                EndpointURI newEndpointURI = new MuleEndpointURI(endpointUri, filterAddress);                receiver.setEndpoint(new DynamicURIInboundEndpoint(endpoint, newEndpointURI));                if (logger.isDebugEnabled())                {                    logger.debug("Found receiver match on endpointUri: " + receiver.getEndpointURI()                                 + " against " + endpointUri);                }                return receiver;            }        }        if (logger.isDebugEnabled())        {            logger.debug("No receiver found for endpointUri: " + endpointUri);        }        return null;    }    // //@Override    public boolean isRemoteSyncEnabled()    {        return true;    }                                                          public int getQueueTimeout()    {        return queueTimeout;    }    public void setQueueTimeout(int queueTimeout)    {        this.queueTimeout = queueTimeout;    }    public QueueManager getQueueManager()    {        return queueManager;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -