⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 connection.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    }    // Catching an IllegalStateException if the connection is broken:    catch (IllegalStateException caughtISE) {      isE = caughtISE;    }    // At this point, the server won't deliver messages anymore,    // the connection just waits for the sessions to have finished their    // processings.    Session session;    for (int i = 0; i < sessions.size(); i++) {      session = (Session) sessions.elementAt(i);      try {        session.repliesIn.stop();      }      catch (InterruptedException iE) {}      session.stop();    }    started = false;    if (isE != null) {      JoramTracing.log(JoramTracing.ERROR, isE);      throw isE;    }    if (JoramTracing.dbgClient)      JoramTracing.log(JoramTracing.DEBUG, this + ": is stopped.");   }  /**   * API method for closing the connection; even if the connection appears   * to be broken, closes the sessions.   *   * @exception JMSException  Actually never thrown.   */  public void close() throws JMSException  {    // Ignoring the call if the connection is closed:    if (closed)      return;    closing = true;    if (JoramTracing.dbgClient)      JoramTracing.log(JoramTracing.DEBUG, "--- " + this                        + ": closing...");    // Finishing the timer, if any:    if (sessionsTimer != null)      sessionsTimer.cancel();    // Stopping the connection:    try {      stop();    }    // Catching a JMSException if the connection is broken:    catch (JMSException jE) {}    // Closing the sessions:    Session session;    while (! sessions.isEmpty()) {      session = (Session) sessions.elementAt(0);      try {        session.close();      }      // Catching a JMSException if the connection is broken:      catch (JMSException jE) {}    }    // Closing the connection consumers:    if (cconsumers != null) {      ConnectionConsumer cc;      while (! 
cconsumers.isEmpty()) {        cc = (ConnectionConsumer) cconsumers.elementAt(0);        cc.close();      }    }        // Closing the connection:    connectionImpl.close();    // Shutting down the driver, if needed:    if (! driver.stopping)      driver.stop();    requestsTable.clear();    requestsTable = null;    repliesTable.clear();    repliesTable = null;    closed = true;    if (JoramTracing.dbgClient)      JoramTracing.log(JoramTracing.DEBUG, this + ": closed.");  }  /** Returns a new request identifier. */  synchronized int nextRequestId()  {    if (requestsC == Integer.MAX_VALUE)      requestsC = 0;    return requestsC++;  }  /** Returns a new session identifier. */  synchronized String nextSessionId()  {    if (sessionsC == Integer.MAX_VALUE)      sessionsC = 0;    sessionsC++;    return "c" + key + "s" + sessionsC;  }   /** Returns a new message identifier. */  synchronized String nextMessageId()  {    if (messagesC == Integer.MAX_VALUE)      messagesC = 0;    messagesC++;    return "ID:" + proxyId + "c" + key + "m" + messagesC;  }  /** Returns a new subscription name. */  synchronized String nextSubName()  {    if (subsC == Integer.MAX_VALUE)      subsC = 0;    subsC++;    return "c"  + key + "sub" + subsC;  }  /** Schedules a session task to the connection's timer. */  synchronized void schedule(com.scalagent.kjoram.util.TimerTask task)  {    if (sessionsTimer == null)      return;    try {      sessionsTimer.schedule(task, factoryParameters.txPendingTimer * 1000);    }    catch (Exception exc) {}  }    /**   * Method sending a synchronous request to the server and waiting for an   * answer.   *   * @exception IllegalStateException  If the connection is closed or broken.   * @exception JMSSecurityException  When sending a request to a destination   *              not accessible because of security.   * @exception InvalidDestinationException  When sending a request to a   *              destination that no longer exists.   
* @exception JMSException  If the request failed for any other reason.   */  AbstractJmsReply syncRequest(AbstractJmsRequest request) throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed"                                      + " connection.");    if (request.getRequestId() == -1)      request.setRequestId(nextRequestId());    int requestId = request.getRequestId();    try {      if (JoramTracing.dbgClient)        JoramTracing.log(JoramTracing.DEBUG, this + ": sends request: "                         + request.getClass().getName()                         + " with id: " + requestId);      Lock lock = new Lock();      requestsTable.put(request.getKey(), lock);      synchronized(lock) {        connectionImpl.send(request);        while (true) {          try {            lock.wait();            break;          }          catch (InterruptedException iE) {            if (JoramTracing.dbgClient)              JoramTracing.log(JoramTracing.WARN,this                               + ": caught InterruptedException");            continue;          }        }        requestsTable.remove(request.getKey());      }    }    // Catching an exception because of...    
catch (Exception e) {      JMSException jE = null;      if (e instanceof JMSException)        throw (JMSException) e;      else        jE = new JMSException("Exception while getting a reply.");      jE.setLinkedException(e);      // Unregistering the request:      if (requestsTable != null)        requestsTable.remove(request.getKey());      JoramTracing.log(JoramTracing.ERROR, jE);      throw jE;    }    // Finally, returning the reply:    AbstractJmsReply reply =      (AbstractJmsReply) repliesTable.remove(request.getKey());    if (JoramTracing.dbgClient)      JoramTracing.log(JoramTracing.DEBUG, this + ": got reply.");    // If the reply is null, it means that the requester has been unlocked    // by the driver because it detected a connection failure:    if (reply == null)      throw new IllegalStateException("Connection is broken.");    // Else, if the reply notifies of an error: throwing the appropriate exc:     else if (reply instanceof MomExceptionReply) {      MomException mE = ((MomExceptionReply) reply).getException();      if (mE instanceof AccessException)        throw new JMSSecurityException(mE.getMessage());      else if (mE instanceof DestinationException)        throw new InvalidDestinationException(mE.getMessage());      else        throw new JMSException(mE.getMessage());    }    // Else: returning the reply:    else      return reply;  }  /**   * Actually sends an asynchronous request to the server.   *   * @exception IllegalStateException  If the connection is closed or broken.   
*/  void asyncRequest(AbstractJmsRequest request) throws IllegalStateException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed"                                      + " connection.");    if (request.getRequestId() == -1)      request.setRequestId(nextRequestId());    try {      if (JoramTracing.dbgClient)        JoramTracing.log(JoramTracing.DEBUG, this + ": sends request: "                         + request.getClass().getName()                         + " with id: " + request.getRequestId());      connectionImpl.send(request);    }    // In the case of a broken connection:    catch (IllegalStateException exc) {      // Removes the potentially stored requester:      requestsTable.remove(request.getKey());      JoramTracing.log(JoramTracing.ERROR, exc);      throw exc;    }  }  /**   * Method called by the driver for distributing the server replies   * it gets on the connection.   * <p>   * Server replies are either synchronous replies to client requests,   * or asynchronous message deliveries, or asynchronous exceptions   * notifications.   
*/  void distribute(AbstractJmsReply reply)  {    // Getting the correlation identifier:    int correlationId = reply.getCorrelationId();    if (JoramTracing.dbgClient)      JoramTracing.log(JoramTracing.DEBUG, this + ": got reply: "                       + correlationId);    Object obj = null;    if (correlationId != -1)      obj = requestsTable.get(reply.getKey());    // If the request is a synchronous request, putting the reply in the    // replies table and unlocking the requester:    if (obj instanceof Lock) {      repliesTable.put(reply.getKey(), reply);      synchronized(obj) {        obj.notify();      }    }    // If the reply is an asynchronous exception, passing it:    else if (reply instanceof MomExceptionReply) {      // Removing the potential consumer object from the table:      requestsTable.remove(reply.getKey());      MomException mE = ((MomExceptionReply) reply).getException();      JMSException jE = null;      if (mE instanceof AccessException)        jE = new JMSSecurityException(mE.getMessage());      else if (mE instanceof DestinationException)        jE = new InvalidDestinationException(mE.getMessage());      else        jE = new JMSException(mE.getMessage());      onException(jE);    }    // Else, if the reply is an asynchronous delivery:    else if (obj != null) {      try {        // Passing the reply to its consumer:        if (obj instanceof ConnectionConsumer)          ((ConnectionConsumer) obj).repliesIn.push(reply);        else if (obj instanceof MessageConsumer)          ((MessageConsumer) obj).sess.repliesIn.push(reply);      }      catch (StoppedQueueException sqE) {        denyDelivery((ConsumerMessages) reply);      }    }    // Finally, if the requester disappeared, denying the delivery:    else if (reply instanceof ConsumerMessages)      denyDelivery((ConsumerMessages) reply);  }  /** Actually denies a non deliverable delivery. 
*/
  private void denyDelivery(ConsumerMessages delivery)
  {
    Vector delivered = delivery.getMessages();
    Vector deniedIds = new Vector();

    // Gathering the identifiers of the delivered messages, in order:
    for (java.util.Enumeration pending = delivered.elements();
         pending.hasMoreElements();) {
      com.scalagent.kjoram.messages.Message current =
        (com.scalagent.kjoram.messages.Message) pending.nextElement();
      deniedIds.addElement(current.getIdentifier());
    }

    // An empty delivery leaves nothing to deny; otherwise the denying is
    // sent as an asynchronous request, as no reply is expected for it:
    if (! deniedIds.isEmpty()) {
      try {
        asyncRequest(new SessDenyRequest(delivery.comesFrom(), deniedIds,
                                         delivery.getQueueMode(), true));
      }
      // Should the denying itself fail, there is nothing left to try:
      catch (JMSException jE) {}
    }
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -