📄 bridgeunifiedmodule.java
字号:
/*
 * JORAM: Java(TM) Open Reliable Asynchronous Messaging
 * Copyright (C) 2003 - Bull SA
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
 *
 * Initial developer(s): Frederic Maistre (Bull SA)
 * Contributor(s):
 */
package org.objectweb.joram.mom.util;

import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.Channel;
import fr.dyade.aaa.util.Daemon;
import org.objectweb.joram.shared.messages.Message;

import java.util.Enumeration;
import java.util.Properties;
import java.util.Vector;

import javax.jms.*;
import javax.jms.IllegalStateException;

/**
 * The <code>BridgeUnifiedModule</code> class is a bridge module based on the
 * JMS 1.1 unified semantics and classes.
 */
public class BridgeUnifiedModule
  implements javax.jms.ExceptionListener,
             javax.jms.MessageListener,
             java.io.Serializable
{
  /**
   * Identifier of the agent using this module; asynchronous deliveries are
   * sent to it.
   */
  protected AgentId agentId;

  /** JNDI factory class name, read from the "jndiFactory" property. */
  protected String jndiFactory;
  /** JNDI URL, read from the "jndiUrl" property. */
  protected String jndiUrl;
  /**
   * ConnectionFactory JNDI name, read from the mandatory
   * "connectionFactoryName" property.
   */
  protected String cnxFactName;
  /**
   * Destination JNDI name, read from the mandatory "destinationName"
   * property.
   */
  protected String destName;

  /** Connection factory object for connecting to the foreign JMS server. */
  protected ConnectionFactory cnxFact;
  /** Foreign JMS destination object. */
  protected Destination dest;

  /**
   * User identification for connecting to the foreign JMS server; only set
   * when a password is provided as well.
   */
  protected String userName;
  /**
   * User password for connecting to the foreign JMS server; only set when a
   * user name is provided as well.
   */
  protected String password;
  /** JMS clientID field, read from the "clientId" property. */
  protected String clientID;
  /** Selector for filtering messages, read from the "selector" property. */
  protected String selector;

  /**
   * <code>true</code> if the module is fully usable; when <code>false</code>
   * the module's operations fail with an <code>IllegalStateException</code>.
   */
  protected boolean usable = true;
  /** Message explaining why the module is not usable. */
  protected String notUsableMessage;

  /** Connection to the foreign JMS server. */
  protected transient Connection cnx;
  /** Session for sending messages to the foreign JMS destination. */
  protected transient Session producerSession;
  /** Session for getting messages from the foreign JMS destination. */
  protected transient Session consumerSession;
  /** Producer object. */
  protected transient MessageProducer producer;
  /** Consumer object. */
  protected transient MessageConsumer consumer;

  /** <code>true</code> if a listener has been set on the JMS consumer. */
  protected transient boolean listener;
  /** Vector holding the pending messages to send after reconnection. */
  protected transient Vector qout;

  /** Daemon used for requesting messages. */
  protected transient ConsumerDaemon consumerDaemon;
  /** Daemon used for the reconnection process. */
  protected transient ReconnectionDaemon reconnectionDaemon;


  /** Constructs a <code>BridgeUnifiedModule</code> module. */
  public BridgeUnifiedModule()
  {}


  /**
   * Initializes the module's parameters.
   *
   * @param agentId  Identifier of the agent using the module.
   * @param prop  JMS properties required for establishing the link with the
   *          foreign JMS server.
   *
   * @exception IllegalArgumentException  If the provided properties are
   *              invalid.
*/ public void init(AgentId agentId, Properties prop) { this.agentId = agentId; jndiFactory = prop.getProperty("jndiFactory"); jndiUrl = prop.getProperty("jndiUrl"); cnxFactName = prop.getProperty("connectionFactoryName"); if (cnxFactName == null) throw new IllegalArgumentException("Missing ConnectionFactory JNDI name."); destName = prop.getProperty("destinationName"); if (destName == null) throw new IllegalArgumentException("Missing Destination JNDI name."); String userName = prop.getProperty("userName"); String password = prop.getProperty("password"); if (userName != null && password != null) { this.userName = userName; this.password = password; } clientID = prop.getProperty("clientId"); selector = prop.getProperty("selector"); } /** * Launches the connection process to the foreign JMS server. * * @exception javax.jms.IllegalStateException If the module can't access * the foreign JMS server. * @exception javax.jms.JMSException If the needed JMS resources can't be * created. */ public void connect() throws JMSException { if (! usable) throw new IllegalStateException(notUsableMessage); listener = false; // Creating the module's daemons. consumerDaemon = new ConsumerDaemon(); reconnectionDaemon = new ReconnectionDaemon(); // Administered objects have not been retrieved: launching the startup // daemon. if (cnxFact == null || dest == null) { StartupDaemon startup = new StartupDaemon(); startup.start(); } // Administered objects have been retrieved: connecting. else { try { doConnect(); } catch (JMSException exc) { reconnectionDaemon.reconnect(); } } } /** * Sets a message listener on the foreign JMS destination. * * @exception javax.jms.IllegalStateException If the module state does * not allow to set a listener. */ public void setMessageListener() throws IllegalStateException { if (! 
usable) throw new IllegalStateException(notUsableMessage); listener = true; try { setConsumer(); consumer.setMessageListener(this); cnx.start(); } catch (JMSException exc) {} } /** * Unsets the set message listener on the foreign JMS destination. */ public void unsetMessageListener() { try { cnx.stop(); consumer.setMessageListener(null); unsetConsumer(); } catch (JMSException exc) {} listener = false; } /** * Synchronous method requesting an immediate delivery from the foreign * JMS destination. * * @return The JMS message formatted into a JORAM MOM message, or * <code>null</code> if no message is available or if the request * fails. * * @exception javax.jms.IllegalStateException If the module state does * not allow to request a message. */ public Message receiveNoWait() throws IllegalStateException { if (! usable) throw new IllegalStateException(notUsableMessage); Message momMessage = null; try { setConsumer(); cnx.start(); try { momMessage = MessageConverterModule.convert(consumer.receiveNoWait()); consumerSession.commit(); } // Conversion error: denying the message. catch (MessageFormatException exc) { consumerSession.rollback(); } } // Connection start, or session commit/rollback failed: // setting the message to null. catch (JMSException commitExc) { momMessage = null; } return momMessage; } /** * Asynchronous method requesting a delivery from the foreign * JMS destination. * * @exception javax.jms.IllegalStateException If the module state does * not allow to request a message. */ public void receive() throws IllegalStateException { if (! usable) throw new IllegalStateException(notUsableMessage); consumerDaemon.receive(); } /** * Sends a message to the foreign JMS destination. * * @exception javax.jms.IllegalStateException If the module's state does * not permit message sendings. * @exception javax.jms.MessageFormatException If the MOM message could not * be converted into a foreign JMS message. 
*/
  public void send(org.objectweb.joram.shared.messages.Message message)
    throws JMSException
  {
    if (! usable)
      throw new IllegalStateException(notUsableMessage);

    try {
      producer.send(MessageConverterModule.convert(producerSession, message));
      acknowledge(message);
    }
    // Conversion failure: reported to the caller as is.
    catch (javax.jms.MessageFormatException exc) {
      throw exc;
    }
    // Connection failure? Keeping the message for later delivery.
    catch (javax.jms.JMSException exc) {
      qout.add(message);
    }
  }

  /**
   * Interrupts the daemons and closes the connection.
   * <p>
   * Every step is best effort: a step that fails, or a resource that was
   * never created, does not prevent the following steps from running.
   */
  public void close()
  {
    // connect() may hand over to the startup daemon instead of connecting
    // right away, so the connection can still be null at this point.
    if (cnx != null) {
      try {
        cnx.stop();
      }
      catch (JMSException exc) {}
    }

    // unsetMessageListener() works on the connection and on the consumer,
    // which may not exist; a runtime failure there must not prevent the
    // daemons from being interrupted and the connection from being closed.
    try {
      unsetMessageListener();
    }
    catch (RuntimeException exc) {
      listener = false;
    }

    try {
      consumerDaemon.interrupt();
    }
    catch (Exception exc) {}

    try {
      reconnectionDaemon.interrupt();
    }
    catch (Exception exc) {}

    if (cnx != null) {
      try {
        cnx.close();
      }
      catch (JMSException exc) {}
    }
  }

  /**
   * Implements the <code>javax.jms.ExceptionListener</code> interface for
   * catching the failures of the connection to the remote JMS server.
   * <p>
   * Reacts by launching a reconnection process.
   */
  public void onException(JMSException exc)
  {
    reconnectionDaemon.reconnect();
  }

  /**
   * Implements the <code>javax.jms.MessageListener</code> interface for
   * processing the asynchronous deliveries coming from the foreign JMS
   * server.
   * <p>
   * A converted message is committed on the consumer session, then forwarded
   * to the agent in a <code>BridgeDeliveryNot</code>; a message that can not
   * be converted is rolled back.
   */
  public void onMessage(javax.jms.Message jmsMessage)
  {
    try {
      try {
        Message momMessage = MessageConverterModule.convert(jmsMessage);
        consumerSession.commit();
        Channel.sendTo(agentId, new BridgeDeliveryNot(momMessage));
      }
      // Conversion error: denying the message.
      catch (MessageFormatException conversionExc) {
        consumerSession.rollback();
      }
    }
    // Commit or rollback failed: nothing to do.
    catch (JMSException exc) {}
  }

  /**
   * Opens a connection with the foreign JMS server and creates the
   * JMS resources for interacting with the foreign JMS destination.
   *
   * @exception JMSException  If the needed JMS resources could not be created.
   */
  protected void doConnect() throws JMSException
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -