⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sedaservice.java

📁 提供ESB 应用mule源代码 提供ESB 应用mule源代码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * $Id: SedaService.java 12882 2008-10-03 17:29:01Z dfeist $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc.  All rights reserved.  http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.model.seda;import org.mule.DefaultMuleEvent;import org.mule.DefaultMuleMessage;import org.mule.FailedToQueueEventException;import org.mule.OptimizedRequestContext;import org.mule.RequestContext;import org.mule.api.ExceptionPayload;import org.mule.api.MessagingException;import org.mule.api.MuleEvent;import org.mule.api.MuleException;import org.mule.api.MuleMessage;import org.mule.api.MuleRuntimeException;import org.mule.api.config.ThreadingProfile;import org.mule.api.context.WorkManager;import org.mule.api.endpoint.InboundEndpoint;import org.mule.api.lifecycle.InitialisationException;import org.mule.api.lifecycle.LifecycleException;import org.mule.api.service.ServiceException;import org.mule.api.transport.ReplyToHandler;import org.mule.config.QueueProfile;import org.mule.config.i18n.CoreMessages;import org.mule.config.i18n.MessageFactory;import org.mule.management.stats.ServiceStatistics;import org.mule.message.DefaultExceptionPayload;import org.mule.service.AbstractService;import org.mule.transport.NullPayload;import org.mule.util.queue.Queue;import org.mule.util.queue.QueueSession;import javax.resource.spi.work.Work;import javax.resource.spi.work.WorkEvent;import javax.resource.spi.work.WorkListener;/** * A Seda service runs inside a Seda Model and is responsible for managing a Seda * Queue and thread pool for a Mule sevice service. In Seda terms this is * equivilent to a stage. */public class SedaService extends AbstractService implements Work, WorkListener{    /**     * Serial version/     */    private static final long serialVersionUID = 7711976708670893015L;        private static final String QUEUE_NAME_SUFFIX = ".service";    protected WorkManager workManager;    /**     * The time out used for taking from the Seda Queue.     */    protected Integer queueTimeout;    /**     * The threading profile to use for this service. If this is not set a default     * will be provided by the server     */    protected ThreadingProfile threadingProfile;    /**     * The queue profile to use for this service. If this is not set a default     * will be provided by the server     */    protected QueueProfile queueProfile;        protected Queue queue;    /** For Spring only */    public SedaService()    {        super();    }        /**     * Initialise the service. The service will first create a Mule UMO from the     * UMODescriptor and then initialise a pool based on the attributes in the     * UMODescriptor.     *      * @throws org.mule.api.lifecycle.InitialisationException if the service fails     *             to initialise     * @see org.mule.api.UMODescriptor     */    protected synchronized void doInitialise() throws InitialisationException    {        if (threadingProfile == null)        {            // TODO MULE-2102 This should be configured in the default template.            threadingProfile = muleContext.getDefaultServiceThreadingProfile();        }        // Create thread pool        workManager = threadingProfile.createWorkManager(getName());        if (queueProfile == null)        {            // TODO MULE-2102 This should be configured in the default template.            queueProfile = ((SedaModel) model).getQueueProfile();        }                if (queueTimeout == null)        {            // TODO MULE-2102 This should be configured in the default template.            setQueueTimeout(new Integer(((SedaModel) model).getQueueTimeout()));        }                try        {            if (name == null)            {                throw new InitialisationException(MessageFactory.createStaticMessage("Service has no name to identify it"), this);            }            // Setup event Queue (used for VM execution).  The queue has the same name as the service.            queueProfile.configureQueue(getQueueName(), muleContext.getQueueManager());            queue = muleContext.getQueueManager().getQueueSession().getQueue(getQueueName());            if (queue == null)            {                throw new InitialisationException(MessageFactory.createStaticMessage("Queue not created for service " + name), this);            }        }        catch (InitialisationException e)        {            throw e;        }        catch (Throwable e)        {            throw new InitialisationException(                CoreMessages.objectFailedToInitialise("Service Queue"), e, this);        }    }    protected void doForceStop() throws MuleException    {        doStop();    }    protected void doStop() throws MuleException    {        if (queue != null && queue.size() > 0)        {            try            {                stopping.whenFalse(null);            }            catch (InterruptedException e)            {                // we can ignore this                // TODO MULE-863: Why?            }        }        workManager.dispose();    }    protected void doStart() throws MuleException    {        try        {            workManager.start();            workManager.scheduleWork(this, WorkManager.INDEFINITE, null, this);        }        catch (Exception e)        {            throw new LifecycleException(                CoreMessages.failedToStart("Service: " + name), e, this);        }    }    protected void doDispose()    {        queue = null;        // threadPool.awaitTerminationAfterShutdown();        if (workManager != null)        {            workManager.dispose();        }    }    protected void doDispatch(MuleEvent event) throws MuleException    {        if (logger.isDebugEnabled())        {            logger.debug("Service: " + name + " has received asynchronous event on: "                         + event.getEndpoint().getEndpointURI());        }        // Block until we can queue the next event        try        {            enqueue(event);            if (stats.isEnabled())            {                stats.incQueuedEvent();            }        }        catch (Exception e)        {            FailedToQueueEventException e1 = new FailedToQueueEventException(                CoreMessages.interruptedQueuingEventFor(this.getName()), event.getMessage(), this, e);            handleException(e1);        }        if (logger.isTraceEnabled())        {            logger.trace("MuleEvent added to queue for: " + name);        }    }    protected MuleMessage doSend(MuleEvent event) throws MuleException    {        MuleMessage result = null;        try        {            if (logger.isDebugEnabled())            {                logger.debug(this + " : got proxy for " + event.getId() + " = " + component);            }            Object replyTo = event.getMessage().getReplyTo();            ReplyToHandler replyToHandler = getReplyToHandler(event.getMessage(), (InboundEndpoint) event.getEndpoint());            result = invokeComponent(event);            result = sendToOutboundRouter(event, result);            result = processAsyncReplyRouter(result);            processReplyTo(event, result, replyToHandler, replyTo);        }        catch (Exception e)        {            event.getSession().setValid(false);            if (e instanceof MessagingException)            {                handleException(e);            }            else            {                handleException(new MessagingException(CoreMessages.eventProcessingFailedFor(getName()),                    event.getMessage(), e));            }            if (result == null)            {                // important that we pull event from request context here as it may                // have been modified                // (necessary to avoid scribbling between threads)                result = new DefaultMuleMessage(NullPayload.getInstance(), RequestContext.getEvent().getMessage());            }            ExceptionPayload exceptionPayload = result.getExceptionPayload();            if (exceptionPayload == null)            {                exceptionPayload = new DefaultExceptionPayload(e);            }            result.setExceptionPayload(exceptionPayload);        }        return result;    }    public int getQueueSize()    {        if (queue == null)        {            logger.warn(new InitialisationException(MessageFactory.createStaticMessage("Queue not created for service " + name), this));            return -1;        }        return queue.size();    }        private String getQueueName()    {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -