⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 proxyimpl.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
      if (qId.getTo() == proxyAgent.getId().getTo()) {        if (logger.isLoggable(BasicLevel.DEBUG))          logger.log(BasicLevel.DEBUG,                                  " -> local acking");        not.setPersistent(false);        proxyAgent.sendNot(qId, not);      } else {        proxyAgent.sendNot(qId, not);      }    }    else {      String subName = req.getTarget();      ClientSubscription sub = (ClientSubscription) subsTable.get(subName);      if (sub != null) {        sub.acknowledge(req.getIds().elements());      }    }  }  /**    * Method implementing the JMS proxy reaction to a   * <code>ConsumerDenyRequest</code> denying a message either on a queue   * or on a subscription.   * <p>   * This request is acknowledged when destinated to a queue.   */  private void doReact(ConsumerDenyRequest req)  {    if (req.getQueueMode()) {      AgentId qId = AgentId.fromString(req.getTarget());      String id = req.getId();      proxyAgent.sendNot(qId,                         new DenyRequest(activeCtxId, req.getRequestId(), id));      // Acknowledging the request, unless forbidden:      if (! req.getDoNotAck())        proxyAgent.sendNot(proxyAgent.getId(),                           new SyncReply(activeCtxId, new ServerReply(req)));    }    else {      String subName = req.getTarget();      ClientSubscription sub = (ClientSubscription) subsTable.get(subName);      if (sub == null)         return;      Vector ids = new Vector();      ids.add(req.getId());      sub.deny(ids.elements());      // Launching a delivery sequence:      ConsumerMessages consM = sub.deliver();      // Delivering.      if (consM != null && activeCtx.getActivated())        doReply(consM);      else if (consM != null)        activeCtx.addPendingDelivery(consM);    }  }  /**   * Method implementing the JMS proxy reaction to a    * <code>TempDestDeleteRequest</code> request for deleting a temporary   * destination.   * <p>   * This method sends a <code>fr.dyade.aaa.agent.DeleteNot</code> to the   * destination and acknowledges the request.   */  private void doReact(TempDestDeleteRequest req)  {    // Removing the destination from the context's list:    AgentId tempId = AgentId.fromString(req.getTarget());    activeCtx.removeTemporaryDestination(tempId);    // Sending the request to the destination:    deleteTemporaryDestination(tempId);    // Acknowledging the request:    proxyAgent.sendNot(proxyAgent.getId(),                       new SyncReply(activeCtxId, new ServerReply(req)));  }  private void deleteTemporaryDestination(AgentId destId) {    proxyAgent.sendNot(destId, new DeleteNot());    proxyAgent.sendNot(AdminTopic.getDefault(),                       new RegisterTmpDestNot(destId, false, false));  }  /**   * Method implementing the JMS proxy reaction to an   * <code>XACnxPrepare</code> request holding messages and acknowledgements   * produced in an XA transaction.   *   * @exception StateException  If the proxy has already received a prepare   *                              order for the same transaction.   */  private void doReact(XACnxPrepare req) throws StateException  {    try {      Xid xid = new Xid(req.getBQ(), req.getFI(), req.getGTI());      activeCtx.registerTxPrepare(xid, req);      doReply(new ServerReply(req));    }    catch (Exception exc) {      throw new StateException(exc.getMessage());    }  }  /**   * Method implementing the JMS proxy reaction to an   * <code>XACnxCommit</code> request commiting the operations performed   * in a given transaction.   * <p>   * This method actually processes the objects sent at the prepare phase,   * and acknowledges the request.   *    * @exception StateException  If commiting an unknown transaction.   */  private void doReact(XACnxCommit req) throws StateException  {    Xid xid = new Xid(req.getBQ(), req.getFI(), req.getGTI());    XACnxPrepare prepare = activeCtx.getTxPrepare(xid);    if (prepare == null)      throw new StateException("Unknown transaction identifier.");    Vector sendings = prepare.getSendings();    Vector acks = prepare.getAcks();    ProducerMessages pM;    ClientMessages not;    while (! sendings.isEmpty()) {      pM = (ProducerMessages) sendings.remove(0);      not = new ClientMessages(activeCtxId,                               pM.getRequestId(),                               pM.getMessages());      proxyAgent.sendNot(AgentId.fromString(pM.getTarget()), not);    }    while (! acks.isEmpty())      doReact((SessAckRequest) acks.remove(0));    doReply(new ServerReply(req));  }  /**   * Method implementing the JMS proxy reaction to an   * <code>XACnxRollback</code> request rolling back the operations performed   * in a given transaction.   */  private void doReact(XACnxRollback req)  {    Xid xid = new Xid(req.getBQ(), req.getFI(), req.getGTI());    String queueName;    AgentId qId;    Vector ids;    for (Enumeration queues = req.getQueues(); queues.hasMoreElements();) {      queueName = (String) queues.nextElement();      qId = AgentId.fromString(queueName);      ids = req.getQueueIds(queueName);      proxyAgent.sendNot(qId,                         new DenyRequest(activeCtxId, req.getRequestId(), ids));    }    String subName;    ClientSubscription sub;    ConsumerMessages consM;    for (Enumeration subs = req.getSubs(); subs.hasMoreElements();) {      subName = (String) subs.nextElement();      sub = (ClientSubscription) subsTable.get(subName);      if (sub != null) {        sub.deny(req.getSubIds(subName).elements());        consM = sub.deliver();        if (consM != null && activeCtx.getActivated())          doReply(consM);        else if (consM != null)          activeCtx.addPendingDelivery(consM);      }    }   XACnxPrepare prepare = activeCtx.getTxPrepare(xid);    if (prepare != null) {      Vector acks = prepare.getAcks();      SessAckRequest ack;      while (! acks.isEmpty()) {        ack = (SessAckRequest) acks.remove(0);        doReact(new SessDenyRequest(ack.getTarget(),                                    ack.getIds(),                                    ack.getQueueMode(),                                    true));      }    }    proxyAgent.sendNot(proxyAgent.getId(),                       new SyncReply(activeCtxId, new ServerReply(req)));  }  /**   * Reacts to a <code>XACnxRecoverRequest</code> request requesting the    * identifiers of the prepared transactions.   * <p>   * Returns the identifiers of the recovered transactions, puts the prepared   * data into the active context for future commit or rollback.   *   * @exception StateException  If a recovered transaction branch is already   *                              present in the context.   */  private void doReact(XACnxRecoverRequest req)    throws StateException {    // state change, so save.    proxyAgent.setSave();    Vector bqs = new Vector();    Vector fis = new Vector();    Vector gtis = new Vector();    if (recoveredTransactions != null) {      Enumeration keys = recoveredTransactions.keys();      Xid xid;      while (keys.hasMoreElements()) {        xid = (Xid) recoveredTransactions.get(keys.nextElement());        bqs.add(xid.bq);        fis.add(new Integer(xid.fi));        gtis.add(xid.gti);        try {          activeCtx.registerTxPrepare(xid,                                      (XACnxPrepare) recoveredTransactions.remove(xid));        }        catch (Exception exc) {          throw new StateException("Recovered transaction branch has already been prepared by the RM.");        }      }    }    recoveredTransactions = null;    doReply(new XACnxRecoverReply(req, bqs, fis, gtis));  }  /**   * Method implementing the reaction to a <code>SetDMQRequest</code>   * instance setting the dead message queue identifier for this proxy   * and its subscriptions.   */  private void doReact(AgentId from, SetDMQRequest not) {    // state change, so save.    proxyAgent.setSave();        dmqId = not.getDmqId();    for (Enumeration keys = subsTable.keys(); keys.hasMoreElements();)       ((ClientSubscription) subsTable.get(keys.nextElement())).setDMQId(dmqId);    proxyAgent.sendNot(from, new AdminReply(not, true, "DMQ set: " + dmqId));  }  /**   * Method implementing the reaction to a <code>SetThreshRequest</code>   * instance setting the threshold value for this proxy and its   * subscriptions.   */  private void doReact(AgentId from, SetThreshRequest not) {    // state change, so save.    proxyAgent.setSave();        threshold = not.getThreshold();    for (Enumeration keys = subsTable.keys(); keys.hasMoreElements();)       ((ClientSubscription)         subsTable.get(keys.nextElement())).setThreshold(not.getThreshold());    proxyAgent.sendNot(from,                       new AdminReply(not,                                      true,                                      "Threshold set: " + threshold));  }  /**   * Method implementing the reaction to a <code>SetNbMaxMsgRequest</code>   * instance setting the NbMaxMsg value for the subscription.   */  protected void doReact(AgentId from, SetNbMaxMsgRequest not) {    int nbMaxMsg = not.getNbMaxMsg();    String subName = not.getSubName();    ClientSubscription sub = (ClientSubscription) subsTable.get(subName);    if (sub != null) {      sub.setNbMaxMsg(nbMaxMsg);      proxyAgent.sendNot(from,                         new AdminReply(not,                                        true,                                        "NbMaxMsg set: " + nbMaxMsg + " on " + subName));    } else {      proxyAgent.sendNot(from,                         new AdminReply(not,                                        false,                                        "NbMaxMsg not set: " + nbMaxMsg + " on " + subName));    }  }  /**   * Method implementing the reaction to a   * <code>Monit_GetNbMaxMsg</code> notification requesting the   * number max of messages in the subscription.   *   * @exception AccessException  If the requester is not the administrator.   */  protected void doReact(AgentId from, Monit_GetNbMaxMsg not) {    int nbMaxMsg = -1;    String subName = not.getSubName();    ClientSubscription sub = (ClientSubscription) subsTable.get(subName);    if (sub != null)      nbMaxMsg = sub.getNbMaxMsg();    Channel.sendTo(from, new Monit_GetNbMaxMsgRep(not,nbMaxMsg));  }  /**   * Returns the maximum number of message for identified subscription.   * The subscription is identified  by its unique name, if the limit is unset   * the method returns -1.   *   * @param subName  The subscription unique name.   * @return the maximum number of message for subscription if set;   *	     -1 otherwise.   */  public int getNbMaxMsg(String subName) {    int nbMaxMsg = -1;    ClientSubscription sub = (ClientSubscription) subsTable.get(subName);    if (sub != null)      nbMaxMsg = sub.getNbMaxMsg();    return nbMaxMsg;  }  /**   * Sets the maximum number of message for identified subscription.   * The subscription is identified  by its unique name.   *   * @param subName  The subscription unique name.   * @param nbMaxMsg the maximum number of message for subscription (-1 set   *		     no limit).   */  public void setNbMaxMsg(String subName, int nbMaxMsg) {    ClientSubscription sub = (ClientSubscription) subsTable.get(subName);    if (sub != null)      sub.setNbMaxMsg(nbMaxMsg);  }  /**   * Method implementing the reaction to a <code>Monit_GetDMQSettings</code>   * instance requesting the DMQ settings of this proxy.   */  private void doReact(AgentId from, Monit_GetDMQSettings not)  {    String id = null;    if (dmqId != null)      id = dmqId.toString();    proxyAgent.sendNot(from, new Monit_GetDMQSettingsRep(not, id, threshold));  }  /**   * Method implementing the JMS proxy reaction to a   * <code>SyncReply</code> notification sent by itself, wrapping a reply   * to be sent to a client.   */  private void doReact(SyncReply not)  {    doReply(not.key, not.reply);  }  /**   * The method closes a given context by denying the non acknowledged messages   * delivered to this context, and deleting its temporary subscriptions and   * destinations.   */  private void doReact(int key, CnxCloseRequest req) {    // state change, so save.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -