📄 remotedispatcher.java
字号:
public MuleMessage sendRemote(String endpoint, Object payload, Map messageProperties, int timeout) throws MuleException { return doToRemote(endpoint, payload, messageProperties, true, timeout); } public MuleMessage sendRemote(String endpoint, Object payload, Map messageProperties) throws MuleException { return doToRemote(endpoint, payload, messageProperties, true, MuleServer.getMuleContext().getConfiguration().getDefaultSynchronousEventTimeout()); } public void dispatchRemote(String endpoint, Object payload, Map messageProperties) throws MuleException { doToRemote(endpoint, payload, messageProperties, false, -1); } public FutureMessageResult sendAsyncRemote(final String endpoint, final Object payload, final Map messageProperties) throws MuleException { Callable callable = new Callable() { public Object call() throws Exception { return doToRemote(endpoint, payload, messageProperties, true, -1); } }; FutureMessageResult result = new FutureMessageResult(callable); if (asyncExecutor != null) { result.setExecutor(asyncExecutor); } result.execute(); return result; } public MuleMessage receiveRemote(String endpoint, int timeout) throws MuleException { RemoteDispatcherNotification action = new RemoteDispatcherNotification(null, RemoteDispatcherNotification.ACTION_RECEIVE, endpoint); action.setProperty(MuleProperties.MULE_REMOTE_SYNC_PROPERTY, "true"); action.setProperty(MuleProperties.MULE_EVENT_TIMEOUT_PROPERTY, new Long(timeout)); return dispatchAction(action, true, timeout); } public FutureMessageResult asyncReceiveRemote(final String endpoint, final int timeout) throws MuleException { Callable callable = new Callable() { public Object call() throws Exception { return receiveRemote(endpoint, timeout); } }; FutureMessageResult result = new FutureMessageResult(callable); if (asyncExecutor != null) { result.setExecutor(asyncExecutor); } result.execute(); return result; } protected MuleMessage doToRemoteComponent(String component, Object payload, Map messageProperties, boolean synchronous) throws MuleException { MuleMessage message = new DefaultMuleMessage(payload, messageProperties); message.setBooleanProperty(MuleProperties.MULE_REMOTE_SYNC_PROPERTY, synchronous); setCredentials(message); RemoteDispatcherNotification action = new RemoteDispatcherNotification(message, RemoteDispatcherNotification.ACTION_INVOKE, "mule://" + component); return dispatchAction(action, synchronous, MuleServer.getMuleContext().getConfiguration().getDefaultSynchronousEventTimeout()); } protected MuleMessage doToRemote(String endpoint, Object payload, Map messageProperties, boolean synchronous, int timeout) throws MuleException { MuleMessage message = new DefaultMuleMessage(payload, messageProperties); message.setProperty(MuleProperties.MULE_REMOTE_SYNC_PROPERTY, String.valueOf(synchronous)); setCredentials(message); RemoteDispatcherNotification action = new RemoteDispatcherNotification(message, (synchronous ? RemoteDispatcherNotification.ACTION_SEND : RemoteDispatcherNotification.ACTION_DISPATCH), endpoint); return dispatchAction(action, synchronous, timeout); } protected MuleMessage dispatchAction(RemoteDispatcherNotification action, boolean synchronous, int timeout) throws MuleException { OutboundEndpoint serverEndpoint; if (synchronous) { serverEndpoint = syncServerEndpoint; } else { serverEndpoint = asyncServerEndpoint; } MuleMessage serializeMessage = new DefaultMuleMessage(action); updateContext(serializeMessage, serverEndpoint, synchronous); ByteArrayOutputStream out = new ByteArrayOutputStream(); wireFormat.write(out, serializeMessage, serverEndpoint.getEncoding()); byte[] payload = out.toByteArray(); MuleMessage message = action.getMessage(); if (message == null) { message = new DefaultMuleMessage(payload); } else { message = new DefaultMuleMessage(payload, message); } message.addProperties(action.getProperties()); MuleSession session = new DefaultMuleSession(message, ((AbstractConnector)serverEndpoint.getConnector()).getSessionHandler(), MuleServer.getMuleContext()); MuleEvent event = new DefaultMuleEvent(message, serverEndpoint, session, true); event.setTimeout(timeout); if (logger.isDebugEnabled()) { logger.debug("MuleClient sending remote call to: " + action.getResourceIdentifier() + ". At " + serverEndpoint.toString() + " . Event is: " + event); } MuleMessage result; try { if (synchronous) { result = serverEndpoint.send(event); } else { serverEndpoint.dispatch(event); return null; } if (result != null) { if (result.getPayload() != null) { Object response; if (result.getPayload() instanceof InputStream) { byte[] b = IOUtils.toByteArray((InputStream)result.getPayload()); if(b.length==0) return null; ByteArrayInputStream in = new ByteArrayInputStream(b); response = wireFormat.read(in); } else { ByteArrayInputStream in = new ByteArrayInputStream(result.getPayloadAsBytes()); response = wireFormat.read(in); } if (response instanceof RemoteDispatcherNotification) { response = ((RemoteDispatcherNotification)response).getMessage(); } return (MuleMessage)response; } } } catch (Exception e) { throw new DispatchException(event.getMessage(), event.getEndpoint(), e); } if (logger.isDebugEnabled()) { logger.debug("Result of MuleClient remote call is: " + (result == null ? "null" : result.getPayload())); } return result; } public void dispose() { // nothing to do here } protected void setCredentials(MuleMessage message) { if (credentials != null) { message.setProperty(MuleProperties.MULE_USER_PROPERTY, MuleCredentials.createHeader( credentials.getUsername(), credentials.getPassword())); } } public WireFormat getWireFormat() { return wireFormat; } public void setWireFormat(WireFormat wireFormat) { this.wireFormat = wireFormat; } protected void updateContext(MuleMessage message, ImmutableEndpoint endpoint, boolean synchronous) throws MuleException { RequestContext.setEvent(new DefaultMuleEvent(message, endpoint, new DefaultMuleSession(message, new MuleSessionHandler(), MuleServer.getMuleContext()), synchronous)); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -