⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 chordimpl.java

📁 一个对等计算发生器
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
		// create NodeImpl instance for communication
		this.localNode = new NodeImpl(this, this.getID(), this.localURL,
				this.references, this.entries);

		// create proxy for outgoing connection to bootstrap node
		Node bootstrapNode;
		try {
			bootstrapNode = Proxy.createConnection(this.localURL, bootstrapURL);
		} catch (CommunicationException e) {
			throw new ServiceException(
					"An error occured when creating a proxy for outgoing "
							+ "connection to bootstrap node! Join operation"
							+ "failed!", e);

		}

		// only an optimization: store reference on bootstrap node
		this.references.addReference(bootstrapNode);

		// Asking for my successor at node bootstrapNode.nodeID

		// find my successor
		Node mySuccessor;
		try {
			mySuccessor = bootstrapNode.findSuccessor(this.getID());
		} catch (CommunicationException e1) {
			throw new ServiceException("An error occured when trying to find "
					+ "the successor of this node using bootstrap node "
					+ "with url " + bootstrapURL.toString() + "! Join "
					+ "operation failed!", e1);
		}

		// store reference on my successor
		this.logger.info(this.localURL + " has successor " + mySuccessor.nodeURL); 
		this.references.addReference(mySuccessor);

		// notify successor for the first time and copy keys from successor
		if (mySuccessor != null) {

			RefsAndEntries copyOfRefsAndEntries;
			try {
				copyOfRefsAndEntries = mySuccessor
						.notifyAndCopyEntries(this.localNode);
			} catch (CommunicationException e2) {
				throw new ServiceException("An error occured when contacting "
						+ "the successor of this node in order to "
						+ "obtain its references and entries! Join "
						+ "operation failed!", e2);
			}

			// add new references, if pings are successful
			Set<Node> pingRequestSent = new HashSet<Node>();
			for (final Node newReference : copyOfRefsAndEntries.getRefs()) {
				if (newReference != null
						&& !newReference.equals(this.localNode)
						&& !this.references.containsReference(newReference)) {

					if (!pingRequestSent.contains(newReference)) {
						this.logger
								.debug("Sending ping request to new reference "
										+ newReference.nodeID.toHexString());
						this.asyncExecutor.execute(new Runnable() {
							public void run() {
								try {
									newReference.ping();
									ChordImpl.this.references
											.addReference(newReference);
									ChordImpl.this.logger
											.info("Added reference on "
													+ newReference.nodeID
													+ " which responded to "
													+ "ping request");
								} catch (CommunicationException e) {
									ChordImpl.this.logger.info("New reference "
											+ newReference + " unreachable");
								}
							}
						});
						pingRequestSent.add(newReference);
					}
				}
			}

			// add copied entries of successor
			this.entries.addAll(copyOfRefsAndEntries.getEntries());
		}

		// accept content requests from outside
		this.localNode.acceptEntries();

		// create tasks for fixing finger table, checking predecessor and
		// stabilizing
		this.createTasks();
	}

	public final void leave() {

		if (this.localNode == null) {
			// ring has not been created or joined, st. leave has no effect
			return;
		}

		this.maintenanceTasks.shutdownNow();

		// this.executor.shutdown();
		// inform successor about leaving
		try {
			Node successor = this.references.getSuccessor();
			if (successor != null && this.references.getPredecessor() != null) {
				successor.leavesNetwork(this.references.getPredecessor());
			}
		} catch (CommunicationException e) {
			/*
			 * throw new ServiceException( "An unknown error occured when trying
			 * to contact the " + "successor of this node to inform it about " +
			 * "leaving! Leave operation failed!");
			 */

		}

		this.localNode.disconnect();
		this.asyncExecutor.shutdownNow(); 
		this.localNode = null;

	}

	public final void insert(Key key, Serializable s) {

		// check parameters
		if (key == null || s == null) {
			throw new NullPointerException(
					"Neither parameter may have value null!");
		}

		// determine ID for key
		ID id = this.hashFunction.getHashKey(key);
		Entry entryToInsert = new Entry(id, s);

		this.logger.debug("Inserting new entry with id " + id);

		boolean inserted = false;
		while (!inserted) {

			// find successor of id
			Node responsibleNode;
			// try {
			responsibleNode = this.findSuccessor(id);
			// } catch (CommunicationException e) {
			// this.logger.warn(
			// "An error occured while finding the successor of the id "
			// + "to insert! Insert operation failed!", e);
			// continue;
			// }

			this.logger.debug("Invoking insertEntry method on node "
					+ responsibleNode.nodeID);

			// invoke insertEntry method
			try {
				responsibleNode.insertEntry(entryToInsert);
				inserted = true;
			} catch (CommunicationException e1) {
				this.logger.warn(
						"An error occured while invoking the insertEntry method "
								+ " on the appropriate node! Insert operation "
								+ "failed!", e1);
				continue;
			}
		}
		this.logger.info("New entry was inserted!");
	}

	public final Set<Serializable> retrieve(Key key) {

		// check parameters
		if (key == null) {
			NullPointerException e = new NullPointerException(
					"Key must not have value null!");
			this.logger.error("Null pointer", e);
			throw e;
		}

		// determine ID for key
		ID id = this.hashFunction.getHashKey(key);

		this.logger.debug("Retrieving entries with id " + id);

		Set<Entry> result = null;

		boolean retrieved = false;
		while (!retrieved) {

			// find successor of id
			Node responsibleNode = null;
			// try {
			responsibleNode = findSuccessor(id);
			// } catch (CommunicationException e) {
			// this.logger.warn(
			// "An error occured while finding the successor of the id "
			// + "to retrieve! Retrieve operation failed!", e);
			// continue;
			// }

			// logger.fatal("retrieve: Successor of key " + key + ": " +
			// responsibleNode.nodeID);

			// invoke retrieveEntry method
			try {
				result = responsibleNode.retrieveEntries(id);
				// cause while loop to end.

				retrieved = true;
			} catch (CommunicationException e1) {
				this.logger
						.warn(
								"An error occured while invoking the retrieveEntry method "
										+ " on the appropriate node! Retrieve operation "
										+ "failed!", e1);
				continue;
			}
		}
		Set<Serializable> values = new HashSet<Serializable>();

		for (Entry entry : result) {
			values.add(entry.getValue());
		}

		this.logger.info("Entries were retrieved!");

		return values;

	}

	public final void remove(Key key, Serializable s) {

		// check parameters
		if (key == null || s == null) {
			throw new NullPointerException(
					"Neither parameter may have value null!");
		}

		// determine ID for key
		ID id = this.hashFunction.getHashKey(key);
		Entry entryToRemove = new Entry(id, s);

		boolean removed = false;
		while (!removed) {

			this.logger.debug("Removing entry with id " + id + " and value "
					+ s);

			// find successor of id
			Node responsibleNode;
			// try {
			responsibleNode = findSuccessor(id);
			// } catch (CommunicationException e) {
			// this.logger.warn(
			// "An error occured while finding the successor of the id "
			// + "to insert! Remove operation failed!", e);
			// continue;
			// }

			this.logger.debug("Invoking removeEntry method on node "
					+ responsibleNode.nodeID);

			// invoke removeEntry method
			try {
				responsibleNode.removeEntry(entryToRemove);
				removed = true;
			} catch (CommunicationException e1) {
				this.logger.warn(
						"An error occured while invoking the removeEntry method "
								+ " on the appropriate node! Remove operation "
								+ "failed!", e1);
				continue;
			}
		}
		this.logger.info("Entry was removed!");
	}

	/**
	 * Returns a human-readable string representation containing this node's
	 * node ID and URL.
	 * 
	 * @see java.lang.Object#toString()
	 */
	@Override
	public final String toString() {
		return "Chord node: id = "
				+ (this.localID == null ? "null" : this.localID.toString())
				+ ", url = "
				+ (this.localURL == null ? "null" : this.localURL.toString()
						+ "\n");
	}

	/**
	 * Returns the Chord node which is responsible for the given key.
	 * 
	 * @param key
	 *            Key for which the successor is searched for.
	 * @throws NullPointerException
	 *             If given ID is <code>null</code>.
	 * @return Responsible node.
	 */
	final Node findSuccessor(ID key) {

		if (key == null) {
			NullPointerException e = new NullPointerException(
					"ID to find successor for may not be null!");
			this.logger.error("Null pointer.", e);
			throw e;
		}

		// check if the local node is the only node in the network
		Node successor = this.references.getSuccessor();
		if (successor == null) {

			this.logger
					.info("I appear to be the only node in the network, so I am "
							+ "my own "
							+ "successor; return reference on me: "
							+ this.getID());

			return this.localNode;
		}
		// check if the key to look up lies between this node and its successor
		else if (key.isInInterval(this.getID(), successor.nodeID)
				|| key.equals(successor.nodeID)) {

			this.logger.info("The requested key lies between my own and my "
					+ "successor's node id; therefore return my successor.");

			// try to reach successor
			try {
				successor.ping(); // if methods returns, successor is alive

				this.logger.info("Returning my successor " + successor.nodeID
						+ " of type " + successor.getClass());

				return successor;
			} catch (CommunicationException e) {
				// not successful, delete node from successor list and finger
				// table, and set new successor, if available
				this.logger
						.warn("Successor did not respond! Removing it from all "
								+ "lists and retrying...");
				this.references.removeReference(successor);
				return findSuccessor(key);
			}
		}

		// ask closest preceding node found in local references for closest
		// preceding node concerning the key to look up
		else {

			Node closestPrecedingNode = this.references
					.getClosestPrecedingNode(key);

			try {
				this.logger
						.info("Asking closest preceding node known to this node for closest preceding node "
								+ closestPrecedingNode.nodeID
								+ " concerning key " + key + " to look up");
				return closestPrecedingNode.findSuccessor(key);
			} catch (CommunicationException e) {
				this.logger
						.warn("Communication failure while requesting successor "
								+ "for key "
								+ key
								+ " from node "
								+ closestPrecedingNode.toString()
								+ " - looking up successor for failed node "
								+ closestPrecedingNode.toString());
				this.references.removeReference(closestPrecedingNode);
				return findSuccessor(key);
			}
		}
	}

	/* Implementation of Report interface */
	public final String printEntries() {
		return this.entries.toString();
	}

	public final String printFingerTable() {
		return this.references.printFingerTable();
	}

	public final String printSuccessorList() {
		return this.references.printSuccessorList();
	}

	public final String printReferences() {
		return this.references.toString();
	}

	public final String printPredecessor() {
		Node pre = this.references.getPredecessor();
		if (pre == null) {
			return "Predecessor: null";
		} else {
			return "Predecessor: " + pre.toString();
		}
	}

	public void retrieve(final Key key, final ChordCallback callback) {
		final Chord chord = this;
		this.asyncExecutor.execute(new Runnable() {
			public void run() {
				Throwable t = null;
				Set<Serializable> result = null;
				try {
					result = chord.retrieve(key);
				} catch (ServiceException e) {
					t = e;
				} catch (Throwable th) {
					t = th;
				}
				callback.retrieved(key, result, t);
			}
		});
	}

	public void insert(final Key key, final Serializable entry,
			final ChordCallback callback) {
		final Chord chord = this;
		this.asyncExecutor.execute(new Runnable() {
			public void run() {
				Throwable t = null;
				try {
					chord.insert(key, entry);
				} catch (ServiceException e) {
					t = e;
				} catch (Throwable th) {
					t = th;
				}
				callback.inserted(key, entry, t);
			}
		});
	}

	public void remove(final Key key, final Serializable entry,
			final ChordCallback callback) {
		final Chord chord = this;
		this.asyncExecutor.execute(new Runnable() {
			public void run() {
				Throwable t = null;
				try {
					chord.remove(key, entry);
				} catch (ServiceException e) {
					t = e;
				} catch (Throwable th) {
					t = th;
				}
				callback.removed(key, entry, t);
			}
		});
	}

	public ChordRetrievalFuture retrieveAsync(Key key) {
		return ChordRetrievalFutureImpl.create(this.asyncExecutor, this, key);
	}

	public ChordFuture insertAsync(Key key, Serializable entry) {
		return ChordInsertFuture.create(this.asyncExecutor, this, key, entry);
	}

	public ChordFuture removeAsync(Key key, Serializable entry) {
		return ChordRemoveFuture.create(this.asyncExecutor, this, key, entry);
	}

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -