muleeventmulticaster.java
来自「提供ESB 应用mule源代码 提供ESB 应用mule源代码」· Java 代码 · 共 818 行 · 第 1/2 页
JAVA
818 行
/* * $Id: MuleEventMulticaster.java 12606 2008-09-03 17:09:08Z tcarlson $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.module.spring.events;import org.mule.DefaultMuleEvent;import org.mule.DefaultMuleMessage;import org.mule.DefaultMuleSession;import org.mule.RequestContext;import org.mule.api.MuleContext;import org.mule.api.MuleEventContext;import org.mule.api.MuleException;import org.mule.api.MuleRuntimeException;import org.mule.api.MuleSession;import org.mule.api.config.MuleProperties;import org.mule.api.config.ThreadingProfile;import org.mule.api.context.MuleContextAware;import org.mule.api.endpoint.EndpointBuilder;import org.mule.api.endpoint.EndpointFactory;import org.mule.api.endpoint.EndpointURI;import org.mule.api.endpoint.InboundEndpoint;import org.mule.api.endpoint.MalformedEndpointException;import org.mule.api.endpoint.OutboundEndpoint;import org.mule.api.lifecycle.Callable;import org.mule.api.lifecycle.Initialisable;import org.mule.api.lifecycle.InitialisationException;import org.mule.api.model.Model;import org.mule.api.routing.InboundRouterCollection;import org.mule.api.routing.filter.ObjectFilter;import org.mule.api.service.Service;import org.mule.api.transformer.Transformer;import org.mule.api.transformer.TransformerException;import org.mule.api.transport.Connector;import org.mule.component.DefaultJavaComponent;import org.mule.config.QueueProfile;import org.mule.endpoint.MuleEndpointURI;import org.mule.model.seda.SedaModel;import org.mule.model.seda.SedaService;import org.mule.module.spring.i18n.SpringMessages;import org.mule.object.SingletonObjectFactory;import org.mule.routing.filters.WildcardFilter;import org.mule.transport.AbstractConnector;import org.mule.util.ClassUtils;import java.beans.ExceptionListener;import java.util.ArrayList;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Set;import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.springframework.beans.BeansException;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.context.ApplicationEvent;import org.springframework.context.ApplicationListener;import org.springframework.context.event.ApplicationEventMulticaster;import org.springframework.context.event.ContextClosedEvent;import org.springframework.context.event.ContextRefreshedEvent;/** * <code>MuleEventMulticaster</code> is an implementation of a Spring * ApplicationeventMulticaster. This implementation allows Mule event to be sent and * received through the Spring ApplicationContext. This allows any Spring bean to * receive and send events from any transport that Mule supports such as Jms, Http, * Tcp, Pop3, Smtp, File, etc. All a bean needs to do to receive and send events is * to implement MuleEventListener. Beans can also have subscriptions to certain * events by implementing MuleSubscriptionEventListener, where the bean can provide a * list of endpoints on which to receive events i.e. <code> * <bean id="myListener" class="com.foo.MyListener"> * <property name="subscriptions"> * <list> * <value>jms://customer.support</value> * <value>pop3://support:123456@mail.mycompany.com</value> * </list> * </property> * </bean> * </code> * <p/> Endpoints are specified as a Mule Url which is used to register a listener * for the subscription In the previous version of the MuleEventMulticaster it was * possible to specify wildcard endpoints. This is still possible but you need to * tell the multicaster which specific endpoints to listen on and then your * subscription listeners can use wildcards. To register the specific endpoints on * the MuleEvent Multicaster you use the <i>subscriptions</i> property. <p/> <code> * <bean id="applicationEventMulticaster" class="org.mule.module.spring.events.MuleEventMulticaster"> * <property name="subscriptions"> * <list> * <value>jms://orders.queue</value> * <value>jms://another.orders.queue</value> * </list> * </property> * </bean> * <p/> * <bean id="myListener" class="com.foo.MyListener"> * <property name="subscriptions"> * <list> * <value>jms://*.orders.*.</value> * </list> * </property> * </bean> * <p/> * </code> * * @see MuleEventListener * @see MuleSubscriptionEventListener * @see ApplicationEventMulticaster */public class MuleEventMulticaster implements ApplicationEventMulticaster, ApplicationContextAware, MuleContextAware, Callable, Initialisable{ public static final String EVENT_MULTICASTER_DESCRIPTOR_NAME = "muleEventMulticasterDescriptor"; /** * logger used by this class */ protected static final Log logger = LogFactory.getLog(MuleEventMulticaster.class); /** * The set of listeners for this Multicaster */ protected final Set listeners = new CopyOnWriteArraySet(); /** * Determines whether events will be processed asynchronously */ protected boolean asynchronous = false; /** * An ExecutorService for handling asynchronous events */ protected ExecutorService asyncPool = null; /** * A list of endpoints the eventMulticaster will receive events on Note that if * this eventMulticaster has a Mule Descriptor associated with it, these * endpoints are ignored and the ones on the Mule Descriptor are used. These are * here for convenience, the event multicaster will use these to create a default * MuleDescriptor for itself at runtime */ protected String[] subscriptions = null; /** * The Spring acpplication context */ protected ApplicationContext applicationContext; /** * The mule instance compoennt for the Multicaster */ protected Service service; /** * The filter used to match subscriptions */ protected Class subscriptionFilter = WildcardFilter.class; /** * Used to store parsed endpoints */ protected ExceptionListener exceptionListener = new LoggingExceptionListener(); protected MuleContext muleContext; public void setMuleContext(MuleContext context) { this.muleContext = context; } public void initialise() throws InitialisationException { if (asynchronous) { if (asyncPool == null) { asyncPool = muleContext.getDefaultThreadingProfile().createPool("spring-events"); } } else { if (asyncPool != null) { asyncPool.shutdown(); asyncPool = null; } } } /** * Adds a listener to the the Multicaster. If asynchronous is set to true, an * <code>AsynchronousMessageListener</code> is used to wrap the listener. This * listener will be initialised with a threadpool. The configuration for the * threadpool can be set on this multicaster or inherited from the MuleManager * configuration, which is good for most cases. * * @param listener the ApplicationListener to register with this Multicaster * @see AsynchronousEventListener * @see ThreadingProfile */ public void addApplicationListener(ApplicationListener listener) { Object listenerToAdd = listener; if (asynchronous) { listenerToAdd = new AsynchronousEventListener(asyncPool, listener); } listeners.add(listenerToAdd); } /** * Removes a listener from the multicaster * * @param listener the listener to remove */ public void removeApplicationListener(ApplicationListener listener) { for (Iterator iterator = listeners.iterator(); iterator.hasNext();) { ApplicationListener applicationListener = (ApplicationListener) iterator.next(); if (applicationListener instanceof AsynchronousEventListener) { if (((AsynchronousEventListener) applicationListener).getListener().equals(listener)) { listeners.remove(applicationListener); return; } } else { if (applicationListener.equals(listener)) { listeners.remove(applicationListener); return; } } } listeners.remove(listener); } /** * Removes all the listeners from the multicaster */ public void removeAllListeners() { listeners.clear(); } /** * Method is used to dispatch events to listeners registered with the * EventManager or dispatches events to Mule depending on the type and state of * the event received. If the event is not a Mule event it will be dispatched to * any listeners registered that are NOT MuleEventListeners. If the event is a * Mule event and there is no source event attached to it, it is assumed that the * event was dispatched by an object in the context using context.publishEvent() * and will be dispatched by Mule. If the event does have a source event attached * to it, it is assumed that the event was dispatched by Mule and will be * delivered to any listeners subscribed to the event. * * @param e the application event received by the context */ public void multicastEvent(ApplicationEvent e) { MuleApplicationEvent muleEvent = null; // if the context gets refreshed we need to reinitialise if (e instanceof ContextRefreshedEvent) { try { registerMulticasterComponent(); } catch (MuleException ex) { throw new MuleRuntimeException(SpringMessages.failedToReinitMule(), ex); } } else if (e instanceof ContextClosedEvent) { if (!muleContext.isDisposing() && !muleContext.isDisposed()) { muleContext.dispose(); } return; } else if (e instanceof MuleApplicationEvent) { muleEvent = (MuleApplicationEvent) e; // If there is no Mule event the event didn't originate from Mule // so its an outbound event if (muleEvent.getMuleEventContext() == null) { try { dispatchEvent(muleEvent); } catch (ApplicationEventException e1) { exceptionListener.exceptionThrown(e1); } return; } } for (Iterator iterator = listeners.iterator(); iterator.hasNext();) { ApplicationListener listener = (ApplicationListener) iterator.next(); if (muleEvent != null) { // As the asynchronous listener wraps the real listener we need // to check the type of the wrapped listener, but invoke the Async // listener if (listener instanceof AsynchronousEventListener) { AsynchronousEventListener asyncListener = (AsynchronousEventListener) listener; if (asyncListener.getListener() instanceof MuleSubscriptionEventListener) { if (isSubscriptionMatch(muleEvent.getEndpoint(), ((MuleSubscriptionEventListener) asyncListener.getListener()).getSubscriptions())) { asyncListener.onApplicationEvent(muleEvent); } } else if (asyncListener.getListener() instanceof MuleEventListener) { asyncListener.onApplicationEvent(muleEvent); } else if (!(asyncListener.getListener() instanceof MuleEventListener)) { asyncListener.onApplicationEvent(e); } // Synchronous MuleEvent listener Checks } else if (listener instanceof MuleSubscriptionEventListener) { if (isSubscriptionMatch(muleEvent.getEndpoint(), ((MuleSubscriptionEventListener) listener).getSubscriptions())) { listener.onApplicationEvent(muleEvent); } } else if (listener instanceof MuleEventListener) { listener.onApplicationEvent(muleEvent); } } else if (listener instanceof AsynchronousEventListener && !(((AsynchronousEventListener) listener).getListener() instanceof MuleEventListener)) { listener.onApplicationEvent(e); } else if (!(listener instanceof MuleEventListener)) { listener.onApplicationEvent(e); } else { // Finally only propagate the Application event if the // ApplicationEvent interface is explicitly implemented for (int i = 0; i < listener.getClass().getInterfaces().length; i++) { if (listener.getClass().getInterfaces()[i].equals(ApplicationListener.class)) { listener.onApplicationEvent(e); break; } } } } } /** * Matches a subscription to the current event endpointUri * * @param endpoint endpoint * @param subscriptions subscriptions * @return true if there's a match */ private boolean isSubscriptionMatch(String endpoint, String[] subscriptions) { for (int i = 0; i < subscriptions.length; i++) { String subscription = subscriptions[i]; // Subscriptions can be full Mule Urls or resource specific such as // my.queue // if it is a MuleEndpointURI we need to extract the Resource // specific part // if (MuleEndpointURI.isMuleUri(subscription)) { // EndpointURI ep = (EndpointURI) endpointsCache.get(subscription); // if (ep == null) { // try { // ep = new MuleEndpointURI(subscription); // } catch (MalformedEndpointException e) { // throw new IllegalArgumentException(e.getMessage()); // } // endpointsCache.put(subscription, ep);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?