⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 clientsubscription.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
          deniedMsgs.remove(id);          save();        }      }      // Putting the kept message in the vector.      if (keptMsg != null) {        messageIds.remove(keptMsg.getIdentifier());        deliveredIds.put(keptMsg.getIdentifier(), keptMsg.getIdentifier());        save();        // Setting the message's deliveryCount and denied fields.        deliveryAttempts = (Integer) deniedMsgs.get(keptMsg.getIdentifier());        if (deliveryAttempts == null)          keptMsg.deliveryCount = 1;        else {          keptMsg.deliveryCount = deliveryAttempts.intValue() + 1;          keptMsg.denied = true;        }        deliverables.add(keptMsg.clone());        if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))          MomTracing.dbgProxy.log(BasicLevel.DEBUG,                                   this + ": message " + keptMsg.getIdentifier()                                  + " added for delivery.");      } else {        i++;      }    }       // Sending the dead messages to the DMQ, if any:    if (deadMessages != null)      sendToDMQ(deadMessages);    // Finally, returning the reply or null:    if (! deliverables.isEmpty()) {      ConsumerMessages consM = new ConsumerMessages(requestId,                                                    deliverables,                                                    name,                                                    false);      if (! toListener)        requestId = -1;      return consM;    }    return null;  }  /**   * Acknowledges messages.   */  void acknowledge(Enumeration acks) {    while (acks.hasMoreElements()) {      String id = (String) acks.nextElement();      acknowledge(id);    }  }  void acknowledge(String id) {    if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgProxy.log(BasicLevel.DEBUG,                               this + ": acknowledges message: " + id);        deliveredIds.remove(id);    deniedMsgs.remove(id);    save();    Message msg = (Message) messagesTable.get(id);        // Message may be null if it is not valid anymore    if (msg != null) {      msg.acksCounter--;      if (msg.acksCounter == 0)        messagesTable.remove(id);      if (durable) {        msg.durableAcksCounter--;                if (msg.durableAcksCounter == 0)          msg.delete();      }    }  }  /**   * Denies messages.   */  void deny(Enumeration denies) {    if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgProxy.log(BasicLevel.DEBUG,                               this + ".deny(" + denies + ')');    String id;    Message msg;    ClientMessages deadMessages = null;    int deliveryAttempts = 1;    int i;    String currentId;    long currentO;    denyLoop:    while (denies.hasMoreElements()) {      id = (String) denies.nextElement();      String deliveredMsgId = (String)deliveredIds.remove(id);      if (deliveredMsgId == null) {        if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))          MomTracing.dbgProxy.log(BasicLevel.DEBUG,                                   this + ": cannot denies message: " + id);        continue denyLoop;      }      save();            if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))        MomTracing.dbgProxy.log(BasicLevel.DEBUG,                                 this + ": denies message: " + id);            msg = (Message) messagesTable.get(id);            // Message may be null if it is not valid anymore      if (msg == null) continue denyLoop;            Integer value = (Integer) deniedMsgs.get(id);      if (value != null)        deliveryAttempts = value.intValue() + 1;            // If maximum delivery attempts reached, the message is no more      // deliverable to this sbscriber.      if (isUndeliverable(deliveryAttempts)) {        deniedMsgs.remove(id);        msg.deliveryCount = deliveryAttempts;        msg.undeliverable = true;        if (deadMessages == null)          deadMessages = new ClientMessages();        deadMessages.addMessage(msg);                msg.acksCounter--;        if (msg.acksCounter == 0)          messagesTable.remove(id);                if (durable) {          msg.durableAcksCounter--;          if (msg.durableAcksCounter == 0)            msg.delete();        }      }      // Else, putting it back to the deliverables vector according to its      // original delivery order, and adding a new entry for it in the      // denied messages table.      else {        if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))          MomTracing.dbgProxy.log(BasicLevel.DEBUG,                                   " -> put back to the messages to deliver");                i = 0;        insertLoop:        while (i < messageIds.size()) {          currentId = (String) messageIds.elementAt(i);          Message currentMessage = (Message) messagesTable.get(currentId);                      // Message may be null if it is not valid anymore          if (currentMessage != null) {            currentO = currentMessage.order;            if (currentO > msg.order) {              break insertLoop;            } else {              i++;            }          } else {            // Remove the invalid message            messageIds.removeElementAt(i);          }        }                messageIds.insertElementAt(id, i);        deniedMsgs.put(id, new Integer(deliveryAttempts));      }    }    // Sending dead messages to the DMQ, if needed:    if (deadMessages != null)      sendToDMQ(deadMessages);  }  /**   * Decreases the subscription's messages acknowledgement expectations,   * deletes those not to be consumed anymore.   */  void delete()  {    for (Enumeration e = deliveredIds.keys(); e.hasMoreElements();)      messageIds.add(e.nextElement());    save();        String id;    Message msg;    for (Enumeration allMessageIds = messageIds.elements();         allMessageIds.hasMoreElements();) {      id = (String) allMessageIds.nextElement();      msg = (Message) messagesTable.get(id);      if (msg != null) {        msg.acksCounter--;        if (msg.acksCounter == 0)          messagesTable.remove(id);        if (durable) {          msg.durableAcksCounter--;          if (msg.durableAcksCounter == 0)            msg.delete();        }      }    }  }    /**   * Returns <code>true</code> if a given value matches the threshold value   * for this user.   */  private boolean isUndeliverable(int deliveryAttempts)  {    if (threshold != null)      return deliveryAttempts == threshold.intValue();    else if (DeadMQueueImpl.getDefaultThreshold() != null)      return deliveryAttempts == DeadMQueueImpl.getDefaultThreshold().intValue();    return false;  }  /**   * Method used for sending messages to the appropriate dead message queue.   */  private void sendToDMQ(ClientMessages messages)  {    if (dmqId != null)      Channel.sendTo(dmqId, messages);    else if (DeadMQueueImpl.getId() != null)      Channel.sendTo(DeadMQueueImpl.getId(), messages);  }  Message getMessage(String msgId) {    if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgProxy.log(        BasicLevel.DEBUG,         "ClientSubscription.getMessage(" + msgId + ')');    int index = messageIds.indexOf(msgId);    if (index < 0) {      // The message has been delivered      if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))        MomTracing.dbgProxy.log(          BasicLevel.DEBUG, " -> message not found");      return null;    } else {      return (Message) messagesTable.get(msgId);    }  }  void deleteMessage(String msgId) {    messageIds.remove(msgId);    save();    Message msg = removeMessage(msgId);    if (msg != null) {      ClientMessages deadMessages = new ClientMessages();      deadMessages.addMessage(msg);      sendToDMQ(deadMessages);    }  }  void clear() {    ClientMessages deadMessages = null;    for (int i = 0; i < messageIds.size(); i++) {      String msgId = (String)messageIds.elementAt(i);      Message msg = removeMessage(msgId);      if (msg != null) {        if (deadMessages == null) {          deadMessages = new ClientMessages();        }        deadMessages.addMessage(msg);      }    }    if (deadMessages != null) {      sendToDMQ(deadMessages);    }    messageIds.clear();    save();  }  /**   * Removes a particular pending message in the subscription.   * The message is pointed out through its unique identifier.   *   * @param msgId    The unique message's identifier.   */  Message removeMessage(String msgId) {    Message msg = (Message) messagesTable.get(msgId);    if (msg != null) {      msg.acksCounter--;      if (msg.acksCounter == 0)        messagesTable.remove(msgId);      if (durable) {        msg.durableAcksCounter--;        if (msg.durableAcksCounter == 0)          msg.delete();      }    }    return msg;  }    private void save() {    if (durable) proxy.setSave();  }  public void readBag(ObjectInputStream in)     throws IOException, ClassNotFoundException {    if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgProxy.log(        BasicLevel.DEBUG,        "ClientSubscription[" +         proxyId +         "].readbag()");    contextId = in.readInt();    subRequestId = in.readInt();    noLocal = in.readBoolean();    noFiltering = in.readBoolean();    active = in.readBoolean();    requestId = in.readInt();    toListener = in.readBoolean();    requestExpTime = in.readLong();  }  public void writeBag(ObjectOutputStream out)    throws IOException {    if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgProxy.log(        BasicLevel.DEBUG,        "ClientSubscription[" +         proxyId +         "].writeBag()");    out.writeInt(contextId);    out.writeInt(subRequestId);    out.writeBoolean(noLocal);    out.writeBoolean(noFiltering);    out.writeBoolean(active);    out.writeInt(requestId);    out.writeBoolean(toListener);    out.writeLong(requestExpTime);  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -