xatransactedjmsmessagereceiver.java
来自「提供ESB 应用mule源代码 提供ESB 应用mule源代码」· Java 代码 · 共 392 行
JAVA
392 行
/* * $Id: XaTransactedJmsMessageReceiver.java 12606 2008-09-03 17:09:08Z tcarlson $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.transport.jms;import org.mule.DefaultMuleMessage;import org.mule.api.endpoint.InboundEndpoint;import org.mule.api.lifecycle.CreateException;import org.mule.api.service.Service;import org.mule.api.transaction.Transaction;import org.mule.api.transaction.TransactionCallback;import org.mule.api.transport.Connector;import org.mule.api.transport.MessageAdapter;import org.mule.retry.policies.NoRetryPolicyTemplate;import org.mule.transaction.TransactionCoordination;import org.mule.transaction.TransactionTemplate;import org.mule.transaction.XaTransaction;import org.mule.transport.ConnectException;import org.mule.transport.TransactedPollingMessageReceiver;import org.mule.transport.jms.filters.JmsSelectorFilter;import org.mule.util.ClassUtils;import org.mule.util.MapUtils;import java.util.Iterator;import java.util.List;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.Session;import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;public class XaTransactedJmsMessageReceiver extends TransactedPollingMessageReceiver{ public static final long DEFAULT_JMS_POLL_FREQUENCY = 100; public static final TimeUnit DEFAULT_JMS_POLL_TIMEUNIT = TimeUnit.MILLISECONDS; protected final JmsConnector connector; protected boolean reuseConsumer; protected boolean reuseSession; protected final ThreadContextLocal context = new ThreadContextLocal(); protected final long timeout; protected final RedeliveryHandler redeliveryHandler; /** * Holder receiving the session and consumer for this thread. */ protected static class JmsThreadContext { public Session session; public MessageConsumer consumer; } /** * Strongly typed ThreadLocal for ThreadContext. */ protected static class ThreadContextLocal extends ThreadLocal { public JmsThreadContext getContext() { return (JmsThreadContext)get(); } protected Object initialValue() { return new JmsThreadContext(); } } public XaTransactedJmsMessageReceiver(Connector umoConnector, Service service, InboundEndpoint endpoint) throws CreateException { super(umoConnector, service, endpoint); // TODO AP: find appropriate value for polling frequency with the scheduler; // see setFrequency/setTimeUnit & VMMessageReceiver for more this.setFrequency(DEFAULT_JMS_POLL_FREQUENCY); this.setTimeUnit(DEFAULT_JMS_POLL_TIMEUNIT); this.connector = (JmsConnector) umoConnector; this.timeout = endpoint.getTransactionConfig().getTimeout(); // If reconnection is set, default reuse strategy to false // as some jms brokers will not detect lost connections if the // same consumer / session is used if (retryTemplate != null && !(retryTemplate instanceof NoRetryPolicyTemplate)) { this.reuseConsumer = true; this.reuseSession = true; } // User may override reuse strategy if necessary this.reuseConsumer = MapUtils.getBooleanValue(endpoint.getProperties(), "reuseConsumer", this.reuseConsumer); this.reuseSession = MapUtils.getBooleanValue(endpoint.getProperties(), "reuseSession", this.reuseSession); // Do extra validation, XA Topic & reuse are incompatible. See MULE-2622 boolean topic = connector.getTopicResolver().isTopic(getEndpoint()); if (topic && (reuseConsumer || reuseSession)) { logger.warn("Destination " + getEndpoint().getEndpointURI() + " is a topic and XA transaction was " + "configured. Forcing 'reuseSession' and 'reuseConsumer' to false. Set these " + "on endpoint to avoid the message."); reuseConsumer = false; reuseSession = false; } // Check if the destination is a queue and // if we are in transactional mode. // If true, set receiveMessagesInTransaction to true. // It will start multiple threads, depending on the threading profile. // If we're using topics we don't want to use multiple receivers as we'll get // the same message multiple times this.setUseMultipleTransactedReceivers(!topic); try { redeliveryHandler = this.connector.getRedeliveryHandlerFactory().create(); redeliveryHandler.setConnector(this.connector); } catch (Exception e) { throw new CreateException(e, this); } } protected void doDispose() { // template method } protected void doConnect() throws Exception { // template method } protected void doDisconnect() throws Exception { if (connector.isConnected()) { // TODO All resources will be close by transaction or by connection close closeResource(true); } } /** * The poll method is overriden from the {@link TransactedPollingMessageReceiver} */ public void poll() throws Exception { TransactionTemplate tt = new TransactionTemplate(endpoint.getTransactionConfig(), connector.getExceptionListener(), connector.getMuleContext()); TransactionCallback cb = new TransactionCallback() { public Object doInTransaction() throws Exception { try { List messages = getMessages(); if (messages != null && messages.size() > 0) { for (Iterator it = messages.iterator(); it.hasNext();) { processMessage(it.next()); } } return null; } catch (Exception e) { // There is not a need to close resources here, // they will be close by XaTransaction, JmsThreadContext ctx = context.getContext(); ctx.consumer = null; Transaction tx = TransactionCoordination.getInstance().getTransaction(); if (ctx.session != null && tx instanceof XaTransaction.MuleXaObject) { ((XaTransaction.MuleXaObject) ctx.session).setReuseObject(false); } ctx.session = null; throw e; } } }; tt.execute(cb); } protected List getMessages() throws Exception { Session session = this.connector.getSessionFromTransaction(); Transaction tx = TransactionCoordination.getInstance().getTransaction(); MessageConsumer consumer = createConsumer(); // Retrieve message Message message = null; try { message = consumer.receive(timeout); } catch (JMSException e) { // If we're being disconnected, ignore the exception if (!this.isConnected()) { // ignore } else { throw e; } } if (message == null) { if (tx != null) { tx.setRollbackOnly(); } return null; } message = connector.preProcessMessage(message, session); // Process message if (logger.isDebugEnabled()) { logger.debug("Message received it is of type: " + ClassUtils.getSimpleName(message.getClass())); if (message.getJMSDestination() != null) { logger.debug("Message received on " + message.getJMSDestination() + " (" + message.getJMSDestination().getClass().getName() + ")"); } else { logger.debug("Message received on unknown destination"); } logger.debug("Message CorrelationId is: " + message.getJMSCorrelationID()); logger.debug("Jms Message Id is: " + message.getJMSMessageID()); } if (message.getJMSRedelivered()) { if (logger.isDebugEnabled()) { logger.debug("Message with correlationId: " + message.getJMSCorrelationID() + " is redelivered. handing off to Exception Handler"); } redeliveryHandler.handleRedelivery(message); } MessageAdapter adapter = connector.getMessageAdapter(message); routeMessage(new DefaultMuleMessage(adapter)); return null; } protected void processMessage(Object msg) throws Exception { // This method is never called as the // message is processed when received } /** * Close Sesison and consumer */ protected void closeResource(boolean force) { JmsThreadContext ctx = context.getContext(); if (ctx == null) { return; } // Close consumer if (force || !reuseSession || !reuseConsumer) { connector.closeQuietly(ctx.consumer); ctx.consumer = null; } // Do not close session if a transaction is in progress // the session will be closed by the transaction if (force || !reuseSession) { connector.closeQuietly(ctx.session); ctx.session = null; } } /** * Create a consumer for the jms destination * * @throws Exception */ protected MessageConsumer createConsumer() throws Exception { logger.debug("Create a consumer for the jms destination"); try { JmsSupport jmsSupport = this.connector.getJmsSupport(); JmsThreadContext ctx = context.getContext(); if (ctx == null) { ctx = new JmsThreadContext(); } Session session; Transaction tx = TransactionCoordination.getInstance().getTransaction(); if (this.reuseSession && ctx.session != null) { session = ctx.session; tx.bindResource(this.connector.getConnection(), session); } else { session = this.connector.getSession(endpoint); if (tx != null) { ((XaTransaction.MuleXaObject) session).setReuseObject(reuseSession); } } if (reuseSession) { ctx.session = session; } // TODO How can I verify that the consumer is active? if (this.reuseConsumer && ctx.consumer != null) { return ctx.consumer; } // Create destination final boolean topic = connector.getTopicResolver().isTopic(endpoint); Destination dest = jmsSupport.createDestination(session, endpoint.getEndpointURI() .getAddress(), topic); // Extract jms selector String selector = null; if (endpoint.getFilter() != null && endpoint.getFilter() instanceof JmsSelectorFilter) { selector = ((JmsSelectorFilter)endpoint.getFilter()).getExpression(); } else if (endpoint.getProperties() != null) { // still allow the selector to be set as a property on the endpoint // to be backward compatible selector = (String)endpoint.getProperties().get(JmsConstants.JMS_SELECTOR_PROPERTY); } String tempDurable = (String)endpoint.getProperties().get("durable"); boolean durable = connector.isDurable(); if (tempDurable != null) { durable = Boolean.valueOf(tempDurable).booleanValue(); } // Get the durable subscriber name if there is one String durableName = (String)endpoint.getProperties().get("durableName"); if (durableName == null && durable && topic) { durableName = "mule." + connector.getName() + "." + endpoint.getEndpointURI().getAddress(); logger.debug("Jms Connector for this receiver is durable but no durable name has been specified. Defaulting to: " + durableName); } // Create consumer MessageConsumer consumer = jmsSupport.createConsumer(session, dest, selector, connector.isNoLocal(), durableName, topic); if (reuseConsumer) { ctx.consumer = consumer; } return consumer; } catch (JMSException e) { throw new ConnectException(e, this); } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?