⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 clientsubscription.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
  /**    * Reactivates the subscription.   *   * @param context  Re-activation context.   * @param reqId  Re-activation request identifier.   * @param topicId  Topic identifier.   * @param selector  Selector for filtering messages.   * @param noLocal  <code>true</code> for not consuming messages published   *          within the same proxy's context.   */  void reactivate(int contextId,                  int reqId,                  AgentId topicId,                  String selector,                  boolean noLocal)  {    this.contextId = contextId;    this.subRequestId = reqId;    this.topicId = topicId;    this.selector = selector;    this.noLocal = noLocal;    noFiltering = (! noLocal) && (selector == null || selector.equals(""));    active = true;    requestId = -1;    toListener = false;        // Some updated attributes are persistent    save();    if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgProxy.log(BasicLevel.DEBUG,                              this + ": reactivated.");  }  /** De-activates the subscription, denies the non acknowledgded messages. */    void deactivate() {    if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgProxy.log(BasicLevel.DEBUG,                              "ClientSubscription.deactivate()");    unsetListener();    unsetReceiver();    active = false;       // Denying all delivered messages:    deny(deliveredIds.keys());    deliveredIds.clear();        // deliveredIds is persistent    save();    if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgProxy.log(BasicLevel.DEBUG,                              this + ": deactivated.");  }  void setActive(boolean active) {    this.active = active;  }  /**   * Sets a listener.   *   * @param requestId  Identifier of the listener request.   */     void setListener(int requestId)  {    this.requestId = requestId;    toListener = true;    if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgProxy.log(BasicLevel.DEBUG,                              this + ": listener set.");  }  /** Unsets the listener. */  void unsetListener()  {    requestId = -1;    toListener = false;    if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgProxy.log(BasicLevel.DEBUG,                              this + ": listener unset.");  }  /**   * Sets a receiver request.   *   * @param requestId  Identifier of the "receive" request.   * @param timeToLive  Request's time to live value.   */  void setReceiver(int requestId, long timeToLive) {    if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgProxy.log(BasicLevel.DEBUG,                              this + ".setReceiver(" + requestId +                               "," + timeToLive + ")");    this.requestId = requestId;    toListener = false;    if (timeToLive > 0)      requestExpTime = System.currentTimeMillis() + timeToLive;    else      requestExpTime = 0;  }  /** Unsets a receiver request. */  void unsetReceiver() {    if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgProxy.log(BasicLevel.DEBUG,                              this + ".unsetReceiver()");    requestId = -1;    requestExpTime = 0;  }  /** Sets the subscription's dead message queue identifier. */  void setDMQId(AgentId dmqId)  {    this.dmqId = dmqId;    save();  }  /** Sets the subscription's threshold value. */  void setThreshold(Integer threshold)  {    this.threshold = threshold;    save();  }    /**   * Browses messages and keeps those which will have to be delivered   * to the subscriber.   */  void browseNewMessages(Vector newMessages) {    if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgProxy.log(BasicLevel.DEBUG,                              this + ".browseNewMessages(" +                               newMessages + ')');    // Browsing the messages one by one.    Message message;    String msgId;    for (Enumeration e = newMessages.elements(); e.hasMoreElements();) {      message = (Message) e.nextElement();      msgId = message.getIdentifier();      // test nbMaxMsg      if (nbMaxMsg > -1 && nbMaxMsg <= messageIds.size()) {        ClientMessages deadMessages = new ClientMessages();        deadMessages.addMessage(message);        sendToDMQ(deadMessages);        continue;      }      // Keeping the message if filtering is successful.      if (noFiltering          || (Selector.matches(message, selector)              && (! noLocal                  || ! msgId.startsWith(proxyId.toString().substring(1) + "c" + contextId + "m", 3)))) {        // It's the first delivery, adds the message to the proxy's table        if (message.acksCounter == 0)          messagesTable.put(msgId, message);        message.acksCounter++;        if (durable)          message.durableAcksCounter++;        messageIds.add(msgId);        save();        if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))          MomTracing.dbgProxy.log(BasicLevel.DEBUG,                                  this + ": added msg " + msgId + " for delivery.");      }    }  }  /**   * Launches a delivery sequence, either for a listener, or for a receiver.   */  ConsumerMessages deliver() {    if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgProxy.log(        BasicLevel.DEBUG,        "ClientSubscription[" + proxyId + ',' +         topicId + ',' + name + "].deliver()");    // Returning null if no request exists:    if (requestId == -1)      return null;     // Returning null if a "receive" request has expired:    if (! toListener        && requestExpTime > 0        && System.currentTimeMillis() >= requestExpTime) {      if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))        MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                      this + ": receive request " + requestId                                      + " expired.");      requestId = -1;      requestExpTime = 0;      return null;    }    String id;    Message message;    Integer deliveryAttempts = null;    int lastPrior = -1;    int insertionIndex = -1;    int prior;    Vector deliverables = new Vector();    ClientMessages deadMessages = null;    if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgProxy.log(BasicLevel.DEBUG,           " -> messageIds.size() = " + messageIds.size());        // Delivering to a listener.    if (toListener) {      // Browsing the identifiers of the messages to deliver.      while (! messageIds.isEmpty()) {        id = (String) messageIds.remove(0);        save();        message = (Message) messagesTable.get(id);        // Message still exists.        if (message != null) {          // Delivering it if valid.          if (message.isValid(System.currentTimeMillis())) {            deliveredIds.put(id, id);            // Setting the message's deliveryCount and denied fields.            deliveryAttempts = (Integer) deniedMsgs.get(id);            if (deliveryAttempts == null)              message.deliveryCount = 1;            else {              message.deliveryCount = deliveryAttempts.intValue() + 1;              message.denied = true;            }            // Inserting it according to its priority.            if (lastPrior == -1 || message.getPriority() == lastPrior)              insertionIndex++;            else {              insertionIndex = 0;              while (insertionIndex < deliverables.size()) {                prior =                  ((Message) deliverables.get(insertionIndex)).getPriority();                if (prior >= message.getPriority())                  insertionIndex++;                else                  break;              }            }            lastPrior = message.getPriority();            deliverables.insertElementAt(message.clone(), insertionIndex);            if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))              MomTracing.dbgProxy.log(BasicLevel.DEBUG,                                       this + ": message " + id                                      + " added for delivery.");          }          // Invalid message: removing and adding it to the vector of dead          // messages.          else {            messagesTable.remove(id);            // Deleting the message, if needed.            if (durable)              message.delete();            // Setting the message's deliveryCount, denied and expired fields.            deliveryAttempts = (Integer) deniedMsgs.remove(id);            if (deliveryAttempts != null) {              message.deliveryCount = deliveryAttempts.intValue();              message.denied = true;            }            message.expired = true;            if (deadMessages == null)              deadMessages = new ClientMessages();            deadMessages.addMessage(message);          }        }        // Message has already been deleted.        else          deniedMsgs.remove(id);      }    }    // Delivering to a receiver: getting the highest priority message.    else {      int highestP = -1;      Message keptMsg = null;      // Browsing the non delivered messages.      int i = 0;      while (i < messageIds.size()) {        id = (String) messageIds.elementAt(i);        message = (Message) messagesTable.get(id);        if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))          MomTracing.dbgProxy.log(            BasicLevel.DEBUG, " -> message = " + message);                // Message still exists.        if (message != null) {          // Checking valid message.          if (message.isValid(System.currentTimeMillis())) {            if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))              MomTracing.dbgProxy.log(                BasicLevel.DEBUG, " -> valid message");            // Higher priority: keeping the message.            if (message.getPriority() > highestP) {              highestP = message.getPriority();              keptMsg = message;            }            // get next message            i++;          }          // Invalid message: removing and adding it to the vector of dead          // messages.          else {            if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))              MomTracing.dbgProxy.log(                BasicLevel.DEBUG, " -> invalid message");            messageIds.remove(id);            save();            messagesTable.remove(id);            // Deleting the message, if needed.            if (durable)              message.delete();            // Setting the message's deliveryCount, denied and expired fields.            deliveryAttempts = (Integer) deniedMsgs.remove(id);            if (deliveryAttempts != null) {              message.deliveryCount = deliveryAttempts.intValue();              message.denied = true;            }            message.expired = true;            deadMessages = new ClientMessages();            deadMessages.addMessage(message);          }        }        // Message has already been deleted.        else {          if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))            MomTracing.dbgProxy.log(               BasicLevel.DEBUG, " -> deleted message " + id);          messageIds.remove(id);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -