📄 socketmanager.java
字号:
/** * DESCRIBE THE METHOD * * @param msg DESCRIBE THE PARAMETER * @exception IOException DESCRIBE THE EXCEPTION */ public void send(Message msg) throws IOException { PRawMessage rm; if (msg instanceof PRawMessage) { rm = (PRawMessage) msg; } else { rm = new PJavaSerializedMessage(msg); } // todo, pool final SocketBuffer buffer = new SocketBuffer(manager.defaultDeserializer, manager.pastryNode); buffer.serialize(rm, true); send(buffer); } /** * The entry point for outgoing messages - messages from here are * ensocketQueued for transport to the remote node * * @param message DESCRIBE THE PARAMETER */ public void send(final SocketBuffer message) { writer.enqueue(message); if (key != null) { manager.pastryNode.getEnvironment().getSelectorManager().modifyKey(key); } } /** * Method which should change the interestOps of the handler's key. This * method should *ONLY* be called by the selection thread in the context of a * select(). * * @param key The key in question */ public synchronized void modifyKey(SelectionKey key) { if (channel.socket().isOutputShutdown()) { key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); clearTimer(); } else if ((!writer.isEmpty()) && ((key.interestOps() & SelectionKey.OP_WRITE) == 0)) { key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); setTimer(); } } /** * Specified by the SelectionKeyHandler interface - calling this tells this * socket manager that the connection has completed and we can now read/write. * * @param key The key which is connectable. 
*/ public void connect(SelectionKey key) { try { // deregister interest in connecting to this socket if (channel.finishConnect()) { key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); } manager.manager.markAlive(path); if (manager.logger.level <= Logger.FINE) { manager.logger.log("(SM) Found connectable channel - completed connection"); } } catch (Exception e) { if (manager.logger.level <= Logger.FINE) { manager.logger.logException( "(SM) Unable to connect to path " + path + " (" + e + ") marking as dead.", e); } manager.manager.markDead(path); close(); } } /** * Reads from the socket attached to this connector. * * @param key The selection key for this manager */ public void read(SelectionKey key) { try { SocketBuffer o = reader.read(channel); if (o != null) { if (manager.logger.level <= Logger.FINE) { manager.logger.log("(SM) Read message " + o + " from socket."); } receive(o); } } catch (IOException e) { if (manager.logger.level <= Logger.FINE) { manager.logger.log("(SM) WARNING " + e + " reading - cancelling."); } // if it's not a bootstrap path, and we didn't close this socket's output, // then check to see if the remote address is dead or just closing a socket if ((path != null) && (!((SocketChannel) key.channel()).socket().isOutputShutdown())) { manager.checkLiveness(path); } close(); } } /** * Writes to the socket attached to this socket manager. 
* * @param key The selection key for this manager */ public synchronized void write(SelectionKey key) { try { clearTimer(); if (writer.write(channel)) { key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); if (bootstrap) { close(); } } else { setTimer(); } } catch (IOException e) { if (manager.logger.level <= Logger.WARNING) { manager.logger.log("(SM) ERROR " + e + " writing - cancelling."); } close(); } } /** * Accepts a new connection on the given key * * @param key DESCRIBE THE PARAMETER * @exception IOException DESCRIBE THE EXCEPTION */ protected void acceptConnection(SelectionKey key) throws IOException { this.channel = (SocketChannel) key.channel(); this.key = manager.pastryNode.getEnvironment().getSelectorManager().register(key.channel(), this, 0); this.key.interestOps(SelectionKey.OP_READ); if (manager.logger.level <= Logger.FINE) { manager.logger.log( "(SM) Accepted connection from " + channel.socket().getRemoteSocketAddress()); } } /** * Creates the outgoing socket to the remote handle * * @param path DESCRIBE THE PARAMETER * @exception IOException DESCRIBE THE EXCEPTION */ protected void createConnection(final SourceRoute path) throws IOException { this.path = path; this.channel = SocketChannel.open(); this.channel.socket().setSendBufferSize(manager.SOCKET_BUFFER_SIZE); this.channel.socket().setReceiveBufferSize(manager.SOCKET_BUFFER_SIZE); this.channel.configureBlocking(false); this.key = manager.pastryNode.getEnvironment().getSelectorManager().register(channel, this, 0); if (manager.logger.level <= Logger.FINE) { manager.logger.log("(SM) Initiating socket connection to path " + path); } manager.pastryNode.broadcastChannelOpened(path.getFirstHop().getAddress(), NetworkListener.REASON_NORMAL); if (this.channel.connect(path.getFirstHop().getAddress())) { this.key.interestOps(SelectionKey.OP_READ); } else { this.key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_CONNECT); } } /** * Method which is called once a message is received off of the 
wire. Deliveries
 * whose address is 0 are the internal messages that the socket layer handles
 * itself: they are deserialized here with this manager's deserializer, and a
 * failure is logged rather than rethrown. Every other delivery is passed to
 * the pastry node.
 *
 * @param delivery the buffer that was read from the socket
 */
protected void receive(SocketBuffer delivery) {
  if (delivery.getAddress() == 0) {
    // short circuit, these are the internal messages that Socket handles
    try {
      delivery.deserialize(deserializer);
    } catch (IOException ioe) {
      if (manager.logger.level <= Logger.SEVERE) {
        manager.logger.logException("Internal error while deserializing.", ioe);
      }
    }
    return;
  }

  long start = manager.pastryNode.getEnvironment().getTimeSource().currentTimeMillis();
  manager.pastryNode.receiveMessage(delivery);
  if (manager.logger.level <= Logger.FINER) {
    manager.logger.log("ST: " + (manager.pastryNode.getEnvironment().getTimeSource().currentTimeMillis() - start)
        + " deliver of " + delivery);
  }
}

/**
 * Internal method which clears the internal timer: a pending timer is
 * cancelled and the reference is dropped.
 */
protected void clearTimer() {
  if (this.timer != null) {
    this.timer.cancel();
  }
  this.timer = null;
}

// short circuit the deserialization step
/**
 * Deserializer for the messages that the socket layer handles itself. Each
 * known type is handled as a side effect of deserialization - recording the
 * path of a newly opened connection, or sending the matching response back on
 * this socket - so {@code deserialize} always returns null.
 *
 * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $
 * @author jeffh
 */
class SMDeserializer implements MessageDeserializer {
  /**
   * Handles one internal message read from the buffer.
   *
   * @param buf the buffer positioned at the message payload
   * @param type the message type, matched against the known TYPE constants
   * @param priority not used by this deserializer
   * @param sender not used by this deserializer
   * @return always null; handling happens as a side effect
   * @exception IOException if a request carries a version other than 0, or
   *      propagated from reading the buffer or sending the response
   */
  public rice.p2p.commonapi.Message deserialize(InputBuffer buf, short type, byte priority,
      rice.p2p.commonapi.NodeHandle sender) throws IOException {
    switch (type) {
      case SourceRoute.TYPE:
        SourceRoute tempPath = SourceRoute.build(buf);
        if (path == null) {
          // first path seen on this socket: adopt it and wire up both directions
          path = tempPath;
          manager.socketOpened(path, SocketManager.this);
          manager.manager.markAlive(path);
          writer.setPath(path);
          reader.setPath(path.reverse());
          if (manager.logger.level <= Logger.FINE) {
            manager.logger.log("Read open connection with path " + path);
          }
        } else {
          if (manager.logger.level <= Logger.SEVERE) {
            manager.logger.log("SERIOUS ERROR: Received duplicate path assignments: " + path + " and " + tempPath);
          }
        }
        return null;

      case NodeIdRequestMessage.TYPE:
        requireVersionZero(buf);
        send(new NodeIdResponseMessage(manager.pastryNode.getNodeId(), manager.localAddress.getEpoch()));
        return null;

      case LeafSetRequestMessage.TYPE:
        requireVersionZero(buf);
        send(new LeafSetResponseMessage(manager.pastryNode.getLeafSet()));
        return null;

      case RoutesRequestMessage.TYPE:
        requireVersionZero(buf);
        send(new RoutesResponseMessage(
            (SourceRoute[]) manager.manager.getBest().values().toArray(new SourceRoute[0])));
        return null;

      case RouteRowRequestMessage.TYPE:
        requireVersionZero(buf);
        // the requested row index follows the version byte
        send(new RouteRowResponseMessage(manager.pastryNode.getRoutingTable().getRow(buf.readInt())));
        return null;

      default:
        if (manager.logger.level <= Logger.SEVERE) {
          manager.logger.log("SERIOUS ERROR: Received unknown message address: " + 0 + " type: " + type);
        }
    }
    return null;
  }

  /**
   * Reads the version byte of a request and rejects anything but version 0.
   *
   * @param buf the buffer positioned at the version byte
   * @exception IOException if the version is not 0, or propagated from the read
   */
  private void requireVersionZero(InputBuffer buf) throws IOException {
    byte version = buf.readByte();
    if (version != 0) {
      throw new IOException("Unknown Version: " + version);
    }
  }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -