⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 proxyimpl.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
    proxyAgent.setSave();    //setCtx(cKey);    // Denying the non acknowledged messages:    AgentId id;    for (Enumeration ids = activeCtx.getDeliveringQueues(); ids        .hasMoreElements();) {      id = (AgentId) ids.nextElement();      proxyAgent.sendNot(id, new DenyRequest(key));    }    // Removing or deactivating the subscriptions:    String subName = null;    ClientSubscription sub;    Vector topics = new Vector();    for (Enumeration subs = activeCtx.getActiveSubs(); subs.hasMoreElements();) {      subName = (String) subs.nextElement();      sub = (ClientSubscription) subsTable.get(subName);      if (logger.isLoggable(BasicLevel.DEBUG))        logger.log(BasicLevel.DEBUG, "Deactivate subscription "            + subName + ", topic id = " + sub.getTopicId());      if (sub.getDurable()) {        sub.deactivate();        if (logger.isLoggable(BasicLevel.DEBUG))          logger.log(BasicLevel.DEBUG, "Durable subscription"              + subName + " de-activated.");      } else {        if (logger.isLoggable(BasicLevel.DEBUG))          logger.log(BasicLevel.DEBUG, " -> topicsTable = "              + topicsTable);        sub.delete();        subsTable.remove(subName);        TopicSubscription tSub = (TopicSubscription) topicsTable.get(sub            .getTopicId());        tSub.removeSubscription(subName);        if (!topics.contains(sub.getTopicId()))          topics.add(sub.getTopicId());        if (logger.isLoggable(BasicLevel.DEBUG))          logger.log(BasicLevel.DEBUG, "Temporary subscription"              + subName + " deleted.");      }    }    // Browsing the topics which at least have one subscription removed.    for (Enumeration topicIds = topics.elements(); topicIds.hasMoreElements();)      updateSubscriptionToTopic((AgentId) topicIds.nextElement(), -1, -1);    // Deleting the temporary destinations:    AgentId destId;    for (Enumeration dests = activeCtx.getTempDestinations(); dests        .hasMoreElements();) {      destId = (AgentId) dests.nextElement();      activeCtx.removeTemporaryDestination(destId);      deleteTemporaryDestination(destId);      if (logger.isLoggable(BasicLevel.DEBUG))        logger.log(BasicLevel.DEBUG, "Deletes temporary"            + " destination " + destId.toString());    }    // Saving the prepared transactions.    Enumeration xids = activeCtx.getTxIds();    Xid xid;    XACnxPrepare recoveredPrepare;    XACnxPrepare prepare;    while (xids.hasMoreElements()) {      if (recoveredTransactions == null)        recoveredTransactions = new Hashtable();      xid = (Xid) xids.nextElement();      recoveredPrepare = (XACnxPrepare) recoveredTransactions.get(xid);      prepare = activeCtx.getTxPrepare(xid);      if (recoveredPrepare == null)        recoveredTransactions.put(xid, prepare);      else {        recoveredPrepare.getSendings().addAll(prepare.getSendings());        recoveredPrepare.getAcks().addAll(prepare.getAcks());      }    }    // Finally, deleting the context:    contexts.remove(new Integer(key));    activeCtx = null;    setActiveCtxId(-1);    CnxCloseReply reply = new CnxCloseReply();    reply.setCorrelationId(req.getRequestId());    proxyAgent.sendToClient(key, reply);  }  private void doReact(int key, ActivateConsumerRequest req) {    String subName = req.getTarget();    ClientSubscription sub = (ClientSubscription) subsTable.get(subName);    sub.setActive(req.getActivate());  }    private void doReact(int key, CommitRequest req) {    // The commit may involve some local agents    int asyncReplyCount = 0;        Enumeration pms = req.getProducerMessages();    if (pms != null) {      while (pms.hasMoreElements()) {        ProducerMessages pm = (ProducerMessages) pms.nextElement();        AgentId destId = AgentId.fromString(pm.getTarget());        ClientMessages not = new ClientMessages(key,             req.getRequestId(), pm.getMessages());        setDmq(not);            if (destId.getTo() == proxyAgent.getId().getTo()) {          // local sending          not.setPersistent(false);          if (req.getAsyncSend()) {            not.setAsyncSend(true);          } else {            asyncReplyCount++;          }        }        proxyAgent.sendNot(destId, not);      }    }        Enumeration acks = req.getAckRequests();    if (acks != null) {      while (acks.hasMoreElements()) {        SessAckRequest sar = (SessAckRequest) acks.nextElement();        if (sar.getQueueMode()) {          AgentId qId = AgentId.fromString(sar.getTarget());          Vector ids = sar.getIds();          AcknowledgeRequest not = new AcknowledgeRequest(activeCtxId, req              .getRequestId(), ids);          if (qId.getTo() == proxyAgent.getId().getTo()) {            // local sending            not.setPersistent(false);            // No reply to wait for          }          proxyAgent.sendNot(qId, not);        } else {          String subName = sar.getTarget();          ClientSubscription sub = (ClientSubscription) subsTable.get(subName);          if (sub != null) {            sub.acknowledge(sar.getIds().elements());            proxyAgent.setSave();          }        }      }    }       if (!req.getAsyncSend()) {      if (asyncReplyCount == 0) {        proxyAgent.sendNot(proxyAgent.getId(), new SendReplyNot(key, req            .getRequestId()));      } else {        // we need to wait for the replies        // from the local agents        // before replying to the client.        activeCtx.addMultiReplyContext(req.getRequestId(), asyncReplyCount);      }    }    // else the client doesn't expect any ack  }  /**   * Distributes the JMS replies to the appropriate reactions.   * <p>   * JMS proxies react the following replies:   * <ul>   * <li><code>QueueMsgReply</code></li>   * <li><code>BrowseReply</code></li>   * <li><code>SubscribeReply</code></li>   * <li><code>TopicMsgsReply</code></li>   * <li><code>ExceptionReply</code></li>   * </ul>   */  private void doFwd(AgentId from, AbstractReply rep)  {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(BasicLevel.DEBUG,                              "--- " + this + " got " +                              rep.getClass().getName() +                              " with id: " + rep.getCorrelationId() +                              " from: " + from);    if (rep instanceof QueueMsgReply)      doFwd(from, (QueueMsgReply) rep);    else if (rep instanceof BrowseReply)      doFwd((BrowseReply) rep);    else if (rep instanceof SubscribeReply)      doFwd((SubscribeReply) rep);    else if (rep instanceof TopicMsgsReply)      doFwd(from, (TopicMsgsReply) rep);    else if (rep instanceof ExceptionReply)      doReact(from, (ExceptionReply) rep);    else {      if (logger.isLoggable(BasicLevel.WARN))        logger.log(BasicLevel.WARN, "Unexpected reply!");    }  }  /**   * Actually forwards a <code>QueueMsgReply</code> coming from a destination   * as a <code>ConsumerMessages</code> destinated to the requesting client.   * <p>   * If the corresponding context is stopped, stores the   * <code>ConsumerMessages</code> for later delivery.   */  private void doFwd(AgentId from, QueueMsgReply rep)  {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(BasicLevel.DEBUG,                              "ProxyImpl.doFwd(" + from + ',' + rep + ')');        try {      // Updating the active context:      setCtx(rep.getClientContext());      // If the receive request being replied has been cancelled, denying      // the message.      if (rep.getCorrelationId() == activeCtx.getCancelledReceive()) {        if (logger.isLoggable(BasicLevel.DEBUG))          logger.log(            BasicLevel.DEBUG,            " -> cancelled receive: id=" +             activeCtx.getCancelledReceive());        if (rep.getSize() > 0) {          Vector msgList = rep.getMessages();          for (int i = 0; i < msgList.size(); i++) {            Message msg = (Message)msgList.elementAt(i);            String msgId = msg.getIdentifier();                        if (logger.isLoggable(BasicLevel.WARN))              logger.log(BasicLevel.WARN,                                      " -> denying message: " + msgId);                        proxyAgent.sendNot(              from,              new DenyRequest(                0,                rep.getCorrelationId(),                msgId));          }        }      } else {        if (logger.isLoggable(BasicLevel.DEBUG))          logger.log(BasicLevel.DEBUG, " -> reply");        ConsumerMessages jRep;        // Building the reply and storing the wrapped message id for later        // denying in the case of a failure:        if (rep.getSize() > 0) {          jRep = new ConsumerMessages(            rep.getCorrelationId(),            rep.getMessages(),            from.toString(),            true);          activeCtx.addDeliveringQueue(from);        } else {          jRep = new ConsumerMessages(              rep.getCorrelationId(),              (Vector)null,              from.toString(),              true);        }        // If the context is started, delivering the message, or buffering it:        if (activeCtx.getActivated()) {          doReply(jRep);        } else {          if (logger.isLoggable(BasicLevel.DEBUG))            logger.log(BasicLevel.DEBUG, " -> buffer the reply");          activeCtx.addPendingDelivery(jRep);        }      }    } catch (StateException pE) {      // The context is lost: denying the message:      if (logger.isLoggable(BasicLevel.DEBUG))        logger.log(BasicLevel.DEBUG, "", pE);      if (rep.getMessages().size() > 0) {        Vector msgList = rep.getMessages();        for (int i = 0; i < msgList.size(); i++) {          Message msg = (Message)msgList.elementAt(i);          String msgId = msg.getIdentifier();                    if (logger.isLoggable(BasicLevel.WARN))            logger.log(BasicLevel.WARN,                                     "Denying message: " + msgId);                    proxyAgent.sendNot(            from,            new DenyRequest(0,rep.getCorrelationId(), msgId));        }      }    }  }  /**   * Actually forwards a <code>BrowseReply</code> coming from a   * destination as a <code>QBrowseReply</code> destinated to the   * requesting client.   */  private void doFwd(BrowseReply rep) {    try {      // Updating the active context:      setCtx(rep.getClientContext());      doReply(new QBrowseReply(rep.getCorrelationId(),                                rep.getMessages()));    } catch (StateException pE) {      // The context is lost; nothing to do.    }  }  /**   * Forwards the topic's <code>SubscribeReply</code> as a   * <code>ServerReply</code>.   */  private void doFwd(SubscribeReply rep) {    try {      setCtx(rep.getClientContext());      doReply(new ServerReply(rep.getCorrelationId()));    } catch (StateException pE) {      // The context is lost; nothing to do.    }  }  /**   * Method implementing the proxy reaction to a <code>TopicMsgsReply</code>   * holding messages published by a topic.   */  private void doFwd(AgentId from, TopicMsgsReply rep) {    // Browsing the target subscriptions:    TopicSubscription tSub = (TopicSubscription) topicsTable.get(from);    if (tSub == null || tSub.isEmpty()) return;    String subName;    ClientSubscription sub;    for (Enumeration names = tSub.getNames(); names.hasMoreElements();) {      subName = (String) names.nextElement();      sub = (ClientSubscription) subsTable.get(subName);      if (sub == null) continue;      // Browsing the delivered messages.      sub.browseNewMessages(rep.getMessages());    }    // Setting the arrival order of the messages and save message if it    // is MessageSoftRef.    for (Enumeration msgs = rep.getMessages().elements(); msgs.hasMoreElements();) {       Message message = (Message) msgs.nextElement();      message.order = arrivalsCounter++;            if ((message.durableAcksCounter > 0) ||          (message instanceof MessageSoftRef)) {                if (logger.isLoggable(BasicLevel.DEBUG))          logger.log(BasicLevel.DEBUG,                                  " -> save message " + message);        message.save(proxyAgent.getId().toString());        proxyAgent.setSave();      }    }    for (Enumeration names = tSub.getNames(); names.hasMoreElements();) {      subName = 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -