📄 sedaservice.java
字号:
/* * $Id: SedaService.java 12882 2008-10-03 17:29:01Z dfeist $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.model.seda;import org.mule.DefaultMuleEvent;import org.mule.DefaultMuleMessage;import org.mule.FailedToQueueEventException;import org.mule.OptimizedRequestContext;import org.mule.RequestContext;import org.mule.api.ExceptionPayload;import org.mule.api.MessagingException;import org.mule.api.MuleEvent;import org.mule.api.MuleException;import org.mule.api.MuleMessage;import org.mule.api.MuleRuntimeException;import org.mule.api.config.ThreadingProfile;import org.mule.api.context.WorkManager;import org.mule.api.endpoint.InboundEndpoint;import org.mule.api.lifecycle.InitialisationException;import org.mule.api.lifecycle.LifecycleException;import org.mule.api.service.ServiceException;import org.mule.api.transport.ReplyToHandler;import org.mule.config.QueueProfile;import org.mule.config.i18n.CoreMessages;import org.mule.config.i18n.MessageFactory;import org.mule.management.stats.ServiceStatistics;import org.mule.message.DefaultExceptionPayload;import org.mule.service.AbstractService;import org.mule.transport.NullPayload;import org.mule.util.queue.Queue;import org.mule.util.queue.QueueSession;import javax.resource.spi.work.Work;import javax.resource.spi.work.WorkEvent;import javax.resource.spi.work.WorkListener;/** * A Seda service runs inside a Seda Model and is responsible for managing a Seda * Queue and thread pool for a Mule sevice service. In Seda terms this is * equivilent to a stage. */public class SedaService extends AbstractService implements Work, WorkListener{ /** * Serial version/ */ private static final long serialVersionUID = 7711976708670893015L; private static final String QUEUE_NAME_SUFFIX = ".service"; protected WorkManager workManager; /** * The time out used for taking from the Seda Queue. */ protected Integer queueTimeout; /** * The threading profile to use for this service. If this is not set a default * will be provided by the server */ protected ThreadingProfile threadingProfile; /** * The queue profile to use for this service. If this is not set a default * will be provided by the server */ protected QueueProfile queueProfile; protected Queue queue; /** For Spring only */ public SedaService() { super(); } /** * Initialise the service. The service will first create a Mule UMO from the * UMODescriptor and then initialise a pool based on the attributes in the * UMODescriptor. * * @throws org.mule.api.lifecycle.InitialisationException if the service fails * to initialise * @see org.mule.api.UMODescriptor */ protected synchronized void doInitialise() throws InitialisationException { if (threadingProfile == null) { // TODO MULE-2102 This should be configured in the default template. threadingProfile = muleContext.getDefaultServiceThreadingProfile(); } // Create thread pool workManager = threadingProfile.createWorkManager(getName()); if (queueProfile == null) { // TODO MULE-2102 This should be configured in the default template. queueProfile = ((SedaModel) model).getQueueProfile(); } if (queueTimeout == null) { // TODO MULE-2102 This should be configured in the default template. setQueueTimeout(new Integer(((SedaModel) model).getQueueTimeout())); } try { if (name == null) { throw new InitialisationException(MessageFactory.createStaticMessage("Service has no name to identify it"), this); } // Setup event Queue (used for VM execution). The queue has the same name as the service. queueProfile.configureQueue(getQueueName(), muleContext.getQueueManager()); queue = muleContext.getQueueManager().getQueueSession().getQueue(getQueueName()); if (queue == null) { throw new InitialisationException(MessageFactory.createStaticMessage("Queue not created for service " + name), this); } } catch (InitialisationException e) { throw e; } catch (Throwable e) { throw new InitialisationException( CoreMessages.objectFailedToInitialise("Service Queue"), e, this); } } protected void doForceStop() throws MuleException { doStop(); } protected void doStop() throws MuleException { if (queue != null && queue.size() > 0) { try { stopping.whenFalse(null); } catch (InterruptedException e) { // we can ignore this // TODO MULE-863: Why? } } workManager.dispose(); } protected void doStart() throws MuleException { try { workManager.start(); workManager.scheduleWork(this, WorkManager.INDEFINITE, null, this); } catch (Exception e) { throw new LifecycleException( CoreMessages.failedToStart("Service: " + name), e, this); } } protected void doDispose() { queue = null; // threadPool.awaitTerminationAfterShutdown(); if (workManager != null) { workManager.dispose(); } } protected void doDispatch(MuleEvent event) throws MuleException { if (logger.isDebugEnabled()) { logger.debug("Service: " + name + " has received asynchronous event on: " + event.getEndpoint().getEndpointURI()); } // Block until we can queue the next event try { enqueue(event); if (stats.isEnabled()) { stats.incQueuedEvent(); } } catch (Exception e) { FailedToQueueEventException e1 = new FailedToQueueEventException( CoreMessages.interruptedQueuingEventFor(this.getName()), event.getMessage(), this, e); handleException(e1); } if (logger.isTraceEnabled()) { logger.trace("MuleEvent added to queue for: " + name); } } protected MuleMessage doSend(MuleEvent event) throws MuleException { MuleMessage result = null; try { if (logger.isDebugEnabled()) { logger.debug(this + " : got proxy for " + event.getId() + " = " + component); } Object replyTo = event.getMessage().getReplyTo(); ReplyToHandler replyToHandler = getReplyToHandler(event.getMessage(), (InboundEndpoint) event.getEndpoint()); result = invokeComponent(event); result = sendToOutboundRouter(event, result); result = processAsyncReplyRouter(result); processReplyTo(event, result, replyToHandler, replyTo); } catch (Exception e) { event.getSession().setValid(false); if (e instanceof MessagingException) { handleException(e); } else { handleException(new MessagingException(CoreMessages.eventProcessingFailedFor(getName()), event.getMessage(), e)); } if (result == null) { // important that we pull event from request context here as it may // have been modified // (necessary to avoid scribbling between threads) result = new DefaultMuleMessage(NullPayload.getInstance(), RequestContext.getEvent().getMessage()); } ExceptionPayload exceptionPayload = result.getExceptionPayload(); if (exceptionPayload == null) { exceptionPayload = new DefaultExceptionPayload(e); } result.setExceptionPayload(exceptionPayload); } return result; } public int getQueueSize() { if (queue == null) { logger.warn(new InitialisationException(MessageFactory.createStaticMessage("Queue not created for service " + name), this)); return -1; } return queue.size(); } private String getQueueName() {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -