⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 joramadapter.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
    }    // Administering as specified in the properties file.    try {      File file = null;      try {        if (platformConfigDir == null) {          java.net.URL url = ClassLoader.getSystemResource(adminFile);          file = new File(url.getFile());        }        else          file = new File(platformConfigDir, adminFile);      } catch (NullPointerException e) {        throw new java.io.FileNotFoundException();      }      FileReader fileReader = new FileReader(file);      BufferedReader reader = new BufferedReader(fileReader);      if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))        AdapterTracing.dbgAdapter.log(BasicLevel.INFO,                                      "  - Reading the provided admin file: " + file);      boolean end = false;      String line;      StringTokenizer tokenizer;      String firstToken;      String name = null;      while (! end) {        try {          line = reader.readLine();          if (line == null)            end = true;          else {            tokenizer = new StringTokenizer(line);            if (tokenizer.hasMoreTokens()) {              firstToken = tokenizer.nextToken();              if (firstToken.equalsIgnoreCase("Host")) {                if (tokenizer.hasMoreTokens())                  hostName = tokenizer.nextToken();              }              else if (firstToken.equalsIgnoreCase("Port")) {                if (tokenizer.hasMoreTokens())                  serverPort = Integer.parseInt(tokenizer.nextToken());              }              else if (firstToken.equalsIgnoreCase("Queue")) {                if (tokenizer.hasMoreTokens()) {                  name = tokenizer.nextToken();                  createQueue(name);                }              }              else if (firstToken.equalsIgnoreCase("Topic")) {                if (tokenizer.hasMoreTokens()) {                  name = tokenizer.nextToken();                  createTopic(name);                }              }              else if (firstToken.equalsIgnoreCase("User")) {                if (tokenizer.hasMoreTokens())                  name = tokenizer.nextToken();                if (tokenizer.hasMoreTokens()) {                  String password = tokenizer.nextToken();                  createUser(name, password);                }                else                  if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))                    AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,                                                  "  - Missing password for user [" + name + "]");              }              else if (firstToken.equalsIgnoreCase("CF")) {                if (tokenizer.hasMoreTokens()) {                  name = tokenizer.nextToken();                  createCF(name);                }              }              else if (firstToken.equalsIgnoreCase("QCF")) {                if (tokenizer.hasMoreTokens()) {                  name = tokenizer.nextToken();                  createQCF(name);                }              }              else if (firstToken.equalsIgnoreCase("TCF")) {                if (tokenizer.hasMoreTokens()) {                  name = tokenizer.nextToken();                  createTCF(name);                }              }            }          }        }        // Error while reading one line.        catch (IOException exc) {        // Error while creating the destination.        } catch (AdminException exc) {          AdapterTracing.dbgAdapter.log(BasicLevel.ERROR,                                        "Creation failed",exc);        }      }    }    // No destination to deploy.    catch (java.io.FileNotFoundException fnfe) {      if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))        AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,                                      "  - No administration task requested.");    }    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))      AdapterTracing.dbgAdapter.log(BasicLevel.INFO,                                    "Server port is " + serverPort);    started = true;    // Registering MBeans...    try {      jmxServer.registerMBean(this,                              "joramClient",                              "type=JoramAdapter,version=" +                              ConnectionMetaData.providerVersion);    } catch (Exception e) {      if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.WARN))        AdapterTracing.dbgAdapter.log(BasicLevel.WARN,                                      "  - Could not register JoramAdapterMBean",                                      e);    }    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))      AdapterTracing.dbgAdapter.log(BasicLevel.INFO,                                    "JORAM adapter " +                                    ConnectionMetaData.providerVersion +                                    " successfully deployed.");  }  /**   * Notifies the adapter to terminate the connections it manages, and if   * needed, to shut down the collocated JORAM server.   */  public synchronized void stop()  {    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))      AdapterTracing.dbgAdapter.log(BasicLevel.INFO,                                    "JORAM adapter stopping...");    if (! started || stopped)      return;    // Unbinds the bound objects...    while (! boundNames.isEmpty())      unbind((String) boundNames.remove(0));    // Finishing the admin session.    joramAdmin.getPlatformAdmin().disconnect();    // Closing the outbound connections, if any.    while (! producers.isEmpty()) {      try {        ((ManagedConnectionImpl) producers.remove(0)).destroy();      }      catch (Exception exc) {}    }    // Closing the inbound connections, if any.    for (Enumeration keys = consumers.keys(); keys.hasMoreElements();)      ((InboundConsumer) consumers.get(keys.nextElement())).close();    // Browsing the recovery connections, if any.    if (connections != null) {      for (Enumeration keys = connections.keys(); keys.hasMoreElements();) {        try {          ((XAConnection) connections.get(keys.nextElement())).close();        }        catch (Exception exc) {}      }    }    // If JORAM server is collocated, stopping it.    if (collocated) {      try {        AgentServer.stop();      }      catch (Exception exc) {}    }    stopped = true;    try {      jmxServer.unregisterMBean("joramClient",                                "type=JoramAdapter,version=" +                                ConnectionMetaData.providerVersion);    } catch (Exception e) {      if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.WARN))        AdapterTracing.dbgAdapter.log(BasicLevel.WARN,                                      "unregisterMBean",                                      e);    }    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.INFO))      AdapterTracing.dbgAdapter.log(BasicLevel.INFO,                                    "JORAM adapter successfully stopped.");  }  /**   * Notifies the adapter to setup asynchronous message delivery for an   * application server endoint.   *   * @exception IllegalStateException  If the adapter is either not started,   *                                   or stopped.   * @exception NotSupportedException  If the provided activation parameters   *                                   are invalid.   * @exception CommException          If the JORAM server is not reachable.   * @exception SecurityException      If connecting is not allowed.   * @exception ResourceException      Generic exception.   */  public void endpointActivation(MessageEndpointFactory endpointFactory,                                 ActivationSpec spec)              throws ResourceException {    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))      AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,                                    this + " endpointActivation(" + endpointFactory +                                    ", " + spec + ")");    if (! started)      throw new IllegalStateException("Non started resource adapter.");    if (stopped)      throw new IllegalStateException("Stopped resource adapter.");    if (! (spec instanceof ActivationSpecImpl))      throw new ResourceException("Provided ActivationSpec instance is not "                                  + "a JORAM activation spec.");    ActivationSpecImpl specImpl = (ActivationSpecImpl) spec;    if (! specImpl.getResourceAdapter().equals(this))      throw new ResourceException("Supplied ActivationSpec instance "                                  + "associated to an other ResourceAdapter.");    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))      AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,                                    "Activating Endpoint on JORAM adapter.");    boolean durable =      specImpl.getSubscriptionDurability() != null      && specImpl.getSubscriptionDurability().equalsIgnoreCase("Durable");    boolean transacted = false;    try {      Class listenerClass = Class.forName("javax.jms.MessageListener");      Class[] parameters = { Class.forName("javax.jms.Message") };      Method meth = listenerClass.getMethod("onMessage", parameters);      transacted = endpointFactory.isDeliveryTransacted(meth);    }    catch (Exception exc) {      throw new ResourceException("Could not determine transactional "                                  + "context: " + exc);    }    int maxWorks = 10;    try {      maxWorks = Integer.parseInt(specImpl.getMaxNumberOfWorks());    } catch (Exception exc) {      throw new ResourceException("Invalid max number of works instances "                                  + "number: " + exc);    }    int maxMessages = 10;    try {      maxMessages = Integer.parseInt(specImpl.getMaxMessages());    } catch (Exception exc) {      throw new ResourceException("Invalid max messages "                                  + "number: " + exc);    }    int ackMode;    try {      if (ActivationSpecImpl.AUTO_ACKNOWLEDGE.equals(specImpl          .getAcknowledgeMode())) {        ackMode = Session.AUTO_ACKNOWLEDGE;      } else if (ActivationSpecImpl.AUTO_ACKNOWLEDGE.equals(specImpl          .getAcknowledgeMode())) {        ackMode = Session.DUPS_OK_ACKNOWLEDGE;      } else {        ackMode = Session.AUTO_ACKNOWLEDGE;      }    }  catch (Exception exc) {      throw new ResourceException("Invalid acknowledge mode: " + exc);    }    String destType = specImpl.getDestinationType();    String destName = specImpl.getDestination();    try {      Destination dest;      if (destType.equals("javax.jms.Queue"))        dest = createQueue(destName);      else if (destType.equals("javax.jms.Topic"))        dest = createTopic(destName);      else        throw new NotSupportedException("Invalid destination type provided "                                        + "as activation parameter: "                                        + destType);      String userName = specImpl.getUserName();      String password = specImpl.getPassword();      createUser(userName, password);      XAConnectionFactory connectionFactory = null;      if (isHa) {          if (collocated)              connectionFactory = XAHALocalConnectionFactory.create();          else {              String urlHa = "hajoram://" + hostName + ":" + serverPort;              connectionFactory = XAHATcpConnectionFactory.create(urlHa);          }      }  else {      if (collocated)        connectionFactory = XALocalConnectionFactory.create();      else              connectionFactory = XATcpConnectionFactory.create(hostName, serverPort);      }      ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory).getParameters().connectingTimer = connectingTimer;      ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory).getParameters().cnxPendingTimer = cnxPendingTimer;      ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory).getParameters().txPendingTimer = txPendingTimer;      if (queueMessageReadMax > 0) {        ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory)            .getParameters().queueMessageReadMax = queueMessageReadMax;      }      if (topicAckBufferMax > 0) {        ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory)            .getParameters().topicAckBufferMax = topicAckBufferMax;      }      if (topicPassivationThreshold > 0) {        ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory)            .getParameters().topicPassivationThreshold = topicPassivationThreshold;      }      if (topicActivationThreshold > 0) {        ((org.objectweb.joram.client.jms.XAConnectionFactory) connectionFactory)            .getParameters().topicActivationThreshold = topicActivationThreshold;      }      XAConnection cnx =        connectionFactory.createXAConnection(userName, password);      if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))        AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,                                      this + " endpointActivation cnx = " + cnx);      // Creating and registering a consumer instance for this endpoint.      InboundConsumer consumer =        new InboundConsumer(workManager,                            endpointFactory,                            cnx,                            dest,                            specImpl.getMessageSelector(),                            durable,                            specImpl.getSubscriptionName(),                            transacted,                            maxWorks,                            maxMessages,                            ackMode);      consumers.put(specImpl, consumer);    }    catch (javax.jms.JMSSecurityException exc) {      throw new SecurityException("Invalid user identification: " + exc);    }    catch (javax.jms.JMSException exc) {      throw new CommException("Could not connect to the JORAM server: "

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -