📄 proxyimpl.java
字号:
if (qId.getTo() == proxyAgent.getId().getTo()) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, " -> local acking"); not.setPersistent(false); proxyAgent.sendNot(qId, not); } else { proxyAgent.sendNot(qId, not); } } else { String subName = req.getTarget(); ClientSubscription sub = (ClientSubscription) subsTable.get(subName); if (sub != null) { sub.acknowledge(req.getIds().elements()); } } } /** * Method implementing the JMS proxy reaction to a * <code>ConsumerDenyRequest</code> denying a message either on a queue * or on a subscription. * <p> * This request is acknowledged when destinated to a queue. */ private void doReact(ConsumerDenyRequest req) { if (req.getQueueMode()) { AgentId qId = AgentId.fromString(req.getTarget()); String id = req.getId(); proxyAgent.sendNot(qId, new DenyRequest(activeCtxId, req.getRequestId(), id)); // Acknowledging the request, unless forbidden: if (! req.getDoNotAck()) proxyAgent.sendNot(proxyAgent.getId(), new SyncReply(activeCtxId, new ServerReply(req))); } else { String subName = req.getTarget(); ClientSubscription sub = (ClientSubscription) subsTable.get(subName); if (sub == null) return; Vector ids = new Vector(); ids.add(req.getId()); sub.deny(ids.elements()); // Launching a delivery sequence: ConsumerMessages consM = sub.deliver(); // Delivering. if (consM != null && activeCtx.getActivated()) doReply(consM); else if (consM != null) activeCtx.addPendingDelivery(consM); } } /** * Method implementing the JMS proxy reaction to a * <code>TempDestDeleteRequest</code> request for deleting a temporary * destination. * <p> * This method sends a <code>fr.dyade.aaa.agent.DeleteNot</code> to the * destination and acknowledges the request. */ private void doReact(TempDestDeleteRequest req) { // Removing the destination from the context's list: AgentId tempId = AgentId.fromString(req.getTarget()); activeCtx.removeTemporaryDestination(tempId); // Sending the request to the destination: deleteTemporaryDestination(tempId); // Acknowledging the request: proxyAgent.sendNot(proxyAgent.getId(), new SyncReply(activeCtxId, new ServerReply(req))); } private void deleteTemporaryDestination(AgentId destId) { proxyAgent.sendNot(destId, new DeleteNot()); proxyAgent.sendNot(AdminTopic.getDefault(), new RegisterTmpDestNot(destId, false, false)); } /** * Method implementing the JMS proxy reaction to an * <code>XACnxPrepare</code> request holding messages and acknowledgements * produced in an XA transaction. * * @exception StateException If the proxy has already received a prepare * order for the same transaction. */ private void doReact(XACnxPrepare req) throws StateException { try { Xid xid = new Xid(req.getBQ(), req.getFI(), req.getGTI()); activeCtx.registerTxPrepare(xid, req); doReply(new ServerReply(req)); } catch (Exception exc) { throw new StateException(exc.getMessage()); } } /** * Method implementing the JMS proxy reaction to an * <code>XACnxCommit</code> request commiting the operations performed * in a given transaction. * <p> * This method actually processes the objects sent at the prepare phase, * and acknowledges the request. * * @exception StateException If commiting an unknown transaction. */ private void doReact(XACnxCommit req) throws StateException { Xid xid = new Xid(req.getBQ(), req.getFI(), req.getGTI()); XACnxPrepare prepare = activeCtx.getTxPrepare(xid); if (prepare == null) throw new StateException("Unknown transaction identifier."); Vector sendings = prepare.getSendings(); Vector acks = prepare.getAcks(); ProducerMessages pM; ClientMessages not; while (! sendings.isEmpty()) { pM = (ProducerMessages) sendings.remove(0); not = new ClientMessages(activeCtxId, pM.getRequestId(), pM.getMessages()); proxyAgent.sendNot(AgentId.fromString(pM.getTarget()), not); } while (! acks.isEmpty()) doReact((SessAckRequest) acks.remove(0)); doReply(new ServerReply(req)); } /** * Method implementing the JMS proxy reaction to an * <code>XACnxRollback</code> request rolling back the operations performed * in a given transaction. */ private void doReact(XACnxRollback req) { Xid xid = new Xid(req.getBQ(), req.getFI(), req.getGTI()); String queueName; AgentId qId; Vector ids; for (Enumeration queues = req.getQueues(); queues.hasMoreElements();) { queueName = (String) queues.nextElement(); qId = AgentId.fromString(queueName); ids = req.getQueueIds(queueName); proxyAgent.sendNot(qId, new DenyRequest(activeCtxId, req.getRequestId(), ids)); } String subName; ClientSubscription sub; ConsumerMessages consM; for (Enumeration subs = req.getSubs(); subs.hasMoreElements();) { subName = (String) subs.nextElement(); sub = (ClientSubscription) subsTable.get(subName); if (sub != null) { sub.deny(req.getSubIds(subName).elements()); consM = sub.deliver(); if (consM != null && activeCtx.getActivated()) doReply(consM); else if (consM != null) activeCtx.addPendingDelivery(consM); } } XACnxPrepare prepare = activeCtx.getTxPrepare(xid); if (prepare != null) { Vector acks = prepare.getAcks(); SessAckRequest ack; while (! acks.isEmpty()) { ack = (SessAckRequest) acks.remove(0); doReact(new SessDenyRequest(ack.getTarget(), ack.getIds(), ack.getQueueMode(), true)); } } proxyAgent.sendNot(proxyAgent.getId(), new SyncReply(activeCtxId, new ServerReply(req))); } /** * Reacts to a <code>XACnxRecoverRequest</code> request requesting the * identifiers of the prepared transactions. * <p> * Returns the identifiers of the recovered transactions, puts the prepared * data into the active context for future commit or rollback. * * @exception StateException If a recovered transaction branch is already * present in the context. */ private void doReact(XACnxRecoverRequest req) throws StateException { // state change, so save. proxyAgent.setSave(); Vector bqs = new Vector(); Vector fis = new Vector(); Vector gtis = new Vector(); if (recoveredTransactions != null) { Enumeration keys = recoveredTransactions.keys(); Xid xid; while (keys.hasMoreElements()) { xid = (Xid) recoveredTransactions.get(keys.nextElement()); bqs.add(xid.bq); fis.add(new Integer(xid.fi)); gtis.add(xid.gti); try { activeCtx.registerTxPrepare(xid, (XACnxPrepare) recoveredTransactions.remove(xid)); } catch (Exception exc) { throw new StateException("Recovered transaction branch has already been prepared by the RM."); } } } recoveredTransactions = null; doReply(new XACnxRecoverReply(req, bqs, fis, gtis)); } /** * Method implementing the reaction to a <code>SetDMQRequest</code> * instance setting the dead message queue identifier for this proxy * and its subscriptions. */ private void doReact(AgentId from, SetDMQRequest not) { // state change, so save. proxyAgent.setSave(); dmqId = not.getDmqId(); for (Enumeration keys = subsTable.keys(); keys.hasMoreElements();) ((ClientSubscription) subsTable.get(keys.nextElement())).setDMQId(dmqId); proxyAgent.sendNot(from, new AdminReply(not, true, "DMQ set: " + dmqId)); } /** * Method implementing the reaction to a <code>SetThreshRequest</code> * instance setting the threshold value for this proxy and its * subscriptions. */ private void doReact(AgentId from, SetThreshRequest not) { // state change, so save. proxyAgent.setSave(); threshold = not.getThreshold(); for (Enumeration keys = subsTable.keys(); keys.hasMoreElements();) ((ClientSubscription) subsTable.get(keys.nextElement())).setThreshold(not.getThreshold()); proxyAgent.sendNot(from, new AdminReply(not, true, "Threshold set: " + threshold)); } /** * Method implementing the reaction to a <code>SetNbMaxMsgRequest</code> * instance setting the NbMaxMsg value for the subscription. */ protected void doReact(AgentId from, SetNbMaxMsgRequest not) { int nbMaxMsg = not.getNbMaxMsg(); String subName = not.getSubName(); ClientSubscription sub = (ClientSubscription) subsTable.get(subName); if (sub != null) { sub.setNbMaxMsg(nbMaxMsg); proxyAgent.sendNot(from, new AdminReply(not, true, "NbMaxMsg set: " + nbMaxMsg + " on " + subName)); } else { proxyAgent.sendNot(from, new AdminReply(not, false, "NbMaxMsg not set: " + nbMaxMsg + " on " + subName)); } } /** * Method implementing the reaction to a * <code>Monit_GetNbMaxMsg</code> notification requesting the * number max of messages in the subscription. * * @exception AccessException If the requester is not the administrator. */ protected void doReact(AgentId from, Monit_GetNbMaxMsg not) { int nbMaxMsg = -1; String subName = not.getSubName(); ClientSubscription sub = (ClientSubscription) subsTable.get(subName); if (sub != null) nbMaxMsg = sub.getNbMaxMsg(); Channel.sendTo(from, new Monit_GetNbMaxMsgRep(not,nbMaxMsg)); } /** * Returns the maximum number of message for identified subscription. * The subscription is identified by its unique name, if the limit is unset * the method returns -1. * * @param subName The subscription unique name. * @return the maximum number of message for subscription if set; * -1 otherwise. */ public int getNbMaxMsg(String subName) { int nbMaxMsg = -1; ClientSubscription sub = (ClientSubscription) subsTable.get(subName); if (sub != null) nbMaxMsg = sub.getNbMaxMsg(); return nbMaxMsg; } /** * Sets the maximum number of message for identified subscription. * The subscription is identified by its unique name. * * @param subName The subscription unique name. * @param nbMaxMsg the maximum number of message for subscription (-1 set * no limit). */ public void setNbMaxMsg(String subName, int nbMaxMsg) { ClientSubscription sub = (ClientSubscription) subsTable.get(subName); if (sub != null) sub.setNbMaxMsg(nbMaxMsg); } /** * Method implementing the reaction to a <code>Monit_GetDMQSettings</code> * instance requesting the DMQ settings of this proxy. */ private void doReact(AgentId from, Monit_GetDMQSettings not) { String id = null; if (dmqId != null) id = dmqId.toString(); proxyAgent.sendNot(from, new Monit_GetDMQSettingsRep(not, id, threshold)); } /** * Method implementing the JMS proxy reaction to a * <code>SyncReply</code> notification sent by itself, wrapping a reply * to be sent to a client. */ private void doReact(SyncReply not) { doReply(not.key, not.reply); } /** * The method closes a given context by denying the non acknowledged messages * delivered to this context, and deleting its temporary subscriptions and * destinations. */ private void doReact(int key, CnxCloseRequest req) { // state change, so save.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -