📄 proxyimpl.java
字号:
proxyAgent.setSave(); //setCtx(cKey); // Denying the non acknowledged messages: AgentId id; for (Enumeration ids = activeCtx.getDeliveringQueues(); ids .hasMoreElements();) { id = (AgentId) ids.nextElement(); proxyAgent.sendNot(id, new DenyRequest(key)); } // Removing or deactivating the subscriptions: String subName = null; ClientSubscription sub; Vector topics = new Vector(); for (Enumeration subs = activeCtx.getActiveSubs(); subs.hasMoreElements();) { subName = (String) subs.nextElement(); sub = (ClientSubscription) subsTable.get(subName); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Deactivate subscription " + subName + ", topic id = " + sub.getTopicId()); if (sub.getDurable()) { sub.deactivate(); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Durable subscription" + subName + " de-activated."); } else { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, " -> topicsTable = " + topicsTable); sub.delete(); subsTable.remove(subName); TopicSubscription tSub = (TopicSubscription) topicsTable.get(sub .getTopicId()); tSub.removeSubscription(subName); if (!topics.contains(sub.getTopicId())) topics.add(sub.getTopicId()); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Temporary subscription" + subName + " deleted."); } } // Browsing the topics which at least have one subscription removed. for (Enumeration topicIds = topics.elements(); topicIds.hasMoreElements();) updateSubscriptionToTopic((AgentId) topicIds.nextElement(), -1, -1); // Deleting the temporary destinations: AgentId destId; for (Enumeration dests = activeCtx.getTempDestinations(); dests .hasMoreElements();) { destId = (AgentId) dests.nextElement(); activeCtx.removeTemporaryDestination(destId); deleteTemporaryDestination(destId); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Deletes temporary" + " destination " + destId.toString()); } // Saving the prepared transactions. Enumeration xids = activeCtx.getTxIds(); Xid xid; XACnxPrepare recoveredPrepare; XACnxPrepare prepare; while (xids.hasMoreElements()) { if (recoveredTransactions == null) recoveredTransactions = new Hashtable(); xid = (Xid) xids.nextElement(); recoveredPrepare = (XACnxPrepare) recoveredTransactions.get(xid); prepare = activeCtx.getTxPrepare(xid); if (recoveredPrepare == null) recoveredTransactions.put(xid, prepare); else { recoveredPrepare.getSendings().addAll(prepare.getSendings()); recoveredPrepare.getAcks().addAll(prepare.getAcks()); } } // Finally, deleting the context: contexts.remove(new Integer(key)); activeCtx = null; setActiveCtxId(-1); CnxCloseReply reply = new CnxCloseReply(); reply.setCorrelationId(req.getRequestId()); proxyAgent.sendToClient(key, reply); } private void doReact(int key, ActivateConsumerRequest req) { String subName = req.getTarget(); ClientSubscription sub = (ClientSubscription) subsTable.get(subName); sub.setActive(req.getActivate()); } private void doReact(int key, CommitRequest req) { // The commit may involve some local agents int asyncReplyCount = 0; Enumeration pms = req.getProducerMessages(); if (pms != null) { while (pms.hasMoreElements()) { ProducerMessages pm = (ProducerMessages) pms.nextElement(); AgentId destId = AgentId.fromString(pm.getTarget()); ClientMessages not = new ClientMessages(key, req.getRequestId(), pm.getMessages()); setDmq(not); if (destId.getTo() == proxyAgent.getId().getTo()) { // local sending not.setPersistent(false); if (req.getAsyncSend()) { not.setAsyncSend(true); } else { asyncReplyCount++; } } proxyAgent.sendNot(destId, not); } } Enumeration acks = req.getAckRequests(); if (acks != null) { while (acks.hasMoreElements()) { SessAckRequest sar = (SessAckRequest) acks.nextElement(); if (sar.getQueueMode()) { AgentId qId = AgentId.fromString(sar.getTarget()); Vector ids = sar.getIds(); AcknowledgeRequest not = new AcknowledgeRequest(activeCtxId, req .getRequestId(), ids); if (qId.getTo() == proxyAgent.getId().getTo()) { // local sending not.setPersistent(false); // No reply to wait for } proxyAgent.sendNot(qId, not); } else { String subName = sar.getTarget(); ClientSubscription sub = (ClientSubscription) subsTable.get(subName); if (sub != null) { sub.acknowledge(sar.getIds().elements()); proxyAgent.setSave(); } } } } if (!req.getAsyncSend()) { if (asyncReplyCount == 0) { proxyAgent.sendNot(proxyAgent.getId(), new SendReplyNot(key, req .getRequestId())); } else { // we need to wait for the replies // from the local agents // before replying to the client. activeCtx.addMultiReplyContext(req.getRequestId(), asyncReplyCount); } } // else the client doesn't expect any ack } /** * Distributes the JMS replies to the appropriate reactions. * <p> * JMS proxies react the following replies: * <ul> * <li><code>QueueMsgReply</code></li> * <li><code>BrowseReply</code></li> * <li><code>SubscribeReply</code></li> * <li><code>TopicMsgsReply</code></li> * <li><code>ExceptionReply</code></li> * </ul> */ private void doFwd(AgentId from, AbstractReply rep) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "--- " + this + " got " + rep.getClass().getName() + " with id: " + rep.getCorrelationId() + " from: " + from); if (rep instanceof QueueMsgReply) doFwd(from, (QueueMsgReply) rep); else if (rep instanceof BrowseReply) doFwd((BrowseReply) rep); else if (rep instanceof SubscribeReply) doFwd((SubscribeReply) rep); else if (rep instanceof TopicMsgsReply) doFwd(from, (TopicMsgsReply) rep); else if (rep instanceof ExceptionReply) doReact(from, (ExceptionReply) rep); else { if (logger.isLoggable(BasicLevel.WARN)) logger.log(BasicLevel.WARN, "Unexpected reply!"); } } /** * Actually forwards a <code>QueueMsgReply</code> coming from a destination * as a <code>ConsumerMessages</code> destinated to the requesting client. * <p> * If the corresponding context is stopped, stores the * <code>ConsumerMessages</code> for later delivery. */ private void doFwd(AgentId from, QueueMsgReply rep) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "ProxyImpl.doFwd(" + from + ',' + rep + ')'); try { // Updating the active context: setCtx(rep.getClientContext()); // If the receive request being replied has been cancelled, denying // the message. if (rep.getCorrelationId() == activeCtx.getCancelledReceive()) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, " -> cancelled receive: id=" + activeCtx.getCancelledReceive()); if (rep.getSize() > 0) { Vector msgList = rep.getMessages(); for (int i = 0; i < msgList.size(); i++) { Message msg = (Message)msgList.elementAt(i); String msgId = msg.getIdentifier(); if (logger.isLoggable(BasicLevel.WARN)) logger.log(BasicLevel.WARN, " -> denying message: " + msgId); proxyAgent.sendNot( from, new DenyRequest( 0, rep.getCorrelationId(), msgId)); } } } else { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, " -> reply"); ConsumerMessages jRep; // Building the reply and storing the wrapped message id for later // denying in the case of a failure: if (rep.getSize() > 0) { jRep = new ConsumerMessages( rep.getCorrelationId(), rep.getMessages(), from.toString(), true); activeCtx.addDeliveringQueue(from); } else { jRep = new ConsumerMessages( rep.getCorrelationId(), (Vector)null, from.toString(), true); } // If the context is started, delivering the message, or buffering it: if (activeCtx.getActivated()) { doReply(jRep); } else { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, " -> buffer the reply"); activeCtx.addPendingDelivery(jRep); } } } catch (StateException pE) { // The context is lost: denying the message: if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "", pE); if (rep.getMessages().size() > 0) { Vector msgList = rep.getMessages(); for (int i = 0; i < msgList.size(); i++) { Message msg = (Message)msgList.elementAt(i); String msgId = msg.getIdentifier(); if (logger.isLoggable(BasicLevel.WARN)) logger.log(BasicLevel.WARN, "Denying message: " + msgId); proxyAgent.sendNot( from, new DenyRequest(0,rep.getCorrelationId(), msgId)); } } } } /** * Actually forwards a <code>BrowseReply</code> coming from a * destination as a <code>QBrowseReply</code> destinated to the * requesting client. */ private void doFwd(BrowseReply rep) { try { // Updating the active context: setCtx(rep.getClientContext()); doReply(new QBrowseReply(rep.getCorrelationId(), rep.getMessages())); } catch (StateException pE) { // The context is lost; nothing to do. } } /** * Forwards the topic's <code>SubscribeReply</code> as a * <code>ServerReply</code>. */ private void doFwd(SubscribeReply rep) { try { setCtx(rep.getClientContext()); doReply(new ServerReply(rep.getCorrelationId())); } catch (StateException pE) { // The context is lost; nothing to do. } } /** * Method implementing the proxy reaction to a <code>TopicMsgsReply</code> * holding messages published by a topic. */ private void doFwd(AgentId from, TopicMsgsReply rep) { // Browsing the target subscriptions: TopicSubscription tSub = (TopicSubscription) topicsTable.get(from); if (tSub == null || tSub.isEmpty()) return; String subName; ClientSubscription sub; for (Enumeration names = tSub.getNames(); names.hasMoreElements();) { subName = (String) names.nextElement(); sub = (ClientSubscription) subsTable.get(subName); if (sub == null) continue; // Browsing the delivered messages. sub.browseNewMessages(rep.getMessages()); } // Setting the arrival order of the messages and save message if it // is MessageSoftRef. for (Enumeration msgs = rep.getMessages().elements(); msgs.hasMoreElements();) { Message message = (Message) msgs.nextElement(); message.order = arrivalsCounter++; if ((message.durableAcksCounter > 0) || (message instanceof MessageSoftRef)) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, " -> save message " + message); message.save(proxyAgent.getId().toString()); proxyAgent.setSave(); } } for (Enumeration names = tSub.getNames(); names.hasMoreElements();) { subName =
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -