📄 prefixhashtree.java
字号:
/*
* @(#)$Id: PrefixHashTree.java,v 1.4 2005/12/30 23:22:22 burkhart Exp $
*
* Copyright (c) 2001-2004 Regents of the University of California.
* All rights reserved.
*
* This file is distributed under the terms in the attached BERKELEY-LICENSE
* file. If you do not find these files, copies can be found by writing to:
* Computer Science Division, Database Group, Universite of California,
* 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License
*
* Copyright (c) 2003-2004 Intel Corporation. All rights reserved.
*
* This file is distributed under the terms in the attached INTEL-LICENSE file.
* If you do not find these files, copies can be found by writing to:
* Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300,
* Berkeley, CA, 94704. Attention: Intel License Inquiry.
*/
package pier.indexes.prefixhashtree;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.TreeMap;
import org.apache.log4j.Logger;
import pier.components.PierBackend;
import pier.components.PierIndexManager;
import pier.indexes.Index;
import pier.indexes.IndexClient;
import pier.messages.IndexInsertMessage;
import pier.messages.IndexMessage;
import pier.messages.IndexSendMessage;
import services.Output;
import services.network.Payload;
import util.BitID;
import util.logging.LogMessage;
import util.logging.StructuredLogMessage;
import util.network.serialization.SerializationManager;
import util.timer.TimeBasedCounter;
/**
* Class PrefixHashTree
*
*/
public class PrefixHashTree implements Index {
private static Logger logger = Logger.getLogger(PrefixHashTree.class);
protected static final int NOT_NODE = 0;
protected static final int INTERIOR_NODE = 1;
protected static final int LEAF_NODE = 2;
protected static final int SPLITTING_LEAF_NODE = 3;
protected String name;
protected String type;
protected PierBackend pierBackend;
protected PierIndexManager indexManager;
protected TimeBasedCounter counter;
protected BitID rootID;
protected BitID selfID;
protected InetSocketAddress selfSocketAddress;
protected int numBits;
protected int nodeCapacity;
protected int numItems;
protected HashMap PHTNodes;
// True to output PHT debugging, regardless of system settings
private static final boolean PHT_OUTPUT_ONLY = false;
/**
* Method DEBUG
* Prints if debugging is turned on.
*
* @param s
*/
public static void DEBUG(String s) {
if (PHT_OUTPUT_ONLY == true || Output.debuggingEnabled) { PRINT(s); }
}
/**
* Method PRINT
* Makes it easier to write statements to be output to the log.
* Also makes it easier to change the output method (only in one place).
*
* @param s
*/
public static void PRINT(String s) {
logger.debug(new LogMessage(new Object[]{"[PHT] ", s}));
}
/**
* Method init
*
* @param name
* @param type
* @param pierBackend
* @param indexManager
* @param parameters
*/
public void init(String name, String type, PierBackend pierBackend,
PierIndexManager indexManager, Payload[] parameters) {
this.name = name;
this.type = type;
this.pierBackend = pierBackend;
this.indexManager = indexManager;
this.counter = new TimeBasedCounter();
this.numBits = Integer.parseInt(
pierBackend.executorGetSetting(
"idBits"));
this.rootID = new BitID(name, 0, this.numBits);
this.selfID = pierBackend.getProvider().getNodeID();
this.selfSocketAddress =
pierBackend.getProvider().getNodeSocketAddress();
this.nodeCapacity = Integer.parseInt(
pierBackend.executorGetSetting(
"pierPHTNodeCapacity"));
this.numItems = 0;
this.PHTNodes = new HashMap();
DEBUG("MADE NODE " + KeyConverter.printBitID(selfID));
}
/**
* Method processPut
* Should never be called on a PHT
*
* @param message
* @param ns
* @param rid
*/
public void processPut(IndexMessage message, String ns, String rid) {
throw new PHTException("The method processPut should never be called on a PHT!");
}
/**
* Method processMessage
* Called by the PierIndexManager to deliver a network message that arrived
* destined for this index. Any messages sent to the Pier Index namespace
* of type IndexMessage (or derivate) are delivered by this method
*
* @param message message of IndexMessage (or derivate) type from another
* node
* @param ns same as the Pier Index namespace
* @param rid value specified by sending node, opaque to the
* PierIndexManager
*/
public boolean processMessage(IndexMessage message, String ns, String rid, boolean midway, boolean local) {
if (midway) { return false; }
if (message instanceof NewPHTMessage) {
processNewMessage((NewPHTMessage) message);
} else if (message instanceof PHTDataTransferMessage) {
processDataMessage((PHTDataTransferMessage) message);
} else if (message instanceof PHTMessageAck) {
processAck((PHTMessageAck) message);
} else if (message instanceof IndexSendMessage) {
processSendMessage(((IndexSendMessage) message), ns, rid);
} else if (message instanceof IndexInsertMessage) {
processInsertMessage(((IndexInsertMessage) message), ns, rid);
} else if (message instanceof PHTPrintTreeMessage) {
processPrintMessage((PHTPrintTreeMessage) message);
} else if (message instanceof SplitFurtherMessage) {
processSplitMessage((SplitFurtherMessage) message);
} else {
throw new PHTException("Received unexpected message!");
}
return true;
}
/**
* Method processNewMessage
* @param message
*/
protected void processNewMessage(NewPHTMessage message) {
NodeState newNode = (NodeState) PHTNodes.get(message.getAdditionalData());
if (newNode == null) {
newNode = new NodeState(LEAF_NODE, new TreeMap(), new HashMap(), 0, 0, null, null);
} else {
newNode.setState(LEAF_NODE);
}
PHTNodes.put(message.getAdditionalData(), newNode);
}
/**
* Method processDataMessage
* Called by processMessage when it receives a message of type PHTDataTransferMessage
*
* @param message
*/
protected void processDataMessage(PHTDataTransferMessage message) {
NodeState theNode = (NodeState) PHTNodes.get(message.getKeySentTo());
if (theNode == null) {
theNode = new NodeState(NOT_NODE, new TreeMap(), new HashMap(), 0, 0, null, null);
PHTNodes.put(message.getKeySentTo(), theNode);
}
if (((PHTDataTransferMessage) message).transferDone()) {
sendAck(message);
theNode.setState(LEAF_NODE);
} else {
DEBUG("Storing <" + message.getKey() + "." +
((PHTMessageData) message.getAdditionalData()).getIID() + "," +
message.getValue() + "> at node " + KeyConverter.printBitID((BitID)message.getKeySentTo()) +
"(" + KeyConverter.printBitID((BitID)selfID) + ")");
theNode.storeLocally(message.getKey(), message.getValue(),
((PHTMessageData) message.getAdditionalData()).getIID(), message.getLifetime());
this.numItems++;
sendAck(message);
}
}
/**
* Method processAck
* Called by processMessage when it receives a message of type PHTMessageAck
*
* @param message
*/
protected void processAck(PHTMessageAck message) {
BitID theKey = (BitID) message.getAdditionalData();
NodeState theNode = (NodeState) PHTNodes.get(theKey);
if (((PHTMessageAck) message).transferDone()) {
theNode.anotherNewLeafReady();
if (theNode.getNewLeavesReady() == 2) {
DEBUG("Node " + KeyConverter.printBitID(selfID) + " successfully split!");
if (theNode.getNumLocalItems() == 0) {
DEBUG("Setting node " + KeyConverter.printBitID(theKey) + " to INTERIOR");
theNode.setState(INTERIOR_NODE);
} else {
DEBUG("Can't set node " + KeyConverter.printBitID(theKey) + " to INTERIOR! (" +
theNode.getNumLocalItems() + " items here)");
}
theNode.resetNewLeavesReady();
SplitFurtherMessage splitMessage = SplitFurtherMessage.allocate(this.name,
this.type, this.selfSocketAddress, this.counter.nextValue());
pierBackend.executorSend(theNode.getLeftLeafNode(), this.indexManager.getIndexNS(), "PHTsplit", 0,
splitMessage, 0, true, false);
pierBackend.executorSend(theNode.getRightLeafNode(), this.indexManager.getIndexNS(), "PHTsplit", 0,
splitMessage, 0, true, false);
}
} else {
theNode.processAck(message.getAckedMessageId());
if (theNode.allAcksReceived()) {
PHTDataTransferMessage transferDoneLeft = PHTDataTransferMessage.allocate(this.name,
this.type, 0, this.selfSocketAddress, this.counter.nextValue(), new BitID(),
new BitID(), new BitID(), theKey, theNode.getLeftLeafNode(), (short)1);
PHTDataTransferMessage transferDoneRight = PHTDataTransferMessage.allocate(this.name,
this.type, 0, this.selfSocketAddress, this.counter.nextValue(), new BitID(),
new BitID(), new BitID(), theKey, theNode.getRightLeafNode(), (short)1);
pierBackend.executorSend(theNode.getLeftLeafNode(), this.indexManager.getIndexNS(), "PHTdata", 0,
transferDoneLeft, 0, true, false);
pierBackend.executorSend(theNode.getRightLeafNode(), this.indexManager.getIndexNS(), "PHTdata", 0,
transferDoneRight, 0, true, false);
}
}
}
/**
* Method processSendMessage
* Called by processMessage when it receives a message of type IndexSendMessage
*
* @param message message of IndexSendMessage type from another node
* @param ns same as the Pier Index namespace
* @param rid value specified by sending node, opaque to the PierIndexManager
*/
protected void processSendMessage(IndexSendMessage message, String ns, String rid) {
PHTMessageData messageData = (PHTMessageData) message.getAdditionalData();
byte[] keyInBinary = messageData.getBinaryKey().bigIntegerValue().toByteArray();
byte[] newKey = new byte[messageData.getPlaceInKey()+1];
System.arraycopy(keyInBinary, 0, newKey, 0, newKey.length);
BitID theNodeKey = new BitID(new BigInteger(newKey));
NodeState theNode = (NodeState) PHTNodes.get(theNodeKey);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -