📄 prefixhashtree.java
字号:
/**
 * Method processSplitMessage
 * Reacts to a request to split further. Nothing happens while the item count
 * is within the node capacity. Otherwise expired values are purged first, and
 * a node split is started only if the count is still above capacity afterwards.
 *
 * @param message the split request; its contents are not read by this method
 */
protected void processSplitMessage(SplitFurtherMessage message) {
    if (numItems <= nodeCapacity) {
        return;
    }
    // Purging expired entries may be enough to get back under capacity.
    clearExpiredValues();
    if (numItems > nodeCapacity) {
        initiateNodeSplit();
    }
}
/**
 * Method sendAck
 * Builds a PHTMessageAck for a received data-transfer message and sends it to
 * the node reported by {@code message.getSendingNode()}. The ack repeats the
 * message's lifetime and message ID and carries a flag that is 1 when
 * {@code message.transferDone()} is true and 0 otherwise.
 *
 * @param message the data-transfer message being acknowledged
 */
protected void sendAck(PHTDataTransferMessage message) {
    short doneFlag = (short) (message.transferDone() ? 1 : 0);

    PHTMessageAck ack = PHTMessageAck.allocate(
            this.name,
            this.type,
            message.getLifetime(),
            this.selfSocketAddress,
            this.counter.nextValue(),
            message.getSendingNode(),
            message.getMessageID(),
            doneFlag);

    BitID replyTo = (BitID) message.getSendingNode();
    pierBackend.executorSend(replyTo, this.indexManager.getIndexNS(),
            "PHTack", 0, ack, 0, true, false);
}
/**
* Method initiateNodeSplit
* Selects one LEAF_NODE among the PHT nodes held in PHTNodes and splits it. The chosen
* node is marked SPLITTING_LEAF_NODE, two child keys are derived by appending a 0 byte
* and a 1 byte to its key, and every stored value is wrapped in a PHTDataTransferMessage
* addressed to the child selected by the next position of the value's binary key. The
* node's local storage is then cleared (and numItems reduced accordingly) before any of
* the queued messages is sent. If no node qualifies, a debug line is written and the
* method returns without changing anything.
*/
protected void initiateNodeSplit() {
// Split the fullest, then highest-up, PHT node at this PIER node
Iterator nodeKeys = PHTNodes.keySet().iterator();
BitID fullestNodeKey = null;
int fullestNodeCapacity = 0;
while (nodeKeys.hasNext()) {
BitID nextKey = (BitID) nodeKeys.next();
NodeState nextNode = (NodeState) PHTNodes.get(nextKey);
// Only leaf nodes holding strictly more items than the best candidate so far are considered
// (so empty nodes never qualify, since the starting threshold is 0).
if (nextNode.getNumLocalItems() > fullestNodeCapacity && nextNode.getState() == LEAF_NODE) {
// Byte length of the key minus one; presumably discounts a leading marker byte
// (other methods in this file build keys as {'B', bit, ...}) -- TODO confirm.
int keyLength = nextKey.bigIntegerValue().toByteArray().length-1;
// The candidate must be shorter than numBits and no longer than the current best key.
// NOTE(review): because both the item-count and key-length tests must pass, a node with
// more items but a longer key than the current best is skipped, so the result can depend
// on the iteration order of PHTNodes.keySet() -- confirm this is intended.
if (keyLength < numBits && keyLength <=
(fullestNodeKey == null ? numBits : fullestNodeKey.bigIntegerValue().toByteArray().length-1)) {
fullestNodeKey = nextKey;
fullestNodeCapacity = nextNode.getNumLocalItems();
}
}
}
if (fullestNodeKey == null) {
DEBUG("All nodes are either empty or can't be split any further!");
return;
}
DEBUG("SPLITTING NODE " + KeyConverter.printBitID(fullestNodeKey) + "(" + fullestNodeCapacity + " items)");
NodeState theNode = (NodeState) PHTNodes.get(fullestNodeKey);
theNode.setState(SPLITTING_LEAF_NODE);
// Build the two child keys: the parent's key bytes followed by one extra byte, 0 (left) or 1 (right).
byte[] binaryKeyArray = fullestNodeKey.bigIntegerValue().toByteArray();
// Index of the appended byte in the child keys; also used below to pick the deciding
// position in each stored key's binary form.
int placeInLeafKeys = binaryKeyArray.length;
byte[] leftLeafKey = new byte[placeInLeafKeys+1];
byte[] rightLeafKey = new byte[placeInLeafKeys+1];
System.arraycopy(binaryKeyArray, 0, leftLeafKey, 0, placeInLeafKeys);
System.arraycopy(binaryKeyArray, 0, rightLeafKey, 0, placeInLeafKeys);
leftLeafKey[placeInLeafKeys] = 0;
rightLeafKey[placeInLeafKeys] = 1;
theNode.setLeftLeafNode(new BitID(new BigInteger(leftLeafKey)));
theNode.setRightLeafNode(new BitID(new BigInteger(rightLeafKey)));
// Have to send messages last, because otherwise it all goes to hell when the new leaf PHT nodes
// are in the same PIER node as the parent!
// messagesToSend holds alternating entries: a destination BitID followed by the message for it.
LinkedList messagesToSend = new LinkedList();
Iterator listOfKeys = theNode.getKeys();
while (listOfKeys.hasNext()) {
BitID nextKey = (BitID) listOfKeys.next();
byte[] keyInBinary = KeyConverter.getKeyInBinary(nextKey.bigIntegerValue().toByteArray(), numBits);
// The entry just past the parent's prefix decides the child: > 0 goes right, otherwise left.
byte differentBit = keyInBinary[placeInLeafKeys];
BitID nodeToSendData;
if (differentBit > 0) {
nodeToSendData = theNode.getRightLeafNode();
} else {
nodeToSendData = theNode.getLeftLeafNode();
}
// One transfer message per stored value under this key, each with its own message ID.
Iterator iteratorOfValues = theNode.getValues(nextKey);
while (iteratorOfValues.hasNext()) {
PHTLocalStoreContainer nextValue = (PHTLocalStoreContainer) iteratorOfValues.next();
long messageID = this.counter.nextValue();
// Record the ID on the node (presumably so the matching acknowledgement can be tracked -- confirm).
theNode.addAckToList(messageID);
PHTMessageData additionalData = PHTMessageData.allocate(new BitID(), 0, 0, nextValue.getIID());
PHTDataTransferMessage data = PHTDataTransferMessage.allocate(this.name, this.type,
nextValue.getRemainingLifetime(), this.selfSocketAddress,
messageID, additionalData, nextKey, nextValue.getValue(),
fullestNodeKey, nodeToSendData, (short)0);
messagesToSend.add(nodeToSendData);
messagesToSend.add(data);
}
}
DEBUG("CLEARING STORAGE FOR NODE " + KeyConverter.printBitID(fullestNodeKey));
// Deduct this node's items from the running total before its storage is emptied.
this.numItems -= theNode.getNumLocalItems();
theNode.clearStorage();
// Now that local state is settled, send the queued (destination, message) pairs in order.
Iterator dataMessages = messagesToSend.iterator();
while (dataMessages.hasNext()) {
BitID nodeToSendData = (BitID) dataMessages.next();
PHTDataTransferMessage data = (PHTDataTransferMessage) dataMessages.next();
pierBackend.executorSend(nodeToSendData, this.indexManager.getIndexNS(), "PHTdata", 0,
data, 0, true, false);
}
}
/**
 * Method clearExpiredValues
 * Asks every PHT node held in PHTNodes to drop its expired values, then
 * resets numItems to the sum of the local item counts the nodes report
 * afterwards.
 */
protected void clearExpiredValues() {
    int remaining = 0;
    for (Iterator it = PHTNodes.values().iterator(); it.hasNext(); ) {
        NodeState node = (NodeState) it.next();
        node.clearExpiredValues();
        remaining += node.getNumLocalItems();
    }
    this.numItems = remaining;
}
/**
 * Method insert
 * May be invoked from any node. The value is meant to reach the node
 * currently responsible for the key and to be stored in the index until its
 * lifetime expires; inserting the same key with the same iid renews the value
 * for the given lifetime. The call may involve several nodes and network
 * traffic.
 *
 * Locally, this method converts the key to its binary form, takes the first
 * (numBits/2)+1 entries of that form as the routing key, and sends an
 * IndexInsertMessage under the "PHTinsert" tag to that routing key.
 *
 * @param key the data value used for indexing; it is cast to BitID here, so
 * any other Payload type fails with a ClassCastException
 * @param value the object to store in the index; its contents are opaque to
 * the index
 * @param iid a uniquifier used to decide whether two objects with the same
 * key are actually the same object
 * @param lifetime how long, in milliseconds, the value is to be kept in the
 * index
 */
public void insert(Payload key, Payload value, int iid, long lifetime) {
    final int startLevel = numBits / 2;

    byte[] rawKey = ((BitID) key).bigIntegerValue().toByteArray();
    byte[] bitKey = KeyConverter.getKeyInBinary(rawKey, numBits);

    // Routing key: the leading startLevel + 1 entries of the binary key.
    byte[] routingPrefix = new byte[startLevel + 1];
    System.arraycopy(bitKey, 0, routingPrefix, 0, startLevel + 1);
    BitID routingKey = new BitID(new BigInteger(routingPrefix));
    BitID fullBinaryKey = new BitID(new BigInteger(bitKey));

    DEBUG("INSERT: key=" + KeyConverter.printBitID(new BitID(new BigInteger(rawKey)))
            + ", binarykey=" + KeyConverter.printBitID(fullBinaryKey));

    PHTMessageData extra = PHTMessageData.allocate(fullBinaryKey, startLevel, 2, iid);
    IndexInsertMessage insertMessage = IndexInsertMessage.allocate(
            this.name,
            this.type,
            lifetime,
            this.selfSocketAddress,
            this.counter.nextValue(),
            extra,
            key,
            value);

    pierBackend.executorSend(routingKey, this.indexManager.getIndexNS(), "PHTinsert", 0,
            insertMessage, 0, true, false);
}
/**
 * Method send
 * Delivers a copy of sendMessage to every node that *may* store values
 * encompassed by a set of keys (how many keys, and what they mean, depends on
 * the index type). At each of those nodes the message is to be delivered to
 * the local DHT instance with the given storage ns, rid, iid and lifetime.
 * The index should not store the message except while forwarding or
 * delivering it. The call may involve several nodes and network traffic.
 *
 * Locally, this method reads keys[0] and keys[1] (both cast to BitID),
 * converts them to binary form and scans from index 1 for the first position
 * at which they differ. If that position is greater than 1, a single
 * IndexSendMessage is sent under the "PHTsend" tag to a key made of the
 * leading (position - 1) entries of the first key. Otherwise one message is
 * sent to each of the keys {'B',0} and {'B',1}.
 *
 * NOTE(review): the routed prefix here has (position - 1) entries and the
 * same number is passed as the second argument of PHTMessageData.allocate,
 * whereas insert() and the two-subtree branch route to a key that is one
 * entry longer than that argument. Confirm the shorter prefix is intended.
 *
 * @param keys the set of keys used to determine which nodes (one or more)
 * should receive a copy of sendMessage; at least two BitID elements are
 * required by this implementation
 * @param sendMessage the actual data object to be delivered
 * @param ns opaque value representing the DHT storage ns at each of the
 * responsible nodes
 * @param rid opaque value representing the DHT rid at each of the
 * responsible nodes
 * @param iid opaque value representing the DHT iid at each of the
 * responsible nodes
 * @param lifetime opaque value representing the DHT lifetime at each of the
 * responsible nodes
 */
public void send(Payload[] keys, Payload sendMessage, String ns,
        String rid, int iid, long lifetime) {
    byte[] rawFirst = ((BitID) keys[0]).bigIntegerValue().toByteArray();
    byte[] rawSecond = ((BitID) keys[1]).bigIntegerValue().toByteArray();
    byte[] firstBits = KeyConverter.getKeyInBinary(rawFirst, numBits);
    byte[] secondBits = KeyConverter.getKeyInBinary(rawSecond, numBits);

    // Index 0 is skipped; find the first later index where the keys disagree.
    int firstDifference = 1;
    while (firstDifference < firstBits.length
            && firstBits[firstDifference] == secondBits[firstDifference]) {
        firstDifference++;
    }

    BitID firstBinaryKey = new BitID(new BigInteger(firstBits));
    BitID secondBinaryKey = new BitID(new BigInteger(secondBits));
    DEBUG("SEND:");
    DEBUG("\t" + KeyConverter.printBitID(firstBinaryKey));
    DEBUG("\t" + KeyConverter.printBitID(secondBinaryKey));

    if (firstDifference <= 1) {
        // No common prefix, so send to both highest-up subtrees
        BitID zeroSubtree = new BitID(new BigInteger(new byte[]{'B', 0}));
        BitID oneSubtree = new BitID(new BigInteger(new byte[]{'B', 1}));
        PHTMessageData extraForZero = PHTMessageData.allocate(firstBinaryKey, 1, 0, 0);
        PHTMessageData extraForOne = PHTMessageData.allocate(secondBinaryKey, 1, 0, 0);
        IndexSendMessage toZero = IndexSendMessage.allocate(this.name, this.type,
                lifetime, this.selfSocketAddress, this.counter.nextValue(), extraForZero,
                keys, ns, rid, iid, sendMessage);
        IndexSendMessage toOne = IndexSendMessage.allocate(this.name, this.type,
                lifetime, this.selfSocketAddress, this.counter.nextValue(), extraForOne,
                keys, ns, rid, iid, sendMessage);
        pierBackend.executorSend(zeroSubtree, this.indexManager.getIndexNS(), "PHTsend", 0,
                toZero, 0, true, false);
        pierBackend.executorSend(oneSubtree, this.indexManager.getIndexNS(), "PHTsend", 0,
                toOne, 0, true, false);
        return;
    }

    // The keys share a prefix: route one message to the key built from it.
    int prefixLength = firstDifference - 1;
    byte[] sharedPrefix = new byte[prefixLength];
    System.arraycopy(firstBits, 0, sharedPrefix, 0, prefixLength);
    BitID prefixKey = new BitID(new BigInteger(sharedPrefix));
    PHTMessageData extra = PHTMessageData.allocate(firstBinaryKey, prefixLength, 0, 0);
    IndexSendMessage outgoing = IndexSendMessage.allocate(this.name, this.type,
            lifetime, this.selfSocketAddress, this.counter.nextValue(), extra,
            keys, ns, rid, iid, sendMessage);
    pierBackend.executorSend(prefixKey, this.indexManager.getIndexNS(), "PHTsend", 0,
            outgoing, 0, true, false);
}
/**
 * Method retrieve
 * Locally retrieves data that matches a set of keys. Expired values are
 * purged first, then every PHT node held in PHTNodes is asked to run the
 * retrieval against the given keys and client. No network traffic is
 * generated as a direct result of invoking this method.
 *
 * @param keys the set of keys (number of keys and meaning depends on index
 * type) to retrieve
 * @param client a callback client to receive data once ready
 */
public void retrieve(Payload[] keys, IndexClient client) {
    clearExpiredValues();
    for (Iterator it = PHTNodes.values().iterator(); it.hasNext(); ) {
        NodeState node = (NodeState) it.next();
        node.retrieve(keys, client);
    }
}
/**
 * Method printTree
 *
 * For debugging purposes only. Sends one PHTPrintTreeMessage under the
 * "PHTprint" tag to each of the tree's two root keys, {'B',0} and {'B',1};
 * the message is then meant to be passed down the tree, printing tree
 * contents and node status along the way.
 */
public void printTree() {
    byte[] zeroRootKey = new byte[]{'B', 0};
    byte[] oneRootKey = new byte[]{'B', 1};

    // Both messages are allocated before either is sent.
    PHTPrintTreeMessage forZeroRoot = PHTPrintTreeMessage.allocate(this.name, this.type,
            0, this.selfSocketAddress, this.counter.nextValue(), 1, zeroRootKey);
    PHTPrintTreeMessage forOneRoot = PHTPrintTreeMessage.allocate(this.name, this.type,
            0, this.selfSocketAddress, this.counter.nextValue(), 1, oneRootKey);

    BitID zeroRoot = new BitID(new BigInteger(zeroRootKey));
    pierBackend.executorSend(zeroRoot, this.indexManager.getIndexNS(),
            "PHTprint", 0, forZeroRoot, 0, true, false);

    BitID oneRoot = new BitID(new BigInteger(oneRootKey));
    pierBackend.executorSend(oneRoot, this.indexManager.getIndexNS(),
            "PHTprint", 0, forOneRoot, 0, true, false);
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -