📄 pingmanager.java
字号:
// manager.markAlive(from);// manager.markProximity(from, ping);// notifyPingResponseListeners(from, ping, start);// } /** * Adds a feature to the PingResponseListener attribute of the PingManager * object * * @param prl The feature to be added to the PingResponseListener attribute * @param path DESCRIBE THE PARAMETER */ protected void removePingResponseListener(SourceRoute path, PingResponseListener prl) { if (prl == null) { return; } ArrayList list = (ArrayList) pingListeners.get(path); if (list != null) { // remove all while (list.remove(prl)) { ; } } } /** * Adds a feature to the PingResponseListener attribute of the PingManager * object * * @param prl The feature to be added to the PingResponseListener attribute * @param path The feature to be added to the PingResponseListener attribute */ protected void addPingResponseListener(SourceRoute path, PingResponseListener prl) { if (prl == null) { return; } ArrayList list = (ArrayList) pingListeners.get(path); if (list == null) { list = new ArrayList(); pingListeners.put(path, list); } list.add(prl); } /** * caller must synchronized(pingResponseTimes) * * @param proximity * @param lastTimePinged * @param path DESCRIBE THE PARAMETER */ protected void notifyPingResponseListeners(SourceRoute path, int proximity, long lastTimePinged) { ArrayList list = (ArrayList) pingListeners.remove(path); if (list != null) { Iterator i = list.iterator(); while (i.hasNext()) { ((PingResponseListener) i.next()).pingResponse(path, proximity, lastTimePinged); } } } /** * DESCRIBE THE METHOD * * @param path DESCRIBE THE PARAMETER * @param msg DESCRIBE THE PARAMETER */ public void enqueue(SourceRoute path, PRawMessage msg) { if (logger.level <= Logger.FINER) { logger.log("enqueue(" + path + "," + msg + ")"); } try { enqueue(path, new SocketBuffer(localAddress, path, msg)); } catch (IOException e) { if (logger.level <= Logger.SEVERE) { logger.log( "ERROR: Received exceptoin " + e + " while enqueuing ping " + msg); } } } /** * DESCRIBE THE METHOD * * @param msg DESCRIBE THE PARAMETER * @param path DESCRIBE THE PARAMETER */ public void enqueue(SourceRoute path, SocketBuffer msg) {// SocketBuffer data = addHeader(path, msg, localAddress, environment,logger); synchronized (pendingMsgs) { pendingMsgs.add(new Envelope(path.getFirstHop(), msg)); } if (spn != null) { ((SocketPastryNode) spn).broadcastSentListeners(msg, path.getLastHop().address, msg.getBuffer().limit(), NetworkListener.TYPE_UDP); }// if (logger.level <= Logger.FINER) {// switch (msg.getType()) {// // if (! (msg instanceof byte[])) {// case SHORT_PING_TYPE:// logger.log("COUNT: Sent message rice.pastry.socket.messaging.ShortPingMessage of size " + msg.getBuffer().limit() + " to " + path);// break;// case SHORT_PING_RESPONSE_TYPE:// logger.log("COUNT: Sent message rice.pastry.socket.messaging.ShortPingResponseMessage of size " + msg.getBuffer().limit() + " to " + path);// break;// default: if (logger.level <= Logger.FINER) { logger.log( "COUNT: Sent message " + msg.getType() + " of size " + msg.getBuffer().limit() + " to " + path); }// }// } environment.getSelectorManager().modifyKey(key); } /** * DESCRIBE THE METHOD * * @param sr DESCRIBE THE PARAMETER * @param dm DESCRIBE THE PARAMETER * @param size DESCRIBE THE PARAMETER * @param from DESCRIBE THE PARAMETER * @exception IOException DESCRIBE THE EXCEPTION */ public void receiveMessage(SourceRoute sr, DatagramMessage dm, int size, InetSocketAddress from) throws IOException {// if (message instanceof DatagramMessage) {// DatagramMessage dm = (DatagramMessage) message; long start = dm.getStartTime(); SourceRoute inboundPath = sr.removeLastHop(); //dm.getInboundPath(); SourceRoute outboundPath = inboundPath.reverse(); //dm.getOutboundPath(); if (inboundPath == null) { inboundPath = SourceRoute.build(new EpochInetSocketAddress(from)); } if (spn != null) { ((SocketPastryNode) spn).broadcastReceivedListeners(dm, inboundPath.reverse().getLastHop().address, size, NetworkListener.TYPE_UDP); } if (dm instanceof PingMessage) { if (logger.level <= Logger.FINER) { logger.log( "COUNT: Read message(1) " + dm.getClass() + " of size " + size + " from " + inboundPath.reverse()); } enqueue(inboundPath.reverse(), new PingResponseMessage( /* * outboundPath, inboundPath, */ start)); } else if (dm instanceof PingResponseMessage) { if (logger.level <= Logger.FINER) { logger.log( "COUNT: Read message(2) " + dm.getClass() + " of size " + size + " from " + outboundPath.reverse()); } int ping = (int) (environment.getTimeSource().currentTimeMillis() - start); manager.markAlive(outboundPath); manager.markProximity(outboundPath, ping); notifyPingResponseListeners(outboundPath, ping, start); } else if (dm instanceof WrongEpochMessage) { WrongEpochMessage wem = (WrongEpochMessage) dm; if (logger.level <= Logger.FINER) { logger.log( "COUNT: Read message(3) " + dm.getClass() + " of size " + size + " from " + outboundPath.reverse()); } manager.markAlive(outboundPath); manager.markDead(wem.getIncorrect()); } else if (dm instanceof IPAddressRequestMessage) { if (logger.level <= Logger.FINER) { logger.log( "COUNT: Read message(4) " + dm.getClass() + " of size " + size + " from " + SourceRoute.build(new EpochInetSocketAddress(from))); } enqueue(SourceRoute.build(new EpochInetSocketAddress(from)), new IPAddressResponseMessage(from, environment.getTimeSource().currentTimeMillis())); } else { if (logger.level <= Logger.WARNING) { logger.log( "ERROR: Received unknown DatagramMessage " + dm); } }// } } /** * DESCRIBE THE METHOD * * @param key DESCRIBE THE PARAMETER */ public void read(SelectionKey key) { try { InetSocketAddress address = null; while ((address = (InetSocketAddress) channel.receive(buffer)) != null) { buffer.flip(); if (testSourceRouting) {// if (address.getPort() % 2 == localAddress.getAddress().getPort() % 2) { if ((address.getPort() % 2 == 0) && (localAddress.getAddress().getPort() % 2 == 0)) { buffer.clear(); if (logger.level <= Logger.INFO) { logger.log("Dropping packet"); } return; } } if (buffer.remaining() > 0) { readHeader(address); } else { if (logger.level <= Logger.INFO) { logger.log( "(PM) Read from datagram channel, but no bytes were there - no bad, but wierd."); } break; } } } catch (IOException e) { if (logger.level <= Logger.WARNING) { logger.logException( "ERROR (datagrammanager:read): ", e); } } finally { buffer.clear(); } } /** * DESCRIBE THE METHOD * * @param key DESCRIBE THE PARAMETER */ public void write(SelectionKey key) { Envelope write = null; try { synchronized (pendingMsgs) { Iterator i = pendingMsgs.iterator(); while (i.hasNext()) { write = (Envelope) i.next(); if (logger.level <= Logger.FINER) { byte[] metadata = new byte[2]; metadata[0] = write.data.getBuffer().get(HEADER_SIZE + 4); metadata[1] = write.data.getBuffer().get(HEADER_SIZE + 5); byte[] route = new byte[SocketChannelRepeater.HEADER_BUFFER_SIZE * metadata[1]]; System.arraycopy(write.data.getBuffer().array(), HEADER_SIZE + 6, route, 0, route.length); logger.log("write(" + write.destination + ") (" + metadata[0] + " " + metadata[1] + ") local " + localAddress); for (int ii = 0; ii < metadata[1]; ii++) { logger.log(" " + SocketChannelRepeater.decodeHeader(route, ii)); } } if (write.data.getBuffer().get(HEADER_SIZE) != 0) { throw new IOException("Attempting to send Invalid version"); } try { if (channel.send(write.data.getBuffer(), write.destination.getAddress()) == write.data.getBuffer().limit()) { i.remove(); } else { break; } } catch (IOException e) { i.remove(); throw e; } } } } catch (IOException e) { if (logger.level <= Logger.WARNING) { // This code prevents this line from filling up logs during some kinds of network outages // it makes this error only be printed 1ce/second long now = timeSource.currentTimeMillis(); if (lastTimePrinted + 1000 > now) { return; } lastTimePrinted = now; logger.logException( "ERROR (datagrammanager:write) to " + (write == null ? null : write.destination.getAddress()), e); } } finally { if (pendingMsgs.isEmpty()) { key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); } } } /** * DESCRIBE THE METHOD * * @param key DESCRIBE THE PARAMETER */ public void modifyKey(SelectionKey key) { synchronized (pendingMsgs) { if (!pendingMsgs.isEmpty()) { key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } } } /** * Method which serializes a given object into a ByteBuffer, in order to * prepare it for writing. * * @param header DESCRIBE THE PARAMETER * @return A ByteBuffer containing the object * @exception IOException if the object can't be serialized */// public static byte[] serialize(Object message, Environment environment, Logger logger) throws IOException {// try {//// ByteArrayOutputStream baos = new ByteArrayOutputStream();// ObjectOutputStream oos = new ObjectOutputStream(baos);// oos.writeObject(message);// oos.close();//// byte[] ret = baos.toByteArray();// return ret;// } catch (InvalidClassException e) {// if (logger.level <= Logger.SEVERE) logger.logException(// "PANIC: Object to be serialized was an invalid class!",e);// throw new IOException("Invalid class during attempt to serialize.");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -