⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 proxyimpl.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
  /**
   * Handles a client <code>ConsumerReceiveRequest</code>.
   * <p>
   * In queue mode the request is turned into a <code>ReceiveRequest</code>
   * notification that is sent straight to the target agent; that
   * notification is flagged as non persistent when the target's
   * <code>getTo()</code> value equals the proxy agent's one (the case the
   * debug trace calls "local"). Outside queue mode the request is handed
   * over to <code>doReact</code>.
   *
   * @param key  value passed on to the built <code>ReceiveRequest</code>
   *             and to <code>doReact</code>.
   * @param req  the client request to process.
   */
  private void reactToClientRequest(int key, ConsumerReceiveRequest req)
  {
    // Not in queue mode: let the generic request dispatching handle it.
    if (! req.getQueueMode()) {
      doReact(key, req);
      return;
    }

    ReceiveRequest receiveNot = new ReceiveRequest(key,
                                                   req.getRequestId(),
                                                   req.getSelector(),
                                                   req.getTimeToLive(),
                                                   req.getReceiveAck(),
                                                   null,
                                                   1);
    AgentId target = AgentId.fromString(req.getTarget());

    boolean sameTo = target.getTo() == proxyAgent.getId().getTo();
    if (sameTo) {
      if (logger.isLoggable(BasicLevel.DEBUG)) {
        logger.log(BasicLevel.DEBUG, " -> local receiving");
      }
      receiveNot.setPersistent(false);
    }

    // The notification goes to the target whatever the outcome above.
    proxyAgent.sendNot(target, receiveNot);
  }

  /**
   * Either forwards the <code>ConsumerSetListRequest</code> request as a
   * <code>ReceiveRequest</code> directly to the target queue, or wraps it
   * and sends it to the proxy if destinated to a subscription.
*/  private void reactToClientRequest(int key, ConsumerSetListRequest req) {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(BasicLevel.DEBUG,           "ProxyImp.reactToClientRequest(" + key + ',' + req + ')');    if (req.getQueueMode()) {      ReceiveRequest not = new ReceiveRequest(key,                                              req.getRequestId(),                                              req.getSelector(),                                              0,                                              false,                                              req.getMessageIdsToAck(),                                              req.getMessageCount());          AgentId to = AgentId.fromString(req.getTarget());      if (to.getTo() == proxyAgent.getId().getTo()) {        if (logger.isLoggable(BasicLevel.DEBUG))          logger.log(BasicLevel.DEBUG,                                  " -> local sending");        not.setPersistent(false);        proxyAgent.sendNot(to, not);      } else {        proxyAgent.sendNot(to, not);      }    }    else {      doReact(key, req);       }  }  /**   * Forwards the client's <code>QBrowseRequest</code> request as   * a <code>BrowseRequest</code> MOM request directly to a destination.   
*/  private void reactToClientRequest(int key, QBrowseRequest req)  {    proxyAgent.sendNot(AgentId.fromString(req.getTarget()),                       new BrowseRequest(key,                                         req.getRequestId(),                                         req.getSelector()));  }    private void reactToClientRequest(int key, JmsRequestGroup request) {    AbstractJmsRequest[] requests = request.getRequests();    RequestBuffer rm = new RequestBuffer(proxyAgent);    for (int i = 0; i < requests.length; i++) {      if (requests[i] instanceof ProducerMessages) {        ProducerMessages pm =(ProducerMessages) requests[i];        rm.put(key, pm);      } else {        reactToClientRequest(key, requests[i]);      }    }        rm.flush();  }  /**   * Distributes the received notifications to the appropriate reactions.   * <p>   * A JMS proxy reacts to:   * <ul>   * <li><code>SyncReply</code> proxy synchronizing notification,</li>   * <li><code>SetDMQRequest</code> admin notification,</li>   * <li><code>SetThreshRequest</code> admin notification,</li>   * <li><code>SetNbMaxMsgRequest</code> admin notification,</li>   * <li><code>Monit_GetNbMaxMsg</code> admin notification,</li>   * <li><code>Monit_GetDMQSettings</code> monitoring notification,</li>   * <li><code>AbstractReply</code> destination replies,</li>   * <li><code>AdminReply</code> administration replies,</li>   * <li><code>fr.dyade.aaa.agent.UnknownAgent</code>.</li>   * </ul>   *    * @exception UnknownNotificationException   *              If the notification is not expected.   
*/   public void react(AgentId from, Notification not)              throws UnknownNotificationException  {    // Administration and monitoring requests:    if (not instanceof SetDMQRequest)      doReact(from, (SetDMQRequest) not);    else if (not instanceof SetThreshRequest)      doReact(from, (SetThreshRequest) not);    else if (not instanceof SetNbMaxMsgRequest)      doReact(from, (SetNbMaxMsgRequest) not);    else if (not instanceof Monit_GetNbMaxMsg)      doReact(from, (Monit_GetNbMaxMsg) not);    else if (not instanceof Monit_GetDMQSettings)      doReact(from, (Monit_GetDMQSettings) not);    // Synchronization notification:    else if (not instanceof SyncReply)      doReact((SyncReply) not);    // Notifications sent by a destination:    else if (not instanceof AbstractReply)       doFwd(from, (AbstractReply) not);    else if (not instanceof AdminReply)      doReact((AdminReply) not);    // Platform notifications:    else if (not instanceof UnknownAgent)      doReact((UnknownAgent) not);    else if (not instanceof UserAdminRequestNot)      doReact((UserAdminRequestNot) not);    else      throw new UnknownNotificationException("Unexpected notification: "                                              + not.getClass().getName());  }    /**   * Distributes the client requests to the appropriate reactions.   
* <p>   * The proxy accepts the following requests:   * <ul>   * <li><code>GetAdminTopicRequest</code></li>   * <li><code>CnxConnectRequest</code></li>   * <li><code>CnxStartRequest</code></li>   * <li><code>CnxStopRequest</code></li>   * <li><code>SessCreateTQRequest</code></li>   * <li><code>SessCreateTTRequest</code></li>   * <li><code>ConsumerSubRequest</code></li>   * <li><code>ConsumerUnsubRequest</code></li>   * <li><code>ConsumerCloseSubRequest</code></li>   * <li><code>ConsumerSetListRequest</code></li>   * <li><code>ConsumerUnsetListRequest</code></li>   * <li><code>ConsumerReceiveRequest</code></li>   * <li><code>ConsumerAckRequest</code></li>   * <li><code>ConsumerDenyRequest</code></li>   * <li><code>SessAckRequest</code></li>   * <li><code>SessDenyRequest</code></li>   * <li><code>TempDestDeleteRequest</code></li>   * <li><code>XACnxPrepare</code></li>   * <li><code>XACnxCommit</code></li>   * <li><code>XACnxRollback</code></li>   * <li><code>XACnxRecoverRequest</code></li>   * </ul>   * <p>   * A <code>JmsExceptReply</code> is sent back to the client when an   * exception is thrown by the reaction.   */   private void doReact(int key, AbstractJmsRequest request)  {    try {      // Updating the active context if the request is not a new context      // request!      if (! 
(request instanceof CnxConnectRequest))        setCtx(key);      if (request instanceof GetAdminTopicRequest)        doReact(key, (GetAdminTopicRequest) request);      else if (request instanceof CnxConnectRequest)        doReact(key, (CnxConnectRequest) request);      else if (request instanceof CnxStartRequest)        doReact((CnxStartRequest) request);      else if (request instanceof CnxStopRequest)        doReact((CnxStopRequest) request);      else if (request instanceof SessCreateTQRequest)        doReact((SessCreateTQRequest) request);      else if (request instanceof SessCreateTTRequest)        doReact((SessCreateTTRequest) request);      else if (request instanceof ConsumerSubRequest)        doReact((ConsumerSubRequest) request);      else if (request instanceof ConsumerUnsubRequest)        doReact((ConsumerUnsubRequest) request);      else if (request instanceof ConsumerCloseSubRequest)        doReact((ConsumerCloseSubRequest) request);      else if (request instanceof ConsumerSetListRequest)        doReact((ConsumerSetListRequest) request);      else if (request instanceof ConsumerUnsetListRequest)        doReact((ConsumerUnsetListRequest) request);      else if (request instanceof ConsumerReceiveRequest)        doReact((ConsumerReceiveRequest) request);      else if (request instanceof ConsumerAckRequest)        doReact((ConsumerAckRequest) request);      else if (request instanceof ConsumerDenyRequest)        doReact((ConsumerDenyRequest) request);      else if (request instanceof SessAckRequest)        doReact((SessAckRequest) request);      else if (request instanceof SessDenyRequest)        doReact((SessDenyRequest) request);      else if (request instanceof TempDestDeleteRequest)        doReact((TempDestDeleteRequest) request);      else if (request instanceof XACnxPrepare)        doReact((XACnxPrepare) request);      else if (request instanceof XACnxCommit)        doReact((XACnxCommit) request);      else if (request instanceof XACnxRollback)     
   doReact((XACnxRollback) request);      else if (request instanceof XACnxRecoverRequest)        doReact((XACnxRecoverRequest) request);      else if (request instanceof CnxCloseRequest)        doReact(key, (CnxCloseRequest) request);      else if (request instanceof ActivateConsumerRequest)        doReact(key, (ActivateConsumerRequest) request);      else if (request instanceof CommitRequest)        doReact(key, (CommitRequest)request);    }    catch (MomException mE) {      if (logger.isLoggable(BasicLevel.WARN))        logger.log(BasicLevel.WARN, mE);      // Sending the exception to the client:      doReply(new MomExceptionReply(request.getRequestId(), mE));    }  }  /**   * Method implementing the reaction to a <code>GetAdminTopicRequest</code>   * requesting the identifier of the local admin topic.   * <p>   * It simply sends back a <code>GetAdminTopicReply</code> holding the    * admin topic identifier.   *    * @exception AccessException  If the requester is not an administrator.   */  private void doReact(int key, GetAdminTopicRequest req)               throws AccessException  {//     if (! admin)//       throw new AccessException("Request forbidden to a non administrator.");    doReply(      key,      new GetAdminTopicReply(        req,        AdminTopicImpl.getReference().getId().toString()));  }  /**   * Method implementing the reaction to a <code>CnxConnectRequest</code>   * requesting the key of the active context.   * <p>   * It simply sends back a <code>ConnectReply</code> holding the active   * context's key.   *   * @exception DestinationException  In case of a first administrator    *              context, if the local administration topic reference   *              is not available.   */  private void doReact(int key, CnxConnectRequest req)    throws DestinationException {    // state change, so save.    
proxyAgent.setSave();    setActiveCtxId(key);    activeCtx = new ClientContext(proxyAgent.getId(), key);    activeCtx.setProxyAgent(proxyAgent);    contexts.put(new Integer(key), activeCtx);        if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(BasicLevel.DEBUG, "Connection " + key                              + " opened.");    doReply(new CnxConnectReply(req, key, proxyAgent.getId().toString()));  }  /**   * Method implementing the proxy reaction to a <code>CnxStartRequest</code>   * requesting to start a context.   * <p>   * This method sends the pending <code>ConsumerMessages</code> replies,   * if any.   */  private void doReact(CnxStartRequest req)  {    activeCtx.setActivated(true);    // Delivering the pending deliveries, if any:    for (Enumeration deliveries = activeCtx.getPendingDeliveries();         deliveries.hasMoreElements();)      doReply((AbstractJmsReply) deliveries.nextElement());    // Clearing the pending deliveries.    activeCtx.clearPendingDeliveries();  }  /**   * Method implementing the JMS proxy reaction to a   * <code>CnxStopRequest</code> requesting to stop a context.   * <p>   * This method sends a <code>ServerReply</code> back.   */  private void doReact(CnxStopRequest req) {    activeCtx.setActivated(false);    doReply(new ServerReply(req));  }  /**   * Method implementing the JMS proxy reaction to a   * <code>SessCreateTQRequest</code> requesting the creation of a temporary   * queue.   * <p>   * Creates the queue, sends it a <code>SetRightRequest</code> for granting   * WRITE access to all, and wraps a <code>SessCreateTDReply</code> in a   * <code>SyncReply</code> notification it sends to itself. This latest   * action's purpose is to preserve causality.   *   * @exception RequestException  If the queue could not be deployed.   
*/
  private void doReact(SessCreateTQRequest req) throws RequestException {
    try {
      // Create and deploy the temporary queue on behalf of this proxy.
      Queue tmpQueue = new Queue();
      tmpQueue.init(proxyAgent.getId(), null);
      AgentId queueId = tmpQueue.getId();
      tmpQueue.deploy();

      // Setting free WRITE right on the queue:
      proxyAgent.sendNot(queueId, new SetRightRequest(null, null, 2));

      // The active context keeps track of the temporary destination.
      activeCtx.addTemporaryDestination(queueId);

      // The client reply travels through a notification the proxy sends
      // to itself.
      SessCreateTDReply created =
        new SessCreateTDReply(req, queueId.toString());
      proxyAgent.sendNot(proxyAgent.getId(),
                         new SyncReply(activeCtxId, created));

      // The default admin topic is told about the new temporary
      // destination.
      proxyAgent.sendNot(AdminTopic.getDefault(),
                         new RegisterTmpDestNot(queueId, false, true));

      if (logger.isLoggable(BasicLevel.DEBUG)) {
        logger.log(BasicLevel.DEBUG,
                   "Temporary queue " + queueId + " created.");
      }
    }
    catch (java.io.IOException ioExc) {
      throw new RequestException("Could not create temporary queue: "
                                 + ioExc);
    }
  }

  /**
   * Method implementing the JMS proxy reaction to a
   * <code>SessCreateTTRequest</code> requesting the creation of a temporary
   * topic.
   * <p>
   * Creates the topic, sends it a <code>SetRightRequest</code> for granting
   * WRITE access to all, and wraps a <code>SessCreateTDReply</code> in a
   * <code>SyncReply</code> notification it sends to itself. This latest
   * action's purpose is to preserve causality.
   *
   * @exception RequestException  If the topic could not be deployed.
   */
  private void doReact(SessCreateTTRequest req) throws RequestException {
    Topic topic = new Topic();
    topic.init(proxyAgent.getId(), null);
    AgentId tId = topic.getId();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -