⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 session.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
/*
 * JORAM: Java(TM) Open Reliable Asynchronous Messaging
 * Copyright (C) 2001 - 2004 ScalAgent Distributed Technologies
 * Copyright (C) 1996 - 2000 Dyade
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
 * USA.
 *
 * Initial developer(s): Frederic Maistre (INRIA)
 * Contributor(s): ScalAgent Distributed Technologies
 */
package org.objectweb.joram.client.jms;

import org.objectweb.joram.client.jms.connection.RequestMultiplexer;
import org.objectweb.joram.client.jms.connection.Requestor;
import org.objectweb.joram.shared.client.*;

import java.util.*;

import javax.jms.JMSException;
import javax.jms.TransactionRolledBackException;
import javax.jms.IllegalStateException;
import javax.jms.MessageFormatException;

import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

import fr.dyade.aaa.util.Debug;

/**
 * JORAM client-side implementation of the
 * <code>javax.jms.Session</code> interface.
 */
public class Session implements javax.jms.Session {

  /** Logger of the session, registered under the name of this class. */
  public static Logger logger =
    Debug.getLogger(Session.class.getName());

  /** Name of the system property from which {@link #receiveAck} is read. */
  public static final String RECEIVE_ACK =
    "org.objectweb.joram.client.jms.receiveAck";

  /**
   * Value of the {@link #RECEIVE_ACK} system property, read once through
   * <code>Boolean.getBoolean</code> when the class is initialized.
   */
  public static boolean receiveAck =
    Boolean.getBoolean(RECEIVE_ACK);

  /**
   * Possible values of the session status, with their printable names.
   */
  private static class Status {

    /**
     * The connection is stopped.
     * A session starts its life in this status.
     */
    public static final int STOP = 0;

    /**
     * The connection is started.
     */
    public static final int START = 1;

    /**
     * Closed status.
     */
    public static final int CLOSE = 2;

    /** Printable names, indexed by status value. */
    private static final String[] names = {"STOP", "START", "CLOSE"};

    /**
     * Returns the printable name of a status value.
     *
     * @param status  one of the constants of this class.
     */
    public static String toString(int status) {
      return names[status];
    }
  }

  /**
   * Possible values of the session mode, i.e. the way the session is used,
   * with their printable names.
   */
  private static class SessionMode {

    /**
     * The session has not been used yet.
     * A session starts its life in this mode.
     */
    public static final int NONE = 0;

    /**
     * The session receives messages synchronously.
     */
    public static final int RECEIVE = 1;

    /**
     * The session listens to messages asynchronously.
     */
    public static final int LISTENER = 2;

    /**
     * The session is driven by an application server.
     */
    public static final int APP_SERVER = 3;

    /** Printable names, indexed by mode value. */
    private static final String[] names = {
      "NONE", "RECEIVE", "LISTENER", "APP_SERVER"};

    /**
     * Returns the printable name of a session mode.
     *
     * @param status  one of the constants of this class.
     */
    public static String toString(int status) {
      return names[status];
    }
  }

  /**
   * Possible values of the status of the current request, with their
   * printable names. Only meaningful in the mode RECEIVE.
   */
  private static class RequestStatus {

    /**
     * No request at all.
     * This is the initial request status.
     */
    public static final int NONE = 0;

    /**
     * A request is pending.
     */
    public static final int RUN = 1;

    /**
     * The request has completed.
     */
    public static final int DONE = 2;

    /** Printable names, indexed by request status value. */
    private static final String[] names = {"NONE", "RUN", "DONE"};

    /**
     * Returns the printable name of a request status.
     *
     * @param status  one of the constants of this class.
     */
    public static String toString(int status) {
      return names[status];
    }
  }

  /** Task for closing the session if it becomes pending. */
  private SessionCloseTask closingTask;

  /** <code>true</code> if the session's transaction is scheduled. */
  private boolean scheduled;

  /** The message listener of the session, if any.
*/  protected javax.jms.MessageListener messageListener;  /** The identifier of the session. */  private String ident;  /** The connection the session belongs to. */  private Connection cnx;  /** <code>true</code> if the session is transacted. */  boolean transacted;  /** The acknowledgement mode of the session. */  private int acknowledgeMode;  /** <code>true</code> if the session's acknowledgements are automatic. */  private boolean autoAck;  /** Vector of message consumers. */  private Vector consumers;  /** Vector of message producers. */  private Vector producers;  /** Vector of queue browsers. */  private Vector browsers;  /** FIFO queue holding the asynchronous server deliveries. */  private fr.dyade.aaa.util.Queue repliesIn;  /** Daemon distributing asynchronous server deliveries. */  private SessionDaemon daemon;  /** Counter of message listeners. */  private int listenerCount;  /**    * Table holding the <code>ProducerMessages</code> holding producers'   * messages and destinated to be sent at commit.   * <p>   * <b>Key:</b> destination name<br>   * <b>Object:</b> <code>ProducerMessages</code>   */  Hashtable sendings;  /**    * Table holding the identifiers of the messages delivered per   * destination or subscription, and not acknowledged.   * <p>   * <b>Key:</b> destination or subscription name<br>   * <b>Object:</b> <code>MessageAcks</code> instance   */  Hashtable deliveries;  /**   * The request multiplexer used to communicate   * with the user proxy.   */  private RequestMultiplexer mtpx;  /**   * The requestor used by the session    * to communicate   * with the user proxy.   */  private Requestor requestor;  /**   * The requestor used by the session    * to make 'receive'   * with the user proxy. This second requestor    * is necessary because it must be closed   * during the session close (see method close).   */  private Requestor receiveRequestor;  /**   * Indicates that the session has been    * recovered by a message listener.   
* Doesn't need to be volatile because   * it is only used by the SessionDaemon thread.   */  private boolean recover;  /**   * Status of the session:   * STOP, START, CLOSE   */  private int status;  /**   * Mode of the session:   * NONE, RECEIVE, LISTENER, APP_SERVER   */  private int sessionMode;  /**   * Status of the request:   * NONE, RUN, DONE.   */  private int requestStatus;  /**   * The message consumer currently   * making a request (null if none).   */  private MessageConsumer pendingMessageConsumer;  /**   * The current active control thread.   */  private Thread singleThreadOfControl;  /**   * Status boolean indicating whether   * the message input is activated or not   * for the message listeners.   */  private boolean passiveMsgInput;    /**   * Used to synchronize the   * method close()   */  private Closer closer;    /**   * Indicates whether the messages produced are asynchronously   * sent or not (without or with acknowledgement)   */  private boolean asyncSend;  /**   * Maximum number of messages that can be   * read at once from a queue.   */  private int queueMessageReadMax;    /**   * Maximum number of acknowledgements   * that can be buffered in   * Session.DUPS_OK_ACKNOWLEDGE mode.   * Default is 0.   */  private int topicAckBufferMax;    /**   * This threshold is the maximum messages    * number over   * which the subscription is passivated.   *    */  private int topicPassivationThreshold;    /**   * This threshold is the minimum    * messages number below which   * the subscription is activated.   *    */  private int topicActivationThreshold;    private MessageConsumerListener messageConsumerListener;    /**   * Opens a session.   *   * @param cnx  The connection the session belongs to.   * @param transacted  <code>true</code> for a transacted session.   * @param acknowledgeMode  1 (auto), 2 (client) or 3 (dups ok).   *   * @exception JMSException  In case of an invalid acknowledge mode.   
*/  Session(Connection cnx,           boolean transacted,          int acknowledgeMode,          RequestMultiplexer mtpx)    throws JMSException {    if (! transacted         && acknowledgeMode != javax.jms.Session.AUTO_ACKNOWLEDGE        && acknowledgeMode != javax.jms.Session.CLIENT_ACKNOWLEDGE        && acknowledgeMode != javax.jms.Session.DUPS_OK_ACKNOWLEDGE        && !(cnx instanceof XAQueueConnection)        && !(cnx instanceof XATopicConnection)        && !(cnx instanceof XAConnection))      throw new JMSException("Can't create a non transacted session with an"                             + " invalid acknowledge mode.");    this.ident = cnx.nextSessionId();    this.cnx = cnx;    this.transacted = transacted;    this.acknowledgeMode = acknowledgeMode;    this.mtpx = mtpx;    requestor = new Requestor(mtpx);    receiveRequestor = new Requestor(mtpx);    autoAck = ! transacted      && acknowledgeMode != javax.jms.Session.CLIENT_ACKNOWLEDGE;    consumers = new Vector();    producers = new Vector();    browsers = new Vector();    repliesIn = new fr.dyade.aaa.util.Queue();    sendings = new Hashtable();    deliveries = new Hashtable();        closer = new Closer();    // If the session is transacted and the transactions limited by a timer,    // a closing task might be useful.    if (transacted && cnx.getTxPendingTimer() > 0) {      closingTask = new SessionCloseTask(        cnx.getTxPendingTimer() * 1000);    }        asyncSend = cnx.getAsyncSend();    queueMessageReadMax = cnx.getQueueMessageReadMax();    topicAckBufferMax = cnx.getTopicAckBufferMax();    topicActivationThreshold = cnx.getTopicActivationThreshold();    topicPassivationThreshold = cnx.getTopicPassivationThreshold();    setStatus(Status.STOP);    setSessionMode(SessionMode.NONE);    setRequestStatus(RequestStatus.NONE);  }  /**   * Sets the status of the session.   
*/  private void setStatus(int status) {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         "Session.setStatus(" +         Status.toString(status) + ')');    this.status = status;  }  boolean isStarted() {    return (status == Status.START);  }  /**   * Sets the session mode.   */  private void setSessionMode(int sessionMode) {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         "Session.setSessionMode(" +         SessionMode.toString(sessionMode) + ')');    this.sessionMode = sessionMode;  }  /**   * Sets the request status.   */  private void setRequestStatus(int requestStatus) {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         "Session.setRequestStatus(" +         RequestStatus.toString(requestStatus) + ')');    this.requestStatus = requestStatus;  }    /**   * Checks if the session is closed.    * If true, an IllegalStateException   * is raised.   */    protected synchronized void checkClosed()     throws IllegalStateException {    if (status == Status.CLOSE)      throw new IllegalStateException(        "Forbidden call on a closed session.");  }  /**   * Checks if the calling thread is    * the thread of control. If not,    * an IllegalStateException is raised.   */  private synchronized void checkThreadOfControl()     throws IllegalStateException {    if (singleThreadOfControl != null &&        Thread.currentThread() != singleThreadOfControl)      throw new IllegalStateException("Illegal control thread");  }  /**   * Checks the session mode. If it is not    * the expected session mode, raises an illegal state   * exception.   *   * @param expectedSessionMode the expected session mode.   
*/
  private void checkSessionMode(
    int expectedSessionMode)
    throws IllegalStateException {
    if (sessionMode == SessionMode.NONE) {
      // First use of the session: adopt the requested mode. The previous
      // code passed the current mode (NONE) back to the setter, which was
      // a no-op and left the session without a mode, so the mismatch test
      // below could never trigger on later calls.
      setSessionMode(expectedSessionMode);
    } else if (sessionMode != expectedSessionMode) {
      throw new IllegalStateException("Bad session mode");
    }
  }

  /** Returns a String image of this session. */
  public String toString() {
    return "Sess:" + ident;
  }

  /**
   * API method.
   *
   * @return the value computed by {@link #getAckMode()}.
   * @exception IllegalStateException  If the session is closed.
   */
  public final int getAcknowledgeMode() throws JMSException {
    checkClosed();
    return getAckMode();
  }

  /**
   * Returns <code>SESSION_TRANSACTED</code> for a transacted session,
   * the acknowledge mode given at creation otherwise. Unlike
   * {@link #getAcknowledgeMode()}, it does not check that the session
   * is open.
   */
  int getAckMode() {
    if (transacted) {
      return Session.SESSION_TRANSACTED;
    }
    return acknowledgeMode;
  }

  /**
   * API method.
   *
   * @return <code>true</code> if the session is transacted.
   * @exception IllegalStateException  If the session is closed.
   */
  public synchronized final boolean getTransacted()
    throws JMSException {
    checkClosed();
    return transacted;
  }

  /**
   * set transacted.
   * see connector ManagedConnectionImpl (Connector).
   * The call is silently ignored on a closed session.
   *
   * @param t  the new value of the transacted flag.
   */
  public void setTransacted(boolean t) {
    if (status != Status.CLOSE) {
      transacted = t;
    }
    // else should throw an exception but not expected in
    // the connector.
  }

  /**
   * API method.
   * Requires the session to be in the APP_SERVER mode, or not used yet.
   *
   * @param messageListener  the listener to register on the session.
   * @exception IllegalStateException  If the session is already used in
   *              another mode.
   */
  public synchronized void setMessageListener(
    javax.jms.MessageListener messageListener)
    throws JMSException {
    checkSessionMode(SessionMode.APP_SERVER);
    this.messageListener = messageListener;
  }

  /**
   * API method.
   *
   * @return the message listener of the session, possibly
   *         <code>null</code>.
   * @exception JMSException  Actually never thrown.
   */
  public synchronized javax.jms.MessageListener
      getMessageListener()
    throws JMSException {
    return messageListener;
  }

  /**
   * API method.
   *
   * @exception IllegalStateException  If the session is closed.
   */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -