⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 prefixhashtree.java

📁 High performance DB query
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/*
 * @(#)$Id: PrefixHashTree.java,v 1.4 2005/12/30 23:22:22 burkhart Exp $
 *
 * Copyright (c) 2001-2004 Regents of the University of California.
 * All rights reserved.
 *
 * This file is distributed under the terms in the attached BERKELEY-LICENSE
 * file. If you do not find these files, copies can be found by writing to:
 * Computer Science Division, Database Group, University of California,
 * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License
 *
 * Copyright (c) 2003-2004 Intel Corporation. All rights reserved.
 *
 * This file is distributed under the terms in the attached INTEL-LICENSE file.
 * If you do not find these files, copies can be found by writing to:
 * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300,
 * Berkeley, CA, 94704.  Attention:  Intel License Inquiry.
 */


package pier.indexes.prefixhashtree;

import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.TreeMap;

import org.apache.log4j.Logger;
import pier.components.PierBackend;
import pier.components.PierIndexManager;
import pier.indexes.Index;
import pier.indexes.IndexClient;
import pier.messages.IndexInsertMessage;
import pier.messages.IndexMessage;
import pier.messages.IndexSendMessage;
import services.Output;
import services.network.Payload;
import util.BitID;
import util.logging.LogMessage;
import util.logging.StructuredLogMessage;
import util.network.serialization.SerializationManager;
import util.timer.TimeBasedCounter;

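/**
 * Class PrefixHashTree
 * Index implementation for the prefix hash tree (PHT). It keeps one
 * NodeState per PHT node key it has seen and reacts to the index and PHT
 * messages handed to it through processMessage.
 */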
public class PrefixHashTree implements Index {
	
	// Log4j logger of this class; PRINT writes to it at debug level.
	private static Logger logger = Logger.getLogger(PrefixHashTree.class);
	// State codes handed to NodeState (constructor and setState).
	// NOT_NODE: placeholder state used when data arrives for a key that has no NodeState yet.
	protected static final int NOT_NODE = 0;
	// INTERIOR_NODE: set once both new leaves have reported ready and no items remain locally.
	protected static final int INTERIOR_NODE = 1;
	// LEAF_NODE: set on a NewPHTMessage and when a data transfer is reported done.
	protected static final int LEAF_NODE = 2;
	// SPLITTING_LEAF_NODE: presumably a leaf whose split is in progress -- not used in the part of the file reviewed, confirm.
	protected static final int SPLITTING_LEAF_NODE = 3;
	// Index name and type as passed to init; both are forwarded to message allocate(...) calls.
	protected String name;
	protected String type;
	// Backend used to read settings, obtain the provider's identity and send messages.
	protected PierBackend pierBackend;
	// Index manager; supplies the namespace (getIndexNS) used for outgoing messages.
	protected PierIndexManager indexManager;
	// nextValue() is passed into every message allocated here; presumably a message id -- confirm.
	protected TimeBasedCounter counter;
	// Built in init as new BitID(name, 0, numBits).
	protected BitID rootID;
	// Node ID and socket address reported by the backend's provider.
	protected BitID selfID;
	protected InetSocketAddress selfSocketAddress;
	// Parsed from the "idBits" setting.
	protected int numBits;
	// Parsed from the "pierPHTNodeCapacity" setting; presumably the item limit of a leaf before it splits -- confirm.
	protected int nodeCapacity;
	// Incremented each time processDataMessage stores an item.
	protected int numItems;
	// Raw map from a PHT node key to its NodeState.
	protected HashMap PHTNodes;
	
	// True to output PHT debugging, regardless of system settings
	private static final boolean PHT_OUTPUT_ONLY = false;
	
	/**
	 * Method DEBUG
	 * Passes the text on to PRINT when either PHT_OUTPUT_ONLY is true or
	 * Output.debuggingEnabled is set; otherwise the call has no effect.
	 * 
	 * @param s text to output
	 */
	public static void DEBUG(String s) {
		// Neither switch is on: stay silent.
		if (!PHT_OUTPUT_ONLY && !Output.debuggingEnabled) {
			return;
		}
		PRINT(s);
	}
	
	/**
	 * Method PRINT
	 * Single place through which this class writes to the log: the text is
	 * wrapped, together with the "[PHT] " tag, in a LogMessage and logged at
	 * debug level. Changing the output method only requires touching this
	 * method.
	 * 
	 * @param s text to log
	 */
	public static void PRINT(String s) {
		Object[] taggedParts = new Object[] { "[PHT] ", s };
		LogMessage entry = new LogMessage(taggedParts);
		logger.debug(entry);
	}
	
	/**
	 * Method init
	 * Stores the collaborators, reads the "idBits" and "pierPHTNodeCapacity"
	 * settings from the backend, derives the root ID, records this node's ID
	 * and socket address, and starts with an empty node table and an item
	 * count of zero.
	 *
	 * Integer.parseInt throws NumberFormatException when one of the two
	 * settings is missing or not a decimal integer.
	 *
	 * @param name index name; also used to build the root ID
	 * @param type index type
	 * @param pierBackend backend queried for settings and provider identity
	 * @param indexManager index manager kept for later use
	 * @param parameters not read by this method
	 */
	public void init(String name, String type, PierBackend pierBackend,
			PierIndexManager indexManager, Payload[] parameters) {
		
		// Collaborators handed in by the caller.
		this.name = name;
		this.type = type;
		this.pierBackend = pierBackend;
		this.indexManager = indexManager;
		this.counter = new TimeBasedCounter();
		
		// The root ID depends on the configured ID width.
		String idBitsSetting = pierBackend.executorGetSetting("idBits");
		this.numBits = Integer.parseInt(idBitsSetting);
		this.rootID = new BitID(name, 0, this.numBits);
		
		// Identity of the local node as reported by the provider.
		this.selfID = pierBackend.getProvider().getNodeID();
		this.selfSocketAddress = pierBackend.getProvider().getNodeSocketAddress();
		
		String capacitySetting = pierBackend.executorGetSetting("pierPHTNodeCapacity");
		this.nodeCapacity = Integer.parseInt(capacitySetting);
		
		// Nothing stored yet.
		this.numItems = 0;
		this.PHTNodes = new HashMap();
		
		DEBUG("MADE NODE " + KeyConverter.printBitID(this.selfID));
	}
	
	/**
	 * Method processPut
	 * Not supported by a PHT: every call throws a PHTException and none of
	 * the arguments is looked at.
	 * 
	 * @param message ignored
	 * @param ns ignored
	 * @param rid ignored
	 */
	public void processPut(IndexMessage message, String ns, String rid) {
		final String reason = "The method processPut should never be called on a PHT!";
		throw new PHTException(reason);
	}
	
	/**
	 * Method processMessage
	 * Called by the PierIndexManager to deliver a network message that arrived
	 * destined for this index. Any messages sent to the Pier Index namespace
	 * of type IndexMessage (or derivate) are delivered by this method.
	 *
	 * The message is handed to the first handler whose message class matches,
	 * tested in this order: NewPHTMessage, PHTDataTransferMessage,
	 * PHTMessageAck, IndexSendMessage, IndexInsertMessage,
	 * PHTPrintTreeMessage, SplitFurtherMessage. Any other message (including
	 * null) results in a PHTException.
	 *
	 * @param message message of IndexMessage (or derivate) type from another
	 * node
	 * @param ns same as the Pier Index namespace
	 * @param rid value specified by sending node, opaque to the
	 * PierIndexManager
	 * @param midway when true the message is not dispatched at all
	 * @param local not read by this method
	 * @return false when midway is set, true after a handler has run
	 */
	public boolean processMessage(IndexMessage message, String ns, String rid, boolean midway, boolean local) {
		
		// Nothing is dispatched while the midway flag is set.
		if (midway) {
			return false;
		}
		
		// The order of the type tests below is kept exactly as it was.
		if (message instanceof NewPHTMessage) {
			processNewMessage((NewPHTMessage) message);
			return true;
		}
		if (message instanceof PHTDataTransferMessage) {
			processDataMessage((PHTDataTransferMessage) message);
			return true;
		}
		if (message instanceof PHTMessageAck) {
			processAck((PHTMessageAck) message);
			return true;
		}
		if (message instanceof IndexSendMessage) {
			processSendMessage((IndexSendMessage) message, ns, rid);
			return true;
		}
		if (message instanceof IndexInsertMessage) {
			processInsertMessage((IndexInsertMessage) message, ns, rid);
			return true;
		}
		if (message instanceof PHTPrintTreeMessage) {
			processPrintMessage((PHTPrintTreeMessage) message);
			return true;
		}
		if (message instanceof SplitFurtherMessage) {
			processSplitMessage((SplitFurtherMessage) message);
			return true;
		}
		
		throw new PHTException("Received unexpected message!");
	}
	
	/**
	 * Method processNewMessage
	 * Called by processMessage when it receives a message of type NewPHTMessage.
	 * Makes sure the node table holds a NodeState in LEAF_NODE state under the
	 * key carried as the message's additional data: an existing entry is
	 * switched to LEAF_NODE, otherwise a fresh, empty leaf entry is created.
	 * 
	 * @param message message whose additional data is the node key
	 */
	protected void processNewMessage(NewPHTMessage message) {
		Object nodeKey = message.getAdditionalData();
		NodeState leaf = (NodeState) PHTNodes.get(nodeKey);
		if (leaf != null) {
			leaf.setState(LEAF_NODE);
		} else {
			leaf = new NodeState(LEAF_NODE, new TreeMap(), new HashMap(), 0, 0, null, null);
		}
		// Stored in either case, as before.
		PHTNodes.put(nodeKey, leaf);
	}
	
	/**
	 * Method processDataMessage
	 * Called by processMessage when it receives a message of type PHTDataTransferMessage.
	 * 
	 * The NodeState registered under the key the message was sent to is looked
	 * up; if there is none, a placeholder in NOT_NODE state is registered
	 * first. A message flagged as transfer-done is acknowledged and switches
	 * that node to LEAF_NODE. Any other message carries one item, which is
	 * stored in the node, counted in numItems and then acknowledged.
	 * 
	 * @param message data-transfer message addressed to one PHT node key
	 */
	protected void processDataMessage(PHTDataTransferMessage message) {
		
		Object destinationKey = message.getKeySentTo();
		NodeState target = (NodeState) PHTNodes.get(destinationKey);
		if (target == null) {
			// First contact for this key: register a placeholder node.
			target = new NodeState(NOT_NODE, new TreeMap(), new HashMap(), 0, 0, null, null);
			PHTNodes.put(destinationKey, target);
		}
		
		// End-of-transfer marker: acknowledge and become a leaf.
		if (message.transferDone()) {
			sendAck(message);
			target.setState(LEAF_NODE);
			return;
		}
		
		// Ordinary data item: store it, count it, acknowledge it.
		PHTMessageData itemData = (PHTMessageData) message.getAdditionalData();
		DEBUG("Storing <" + message.getKey() + "." + itemData.getIID() + ","
				+ message.getValue() + "> at node "
				+ KeyConverter.printBitID((BitID) destinationKey)
				+ "(" + KeyConverter.printBitID(selfID) + ")");
		target.storeLocally(message.getKey(), message.getValue(),
				itemData.getIID(), message.getLifetime());
		this.numItems++;
		sendAck(message);
	}
	
	/**
	 * Method processAck
	 * Called by processMessage when it receives a message of type PHTMessageAck.
	 * 
	 * The ack's additional data is the key of the node it refers to. Two
	 * kinds of ack are handled:
	 * 
	 * - An ack for a single transferred item is recorded on the node. Once
	 *   the node reports that all acks have been received, a transfer-done
	 *   PHTDataTransferMessage is sent to the left and to the right leaf node.
	 * - An ack flagged as transfer-done counts one more new leaf as ready.
	 *   When two leaves are ready, the node becomes INTERIOR_NODE if it holds
	 *   no local items, the ready counter is reset, and one
	 *   SplitFurtherMessage is sent to both leaf nodes.
	 * 
	 * There is no check that a NodeState exists for the key: if the lookup
	 * returns null, the first call on it throws a NullPointerException.
	 * 
	 * @param message the acknowledgement to process
	 */
	protected void processAck(PHTMessageAck message) {
		
		BitID nodeKey = (BitID) message.getAdditionalData();
		NodeState node = (NodeState) PHTNodes.get(nodeKey);
		
		if (!message.transferDone()) {
			// Ack for one transferred item.
			node.processAck(message.getAckedMessageId());
			if (!node.allAcksReceived()) {
				return;
			}
			
			// Every item is acknowledged; the trailing (short)1 presumably marks
			// these messages as transfer-done -- confirm against allocate().
			PHTDataTransferMessage doneForLeft = PHTDataTransferMessage.allocate(this.name,
					this.type, 0, this.selfSocketAddress, this.counter.nextValue(), new BitID(),
					new BitID(), new BitID(), nodeKey, node.getLeftLeafNode(), (short)1);
			PHTDataTransferMessage doneForRight = PHTDataTransferMessage.allocate(this.name,
					this.type, 0, this.selfSocketAddress, this.counter.nextValue(), new BitID(),
					new BitID(), new BitID(), nodeKey, node.getRightLeafNode(), (short)1);
			
			pierBackend.executorSend(node.getLeftLeafNode(), this.indexManager.getIndexNS(), "PHTdata", 0,
					doneForLeft, 0, true, false);
			pierBackend.executorSend(node.getRightLeafNode(), this.indexManager.getIndexNS(), "PHTdata", 0,
					doneForRight, 0, true, false);
			return;
		}
		
		// Ack for a transfer-done message: one more new leaf is ready.
		node.anotherNewLeafReady();
		if (node.getNewLeavesReady() != 2) {
			return;
		}
		
		DEBUG("Node " + KeyConverter.printBitID(selfID) + " successfully split!");
		if (node.getNumLocalItems() == 0) {
			DEBUG("Setting node " + KeyConverter.printBitID(nodeKey) + " to INTERIOR");
			node.setState(INTERIOR_NODE);
		} else {
			DEBUG("Can't set node " + KeyConverter.printBitID(nodeKey) + " to INTERIOR! (" +
					node.getNumLocalItems() + " items here)");
		}
		node.resetNewLeavesReady();
		
		// The same message object goes to both leaves.
		SplitFurtherMessage splitFurther = SplitFurtherMessage.allocate(this.name,
				this.type, this.selfSocketAddress, this.counter.nextValue());
		pierBackend.executorSend(node.getLeftLeafNode(), this.indexManager.getIndexNS(), "PHTsplit", 0,
				splitFurther, 0, true, false);
		pierBackend.executorSend(node.getRightLeafNode(), this.indexManager.getIndexNS(), "PHTsplit", 0,
				splitFurther, 0, true, false);
	}
	
	/**
	 * Method processSendMessage
	 * Called by processMessage when it receives a message of type IndexSendMessage
	 * 
	 * @param message message of IndexSendMessage type from another node
	 * @param ns same as the Pier Index namespace
	 * @param rid value specified by sending node, opaque to the PierIndexManager
	 */
	protected void processSendMessage(IndexSendMessage message, String ns, String rid) {
		
		PHTMessageData messageData = (PHTMessageData) message.getAdditionalData();
		byte[] keyInBinary = messageData.getBinaryKey().bigIntegerValue().toByteArray();
		byte[] newKey = new byte[messageData.getPlaceInKey()+1];
		System.arraycopy(keyInBinary, 0, newKey, 0, newKey.length);
		BitID theNodeKey = new BitID(new BigInteger(newKey));
		NodeState theNode = (NodeState) PHTNodes.get(theNodeKey);
		

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -