📄 sedaservice.java
字号:
return name + QUEUE_NAME_SUFFIX; } /** * While the service isn't stopped this runs a continuous loop checking for new * events in the queue. */ public void run() { DefaultMuleEvent event = null; QueueSession queueSession = muleContext.getQueueManager().getQueueSession(); while (!stopped.get()) { try { // Wait if the service is paused if (paused.get()) { paused.whenFalse(null); // If service is resumed as part of stopping if (stopping.get()) { if (!queueProfile.isPersistent() && (queueSession != null && getQueueSize() > 0)) { // Any messages in a non-persistent queue went paused service is stopped are lost logger.warn(CoreMessages.stopPausedSedaServiceNonPeristentQueueMessageLoss(getQueueSize(), this)); } stopping.set(false); break; } } // If we're doing a draining stop, read all events from the queue // before stopping if (stopping.get()) { if (queueProfile.isPersistent() || (queueSession == null || getQueueSize() <= 0)) { stopping.set(false); break; } } event = (DefaultMuleEvent) dequeue(); if (event != null) { if (stats.isEnabled()) { stats.decQueuedEvent(); } if (logger.isDebugEnabled()) { logger.debug("Service: " + name + " dequeued event on: " + event.getEndpoint().getEndpointURI()); } workManager.scheduleWork(new ComponentStageWorker(event), WorkManager.INDEFINITE, null, this); } } catch (Exception e) { if (e instanceof InterruptedException) { stopping.set(false); break; } if (e instanceof MuleException) { handleException(e); } else { handleException( new ServiceException( CoreMessages.eventProcessingFailedFor(name), (event == null ? null : event.getMessage()), this, e)); } } } } public void release() { stopping.set(false); } protected void enqueue(MuleEvent event) throws Exception { if (queue == null) { throw new InitialisationException(MessageFactory.createStaticMessage("Queue not created for service " + name), this); } if (logger.isDebugEnabled()) { logger.debug("Service " + name + " putting event on queue " + queue.getName() + ": " + event); } queue.put(event); } protected MuleEvent dequeue() throws Exception { if (queue == null) { throw new InitialisationException(MessageFactory.createStaticMessage("Queue not created for service " + name), this); } if (logger.isDebugEnabled()) { logger.debug("Service " + name + " polling queue " + queue.getName() + ", timeout = " + queueTimeout); } if (getQueueTimeout() == null) { throw new InitialisationException(CoreMessages.noServiceQueueTimeoutSet(this), this); } else { return (MuleEvent) queue.poll(getQueueTimeout().intValue()); } } public void workAccepted(WorkEvent event) { handleWorkException(event, "workAccepted"); } public void workRejected(WorkEvent event) { handleWorkException(event, "workRejected"); } public void workStarted(WorkEvent event) { handleWorkException(event, "workStarted"); } public void workCompleted(WorkEvent event) { handleWorkException(event, "workCompleted"); } protected void handleWorkException(WorkEvent event, String type) { Throwable e; if (event != null && event.getException() != null) { e = event.getException(); } else { return; } if (event.getException().getCause() != null) { e = event.getException().getCause(); } logger.error("Work caused exception on '" + type + "'. Work being executed was: " + event.getWork().toString()); if (e instanceof Exception) { handleException((Exception) e); } else { throw new MuleRuntimeException( CoreMessages.componentCausedErrorIs(this.getName()), e); } } protected ServiceStatistics createStatistics() { return new ServiceStatistics(getName(), threadingProfile.getMaxThreadsActive()); } public Object getInstance() throws MuleException { throw new UnsupportedOperationException("Direct access to underlying service object is not allowed in the SedaModel. If this is for a unit test, make sure you are using the TestSedaModel ('seda-test')"); } public QueueProfile getQueueProfile() { return queueProfile; } public void setQueueProfile(QueueProfile queueProfile) { this.queueProfile = queueProfile; } public Integer getQueueTimeout() { return queueTimeout; } public void setQueueTimeout(Integer queueTimeout) { this.queueTimeout = queueTimeout; } public ThreadingProfile getThreadingProfile() { return threadingProfile; } public void setThreadingProfile(ThreadingProfile threadingProfile) { this.threadingProfile = threadingProfile; } public WorkManager getWorkManager() { return workManager; } public void setWorkManager(WorkManager workManager) { this.workManager = workManager; } protected void dispatchToOutboundRouter(MuleEvent event, MuleMessage result) throws MessagingException { super.dispatchToOutboundRouter(event, result); // TODO MULE-3077 SedaService should use a SEDA queue to dispatch to outbound // routers } private class ComponentStageWorker implements Work { private MuleEvent event; public ComponentStageWorker(MuleEvent event) { this.event = event; } public void run() { try { event = OptimizedRequestContext.criticalSetEvent(event); Object replyTo = event.getMessage().getReplyTo(); ReplyToHandler replyToHandler = getReplyToHandler(event.getMessage(), (InboundEndpoint) event.getEndpoint()); MuleMessage result = invokeComponent(event); dispatchToOutboundRouter(event, result); processReplyTo(event, result, replyToHandler, replyTo); } catch (Exception e) { event.getSession().setValid(false); if (e instanceof MessagingException) { handleException(e); } else { handleException(new MessagingException(CoreMessages.eventProcessingFailedFor(getName()), event.getMessage(), e)); } } } public void release() { // no-op } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -