📄 tcpconnection.java
字号:
* Description of the Method */ private void closeIOs() { if (inputStream != null) { try { inputStream.close(); inputStream = null; } catch (Exception ez1) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("could not close inputStream ", ez1); } } } if (outputStream != null) { try { outputStream.close(); outputStream = null; } catch (Exception ez1) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Error : could not close outputStream ", ez1); } } } if (sharedSocket != null) { try { sharedSocket.close(); sharedSocket = null; } catch (Exception ez1) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Error : could not close socket ", ez1); } } } } /** * {@inheritDoc} * *@param target Description of the Parameter *@return Description of the Return Value */ public boolean equals(Object target) { if (this == target) { return true; } if (null == target) { return false; } if (target instanceof TcpConnection) { TcpConnection likeMe = (TcpConnection) target; return getDestinationAddress().equals(likeMe.getDestinationAddress()) && getDestinationPeerID().equals(likeMe.getDestinationPeerID()); } return false; } /** * {@inheritDoc} */ protected void finalize() { close(); } /** * {@inheritDoc} * *@return Description of the Return Value */ public int hashCode() { return getDestinationPeerID().hashCode() + getDestinationAddress().hashCode(); } /** * This is called with "true" when the invoker is about to read some input * and is not willing to wait for it to come. This is called with "false" * when the invoker is about to wait for a long time for input to become * available with a potentialy very long blocking read. * *@param active Description of the Parameter */ private void inputActive(boolean active) { if (active) { inputStream.setWatchList(ShortCycle); } else { inputStream.setWatchList(LongCycle); } } /** * {@inheritDoc} * * <p/>This is the background Thread. While the connection is active, takes * messages from the queue and send it. */ public void run() { try { if (LOG.isEnabledFor(Level.INFO)) { LOG.info("tcp receive - starts for " + inetAddress.getHostAddress() + ":" + port); } try { while (isConnected()) { if (closed) { break; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("tcp receive - message starts for " + inetAddress.getHostAddress() + ":" + port); } // We can stay blocked here for a long time, it's ok. MessagePackageHeader header = new MessagePackageHeader(inputStream); MimeMediaType msgMime = header.getContentTypeHeader(); long msglength = header.getContentLengthHeader(); // FIXME 20020730 bondolo@jxta.org Do something with content-coding here. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Message body (" + msglength + ") starts for " + inetAddress.getHostAddress() + ":" + port); } // read the message! // We have received the header, so, the rest had better // come. Turn the short timeout on. inputActive(true); Message msg = null; try { InputStream msgStream = new LimitInputStream(inputStream, msglength, true); msg = WireFormatMessageFactory.fromWire(msgStream, msgMime, (MimeMediaType) null); } catch (IOException failed) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failed reading msg from " + inetAddress.getHostAddress() + ":" + port); } throw failed; } finally { // We can relax again. inputActive(false); } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Handing incoming message from " + inetAddress.getHostAddress() + ":" + port + " to EndpointService"); } try { // Demux the message for the upper layers if (listener != null) { listener.demux(msg); } } catch (Throwable t) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failure while endpoint demuxing " + msg, t); } } setLastUsed(System.currentTimeMillis()); } } catch (InterruptedIOException woken) { // We have to treat this as fatal since we don't know where // in the framing the input stream was at. closingDueToFailure = true; if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Error : read() timeout after " + woken.bytesTransferred + " on connection " + inetAddress.getHostAddress() + ":" + port); } } catch (EOFException finished) { // The other side has closed the connection if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Connection was closed by " + inetAddress.getHostAddress() + ":" + port); } } catch (Throwable e) { closingDueToFailure = true; if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Error on connection " + inetAddress.getHostAddress() + ":" + port, e); } } finally { synchronized( this ) { if (!closed) { // We need to close the connection down. close(); } recvThread = null; } } } catch (Throwable all) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Uncaught Throwable in thread :" + Thread.currentThread().getName(), all); } } } /** * Send message to the remote peer. * *@param msg the message to send. *@exception IOException Description of the Exception */ public void sendMessage(Message msg) throws IOException { // socket is a stream, only one writer at a time... synchronized (writeLock) { if (closed) { if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Connection was closed to : " + dstAddress); } throw new IOException("Connection was closed to : " + dstAddress); } boolean success = false; long size = 0; try { // 20020730 bondolo@jxta.org Do something with content-coding here // serialize the message. WireFormatMessage serialed = WireFormatMessageFactory.toWire( msg, appMsg, (MimeMediaType[]) null); // Build the protocol header // Allocate a buffer to contain the message and the header MessagePackageHeader header = new MessagePackageHeader(); header.setContentTypeHeader(serialed.getMimeType()); size = serialed.getByteLength(); header.setContentLengthHeader(size); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("sendMessage (" + serialed.getByteLength() + ") to " + dstAddress + " via " + inetAddress.getHostAddress() + ":" + port); } // Write the header and the message. header.sendToStream(outputStream); outputStream.flush(); serialed.sendToStream(outputStream); outputStream.flush(); // all done! success = true; setLastUsed(System.currentTimeMillis()); } catch (Throwable failure) { if (LOG.isEnabledFor(Level.INFO)) { LOG.info("tcp send - message send failed for " + inetAddress.getHostAddress() + ":" + port, failure); } closingDueToFailure = true; close(); } } } /** * Description of the Method */ protected void start() { recvThread.start(); } /** * Description of the Method * *@exception IOException Description of the Exception */ private void startSocket(PeerID id) throws IOException { sharedSocket.setKeepAlive(true); int useBufferSize = Math.max(SendBufferSize, sharedSocket.getSendBufferSize()); sharedSocket.setSendBufferSize(useBufferSize); useBufferSize = Math.max(RecvBufferSize, sharedSocket.getReceiveBufferSize()); sharedSocket.setReceiveBufferSize(useBufferSize); sharedSocket.setSoLinger(true, LingerDelay); // socket.setTcpNoDelay(true); outputStream = new WatchedOutputStream(sharedSocket.getOutputStream()); outputStream.setWatchList(ShortCycle); inputStream = new WatchedInputStream(sharedSocket.getInputStream()); outputStream.setWatchList(LongCycle); if ((inputStream == null) || (outputStream == null)) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error(" failed getting streams."); } throw new IOException("Could not get streams"); } myWelcome = new WelcomeMessage(fullDstAddress, fullDstAddress, id, false); myWelcome.sendToStream(outputStream); outputStream.flush(); // The response should arrive shortly or we bail out. inputActive(true); itsWelcome = new WelcomeMessage(inputStream); // Ok, we can wait for messages now. inputActive(false); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Hello from " + itsWelcome.getPublicAddress() + " [" + itsWelcome.getPeerID() + "]"); } recvThread = new Thread(this); setThreadName(); recvThread.setDaemon(true); } /** * {@inheritDoc} <p/> * * Implementation for debugging. * *@return Description of the Return Value */ public String toString() { return super.toString() + ":" + ((null != itsWelcome) ? itsWelcome.getPeerID().toString() : "unknown") + " on address " + ((null != dstAddress) ? dstAddress.toString() : "unknown"); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -