📄 scribeimpl.java
字号:
CancellableTask task = endpoint.scheduleMessage(new SubscribeLostMessage(handle, topic, id), MESSAGE_TIMEOUT); lost.put(new Integer(id), task); } /** * Internal method which processes an ack message * * @param message The ackMessage */ private void ackMessageReceived(SubscribeAckMessage message) { ScribeClient client = (ScribeClient) outstanding.remove(new Integer(message.getId())); if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Removing client " + client + " from list of outstanding for ack " + message.getId()); } CancellableTask task = (CancellableTask) lost.remove(new Integer(message.getId())); if (task != null) { task.cancel(); } } /** * Internal method which processes a subscribe failed message * * @param message THe lost message */ private void failedMessageReceived(SubscribeFailedMessage message) { ScribeClient client = (ScribeClient) outstanding.remove(new Integer(message.getId())); lost.remove(new Integer(message.getId())); if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Telling client " + client + " about FAILURE for outstanding ack " + message.getId()); } if (client != null) { client.subscribeFailed(message.getTopic()); } } /** * Internal method which processes a subscribe lost message * * @param message THe lost message */ private void lostMessageReceived(SubscribeLostMessage message) { ScribeClient client = (ScribeClient) outstanding.remove(new Integer(message.getId())); lost.remove(new Integer(message.getId())); if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Telling client " + client + " about LOSS for outstanding ack " + message.getId()); } if (client != null) { client.subscribeFailed(message.getTopic()); } } // ----- SCRIBE METHODS ----- /** * Subscribes the given client to the provided topic. Any message published to * the topic will be delivered to the Client via the deliver() method. * * @param topic The topic to subscribe to * @param client The client to give messages to */ public void subscribe(Topic topic, ScribeClient client) { subscribe(topic, client, null); } /** * Subscribes the given client to the provided topic. Any message published to * the topic will be delivered to the Client via the deliver() method. * * @param topic The topic to subscribe to * @param client The client to give messages to * @param content DESCRIBE THE PARAMETER */ public void subscribe(Topic topic, ScribeClient client, ScribeContent content) { subscribe(topic, client, content instanceof RawScribeContent ? (RawScribeContent) content : new JavaSerializedScribeContent(content)); } /** * DESCRIBE THE METHOD * * @param topic DESCRIBE THE PARAMETER * @param client DESCRIBE THE PARAMETER * @param content DESCRIBE THE PARAMETER */ public void subscribe(Topic topic, ScribeClient client, RawScribeContent content) { if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Subscribing client " + client + " to topic " + topic); } // if we don't know about this topic, subscribe // otherwise, we simply add the client to the list if (topics.get(topic) == null) { topics.put(topic, new TopicManager(topic, client)); sendSubscribe(topic, client, content); } else { TopicManager manager = (TopicManager) topics.get(topic); manager.addClient(client); if ((manager.getParent() == null) && (!isRoot(topic))) { sendSubscribe(topic, client, content); } } } /** * Unsubscribes the given client from the provided topic. * * @param topic The topic to unsubscribe from * @param client The client to unsubscribe */ public void unsubscribe(Topic topic, ScribeClient client) { if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Unsubscribing client " + client + " from topic " + topic); } if (topics.get(topic) != null) { TopicManager manager = (TopicManager) topics.get(topic); // if this is the last client and there are no children, // then we unsubscribe from the topic if (manager.removeClient(client)) { topics.remove(topic); NodeHandle parent = manager.getParent(); if (parent != null) { endpoint.route(null, new UnsubscribeMessage(handle, topic), parent); } } } else { if (logger.level <= Logger.WARNING) { logger.log(endpoint.getId() + ": Attempt to unsubscribe client " + client + " from unknown topic " + topic); } } } /** * Publishes the given message to the topic. * * @param topic The topic to publish to * @param content The content to publish */ public void publish(Topic topic, ScribeContent content) { publish(topic, content instanceof RawScribeContent ? (RawScribeContent) content : new JavaSerializedScribeContent(content)); } /** * DESCRIBE THE METHOD * * @param topic DESCRIBE THE PARAMETER * @param content DESCRIBE THE PARAMETER */ public void publish(Topic topic, RawScribeContent content) { if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Publishing content " + content + " to topic " + topic); } endpoint.route(topic.getId(), new PublishRequestMessage(handle, topic, content), null); } /** * Anycasts the given content to a member of the given topic * * @param topic The topic to anycast to * @param content The content to anycast */ public void anycast(Topic topic, ScribeContent content) { if (content instanceof RawScribeContent) { anycast(topic, (RawScribeContent) content); } else { anycast(topic, new JavaSerializedScribeContent(content)); } } /** * DESCRIBE THE METHOD * * @param topic DESCRIBE THE PARAMETER * @param content DESCRIBE THE PARAMETER */ public void anycast(Topic topic, RawScribeContent content) { if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Anycasting content " + content + " to topic " + topic); } endpoint.route(topic.getId(), new AnycastMessage(handle, topic, content), null); } /** * Adds a child to the given topic * * @param topic The topic to add the child to * @param child The child to add */ public void addChild(Topic topic, NodeHandle child) { addChild(topic, child, Integer.MAX_VALUE); } /** * Adds a child to the given topic, using the specified sequence number in the * ack message sent to the child. * * @param topic The topic * @param child THe child to add * @param id THe seuqnce number */ protected void addChild(Topic topic, NodeHandle child, int id) { if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Adding child " + child + " to topic " + topic); } TopicManager manager = (TopicManager) topics.get(topic); // if we don't know about the topic, we subscribe, otherwise, // we simply add the child to the list if (manager == null) { manager = new TopicManager(topic, child); topics.put(topic, manager); if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Implicitly subscribing to topic " + topic); } sendSubscribe(topic, null, null); } else { manager.addChild(child); } // we send a confirmation back to the child endpoint.route(null, new SubscribeAckMessage(handle, topic, manager.getPathToRoot(), id), child); // and lastly notify the policy and all of the clients policy.childAdded(topic, child); ScribeClient[] clients = manager.getClients(); for (int i = 0; i < clients.length; i++) { clients[i].childAdded(topic, child); } } /** * Removes a child from the given topic * * @param topic The topic to remove the child from * @param child The child to remove */ public void removeChild(Topic topic, NodeHandle child) { removeChild(topic, child, true); } /** * Removes a child from the given topic * * @param topic The topic to remove the child from * @param child The child to remove * @param sendDrop Whether or not to send a drop message to the chil */ protected void removeChild(Topic topic, NodeHandle child, boolean sendDrop) { if (logger.level <= Logger.FINE) { logger.log(endpoint.getId() + ": Removing child " + child + " from topic " + topic); } if (topics.get(topic) != null) { TopicManager manager = (TopicManager) topics.get(topic); // if this is the last child and there are no clients, then // we unsubscribe, if we are not the root if (manager.removeChild(child)) { topics.remove(topic); NodeHandle parent = manager.getParent(); if (logger.level <= Logger.FINE) { logger.log(endpoint.getId() + ": We no longer need topic " + topic + " - unsubscribing from parent " + parent); } if (parent != null) { endpoint.route(null, new UnsubscribeMessage(handle, topic), parent); } } if ((sendDrop) && (child.isAlive())) { if (logger.level <= Logger.FINE) { logger.log(endpoint.getId() + ": Informing child " + child + " that he has been dropped from topic " + topic); } // now, we tell the child that he has been dropped endpoint.route(null, new DropMessage(handle, topic), child); } // and lastly notify the policy and all of the clients policy.childRemoved(topic, child); ScribeClient[] clients = manager.getClients(); for (int i = 0; i < clients.length; i++) { clients[i].childRemoved(topic, child); } } else { if (logger.level <= Logger.WARNING) { logger.log(endpoint.getId() + ": Unexpected attempt to remove child " + child + " from unknown topic " + topic); } } } // ----- COMMON API METHODS ----- /** * This method is invoked on applications when the underlying node is about to * forward the given message with the provided target to the specified next * hop. Applications can change the contents of the message, specify a * different nextHop (through re-routing), or completely terminate the * message. * * @param message The message being sent, containing an internal message along * with a destination key and nodeHandle next hop. * @return Whether or not to forward the message further */ public boolean forward(final RouteMessage message) { Message internalMessage; try { internalMessage = message.getMessage(endpoint.getDeserializer()); } catch (IOException ioe) { throw new RuntimeException(ioe); } if (logger.level <= Logger.FINEST) { logger.log(endpoint.getId() + ": Forward called with " + internalMessage); } if (internalMessage instanceof AnycastMessage) { AnycastMessage aMessage = (AnycastMessage) internalMessage; // get the topic manager associated with this topic TopicManager manager = (TopicManager) topics.get(aMessage.getTopic()); // if it's a subscribe message, we must handle it differently if (internalMessage instanceof SubscribeMessage) { SubscribeMessage sMessage = (SubscribeMessage) internalMessage; // if this is our own subscribe message, ignore it if (sMessage.getSource().getId().equals(endpoint.getId())) { return true; } if (manager != null) { // first, we have to make sure that we don't create a loop, which would occur // if the subcribing node's previous parent is on our path to the root Id previousParent = sMessage.getPreviousParent(); List path = Arrays.asList(manager.getPathToRoot());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -