📄 splitstreamscribepolicy.java
字号:
scribe.removeChild(new Topic((Id) res.elementAt(1)), (NodeHandle) res.elementAt(0)); return true; } else { return false; } } } } } /** * This method adds the parent and child in such a way that the nodes who have * this stripe as their primary strpe are examined first. * * @param message The anycast message in question * @param parent Our current parent for this message's topic * @param children Our current children for this message's topic */ public void directAnycast(AnycastMessage message, NodeHandle parent, NodeHandle[] children) { /* * we add parent first if it shares prefix match */ if (parent != null) { if (SplitStreamScribePolicy.getPrefixMatch(message.getTopic().getId(), parent.getId(), splitStream.getStripeBaseBitLength()) > 0) { message.addFirst(parent); } else { message.addLast(parent); } } /* * if it's a subscribe */ if (message instanceof SubscribeMessage) { /* * First add children which match prefix with the stripe, then those which dont. * Introduce some randomness so that load is balanced among children. */ Vector good = new Vector(); Vector bad = new Vector(); for (int i = 0; i < children.length; i++) { if (SplitStreamScribePolicy.getPrefixMatch(message.getTopic().getId(), children[i].getId(), splitStream.getStripeBaseBitLength()) > 0) { good.add(children[i]); } else { bad.add(children[i]); } } int index; /* * introduce randomness to child order */ while (good.size() > 0) { index = scribe.getEnvironment().getRandomSource().nextInt(good.size()); message.addFirst((NodeHandle) (good.elementAt(index))); good.remove((NodeHandle) (good.elementAt(index))); } while (bad.size() > 0) { index = scribe.getEnvironment().getRandomSource().nextInt(bad.size()); message.addLast((NodeHandle) (bad.elementAt(index))); bad.remove((NodeHandle) (bad.elementAt(index))); } NodeHandle nextHop = message.getNext(); /* * make sure that the next node is alive */ while ((nextHop != null) && (!nextHop.isAlive())) { nextHop = message.getNext(); } if (nextHop == null) { /* * if nexthop is null, then we are in 3rd stage of algorithm for locating parent. * two cases, either * a. local node is a leaf * send message to our parent for dropping us and taking new subscriber * b. local node is root for non-prefix match topic, * drop a child from non-primary, non-root stripe and accept the new subscriber */ if (this.scribe.isRoot(message.getTopic())) { Vector res = freeBandwidthUltimate(message.getTopic().getId()); if (res != null) { scribe.removeChild(new Topic((Id) res.elementAt(1)), (NodeHandle) res.elementAt(0)); scribe.addChild(message.getTopic(), ((SubscribeMessage) message).getSubscriber()); return; } } else { SplitStreamSubscribeContent ssc = new SplitStreamSubscribeContent(SplitStreamSubscribeContent.STAGE_FINAL); message.remove(parent); message.addFirst(parent); message.setContent(ssc); } } else { message.addFirst(nextHop); } } } /** * This method makes an attempt to free up bandwidth from non-primary, * non-root stripes (for which local node is not root). * * @param stripeId DESCRIBE THE PARAMETER * @return A vector containing the child to be dropped and the corresponding * stripeId */ public Vector freeBandwidthUltimate(Id stripeId) { Channel channel = getChannel(new Topic(stripeId)); Stripe[] stripes = channel.getStripes(); // find those stripes which are // a) non-primary // b) i am not root for them // c) have at least one child Vector candidateStripes = new Vector(); Id victimStripeId = null; Topic tp; /* * find all candidate stripes */ for (int i = 0; i < stripes.length; i++) { tp = new Topic(stripes[i].getStripeId().getId()); if (!channel.getPrimaryStripe().getStripeId().getId().equals(stripes[i].getStripeId().getId()) && !this.scribe.isRoot(tp) && (scribe.getChildren(tp).length > 0)) { candidateStripes.add(stripes[i].getStripeId().getId()); } } /* * if there are no candidates, find somewhere where i am the root */ if (candidateStripes.size() == 0) { for (int i = 0; i < stripes.length; i++) { tp = new Topic(stripes[i].getStripeId().getId()); if ((!channel.getPrimaryStripe().getStripeId().getId().equals(stripes[i].getStripeId().getId())) && (scribe.getChildren(tp).length > 0) && (!stripes[i].getStripeId().getId().equals(stripeId))) { candidateStripes.add(stripes[i].getStripeId().getId()); } } } /* * hopefully, there is a candidate stripe */ if (candidateStripes.size() > 0) { victimStripeId = (Id) candidateStripes.elementAt(scribe.getEnvironment().getRandomSource().nextInt(candidateStripes.size())); NodeHandle[] children; children = this.scribe.getChildren(new Topic(victimStripeId)); NodeHandle child = children[scribe.getEnvironment().getRandomSource().nextInt(children.length)]; Vector result = new Vector(); result.addElement(child); result.addElement(victimStripeId); return result; } return null; } /** * This method attempts to free bandwidth from our primary stripe. It selects * a child whose prefix match with the stripe is minimum, and drops it. If * multiple such child exist and newChild has same prefix match as them, then * new child is not taken, otherwise a random selection is made. Otherwise, * new child is taken and victim child is dropped. * * @param channel DESCRIBE THE PARAMETER * @param newChild DESCRIBE THE PARAMETER * @param stripeId DESCRIBE THE PARAMETER * @return The victim child to drop. */ public NodeHandle freeBandwidth(Channel channel, NodeHandle newChild, Id stripeId) { Stripe primaryStripe = channel.getPrimaryStripe(); Id localId = channel.getLocalId(); /* * We have to drop one of child of the primary stripe */ NodeHandle[] children = scribe.getChildren(new Topic(primaryStripe.getStripeId().getId())); /* * Now, select that child which doesnt share least prefix with local node */ int minPrefixMatch = getPrefixMatch(stripeId, newChild.getId(), channel.getStripeBase()); /* * find all potential victims */ Vector victims = new Vector(); for (int j = 0; j < children.length; j++) { NodeHandle c = (NodeHandle) children[j]; int match = getPrefixMatch(stripeId, c.getId(), channel.getStripeBase()); if (match < minPrefixMatch) { victims.addElement(c); } } /* * check if new child is our best victim */ if (victims.size() == 0) { return newChild; } else { return (NodeHandle) victims.elementAt(scribe.getEnvironment().getRandomSource().nextInt(victims.size())); } } /** * Informs this policy that a child was added to a topic - the topic is free * to ignore this upcall if it doesn't care. * * @param topic The topic to unsubscribe from * @param child The child that was added */ public void childAdded(Topic topic, NodeHandle child) { } /** * Informs this policy that a child was removed from a topic - the topic is * free to ignore this upcall if it doesn't care. * * @param topic The topic to unsubscribe from * @param child The child that was removed */ public void childRemoved(Topic topic, NodeHandle child) { } /** * Helper method for finding prefix match between two Ids. * * @param target DESCRIBE THE PARAMETER * @param sample DESCRIBE THE PARAMETER * @param digitLength DESCRIBE THE PARAMETER * @return The number of most significant digits that match. */ public static int getPrefixMatch(Id target, Id sample, int digitLength) {// int digitLength = RoutingTable.baseBitLength(); int numDigits = rice.pastry.Id.IdBitLength / digitLength - 1; return (numDigits - ((rice.pastry.Id) target).indexOfMSDD((rice.pastry.Id) sample, digitLength)); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -