⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nodeimpl.java

📁 一个对等计算发生器
💻 JAVA
字号:
/***************************************************************************
 *                                                                         *
 *                               NodeImpl.java                             *
 *                            -------------------                          *
 *   date                 : 16.08.2004                                     *
 *   copyright            : (C) 2004/2005 Distributed and                  *
 *								Mobile Systems Group                       *
 *                              Lehrstuhl fuer Praktische Informatik       *
 *                              Universitaet Bamberg                       *
 *                              http://www.lspi.wiai.uni-bamberg.de/       *
 *   email                : sven.kaffille@wiai.uni-bamberg.de              *
 *   						karsten.loesing@wiai.uni-bamberg.de            *
 *                                                                         *
 *                                                                         *
 ***************************************************************************/

/***************************************************************************
 *                                                                         *
 *   This program is free software; you can redistribute it and/or modify  *
 *   it under the terms of the GNU General Public License as published by  *
 *   the Free Software Foundation; either version 2 of the License, or     *
 *   (at your option) any later version.                                   *
 *                                                                         *
 *   A copy of the license can be found in the license.txt file supplied   *
 *   with this software or at: http://www.gnu.org/copyleft/gpl.html        *
 *                                                                         *
 ***************************************************************************/
package de.uniba.wiai.lspi.chord.service.impl;

import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;

import de.uniba.wiai.lspi.chord.com.CommunicationException;
import de.uniba.wiai.lspi.chord.com.Endpoint;
import de.uniba.wiai.lspi.chord.com.Entry;
import de.uniba.wiai.lspi.chord.com.Node;
import de.uniba.wiai.lspi.chord.com.RefsAndEntries;
import de.uniba.wiai.lspi.chord.data.ID;
import de.uniba.wiai.lspi.chord.data.URL;
import de.uniba.wiai.lspi.util.logging.Logger;

/**
 * Implements all operations which can be invoked remotely by other nodes.
 * 
 * @author Karsten Loesing
 * @version 1.0.1
 */
public final class NodeImpl extends Node {

	/**
	 * Endpoint for incoming communication.
	 */
	private Endpoint myEndpoint = null;

	/**
	 * Reference on local node.
	 */
	private ChordImpl impl;

	/**
	 * Object logger.
	 */
	private Logger logger;

	/**
	 * Routing table (including finger table, successor list, and predecessor
	 * reference)
	 */
	private References references;

	/**
	 * Repository for locally stored entries.
	 */
	private Entries entries;

	/**
	 * Executor that executes insertion and removal of entries on successors of
	 * this node.
	 */
	private Executor asyncExecutor;

	/**
	 * Creates that part of the local node which answers remote requests by
	 * other nodes. Sole constructor, is invoked by ChordImpl only.
	 * 
	 * @param impl
	 *            Reference on ChordImpl instance which created this object.
	 * @param nodeID
	 *            This node's Chord ID.
	 * @param nodeURL
	 *            URL, on which this node accepts connections.
	 * @param references
	 *            Routing table of this node.
	 * @param entries
	 *            Repository for entries of this node.
	 * @throws NullPointerException
	 *             If any of the parameter has value <code>null</code>.
	 */
	NodeImpl(ChordImpl impl, ID nodeID, URL nodeURL, References references,
			Entries entries) {

		if (impl == null || nodeID == null || nodeURL == null
				|| references == null || entries == null) {
			throw new NullPointerException(
					"Neither parameter of the constructor may have a null value!");
		}

		this.logger = Logger.getLogger(NodeImpl.class + "." + nodeID);

		this.impl = impl;
		this.asyncExecutor = impl.getAsyncExecutor();
		this.nodeID = nodeID;
		this.nodeURL = nodeURL;
		this.references = references;
		this.entries = entries;

		// create endpoint for incoming connections
		this.myEndpoint = Endpoint.createEndpoint(this, nodeURL);
		this.myEndpoint.listen();
	}

	/**
	 * Makes this endpoint accept entries by other nodes. Is invoked by
	 * ChordImpl only.
	 */
	final void acceptEntries() {
		this.myEndpoint.acceptEntries();
	}

	public final void disconnect() {
		this.myEndpoint.disconnect();
	}

	public final ID getNodeID() {
		return this.nodeID;
	}

	public final Node findSuccessor(ID key) {
		return this.impl.findSuccessor(key);
	}

	public final List<Node> notify(Node potentialPredecessor) {

		// the result will contain the list of successors as well as the
		// predecessor of this node
		List<Node> result = new LinkedList<Node>();

		// /*synchronized*/ (this.references) {

		// add potential predecessor to successor list and finger table and set
		// it as predecessor if no better predecessor is available
		this.references.addReferenceAsPredecessor(potentialPredecessor);

		// add reference on predecessor as well as on successors to result
		result.add(this.references.getPredecessor());
		result.addAll(this.references.getSuccessors());
		// }

		return result;
	}

	public final RefsAndEntries notifyAndCopyEntries(Node potentialPredecessor)
			throws CommunicationException {
		// copy all entries which lie between the local node ID and the ID of
		// the potential predecessor, including those equal to potential
		// predecessor
		Set<Entry> copiedEntries = this.entries.getEntriesInInterval(
				this.nodeID, potentialPredecessor.getNodeID());

		return new RefsAndEntries(this.notify(potentialPredecessor),
				copiedEntries);
	}

	public final void ping() {
		// do nothing---returning of method is proof of live
		return;
	}

	public final void insertEntry(Entry toInsert) throws CommunicationException {

		this.logger.debug("Inserting entry with id " + toInsert.getId()
				+ " at node " + this.nodeID);

		// Possible, but rare situation: a new node has joined which now is
		// responsible for the id!
		if ((this.references.getPredecessor() != null)
				&& !toInsert.getId().isInInterval(
						this.references.getPredecessor().nodeID, this.nodeID)) {
			this.references.getPredecessor().insertEntry(toInsert);
			return;
		}

		// add entry to local repository
		this.entries.add(toInsert);

		// create set containing this entry for insertion of replicates at all
		// nodes in successor list
		Set<Entry> newEntries = new HashSet<Entry>();
		newEntries.add(toInsert);

		// invoke insertReplicates method on all nodes in successor list
		final Set<Entry> mustBeFinal = new HashSet<Entry>(newEntries);
		for (final Node successor : this.references.getSuccessors()) {
			this.asyncExecutor.execute(new Runnable() {
				public void run() {
					try {
						successor.insertReplicas(mustBeFinal);
					} catch (CommunicationException e) {
						// do nothing
					}
				}
			});
		}
	}

	public final void insertReplicas(Set<Entry> replicatesToInsert) {
		this.entries.addAll(replicatesToInsert);
	}

	public final void removeEntry(Entry entryToRemove)
			throws CommunicationException {

		this.logger.debug("Removing entry with id " + entryToRemove.getId()
				+ " at node " + this.nodeID);

		// Possible, but rare situation: a new node has joined which now is
		// responsible for the id!
		if (this.references.getPredecessor() != null
				&& !entryToRemove.getId().isInInterval(
						this.references.getPredecessor().nodeID, this.nodeID)) {
			this.references.getPredecessor().removeEntry(entryToRemove);
			return;
		}

		// remove entry from repository
		this.entries.remove(entryToRemove);

		// create set containing this entry for removal of replicates at all
		// nodes in successor list
		final Set<Entry> entriesToRemove = new HashSet<Entry>();
		entriesToRemove.add(entryToRemove);

		// invoke removeReplicates method on all nodes in successor list
		List<Node> successors = this.references.getSuccessors();
		final ID id = this.nodeID;
		for (final Node successor : successors) {
			this.asyncExecutor.execute(new Runnable() {
				public void run() {
					try {
						// remove only replica of removed entry
						successor.removeReplicas(id, entriesToRemove);
					} catch (CommunicationException e) {
						// do nothing for the moment
					}
				}
			});
		}
	}

	public final void removeReplicas(ID sendingNodeID,
			Set<Entry> replicasToRemove) {
		if (replicasToRemove.size() == 0) {
			// remove all replicas in interval
			Set<Entry> allReplicasToRemove = this.entries.getEntriesInInterval(
					this.nodeID, sendingNodeID);
			this.entries.removeAll(allReplicasToRemove);
		} else {
			// remove only replicas of given entry
			this.entries.removeAll(replicasToRemove);
		}
	}

	public final synchronized Set<Entry> retrieveEntries(ID id)
			throws CommunicationException {

		// Possible, but rare situation: a new node has joined which now is
		// responsible for the id!
		if (this.references.getPredecessor() != null
				&& !id.isInInterval(this.references.getPredecessor().nodeID,
						this.nodeID)) {
			this.logger.fatal("The rare situation has occured at time "
					+ System.currentTimeMillis() + ", id to look up=" + id
					+ ", id of local node=" + this.nodeID
					+ ", id of predecessor="
					+ this.references.getPredecessor().nodeID);
			return this.references.getPredecessor().retrieveEntries(id);
		}

		// return entries from local repository
		return this.entries.getEntries(id);
	}

	final public void leavesNetwork(Node predecessor) {
		this.logger.info("Leaves network invoked; " + this.nodeID + ". Updating references.");
		this.logger.info("New predecessor " + predecessor.nodeID); 
		this.logger.debug("References before update: " + this.references.toString());
		this.references.removeReference(this.references.getPredecessor());
		this.references.addReference(predecessor);
		this.references.setPredecessor(predecessor);
		this.logger.debug("References after update: " + this.references.toString());
	}

	/**
	 * Returns the local ChordImpl instance. Actually, this is only used by the
	 * test implementation with multiple Chord nodes in one JVM. Might be
	 * discarded in the future.
	 * 
	 * @return A reference on the local ChordImpl instance.
	 * @deprecated Tasks on behalf of remote nodes should be conducted by this
	 *             class itself and not by delegating them to the ChordImpl
	 *             instance!
	 */
	public final ChordImpl getChordImpl() {
		return this.impl;
	}

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -