📄 socketsourceroutemanager.java
字号:
* Should be called while synchronized on nodeHandles * * @param address * @param search DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE */ public AddressManager putAddressManager(EpochInetSocketAddress address, boolean search) { WeakReference wr = (WeakReference) nodeHandles.get(address); SocketNodeHandle snh; AddressManager manager; if (wr == null) { snh = new SocketNodeHandle(address, null); snh.setLocalNode(spn); wr = new WeakReference(snh); nodeHandles.put(address, wr); } else { snh = (SocketNodeHandle) wr.get(); if (snh == null) { // WARNING: this code must be repeated because of a very slight timing // issue with the garbage collector snh = new SocketNodeHandle(address, null); snh.setLocalNode(spn); wr = new WeakReference(snh); nodeHandles.put(address, wr); } } if (snh.addressManager != null) { throw new IllegalStateException("Address manager for address " + address + " already exists."); } manager = new AddressManager(snh, search); // TODO make this time configurable this.spn.getEnvironment().getSelectorManager().getTimer().schedule( new HardLinkTimerTask(manager), 30000); snh.addressManager = manager; return manager; } /** * Method which sends a bootstrap message across the wire. * * @param address The address to send the message to * @param msg DESCRIBE THE PARAMETER * @exception IOException DESCRIBE THE EXCEPTION */ public void bootstrap(EpochInetSocketAddress address, Message msg) throws IOException {// PRawMessage rm;// if (msg instanceof PRawMessage) {// rm = (PRawMessage)msg;// } else {// rm = new PJavaSerializedMessage(msg);// }// // todo, pool// final SocketBuffer message = new SocketBuffer(defaultDeserializer);// message.serialize(rm, logger); manager.bootstrap(SourceRoute.build(address), msg); } /** * DESCRIBE THE METHOD * * @param address DESCRIBE THE PARAMETER * @param msg DESCRIBE THE PARAMETER * @exception IOException DESCRIBE THE EXCEPTION */ public void send(EpochInetSocketAddress address, Message msg) throws IOException { PRawMessage rm; if (msg instanceof PRawMessage) { rm = (PRawMessage) msg; } else { rm = new PJavaSerializedMessage(msg); } // todo, pool final SocketBuffer buffer = new SocketBuffer(manager.defaultDeserializer, manager.pastryNode); buffer.serialize(rm, true); send(address, buffer); } /** * Method which sends a message across the wire. * * @param message The message to send * @param address The address to send the message to */ public void send(final EpochInetSocketAddress address, final SocketBuffer message) { if (spn.getEnvironment().getSelectorManager().isSelectorThread()) { getAddressManager(address, true).send(message); } else { if (logger.level <= Logger.FINE) { logger.log("Application attempted to send " + message + " to " + address + " on a non-selector thread."); } spn.getEnvironment().getSelectorManager().invoke( new Runnable() { public void run() { getAddressManager(address, true).send(message); } }); } } /** * Method which sends a message across the wire. * * @param address The address to send the message to * @param appAddress DESCRIBE THE PARAMETER * @param receiver DESCRIBE THE PARAMETER * @param timeout DESCRIBE THE PARAMETER */ public void connect(final EpochInetSocketAddress address, final int appAddress, final AppSocketReceiver receiver, final int timeout) { if (spn.getEnvironment().getSelectorManager().isSelectorThread()) { getAddressManager(address, true).connect(appAddress, receiver, timeout); } else { if (logger.level <= Logger.FINE) { logger.log("Application " + appAddress + " attempted to open a connection to " + address + " on a non-selector thread."); } spn.getEnvironment().getSelectorManager().invoke( new Runnable() { public void run() { getAddressManager(address, true).connect(appAddress, receiver, timeout); } }); } } /** * Method which suggests a ping to the remote node. * * @param address DESCRIBE THE PARAMETER */ public void ping(EpochInetSocketAddress address) { AddressManager am = getAddressManager(address); if (am == null) { manager.ping(SourceRoute.build(address)); } else { am.ping(); } } /** * Method which FORCES a check of liveness of the remote node. Note that this * method should ONLY be called by internal Pastry maintenance algorithms - * this is NOT to be used by applications. Doing so will likely cause a blowup * of liveness traffic. * * @param address DESCRIBE THE PARAMETER */ public void checkLiveness(EpochInetSocketAddress address) { getAddressManager(address, true).checkLiveness(); } /** * Method which returns the last cached proximity value for the given address. * If there is no cached value, then DEFAULT_PROXIMITY is returned. * * @param address The address to return the value for * @return The ping value to the remote address */ public int proximity(EpochInetSocketAddress address) { AddressManager am = getAddressManager(address); if (am == null) { return SocketNodeHandle.DEFAULT_PROXIMITY; } else { return am.proximity(); } } /** * This method should be called when a known route is declared dead. * * @param route The now-dead route */ protected void markDead(SourceRoute route) { if (logger.level <= Logger.FINE) { logger.log("(SSRM) Found route " + route + " to be dead"); } AddressManager am = getAddressManager(route.getLastHop()); if (am != null) { am.markDead(route); } } /** * This method should be called when a known node is declared dead - this is * ONLY called when a new epoch of that node is detected. Note that this * method is silent - no checks are done. Caveat emptor. * * @param address The now-dead address */ protected void markDead(EpochInetSocketAddress address) { AddressManager am = getAddressManager(address); if (am != null) { am.markDeadForever(); } } /** * This method should be called when a known route is declared alive. * * @param route The now-live route */ protected void markAlive(SourceRoute route) { if (logger.level <= Logger.FINE) { logger.log("(SSRM) Found route " + route + " to be alive"); } getAddressManager(route.getLastHop(), false).markAlive(route); } /** * DESCRIBE THE METHOD * * @param route DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE */ protected int proximity(SourceRoute route) { return getAddressManager(route.getLastHop(), false).proximity(); } /** * This method should be called when a known route is declared suspected. * * @param route The now-live route */ protected void markSuspected(SourceRoute route) { if (logger.level <= Logger.FINE) { logger.log("(SSRM) Found route " + route + " to be suspected"); } getAddressManager(route.getLastHop(), false).markSuspected(route); } /** * This method should be called when a known route has its proximity updated * * @param route The route * @param proximity The proximity */ protected synchronized void markProximity(SourceRoute route, int proximity) { getAddressManager(route.getLastHop(), false).markProximity(route, proximity); } /** * Reroutes the given message. If this node is alive, send() is called. If * this node is not alive and the message is a route message, it is rerouted. * Otherwise, the message is dropped. * * @param m The message * @param address The address of the remote node */// protected void reroutee(EpochInetSocketAddress address, Message m) {// if (getLiveness(address) == SocketNodeHandle.LIVENESS_ALIVE) {// if (logger.level <= Logger.INFO) logger.log( "(SSRM) Attempting to resend message " + m + " to alive address " + address);// send(address, m);// } else {// if (m instanceof RouteMessage) {// if (((RouteMessage) m).getOptions().multipleHopsAllowed()) {// if (logger.level <= Logger.INFO) logger.log( "(SSRM) Attempting to reroute route message " + m);// ((RouteMessage) m).nextHop = null;// // kick it back to pastry// spn.receiveMessage(m);// } else if (getLiveness(address) <= SocketNodeHandle.LIVENESS_SUSPECTED) {// // it's required to go to this address only// send(address, m);// } else {// // this address is dead, and the routemessage is not allowed to go anywhere else// if (logger.level <= Logger.WARNING) logger.log("(SSRM) Dropping message " + m + " because next hop "+address+" is dead!");// }// } else {// if (logger.level <= Logger.WARNING) logger.log("(SSRM) Dropping message " + m + " because next hop "+address+" is dead!");// }// }// } /** * Reroutes the given message. If this node is alive, send() is called. If * this node is not alive and the message is a route message, it is rerouted. * Otherwise, the message is dropped. Can be called when a socket is closed, * if for example a different source route is found. This is how * non-routemessages may be called here For suspected/dead, it will get called * with all RouteMessages * * @param m The message * @param address The address of the remote node */ protected void reroute(EpochInetSocketAddress address, SocketBuffer m) { if (m.discard) { if (logger.level <= Logger.FINE) { logger.log("(SSRM) Dropping garbage in resend message " + m + " address " + address + " with liveness " + getLiveness(address)); } return; } switch (getLiveness(address)) { case SocketNodeHandle.LIVENESS_ALIVE: if (logger.level <= Logger.INFO) { logger.log("(SSRM) Attempting to resend message " + m + " to alive address " + address); } send(address, m); return; case SocketNodeHandle.LIVENESS_SUSPECTED: if (m.isRouteMessage()) { if (m.getOptions().multipleHopsAllowed() && m.getOptions().rerouteIfSuspected()) { // kick it back to pastry if (logger.level <= Logger.INFO) { logger.log("(SSRM) Attempting to reroute route message " + m); } RouteMessage rm = m.getRouteMessage(); rm.nextHop = null; spn.receiveMessage(rm); return; } } else { if (logger.level <= Logger.INFO) { logger.log("(SSRM) Attempting to resend message " + m + " to alive address " + address); } send(address, m); return; } case SocketNodeHandle.LIVENESS_DEAD: case SocketNodeHandle.LIVENESS_DEAD_FOREVER: if (m.isRouteMessage()) { if (m.getOptions().multipleHopsAllowed()) { if (logger.level <= Logger.INFO) { logger.log("(SSRM) Attempting to reroute route message " + m); } RouteMessage rm = m.getRouteMessage(); rm.nextHop = null; spn.receiveMessage(rm); return; } } } if (logger.level <= Logger.WARNING) { logger.log("(SSRM) Dropping message " + m + " because next hop " + address + " is dead!"); } } /** * Internal class which is tasked with maintaining the status of a single * remote address. This class is in charge of all source routes to that * address, as well as declaring liveness/death of this address * * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $ * @author jeffh */ protected class AddressManager {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -