prefixhashtree.java
来自「High performance DB query」· Java 代码 · 共 858 行 · 第 1/3 页
JAVA
858 行
DEBUG("PROCESSSEND: KEY = " + KeyConverter.printBitID(theNodeKey));
if (theNode == null) {
theNode = new NodeState(NOT_NODE, new TreeMap(), new HashMap(), 0, 0, null, null);
PHTNodes.put(theNodeKey, theNode);
}
if (messageData.getPlaceInKey() == 1 && theNode.getState() == NOT_NODE) {
theNode.setState(LEAF_NODE);
byte firstBit = keyInBinary[1];
if (firstBit == 0) { firstBit = 1; }
else { firstBit = 0; }
BitID otherRootKey = new BitID(new BigInteger(new byte[]{'B', firstBit}));
NewPHTMessage newMessage = NewPHTMessage.allocate(this.name, this.type,
this.selfSocketAddress, this.counter.nextValue(), otherRootKey);
pierBackend.executorSend(otherRootKey, this.indexManager.getIndexNS(), "PHTnew", 0,
newMessage, 0, true, false);
}
int newPIK = messageData.getPlaceInKey();
switch(theNode.getState()) {
case NOT_NODE:
newPIK -= 1;
byte[] shorterPrefix = new byte[newPIK+1];
System.arraycopy(keyInBinary, 0, shorterPrefix, 0, newPIK+1);
BitID parentNodeKey = new BitID(new BigInteger(shorterPrefix));
PHTMessageData additionalData = PHTMessageData.allocate(new BitID(), newPIK, 0, 0);
IndexSendMessage newMessage = IndexSendMessage.allocate(this.name, this.type,
message.getLifetime(), this.selfSocketAddress, this.counter.nextValue(), additionalData,
message.getKeys(), message.getNS(), message.getRID(), message.getIID(),
message.getSendMessage());
pierBackend.executorSend(parentNodeKey, this.indexManager.getIndexNS(), "PHTsend", 0,
newMessage, 0, true, false);
break;
case INTERIOR_NODE:
newPIK += 1;
byte[] longerPrefix0 = new byte[newPIK+1];
byte[] longerPrefix1 = new byte[newPIK+1];
System.arraycopy(keyInBinary, 0, longerPrefix0, 0, newPIK);
System.arraycopy(keyInBinary, 0, longerPrefix1, 0, newPIK);
longerPrefix0[newPIK] = 0;
longerPrefix1[newPIK] = 1;
BitID leafNodeKey0 = new BitID(new BigInteger(longerPrefix0));
BitID leafNodeKey1 = new BitID(new BigInteger(longerPrefix1));
byte[] leftRange = ((BitID)message.getKeys()[0]).bigIntegerValue().toByteArray();
BitID leftRangeKey = new BitID(new BigInteger(KeyConverter.getKeyInBinary(leftRange, numBits)));
byte[] rightRange = ((BitID)message.getKeys()[1]).bigIntegerValue().toByteArray();
BitID rightRangeKey = new BitID(new BigInteger(KeyConverter.getKeyInBinary(rightRange, numBits)));
if (KeyConverter.keyIsInRange(leafNodeKey0, leftRangeKey, rightRangeKey)) {
PHTMessageData additionalData0 = PHTMessageData.allocate(leafNodeKey0, newPIK, 0, 0);
IndexSendMessage newMessage0 = IndexSendMessage.allocate(this.name, this.type,
message.getLifetime(), this.selfSocketAddress, this.counter.nextValue(), additionalData0,
message.getKeys(), message.getNS(), message.getRID(), message.getIID(),
message.getSendMessage());
pierBackend.executorSend(leafNodeKey0, this.indexManager.getIndexNS(), "PHTsend", 0,
newMessage0, 0, true, false);
}
if (KeyConverter.keyIsInRange(leafNodeKey1, leftRangeKey, rightRangeKey)) {
PHTMessageData additionalData1 = PHTMessageData.allocate(leafNodeKey1, newPIK, 0, 0);
IndexSendMessage newMessage1 = IndexSendMessage.allocate(this.name, this.type,
message.getLifetime(), this.selfSocketAddress, this.counter.nextValue(), additionalData1,
message.getKeys(), message.getNS(), message.getRID(), message.getIID(),
message.getSendMessage());
pierBackend.executorSend(leafNodeKey1, this.indexManager.getIndexNS(), "PHTsend", 0,
newMessage1, 0, true, false);
}
break;
case LEAF_NODE:
case SPLITTING_LEAF_NODE:
DEBUG("Inserting " + message + " into DHT at node " + KeyConverter.printBitID(selfID));
this.indexManager.deliverMessage(message);
break;
default:
throw new PHTException("The method processSendMessage found node in unexpected state!");
}
}
/**
* Method processInsertMessage
* Called by processMessage when it receives a message of type IndexInsertMessage
*
* @param message message of IndexInsertMessage type from another node
* @param ns same as the Pier Index namespace
* @param rid value specified by sending node, opaque to the PierIndexManager
*/
protected void processInsertMessage(IndexInsertMessage message, String ns, String rid) {
PHTMessageData messageData = (PHTMessageData) message.getAdditionalData();
byte[] keyInBinary = messageData.getBinaryKey().bigIntegerValue().toByteArray();
byte[] newKey = new byte[messageData.getPlaceInKey()+1];
System.arraycopy(keyInBinary, 0, newKey, 0, newKey.length);
BitID theNodeKey = new BitID(new BigInteger(newKey));
NodeState theNode = (NodeState) PHTNodes.get(theNodeKey);
if (theNode == null) {
theNode = new NodeState(NOT_NODE, new TreeMap(), new HashMap(), 0, 0, null, null);
PHTNodes.put(theNodeKey, theNode);
}
if (messageData.getPlaceInKey() == 1 && theNode.getState() == NOT_NODE) {
theNode.setState(LEAF_NODE);
byte firstBit = keyInBinary[1];
if (firstBit == 0) { firstBit = 1; }
else { firstBit = 0; }
BitID otherRootKey = new BitID(new BigInteger(new byte[]{'B', firstBit}));
NewPHTMessage newMessage = NewPHTMessage.allocate(this.name, this.type,
this.selfSocketAddress, this.counter.nextValue(), otherRootKey);
pierBackend.executorSend(otherRootKey, this.indexManager.getIndexNS(), "PHTnew", 0,
newMessage, 0, true, false);
}
switch (theNode.getState()) {
case NOT_NODE:
case INTERIOR_NODE:
int newFOS = messageData.getFactorOfSplit() * 2;
int newPIK;
if (theNode.getState() == NOT_NODE) {
newPIK = (int)(messageData.getPlaceInKey() - Math.ceil((double)numBits/newFOS));
} else {
newPIK = (int)(messageData.getPlaceInKey() + Math.ceil((double)numBits/newFOS));
}
byte[] prefix = new byte[newPIK+1];
System.arraycopy(keyInBinary, 0, prefix, 0, newPIK+1);
BitID nextNodeKey = new BitID(new BigInteger(prefix));
PHTMessageData additionalData = PHTMessageData.allocate(messageData.getBinaryKey(),
newPIK, newFOS, messageData.getIID());
IndexInsertMessage newMessage = IndexInsertMessage.allocate(this.name, this.type,
message.getLifetime(), this.selfSocketAddress, this.counter.nextValue(), additionalData,
message.getKey(), message.getValue());
pierBackend.executorSend(nextNodeKey, this.indexManager.getIndexNS(), "PHTinsert", 0,
newMessage, 0, true, false);
break;
case LEAF_NODE:
DEBUG("Storing <" + message.getKey() + "," + message.getValue() + "> at node " +
KeyConverter.printBitID((BitID)theNodeKey) + "(" +
KeyConverter.printBitID((BitID)selfID) + ")");
theNode.storeLocally(message.getKey(), message.getValue(),
((PHTMessageData) message.getAdditionalData()).getIID(), message.getLifetime());
this.numItems++;
if (numItems > nodeCapacity) { clearExpiredValues(); }
if (numItems > nodeCapacity) { initiateNodeSplit(); }
break;
case SPLITTING_LEAF_NODE:
long newMessageID = this.counter.nextValue();
theNode.addAckToList(newMessageID);
byte[] keyInBinary2 = messageData.getBinaryKey().bigIntegerValue().toByteArray();
byte differentBit = keyInBinary2[messageData.getPlaceInKey()+1];
BitID nodeToSendData;
if (differentBit > 0) {
nodeToSendData = theNode.getRightLeafNode();
} else {
nodeToSendData = theNode.getLeftLeafNode();
}
PHTMessageData additionalData2 = PHTMessageData.allocate(messageData.getBinaryKey(),
messageData.getPlaceInKey(), 0, messageData.getIID());
PHTDataTransferMessage data = PHTDataTransferMessage.allocate(this.name, this.type,
message.getLifetime(), this.selfSocketAddress,
newMessageID, additionalData2, message.getKey(), message.getValue(),
theNodeKey, nodeToSendData, (short)0);
pierBackend.executorSend(nodeToSendData, this.indexManager.getIndexNS(), "PHTdata", 0,
data, 0, true, false);
break;
default:
throw new PHTException("The method processInsertMessage found node in unexpected state!");
}
}
/**
* Method processPrintMessage
* Used purely for debugging. Will print node ID and contents, then forward along to
* next two leaf nodes. Also checks for a few state constants.
* Does not go through usual DEBUG method, so prints to standard output regardless of settings.
*/
protected void processPrintMessage(PHTPrintTreeMessage message) {
NodeState theNode = (NodeState) PHTNodes.get(new BitID(new BigInteger(message.getKeySoFar())));
if (theNode == null) {
throw new PHTException("Node not found in processPrintMessage!");
}
switch (theNode.getState()) {
case INTERIOR_NODE:
if (theNode.getNumLocalItems() != 0) {
throw new PHTException("Interior node somehow has items stored at it!");
}
byte[] theKey = message.getKeySoFar();
int theBits = message.getBitsSoFar();
PRINT("--------------------------------------------");
PRINT("<HEAD NODE: " + KeyConverter.printBitID(selfID) + ", NUM ITEMS: " + numItems);
PRINT("NODE " + KeyConverter.printBitID(new BitID(new BigInteger(theKey))) + " <INTERIOR>");
PRINT("--------------------------------------------");
byte[] subtreeZero = new byte[theBits+2];
System.arraycopy(theKey, 0, subtreeZero, 0, theBits+1);
subtreeZero[theBits+1] = 0;
byte[] subtreeOne = new byte[theBits+2];
System.arraycopy(theKey, 0, subtreeOne, 0, theBits+1);
subtreeOne[theBits+1] = 1;
PHTPrintTreeMessage messageZero = PHTPrintTreeMessage.allocate(this.name, this.type,
0, this.selfSocketAddress, this.counter.nextValue(), theBits+1, subtreeZero);
PHTPrintTreeMessage messageOne = PHTPrintTreeMessage.allocate(this.name, this.type,
0, this.selfSocketAddress, this.counter.nextValue(), theBits+1, subtreeOne);
pierBackend.executorSend(new BitID(new BigInteger(subtreeZero)),
this.indexManager.getIndexNS(), "PHTprint", 0, messageZero, 0, true, false);
pierBackend.executorSend(new BitID(new BigInteger(subtreeOne)),
this.indexManager.getIndexNS(), "PHTprint", 0, messageOne, 0, true, false);
break;
case LEAF_NODE:
case SPLITTING_LEAF_NODE:
theKey = message.getKeySoFar();
PRINT("--------------------------------------------");
PRINT("<HEAD NODE: " + KeyConverter.printBitID(selfID) + ", NUM ITEMS: " + numItems);
PRINT("NODE " + KeyConverter.printBitID(new BitID(new BigInteger(theKey))) + " <LEAF>");
if (theNode.getState() == SPLITTING_LEAF_NODE) { DEBUG("<SPLITTING NODE!!>"); }
Iterator listOfKeys = theNode.getKeys();
while (listOfKeys.hasNext()) {
Payload nextKey = (Payload) listOfKeys.next();
Iterator iteratorOfValues = theNode.getValues(nextKey);
PHTLocalStoreContainer nextValue = null;
while (iteratorOfValues.hasNext()) {
nextValue = (PHTLocalStoreContainer) iteratorOfValues.next();
DEBUG(" KEY: " + KeyConverter.printBitID(new BitID(new BigInteger(KeyConverter.getKeyInBinary(((BitID)nextKey).bigIntegerValue().toByteArray(), numBits)))));
}
}
PRINT(" <" + theNode.getNumLocalItems() + " VALUES>");
PRINT("--------------------------------------------");
break;
case NOT_NODE:
// DON'T NEED TO PRINT ANYTHING!
break;
default:
throw new PHTException("The method processPrintMessage found node in unexpected state!");
}
}
/**
* Method processSplitMessage
*
* @param message
*/
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?