📄 pingmanager.java
字号:
// } catch (NotSerializableException e) {// if (logger.level <= Logger.SEVERE) logger.logException(// "PANIC: Object to be serialized was not serializable! [" + message + "]",e);// throw new IOException("Unserializable class " + message + " during attempt to serialize.");// }// } /** * Method which takes in a ByteBuffer read from a datagram, and deserializes * the contained object. * * @param header DESCRIBE THE PARAMETER * @return The deserialized object. * @exception IOException if the buffer can't be deserialized */// public static Object deserialize(byte[] array, Environment env, SocketPastryNode spn, Logger logger) throws IOException {// PastryObjectInputStream ois = new PastryObjectInputStream(new ByteArrayInputStream(array), spn);//// try {// Object ret = ois.readObject();// return ret;// } catch (ClassNotFoundException e) {// if (logger.level <= Logger.SEVERE) logger.logException(// "PANIC: Unknown class type in serialized message!",e);// throw new IOException("Unknown class type in message - closing channel.");// } catch (InvalidClassException e) {// if (logger.level <= Logger.SEVERE) logger.logException(// "PANIC: Serialized message was an invalid class!",e);// throw new IOException("Invalid class in message - closing channel.");// }// } /** * Method which adds a header for the provided path to the given data. * * @param header DESCRIBE THE PARAMETER * @return The messag with a header attached * @exception IOException DESCRIBE THE EXCEPTION */// public static SocketBuffer addHeader(SourceRoute path, SocketBuffer data, EpochInetSocketAddress localAddress, Environment env, Logger logger) throws IOException {//// ByteArrayOutputStream baos = new ByteArrayOutputStream();//// DataOutputStream dos = new DataOutputStream(baos);//// SocketBuffer sb = new SocketBuffer();// OutputBuffer dos = sb.o;//// dos.write(HEADER_PING, 0, HEADER_PING.length);// dos.writeByte((byte) 1);// dos.writeByte((byte) (path.getNumHops() + 1));// SocketChannelRepeater.encodeHeader(localAddress, dos);//// for (int i=0; i<path.getNumHops(); i++)// SocketChannelRepeater.encodeHeader(path.getHop(i), dos);//// dos.write(data.buffer.array(), 0, data.buffer.limit());////// dos.flush();////// return new SocketBuffer(baos.toByteArray());// return sb;// } /** * Method which adds a header for the provided path to the given data. * * @param header DESCRIBE THE PARAMETER * @return The messag with a header attached * @exception IOException DESCRIBE THE EXCEPTION */ public SourceRoute decodeHeader(byte[] header) throws IOException { EpochInetSocketAddress[] route = new EpochInetSocketAddress[header.length / SocketChannelRepeater.HEADER_BUFFER_SIZE]; for (int i = 0; i < route.length; i++) { route[i] = SocketChannelRepeater.decodeHeader(header, i); } return SourceRoute.build(route); } /** * Method which processes an incoming message and hands it off to the * appropriate handler. * * @param address DESCRIBE THE PARAMETER * @exception IOException DESCRIBE THE EXCEPTION */ protected void readHeader(InetSocketAddress address) throws IOException { byte[] header = new byte[HEADER_SIZE]; buffer.get(header, 0, HEADER_SIZE); if (!Arrays.equals(header, SocketCollectionManager.PASTRY_MAGIC_NUMBER)) { throw new IOException("Not a pastry message:" + header[0] + "," + header[1] + "," + header[2] + "," + header[3]); } buffer.get(header, 0, HEADER_SIZE); int version = MathUtils.byteArrayToInt(header); if (!(version == 0)) { throw new IOException("Unknown Version:" + version); }// header = new byte[HEADER_SIZE];// buffer.get(header);//// if (Arrays.equals(header, HEADER_PING)) { byte[] metadata = new byte[2]; buffer.get(metadata); // first, read all of the source route byte[] route = new byte[SocketChannelRepeater.HEADER_BUFFER_SIZE * metadata[1]]; buffer.get(route); // now, check to make sure our hop is correct EpochInetSocketAddress eisa; if (logger.level <= Logger.FINER) { logger.log("readHeader(" + address + ") (" + metadata[0] + " " + metadata[1] + ") local " + localAddress); for (int i = 0; i < metadata[1]; i++) { logger.log(" " + SocketChannelRepeater.decodeHeader(route, i)); } } try { eisa = SocketChannelRepeater.decodeHeader(route, metadata[0]); } catch (IOException ioe) { throw ioe; } // if so, process the packet if ((eisa.equals(localAddress)) || (eisa.getAddress().equals(localAddress.getAddress()) && (eisa.getEpoch() == EpochInetSocketAddress.EPOCH_UNKNOWN))) { // if the packet is at the end of the route, accept it // otherwise, forward it to the next hop (and increment the stamp) if (metadata[0] + 1 == metadata[1]) { // The message was meant for me byte[] array = new byte[buffer.remaining()]; buffer.get(array); buffer.clear();// byte[] test = new byte[HEADER_SHORT_PING.length];// System.arraycopy(array, 0, test, 0, test.length); SourceRoute inbound = decodeHeader(route);// SourceRoute sr = inbound.removeLastHop();// if (Arrays.equals(test, HEADER_SHORT_PING)) {// // the PING was meant for me// int len = (header.length + metadata.length + array.length + route.length);// if (logger.level <= Logger.FINER) logger.log(// "COUNT: Read message rice.pastry.socket.messaging.ShortPingMessage of size " + len + " from " + sr);// if (spn != null) {// ((SocketPastryNode) spn).broadcastReceivedListeners(array, address, len, NetworkListener.TYPE_UDP);// }//// shortPingReceived(sr, array);// } else if (Arrays.equals(test, HEADER_SHORT_PING_RESPONSE)) {// // the PING_RESPONSE was meant for me// int len = (header.length + metadata.length + array.length + route.length);// if (logger.level <= Logger.FINER) logger.log(// "COUNT: Read message rice.pastry.socket.messaging.ShortPingResponseMessage of size " + len + " from " + sr);//// if (spn != null) {// ((SocketPastryNode) spn).broadcastReceivedListeners(array, address, len, NetworkListener.TYPE_UDP);// }// shortPingResponseReceived(sr, array);// } else { // a normal message SocketBuffer delivery = new SocketBuffer(array, spn); receiveMessage(inbound, (DatagramMessage) delivery.deserialize(deserializer), array.length, address);// } } else { // sourceroute hop EpochInetSocketAddress next = SocketChannelRepeater.decodeHeader(route, metadata[0] + 1); buffer.position(0); byte[] packet = new byte[buffer.remaining()]; buffer.get(packet); // increment the hop count packet[HEADER_SIZE + 4]++; if (logger.level <= Logger.FINE) { logger.log("Forwarding (" + metadata[0] + " " + metadata[1] + ") from " + address + " to " + next + " at " + localAddress); } if (spn != null) { ((SocketPastryNode) spn).broadcastReceivedListeners(packet, address, packet.length, NetworkListener.TYPE_SR_UDP); ((SocketPastryNode) spn).broadcastSentListeners(packet, next.address, packet.length, NetworkListener.TYPE_SR_UDP); } synchronized (pendingMsgs) { pendingMsgs.add(new Envelope(next, new SocketBuffer(packet))); } environment.getSelectorManager().modifyKey(key); } } else { // if this is an old epoch of ours, reply with an update if (eisa.getAddress().equals(localAddress.getAddress())) { SourceRoute back = SourceRoute.build(new EpochInetSocketAddress[0]); SourceRoute outbound = SourceRoute.build(new EpochInetSocketAddress[0]); for (int i = 0; i < metadata[0]; i++) { back = back.append(SocketChannelRepeater.decodeHeader(route, i)); if (i > 0) { outbound = outbound.append(SocketChannelRepeater.decodeHeader(route, i)); } } outbound = outbound.append(localAddress);// if (spn != null) {// ((SocketPastryNode) spn).broadcastReceivedListeners(packet, address, packet.length, NetworkListener.TYPE_SR_UDP);// } enqueue(back.reverse(), new WrongEpochMessage( /* * outbound, back.reverse(), */ eisa, localAddress, environment.getTimeSource().currentTimeMillis())); } else { if (logger.level <= Logger.WARNING) { logger.log( "WARNING: Received packet destined for EISA (" + metadata[0] + " " + metadata[1] + ") " + eisa + " but the local address is " + localAddress + " - dropping silently."); } throw new IOException("Received packet destined for EISA (" + metadata[0] + " " + metadata[1] + ") " + eisa + " but the local address is " + localAddress + " - dropping silently."); } }// } else {// if (logger.level <= Logger.WARNING) logger.log(// "WARNING: Received unrecognized message header - ignoring from "+address+".");// throw new IOException("Improper message header received - ignoring from "+address+". Read " + ((byte) header[0]) + " " + ((byte) header[1]) + " " + ((byte) header[2]) + " " + ((byte) header[3]));// } } /** * Internal class which holds a pending datagram * * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $ * @author amislove */ public class Envelope { /** * DESCRIBE THE FIELD */ protected EpochInetSocketAddress destination; /** * DESCRIBE THE FIELD */ protected SocketBuffer data; /** * Constructor for Envelope. * * @param destination DESCRIBE THE PARAMETER * @param data DESCRIBE THE PARAMETER */ public Envelope(EpochInetSocketAddress destination, SocketBuffer data) { this.destination = destination; this.data = data; if (logger.level <= Logger.FINER) { try { byte[] metadata = new byte[2]; metadata[0] = data.getBuffer().get(HEADER_SIZE + 4); metadata[1] = data.getBuffer().get(HEADER_SIZE + 5); byte[] route = new byte[SocketChannelRepeater.HEADER_BUFFER_SIZE * metadata[1]]; System.arraycopy(data.getBuffer().array(), HEADER_SIZE + 6, route, 0, route.length); logger.log("enqueue(" + destination + ") (" + metadata[0] + " " + metadata[1] + ") local " + localAddress); for (int ii = 0; ii < metadata[1]; ii++) { logger.log(" " + SocketChannelRepeater.decodeHeader(route, ii)); }// if (metadata[1] == 3 && metadata[0] == 1) {// EpochInetSocketAddress dest = SocketChannelRepeater.decodeHeader(route, metadata[1]-1);// if (dest.equals(destination)) {// System.out.println("Warning");// }// } } catch (IOException ioe) { logger.logException("", ioe); } } if (data.getBuffer().get(HEADER_SIZE) != 0) { throw new RuntimeException("Attempting to send Invalid version"); } } } /** * DESCRIBE THE CLASS * * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $ * @author jeffh */ static class PMDeserializer implements MessageDeserializer { Logger logger; /** * Constructor for PMDeserializer. * * @param logger DESCRIBE THE PARAMETER */ public PMDeserializer(Logger logger) { this.logger = logger; } /** * DESCRIBE THE METHOD * * @param buf DESCRIBE THE PARAMETER * @param type DESCRIBE THE PARAMETER * @param priority DESCRIBE THE PARAMETER * @param sender DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE * @exception IOException DESCRIBE THE EXCEPTION */ public rice.p2p.commonapi.Message deserialize(InputBuffer buf, short type, byte priority, rice.p2p.commonapi.NodeHandle sender) throws IOException { switch (type) { case IPAddressRequestMessage.TYPE: return new IPAddressRequestMessage(buf); case IPAddressResponseMessage.TYPE: return new IPAddressResponseMessage(buf); case PingMessage.TYPE: return new PingMessage(buf); case PingResponseMessage.TYPE: return new PingResponseMessage(buf); case WrongEpochMessage.TYPE: return new WrongEpochMessage(buf); default: if (logger.level <= Logger.SEVERE) { logger.logException("PM SERIOUS ERROR: Received unknown message address: " + 0 + " type:" + type, new Exception("stack trace")); } } return null; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -