⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sedaservice.java

📁 提供ESB 应用mule源代码 提供ESB 应用mule源代码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        return name + QUEUE_NAME_SUFFIX;
    }

    /**
     * Polls the service queue in a loop until the service is stopped, handing
     * every dequeued event to the work manager as a {@link ComponentStageWorker}.
     * <p>
     * The loop also ends early when:
     * <ul>
     * <li>the service is resumed from a pause while a stop is in progress,</li>
     * <li>a stop is in progress and the queue is persistent, there is no queue
     * session, or the queue is empty,</li>
     * <li>the polling thread is interrupted.</li>
     * </ul>
     * In each of those cases the <code>stopping</code> flag is cleared before
     * the loop is left.
     */
    public void run()
    {
        QueueSession queueSession = muleContext.getQueueManager().getQueueSession();

        while (!stopped.get())
        {
            // Scoped to a single iteration so that a failure raised before or
            // during dequeue() is never reported against the message of an event
            // that was already handed to a worker on an earlier iteration.
            DefaultMuleEvent event = null;
            try
            {
                // Wait if the service is paused
                if (paused.get())
                {
                    paused.whenFalse(null);

                    // If service is resumed as part of stopping
                    if (stopping.get())
                    {
                        if (!queueProfile.isPersistent() && (queueSession != null && getQueueSize() > 0))
                        {
                            // Any messages in a non-persistent queue when a paused
                            // service is stopped are lost
                            logger.warn(CoreMessages.stopPausedSedaServiceNonPeristentQueueMessageLoss(
                                getQueueSize(), this));
                        }
                        stopping.set(false);
                        break;
                    }
                }

                // If we're doing a draining stop, read all events from the queue
                // before stopping
                if (stopping.get())
                {
                    if (queueProfile.isPersistent() || (queueSession == null || getQueueSize() <= 0))
                    {
                        stopping.set(false);
                        break;
                    }
                }

                event = (DefaultMuleEvent) dequeue();
                if (event != null)
                {
                    if (stats.isEnabled())
                    {
                        stats.decQueuedEvent();
                    }

                    if (logger.isDebugEnabled())
                    {
                        logger.debug("Service: " + name + " dequeued event on: "
                                        + event.getEndpoint().getEndpointURI());
                    }
                    workManager.scheduleWork(new ComponentStageWorker(event), WorkManager.INDEFINITE, null, this);
                }
            }
            catch (Exception e)
            {
                if (e instanceof InterruptedException)
                {
                    // Catching InterruptedException clears the thread's interrupt
                    // status; restore it so the code that owns this thread can
                    // still observe the interruption after run() returns.
                    Thread.currentThread().interrupt();
                    stopping.set(false);
                    break;
                }

                if (e instanceof MuleException)
                {
                    handleException(e);
                }
                else
                {
                    handleException(
                        new ServiceException(
                            CoreMessages.eventProcessingFailedFor(name),
                            (event == null ? null : event.getMessage()), this, e));
                }
            }
        }
    }

    /**
     * Clears the <code>stopping</code> flag.
     */
    public void release()
    {
        stopping.set(false);
    }

    /**
     * Puts an event on the service queue.
     *
     * @param event the event to queue
     * @throws Exception an {@link InitialisationException} if the queue has not
     *             been created, or whatever the queue's <code>put</code> raises
     */
    protected void enqueue(MuleEvent event) throws Exception
    {
        if (queue == null)
        {
            throw new InitialisationException(
                MessageFactory.createStaticMessage("Queue not created for service " + name), this);
        }

        if (logger.isDebugEnabled())
        {
            logger.debug("Service " + name + " putting event on queue " + queue.getName() + ": " + event);
        }
        queue.put(event);
    }

    /**
     * Polls the service queue for the next event, using the configured queue
     * timeout as the poll argument.
     *
     * @return the result of polling the queue, cast to {@link MuleEvent}; the
     *         caller in {@link #run()} treats <code>null</code> as "no event"
     * @throws Exception an {@link InitialisationException} if the queue has not
     *             been created or no queue timeout is set, or whatever the
     *             queue's <code>poll</code> raises
     */
    protected MuleEvent dequeue() throws Exception
    {
        if (queue == null)
        {
            throw new InitialisationException(
                MessageFactory.createStaticMessage("Queue not created for service " + name), this);
        }

        if (logger.isDebugEnabled())
        {
            logger.debug("Service " + name + " polling queue " + queue.getName() + ", timeout = " + queueTimeout);
        }

        if (getQueueTimeout() == null)
        {
            throw new InitialisationException(CoreMessages.noServiceQueueTimeoutSet(this), this);
        }

        return (MuleEvent) queue.poll(getQueueTimeout().intValue());
    }

    /**
     * Work listener callback; forwards any exception carried by the event to
     * {@link #handleWorkException(WorkEvent, String)}.
     */
    public void workAccepted(WorkEvent event)
    {
        handleWorkException(event, "workAccepted");
    }

    /**
     * Work listener callback; forwards any exception carried by the event to
     * {@link #handleWorkException(WorkEvent, String)}.
     */
    public void workRejected(WorkEvent event)
    {
        handleWorkException(event, "workRejected");
    }

    /**
     * Work listener callback; forwards any exception carried by the event to
     * {@link #handleWorkException(WorkEvent, String)}.
     */
    public void workStarted(WorkEvent event)
    {
        handleWorkException(event, "workStarted");
    }

    /**
     * Work listener callback; forwards any exception carried by the event to
     * {@link #handleWorkException(WorkEvent, String)}.
     */
    public void workCompleted(WorkEvent event)
    {
        handleWorkException(event, "workCompleted");
    }

    /**
     * Reports the exception carried by a work event, if there is one. The
     * exception's cause is preferred over the exception itself when a cause is
     * present.
     *
     * @param event the work event to inspect; ignored when <code>null</code> or
     *            when it carries no exception
     * @param type the name of the listener callback that received the event,
     *            used only in the log message
     * @throws MuleRuntimeException if the selected throwable is not an
     *             {@link Exception}
     */
    protected void handleWorkException(WorkEvent event, String type)
    {
        if (event == null || event.getException() == null)
        {
            return;
        }

        Throwable e = event.getException();
        if (event.getException().getCause() != null)
        {
            e = event.getException().getCause();
        }

        // String concatenation converts a null reference to "null", so a work
        // event without an attached Work cannot make the error reporting fail.
        logger.error("Work caused exception on '" + type + "'. Work being executed was: "
                        + event.getWork());

        if (e instanceof Exception)
        {
            handleException((Exception) e);
        }
        else
        {
            throw new MuleRuntimeException(
                CoreMessages.componentCausedErrorIs(this.getName()), e);
        }
    }

    /**
     * Creates the statistics object for this service from its name and the
     * threading profile's maximum number of active threads.
     */
    protected ServiceStatistics createStatistics()
    {
        return new ServiceStatistics(getName(), threadingProfile.getMaxThreadsActive());
    }

    /**
     * Always fails: this implementation does not expose the underlying service
     * object.
     *
     * @throws UnsupportedOperationException always
     */
    public Object getInstance() throws MuleException
    {
        throw new UnsupportedOperationException(
            "Direct access to underlying service object is not allowed in the SedaModel.  "
                + "If this is for a unit test, make sure you are using the TestSedaModel ('seda-test')");
    }

    /** @return the queue profile used by this service */
    public QueueProfile getQueueProfile()
    {
        return queueProfile;
    }

    /** @param queueProfile the queue profile to use */
    public void setQueueProfile(QueueProfile queueProfile)
    {
        this.queueProfile = queueProfile;
    }

    /** @return the timeout passed to the queue when polling; may be <code>null</code> if unset */
    public Integer getQueueTimeout()
    {
        return queueTimeout;
    }

    /** @param queueTimeout the timeout to pass to the queue when polling */
    public void setQueueTimeout(Integer queueTimeout)
    {
        this.queueTimeout = queueTimeout;
    }

    /** @return the threading profile used by this service */
    public ThreadingProfile getThreadingProfile()
    {
        return threadingProfile;
    }

    /** @param threadingProfile the threading profile to use */
    public void setThreadingProfile(ThreadingProfile threadingProfile)
    {
        this.threadingProfile = threadingProfile;
    }

    /** @return the work manager that events are scheduled on */
    public WorkManager getWorkManager()
    {
        return workManager;
    }

    /** @param workManager the work manager to schedule events on */
    public void setWorkManager(WorkManager workManager)
    {
        this.workManager = workManager;
    }

    /**
     * Delegates to the superclass implementation unchanged.
     */
    protected void dispatchToOutboundRouter(MuleEvent event, MuleMessage result) throws MessagingException
    {
        super.dispatchToOutboundRouter(event, result);
        // TODO MULE-3077 SedaService should use a SEDA queue to dispatch to outbound
        // routers
    }

    /**
     * Unit of work that processes one dequeued event: invokes the component,
     * dispatches the result to the outbound router and then processes the
     * reply-to.
     */
    private class ComponentStageWorker implements Work
    {
        private MuleEvent event;

        public ComponentStageWorker(MuleEvent event)
        {
            this.event = event;
        }

        /**
         * Processes the event. On any exception the event's session is marked
         * invalid and the failure is passed to the enclosing service's
         * exception handling, wrapped in a {@link MessagingException} unless it
         * already is one.
         */
        public void run()
        {
            try
            {
                event = OptimizedRequestContext.criticalSetEvent(event);
                // Reply-to details are read before the component is invoked.
                Object replyTo = event.getMessage().getReplyTo();
                ReplyToHandler replyToHandler = getReplyToHandler(event.getMessage(),
                    (InboundEndpoint) event.getEndpoint());

                MuleMessage result = invokeComponent(event);
                dispatchToOutboundRouter(event, result);
                processReplyTo(event, result, replyToHandler, replyTo);
            }
            catch (Exception e)
            {
                event.getSession().setValid(false);
                if (e instanceof MessagingException)
                {
                    handleException(e);
                }
                else
                {
                    handleException(new MessagingException(CoreMessages.eventProcessingFailedFor(getName()),
                        event.getMessage(), e));
                }
            }
        }

        public void release()
        {
            // no-op
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -