📄 socketpastrynodefactory.java
字号:
* generated NodeId.
   *
   * @param bootstrap Node handle to bootstrap from.
   * @param nodeId The id to assign to the new node.
   * @return A node with the given ID and next port number.
   */
  public PastryNode newNode(final NodeHandle bootstrap, Id nodeId) {
    return newNode(bootstrap, nodeId, null);
  }

  /**
   * Method which creates a Pastry node from the next port with a randomly
   * generated NodeId.
   *
   * @param bootstrap Node handle to bootstrap from.
   * @param proxy Address to use as the node's proxy address; passed through
   *      unchanged to the three-argument overload.
   * @return A node with a random ID and next port number.
   */
  public PastryNode newNode(NodeHandle bootstrap, InetSocketAddress proxy) {
    return newNode(bootstrap, nidFactory.generateNodeId(), proxy);
  }

  /**
   * Method which creates a Pastry node from the next port with the given
   * NodeId. An IOException raised while building the node is never propagated
   * as a checked exception: if the "pastry_socket_increment_port_after_construction"
   * parameter is set, the port is incremented and construction is retried
   * (recursively); otherwise the exception is wrapped in a RuntimeException.
   *
   * @param bootstrap Node handle to bootstrap from.
   * @param nodeId The id to assign to the new node.
   * @param pAddress Address to use as the node's proxy address, or null to
   *      use the local address.
   * @return The newly constructed node.
   */
  public PastryNode newNode(NodeHandle bootstrap, Id nodeId, InetSocketAddress pAddress) {
    try {
      return newNode(bootstrap, nodeId, pAddress, true); // fix the method just below if you change this
    } catch (IOException e) {
      if (logger.level <= Logger.WARNING) {
        logger.log("Warning: " + e);
      }

      if (environment.getParameters().getBoolean("pastry_socket_increment_port_after_construction")) {
        // The four-argument overload only touches port while holding this
        // monitor, so take the same lock here to keep the counter consistent.
        synchronized (this) {
          port++;
        }
        try {
          return newNode(bootstrap, nodeId, pAddress);
          // recursion, this will prevent from things getting too out of hand in
          // case the node can't bind to anything, expect a StackOverflowException
        } catch (StackOverflowError soe) {
          if (logger.level <= Logger.SEVERE) {
            logger.log("SEVERE: SocketPastryNodeFactory: Could not bind on any ports!"
                 + soe);
          }
          throw soe;
        }
      } else {
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * Builds a SocketPastryNode, wires up its routing table, leaf set and
   * protocols, and joins it to the ring via the bootstrap handle.
   *
   * @param bootstrap Node handle to bootstrap from, or null to start a new
   *      ring.
   * @param nodeId The id to assign to the new node.
   * @param pAddress Address to use as the node's proxy address, or null to
   *      use the local address.
   * @param throwException If true, an IOException raised during construction
   *      is propagated to the caller. If false, the call is delegated to
   *      {@link #newNode(NodeHandle, Id, InetSocketAddress)}, which handles
   *      the IOException itself.
   * @return The newly constructed node.
   * @exception IOException If construction fails (usually a bind failure) and
   *      throwException is true.
   */
  public PastryNode newNode(NodeHandle bootstrap, Id nodeId,
      InetSocketAddress pAddress, boolean throwException) throws IOException {
    if (!throwException) {
      return newNode(bootstrap, nodeId, pAddress);
    }
    // yes, this is sort of bizarre
    // the idea is that we can't throw an exception by default because it will break reverse compatibility
    // so this method gets called twice if throwException is false. But the second time,
    // it will be called with true, but will be
    // wrapped with the above function which will catch the exception.
    // -Jeff May 12, 2006

    if (bootstrap == null) {
      if (logger.level <= Logger.WARNING) {
        logger.log("No bootstrap node provided, starting a new ring binding to address " + localAddress + ":" + port + "...");
      }
    }

    // this code builds a different environment for each PastryNode
    Environment environment = this.environment;
    if (this.environment.getParameters().getBoolean(
        "pastry_factory_multipleNodes")) {
      if (this.environment.getLogManager() instanceof CloneableLogManager) {
        LogManager lman = ((CloneableLogManager) this.environment
            .getLogManager()).clone("0x" + nodeId.toStringBare());
        SelectorManager sman = this.environment.getSelectorManager();
        Processor proc = this.environment.getProcessor();
        if (this.environment.getParameters().getBoolean(
            "pastry_factory_selectorPerNode")) {
          sman = new SelectorManager(nodeId.toString() + " Selector",
              this.environment.getTimeSource(), lman);
        }
        if (this.environment.getParameters().getBoolean(
            "pastry_factory_processorPerNode")) {
          proc = new SimpleProcessor(nodeId.toString() + " Processor");
        }

        environment = new Environment(sman, proc, this.environment
            .getRandomSource(), this.environment.getTimeSource(), lman,
            this.environment.getParameters());

        this.environment.addDestructable(environment);
      }
    }

    try {
      final SocketPastryNode pn = new SocketPastryNode(nodeId, environment);

      SocketSourceRouteManager srManager = null;
      EpochInetSocketAddress localAddress = null;
      EpochInetSocketAddress proxyAddress = null;

      // NOTE: We _don't_ want to use the environment RandomSource because this
      // will cause
      // problems if we run the same node twice quickly with the same seed. Epochs
      // should really
      // be different every time.
      long epoch = random.nextLong();

      synchronized (this) {
        localAddress = getEpochAddress(port, epoch);

        if (pAddress == null) {
          proxyAddress = localAddress;
        } else {
          proxyAddress = new EpochInetSocketAddress(pAddress, epoch);
        }

        srManager = new SocketSourceRouteManager(pn, localAddress, proxyAddress, random);
        if (environment.getParameters().getBoolean("pastry_socket_increment_port_after_construction")) {
          port++;
        }
      }

      pn.setSocketSourceRouteManager(srManager);

      SocketNodeHandle localhandle = new SocketNodeHandle(proxyAddress, nodeId);
      localhandle = (SocketNodeHandle) pn.coalesce(localhandle);
      MessageDispatch msgDisp = new MessageDispatch(pn);
      RoutingTable routeTable = new RoutingTable(localhandle, rtMax, rtBase,
          environment);
      LeafSet leafSet = new LeafSet(localhandle, lSetSize);

      StandardRouter router = new StandardRouter(pn);
      StandardRouteSetProtocol rsProtocol = new StandardRouteSetProtocol(
          pn, routeTable, environment);

      pn.setElements(localhandle, msgDisp, leafSet, routeTable);
      router.register();
      rsProtocol.register();

      pn.setSocketElements(proxyAddress, leafSetMaintFreq, routeSetMaintFreq);

      PeriodicLeafSetProtocol lsProtocol = new PeriodicLeafSetProtocol(pn,
          localhandle, leafSet, routeTable);
      lsProtocol.register();
      // msgDisp.registerReceiver(lsProtocol.getAddress(), lsProtocol);
      ConsistentJoinProtocol jProtocol = new ConsistentJoinProtocol(pn,
          localhandle, routeTable, leafSet);
      jProtocol.register();

      if (bootstrap != null) {
        bootstrap = (SocketNodeHandle) pn.coalesce(bootstrap);
      }

      // Remember an interrupt instead of swallowing it. The flag is restored
      // only after the join below so that the join itself proceeds exactly as
      // it would have without the interrupt.
      boolean interrupted = false;
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        interrupted = true;
      }

      try {
        pn.doneNode(getNearest(localhandle, bootstrap));
        // pn.doneNode(bootstrap);
      } finally {
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }

      return pn;
    } catch (IOException ioe) {
      // this will usually be a bind exception

      // clean up Environment: only destroy the per-node environment built
      // above. When no clone was made (e.g. the log manager is not cloneable)
      // the local variable still refers to the factory's shared environment,
      // which must stay alive for other nodes.
      if (environment != this.environment) {
        environment.destroy();
      }
      throw ioe;
    }
  }

  /**
   * Method which can be used to test the connectivity contstrains of the local
   * node. This (optional) method is designed to be called by applications to
   * ensure that the local node is able to connect through the network - checks
   * can be done to check TCP/UDP connectivity, firewall setup, etc... If the
   * method works, then nothing should be done and the method should return. If
   * an error condition is detected, an exception should be thrown.
   *
   * This implementation binds a UDP socket to local, sends an
   * IPAddressRequestMessage to every address in existing, and waits for the
   * first datagram that comes back.
   *
   * @param timeout Receive timeout applied to the UDP socket, in
   *      milliseconds.
   * @param local The local address the UDP socket is bound to.
   * @param existing Addresses of nodes to send the request to; must contain
   *      at least one entry.
   * @param env Environment whose time source stamps the request message.
   * @param logger Logger used for the INFO message and handed to the
   *      deserializer.
   * @return The address carried by the received IPAddressResponseMessage.
   * @exception IOException If existing is null or empty, the socket cannot be
   *      bound or used, no reply arrives within the timeout, or the reply is
   *      too short to contain a payload.
   */
  public static InetSocketAddress verifyConnection(int timeout,
      InetSocketAddress local, InetSocketAddress[] existing, Environment env,
      Logger logger) throws IOException {
    // Without any target nothing would be sent and the receive below could
    // only time out (or block forever with a zero timeout), so fail fast.
    if (existing == null || existing.length == 0) {
      throw new IOException("Cannot verify connection of local node " + local
           + ": no existing nodes were supplied");
    }

    if (logger.level <= Logger.INFO) {
      logger.log("Verifying connection of local node " + local + " using "
           + existing[0] + " and " + (existing.length - 1) + " more");
    }

    // Number of leading bytes skipped before the payload is deserialized.
    // NOTE(review): presumably the wire header in front of the message -
    // confirm against SocketBuffer / PingManager.
    final int headerLength = 38;

    DatagramSocket socket = null;

    try {
      socket = new DatagramSocket(local);
      socket.setSoTimeout(timeout);

      for (int i = 0; i < existing.length; i++) {
        // byte[] buf = PingManager
        // .addHeader(SourceRoute
        // .build(new EpochInetSocketAddress(existing[i])),
        // new IPAddressRequestMessage(env.getTimeSource()
        // .currentTimeMillis()), new EpochInetSocketAddress(local),
        // env, logger);
        SocketBuffer sb = new SocketBuffer(new EpochInetSocketAddress(local),
            SourceRoute.build(new EpochInetSocketAddress(existing[i])),
            new IPAddressRequestMessage(env.getTimeSource()
                .currentTimeMillis()));

        DatagramPacket send = new DatagramPacket(sb.getBuffer().array(),
            sb.getBuffer().limit(), existing[i]);
        socket.send(send);
      }

      DatagramPacket receive = new DatagramPacket(new byte[10000], 10000);
      socket.receive(receive);

      // A datagram shorter than the skipped prefix cannot hold a message;
      // report it as an I/O failure rather than a NegativeArraySizeException.
      if (receive.getLength() < headerLength) {
        throw new IOException("Received malformed response of "
             + receive.getLength() + " bytes from " + receive.getSocketAddress()
             + " while verifying connection of local node " + local);
      }

      byte[] data = new byte[receive.getLength() - headerLength];
      System.arraycopy(receive.getData(), headerLength, data, 0, data.length);

      return ((IPAddressResponseMessage) new SocketBuffer(data)
          .deserialize(new PingManager.PMDeserializer(logger))).getAddress();
      // return ((IPAddressResponseMessage) PingManager.deserialize(data, env,
      // null, logger)).getAddress();
    } finally {
      if (socket != null) {
        socket.close();
      }
    }
  }

  /**
   * Continuation that records a result and wakes up any thread waiting on
   * this object's monitor.
   *
   * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $
   * @author jeffh
   */
  class TimerContinuation implements Continuation {
    /**
     * The result handed to receiveResult, or null if none has been received.
     */
    public Object ret = null;

    /**
     * Stores the result and notifies a thread waiting on this continuation.
     *
     * @param result The result to store in ret.
     */
    public void receiveResult(Object result) {
      ret = result;
      synchronized (this) {
        this.notify();
      }
    }

    /**
     * Notifies a thread waiting on this continuation. The exception itself is
     * not recorded, so ret keeps its previous value.
     *
     * @param result The exception that was raised (ignored).
     */
    public void receiveException(Exception result) {
      synchronized (this) {
        this.notify();
      }
    }
  }

  /**
   * Deserializer for the response messages handled by this factory:
   * NodeIdResponseMessage, LeafSetResponseMessage, RoutesResponseMessage and
   * RouteRowResponseMessage.
   *
   * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $
   * @author jeffh
   */
  class SPNFDeserializer implements MessageDeserializer {
    /**
     * Builds the message matching the given type from the buffer.
     *
     * @param buf Buffer the message is read from.
     * @param type Message type, compared against the TYPE constants of the
     *      supported response messages.
     * @param priority Not used by this implementation.
     * @param sender Not used by this implementation.
     * @return The deserialized message, or null if the type is not recognized
     *      (a SEVERE log entry is written in that case).
     * @exception IOException Declared by the interface; may be raised by the
     *      message constructors while reading from buf.
     */
    public rice.p2p.commonapi.Message deserialize(InputBuffer buf, short type,
        byte priority, rice.p2p.commonapi.NodeHandle sender) throws IOException {
      switch (type) {
        case NodeIdResponseMessage.TYPE:
          return new NodeIdResponseMessage(buf);
        case LeafSetResponseMessage.TYPE:
          return new LeafSetResponseMessage(buf, nhf);
        case RoutesResponseMessage.TYPE:
          return new RoutesResponseMessage(buf);
        case RouteRowResponseMessage.TYPE:
          return new RouteRowResponseMessage(buf, nhf);
        default:
          if (logger.level <= Logger.SEVERE) {
            logger.log("SERIOUS ERROR: Received unknown message address: " + 0 + "type:" + type);
          }
          return null;
      }
    }
  }

  // Builds TOTAL_HEADER once at class-load time: the Pastry magic number at
  // offset 0 and HEADER_DIRECT at offset 8; every other byte stays zero.
  static {
    TOTAL_HEADER = new byte[SocketCollectionManager.TOTAL_HEADER_SIZE + 4]; // plus the appId
    System.arraycopy(SocketCollectionManager.PASTRY_MAGIC_NUMBER, 0, TOTAL_HEADER, 0,
        SocketCollectionManager.PASTRY_MAGIC_NUMBER.length);
    //System.arraycopy(new byte[4],0,TOTAL_HEADER,4,4); // version 0
    // can just leave blank zero, cause java zeros everything automatically
    System.arraycopy(SocketCollectionManager.HEADER_DIRECT, 0, TOTAL_HEADER, 8,
        SocketCollectionManager.HEADER_SIZE);
    //System.arraycopy(new byte[4],0,TOTAL_HEADER,12,4); // appId 0
    // can just leave blank zero, cause java zeros everything automatically
  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -