📄 topicimpl.java
字号:
if (adminRequest instanceof GetSubscriberIds) { doReact((GetSubscriberIds)adminRequest, not.getReplyTo(), not.getRequestMsgId(), not.getReplyMsgId()); } } private void doReact(GetSubscriberIds request, AgentId replyTo, String requestMsgId, String replyMsgId) { GetSubscriberIdsRep reply = new GetSubscriberIdsRep(getSubscriberIds()); replyToTopic(reply, replyTo, requestMsgId, replyMsgId); } /** * Returns the list of unique identifiers of all subscribers. Each user * appears once even if there is multiples subscriptions, the different * subscriptions can be enumerate through the proxy MBean. * * @return the list of unique identifiers of all subscribers. */ public String[] getSubscriberIds() { String[] res = new String[subscribers.size()]; for (int i = 0; i < res.length; i++) { AgentId aid = (AgentId)subscribers.elementAt(i); res[i] = aid.toString(); } return res; } private void replyToTopic( org.objectweb.joram.shared.admin.AdminReply reply, AgentId replyTo, String requestMsgId, String replyMsgId) { Message message = Message.create(); message.setCorrelationId(requestMsgId); message.setTimestamp(System.currentTimeMillis()); message.setDestination(replyTo.toString(), Topic.TOPIC_TYPE); message.setIdentifier(replyMsgId); try { message.setObject(reply); Vector messages = new Vector(); messages.add(message); ClientMessages clientMessages = new ClientMessages(-1, -1, messages); Channel.sendTo(replyTo, clientMessages); } catch (Exception exc) { if (MomTracing.dbgDestination.isLoggable(BasicLevel.ERROR)) MomTracing.dbgDestination.log(BasicLevel.ERROR, "", exc); throw new Error(exc.getMessage()); } } /** * The <code>DestinationImpl</code> class calls this method for passing * notifications which have been partly processed, so that they are * specifically processed by the <code>TopicImpl</code> class. */ protected void specialProcess(Notification not) { if (not instanceof SetRightRequest) doProcess((SetRightRequest) not); else if (not instanceof ClientMessages) doProcess((ClientMessages) not); else if (not instanceof UnknownAgent) doProcess((UnknownAgent) not); else if (not instanceof DeleteNot) doProcess((DeleteNot) not); } /** * Method specifically processing a <code>SetRightRequest</code> instance. * <p> * When a reader is removed, deleting this reader's subscription if any, * and sending an <code>ExceptionReply</code> notification to the client. */ protected void doProcess(SetRightRequest not) { // If the request does not unset a reader, doing nothing. if (not.getRight() != -READ) return; AgentId user = not.getClient(); AccessException exc = new AccessException("READ right removed."); // Identified user: removing it. if (user != null) { // state change, so save. setSave(); subscribers.remove(user); selectors.remove(user); Channel.sendTo(user, new ExceptionReply(exc)); } // Free reading right removed: removing all non readers. else { for (Enumeration subs = subscribers.elements(); subs.hasMoreElements();) { user = (AgentId) subs.nextElement(); if (! isReader(user)) { // state change, so save. setSave(); subscribers.remove(user); selectors.remove(user); Channel.sendTo(user, new ExceptionReply(exc)); } } } } /** * Method specifically processing a <code>ClientMessages</code> instance. * <p> * This method may forward the messages to the topic father if any, or * to the cluster fellows if any.It may finally send * <code>TopicMsgsReply</code> instances to the valid subscribers. */ protected void doProcess(ClientMessages not) { // Forwarding the messages to the father or the cluster fellows, if any: forwardMessages(not); // Processing the messages: processMessages(not); } /** * Method specifically processing an <code>UnknownAgent</code> instance. * <p> * This method notifies the administrator of the failing cluster or * hierarchy building request, if needed, or removes the subscriptions of * the deleted client, if any, or sets the father identifier to null if it * comes from a deleted father. */ protected void doProcess(UnknownAgent uA) { AgentId agId = uA.agent; Notification not = uA.not; // Deleted topic was requested to join the cluster: notifying the // requester: String info = null; if (not instanceof ClusterTest) { ClusterTest cT = (ClusterTest) not; info = strbuf.append("Topic [").append(agId) .append("] can't join cluster as it does not exist").toString(); strbuf.setLength(0); Channel.sendTo(cT.requester, new AdminReply(cT.request, false, info)); } else if (not instanceof FatherTest) { // Deleted topic was requested as a father: notifying the requester: FatherTest fT = (FatherTest) not; info = strbuf.append("Topic [").append(agId) .append("] can't join hierarchy as it does not exist").toString(); strbuf.setLength(0); Channel.sendTo(fT.requester, new AdminReply(fT.request, false, info)); } else { // state change, so save. setSave(); // Removing the deleted client's subscriptions, if any. subscribers.remove(agId); selectors.remove(agId); // Removing the father identifier, if needed. if (fatherId != null && agId.equals(fatherId)) { // state change, so save. setSave(); fatherId = null; } } } /** * Method specifically processing a * <code>fr.dyade.aaa.agent.DeleteNot</code> instance. * <p> * <code>UnknownAgent</code> notifications are sent to each subscriber * and <code>UnclusterNot</code> notifications to the cluster * fellows. */ protected void doProcess(DeleteNot not) { AgentId clientId; Vector subs; SubscribeRequest sub; // For each subscriber... for (int i = 0; i < subscribers.size(); i++) { clientId = (AgentId) subscribers.get(i); Channel.sendTo(clientId, new UnknownAgent(destId, null)); } // For each cluster fellow if any... if (friends != null) { AgentId topicId; while (! friends.isEmpty()) { // state change, so save. setSave(); topicId = (AgentId) friends.remove(0); Channel.sendTo(topicId, new UnclusterNot()); } } } /** * Actually forwards a vector of messages to the father or the cluster * fellows, if any. */ protected void forwardMessages(ClientMessages messages) { if (friends != null && ! friends.isEmpty()) { AgentId topicId; for (int i = 0; i < friends.size(); i++) { topicId = (AgentId) friends.get(i); Channel.sendTo(topicId, new TopicForwardNot(messages, false)); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "Messages " + "forwarded to fellow " + topicId.toString()); } } else if (fatherId != null) { Channel.sendTo(fatherId, new TopicForwardNot(messages, true)); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "Messages " + "forwarded to father " + fatherId.toString()); } } /** * Actually processes the distribution of the received messages to the * valid subscriptions by sending a <code>TopicMsgsReply</code> notification * to the valid subscribers. */ protected void processMessages(ClientMessages not) { Vector messages = not.getMessages(); AgentId subscriber; boolean local; String selector; Vector deliverables; Message message; nbMsgsReceiveSinceCreation = nbMsgsReceiveSinceCreation + messages.size(); setNoSave(); boolean persistent = false; // Browsing the subscribers. for (Enumeration subs = subscribers.elements(); subs.hasMoreElements();) { subscriber = (AgentId) subs.nextElement(); local = (subscriber.getTo() == AgentServer.getServerId()); selector = (String) selectors.get(subscriber); // Current subscriber does not filter messages: all messages will be // sent. if (selector == null || selector.equals("")) { // Subscriber not local, or no other sending occured locally: directly // sending the messages. if (! local) { deliverables = messages; persistent = true; } else if (! alreadySentLocally) { deliverables = messages; alreadySentLocally = true; } // A local sending already occured: cloning the messages. else { deliverables = new Vector(); for (Enumeration msgs = messages.elements(); msgs.hasMoreElements();) deliverables.add(((Message) msgs.nextElement()).clone()); } } // Current subscriber filters messages; sending the matching messages. else { deliverables = new Vector(); for (int i = 0; i < messages.size(); i++) { message = (Message) messages.get(i); if (Selector.matches(message, selector)) { // Subscriber not local, or no other sending occured locally: // directly sending the message. if (! local) { deliverables.add(message); persistent = true; } else if (! alreadySentLocally) { deliverables.add(message); alreadySentLocally = true; } // A local sending already occured: cloning the message. else deliverables.add(message.clone()); } } } // There are messages to send. if (! deliverables.isEmpty()) { TopicMsgsReply topicMsgsReply = new TopicMsgsReply(deliverables); topicMsgsReply.setPersistent(persistent); Channel.sendTo(subscriber, topicMsgsReply); nbMsgsDeliverSinceCreation = nbMsgsDeliverSinceCreation + deliverables.size(); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -