prefixhashtree.java

来自「High performance DB query」· Java 代码 · 共 858 行 · 第 1/3 页

JAVA
858
字号
		DEBUG("PROCESSSEND: KEY = " + KeyConverter.printBitID(theNodeKey));
		
		if (theNode == null) {
			theNode = new NodeState(NOT_NODE, new TreeMap(), new HashMap(), 0, 0, null, null);
			PHTNodes.put(theNodeKey, theNode);
		}
		
		if (messageData.getPlaceInKey() == 1 && theNode.getState() == NOT_NODE) {
			theNode.setState(LEAF_NODE);
			
			byte firstBit = keyInBinary[1];
			if (firstBit == 0) { firstBit = 1; }
			else { firstBit = 0; }
			BitID otherRootKey = new BitID(new BigInteger(new byte[]{'B', firstBit}));
			
			NewPHTMessage newMessage = NewPHTMessage.allocate(this.name, this.type, 
					this.selfSocketAddress, this.counter.nextValue(), otherRootKey);
			
			pierBackend.executorSend(otherRootKey, this.indexManager.getIndexNS(), "PHTnew", 0,
					newMessage, 0, true, false);
		}
		
		int newPIK = messageData.getPlaceInKey();
		
		switch(theNode.getState()) {
		
		case NOT_NODE:

			newPIK -= 1;
			byte[] shorterPrefix = new byte[newPIK+1];
			System.arraycopy(keyInBinary, 0, shorterPrefix, 0, newPIK+1);
			BitID parentNodeKey = new BitID(new BigInteger(shorterPrefix));
			
			PHTMessageData additionalData = PHTMessageData.allocate(new BitID(), newPIK, 0, 0);
			
			IndexSendMessage newMessage = IndexSendMessage.allocate(this.name, this.type, 
					message.getLifetime(), this.selfSocketAddress, this.counter.nextValue(), additionalData,
					message.getKeys(), message.getNS(), message.getRID(), message.getIID(), 
					message.getSendMessage());
			
			pierBackend.executorSend(parentNodeKey, this.indexManager.getIndexNS(), "PHTsend", 0,
					newMessage, 0, true, false);
			
			break;
			
		case INTERIOR_NODE:
			
			newPIK += 1;
			byte[] longerPrefix0 = new byte[newPIK+1];
			byte[] longerPrefix1 = new byte[newPIK+1];
			System.arraycopy(keyInBinary, 0, longerPrefix0, 0, newPIK);
			System.arraycopy(keyInBinary, 0, longerPrefix1, 0, newPIK);
			longerPrefix0[newPIK] = 0;
			longerPrefix1[newPIK] = 1;
			BitID leafNodeKey0 = new BitID(new BigInteger(longerPrefix0));
			BitID leafNodeKey1 = new BitID(new BigInteger(longerPrefix1));

			byte[] leftRange = ((BitID)message.getKeys()[0]).bigIntegerValue().toByteArray();
			BitID leftRangeKey = new BitID(new BigInteger(KeyConverter.getKeyInBinary(leftRange, numBits)));
			byte[] rightRange = ((BitID)message.getKeys()[1]).bigIntegerValue().toByteArray();
			BitID rightRangeKey = new BitID(new BigInteger(KeyConverter.getKeyInBinary(rightRange, numBits)));
			
			if (KeyConverter.keyIsInRange(leafNodeKey0, leftRangeKey, rightRangeKey)) {
				PHTMessageData additionalData0 = PHTMessageData.allocate(leafNodeKey0, newPIK, 0, 0);
				IndexSendMessage newMessage0 = IndexSendMessage.allocate(this.name, this.type, 
						message.getLifetime(), this.selfSocketAddress, this.counter.nextValue(), additionalData0,
						message.getKeys(), message.getNS(), message.getRID(), message.getIID(), 
						message.getSendMessage());
				pierBackend.executorSend(leafNodeKey0, this.indexManager.getIndexNS(), "PHTsend", 0,
						newMessage0, 0, true, false);
			}

			if (KeyConverter.keyIsInRange(leafNodeKey1, leftRangeKey, rightRangeKey)) {
				PHTMessageData additionalData1 = PHTMessageData.allocate(leafNodeKey1, newPIK, 0, 0);
				IndexSendMessage newMessage1 = IndexSendMessage.allocate(this.name, this.type, 
						message.getLifetime(), this.selfSocketAddress, this.counter.nextValue(), additionalData1,
						message.getKeys(), message.getNS(), message.getRID(), message.getIID(), 
						message.getSendMessage());
				pierBackend.executorSend(leafNodeKey1, this.indexManager.getIndexNS(), "PHTsend", 0,
						newMessage1, 0, true, false);
			}
			
			break;
			
		case LEAF_NODE:
		case SPLITTING_LEAF_NODE:
			
			DEBUG("Inserting " + message + " into DHT at node " + KeyConverter.printBitID(selfID));
			this.indexManager.deliverMessage(message);
			
			break;
			
		default:
			throw new PHTException("The method processSendMessage found node in unexpected state!");
		}
	}
	
	/**
	 * Method processInsertMessage
	 * Called by processMessage when it receives a message of type IndexInsertMessage.
	 * <p>
	 * Looks up the PHT node named by the leading placeInKey+1 bytes of the message's
	 * binary key (registering a NOT_NODE placeholder when none is recorded) and then
	 * acts on that node's state:
	 * <ul>
	 * <li>NOT_NODE / INTERIOR_NODE: re-sends the insert as "PHTinsert" to a shorter
	 *     (NOT_NODE) or longer (INTERIOR_NODE) prefix, with the factor of split doubled;</li>
	 * <li>LEAF_NODE: stores the key/value pair locally, then clears expired values and,
	 *     if the item count is still above nodeCapacity, initiates a node split;</li>
	 * <li>SPLITTING_LEAF_NODE: adds a fresh message ID to the node's ack list and sends
	 *     the pair as "PHTdata" to the node's left or right leaf.</li>
	 * </ul>
	 * 
	 * @param message message of IndexInsertMessage type from another node
	 * @param ns same as the Pier Index namespace (not read by this method)
	 * @param rid value specified by sending node, opaque to the PierIndexManager
	 *            (not read by this method)
	 * @throws PHTException if the node is in a state other than the four listed above
	 */
	protected void processInsertMessage(IndexInsertMessage message, String ns, String rid) {
		
		PHTMessageData insertData = (PHTMessageData) message.getAdditionalData();
		byte[] fullKeyBytes = insertData.getBinaryKey().bigIntegerValue().toByteArray();
		
		// The addressed node is identified by the first placeInKey+1 bytes of the key.
		byte[] nodePrefix = new byte[insertData.getPlaceInKey()+1];
		System.arraycopy(fullKeyBytes, 0, nodePrefix, 0, nodePrefix.length);
		BitID targetKey = new BitID(new BigInteger(nodePrefix));
		
		NodeState target = (NodeState) PHTNodes.get(targetKey);
		if (target == null) {
			// Unknown prefix: remember it as an empty NOT_NODE placeholder.
			target = new NodeState(NOT_NODE, new TreeMap(), new HashMap(), 0, 0, null, null);
			PHTNodes.put(targetKey, target);
		}
		
		// A placeInKey of 1 that hits a NOT_NODE turns that node into a leaf, and a
		// "PHTnew" message is sent to the key made of the byte 'B' followed by the
		// flipped value of the key's second byte.
		if (insertData.getPlaceInKey() == 1 && target.getState() == NOT_NODE) {
			target.setState(LEAF_NODE);
			
			byte flippedBit = (byte) (fullKeyBytes[1] == 0 ? 1 : 0);
			BitID siblingRootKey = new BitID(new BigInteger(new byte[]{'B', flippedBit}));
			
			NewPHTMessage announce = NewPHTMessage.allocate(this.name, this.type, 
					this.selfSocketAddress, this.counter.nextValue(), siblingRootKey);
			
			pierBackend.executorSend(siblingRootKey, this.indexManager.getIndexNS(), "PHTnew", 0,
					announce, 0, true, false);
		}
		
		switch (target.getState()) {
		
		case NOT_NODE:
		case INTERIOR_NODE: {
			
			// Each hop doubles the factor of split, so the stride ceil(numBits / factor)
			// shrinks; NOT_NODE moves toward shorter prefixes, INTERIOR_NODE toward longer.
			int doubledFactor = insertData.getFactorOfSplit() * 2;
			boolean towardShorterPrefix = (target.getState() == NOT_NODE);
			int currentPlace = insertData.getPlaceInKey();
			double stride = Math.ceil((double)numBits/doubledFactor);
			int nextPlace = (int)(towardShorterPrefix ? currentPlace - stride : currentPlace + stride);
			
			byte[] nextPrefix = new byte[nextPlace+1];
			System.arraycopy(fullKeyBytes, 0, nextPrefix, 0, nextPlace+1);
			BitID nextNodeKey = new BitID(new BigInteger(nextPrefix));
			
			PHTMessageData forwardedData = PHTMessageData.allocate(insertData.getBinaryKey(),
					nextPlace, doubledFactor, insertData.getIID());
			IndexInsertMessage forwarded = IndexInsertMessage.allocate(this.name, this.type,
					message.getLifetime(), this.selfSocketAddress, this.counter.nextValue(), forwardedData,
					message.getKey(), message.getValue());
			
			pierBackend.executorSend(nextNodeKey, this.indexManager.getIndexNS(), "PHTinsert", 0,
					forwarded, 0, true, false);
			
			break;
		}
			
		case LEAF_NODE: {
			DEBUG("Storing <" + message.getKey() + "," + message.getValue() + "> at node " + 
					KeyConverter.printBitID(targetKey) + "(" + 
					KeyConverter.printBitID((BitID)selfID) + ")");
			target.storeLocally(message.getKey(), message.getValue(), 
					((PHTMessageData) message.getAdditionalData()).getIID(), message.getLifetime());
			this.numItems++;
			
			// Over capacity: first try to reclaim space from expired values ...
			if (numItems > nodeCapacity) {
				clearExpiredValues();
			}
			// ... and split only if the count is still over capacity afterwards.
			if (numItems > nodeCapacity) {
				initiateNodeSplit();
			}
			
			break;
		}
			
		case SPLITTING_LEAF_NODE: {
			
			// The outgoing transfer's ID is added to the node's ack list before sending.
			long transferID = this.counter.nextValue();
			target.addAckToList(transferID);
			
			// The byte just past the current place in the key selects the right (> 0)
			// or left leaf.
			byte[] keyBytes = insertData.getBinaryKey().bigIntegerValue().toByteArray();
			byte childSelector = keyBytes[insertData.getPlaceInKey()+1];
			BitID destination = (childSelector > 0) 
					? target.getRightLeafNode() 
					: target.getLeftLeafNode();
			
			PHTMessageData transferData = PHTMessageData.allocate(insertData.getBinaryKey(), 
					insertData.getPlaceInKey(), 0, insertData.getIID());
			PHTDataTransferMessage transfer = PHTDataTransferMessage.allocate(this.name, this.type,
					message.getLifetime(), this.selfSocketAddress, 
					transferID, transferData, message.getKey(), message.getValue(),
					targetKey, destination, (short)0);
			
			pierBackend.executorSend(destination, this.indexManager.getIndexNS(), "PHTdata", 0,
					transfer, 0, true, false);
			
			break;
		}
			
		default:
			throw new PHTException("The method processInsertMessage found node in unexpected state!");
		}
	}
	
	/**
	 * Method processPrintMessage
	 * Used purely for debugging.  Prints the ID and contents of the node addressed by the
	 * message; for an interior node it then forwards a "PHTprint" message to each of the
	 * two one-element-longer prefixes.  Also checks for a few state constants.
	 * Does not go through usual DEBUG method, so prints to standard output regardless of
	 * settings -- every line of the dump, including the splitting marker and the stored
	 * keys, is emitted with PRINT.
	 * 
	 * @param message print request carrying the key bytes and the bit count reached so far
	 * @throws PHTException if no node is recorded for the message's key, if an interior
	 *                      node holds local items, or if the node is in an unexpected state
	 */
	protected void processPrintMessage(PHTPrintTreeMessage message) {
		
		byte[] theKey = message.getKeySoFar();
		NodeState theNode = (NodeState) PHTNodes.get(new BitID(new BigInteger(theKey)));
		
		if (theNode == null) {
			throw new PHTException("Node not found in processPrintMessage!");
		}
		
		switch (theNode.getState()) {
		
		case INTERIOR_NODE: {
			
			if (theNode.getNumLocalItems() != 0) {
				throw new PHTException("Interior node somehow has items stored at it!");
			}
			
			int theBits = message.getBitsSoFar();
			PRINT("--------------------------------------------");
			PRINT("<HEAD NODE: " + KeyConverter.printBitID(selfID) + ", NUM ITEMS: " + numItems);
			PRINT("NODE " + KeyConverter.printBitID(new BitID(new BigInteger(theKey))) + " <INTERIOR>");
			PRINT("--------------------------------------------");
			
			// Child keys: the first theBits+1 bytes of this key plus one trailing 0 or 1.
			byte[] subtreeZero = new byte[theBits+2];
			System.arraycopy(theKey, 0, subtreeZero, 0, theBits+1);
			subtreeZero[theBits+1] = 0;
			byte[] subtreeOne = new byte[theBits+2];
			System.arraycopy(theKey, 0, subtreeOne, 0, theBits+1);
			subtreeOne[theBits+1] = 1;
			
			PHTPrintTreeMessage messageZero = PHTPrintTreeMessage.allocate(this.name, this.type, 
					0, this.selfSocketAddress, this.counter.nextValue(), theBits+1, subtreeZero);
			PHTPrintTreeMessage messageOne = PHTPrintTreeMessage.allocate(this.name, this.type, 
					0, this.selfSocketAddress, this.counter.nextValue(), theBits+1, subtreeOne);
			
			pierBackend.executorSend(new BitID(new BigInteger(subtreeZero)), 
					this.indexManager.getIndexNS(), "PHTprint", 0, messageZero, 0, true, false);
			pierBackend.executorSend(new BitID(new BigInteger(subtreeOne)), 
					this.indexManager.getIndexNS(), "PHTprint", 0, messageOne, 0, true, false);
			break;
		}
		
		case LEAF_NODE:
		case SPLITTING_LEAF_NODE: {
			
			PRINT("--------------------------------------------");
			PRINT("<HEAD NODE: " + KeyConverter.printBitID(selfID) + ", NUM ITEMS: " + numItems);
			PRINT("NODE " + KeyConverter.printBitID(new BitID(new BigInteger(theKey))) + " <LEAF>");
			// PRINT, not DEBUG: this dump is documented to appear regardless of debug settings.
			if (theNode.getState() == SPLITTING_LEAF_NODE) { PRINT("<SPLITTING NODE!!>"); }
			
			Iterator listOfKeys = theNode.getKeys();
			while (listOfKeys.hasNext()) {
				Payload nextKey = (Payload) listOfKeys.next();
				Iterator iteratorOfValues = theNode.getValues(nextKey);
				// One line per stored value; only the key is shown, the value is just consumed.
				while (iteratorOfValues.hasNext()) {
					iteratorOfValues.next();
					byte[] rawKey = ((BitID)nextKey).bigIntegerValue().toByteArray();
					BitID binaryKey = new BitID(new BigInteger(KeyConverter.getKeyInBinary(rawKey, numBits)));
					PRINT("		KEY: " + KeyConverter.printBitID(binaryKey));
				}
			}
			PRINT("		<" + theNode.getNumLocalItems() + " VALUES>");
			PRINT("--------------------------------------------");
			break;
		}
			
		case NOT_NODE:
			// DON'T NEED TO PRINT ANYTHING!
			break;
			
		default:
			throw new PHTException("The method processPrintMessage found node in unexpected state!");
		}
	}
	
	/**
	 * Method processSplitMessage
	 * 
	 * @param message
	 */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?