📄 joramadapter.java
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2004 - 2006 ScalAgent Distributed Technologies * Copyright (C) 2004 - 2006 Bull SA * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA. * * Initial developer(s): Frederic Maistre (Bull SA) * Contributor(s): ScalAgent Distributed Technologies * Benoit Pelletier (Bull SA) */package org.objectweb.joram.client.connector;import fr.dyade.aaa.agent.AgentServer;import com.scalagent.jmx.JMXServer;import org.objectweb.joram.client.jms.Queue;import org.objectweb.joram.client.jms.Topic;import org.objectweb.joram.client.jms.admin.AdminException;import org.objectweb.joram.client.jms.admin.JoramAdmin;import org.objectweb.joram.client.jms.admin.User;import org.objectweb.joram.client.jms.admin.DeadMQueue;import org.objectweb.joram.client.jms.ha.local.XAHALocalConnectionFactory;import org.objectweb.joram.client.jms.ha.local.HALocalConnectionFactory;import org.objectweb.joram.client.jms.ha.tcp.HATcpConnectionFactory;import org.objectweb.joram.client.jms.ha.tcp.XAHATcpConnectionFactory;import org.objectweb.joram.client.jms.ha.local.XATopicHALocalConnectionFactory;import org.objectweb.joram.client.jms.ha.local.TopicHALocalConnectionFactory;import org.objectweb.joram.client.jms.ha.tcp.TopicHATcpConnectionFactory;import org.objectweb.joram.client.jms.ha.tcp.XATopicHATcpConnectionFactory;import org.objectweb.joram.client.jms.local.LocalConnectionFactory;import org.objectweb.joram.client.jms.local.QueueLocalConnectionFactory;import org.objectweb.joram.client.jms.local.TopicLocalConnectionFactory;import org.objectweb.joram.client.jms.local.XALocalConnectionFactory;import org.objectweb.joram.client.jms.tcp.QueueTcpConnectionFactory;import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;import org.objectweb.joram.client.jms.tcp.TopicTcpConnectionFactory;import org.objectweb.joram.client.jms.tcp.XATcpConnectionFactory;import org.objectweb.joram.client.jms.ConnectionMetaData;import java.io.BufferedReader;import java.io.File;import java.io.FileReader;import java.io.FileNotFoundException;import java.io.IOException;import java.lang.reflect.Method;import java.net.ConnectException;import java.util.Enumeration;import java.util.Hashtable;import java.util.Iterator;import java.util.List;import java.util.Properties;import java.util.StringTokenizer;import java.util.Vector;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.Session;import javax.jms.TopicConnectionFactory;import javax.jms.XAConnection;import javax.jms.XAConnectionFactory;import javax.management.MBeanServer;import javax.management.MBeanServerFactory;import javax.management.ObjectName;import javax.naming.Context;import javax.naming.InitialContext;import javax.naming.NamingException;import javax.resource.NotSupportedException;import javax.resource.ResourceException;import javax.resource.spi.ActivationSpec;import javax.resource.spi.BootstrapContext;import javax.resource.spi.CommException;import javax.resource.spi.IllegalStateException;import javax.resource.spi.ResourceAdapterInternalException;import javax.resource.spi.endpoint.MessageEndpointFactory;import javax.resource.spi.work.WorkManager;import javax.transaction.xa.XAResource;import org.objectweb.util.monolog.api.BasicLevel;/** * A <code>JoramAdapter</code> instance manages connectivities to an * underlying JORAM server: outbound connectivity (JCA connection * management contract) and inbound connectivity (asynchronous message * delivery as specified by the JCA message inflow contract). */public class JoramAdapter implements javax.resource.spi.ResourceAdapter, java.io.Serializable, JoramAdapterMBean { /** <code>WorkManager</code> instance provided by the application server. */ private transient WorkManager workManager; /** * Table holding the adapter's <code>InboundConsumer</code> instances, * for inbound messaging. * <p> * <b>Key:</b> <code>ActivationSpec</code> instance<br> * <b>Value:</b> <code>InboundConsumer</code> instance */ private transient Hashtable consumers; /** * Vector holding the <code>ManagedConnectionImpl</code> instances for * managed outbound messaging. */ private transient Vector producers; /** * Table holding the adapter's <code>XAConnection</code> instances used for * recovering the XA resources. * <p> * <b>Key:</b> user name<br> * <b>Value:</b> <code>XAConnection</code> instance */ private transient Hashtable connections; /** <code>true</code> if the adapter has been started. */ private boolean started = false; /** <code>true</code> if the adapter has been stopped. */ private boolean stopped = false; /** <code>true</code> if the underlying JORAM server is collocated. */ boolean collocated = false; /** <code>true</code> if the underlying a JORAM HA server is defined */ boolean isHa = false; /** Host name or IP of the underlying JORAM server. */ String hostName = "localhost"; /** Port number of the underlying JORAM server. */ int serverPort = 16010; /** Identifier of the JORAM server to start. */ short serverId = 0; /** Identifier of the JORAM replica to start in case of HA. */ short clusterId = AgentServer.NULL_ID; /** Platform servers identifiers. */ List platformServersIds = null; /** * Path to the directory containing JORAM's configuration files * (<code>a3servers.xml</code>, <code>a3debug.cfg</code> * and admin file), needed when starting the collocated JORAM server. */ private String platformConfigDir; /** <code>true</code> if the JORAM server to start is persistent. */ private boolean persistentPlatform = false; /** * Path to the file containing a description of the administered objects to * create and bind. */ private String adminFile = "joram-admin.cfg"; private String adminFileXML = "joramAdmin.xml"; /** * Path to the file containing a description of the exported administered objects (destination) */ private String adminFileExportXML = "joramAdminExport.xml"; /** Name of the JORAM server to start. */ private String serverName = "s0"; /** Names of the bound objects. */ private static Vector boundNames = new Vector(); /** Standard JMSResource MBean ObjectName. */ private static ObjectName jmsResourceON; /** Local MBean server. */ private static MBeanServer mbs = null; /** * Duration in seconds during which connecting is attempted (connecting * might take time if the server is temporarily not reachable); the 0 value * is set for connecting only once and aborting if connecting failed. */ public int connectingTimer = 0; /** * Duration in seconds during which a JMS transacted (non XA) session might * be pending; above that duration the session is rolled back and closed; * the 0 value means "no timer". */ public int txPendingTimer = 0; /** * Period in milliseconds between two ping requests sent by the client * connection to the server; if the server does not receive any ping * request during more than 2 * cnxPendingTimer, the connection is * considered as dead and processed as required. */ public int cnxPendingTimer = 0; /** * The maximum number of messages that can be * read at once from a queue. * * Default value is 2 in order to compensate * the former subscription mechanism. */ public int queueMessageReadMax = 2; /** * The maximum number of acknowledgements * that can be buffered in * Session.DUPS_OK_ACKNOWLEDGE mode when listening to a topic. * Default is 0. */ public int topicAckBufferMax = 0; /** * This threshold is the maximum messages * number over * which the subscription is passivated. * Default is Integer.MAX_VALUE. */ public int topicPassivationThreshold = Integer.MAX_VALUE; /** * This threshold is the minimum * messages number below which * the subscription is activated. * Default is 0. */ public int topicActivationThreshold = 0; /** * Determines whether the produced messages are asynchronously * sent or not (without or with acknowledgement) * Default is false (with ack). */ public boolean asyncSend = false; /** * Determines whether client threads * which are using the same connection * are synchronized in order to group * together the requests they send. * Default is false. */ public boolean multiThreadSync = false; /** * The maximum time the threads hang if 'multiThreadSync' is true. * Either they wake up (wait time out) or they are notified (by the * first woken up thread). * * Default is 1 ms. */ public int multiThreadSyncDelay = 1; public JMXServer jmxServer; private transient JoramAdmin joramAdmin; /** * Constructs a <code>JoramAdapter</code> instance. */ public JoramAdapter() { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) AdapterTracing.dbgAdapter.log(BasicLevel.INFO, "JORAM adapter instantiated."); consumers = new Hashtable(); producers = new Vector(); java.util.ArrayList array = MBeanServerFactory.findMBeanServer(null); if (!array.isEmpty()) mbs = (MBeanServer) array.get(0); jmxServer = new JMXServer(mbs,"JoramAdapter"); } /** * Initializes the adapter; starts, if needed, a collocated JORAM server, * and if needed again, administers it. * * @exception ResourceAdapterInternalException If the adapter could not be * initialized. */ public synchronized void start(BootstrapContext ctx) throws ResourceAdapterInternalException { // set HA mode if needed joramAdmin.setHa(isHa); if (started) throw new ResourceAdapterInternalException("Adapter already started."); if (stopped) throw new ResourceAdapterInternalException("Adapter has been stopped."); if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) AdapterTracing.dbgAdapter.log(BasicLevel.INFO, "JORAM adapter starting deployment..."); workManager = ctx.getWorkManager(); // Collocated mode: starting the JORAM server. if (collocated) { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) AdapterTracing.dbgAdapter.log(BasicLevel.INFO, " - Collocated JORAM server is starting..."); if (persistentPlatform) { System.setProperty("Transaction", "fr.dyade.aaa.util.NTransaction"); System.setProperty("NTNoLockFile", "true"); } else { System.setProperty("Transaction", "fr.dyade.aaa.util.NullTransaction"); System.setProperty("NbMaxAgents", "" + Integer.MAX_VALUE); } if (platformConfigDir != null) { System.setProperty("fr.dyade.aaa.agent.A3CONF_DIR", platformConfigDir); System.setProperty("fr.dyade.aaa.DEBUG_DIR", platformConfigDir); } try { AgentServer.init(serverId, serverName, null, clusterId); AgentServer.start(); if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) AdapterTracing.dbgAdapter.log(BasicLevel.INFO, " - Collocated JORAM server has successfully started."); } catch (Exception exc) { AgentServer.stop(); AgentServer.reset(true); throw new ResourceAdapterInternalException("Could not start " + "collocated JORAM " + " instance: " + exc); } } // Starting admin. try { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) AdapterTracing.dbgAdapter.log(BasicLevel.INFO, " - Reading the provided admin file: " + adminFileXML); JoramAdmin.executeXMLAdmin(platformConfigDir, adminFileXML); } catch (Exception exc) { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) AdapterTracing.dbgAdapter.log(BasicLevel.INFO, "JORAM ADMIN XML not found."); } // Starting an admin session... try { adminConnect(); serverId = (short) joramAdmin.getPlatformAdmin().getLocalServerId(); } catch (Exception exc) { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.WARN)) AdapterTracing.dbgAdapter.log(BasicLevel.WARN, " - JORAM server not administerable: " + exc); } // Recreates the objects (backup) if the export file is present if (joramAdmin != null) { joramAdmin.setAdminFileExportXML(adminFileExportXML); try { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) AdapterTracing.dbgAdapter.log(BasicLevel.INFO, " - Reading the provided admin file: " + adminFileExportXML); JoramAdmin.executeXMLAdmin(platformConfigDir, adminFileExportXML); // redo the admin connection as the executeXMLAdmin has closed the session adminConnect(); } catch (Exception exc) { if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO)) AdapterTracing.dbgAdapter.log(BasicLevel.INFO, adminFileExportXML + " not found."); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -