⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 prefixhashtree.java

📁 High performance DB query
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
	protected void processSplitMessage(SplitFurtherMessage message) {
		// The message itself is not inspected here; it only triggers a local capacity check.
		if (numItems <= nodeCapacity) {
			return;
		}

		// Over capacity: first try to get back under the limit by dropping expired values.
		clearExpiredValues();
		if (numItems <= nodeCapacity) {
			return;
		}

		// Still over capacity after cleanup, so start a split.
		initiateNodeSplit();
	}
	
	/**
	 * Method sendAck
	 * Builds a PHTMessageAck for a received data-transfer message and sends it, tagged
	 * "PHTack", to the node that the message reports as its sender.
	 *
	 * @param message the data-transfer message being acknowledged; its lifetime, sending
	 * node, message ID and transfer-done flag are copied into the ack
	 */
	protected void sendAck(PHTDataTransferMessage message) {
		// The ack carries the transfer-done flag as a short: 1 when set, 0 otherwise.
		short doneFlag = (short) (message.transferDone() ? 1 : 0);

		PHTMessageAck ack = PHTMessageAck.allocate(
				this.name,
				this.type,
				message.getLifetime(),
				this.selfSocketAddress,
				this.counter.nextValue(),
				message.getSendingNode(),
				message.getMessageID(),
				doneFlag);

		BitID ackTarget = (BitID) message.getSendingNode();
		pierBackend.executorSend(ackTarget, this.indexManager.getIndexNS(),
				"PHTack", 0, ack, 0, true, false);
	}
	
	/**
	 * Method initiateNodeSplit
	 * Chooses one local leaf PHT node, marks it as splitting, derives the keys of its two
	 * child leaves, and ships every stored value to the child selected by the next bit of
	 * the value's key. Local storage of the split node is cleared before any message is sent.
	 * Does nothing (apart from a debug line) when no local leaf qualifies.
	 */
	protected void initiateNodeSplit() {
		// Candidate selection. A leaf replaces the current pick only when it holds strictly
		// more items than the pick AND its key is shorter than numBits and no longer than
		// the pick's key (numBits serves as the bound until a first pick exists).
		// NOTE(review): the original comment described this as "split the fullest, then
		// highest-up"; a fuller but deeper leaf is rejected by this rule - confirm intent.
		BitID fullestNodeKey = null;
		int fullestNodeCapacity = 0;
		int fullestKeyLength = numBits;

		for (Iterator keyIter = PHTNodes.keySet().iterator(); keyIter.hasNext(); ) {
			BitID candidateKey = (BitID) keyIter.next();
			NodeState candidate = (NodeState) PHTNodes.get(candidateKey);
			if (candidate.getNumLocalItems() <= fullestNodeCapacity || candidate.getState() != LEAF_NODE) {
				continue;
			}

			// Key length is measured as the byte-array length minus one.
			int keyLength = candidateKey.bigIntegerValue().toByteArray().length - 1;
			if (keyLength >= numBits || keyLength > fullestKeyLength) {
				continue;
			}

			fullestNodeKey = candidateKey;
			fullestNodeCapacity = candidate.getNumLocalItems();
			fullestKeyLength = keyLength;
		}

		if (fullestNodeKey == null) {
			DEBUG("All nodes are either empty or can't be split any further!");
			return;
		}

		DEBUG("SPLITTING NODE " + KeyConverter.printBitID(fullestNodeKey) + "(" + fullestNodeCapacity + " items)");
		NodeState splitNode = (NodeState) PHTNodes.get(fullestNodeKey);
		splitNode.setState(SPLITTING_LEAF_NODE);

		// Child keys are the parent's key bytes with one extra trailing byte: 0 = left, 1 = right.
		byte[] parentKeyBytes = fullestNodeKey.bigIntegerValue().toByteArray();
		int childBitIndex = parentKeyBytes.length;
		byte[] leftChildBytes = new byte[childBitIndex + 1];
		System.arraycopy(parentKeyBytes, 0, leftChildBytes, 0, childBitIndex);
		leftChildBytes[childBitIndex] = 0;
		byte[] rightChildBytes = (byte[]) leftChildBytes.clone();
		rightChildBytes[childBitIndex] = 1;
		splitNode.setLeftLeafNode(new BitID(new BigInteger(leftChildBytes)));
		splitNode.setRightLeafNode(new BitID(new BigInteger(rightChildBytes)));

		// Transfers are only queued here and sent at the very end: sending earlier breaks
		// when a new child leaf lives on the same PIER node as its parent.
		// The queue alternates destination key, message, destination key, message, ...
		LinkedList pendingSends = new LinkedList();

		for (Iterator storedKeys = splitNode.getKeys(); storedKeys.hasNext(); ) {
			BitID storedKey = (BitID) storedKeys.next();
			byte[] storedKeyBits = KeyConverter.getKeyInBinary(
					storedKey.bigIntegerValue().toByteArray(), numBits);

			// The byte at the child's extra position decides which child receives this key.
			BitID destination = storedKeyBits[childBitIndex] > 0
					? splitNode.getRightLeafNode()
					: splitNode.getLeftLeafNode();

			for (Iterator storedValues = splitNode.getValues(storedKey); storedValues.hasNext(); ) {
				PHTLocalStoreContainer entry = (PHTLocalStoreContainer) storedValues.next();

				// Register the message ID on the split node before the transfer is built.
				long messageID = this.counter.nextValue();
				splitNode.addAckToList(messageID);

				PHTMessageData additionalData = PHTMessageData.allocate(new BitID(), 0, 0, entry.getIID());
				PHTDataTransferMessage transfer = PHTDataTransferMessage.allocate(this.name, this.type,
						entry.getRemainingLifetime(), this.selfSocketAddress,
						messageID, additionalData, storedKey, entry.getValue(),
						fullestNodeKey, destination, (short)0);

				pendingSends.add(destination);
				pendingSends.add(transfer);
			}
		}

		DEBUG("CLEARING STORAGE FOR NODE " + KeyConverter.printBitID(fullestNodeKey));
		this.numItems -= splitNode.getNumLocalItems();
		splitNode.clearStorage();

		// Drain the queue pairwise: destination first, then the message addressed to it.
		for (Iterator queued = pendingSends.iterator(); queued.hasNext(); ) {
			BitID destination = (BitID) queued.next();
			PHTDataTransferMessage transfer = (PHTDataTransferMessage) queued.next();
			pierBackend.executorSend(destination, this.indexManager.getIndexNS(), "PHTdata", 0,
					transfer, 0, true, false);
		}
	}
	
	/**
	 * Method clearExpiredValues
	 * Asks every local PHT node to drop the values whose lifetime has been exceeded, then
	 * resets the item count of this index to the sum of what the nodes still hold.
	 */
	protected void clearExpiredValues() {
		int itemsRemaining = 0;

		for (Iterator nodes = PHTNodes.values().iterator(); nodes.hasNext(); ) {
			NodeState node = (NodeState) nodes.next();
			node.clearExpiredValues();
			// Count only after the node has purged, so the total reflects surviving items.
			itemsRemaining += node.getNumLocalItems();
		}

		this.numItems = itemsRemaining;
	}
	
	/**
	 * Method insert
	 * May be invoked from any node. The value is sent towards the node currently
	 * responsible for key and stored in the index until its lifetime expires.
	 * If the value already exists (same key and same iid) it is renewed for
	 * lifetime. Invoking this function may involve multiple nodes and network
	 * traffic.
	 *
	 * Locally this method only builds an IndexInsertMessage and sends it, tagged
	 * "PHTinsert", to the key formed by the first (numBits/2)+1 bytes of the
	 * binary form of key.
	 *
	 * @param key the data value used to index; may be any valid DataType,
	 * although specific indexes may limit types further (this implementation
	 * casts it to BitID)
	 * @param value the actual object to be stored in the index; contents are
	 * opaque to the index
	 * @param iid a uniquifier, used to determine whether two objects with the
	 * same key are actually the same
	 * @param lifetime duration, in milliseconds, to store the value in the
	 * index
	 */
	public void insert(Payload key, Payload value, int iid, long lifetime) {
		final int halfBits = numBits / 2;

		byte[] rawKeyBytes = ((BitID) key).bigIntegerValue().toByteArray();
		byte[] fullBinaryKey = KeyConverter.getKeyInBinary(rawKeyBytes, numBits);

		// The routing key is the leading (numBits/2)+1 bytes of the binary key.
		byte[] routingPrefix = new byte[halfBits + 1];
		System.arraycopy(fullBinaryKey, 0, routingPrefix, 0, routingPrefix.length);

		BitID nextNodeKey = new BitID(new BigInteger(routingPrefix));
		BitID binaryKey = new BitID(new BigInteger(fullBinaryKey));

		DEBUG("INSERT: key=" + KeyConverter.printBitID(new BitID(new BigInteger(rawKeyBytes))) + 
				", binarykey=" + KeyConverter.printBitID(binaryKey));

		PHTMessageData additionalData = PHTMessageData.allocate(binaryKey, halfBits, 2, iid);
		IndexInsertMessage insertMessage = IndexInsertMessage.allocate(
				this.name, this.type, lifetime, this.selfSocketAddress,
				this.counter.nextValue(), additionalData, key, value);

		pierBackend.executorSend(nextNodeKey, this.indexManager.getIndexNS(), "PHTinsert", 0,
				insertMessage, 0, true, false);
	}
	
	/**
	 * Method send
	 * Deliver a copy of sendMessage to every node that *may* store values
	 * encompassed by a set of keys (number of keys and meaning depends on index
	 * type). At each of the nodes, deliver the message to the local DHT
	 * instance with storage ns, rid, iid and lifetime specified. The index
	 * should not store the message except while forwarding/delivering. Invoking
	 * this function may involve multiple nodes and network traffic.
	 *
	 * This implementation reads exactly keys[0] and keys[1], converts both to
	 * their binary form, and routes by their common prefix: one "PHTsend"
	 * message to the common-prefix key when the two keys share at least one
	 * bit, otherwise one message to each of the two top-level subtrees.
	 *
	 * @param keys the set of keys (number of keys and meaning depends on index
	 * type) used to determine which set of nodes (one or more) should receive
	 * a copy of sendMessage; must hold at least two BitID entries
	 * @param sendMessage the actual data object to be delivered
	 * @param ns opaque value representing the DHT storage ns at each of the
	 * responsible nodes
	 * @param rid opaque value representing the DHT rid at each of the
	 * responsible nodes
	 * @param iid opaque value representing the DHT iid at each of the
	 * responsible nodes
	 * @param lifetime opaque value representing the DHT lifetime at each of the
	 * responsible node
	 */
	public void send(Payload[] keys, Payload sendMessage, String ns,
			String rid, int iid, long lifetime) {
		
		byte[] key1 = ((BitID) keys[0]).bigIntegerValue().toByteArray();
		byte[] key2 = ((BitID) keys[1]).bigIntegerValue().toByteArray();
		byte[] key1binary = KeyConverter.getKeyInBinary(key1, numBits);
		byte[] key2binary = KeyConverter.getKeyInBinary(key2, numBits);
		
		// Index 0 is skipped by the comparison (the top-level keys used below are {'B',0}
		// and {'B',1}, so it appears to hold a leading marker byte rather than a key bit).
		// After the loop, commonPrefixEnd is the index of the first differing byte, or the
		// array length when the two binary keys are identical.
		int commonPrefixEnd = 1;
		while (commonPrefixEnd < key1binary.length) {
			if (key1binary[commonPrefixEnd] != key2binary[commonPrefixEnd]) { break; }
			else { commonPrefixEnd++; }
		}
		
		BitID binaryKey1 = new BitID(new BigInteger(key1binary));
		BitID binaryKey2 = new BitID(new BigInteger(key2binary));
		
		DEBUG("SEND:");
		DEBUG("\t" + KeyConverter.printBitID(binaryKey1));
		DEBUG("\t" + KeyConverter.printBitID(binaryKey2));
		
		if (commonPrefixEnd > 1) {
			// Bytes 1..commonPrefixEnd-1 are shared, i.e. commonPrefixEnd-1 bits. Together
			// with byte 0 the routing key therefore spans the first commonPrefixEnd bytes.
			// The previous code copied only commonPrefixEnd-1 bytes, dropping the last
			// shared bit, so the key was one level shallower than the bit count sent with
			// it (and a 1-bit prefix was routed to a key with no bits at all). This now
			// matches insert() and the branch below, where a key of N bits has N+1 bytes.
			byte[] prefix = new byte[commonPrefixEnd];
			System.arraycopy(key1binary, 0, prefix, 0, commonPrefixEnd);
			BitID nextNodeKey = new BitID(new BigInteger(prefix));
			
			PHTMessageData additionalData = PHTMessageData.allocate(binaryKey1, commonPrefixEnd-1, 0, 0);
			
			IndexSendMessage message = IndexSendMessage.allocate(this.name, this.type, 
					lifetime, this.selfSocketAddress, this.counter.nextValue(), additionalData,
					keys, ns, rid, iid, sendMessage);
			
			pierBackend.executorSend(nextNodeKey, this.indexManager.getIndexNS(), "PHTsend", 0,
					message, 0, true, false);
			
		} else {
			// No common prefix, so send to both highest-up subtrees
			BitID firstKey = new BitID(new BigInteger(new byte[]{'B',0}));
			BitID secondKey = new BitID(new BigInteger(new byte[]{'B',1}));
			
			PHTMessageData additionalData1 = PHTMessageData.allocate(binaryKey1, 1, 0, 0);
			PHTMessageData additionalData2 = PHTMessageData.allocate(binaryKey2, 1, 0, 0);
			
			IndexSendMessage message1 = IndexSendMessage.allocate(this.name, this.type, 
					lifetime, this.selfSocketAddress, this.counter.nextValue(), additionalData1,
					keys, ns, rid, iid, sendMessage);
			IndexSendMessage message2 = IndexSendMessage.allocate(this.name, this.type, 
					lifetime, this.selfSocketAddress, this.counter.nextValue(), additionalData2,
					keys, ns, rid, iid, sendMessage);
			
			pierBackend.executorSend(firstKey, this.indexManager.getIndexNS(), "PHTsend", 0,
					message1, 0, true, false);
			pierBackend.executorSend(secondKey, this.indexManager.getIndexNS(), "PHTsend", 0,
					message2, 0, true, false);
		}
	}
	
	/**
	 * Method retrieve
	 * Locally retrieve data that matches a set of keys. No network traffic is
	 * generated as a direct result of invoking this method.
	 *
	 * Expired values are purged first; the request is then handed to every
	 * local PHT node in turn.
	 *
	 * @param keys the set of keys (number of keys and meaning depends on index
	 * type) to retrieve
	 * @param client a callback client to receive data once ready
	 */
	public void retrieve(Payload[] keys, IndexClient client) {
		// Purge first so that no node is asked about values that have already expired.
		clearExpiredValues();

		for (Iterator nodes = PHTNodes.values().iterator(); nodes.hasNext(); ) {
			NodeState node = (NodeState) nodes.next();
			node.retrieve(keys, client);
		}
	}
	
	/**
	 * Method printTree
	 *
	 * For debugging purposes only: sends a "PHTprint" message to each of the tree's two
	 * root nodes, from where it is meant to propagate down, printing tree contents and
	 * node status along the way.
	 */
	public void printTree() {
		// Keys of the two top-level subtrees.
		byte[][] rootKeys = { {'B', 0}, {'B', 1} };

		// Build both messages before sending either one.
		PHTPrintTreeMessage[] rootMessages = new PHTPrintTreeMessage[rootKeys.length];
		for (int i = 0; i < rootKeys.length; i++) {
			rootMessages[i] = PHTPrintTreeMessage.allocate(this.name, this.type,
					0, this.selfSocketAddress, this.counter.nextValue(), 1, rootKeys[i]);
		}

		for (int i = 0; i < rootKeys.length; i++) {
			BitID rootKey = new BitID(new BigInteger(rootKeys[i]));
			pierBackend.executorSend(rootKey, this.indexManager.getIndexNS(),
					"PHTprint", 0, rootMessages[i], 0, true, false);
		}
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -