⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 inboundconsumer.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2004 - Bull SA * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. *  * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * Lesser General Public License for more details. *  * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 * USA. * * Initial developer(s): Frederic Maistre (Bull SA) * Contributor(s): Nicolas Tachker (Bull SA) */package org.objectweb.joram.client.connector;import javax.jms.*;import javax.resource.NotSupportedException;import javax.resource.ResourceException;import javax.resource.spi.CommException;import javax.resource.spi.SecurityException;import javax.resource.spi.endpoint.MessageEndpointFactory;import javax.resource.spi.work.WorkManager;import java.util.Vector;import org.objectweb.util.monolog.api.BasicLevel;/** * An <code>InboundConsumer</code> instance is responsible for consuming * messages from a given JORAM destination and through a given JORAM * connection. */class InboundConsumer implements javax.jms.ServerSessionPool{  /** Application server's <code>WorkManager</code> instance. */  private WorkManager workManager;  /** Application's endpoints factory. */  private MessageEndpointFactory endpointFactory;   /** The provided connection to the underlying JORAM server. */  private XAConnection cnx;  /** The durable subscription name, if provided. */  private String subName = null;  /** <code>true</code> if message consumption occurs in a transaction. */  private boolean transacted;  /** Maximum number of Work instances to be submitted (0 for infinite). */  private int maxWorks;    private int ackMode;  /** Wrapped <code>ConnectionConsumer</code> instance. */  private ConnectionConsumer cnxConsumer;  /** Number of created server sessions. */  private int serverSessions = 0;  /** Pool of server sessions. */  private Vector pool;  /**   * Constructs an <code>InboundConsumer</code> instance.   *   * @param workManager      Application server's <code>WorkManager</code>   *                         instance.   * @param endpointFactory  Application's endpoints factory.    * @param cnx              Connection to the JORAM server.   * @param dest             Destination to get messages from.   * @param selector         Selector for filtering messages.   * @param durable          <code>true</code> for durably subscribing.   * @param subName          Durable subscription name.   * @param transacted       <code>true</code> if deliveries will occur in a   *                         XA transaction.   * @param maxWorks         Max number of Work instances to be submitted.   *   * @exception NotSupportedException  If the activation parameters are   *                                   invalid.   * @exception SecurityException      If the target destination is not   *                                   readable.   * @exception CommException          If the connection with the JORAM server   *                                   is lost.   * @exception ResourceException      Generic exception.   */  InboundConsumer(WorkManager workManager,                  MessageEndpointFactory endpointFactory,                  XAConnection cnx,                  Destination dest,                  String selector,                  boolean durable,                  String subName,                  boolean transacted,                  int maxWorks,                  int maxMessages,                  int ackMode) throws ResourceException {    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))      AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, "InboundConsumer(" + workManager +                                     ", " + endpointFactory +                                    ", " + cnx +                                    ", " + dest +                                    ", " + selector +                                    ", " + durable +                                    ", " + subName +                                    ", " + transacted +                                    ", " + maxWorks +                                    ", " + maxMessages +                                     "," + ackMode + ")");        this.workManager = workManager;    this.endpointFactory = endpointFactory;    this.cnx = cnx;    this.transacted = transacted;    this.ackMode = ackMode;    if (maxWorks < 0) maxWorks = 0;    this.maxWorks = maxWorks;    pool = new Vector(maxWorks);    try {      if (durable) {        if (! (dest instanceof javax.jms.Topic))          throw new NotSupportedException("Can't set a durable subscription "                                          + "on a JMS queue.");        if (subName == null)          throw new NotSupportedException("Missing durable "                                          + "subscription name.");        this.subName = subName;        cnxConsumer =          cnx.createDurableConnectionConsumer((javax.jms.Topic) dest,                                              subName,                                              selector,                                              this,                                              maxMessages);      } else {        cnxConsumer = cnx.createConnectionConsumer(dest,                                                   selector,                                                   this,                                                   maxMessages);      }            cnx.start();    }    catch (JMSSecurityException exc) {      throw new SecurityException("Target destination not readble: "                                  + exc);    }    catch (javax.jms.IllegalStateException exc) {      throw new CommException("Connection with the JORAM server is lost.");    }    catch (JMSException exc) {      throw new ResourceException("Could not set asynchronous consumer: "                                  + exc);    }  }  /**   * Provides a new <code>InboundSession</code> instance for processing   * incoming messages.   *   * @exception JMSException  Never thrown.   */  public ServerSession getServerSession()     throws JMSException {    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))      AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " getServerSession()");    try {      synchronized (pool) {        if (pool.isEmpty()) {          if (maxWorks > 0) {            if (serverSessions < maxWorks) {              // Allocates a new ServerSession              return newSession();            } else {              // Wait for a free ServerSession              if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))                AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,                                              "ServerSessionPool waits for "                                              + "a free ServerSession.");              pool.wait();              return (ServerSession) pool.remove(0);            }          } else {            // Allocates a new ServerSession            return newSession();          }        } else {          return (ServerSession) pool.remove(0);        }      }    } catch (Exception exc) {      throw new JMSException("Error while getting server session from pool: "                             + exc);    }  }    private InboundSession newSession() {    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))      AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,                                    "ServerSessionPool provides "                                    + "new ServerSession.");    serverSessions++;    return new InboundSession(this,                              workManager,                              endpointFactory,                              cnx,                              transacted,                              ackMode);  }  /** Releases an <code>InboundSession</code> instance. */  void releaseSession(InboundSession session) {    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))      AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " releaseSession(" + session + ")");    try {      synchronized (pool) {        pool.add(session);        pool.notify();      }    } catch (Exception exc) {}  }  /** Closes the consumer. */  void close() {    if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))      AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " close()");    try {      cnxConsumer.close();      if (subName != null) {        Session session = cnx.createSession(true, 0);        session.unsubscribe(subName);      }      cnx.close();    }    catch (JMSException exc) {}  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -