⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 proxyimpl.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
    try {      topic.deploy();      // Setting free WRITE right on the topic:      proxyAgent.sendNot(tId, new SetRightRequest(null, null, 2));      activeCtx.addTemporaryDestination(tId);      SessCreateTDReply reply = new SessCreateTDReply(req, tId.toString());      proxyAgent.sendNot(proxyAgent.getId(),                         new SyncReply(activeCtxId, reply));      proxyAgent.sendNot(AdminTopic.getDefault(),                         new RegisterTmpDestNot(tId, true, true));      if (logger.isLoggable(BasicLevel.DEBUG))        logger.log(BasicLevel.DEBUG, "Temporary topic"                                + tId + " created.");    } catch (java.io.IOException iE) {      topic = null;      throw new RequestException("Could not deploy temporary topic "                                 + tId + ": " + iE);    }   }  /**   * Method implementing the JMS proxy reaction to a   * <code>ConsumerSubRequest</code> requesting to subscribe to a topic.   *   * @exception StateException  If activating an already active durable   *              subscription.   */  private void doReact(ConsumerSubRequest req) throws StateException {    AgentId topicId = AgentId.fromString(req.getTarget());    String subName = req.getSubName();        boolean newTopic = ! topicsTable.containsKey(topicId);    boolean newSub = ! subsTable.containsKey(subName);    TopicSubscription tSub;    ClientSubscription cSub;    // true if a SubscribeRequest has been sent to the topic.     boolean sent = false;    if (newTopic) { // New topic...      tSub = new TopicSubscription();      topicsTable.put(topicId, tSub);    } else { // Known topic...      tSub = (TopicSubscription) topicsTable.get(topicId);    }    if (newSub) { // New subscription...      // state change, so save.      proxyAgent.setSave();      cSub = new ClientSubscription(proxyAgent.getId(),                                    activeCtxId,                                    req.getRequestId(),                                    req.getDurable(),                                    topicId,                                    req.getSubName(),                                    req.getSelector(),                                    req.getNoLocal(),                                    dmqId,                                    threshold,                                    messagesTable);      cSub.setProxyAgent(proxyAgent);           if (logger.isLoggable(BasicLevel.DEBUG))        logger.log(BasicLevel.DEBUG,                                "Subscription " + subName + " created.");      subsTable.put(subName, cSub);      tSub.putSubscription(subName, req.getSelector());      sent =        updateSubscriptionToTopic(topicId, activeCtxId, req.getRequestId());    }  else { // Existing durable subscription...      cSub = (ClientSubscription) subsTable.get(subName);      if (cSub.getActive())        throw new StateException("The durable subscription " + subName +                                 " has already been activated.");      // Updated topic: updating the subscription to the previous topic.      boolean updatedTopic = ! topicId.equals(cSub.getTopicId());      if (updatedTopic) {        TopicSubscription oldTSub =          (TopicSubscription) topicsTable.get(cSub.getTopicId());        oldTSub.removeSubscription(subName);        updateSubscriptionToTopic(cSub.getTopicId(), -1, -1);      }      // Updated selector?      boolean updatedSelector;      if (req.getSelector() == null && cSub.getSelector() != null)        updatedSelector = true;      else if (req.getSelector() != null && cSub.getSelector() == null)        updatedSelector = true;      else if (req.getSelector() == null && cSub.getSelector() == null)        updatedSelector = false;      else        updatedSelector = ! req.getSelector().equals(cSub.getSelector());      // Reactivating the subscription.      cSub.reactivate(activeCtxId,                      req.getRequestId(),                      topicId,                      req.getSelector(),                      req.getNoLocal());      if (logger.isLoggable(BasicLevel.DEBUG))        logger.log(BasicLevel.DEBUG,                                "Subscription " + subName + " reactivated.");      // Updated subscription: updating subscription to topic.        if (updatedTopic || updatedSelector) {        tSub.putSubscription(subName, req.getSelector());        sent = updateSubscriptionToTopic(topicId,                                         activeCtxId,                                         req.getRequestId());      }    }    // Activating the subscription.    activeCtx.addSubName(subName);    // Acknowledging the request, if needed.    if (! sent)      proxyAgent.sendNot(proxyAgent.getId(),                         new SyncReply(activeCtxId, new ServerReply(req)));  }  /**   * Method implementing the JMS proxy reaction to a   * <code>ConsumerSetListRequest</code> notifying the creation of a client   * listener.   * <p>   * Sets the listener for the subscription, launches a delivery sequence.   *   * @exception DestinationException  If the subscription does not exist.   */  private void doReact(ConsumerSetListRequest req) throws DestinationException  {    // Getting the subscription:    String subName = req.getTarget();    ClientSubscription sub = (ClientSubscription) subsTable.get(subName);    if (sub == null)      throw new DestinationException("Can't set a listener on the non existing subscription: " + subName);    sub.setListener(req.getRequestId());    ConsumerMessages consM = sub.deliver();    if (consM != null) {      if (activeCtx.getActivated())        doReply(consM);      else        activeCtx.addPendingDelivery(consM);    }  }     /**   * Method implementing the JMS proxy reaction to a   * <code>ConsumerUnsetListRequest</code> notifying that a consumer listener   * is unset.   *   * @exception DestinationException  If the subscription does not exist.   */  private void doReact(ConsumerUnsetListRequest req)     throws DestinationException {    // If the listener was listening to a queue, cancelling any pending reply:    if (req.getQueueMode()) {      activeCtx.cancelReceive(req.getCancelledRequestId());      AgentId to = AgentId.fromString(req.getTarget());      proxyAgent.sendNot(        to,        new AbortReceiveRequest(activeCtx.getId(),                                 req.getRequestId(),                                req.getCancelledRequestId()));    }  }  /**   * Method implementing the JMS proxy reaction to a   * <code>ConsumerCloseSubRequest</code> requesting to deactivate a durable   * subscription.   *   * @exception DestinationException  If the subscription does not exist.    */  private void doReact(ConsumerCloseSubRequest req) throws DestinationException  {    // Getting the subscription:    String subName = req.getTarget();    ClientSubscription sub = (ClientSubscription) subsTable.get(subName);    if (sub == null)      throw new DestinationException("Can't desactivate non existing subscription: " + subName);    // De-activating the subscription:    activeCtx.removeSubName(subName);    sub.deactivate();    // Acknowledging the request:    doReply(new ServerReply(req));  }  /**   * Method implementing the JMS proxy reaction to a   * <code>ConsumerUnsubRequest</code> requesting to remove a subscription.   *   * @exception DestinationException  If the subscription does not exist.   */  private void doReact(ConsumerUnsubRequest req)     throws DestinationException {    // state change, so save.    proxyAgent.setSave();    // Getting the subscription.    String subName = req.getTarget();    ClientSubscription sub = (ClientSubscription) subsTable.get(subName);    if (sub == null)      throw new DestinationException("Can't unsubscribe non existing subscription: " + subName);    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(BasicLevel.DEBUG,                              "Deleting subscription " + subName);    // Updating the proxy's subscription to the topic.    AgentId topicId = sub.getTopicId();    TopicSubscription tSub = (TopicSubscription) topicsTable.get(topicId);    tSub.removeSubscription(subName);    updateSubscriptionToTopic(topicId, -1, -1);    // Deleting the subscription.    sub.delete();    activeCtx.removeSubName(subName);    subsTable.remove(subName);    // Acknowledging the request:    proxyAgent.sendNot(proxyAgent.getId(),                       new SyncReply(activeCtxId, new ServerReply(req)));  }  /**   * Method implementing the proxy reaction to a   * <code>ConsumerReceiveRequest</code> instance, requesting a message from a   * subscription.   * <p>   * This method registers the request and launches a delivery sequence.    *   * @exception DestinationException  If the subscription does not exist.    */  private void doReact(ConsumerReceiveRequest req)     throws DestinationException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(BasicLevel.DEBUG,                              "ProxyImpl.doReact(" + req + ')');        String subName = req.getTarget();    ClientSubscription sub = (ClientSubscription) subsTable.get(subName);    if (sub == null)      throw new DestinationException("Can't request a message from the unknown subscription: " + subName);    // Getting a message from the subscription.    sub.setReceiver(req.getRequestId(), req.getTimeToLive());    ConsumerMessages consM = sub.deliver();    if (consM != null && req.getReceiveAck()) {      Vector messageList = consM.getMessages();      for (int i = 0; i < messageList.size(); i++) {        Message msg = (Message)messageList.elementAt(i);        sub.acknowledge(msg.getIdentifier());      }    }    // Nothing to deliver but immediate delivery request: building an empty    // reply.    if (consM == null && req.getTimeToLive() == -1) {      if (logger.isLoggable(BasicLevel.DEBUG))        logger.log(BasicLevel.DEBUG,                                " -> immediate delivery");      sub.unsetReceiver();      consM = new ConsumerMessages(        req.getRequestId(),         subName,        false);    }        // Delivering.    if (consM != null && activeCtx.getActivated()) {      doReply(consM);    } else if (consM != null) {      activeCtx.addPendingDelivery(consM);    }  }  /**    * Method implementing the JMS proxy reaction to a   * <code>SessAckRequest</code> acknowledging messages either on a queue   * or on a subscription.   */  private void doReact(SessAckRequest req)   {    if (req.getQueueMode()) {      AgentId qId = AgentId.fromString(req.getTarget());      Vector ids = req.getIds();            AcknowledgeRequest not =        new AcknowledgeRequest(activeCtxId,                               req.getRequestId(),                               ids);      if (qId.getTo() == proxyAgent.getId().getTo()) {        if (logger.isLoggable(BasicLevel.DEBUG))          logger.log(BasicLevel.DEBUG,                                  " -> local acking");        not.setPersistent(false);      }            proxyAgent.sendNot(qId, not);    }    else {      String subName = req.getTarget();      ClientSubscription sub = (ClientSubscription) subsTable.get(subName);      if (sub != null)        sub.acknowledge(req.getIds().elements());    }  }  /**    * Method implementing the JMS proxy reaction to a   * <code>SessDenyRequest</code> denying messages either on a queue or on   * a subscription.   */  private void doReact(SessDenyRequest req)  {    if (req.getQueueMode()) {      AgentId qId = AgentId.fromString(req.getTarget());      Vector ids = req.getIds();      proxyAgent.sendNot(qId,                         new DenyRequest(activeCtxId, req.getRequestId(), ids));      // Acknowledging the request unless forbidden:      if (! req.getDoNotAck())        proxyAgent.sendNot(proxyAgent.getId(),                           new SyncReply(activeCtxId, new ServerReply(req)));    }    else {      String subName = req.getTarget();      ClientSubscription sub = (ClientSubscription) subsTable.get(subName);      if (sub == null)         return;      sub.deny(req.getIds().elements());      // Launching a delivery sequence:      ConsumerMessages consM = sub.deliver();      // Delivering.      if (consM != null && activeCtx.getActivated())        doReply(consM);      else if (consM != null)        activeCtx.addPendingDelivery(consM);    }  }  /**    * Method implementing the JMS proxy reaction to a   * <code>ConsumerAckRequest</code> acknowledging a message either on a queue   * or on a subscription.   */  private void doReact(ConsumerAckRequest req)  {    if (req.getQueueMode()) {      AgentId qId = AgentId.fromString(req.getTarget());      AcknowledgeRequest not = new AcknowledgeRequest(activeCtxId,                                                      req.getRequestId(),                                                      req.getIds());

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -