📄 chordimpl.java
字号:
// create NodeImpl instance for communication
this.localNode = new NodeImpl(this, this.getID(), this.localURL,
this.references, this.entries);
// create proxy for outgoing connection to bootstrap node
Node bootstrapNode;
try {
bootstrapNode = Proxy.createConnection(this.localURL, bootstrapURL);
} catch (CommunicationException e) {
throw new ServiceException(
"An error occured when creating a proxy for outgoing "
+ "connection to bootstrap node! Join operation"
+ "failed!", e);
}
// only an optimization: store reference on bootstrap node
this.references.addReference(bootstrapNode);
// Asking for my successor at node bootstrapNode.nodeID
// find my successor
Node mySuccessor;
try {
mySuccessor = bootstrapNode.findSuccessor(this.getID());
} catch (CommunicationException e1) {
throw new ServiceException("An error occured when trying to find "
+ "the successor of this node using bootstrap node "
+ "with url " + bootstrapURL.toString() + "! Join "
+ "operation failed!", e1);
}
// store reference on my successor
this.logger.info(this.localURL + " has successor " + mySuccessor.nodeURL);
this.references.addReference(mySuccessor);
// notify successor for the first time and copy keys from successor
if (mySuccessor != null) {
RefsAndEntries copyOfRefsAndEntries;
try {
copyOfRefsAndEntries = mySuccessor
.notifyAndCopyEntries(this.localNode);
} catch (CommunicationException e2) {
throw new ServiceException("An error occured when contacting "
+ "the successor of this node in order to "
+ "obtain its references and entries! Join "
+ "operation failed!", e2);
}
// add new references, if pings are successful
Set<Node> pingRequestSent = new HashSet<Node>();
for (final Node newReference : copyOfRefsAndEntries.getRefs()) {
if (newReference != null
&& !newReference.equals(this.localNode)
&& !this.references.containsReference(newReference)) {
if (!pingRequestSent.contains(newReference)) {
this.logger
.debug("Sending ping request to new reference "
+ newReference.nodeID.toHexString());
this.asyncExecutor.execute(new Runnable() {
public void run() {
try {
newReference.ping();
ChordImpl.this.references
.addReference(newReference);
ChordImpl.this.logger
.info("Added reference on "
+ newReference.nodeID
+ " which responded to "
+ "ping request");
} catch (CommunicationException e) {
ChordImpl.this.logger.info("New reference "
+ newReference + " unreachable");
}
}
});
pingRequestSent.add(newReference);
}
}
}
// add copied entries of successor
this.entries.addAll(copyOfRefsAndEntries.getEntries());
}
// accept content requests from outside
this.localNode.acceptEntries();
// create tasks for fixing finger table, checking predecessor and
// stabilizing
this.createTasks();
}
/**
 * Disconnects this node from the network.
 * <p>
 * Does nothing when {@code localNode} is {@code null} (no ring has been
 * created or joined). Otherwise the maintenance tasks are stopped, the
 * successor is told about this node's predecessor on a best-effort basis,
 * the local node is disconnected, the asynchronous executor is shut down and
 * {@code localNode} is reset to {@code null}.
 */
public final void leave() {
    if (this.localNode == null) {
        // ring has not been created or joined, so leave has no effect
        return;
    }
    this.maintenanceTasks.shutdownNow();
    // inform successor about leaving
    try {
        Node successor = this.references.getSuccessor();
        if (successor != null) {
            // Read the predecessor exactly once so that the value that was
            // null-checked is the value handed to the successor.
            Node predecessor = this.references.getPredecessor();
            if (predecessor != null) {
                successor.leavesNetwork(predecessor);
            }
        }
    } catch (CommunicationException e) {
        // Best effort only: leaving must still complete locally, but the
        // failure is recorded instead of being dropped silently.
        this.logger.warn("Could not inform the successor of this node about "
                + "leaving the network! Continuing with local shutdown.", e);
    }
    this.localNode.disconnect();
    this.asyncExecutor.shutdownNow();
    this.localNode = null;
}
/**
 * Stores the given value under the given key on the node responsible for
 * the key's ID.
 * <p>
 * The responsible node is looked up with {@code findSuccessor} and
 * {@code insertEntry} is invoked on it. When that invocation fails with a
 * {@link CommunicationException} a warning is logged and lookup and
 * invocation are repeated; the method only returns once an invocation has
 * succeeded.
 *
 * @param key
 *            Key to store the value under.
 * @param s
 *            Value to store.
 * @throws NullPointerException
 *             If either argument is <code>null</code>.
 */
