📄 vmmessagerequester.java
字号:
/* * $Id: VMMessageRequester.java 11967 2008-06-05 20:32:19Z dfeist $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.transport.vm;import org.mule.api.MuleMessage;import org.mule.api.ThreadSafeAccess;import org.mule.api.endpoint.InboundEndpoint;import org.mule.transport.AbstractMessageRequester;import org.mule.util.queue.Queue;import org.mule.util.queue.QueueSession;/** * <code>VMMessageDispatcher</code> is used for providing in memory interaction * between components. */public class VMMessageRequester extends AbstractMessageRequester{ private final VMConnector connector; public VMMessageRequester(InboundEndpoint endpoint) { super(endpoint); this.connector = (VMConnector) endpoint.getConnector(); } /** * Make a specific request to the underlying transport * * @param timeout the maximum time the operation should block before returning. * The call should return immediately if there is data available. If * no data becomes available before the timeout elapses, null will be * returned * @return the result of the request wrapped in a MuleMessage object. Null will be * returned if no data was available * @throws Exception if the call to the underlying protocol causes an exception */ protected MuleMessage doRequest(long timeout) throws Exception { if (!connector.isQueueEvents()) { throw new UnsupportedOperationException("Receive requested on VM Connector, but queueEvents is false"); } try { QueueSession queueSession = connector.getQueueSession(); Queue queue = queueSession.getQueue(endpoint.getEndpointURI().getAddress()); if (queue == null) { if (logger.isDebugEnabled()) { logger.debug("No queue with name " + endpoint.getEndpointURI().getAddress()); } return null; } else { MuleMessage message = null; if (logger.isDebugEnabled()) { logger.debug("Waiting for a message on " + endpoint.getEndpointURI().getAddress()); } try { message = (MuleMessage) queue.poll(timeout); } catch (InterruptedException e) { logger.error("Failed to receive message from queue: " + endpoint.getEndpointURI()); } if (message != null) { //The message will contain old thread information, we need to reset it if(message instanceof ThreadSafeAccess) { //TODO: would it be ok just to reset access control here? i.e. ((ThreadSafeAccess)message).resetAccessControl();// message = (MuleMessage)((ThreadSafeAccess)message).newThreadCopy(); } if (logger.isDebugEnabled()) { logger.debug("Message received: " + message); } return message; } else { if (logger.isDebugEnabled()) { logger.debug("No event received after " + timeout + " ms"); } return null; } } } catch (Exception e) { throw e; } } protected void doDispose() { // template method } protected void doConnect() throws Exception { if (connector.isQueueEvents()) { // use the default queue profile to configure this queue. connector.getQueueProfile().configureQueue( endpoint.getEndpointURI().getAddress(), connector.getQueueManager()); } } protected void doDisconnect() throws Exception { // template method }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -