📄 nodeimpl.java
字号:
/***************************************************************************
* *
* NodeImpl.java *
* ------------------- *
* date : 16.08.2004 *
* copyright : (C) 2004/2005 Distributed and *
* Mobile Systems Group *
* Lehrstuhl fuer Praktische Informatik *
* Universitaet Bamberg *
* http://www.lspi.wiai.uni-bamberg.de/ *
* email : sven.kaffille@wiai.uni-bamberg.de *
* karsten.loesing@wiai.uni-bamberg.de *
* *
* *
***************************************************************************/
/***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
* A copy of the license can be found in the license.txt file supplied *
* with this software or at: http://www.gnu.org/copyleft/gpl.html *
* *
***************************************************************************/
package de.uniba.wiai.lspi.chord.service.impl;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import de.uniba.wiai.lspi.chord.com.CommunicationException;
import de.uniba.wiai.lspi.chord.com.Endpoint;
import de.uniba.wiai.lspi.chord.com.Entry;
import de.uniba.wiai.lspi.chord.com.Node;
import de.uniba.wiai.lspi.chord.com.RefsAndEntries;
import de.uniba.wiai.lspi.chord.data.ID;
import de.uniba.wiai.lspi.chord.data.URL;
import de.uniba.wiai.lspi.util.logging.Logger;
/**
* Implements all operations which can be invoked remotely by other nodes.
*
* @author Karsten Loesing
* @version 1.0.1
*/
public final class NodeImpl extends Node {
/**
* Endpoint for incoming communication.
*/
private Endpoint myEndpoint = null;
/**
* Reference on local node.
*/
private ChordImpl impl;
/**
* Object logger.
*/
private Logger logger;
/**
* Routing table (including finger table, successor list, and predecessor
* reference)
*/
private References references;
/**
* Repository for locally stored entries.
*/
private Entries entries;
/**
* Executor that executes insertion and removal of entries on successors of
* this node.
*/
private Executor asyncExecutor;
/**
* Creates that part of the local node which answers remote requests by
* other nodes. Sole constructor, is invoked by ChordImpl only.
*
* @param impl
* Reference on ChordImpl instance which created this object.
* @param nodeID
* This node's Chord ID.
* @param nodeURL
* URL, on which this node accepts connections.
* @param references
* Routing table of this node.
* @param entries
* Repository for entries of this node.
* @throws NullPointerException
* If any of the parameter has value <code>null</code>.
*/
NodeImpl(ChordImpl impl, ID nodeID, URL nodeURL, References references,
Entries entries) {
if (impl == null || nodeID == null || nodeURL == null
|| references == null || entries == null) {
throw new NullPointerException(
"Neither parameter of the constructor may have a null value!");
}
this.logger = Logger.getLogger(NodeImpl.class + "." + nodeID);
this.impl = impl;
this.asyncExecutor = impl.getAsyncExecutor();
this.nodeID = nodeID;
this.nodeURL = nodeURL;
this.references = references;
this.entries = entries;
// create endpoint for incoming connections
this.myEndpoint = Endpoint.createEndpoint(this, nodeURL);
this.myEndpoint.listen();
}
/**
* Makes this endpoint accept entries by other nodes. Is invoked by
* ChordImpl only.
*/
final void acceptEntries() {
this.myEndpoint.acceptEntries();
}
public final void disconnect() {
this.myEndpoint.disconnect();
}
public final ID getNodeID() {
return this.nodeID;
}
public final Node findSuccessor(ID key) {
return this.impl.findSuccessor(key);
}
public final List<Node> notify(Node potentialPredecessor) {
// the result will contain the list of successors as well as the
// predecessor of this node
List<Node> result = new LinkedList<Node>();
// /*synchronized*/ (this.references) {
// add potential predecessor to successor list and finger table and set
// it as predecessor if no better predecessor is available
this.references.addReferenceAsPredecessor(potentialPredecessor);
// add reference on predecessor as well as on successors to result
result.add(this.references.getPredecessor());
result.addAll(this.references.getSuccessors());
// }
return result;
}
public final RefsAndEntries notifyAndCopyEntries(Node potentialPredecessor)
throws CommunicationException {
// copy all entries which lie between the local node ID and the ID of
// the potential predecessor, including those equal to potential
// predecessor
Set<Entry> copiedEntries = this.entries.getEntriesInInterval(
this.nodeID, potentialPredecessor.getNodeID());
return new RefsAndEntries(this.notify(potentialPredecessor),
copiedEntries);
}
public final void ping() {
// do nothing---returning of method is proof of live
return;
}
public final void insertEntry(Entry toInsert) throws CommunicationException {
this.logger.debug("Inserting entry with id " + toInsert.getId()
+ " at node " + this.nodeID);
// Possible, but rare situation: a new node has joined which now is
// responsible for the id!
if ((this.references.getPredecessor() != null)
&& !toInsert.getId().isInInterval(
this.references.getPredecessor().nodeID, this.nodeID)) {
this.references.getPredecessor().insertEntry(toInsert);
return;
}
// add entry to local repository
this.entries.add(toInsert);
// create set containing this entry for insertion of replicates at all
// nodes in successor list
Set<Entry> newEntries = new HashSet<Entry>();
newEntries.add(toInsert);
// invoke insertReplicates method on all nodes in successor list
final Set<Entry> mustBeFinal = new HashSet<Entry>(newEntries);
for (final Node successor : this.references.getSuccessors()) {
this.asyncExecutor.execute(new Runnable() {
public void run() {
try {
successor.insertReplicas(mustBeFinal);
} catch (CommunicationException e) {
// do nothing
}
}
});
}
}
public final void insertReplicas(Set<Entry> replicatesToInsert) {
this.entries.addAll(replicatesToInsert);
}
public final void removeEntry(Entry entryToRemove)
throws CommunicationException {
this.logger.debug("Removing entry with id " + entryToRemove.getId()
+ " at node " + this.nodeID);
// Possible, but rare situation: a new node has joined which now is
// responsible for the id!
if (this.references.getPredecessor() != null
&& !entryToRemove.getId().isInInterval(
this.references.getPredecessor().nodeID, this.nodeID)) {
this.references.getPredecessor().removeEntry(entryToRemove);
return;
}
// remove entry from repository
this.entries.remove(entryToRemove);
// create set containing this entry for removal of replicates at all
// nodes in successor list
final Set<Entry> entriesToRemove = new HashSet<Entry>();
entriesToRemove.add(entryToRemove);
// invoke removeReplicates method on all nodes in successor list
List<Node> successors = this.references.getSuccessors();
final ID id = this.nodeID;
for (final Node successor : successors) {
this.asyncExecutor.execute(new Runnable() {
public void run() {
try {
// remove only replica of removed entry
successor.removeReplicas(id, entriesToRemove);
} catch (CommunicationException e) {
// do nothing for the moment
}
}
});
}
}
public final void removeReplicas(ID sendingNodeID,
Set<Entry> replicasToRemove) {
if (replicasToRemove.size() == 0) {
// remove all replicas in interval
Set<Entry> allReplicasToRemove = this.entries.getEntriesInInterval(
this.nodeID, sendingNodeID);
this.entries.removeAll(allReplicasToRemove);
} else {
// remove only replicas of given entry
this.entries.removeAll(replicasToRemove);
}
}
public final synchronized Set<Entry> retrieveEntries(ID id)
throws CommunicationException {
// Possible, but rare situation: a new node has joined which now is
// responsible for the id!
if (this.references.getPredecessor() != null
&& !id.isInInterval(this.references.getPredecessor().nodeID,
this.nodeID)) {
this.logger.fatal("The rare situation has occured at time "
+ System.currentTimeMillis() + ", id to look up=" + id
+ ", id of local node=" + this.nodeID
+ ", id of predecessor="
+ this.references.getPredecessor().nodeID);
return this.references.getPredecessor().retrieveEntries(id);
}
// return entries from local repository
return this.entries.getEntries(id);
}
final public void leavesNetwork(Node predecessor) {
this.logger.info("Leaves network invoked; " + this.nodeID + ". Updating references.");
this.logger.info("New predecessor " + predecessor.nodeID);
this.logger.debug("References before update: " + this.references.toString());
this.references.removeReference(this.references.getPredecessor());
this.references.addReference(predecessor);
this.references.setPredecessor(predecessor);
this.logger.debug("References after update: " + this.references.toString());
}
/**
* Returns the local ChordImpl instance. Actually, this is only used by the
* test implementation with multiple Chord nodes in one JVM. Might be
* discarded in the future.
*
* @return A reference on the local ChordImpl instance.
* @deprecated Tasks on behalf of remote nodes should be conducted by this
* class itself and not by delegating them to the ChordImpl
* instance!
*/
public final ChordImpl getChordImpl() {
return this.impl;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -