jmsreplytohandler.java

来自「提供ESB 应用mule源代码 提供ESB 应用mule源代码」· Java 代码 · 共 160 行

JAVA
160
字号
/* * $Id: JmsReplyToHandler.java 12189 2008-06-27 14:57:32Z dfeist $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc.  All rights reserved.  http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.transport.jms;import org.mule.api.MuleEvent;import org.mule.api.MuleException;import org.mule.api.MuleMessage;import org.mule.api.transformer.Transformer;import org.mule.api.transport.DispatchException;import org.mule.service.AbstractService;import org.mule.transport.DefaultReplyToHandler;import org.mule.transport.jms.i18n.JmsMessages;import org.mule.util.StringMessageUtils;import org.mule.util.StringUtils;import java.util.Iterator;import java.util.List;import javax.jms.Destination;import javax.jms.Message;import javax.jms.MessageProducer;import javax.jms.Queue;import javax.jms.Session;import javax.jms.Topic;/** * <code>JmsReplyToHandler</code> will process a JMS replyTo or hand off to the * default replyTo handler if the replyTo is a URL. * The purpose of this class is to send a result on a ReplyTo destination if one * has been set. * Note that the {@link JmsMessageDispatcher} also contains logic for handling ReplyTo. However, * the dispatcher is responsible attaching the replyTo information to the message and also * receiving on the same replyTo if 'remoteSync' is set. The {@link JmsMessageDispatcher} never * writes to the 'replyTo' destination. */public class JmsReplyToHandler extends DefaultReplyToHandler{    private final JmsConnector connector;    public JmsReplyToHandler(JmsConnector connector, List transformers)    {        super(transformers);        this.connector = connector;    }    public void processReplyTo(MuleEvent event, MuleMessage returnMessage, Object replyTo) throws MuleException    {        Destination replyToDestination = null;        MessageProducer replyToProducer = null;        Session session = null;        try        {            // now we need to send the response            if (replyTo instanceof Destination)            {                replyToDestination = (Destination)replyTo;            }            if (replyToDestination == null)            {                super.processReplyTo(event, returnMessage, replyTo);                return;            }            //This is a work around for JmsTransformers where the current endpoint needs            //to be set on the transformer so that a JMSMEssage can be created correctly            Class srcType = returnMessage.getPayload().getClass();                for (Iterator iterator = getTransformers().iterator(); iterator.hasNext();)            {                Transformer t = (Transformer) iterator.next();                if (t.isSourceTypeSupported(srcType))                {                    if (t.getEndpoint() == null)                    {                        t.setEndpoint(getEndpoint(event, "jms://temporary"));                        break;                    }                }            }            returnMessage.applyTransformers(getTransformers());            Object payload = returnMessage.getPayload();            if (replyToDestination instanceof Topic && replyToDestination instanceof Queue                    && connector.getJmsSupport() instanceof Jms102bSupport)            {                logger.error(StringMessageUtils.getBoilerPlate("ReplyTo destination implements both Queue and Topic "                                                               + "while complying with JMS 1.0.2b specification. "                                                               + "Please report your application server or JMS vendor name and version "                                                               + "to dev<_at_>mule.codehaus.org or http://mule.mulesource.org/jira"));            }                        final boolean topic = connector.getTopicResolver().isTopic(replyToDestination);            session = connector.getSession(false, topic);            Message replyToMessage = JmsMessageUtils.toMessage(payload, session);            replyToMessage.setJMSReplyTo(null);            if (logger.isDebugEnabled())            {                logger.debug("Sending jms reply to: " + replyToDestination + "("                             + replyToDestination.getClass().getName() + ")");            }            replyToProducer = connector.getJmsSupport().createProducer(session, replyToDestination, topic);            // QoS support            MuleMessage eventMsg = event.getMessage();            String ttlString = (String)eventMsg.removeProperty(JmsConstants.TIME_TO_LIVE_PROPERTY);            String priorityString = (String)eventMsg.removeProperty(JmsConstants.PRIORITY_PROPERTY);            String persistentDeliveryString = (String)eventMsg.removeProperty(JmsConstants.PERSISTENT_DELIVERY_PROPERTY);                        String correlationIDString = (String)eventMsg.getProperty(JmsConstants.JMS_MESSAGE_ID);            replyToMessage.setJMSCorrelationID(correlationIDString);                        if (ttlString == null && priorityString == null && persistentDeliveryString == null)            {                connector.getJmsSupport().send(replyToProducer, replyToMessage, topic);            }            else            {                long ttl = Message.DEFAULT_TIME_TO_LIVE;                int priority = Message.DEFAULT_PRIORITY;                if (ttlString != null)                {                    ttl = Long.parseLong(ttlString);                }                if (priorityString != null)                {                    priority = Integer.parseInt(priorityString);                }                boolean persistent = StringUtils.isNotBlank(persistentDeliveryString)                                ? Boolean.valueOf(persistentDeliveryString).booleanValue()                                : connector.isPersistentDelivery();                connector.getJmsSupport().send(replyToProducer, replyToMessage, persistent, priority, ttl,                    topic);            }            logger.info("Reply Message sent to: " + replyToDestination);            ((AbstractService) event.getService()).getStatistics().incSentReplyToEvent();        }        catch (Exception e)        {            throw new DispatchException(                JmsMessages.failedToCreateAndDispatchResponse(replyToDestination), returnMessage, null, e);        }        finally        {            connector.closeQuietly(replyToProducer);            connector.closeQuietly(session);        }    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?