public final void insert(Key key, Serializable s) {
    // reject missing arguments before any work is done
    if (key == null || s == null) {
        throw new NullPointerException(
                "Neither parameter may have value null!");
    }

    // map the key onto the identifier space and wrap the value
    ID id = this.hashFunction.getHashKey(key);
    Entry entryToInsert = new Entry(id, s);
    this.logger.debug("Inserting new entry with id " + id);

    // repeat lookup + remote call until the remote call goes through
    for (;;) {
        Node target = this.findSuccessor(id);
        this.logger.debug("Invoking insertEntry method on node "
                + target.nodeID);
        try {
            target.insertEntry(entryToInsert);
            break;
        } catch (CommunicationException e1) {
            this.logger.warn(
                    "An error occured while invoking the insertEntry method "
                            + " on the appropriate node! Insert operation "
                            + "failed!", e1);
        }
    }
    this.logger.info("New entry was inserted!");
}
/**
 * Retrieves all values stored under the given key.
 * <p>
 * The node responsible for the key's ID is looked up with
 * {@code findSuccessor} and asked for its entries. When that request fails
 * with a {@link CommunicationException} a warning is logged and lookup and
 * request are repeated until a request succeeds.
 *
 * @param key
 *            Key to retrieve values for.
 * @return Set of the values of all retrieved entries; empty (never
 *         <code>null</code>) when no entries were returned.
 * @throws NullPointerException
 *             If the key is <code>null</code>.
 */
public final Set<Serializable> retrieve(Key key) {
    // check parameters
    if (key == null) {
        NullPointerException e = new NullPointerException(
                "Key must not have value null!");
        this.logger.error("Null pointer", e);
        throw e;
    }
    // determine ID for key
    ID id = this.hashFunction.getHashKey(key);
    this.logger.debug("Retrieving entries with id " + id);
    Set<Entry> result = null;
    boolean retrieved = false;
    while (!retrieved) {
        // find successor of id
        Node responsibleNode = findSuccessor(id);
        // invoke retrieveEntries method
        try {
            result = responsibleNode.retrieveEntries(id);
            // cause while loop to end.
            retrieved = true;
        } catch (CommunicationException e1) {
            this.logger
                    .warn(
                            "An error occured while invoking the retrieveEntry method "
                                    + " on the appropriate node! Retrieve operation "
                                    + "failed!", e1);
        }
    }
    Set<Serializable> values = new HashSet<Serializable>();
    // Defensive: the reply comes from another node's implementation, so a
    // null result is treated like "no entries" instead of causing an NPE.
    if (result != null) {
        for (Entry entry : result) {
            values.add(entry.getValue());
        }
    }
    this.logger.info("Entries were retrieved!");
    return values;
}
/**
 * Removes the given value stored under the given key from the node
 * responsible for the key's ID.
 * <p>
 * The responsible node is looked up with {@code findSuccessor} and
 * {@code removeEntry} is invoked on it. When that invocation fails with a
 * {@link CommunicationException} a warning is logged and lookup and
 * invocation are repeated; the method only returns once an invocation has
 * succeeded.
 *
 * @param key
 *            Key the value is stored under.
 * @param s
 *            Value to remove.
 * @throws NullPointerException
 *             If either argument is <code>null</code>.
 */
public final void remove(Key key, Serializable s) {
    // reject missing arguments before any work is done
    if (key == null || s == null) {
        throw new NullPointerException(
                "Neither parameter may have value null!");
    }

    // map the key onto the identifier space and wrap the value
    ID id = this.hashFunction.getHashKey(key);
    Entry entryToRemove = new Entry(id, s);

    // repeat lookup + remote call until the remote call goes through
    for (;;) {
        this.logger.debug("Removing entry with id " + id + " and value "
                + s);
        Node target = findSuccessor(id);
        this.logger.debug("Invoking removeEntry method on node "
                + target.nodeID);
        try {
            target.removeEntry(entryToRemove);
            break;
        } catch (CommunicationException e1) {
            this.logger.warn(
                    "An error occured while invoking the removeEntry method "
                            + " on the appropriate node! Remove operation "
                            + "failed!", e1);
        }
    }
    this.logger.info("Entry was removed!");
}
/**
 * Returns a human-readable string representation containing this node's
 * node ID and URL, terminated by a line break. A missing ID or URL is
 * rendered as the text <code>null</code>.
 *
 * @see java.lang.Object#toString()
 */
@Override
public final String toString() {
    // The line break is appended outside the conditional expression so that
    // it is present regardless of whether the URL is set.
    return "Chord node: id = "
            + (this.localID == null ? "null" : this.localID.toString())
            + ", url = "
            + (this.localURL == null ? "null" : this.localURL.toString())
            + "\n";
}
/**
 * Looks up the Chord node that is responsible for the given key.
 * <p>
 * Three cases are distinguished on every attempt: no successor is known
 * (the local node is returned), the key lies between this node and its
 * successor or equals the successor's ID (the successor is returned once
 * it answered a ping), or the lookup is forwarded to the closest preceding
 * node found in the local references. A node that fails to answer is
 * removed from the references and the lookup is attempted again.
 *
 * @param key
 *            Key for which the successor is searched for.
 * @throws NullPointerException
 *             If given ID is <code>null</code>.
 * @return Responsible node.
 */
final Node findSuccessor(ID key) {
    if (key == null) {
        NullPointerException e = new NullPointerException(
                "ID to find successor for may not be null!");
        this.logger.error("Null pointer.", e);
        throw e;
    }

    // every pass is one lookup attempt; a pass that hits an unreachable
    // node drops that node and falls through to the next pass
    while (true) {
        Node next = this.references.getSuccessor();

        // no successor known: the local node is the only one in the network
        if (next == null) {
            this.logger
                    .info("I appear to be the only node in the network, so I am "
                            + "my own "
                            + "successor; return reference on me: "
                            + this.getID());
            return this.localNode;
        }

        // key lies between this node and its successor
        boolean successorIsResponsible = key.isInInterval(this.getID(),
                next.nodeID)
                || key.equals(next.nodeID);
        if (successorIsResponsible) {
            this.logger.info("The requested key lies between my own and my "
                    + "successor's node id; therefore return my successor.");
            try {
                // if ping returns, the successor is alive
                next.ping();
                this.logger.info("Returning my successor " + next.nodeID
                        + " of type " + next.getClass());
                return next;
            } catch (CommunicationException e) {
                // successor is unreachable: forget it and try again
                this.logger
                        .warn("Successor did not respond! Removing it from all "
                                + "lists and retrying...");
                this.references.removeReference(next);
                continue;
            }
        }

        // otherwise forward the lookup to the closest preceding node
        // found in the local references
        Node hop = this.references.getClosestPrecedingNode(key);
        try {
            this.logger
                    .info("Asking closest preceding node known to this node for closest preceding node "
                            + hop.nodeID
                            + " concerning key " + key + " to look up");
            return hop.findSuccessor(key);
        } catch (CommunicationException e) {
            // forwarding target is unreachable: forget it and try again
            this.logger
                    .warn("Communication failure while requesting successor "
                            + "for key "
                            + key
                            + " from node "
                            + hop.toString()
                            + " - looking up successor for failed node "
                            + hop.toString());
            this.references.removeReference(hop);
        }
    }
}
/* Implementation of Report interface */
/**
 * Returns the string representation of the entries held by this node, as
 * produced by {@code entries.toString()}.
 *
 * @return String representation of the entries.
 */
public final String printEntries() {
return this.entries.toString();
}
/**
 * Returns the finger table as text; the formatting is delegated to
 * {@code references.printFingerTable()}.
 *
 * @return Textual form of the finger table.
 */
public final String printFingerTable() {
return this.references.printFingerTable();
}
/**
 * Returns the successor list as text; the formatting is delegated to
 * {@code references.printSuccessorList()}.
 *
 * @return Textual form of the successor list.
 */
public final String printSuccessorList() {
return this.references.printSuccessorList();
}
/**
 * Returns the string representation of this node's references, as produced
 * by {@code references.toString()}.
 *
 * @return String representation of the references.
 */
public final String printReferences() {
return this.references.toString();
}
/**
 * Returns a one-line description of this node's predecessor.
 *
 * @return <code>"Predecessor: "</code> followed by the predecessor's string
 *         representation, or by <code>null</code> when no predecessor is
 *         known.
 */
public final String printPredecessor() {
    Node predecessor = this.references.getPredecessor();
    return predecessor == null
            ? "Predecessor: null"
            : "Predecessor: " + predecessor.toString();
}
/**
 * Submits a retrieval of the given key to the asynchronous executor and
 * reports the outcome to the callback.
 * <p>
 * The submitted task calls {@code retrieve(key)} and then invokes
 * {@code callback.retrieved}; on success the throwable argument is
 * <code>null</code>, on failure the result argument is <code>null</code>
 * and the throwable is whatever was thrown.
 *
 * @param key
 *            Key to retrieve values for.
 * @param callback
 *            Receiver of the outcome.
 */
public void retrieve(final Key key, final ChordCallback callback) {
    final Chord self = this;
    Runnable lookup = new Runnable() {
        public void run() {
            Set<Serializable> found = null;
            Throwable failure = null;
            try {
                found = self.retrieve(key);
            } catch (Throwable problem) {
                // covers ServiceException as well as anything unexpected
                failure = problem;
            }
            callback.retrieved(key, found, failure);
        }
    };
    this.asyncExecutor.execute(lookup);
}
/**
 * Submits an insertion of the given entry to the asynchronous executor and
 * reports the outcome to the callback.
 * <p>
 * The submitted task calls {@code insert(key, entry)} and then invokes
 * {@code callback.inserted}; the throwable argument is <code>null</code>
 * on success and otherwise whatever was thrown.
 *
 * @param key
 *            Key to store the entry under.
 * @param entry
 *            Value to store.
 * @param callback
 *            Receiver of the outcome.
 */
public void insert(final Key key, final Serializable entry,
        final ChordCallback callback) {
    final Chord self = this;
    Runnable insertion = new Runnable() {
        public void run() {
            Throwable failure = null;
            try {
                self.insert(key, entry);
            } catch (Throwable problem) {
                // covers ServiceException as well as anything unexpected
                failure = problem;
            }
            callback.inserted(key, entry, failure);
        }
    };
    this.asyncExecutor.execute(insertion);
}
/**
 * Submits a removal of the given entry to the asynchronous executor and
 * reports the outcome to the callback.
 * <p>
 * The submitted task calls {@code remove(key, entry)} and then invokes
 * {@code callback.removed}; the throwable argument is <code>null</code> on
 * success and otherwise whatever was thrown.
 *
 * @param key
 *            Key the entry is stored under.
 * @param entry
 *            Value to remove.
 * @param callback
 *            Receiver of the outcome.
 */
public void remove(final Key key, final Serializable entry,
        final ChordCallback callback) {
    final Chord self = this;
    Runnable removal = new Runnable() {
        public void run() {
            Throwable failure = null;
            try {
                self.remove(key, entry);
            } catch (Throwable problem) {
                // covers ServiceException as well as anything unexpected
                failure = problem;
            }
            callback.removed(key, entry, failure);
        }
    };
    this.asyncExecutor.execute(removal);
}
/**
 * Creates a future for retrieving the values stored under the given key.
 * Construction is delegated to {@code ChordRetrievalFutureImpl.create},
 * which receives the asynchronous executor, this instance and the key.
 *
 * @param key
 *            Key to retrieve values for.
 * @return Future created by {@code ChordRetrievalFutureImpl.create}.
 */
// NOTE(review): presumably the retrieval itself runs on asyncExecutor -
// confirm in ChordRetrievalFutureImpl.
public ChordRetrievalFuture retrieveAsync(Key key) {
return ChordRetrievalFutureImpl.create(this.asyncExecutor, this, key);
}
/**
 * Creates a future for inserting the given entry under the given key.
 * Construction is delegated to {@code ChordInsertFuture.create}, which
 * receives the asynchronous executor, this instance, the key and the entry.
 *
 * @param key
 *            Key to store the entry under.
 * @param entry
 *            Value to store.
 * @return Future created by {@code ChordInsertFuture.create}.
 */
// NOTE(review): presumably the insertion itself runs on asyncExecutor -
// confirm in ChordInsertFuture.
public ChordFuture insertAsync(Key key, Serializable entry) {
return ChordInsertFuture.create(this.asyncExecutor, this, key, entry);
}
/**
 * Creates a future for removing the given entry stored under the given key.
 * Construction is delegated to {@code ChordRemoveFuture.create}, which
 * receives the asynchronous executor, this instance, the key and the entry.
 *
 * @param key
 *            Key the entry is stored under.
 * @param entry
 *            Value to remove.
 * @return Future created by {@code ChordRemoveFuture.create}.
 */
// NOTE(review): presumably the removal itself runs on asyncExecutor -
// confirm in ChordRemoveFuture.
public ChordFuture removeAsync(Key key, Serializable entry) {
return ChordRemoveFuture.create(this.asyncExecutor, this, key, entry);
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -