📄 proxyimpl.java
字号:
try { topic.deploy(); // Setting free WRITE right on the topic: proxyAgent.sendNot(tId, new SetRightRequest(null, null, 2)); activeCtx.addTemporaryDestination(tId); SessCreateTDReply reply = new SessCreateTDReply(req, tId.toString()); proxyAgent.sendNot(proxyAgent.getId(), new SyncReply(activeCtxId, reply)); proxyAgent.sendNot(AdminTopic.getDefault(), new RegisterTmpDestNot(tId, true, true)); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Temporary topic" + tId + " created."); } catch (java.io.IOException iE) { topic = null; throw new RequestException("Could not deploy temporary topic " + tId + ": " + iE); } } /** * Method implementing the JMS proxy reaction to a * <code>ConsumerSubRequest</code> requesting to subscribe to a topic. * * @exception StateException If activating an already active durable * subscription. */ private void doReact(ConsumerSubRequest req) throws StateException { AgentId topicId = AgentId.fromString(req.getTarget()); String subName = req.getSubName(); boolean newTopic = ! topicsTable.containsKey(topicId); boolean newSub = ! subsTable.containsKey(subName); TopicSubscription tSub; ClientSubscription cSub; // true if a SubscribeRequest has been sent to the topic. boolean sent = false; if (newTopic) { // New topic... tSub = new TopicSubscription(); topicsTable.put(topicId, tSub); } else { // Known topic... tSub = (TopicSubscription) topicsTable.get(topicId); } if (newSub) { // New subscription... // state change, so save. proxyAgent.setSave(); cSub = new ClientSubscription(proxyAgent.getId(), activeCtxId, req.getRequestId(), req.getDurable(), topicId, req.getSubName(), req.getSelector(), req.getNoLocal(), dmqId, threshold, messagesTable); cSub.setProxyAgent(proxyAgent); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Subscription " + subName + " created."); subsTable.put(subName, cSub); tSub.putSubscription(subName, req.getSelector()); sent = updateSubscriptionToTopic(topicId, activeCtxId, req.getRequestId()); } else { // Existing durable subscription... cSub = (ClientSubscription) subsTable.get(subName); if (cSub.getActive()) throw new StateException("The durable subscription " + subName + " has already been activated."); // Updated topic: updating the subscription to the previous topic. boolean updatedTopic = ! topicId.equals(cSub.getTopicId()); if (updatedTopic) { TopicSubscription oldTSub = (TopicSubscription) topicsTable.get(cSub.getTopicId()); oldTSub.removeSubscription(subName); updateSubscriptionToTopic(cSub.getTopicId(), -1, -1); } // Updated selector? boolean updatedSelector; if (req.getSelector() == null && cSub.getSelector() != null) updatedSelector = true; else if (req.getSelector() != null && cSub.getSelector() == null) updatedSelector = true; else if (req.getSelector() == null && cSub.getSelector() == null) updatedSelector = false; else updatedSelector = ! req.getSelector().equals(cSub.getSelector()); // Reactivating the subscription. cSub.reactivate(activeCtxId, req.getRequestId(), topicId, req.getSelector(), req.getNoLocal()); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Subscription " + subName + " reactivated."); // Updated subscription: updating subscription to topic. if (updatedTopic || updatedSelector) { tSub.putSubscription(subName, req.getSelector()); sent = updateSubscriptionToTopic(topicId, activeCtxId, req.getRequestId()); } } // Activating the subscription. activeCtx.addSubName(subName); // Acknowledging the request, if needed. if (! sent) proxyAgent.sendNot(proxyAgent.getId(), new SyncReply(activeCtxId, new ServerReply(req))); } /** * Method implementing the JMS proxy reaction to a * <code>ConsumerSetListRequest</code> notifying the creation of a client * listener. * <p> * Sets the listener for the subscription, launches a delivery sequence. * * @exception DestinationException If the subscription does not exist. */ private void doReact(ConsumerSetListRequest req) throws DestinationException { // Getting the subscription: String subName = req.getTarget(); ClientSubscription sub = (ClientSubscription) subsTable.get(subName); if (sub == null) throw new DestinationException("Can't set a listener on the non existing subscription: " + subName); sub.setListener(req.getRequestId()); ConsumerMessages consM = sub.deliver(); if (consM != null) { if (activeCtx.getActivated()) doReply(consM); else activeCtx.addPendingDelivery(consM); } } /** * Method implementing the JMS proxy reaction to a * <code>ConsumerUnsetListRequest</code> notifying that a consumer listener * is unset. * * @exception DestinationException If the subscription does not exist. */ private void doReact(ConsumerUnsetListRequest req) throws DestinationException { // If the listener was listening to a queue, cancelling any pending reply: if (req.getQueueMode()) { activeCtx.cancelReceive(req.getCancelledRequestId()); AgentId to = AgentId.fromString(req.getTarget()); proxyAgent.sendNot( to, new AbortReceiveRequest(activeCtx.getId(), req.getRequestId(), req.getCancelledRequestId())); } } /** * Method implementing the JMS proxy reaction to a * <code>ConsumerCloseSubRequest</code> requesting to deactivate a durable * subscription. * * @exception DestinationException If the subscription does not exist. */ private void doReact(ConsumerCloseSubRequest req) throws DestinationException { // Getting the subscription: String subName = req.getTarget(); ClientSubscription sub = (ClientSubscription) subsTable.get(subName); if (sub == null) throw new DestinationException("Can't desactivate non existing subscription: " + subName); // De-activating the subscription: activeCtx.removeSubName(subName); sub.deactivate(); // Acknowledging the request: doReply(new ServerReply(req)); } /** * Method implementing the JMS proxy reaction to a * <code>ConsumerUnsubRequest</code> requesting to remove a subscription. * * @exception DestinationException If the subscription does not exist. */ private void doReact(ConsumerUnsubRequest req) throws DestinationException { // state change, so save. proxyAgent.setSave(); // Getting the subscription. String subName = req.getTarget(); ClientSubscription sub = (ClientSubscription) subsTable.get(subName); if (sub == null) throw new DestinationException("Can't unsubscribe non existing subscription: " + subName); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Deleting subscription " + subName); // Updating the proxy's subscription to the topic. AgentId topicId = sub.getTopicId(); TopicSubscription tSub = (TopicSubscription) topicsTable.get(topicId); tSub.removeSubscription(subName); updateSubscriptionToTopic(topicId, -1, -1); // Deleting the subscription. sub.delete(); activeCtx.removeSubName(subName); subsTable.remove(subName); // Acknowledging the request: proxyAgent.sendNot(proxyAgent.getId(), new SyncReply(activeCtxId, new ServerReply(req))); } /** * Method implementing the proxy reaction to a * <code>ConsumerReceiveRequest</code> instance, requesting a message from a * subscription. * <p> * This method registers the request and launches a delivery sequence. * * @exception DestinationException If the subscription does not exist. */ private void doReact(ConsumerReceiveRequest req) throws DestinationException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "ProxyImpl.doReact(" + req + ')'); String subName = req.getTarget(); ClientSubscription sub = (ClientSubscription) subsTable.get(subName); if (sub == null) throw new DestinationException("Can't request a message from the unknown subscription: " + subName); // Getting a message from the subscription. sub.setReceiver(req.getRequestId(), req.getTimeToLive()); ConsumerMessages consM = sub.deliver(); if (consM != null && req.getReceiveAck()) { Vector messageList = consM.getMessages(); for (int i = 0; i < messageList.size(); i++) { Message msg = (Message)messageList.elementAt(i); sub.acknowledge(msg.getIdentifier()); } } // Nothing to deliver but immediate delivery request: building an empty // reply. if (consM == null && req.getTimeToLive() == -1) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, " -> immediate delivery"); sub.unsetReceiver(); consM = new ConsumerMessages( req.getRequestId(), subName, false); } // Delivering. if (consM != null && activeCtx.getActivated()) { doReply(consM); } else if (consM != null) { activeCtx.addPendingDelivery(consM); } } /** * Method implementing the JMS proxy reaction to a * <code>SessAckRequest</code> acknowledging messages either on a queue * or on a subscription. */ private void doReact(SessAckRequest req) { if (req.getQueueMode()) { AgentId qId = AgentId.fromString(req.getTarget()); Vector ids = req.getIds(); AcknowledgeRequest not = new AcknowledgeRequest(activeCtxId, req.getRequestId(), ids); if (qId.getTo() == proxyAgent.getId().getTo()) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, " -> local acking"); not.setPersistent(false); } proxyAgent.sendNot(qId, not); } else { String subName = req.getTarget(); ClientSubscription sub = (ClientSubscription) subsTable.get(subName); if (sub != null) sub.acknowledge(req.getIds().elements()); } } /** * Method implementing the JMS proxy reaction to a * <code>SessDenyRequest</code> denying messages either on a queue or on * a subscription. */ private void doReact(SessDenyRequest req) { if (req.getQueueMode()) { AgentId qId = AgentId.fromString(req.getTarget()); Vector ids = req.getIds(); proxyAgent.sendNot(qId, new DenyRequest(activeCtxId, req.getRequestId(), ids)); // Acknowledging the request unless forbidden: if (! req.getDoNotAck()) proxyAgent.sendNot(proxyAgent.getId(), new SyncReply(activeCtxId, new ServerReply(req))); } else { String subName = req.getTarget(); ClientSubscription sub = (ClientSubscription) subsTable.get(subName); if (sub == null) return; sub.deny(req.getIds().elements()); // Launching a delivery sequence: ConsumerMessages consM = sub.deliver(); // Delivering. if (consM != null && activeCtx.getActivated()) doReply(consM); else if (consM != null) activeCtx.addPendingDelivery(consM); } } /** * Method implementing the JMS proxy reaction to a * <code>ConsumerAckRequest</code> acknowledging a message either on a queue * or on a subscription. */ private void doReact(ConsumerAckRequest req) { if (req.getQueueMode()) { AgentId qId = AgentId.fromString(req.getTarget()); AcknowledgeRequest not = new AcknowledgeRequest(activeCtxId, req.getRequestId(), req.getIds());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -