📄 tcpconnection.java
字号:
sharedSocket.setSoLinger(true, TcpTransport.LingerDelay); sharedSocket.setTcpNoDelay(true); woutputStream = new WatchedOutputStream(sharedSocket.getOutputStream(), TcpTransport.ChunkSize); woutputStream.setWatchList(proto.ShortCycle); winputStream = new WatchedInputStream(sharedSocket.getInputStream(), TcpTransport.ChunkSize); winputStream.setWatchList(proto.LongCycle); if ((winputStream == null) || (woutputStream == null)) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug(" failed getting streams."); } throw new IOException("Could not get streams"); } outputStream = new BufferedOutputStream(woutputStream, TcpTransport.ChunkSize); inputStream = winputStream; myWelcome = new WelcomeMessage(fullDstAddress, proto.getPublicAddress(), proto.group.getPeerID(), false); myWelcome.sendToStream(outputStream); outputStream.flush(); // The response should arrive shortly or we bail out. inputActive(true); itsWelcome = new WelcomeMessage(inputStream); // Ok, we can wait for messages now. inputActive(false); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("startSocket : Hello from " + itsWelcome.getPublicAddress() + " [" + itsWelcome.getPeerID() + "]"); } recvThread = new Thread(proto.myThreadGroup, this); setThreadName(); recvThread.setDaemon(true); } protected void start() { recvThread.start(); } /** * Send message to the remote peer. * * @param msg the message to send. * @return If <tt>true</tt> the message was sent successfully otherwise <tt>false</tt>. */ public boolean sendMessage(Message msg) throws IOException { // socket is a stream, only one writer at a time... synchronized (writeLock) { if (closed) { if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Connection was closed to : " + dstAddress); } throw new IOException("Connection was closed to : " + dstAddress); } boolean success = false; long sendBeginTime = 0; long size = 0; if (TransportMeterBuildSettings.TRANSPORT_METERING) { sendBeginTime = System.currentTimeMillis(); } try { // 20020730 bondolo@jxta.org Do something with content-coding here // serialize the message. WireFormatMessage serialed = WireFormatMessageFactory.toWire(msg, appMsg, (MimeMediaType[]) null); // Build the protocol header // Allocate a buffer to contain the message and the header MessagePackageHeader header = new MessagePackageHeader(); header.setContentTypeHeader(serialed.getMimeType()); size = serialed.getByteLength(); header.setContentLengthHeader(size); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sending " + msg + " (" + serialed.getByteLength() + ") to " + dstAddress + " via " + inetAddress.getHostAddress() + ":" + port); } header.sendToStream(outputStream); serialed.sendToStream(outputStream); outputStream.flush(); // all done! success = true; setLastUsed(System.currentTimeMillis()); if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.messageSent(initiator, msg, System.currentTimeMillis() - sendBeginTime, size); } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sent " + msg + " successfully via " + inetAddress.getHostAddress() + ":" + port); } return true; } catch (Exception failed) { if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.sendFailure(initiator, msg, System.currentTimeMillis() - sendBeginTime, size); } if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Message send failed for " + inetAddress.getHostAddress() + ":" + port, failed); } closingDueToFailure = true; close(); IOException failure = new IOException("Failed sending " + msg + " to : " + inetAddress.getHostAddress() + ":" + port); failure.initCause(failed); throw failure; } } } /** * {@inheritDoc} * * This is the background Thread. While the connection is active, takes * messages from the queue and send it. */ public void run() { long receiveBeginTime = 0; long size = 0; try { if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Starting receiver for " + inetAddress.getHostAddress() + ":" + port); } try { while (isConnected()) { if (closed) { break; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Message receive starts for " + inetAddress.getHostAddress() + ":" + port); } // We can stay blocked here for a long time, it's ok. MessagePackageHeader header = new MessagePackageHeader(inputStream); if (TransportMeterBuildSettings.TRANSPORT_METERING) { receiveBeginTime = System.currentTimeMillis(); } MimeMediaType msgMime = header.getContentTypeHeader(); long msglength = header.getContentLengthHeader(); // FIXME 20020730 bondolo@jxta.org Do something with content-coding here. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("tcp receive - message body (" + msglength + ") starts for " + inetAddress.getHostAddress() + ":" + port); } // read the message! // We have received the header, so, the rest had better // come. Turn the short timeout on. inputActive(true); Message msg = null; try { msg = WireFormatMessageFactory.fromWire(new LimitInputStream(inputStream, msglength, true), msgMime, (MimeMediaType) null); } catch (IOException failed) { if (LOG.isEnabledFor(Level.INFO)) { LOG.info("tcp receive - failed reading msg from " + inetAddress.getHostAddress() + ":" + port); // LOG.error(sharedSocket.toString() + // "\tbound " + sharedSocket.isBound() + // "\tclosed " + sharedSocket.isClosed() + // "\tconntected " + sharedSocket.isConnected() + // "\tisInputShutDown " + sharedSocket.isInputShutdown()); } throw failed; } finally { // We can relax again. inputActive(false); } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Handing " + msg + " from " + inetAddress.getHostAddress() + ":" + port + " to EndpointService"); } if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.messageReceived(initiator, msg, System.currentTimeMillis() - receiveBeginTime, msglength); } // Demux the message for the upper layers. proto.endpoint.demux(msg); setLastUsed(System.currentTimeMillis()); } } catch (InterruptedIOException woken) { if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.receiveFailure(initiator, System.currentTimeMillis() - receiveBeginTime, size); } // We have to treat this as fatal since we don't know where // in the framing the input stream was at. This should have // been handled below. closingDueToFailure = true; if (LOG.isEnabledFor(Level.WARN)) { LOG.warn( "tcp receive - Error : read() timeout after " + woken.bytesTransferred + " on connection " + inetAddress.getHostAddress() + ":" + port); } } catch (EOFException finished) { if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.receiveFailure(initiator, System.currentTimeMillis() - receiveBeginTime, size); } // The other side has closed the connection if (LOG.isEnabledFor(Level.INFO)) { LOG.info("tcp receive - Connection was closed by " + inetAddress.getHostAddress() + ":" + port); } } catch (SocketException finished) { if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.receiveFailure(initiator, System.currentTimeMillis() - receiveBeginTime, size); } closingDueToFailure = true; // The other side has closed the connection if (LOG.isEnabledFor(Level.INFO)) { LOG.info("tcp receive - Connection was closed by " + inetAddress.getHostAddress() + ":" + port); } } catch (Throwable e) { if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.receiveFailure(initiator, System.currentTimeMillis() - receiveBeginTime, size); } closingDueToFailure = true; if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("tcp receive - Error on connection " + inetAddress.getHostAddress() + ":" + port, e); } } finally { if (!closed) { // We need to close the connection down. close(); } } } catch (Throwable all) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Uncaught Throwable in thread :" + Thread.currentThread().getName(), all); } } finally { recvThread = null; } } private void closeIOs() { if (inputStream != null) { try { inputStream.close(); inputStream = null; } catch (Exception ez1) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("could not close inputStream ", ez1); } } } if (outputStream != null) { try { outputStream.close(); outputStream = null; } catch (Exception ez1) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Error : could not close outputStream ", ez1); } } } if (sharedSocket != null) { try { sharedSocket.close(); sharedSocket = null; } catch (Exception ez1) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Error : could not close socket ", ez1); } } } } /** * Soft close of the connection. Messages can no longer be sent, but any * in the queue will be flushed. */ public synchronized void close() { if (LOG.isEnabledFor(Level.INFO)) { LOG.info( (closingDueToFailure ? "Failure" : "Normal") + " close (open " + TimeUtils.toRelativeTimeMillis( TimeUtils.timeNow(), firstUsed ) + "ms) of socket to : " + dstAddress + " / " + (inetAddress != null ? inetAddress.getHostAddress() : "UNKNOWN" )+ ":" + port); if (LOG.isEnabledFor(Level.DEBUG) && closingDueToFailure) { LOG.debug("stack trace", new Throwable("stack trace")); } } if (!closed) { setLastUsed(0); // we idle now. Way idle. closeIOs(); closed = true; Thread temp = recvThread; if (temp != null) { temp.interrupt(); } if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { if (closingDueToFailure) { transportBindingMeter.connectionDropped(initiator, System.currentTimeMillis() - connectionBegunTime); } else { transportBindingMeter.connectionClosed(initiator, System.currentTimeMillis() - connectionBegunTime); } } // socket closing happens in the shutdown of recvThread } } /** * return the current connection status. * * @param true if there is an active connection to the remote peer, * otherwise false. * */ public boolean isConnected() { return ((recvThread != null) && (!closed)); } /** * Return the absolute time in milliseconds at which this Connection was last used. * * @return absolute time in milliseconds. */ public long getLastUsed() { return lastUsed; } /** * Set the last used time for this connection in absolute milliseconds. * * @param time absolute time in milliseconds. */ private void setLastUsed(long time) { lastUsed = time; } TransportBindingMeter getTransportBindingMeter() { return transportBindingMeter; } /** * This is called with "true" when the invoker is about to read some * input and is not willing to wait for it to come. * This is called with "false" when the invoker is about to wait for * a long time for input to become available with a potentialy very long * blocking read. */ private void inputActive(boolean active) { if (active) { winputStream.setWatchList(proto.ShortCycle); } else { winputStream.setWatchList(proto.LongCycle); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -