📄 threadendpoint.java
字号:
this.node.leavesNetwork(predecessor);
this
.notifyInvocationListenersFinished(InvocationListener.LEAVES_NETWORK);
}
/**
 * Forwards a request to remove replicated entries to the node served by
 * this endpoint.
 * <p>
 * The call first fails fast through {@code checkIfCrashed()}, then waits
 * until the endpoint has reached at least {@link Endpoint#LISTENING}.
 * Invocation listeners are informed before the node is invoked and again
 * after the node call has returned normally.
 *
 * @param sendingNodeID
 *            handed unchanged to the node's {@code removeReplicas}.
 * @param entriesToRemove
 *            handed unchanged to the node's {@code removeReplicas}.
 * @throws CommunicationException
 *             if the crash check or the wait for the required state
 *             fails, or if the node call itself throws it.
 */
public void removeReplicas(ID sendingNodeID, Set<Entry> entriesToRemove)
        throws CommunicationException {
    checkIfCrashed();
    waitFor(Endpoint.LISTENING);

    notifyInvocationListeners(InvocationListener.REMOVE_REPLICAS);
    node.removeReplicas(sendingNodeID, entriesToRemove);
    notifyInvocationListenersFinished(InvocationListener.REMOVE_REPLICAS);
}
/**
 * Forwards a request to store replicated entries to the node served by
 * this endpoint.
 * <p>
 * The call first fails fast through {@code checkIfCrashed()}, then waits
 * until the endpoint has reached at least {@link Endpoint#LISTENING}.
 * Invocation listeners are informed before the node is invoked and again
 * after the node call has returned normally.
 *
 * @param entries
 *            handed unchanged to the node's {@code insertReplicas}.
 * @throws CommunicationException
 *             if the crash check or the wait for the required state
 *             fails, or if the node call itself throws it.
 */
public void insertReplicas(Set<Entry> entries)
        throws CommunicationException {
    checkIfCrashed();
    waitFor(Endpoint.LISTENING);

    notifyInvocationListeners(InvocationListener.INSERT_REPLICAS);
    node.insertReplicas(entries);
    notifyInvocationListenersFinished(InvocationListener.INSERT_REPLICAS);
}
/**
 * Delegates to the local node's {@code notifyAndCopyEntries} and returns
 * its references and entries, making sure the local node is never handed
 * out directly.
 * <p>
 * The call first fails fast through {@code checkIfCrashed()}, then waits
 * until the endpoint has reached at least {@link Endpoint#ACCEPT_ENTRIES}.
 * Invocation listeners are informed before the node is invoked and again
 * after the reference list has been post-processed.
 * <p>
 * Every element of the returned reference list that is the very same
 * object as the local node is replaced, at the same position, by a
 * {@link ThreadProxy} built from this endpoint's URL. The list obtained
 * from {@code RefsAndEntries#getRefs()} is modified in place.
 *
 * @param potentialPredecessor
 *            handed unchanged to the node's {@code notifyAndCopyEntries}.
 * @return a new {@link RefsAndEntries} holding the post-processed
 *         reference list and the entries reported by the local node.
 * @throws CommunicationException
 *             if the crash check or the wait for the required state
 *             fails, or if the node call itself throws it.
 */
public RefsAndEntries notifyAndCopyEntries(Node potentialPredecessor)
        throws CommunicationException {
    this.checkIfCrashed();
    this.waitFor(Endpoint.ACCEPT_ENTRIES);
    this.notifyInvocationListeners(InvocationListener.NOTIFY_AND_COPY);
    RefsAndEntries refs = this.node
            .notifyAndCopyEntries(potentialPredecessor);
    List<Node> nodes = refs.getRefs();
    // Replace by index instead of remove()/add() inside a for-each loop:
    // structurally modifying the list while its iterator is active makes
    // fail-fast iterators throw ConcurrentModificationException, and it
    // would also move the replacement to the end of the list.
    for (int i = 0; i < nodes.size(); i++) {
        // Identity comparison on purpose: only the local node object itself
        // must not leak out as a direct reference.
        if (nodes.get(i) == this.node) {
            this.logger
                    .debug("Returned node is local node. Converting to 'remote' reference. ");
            nodes.set(i, new ThreadProxy(this.url, this.url));
        }
    }
    this.notifyInvocationListenersFinished(InvocationListener.NOTIFY_AND_COPY);
    return new RefsAndEntries(nodes, refs.getEntries());
}
/**
 * Blocks the calling thread until the endpoint's state is at least the
 * given state.
 * <p>
 * The thread waits on the endpoint's lock and re-checks the state every
 * time it is woken up. If the endpoint is found to be in the
 * {@code CRASHED} state after a wake-up, waiting is aborted with a
 * {@link CommunicationException}.
 * <p>
 * An interruption while waiting is logged and does not abort the wait; the
 * thread's interrupt status is restored before this method returns or
 * throws, so callers can still observe it.
 *
 * @param state_
 *            The state to wait for.
 * @throws CommunicationException
 *             if the endpoint crashed while this thread was waiting.
 */
private void waitFor(int state_) throws CommunicationException {
    boolean interrupted = false;
    try {
        synchronized (this.lock) {
            while (this.getState() < state_) {
                this.logger.debug(Thread.currentThread()
                        + " waiting for state: " + state_);
                try {
                    this.lock.wait();
                } catch (InterruptedException t) {
                    this.logger.warn("Unexpected exception while waiting!", t);
                    // Re-interrupting here would make the next wait() throw
                    // immediately; remember it and restore it on exit.
                    interrupted = true;
                }
                // Check the endpoint's CURRENT state, not the requested one:
                // crash() sets CRASHED and wakes all waiters so they can fail.
                if (this.getState() == CRASHED) {
                    throw new CommunicationException(
                            "Connection destroyed!");
                }
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Wakes up every thread that is currently waiting on this endpoint's lock
 * object. The lock's monitor is acquired for the duration of the call, as
 * required by {@link Object#notifyAll()}.
 */
private void notifyWaitingThreads() {
    final Object monitor = this.lock;
    synchronized (monitor) {
        logger.debug("Notifying waiting threads.");
        monitor.notifyAll();
    }
}
/*
 * Wakes up threads waiting on the endpoint's lock and then binds this
 * endpoint in the registry.
 *
 * @see de.uniba.wiai.lspi.chord.com.Endpoint#openConnections()
 */
protected void openConnections() {
    logger.debug("openConnections()");
    // The endpoint's state has changed, so threads blocked on the lock
    // get a chance to re-check it.
    notifyWaitingThreads();
    registry.bind(this);
}
/*
 * Unbinds this endpoint from the registry, drops the proxies registered
 * as in use by this endpoint's URL, and finally wakes up threads waiting
 * on the endpoint's lock.
 *
 * @see de.uniba.wiai.lspi.chord.com.Endpoint#closeConnections()
 */
protected void closeConnections() {
    registry.unbind(this);
    registry.removeProxiesInUseBy(getURL());
    // The endpoint's state has changed, so threads blocked on the lock
    // get a chance to re-check it.
    notifyWaitingThreads();
}
/*
 * Wakes up threads waiting on the endpoint's lock; nothing else happens
 * here.
 *
 * @see de.uniba.wiai.lspi.chord.com.Endpoint#entriesAcceptable()
 */
protected void entriesAcceptable() {
    // The endpoint's state has changed, so threads blocked on the lock
    // get a chance to re-check it.
    notifyWaitingThreads();
}
/**
 * Emulates a crash of the node that this is the endpoint for. This method
 * heavily relies on the internal structure of the service layer
 * implementation to make it possible to emulate a chord overlay network
 * within one JVM.
 * <p>
 * Steps, in order: unbind this endpoint from the registry, invalidate and
 * remove all proxies registered as in use by this endpoint's URL, switch
 * the state to {@code CRASHED}, wake up waiting threads, shut down the
 * node's {@code maintenanceTasks} executor via reflection, remove this
 * endpoint from {@code Endpoint.endpoints} and drop the invocation
 * listeners.
 * <p>
 * This method may cause problems at runtime.
 */
public void crash() {
    this.logger.debug("crash() invoked!");
    this.registry.unbind(this);
    List<ThreadProxy> proxies = this.registry.getProxiesInUseBy(this
            .getURL());
    if (proxies != null) {
        for (ThreadProxy p : proxies) {
            p.invalidate();
        }
    }
    this.registry.removeProxiesInUseBy(this.getURL());
    this.setState(CRASHED);
    this.notifyWaitingThreads();
    /* kill threads of node (hack: reaches into ChordImpl via reflection) */
    ChordImpl impl = ChordImplAccess.fetchChordImplOfNode(this.node);
    Field[] fields = impl.getClass().getDeclaredFields();
    this.logger.debug(fields.length + " fields obtained from class "
            + impl.getClass());
    for (Field field : fields) {
        this.logger.debug("Examining field " + field + " of node "
                + this.node);
        try {
            if ("maintenanceTasks".equals(field.getName())) {
                field.setAccessible(true);
                Object executor = field.get(impl);
                this.logger.debug("Shutting down TaskExecutor " + executor
                        + ".");
                Method m = executor.getClass().getMethod("shutdown");
                m.setAccessible(true);
                m.invoke(executor);
            }
        } catch (Throwable t) {
            // Deliberately best-effort: whatever goes wrong in the reflective
            // shutdown must not prevent the rest of the crash emulation.
            // The logger records the throwable, so no printStackTrace().
            this.logger.warn("Could not kill threads of node " + this.node,
                    t);
        }
    }
    Endpoint.endpoints.remove(this.url);
    this.invocationListeners = null;
}
/**
 * Rejects the current invocation unless this endpoint is usable.
 * <p>
 * The endpoint counts as usable when its state is not {@code CRASHED} and
 * is at least {@link Endpoint#LISTENING}. Note that the debug message says
 * "has crashed" in both failure cases, including the one where the
 * endpoint simply has not reached {@code LISTENING}.
 *
 * @throws CommunicationException
 *             (without message) if the endpoint is not usable.
 */
private void checkIfCrashed() throws CommunicationException {
    boolean usable = (getState() != CRASHED)
            && (getState() >= Endpoint.LISTENING);
    if (usable) {
        return;
    }
    logger.debug(this + " has crashed. Throwing Exception.");
    throw new CommunicationException();
}
/** ********************************************************** */
/* START: Methods overwritten from java.lang.Object */
/** ********************************************************** */
/**
 * Overridden from {@link java.lang.Object}. Another object is equal to
 * this endpoint only if it is a ThreadEndpoint, its URL equals this
 * endpoint's URL, and both endpoints report the same hash code.
 * <p>
 * NOTE(review): {@code hashCode()} of this class delegates to
 * {@code super.hashCode()}; if no superclass overrides it, two distinct
 * instances with the same URL will normally NOT be equal. Confirm this is
 * intended.
 *
 * @param obj
 *            the object to compare with; may be <code>null</code>.
 * @return <code>true</code> if this equals the provided <code>obj</code>.
 */
public boolean equals(Object obj) {
    if (!(obj instanceof ThreadEndpoint)) {
        return false;
    }
    ThreadEndpoint other = (ThreadEndpoint) obj;
    boolean sameUrl = other.getURL().equals(this.getURL());
    return sameUrl && (other.hashCode() == this.hashCode());
}
/**
 * Overridden from {@link java.lang.Object}. Simply returns the hash code
 * computed by the superclass; this class adds nothing of its own.
 * <p>
 * NOTE(review): presumably this is the identity hash of
 * {@link java.lang.Object} - confirm that no superclass overrides
 * hashCode().
 *
 * @return the value of <code>super.hashCode()</code>.
 */
public int hashCode() {
return super.hashCode();
}
/**
 * Overridden from {@link java.lang.Object}.
 *
 * @return a description of the form
 *         <code>[ThreadEndpoint for node NODE with URL URL]</code>, built
 *         from the string forms of this endpoint's node and URL.
 */
public String toString() {
    return "[ThreadEndpoint for node " + this.node
            + " with URL " + this.url + "]";
}
/** ********************************************************** */
/* END: Methods overwritten from java.lang.Object */
/** ********************************************************** */
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -