⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadendpoint.java

📁 Chord package into p2psim
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
		this.node.leavesNetwork(predecessor);
		this
				.notifyInvocationListenersFinished(InvocationListener.LEAVES_NETWORK);
	}

	/**
	 * Handles a remove-replicas invocation by delegating it to the node that
	 * this endpoint belongs to.
	 * <p>
	 * The invocation is rejected by <code>checkIfCrashed()</code> when the
	 * endpoint is crashed or not yet listening. Invocation listeners are
	 * notified before the node is called and again once the call has returned
	 * normally; if the node call throws, the "finished" notification is not
	 * sent.
	 * 
	 * @param sendingNodeID
	 *            Handed unchanged to the node's <code>removeReplicas</code>.
	 * @param entriesToRemove
	 *            Handed unchanged to the node's <code>removeReplicas</code>.
	 * @throws CommunicationException
	 *             If the state checks performed before the delegation fail.
	 */
	public void removeReplicas(ID sendingNodeID, Set<Entry> entriesToRemove)
			throws CommunicationException {
		// Guard: refuse to serve while crashed or not yet listening.
		checkIfCrashed();
		waitFor(Endpoint.LISTENING);

		notifyInvocationListeners(InvocationListener.REMOVE_REPLICAS);
		node.removeReplicas(sendingNodeID, entriesToRemove);
		notifyInvocationListenersFinished(InvocationListener.REMOVE_REPLICAS);
	}

	/**
	 * Handles an insert-replicas invocation by delegating it to the node that
	 * this endpoint belongs to.
	 * <p>
	 * The invocation is rejected by <code>checkIfCrashed()</code> when the
	 * endpoint is crashed or not yet listening. Invocation listeners are
	 * notified before the node is called and again once the call has returned
	 * normally; if the node call throws, the "finished" notification is not
	 * sent.
	 * 
	 * @param entries
	 *            Handed unchanged to the node's <code>insertReplicas</code>.
	 * @throws CommunicationException
	 *             If the state checks performed before the delegation fail.
	 */
	public void insertReplicas(Set<Entry> entries)
			throws CommunicationException {
		// Guard: refuse to serve while crashed or not yet listening.
		checkIfCrashed();
		waitFor(Endpoint.LISTENING);

		notifyInvocationListeners(InvocationListener.INSERT_REPLICAS);
		node.insertReplicas(entries);
		notifyInvocationListenersFinished(InvocationListener.INSERT_REPLICAS);
	}

	/**
	 * Delegates to <code>notifyAndCopyEntries</code> of the node that this
	 * endpoint belongs to and post-processes the returned references: every
	 * reference that is the local node object itself is replaced by a
	 * {@link ThreadProxy} created from this endpoint's URL, so that the caller
	 * receives a 'remote' reference instead of the node object.
	 * <p>
	 * The replacement is done in place, so the order of the returned
	 * references is kept.
	 * 
	 * @param potentialPredecessor
	 *            Handed unchanged to the node's
	 *            <code>notifyAndCopyEntries</code>.
	 * @return A new {@link RefsAndEntries} built from the (converted)
	 *         references and the entries returned by the node.
	 * @throws CommunicationException
	 *             If the state checks performed before the delegation fail.
	 */
	public RefsAndEntries notifyAndCopyEntries(Node potentialPredecessor)
			throws CommunicationException {
		this.checkIfCrashed();
		this.waitFor(Endpoint.ACCEPT_ENTRIES);
		this.notifyInvocationListeners(InvocationListener.NOTIFY_AND_COPY);
		RefsAndEntries refs = this.node
				.notifyAndCopyEntries(potentialPredecessor);
		List<Node> nodes = refs.getRefs();

		/*
		 * Replace by index instead of remove()/add() inside a for-each loop:
		 * structurally modifying the list while its iterator is active makes
		 * the next iteration step fail with ConcurrentModificationException
		 * whenever the local node is not the last element. set() is not a
		 * structural modification and also keeps the reference at its
		 * original position.
		 */
		for (int i = 0; i < nodes.size(); i++) {
			// Identity comparison on purpose: only the local node object
			// itself must be converted.
			if (nodes.get(i) == this.node) {
				this.logger
						.debug("Returned node is local node. Converting to 'remote' reference. ");
				nodes.set(i, new ThreadProxy(this.url, this.url));
			}
		}
		this
				.notifyInvocationListenersFinished(InvocationListener.NOTIFY_AND_COPY);
		return new RefsAndEntries(nodes, refs.getEntries());
	}

	/**
	 * Blocks the calling thread until the state of this endpoint is at least
	 * the given state.
	 * <p>
	 * The thread waits on the monitor of <code>lock</code> and is woken up by
	 * <code>notifyWaitingThreads()</code>. An interruption does not end the
	 * wait; it is logged and the interrupt status of the thread is restored
	 * before this method returns or throws.
	 * 
	 * @param state_
	 *            The state to wait for.
	 * @throws CommunicationException
	 *             If the endpoint is found in state <code>CRASHED</code>
	 *             after the thread has been woken up.
	 */
	private void waitFor(int state_) throws CommunicationException {
		// wait() clears the interrupt status when it throws; remember the
		// interruption so it can be handed back to the caller afterwards.
		boolean interrupted = false;
		try {
			synchronized (this.lock) {
				while (this.getState() < state_) {
					try {
						this.logger.debug(Thread.currentThread()
								+ " waiting for state: " + state_);
						this.lock.wait();
					} catch (InterruptedException t) {
						interrupted = true;
						this.logger.warn("Unexpected exception while waiting!", t);
					}
					// Check the CURRENT state, not the requested one: the
					// endpoint may have crashed while this thread was waiting.
					if (this.getState() == CRASHED) {
						throw new CommunicationException(
								"Connection destroyed!");
					}
				}
			}
		} finally {
			if (interrupted) {
				// Restoring the flag only after the loop avoids spinning on
				// an immediately re-thrown InterruptedException.
				Thread.currentThread().interrupt();
			}
		}
	}

	/**
	 * Wakes up every thread that is currently waiting on the monitor of
	 * <code>lock</code>. The monitor is acquired for the duration of the
	 * call, as required for <code>notifyAll()</code>.
	 */
	private void notifyWaitingThreads() {
		synchronized (lock) {
			logger.debug("Notifying waiting threads.");
			lock.notifyAll();
		}
	}

	/**
	 * Called when the state of this endpoint has changed: wakes up the threads
	 * waiting on the lock and then binds this endpoint in the registry.
	 * 
	 * @see de.uniba.wiai.lspi.chord.com.Endpoint#openConnections()
	 */
	protected void openConnections() {
		logger.debug("openConnections()");
		// State has changed, so waiting threads must re-check it.
		notifyWaitingThreads();
		registry.bind(this);
	}

	/**
	 * Unbinds this endpoint from the registry, removes the proxies registered
	 * as in use by this endpoint's URL and finally wakes up the threads
	 * waiting on the lock, because the state of the endpoint has changed.
	 * 
	 * @see de.uniba.wiai.lspi.chord.com.Endpoint#closeConnections()
	 */
	protected void closeConnections() {
		registry.unbind(this);
		registry.removeProxiesInUseBy(getURL());
		// State has changed, so waiting threads must re-check it.
		notifyWaitingThreads();
	}

	/**
	 * Called when the state of this endpoint has changed; its only action is
	 * to wake up the threads waiting on the lock.
	 * 
	 * @see de.uniba.wiai.lspi.chord.com.Endpoint#entriesAcceptable()
	 */
	protected void entriesAcceptable() {
		// State has changed, so waiting threads must re-check it.
		notifyWaitingThreads();
	}

	/**
	 * Method to emulate a crash of the node that this is the endpoint for. This
	 * method heavily relies on the internal structure of the service layer
	 * implementation to make it possible to emulate a chord overlay network
	 * within one JVM.
	 * <p>
	 * Steps, in this order:
	 * <ol>
	 * <li>unbind this endpoint from the registry,</li>
	 * <li>invalidate and remove all proxies registered as in use by this
	 * endpoint's URL,</li>
	 * <li>set the state to <code>CRASHED</code> and wake up waiting
	 * threads,</li>
	 * <li>look up the field <code>maintenanceTasks</code> of the node's
	 * {@link ChordImpl} via reflection and invoke its <code>shutdown()</code>
	 * method (best effort; failures are only logged),</li>
	 * <li>remove this endpoint from <code>Endpoint.endpoints</code> and drop
	 * the invocation listeners.</li>
	 * </ol>
	 * 
	 * This method may cause problems at runtime.
	 */
	public void crash() {
		this.logger.debug("crash() invoked!");
		this.registry.unbind(this);
		List<ThreadProxy> proxies = this.registry.getProxiesInUseBy(this
				.getURL());
		if (proxies != null) {
			for (ThreadProxy p : proxies) {
				p.invalidate();
			}
		}

		this.registry.removeProxiesInUseBy(this.getURL());
		this.setState(CRASHED);
		this.notifyWaitingThreads();
		/* kill threads of node (hacked together via reflection) */
		ChordImpl impl = ChordImplAccess.fetchChordImplOfNode(this.node);
		Field[] fields = impl.getClass().getDeclaredFields();
		this.logger.debug(fields.length + " fields obtained from class "
				+ impl.getClass());
		for (Field field : fields) {
			this.logger.debug("Examining field " + field + " of node "
					+ this.node);
			try {
				if (field.getName().equals("maintenanceTasks")) {
					// The field is not accessible from here by default.
					field.setAccessible(true);

					Object executor = field.get(impl);
					this.logger.debug("Shutting down TaskExecutor " + executor
							+ ".");
					Method m = executor.getClass().getMethod("shutdown",
							new Class[0]);
					m.setAccessible(true);
					m.invoke(executor, new Object[0]);
				}
			} catch (Throwable t) {
				/*
				 * Best effort: a failure to stop the maintenance threads must
				 * not abort the remaining clean-up below. The throwable is
				 * handed to the logger, so it is not additionally dumped to
				 * System.err via printStackTrace().
				 */
				this.logger.warn("Could not kill threads of node " + this.node,
						t);
			}
		}
		Endpoint.endpoints.remove(this.url);
		this.invocationListeners = null;
	}

	/**
	 * Checks whether this endpoint is able to serve invocations and throws an
	 * exception if it is not. That is the case when the state is
	 * <code>CRASHED</code> or lower than {@link Endpoint#LISTENING}.
	 * 
	 * @throws CommunicationException
	 *             If this endpoint has crashed or is not listening. The
	 *             message tells which of the two conditions applied.
	 */
	private void checkIfCrashed() throws CommunicationException {
		if (this.getState() == CRASHED) {
			this.logger.debug(this + " has crashed. Throwing Exception.");
			throw new CommunicationException(this + " has crashed.");
		}
		// Reported separately: an endpoint below LISTENING has not crashed,
		// so the log and the exception must not claim that it has.
		if (this.getState() < Endpoint.LISTENING) {
			this.logger.debug(this
					+ " is not listening. Throwing Exception.");
			throw new CommunicationException(this + " is not listening.");
		}
	}

	/** ********************************************************** */
	/* START: Methods overridden from java.lang.Object */
	/** ********************************************************** */

	/**
	 * Overridden from {@link java.lang.Object}. Another object is equal to
	 * this endpoint only if it is a ThreadEndpoint, its URL equals the URL of
	 * this endpoint and both endpoints report the same hash code.
	 * <p>
	 * NOTE(review): <code>hashCode()</code> of this class only delegates to
	 * the superclass, so two distinct endpoint objects with the same URL are
	 * presumably not equal - confirm against the superclass.
	 * 
	 * @param obj
	 *            The object to compare with; may be <code>null</code>.
	 * @return <code>true</code> if this equals the provided <code>obj</code>.
	 */
	@Override
	public boolean equals(Object obj) {
		if (!(obj instanceof ThreadEndpoint)) {
			return false;
		}
		final ThreadEndpoint other = (ThreadEndpoint) obj;
		final boolean sameUrl = other.getURL().equals(this.getURL());
		return sameUrl && (other.hashCode() == this.hashCode());
	}

	/**
	 * Overridden from {@link java.lang.Object}. Adds no logic of its own and
	 * returns whatever the superclass implementation computes.
	 * 
	 * @return The hash code provided by the superclass.
	 */
	@Override
	public int hashCode() {
		final int inheritedHash = super.hashCode();
		return inheritedHash;
	}

	/**
	 * Overridden from {@link java.lang.Object}.
	 * 
	 * @return A description of the form
	 *         <code>[ThreadEndpoint for node NODE with URL URL]</code>, built
	 *         from the string representations of the node and the URL of this
	 *         endpoint.
	 */
	@Override
	public String toString() {
		return "[ThreadEndpoint for node " + this.node + " with URL "
				+ this.url + "]";
	}

	/** ********************************************************** */
	/* END: Methods overridden from java.lang.Object */
	/** ********************************************************** */

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -