⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 joramadapter.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2004 - 2006 ScalAgent Distributed Technologies * Copyright (C) 2004 - 2006 Bull SA * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 * USA. * * Initial developer(s): Frederic Maistre (Bull SA) * Contributor(s): ScalAgent Distributed Technologies *                 Benoit Pelletier (Bull SA) */package org.objectweb.joram.client.connector;import fr.dyade.aaa.agent.AgentServer;import com.scalagent.jmx.JMXServer;import org.objectweb.joram.client.jms.Queue;import org.objectweb.joram.client.jms.Topic;import org.objectweb.joram.client.jms.admin.AdminException;import org.objectweb.joram.client.jms.admin.JoramAdmin;import org.objectweb.joram.client.jms.admin.User;import org.objectweb.joram.client.jms.admin.DeadMQueue;import org.objectweb.joram.client.jms.ha.local.XAHALocalConnectionFactory;import org.objectweb.joram.client.jms.ha.local.HALocalConnectionFactory;import org.objectweb.joram.client.jms.ha.tcp.HATcpConnectionFactory;import org.objectweb.joram.client.jms.ha.tcp.XAHATcpConnectionFactory;import org.objectweb.joram.client.jms.ha.local.XATopicHALocalConnectionFactory;import org.objectweb.joram.client.jms.ha.local.TopicHALocalConnectionFactory;import org.objectweb.joram.client.jms.ha.tcp.TopicHATcpConnectionFactory;import org.objectweb.joram.client.jms.ha.tcp.XATopicHATcpConnectionFactory;import org.objectweb.joram.client.jms.local.LocalConnectionFactory;import org.objectweb.joram.client.jms.local.QueueLocalConnectionFactory;import org.objectweb.joram.client.jms.local.TopicLocalConnectionFactory;import org.objectweb.joram.client.jms.local.XALocalConnectionFactory;import org.objectweb.joram.client.jms.tcp.QueueTcpConnectionFactory;import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;import org.objectweb.joram.client.jms.tcp.TopicTcpConnectionFactory;import org.objectweb.joram.client.jms.tcp.XATcpConnectionFactory;import org.objectweb.joram.client.jms.ConnectionMetaData;import java.io.BufferedReader;import java.io.File;import java.io.FileReader;import java.io.FileNotFoundException;import java.io.IOException;import java.lang.reflect.Method;import java.net.ConnectException;import java.util.Enumeration;import java.util.Hashtable;import java.util.Iterator;import java.util.List;import java.util.Properties;import java.util.StringTokenizer;import java.util.Vector;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.Session;import javax.jms.TopicConnectionFactory;import javax.jms.XAConnection;import javax.jms.XAConnectionFactory;import javax.management.MBeanServer;import javax.management.MBeanServerFactory;import javax.management.ObjectName;import javax.naming.Context;import javax.naming.InitialContext;import javax.naming.NamingException;import javax.resource.NotSupportedException;import javax.resource.ResourceException;import javax.resource.spi.ActivationSpec;import javax.resource.spi.BootstrapContext;import javax.resource.spi.CommException;import javax.resource.spi.IllegalStateException;import javax.resource.spi.ResourceAdapterInternalException;import javax.resource.spi.endpoint.MessageEndpointFactory;import javax.resource.spi.work.WorkManager;import javax.transaction.xa.XAResource;import org.objectweb.util.monolog.api.BasicLevel;/** * A <code>JoramAdapter</code> instance manages connectivities to an * underlying JORAM server: outbound connectivity (JCA connection * management contract) and inbound connectivity (asynchronous message * delivery as specified by the JCA message inflow contract). */public class JoramAdapter  implements javax.resource.spi.ResourceAdapter,             java.io.Serializable, JoramAdapterMBean {  /** <code>WorkManager</code> instance provided by the application server. */  private transient WorkManager workManager;  /**   * Table holding the adapter's <code>InboundConsumer</code> instances,   * for inbound messaging.   * <p>   * <b>Key:</b> <code>ActivationSpec</code> instance<br>   * <b>Value:</b> <code>InboundConsumer</code> instance   */  private transient Hashtable consumers;  /**   * Vector holding the <code>ManagedConnectionImpl</code> instances for   * managed outbound messaging.   */  private transient Vector producers;  /**   * Table holding the adapter's <code>XAConnection</code> instances used for   * recovering the XA resources.   * <p>   * <b>Key:</b> user name<br>   * <b>Value:</b> <code>XAConnection</code> instance   */  private transient Hashtable connections;  /** <code>true</code> if the adapter has been started. */  private boolean started = false;  /** <code>true</code> if the adapter has been stopped. */  private boolean stopped = false;  /** <code>true</code> if the underlying JORAM server is collocated. */  boolean collocated = false;  /** <code>true</code> if the underlying a JORAM HA server is defined */  boolean isHa = false;  /** Host name or IP of the underlying JORAM server. */  String hostName = "localhost";  /** Port number of the underlying JORAM server. */  int serverPort = 16010;  /** Identifier of the JORAM server to start. */  short serverId = 0;  /** Identifier of the JORAM replica to start in case of HA. */  short clusterId = AgentServer.NULL_ID;  /** Platform servers identifiers. */  List platformServersIds = null;  /**   * Path to the directory containing JORAM's configuration files   * (<code>a3servers.xml</code>, <code>a3debug.cfg</code>   * and admin file), needed when starting the collocated JORAM server.   */  private String platformConfigDir;  /** <code>true</code> if the JORAM server to start is persistent. */  private boolean persistentPlatform = false;  /**   * Path to the file containing a description of the administered objects to   * create and bind.   */  private String adminFile = "joram-admin.cfg";  private String adminFileXML = "joramAdmin.xml";  /**   * Path to the file containing a description of the exported administered objects (destination)   */  private String adminFileExportXML = "joramAdminExport.xml";  /** Name of the JORAM server to start. */  private String serverName = "s0";  /** Names of the bound objects. */  private static Vector boundNames = new Vector();  /** Standard JMSResource MBean ObjectName. */  private static ObjectName jmsResourceON;  /** Local MBean server. */  private static MBeanServer mbs = null;  /**   * Duration in seconds during which connecting is attempted (connecting   * might take time if the server is temporarily not reachable); the 0 value   * is set for connecting only once and aborting if connecting failed.   */  public int connectingTimer = 0;  /**   * Duration in seconds during which a JMS transacted (non XA) session might   * be pending; above that duration the session is rolled back and closed;   * the 0 value means "no timer".   */  public int txPendingTimer = 0;  /**   * Period in milliseconds between two ping requests sent by the client   * connection to the server; if the server does not receive any ping   * request during more than 2 * cnxPendingTimer, the connection is   * considered as dead and processed as required.   */  public int cnxPendingTimer = 0;  /**   * The maximum number of messages that can be   * read at once from a queue.   *   * Default value is 2 in order to compensate   * the former subscription mechanism.   */  public int queueMessageReadMax = 2;  /**   * The maximum number of acknowledgements   * that can be buffered in   * Session.DUPS_OK_ACKNOWLEDGE mode when listening to a topic.   * Default is 0.   */  public int topicAckBufferMax = 0;  /**   * This threshold is the maximum messages   * number over   * which the subscription is passivated.   * Default is Integer.MAX_VALUE.   */  public int topicPassivationThreshold = Integer.MAX_VALUE;  /**   * This threshold is the minimum   * messages number below which   * the subscription is activated.   * Default is 0.   */  public int topicActivationThreshold = 0;  /**   * Determines whether the produced messages are asynchronously   * sent or not (without or with acknowledgement)   * Default is false (with ack).   */  public boolean asyncSend = false;  /**   * Determines whether client threads   * which are using the same connection   * are synchronized in order to group   * together the requests they send.   * Default is false.   */  public boolean multiThreadSync = false;  /**   * The maximum time the threads hang if 'multiThreadSync' is true.   * Either they wake up (wait time out) or they are notified (by the   * first woken up thread).   *   * Default is 1 ms.   */  public int multiThreadSyncDelay = 1;  public JMXServer jmxServer;  private transient JoramAdmin joramAdmin;  /**   * Constructs a <code>JoramAdapter</code> instance.   */  public JoramAdapter() {    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))      AdapterTracing.dbgAdapter.log(BasicLevel.INFO,                                    "JORAM adapter instantiated.");    consumers = new Hashtable();    producers = new Vector();    java.util.ArrayList array = MBeanServerFactory.findMBeanServer(null);    if (!array.isEmpty())      mbs = (MBeanServer) array.get(0);    jmxServer = new JMXServer(mbs,"JoramAdapter");  }  /**   * Initializes the adapter; starts, if needed, a collocated JORAM server,   * and if needed again, administers it.   *   * @exception ResourceAdapterInternalException  If the adapter could not be   *                                              initialized.   */  public synchronized void start(BootstrapContext ctx)                           throws ResourceAdapterInternalException  {      // set HA mode if needed      joramAdmin.setHa(isHa);    if (started)      throw new ResourceAdapterInternalException("Adapter already started.");    if (stopped)      throw new ResourceAdapterInternalException("Adapter has been stopped.");    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))      AdapterTracing.dbgAdapter.log(BasicLevel.INFO,                                    "JORAM adapter starting deployment...");    workManager = ctx.getWorkManager();    // Collocated mode: starting the JORAM server.    if (collocated) {      if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))        AdapterTracing.dbgAdapter.log(BasicLevel.INFO,                                      "  - Collocated JORAM server is starting...");      if (persistentPlatform) {        System.setProperty("Transaction", "fr.dyade.aaa.util.NTransaction");        System.setProperty("NTNoLockFile", "true");      } else {        System.setProperty("Transaction", "fr.dyade.aaa.util.NullTransaction");        System.setProperty("NbMaxAgents", "" + Integer.MAX_VALUE);      }      if (platformConfigDir != null) {        System.setProperty("fr.dyade.aaa.agent.A3CONF_DIR", platformConfigDir);        System.setProperty("fr.dyade.aaa.DEBUG_DIR", platformConfigDir);      }      try {        AgentServer.init(serverId, serverName, null, clusterId);        AgentServer.start();        if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))          AdapterTracing.dbgAdapter.log(BasicLevel.INFO,                                        "  - Collocated JORAM server has successfully started.");      } catch (Exception exc) {        AgentServer.stop();        AgentServer.reset(true);        throw new ResourceAdapterInternalException("Could not start "                                                   + "collocated JORAM "                                                   + " instance: " + exc);      }    }    // Starting admin.    try {      if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))        AdapterTracing.dbgAdapter.log(BasicLevel.INFO,                                      "  - Reading the provided admin file: " + adminFileXML);      JoramAdmin.executeXMLAdmin(platformConfigDir, adminFileXML);    } catch (Exception exc) {      if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))        AdapterTracing.dbgAdapter.log(BasicLevel.INFO,                                      "JORAM ADMIN XML not found.");    }    // Starting an admin session...    try {      adminConnect();      serverId = (short) joramAdmin.getPlatformAdmin().getLocalServerId();    } catch (Exception exc) {      if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.WARN))        AdapterTracing.dbgAdapter.log(BasicLevel.WARN,                                      "  - JORAM server not administerable: " + exc);    }    // Recreates the objects (backup) if the export file is present    if (joramAdmin != null) {        joramAdmin.setAdminFileExportXML(adminFileExportXML);        try {            if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))                AdapterTracing.dbgAdapter.log(BasicLevel.INFO,                                      "  - Reading the provided admin file: " + adminFileExportXML);            JoramAdmin.executeXMLAdmin(platformConfigDir, adminFileExportXML);            // redo the admin connection as the executeXMLAdmin has closed the session            adminConnect();        } catch (Exception exc) {            if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))                AdapterTracing.dbgAdapter.log(BasicLevel.INFO,                        adminFileExportXML + " not found.");        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -