⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 session.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2001 - ScalAgent Distributed Technologies * Copyright (C) 1996 - Dyade * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. *  * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * Lesser General Public License for more details. *  * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 * USA. * * Initial developer(s): Frederic Maistre (INRIA) * Contributor(s): Nicolas Tachker (ScalAgent) */package com.scalagent.kjoram;import com.scalagent.kjoram.jms.*;import com.scalagent.kjoram.util.TimerTask;import java.util.*;import com.scalagent.kjoram.excepts.IllegalStateException;import com.scalagent.kjoram.excepts.*;public class Session{  public static final int SESSION_TRANSACTED = 0;  public static final int AUTO_ACKNOWLEDGE = 1;  public static final int CLIENT_ACKNOWLEDGE = 2;  public static final int DUPS_OK_ACKNOWLEDGE = 3;    /** Task for closing the session if it becomes pending. */  private TimerTask closingTask = null;  /** <code>true</code> if the session's transaction is scheduled. */  private boolean scheduled = false;  /** Timer for replying to expired consumers' requests. */  private com.scalagent.kjoram.util.Timer consumersTimer = null;  /** The message listener of the session, if any. */  protected MessageListener messageListener = null;  /** The identifier of the session. */  String ident;  /** The connection the session belongs to. */  Connection cnx;  /** <code>true</code> if the session is transacted. */  boolean transacted;  /** The acknowledgement mode of the session. */  int acknowledgeMode;  /** <code>true</code> if the session is closed. */  boolean closed = false;  /** <code>true</code> if the session is started. */  boolean started = false;  /** <code>true</code> if the session's acknowledgements are automatic. */  boolean autoAck;  /** Vector of message consumers. */  Vector consumers;  /** Vector of message producers. */  Vector producers;  /** Vector of queue browsers. */  Vector browsers;  /** FIFO queue holding the asynchronous server deliveries. */  com.scalagent.kjoram.util.Queue repliesIn;  /** Daemon distributing asynchronous server deliveries. */  SessionDaemon daemon = null;  /** Counter of message listeners. */  int msgListeners = 0;  /**    * Table holding the <code>ProducerMessages</code> holding producers'   * messages and destinated to be sent at commit.   * <p>   * <b>Key:</b> destination name<br>   * <b>Object:</b> <code>ProducerMessages</code>   */  Hashtable sendings;  /**    * Table holding the identifiers of the messages delivered per   * destination or subscription, and not acknowledged.   * <p>   * <b>Key:</b> destination or subscription name<br>   * <b>Object:</b> <code>MessageAcks</code> instance   */  Hashtable deliveries;  /** The connection consumer delivering messages to the session, if any. */  ConnectionConsumer connectionConsumer = null;  /**   * Opens a session.   *   * @param cnx  The connection the session belongs to.   * @param transacted  <code>true</code> for a transacted session.   * @param acknowledgeMode  1 (auto), 2 (client) or 3 (dups ok).   *   * @exception JMSException  In case of an invalid acknowledge mode.   */  Session(Connection cnx, boolean transacted,          int acknowledgeMode) throws JMSException  {    if (! transacted         && acknowledgeMode != Session.AUTO_ACKNOWLEDGE        && acknowledgeMode != Session.CLIENT_ACKNOWLEDGE        && acknowledgeMode != Session.DUPS_OK_ACKNOWLEDGE)      throw new JMSException("Can't create a non transacted session with an"                             + " invalid acknowledge mode.");    this.ident = cnx.nextSessionId();    this.cnx = cnx;    this.transacted = transacted;    this.acknowledgeMode = acknowledgeMode;    autoAck = ! transacted              && acknowledgeMode != Session.CLIENT_ACKNOWLEDGE;    consumers = new Vector();    producers = new Vector();    browsers = new Vector();    repliesIn = new com.scalagent.kjoram.util.Queue();    sendings = new Hashtable();    deliveries = new Hashtable();    // If the session is transacted and the transactions limited by a timer,    // a closing task might be useful.    if (transacted && cnx.factoryParameters.txPendingTimer != 0)      closingTask = new SessionCloseTask();    cnx.sessions.addElement(this);    if (JoramTracing.dbgClient)      JoramTracing.log(JoramTracing.DEBUG,this + ": created.");  }  /** Returns a String image of this session. */  public String toString()  {    return "Sess:" + ident;  }  /**   * API method.   *   * @exception JMSException  Actually never thrown.   */  public int getAcknowledgeMode() throws JMSException  {    return acknowledgeMode;  }  /**   * API method.   *   * @exception JMSException  Actually never thrown.   */  public boolean getTransacted() throws JMSException  {    return transacted;  }  /**   * API method.   *   * @exception JMSException  Actually never thrown.   */  public void setMessageListener(MessageListener messageListener)              throws JMSException  {    this.messageListener = messageListener;  }  /**   * API method.   *   * @exception JMSException  Actually never thrown.   */  public MessageListener getMessageListener() throws JMSException  {    return messageListener;  }  /**   * API method.   *   * @exception IllegalStateException  If the session is closed.   */  public Message createMessage() throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed session.");        return new Message();  }  /**   * API method.   *   * @exception IllegalStateException  If the session is closed.   */  public TextMessage createTextMessage() throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed session.");        return new TextMessage();  }  /**   * API method.   *   * @exception IllegalStateException  If the session is closed.   */  public TextMessage createTextMessage(String text)         throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed session.");       TextMessage message =  new TextMessage();    message.setText(text);    return message;  }  /**   * API method.   *   * @exception IllegalStateException  If the session is closed.   */  public BytesMessage createBytesMessage()         throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed session.");        return new BytesMessage();  }  /**   * API method.   *   * @exception IllegalStateException  If the session is closed.   */  public MapMessage createMapMessage()         throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed session.");    return new MapMessage();  }  /**   * API method   *   * @exception IllegalStateException  If the session is closed.   */  public QueueBrowser         createBrowser(Queue queue, String selector)         throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed session.");    return new QueueBrowser(this, (Queue) queue, selector);  }  /**   * API method   *   * @exception IllegalStateException  If the session is closed.   */  public QueueBrowser createBrowser(Queue queue)         throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed session.");    return new QueueBrowser(this, (Queue) queue, null);  }  /**   * API method.   *   * @exception IllegalStateException  If the session is closed or if the    *              connection is broken.   * @exception JMSException  If the creation fails for any other reason.   */  public MessageProducer createProducer(Destination dest)         throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed session.");    return new MessageProducer(this, (Destination) dest);  }  /**   * API method.   *   * @exception IllegalStateException  If the session is closed or if the   *              connection is broken.   * @exception JMSException  If the creation fails for any other reason.   */  public MessageConsumer         createConsumer(Destination dest, String selector,                        boolean noLocal) throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed session.");    return new MessageConsumer(this, (Destination) dest, selector, null,                               noLocal);  }  /**   * API method.   *   * @exception IllegalStateException  If the session is closed or if the   *              connection is broken.   * @exception JMSException  If the creation fails for any other reason.   */  public MessageConsumer         createConsumer(Destination dest, String selector)         throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed session.");    return new MessageConsumer(this, (Destination) dest, selector);  }  /**   * API method.   *   * @exception IllegalStateException  If the session is closed or if the   *              connection is broken.   * @exception JMSException  If the creation fails for any other reason.   */  public MessageConsumer createConsumer(Destination dest)         throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed session.");    return new MessageConsumer(this, (Destination) dest, null);  }  /**   * API method.   *   * @exception IllegalStateException  If the session is closed or if the    *              connection is broken.   * @exception JMSException  If the creation fails for any other reason.   */  public TopicSubscriber         createDurableSubscriber(Topic topic, String name,                                 String selector,                                 boolean noLocal) throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed session.");    return new TopicSubscriber(this, (Topic) topic, name, selector, noLocal);  }  /**   * API method.   *   * @exception IllegalStateException  If the session is closed or if the    *              connection is broken.   * @exception JMSException  If the creation fails for any other reason.   */  public TopicSubscriber         createDurableSubscriber(Topic topic, String name)         throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed session.");    return new TopicSubscriber(this, (Topic) topic, name, null, false);  }  /**   * API method.   *   * @exception IllegalStateException  If the session is closed.   */  public Queue createQueue(String queueName) throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed session.");    return new Queue(queueName);  }  /**   * API method.   *   * @exception IllegalStateException  If the session is closed.   * @exception JMSException  If the topic creation failed.   */  public Topic createTopic(String topicName) throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed session.");    // Checks if the topic to retrieve is the administration topic:    if (topicName.equals("#AdminTopic")) {      try {        GetAdminTopicReply reply =            (GetAdminTopicReply) cnx.syncRequest(new GetAdminTopicRequest());        if (reply.getId() != null)          return new Topic(reply.getId());        else          throw new JMSException("AdminTopic could not be retrieved.");      }      catch (JMSException exc) {        throw exc;      }      catch (Exception exc) {        throw new JMSException("AdminTopic could not be retrieved: " + exc);      }    }    return new Topic(topicName);  }  /**   * API method.   *   * @exception IllegalStateException  If the session is closed or if the   *              connection is broken.   * @exception JMSException  If the request fails for any other reason.   */  public TemporaryQueue createTemporaryQueue() throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed session.");    SessCreateTDReply reply =      (SessCreateTDReply) cnx.syncRequest(new SessCreateTQRequest());    String tempDest = reply.getAgentId();    return new TemporaryQueue(tempDest, cnx);  }  /**   * API method.   *   * @exception IllegalStateException  If the session is closed or if the   *              connection is broken.   * @exception JMSException  If the request fails for any other reason.   */  public TemporaryTopic createTemporaryTopic() throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed session.");    SessCreateTDReply reply =      (SessCreateTDReply) cnx.syncRequest(new SessCreateTTRequest());    String tempDest = reply.getAgentId();    return new TemporaryTopic(tempDest, cnx);  }  /** API method. */  public synchronized void run()  {    int load = repliesIn.size();    com.scalagent.kjoram.messages.Message momMsg;    String msgId;    String targetName = connectionConsumer.targetName;    boolean queueMode = connectionConsumer.queueMode;    if (JoramTracing.dbgClient)      JoramTracing.log(JoramTracing.DEBUG, "-- " + this                       + ": loaded with " + load                       + " message(s) and started.");    try {       // Processing the current number of messages in the queue:      for (int i = 0; i < load; i++) {        momMsg = (com.scalagent.kjoram.messages.Message) repliesIn.pop();        msgId = momMsg.getIdentifier();        // If no message listener has been set for the session, denying the        // processed message:        if (messageListener == null) {          JoramTracing.log(JoramTracing.ERROR,this + ": an"                           + " asynchronous delivery arrived for"                           + " a non existing session listener:"

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -