📄 tcpmessenger.java
字号:
startMessenger(); } catch (IOException io) { if (TransportMeterBuildSettings.TRANSPORT_METERING) { transportBindingMeter = this.tcpTransport.getUnicastTransportBindingMeter(null, dstAddress); if (transportBindingMeter != null) { transportBindingMeter.connectionFailed(initiator, TimeUtils.timeNow() - createdAt); } } // If we failed for any reason, make sure the socket is closed. This is the only place it is known. if (socketChannel != null) { socketChannel.close(); } throw io; } if (TransportMeterBuildSettings.TRANSPORT_METERING) { transportBindingMeter = this.tcpTransport.getUnicastTransportBindingMeter((PeerID) getDestinationPeerID(), dstAddress); if (transportBindingMeter != null) { transportBindingMeter.connectionEstablished(initiator, TimeUtils.timeNow() - createdAt); } // TODO: We need to add the bytes from the Welcome Messages to the transportBindingMeter, iam@jxta.org } if (!isConnected()) { throw new IOException("Failed to establish connection to " + dstAddress); } } /** * The cost of just having a finalize routine is high. The finalizer is * a bottleneck and can delay garbage collection all the way to heap * exhaustion. Leave this comment as a reminder to future maintainers. * Below is the reason why finalize is not needed here. * <p/> * These messengers are never given to application layers. Endpoint code * always calls close when needed. * <p/> * There used to be an incoming special case in order to *prevent* * closure because the inherited finalize used to call close. This is * no-longer the case. For the outgoing case, we do not need to call close * for the reason explained above. */ protected void finalize() throws Throwable { closeImpl(); super.finalize(); } /** * {@inheritDoc} * <p/> * Now everyone knows its closed and the connection can no-longer be * obtained. So, we can go about our business of closing it. * It can happen that a redundant close() is done but it does not matter. * close() is idempotent. 
*/ public synchronized void closeImpl() { super.close(); if (closed) { return; } closed = true; // we are idle now. Way idle. setLastUsed(0); if (socketChannel != null) { // unregister from selector. tcpTransport.unregister(socketChannel); try { socketChannel.close(); } catch (IOException e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failed to close messenger " + toString(), e); } } if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info((closingDueToFailure ? "Failure" : "Normal") + " close (open " + TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), createdAt) + "ms) of socket to : " + dstAddress + " / " + inetAddress.getHostAddress() + ":" + port); if (closingDueToFailure && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "stack trace", new Throwable("stack trace")); } } } if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { if (closingDueToFailure) { transportBindingMeter.connectionDropped(initiator, TimeUtils.timeNow() - createdAt); } else { transportBindingMeter.connectionClosed(initiator, TimeUtils.timeNow() - createdAt); } } } /** * {@inheritDoc} */ public boolean isClosed() { // FIXME - jice 20040413: Warning. this is overloading the standard // isClosed(). Things were arranged so that it // should still work, but it's a stretch. Transports should get a deeper // retrofit eventually. if (isConnected()) { return false; } // Ah, this connection is broken. So, we weren't closed, but now // we are. That could happen redundantly since two threads could // find that holdIt.isConnected() is false before one of them // first zeroes conn. But it does not matter. super.close() is // idempotent (and does pretty much nothing in our case, anyway). super.close(); return true; } /** * {@inheritDoc} * <p/> * Since we probe the connection status, we'll keep a messenger as long * as the connection is active, even if only on the incoming side. 
* So we're being a bit nice to the other side. Anyway, incoming
     * connections do not go away when the messenger does. There's a receive
     * timeout for that.
     */
    public boolean isIdleImpl() {
        return (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), getLastUsed()) > 15 * TimeUtils.AMINUTE);
    }

    /**
     * {@inheritDoc}
     */
    public EndpointAddress getLogicalDestinationImpl() {
        // FIXME 20070127 bondolo THIS IS BEING CALLED BEFORE IT IS INITED.
        return logicalDestAddress;
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Delegates to {@link #sendMessageDirect} with {@code direct} set to
     * {@code false}.
     */
    public void sendMessageBImpl(Message message, String service, String serviceParam) throws IOException {
        sendMessageDirect(message, service, serviceParam, false);
    }

    /**
     * Stamps the message with source and destination address elements and
     * transmits it on this connection.
     *
     * @param message      the message to send; its source and destination
     *                     address elements are replaced.
     * @param service      destination service, used to build the destination
     *                     address when {@code direct} is {@code false}.
     * @param serviceParam destination service parameter, used together with
     *                     {@code service}.
     * @param direct       when {@code true} the messenger's original address
     *                     is used as the destination instead of one derived
     *                     from {@code service} and {@code serviceParam}.
     * @throws IOException if the messenger is already closed or the
     *                     transmission fails. On a transmission failure the
     *                     messenger is closed before the exception is rethrown.
     */
    public void sendMessageDirect(Message message, String service, String serviceParam, boolean direct) throws IOException {
        if (isClosed()) {
            IOException failure = new IOException("Messenger was closed, it cannot be used to send messages.");
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, failure.getMessage(), failure);
            }
            throw failure;
        }

        // Set the message with the appropriate src and dest address
        message.replaceMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS, srcAddressElement);
        EndpointAddress destAddressToUse;
        if (direct) {
            destAddressToUse = origAddress;
        } else {
            destAddressToUse = getDestAddressToUse(service, serviceParam);
        }

        MessageElement dstAddressElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NAME, destAddressToUse.toString(), null);
        message.replaceMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS, dstAddressElement);

        // send it
        try {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Sending " + message + " to " + destAddressToUse + " on connection " + getDestinationAddress());
            }
            xmitMessage(message);
        } catch (IOException caught) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Message send failed for " + message, caught);
            }
            closeImpl();
            throw caught;
        }
    }

    /**
     * Performs the welcome handshake on a freshly connected channel.
     * <p/>
     * The channel is switched to blocking mode, the local welcome message is
     * written, and input is read and processed until the read state leaves
     * {@code WELCOME}. If the messenger has not been closed in the meantime,
     * the channel is switched to non-blocking mode and registered with the
     * transport.
     *
     * @throws IOException if writing or reading fails, or (as a
     *                     {@link SocketTimeoutException}) if the state is
     *                     still {@code WELCOME} once the connection timeout,
     *                     measured from the messenger's creation, has elapsed.
     */
    private void startMessenger() throws IOException {
        socketChannel.configureBlocking(true);

        // Send the welcome message
        WelcomeMessage myWelcome = new WelcomeMessage(fullDstAddress, tcpTransport.getPublicAddress(), tcpTransport.group.getPeerID(), false);
        long written = write(new ByteBuffer[]{myWelcome.getByteBuffer()});
        tcpTransport.incrementBytesSent(written);
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("welcome message sent");
        }

        while (state.get() == readState.WELCOME) {
            if (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), this.createdAt) > (TcpTransport.connectionTimeOut)) {
                throw new SocketTimeoutException("Failed to receive remote welcome message before timeout.");
            }
            read();
            processBuffer();
        }

        if (!closed) {
            socketChannel.configureBlocking(false);
            tcpTransport.register(socketChannel, this);
        }
    }

    /**
     * Send message to the remote peer.
     * <p/>
     * The message is serialized, prefixed with a package header carrying its
     * content type and length, and written while holding the write lock.
     * A write timeout is rethrown as a new {@link SocketTimeoutException}
     * without closing the messenger here; any other I/O failure marks the
     * messenger as closing due to failure and closes it.
     *
     * @param msg the message to send.
     * @throws java.io.IOException For errors sending the message.
     */
    private void xmitMessage(Message msg) throws IOException {

        if (closed) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("Connection was closed to : " + dstAddress);
            }
            throw new IOException("Connection was closed to : " + dstAddress);
        }

        long sendBeginTime = TimeUtils.timeNow();
        long size = 0;

        try {
            // todo 20020730 bondolo@jxta.org Do something with content-coding here
            // serialize the message.
            WireFormatMessage serialed = WireFormatMessageFactory.toWire(msg, WireFormatMessageFactory.DEFAULT_WIRE_MIME, null);

            // Build the package header
            MessagePackageHeader header = new MessagePackageHeader();

            header.setContentTypeHeader(serialed.getMimeType());
            size = serialed.getByteLength();
            header.setContentLengthHeader(size);

            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Sending " + msg + " (" + size + ") to " + dstAddress + " via " + inetAddress.getHostAddress() + ":"+ port);
            }

            List<ByteBuffer> partBuffers = new ArrayList<ByteBuffer>();

            partBuffers.add(header.getByteBuffer());
            partBuffers.addAll(Arrays.asList(serialed.getByteBuffers()));

            long written;
            writeLock.lock();
            try {
                written = write(partBuffers.toArray(new ByteBuffer[partBuffers.size()]));
            } finally {
                writeLock.unlock();
            }

            if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {
                transportBindingMeter.messageSent(initiator, msg, TimeUtils.timeNow() - sendBeginTime, written);
            }
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(MessageFormat.format("Sent {0} bytes {1} successfully via {2}:{3}", written, msg, inetAddress.getHostAddress(), port));
            }
            tcpTransport.incrementBytesSent(written);
            tcpTransport.incrementMessagesSent();
            setLastUsed(TimeUtils.timeNow());
        } catch (SocketTimeoutException failed) {
            SocketTimeoutException failure = new SocketTimeoutException("Failed sending " + msg + " to : " + inetAddress.getHostAddress() + ":" + port);
            failure.initCause(failed);
            throw failure;
        } catch (IOException failed) {
            if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {
                transportBindingMeter.sendFailure(initiator, msg, TimeUtils.timeNow() - sendBeginTime, size);
            }
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Message send failed for " + inetAddress.getHostAddress() + ":" + port, failed);
            }
            closingDueToFailure = true;
            close();
            IOException failure = new IOException("Failed sending " + msg + " to : " + inetAddress.getHostAddress() + ":" + port);
            failure.initCause(failed);
            throw failure;
        }
    }

    /**
     * Blocking write of byte buffers to the socket channel.
     * <p/>
     * The buffers are written until the channel accepts no more bytes. If
     * data remains, a selector is obtained from the transport and this method
     * waits, up to the connection timeout per wait, for the channel to become
     * writable again before retrying.
     *
     * @param byteBuffers The bytes to write.
     * @return The number of bytes written.
     * @throws IOException Thrown for errors while writing message. This
     *                     includes {@link SocketTimeoutException} when the
     *                     channel does not become writable in time,
     *                     {@link InterruptedIOException} when interrupted
     *                     while acquiring a selector (the thread's interrupt
     *                     status is restored), and {@link EOFException} when
     *                     the channel reports a negative write count.
     */
    private long write(final ByteBuffer[] byteBuffers) throws IOException {

        // Determine how many bytes there are to be written in the buffers.
        long bytesToWrite = 0;
        for (ByteBuffer byteBuffer : byteBuffers) {
            bytesToWrite += byteBuffer.remaining();
        }

        if (bytesToWrite == 0L) {
            return 0L;
        }

        long bytesWritten = 0;
        Selector writeSelector = null;
        SelectionKey wKey = null;
        int attempts = 1;

        try {
            do {
                long wroteBytes;

                // Write from the buffers until we write nothing.
                do {
                    wroteBytes = socketChannel.write(byteBuffers);
                    // Check for end-of-stream before accounting, so a negative
                    // count never leaks into the running total.
                    if (wroteBytes < 0) {
                        throw new EOFException();
                    }
                    bytesWritten += wroteBytes;
                    if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
                        LOG.finer(MessageFormat.format("Wrote {0} bytes", wroteBytes));
                    }
                } while (wroteBytes != 0);

                // Are we done?
                if (bytesWritten == bytesToWrite) {
                    break;
                }

                // This attempt failed, we may try again. The counter is
                // decremented again after every successful wait below, so it
                // only accumulates while no selector could be obtained.
                attempts++;
                if (attempts > MAX_WRITE_ATTEMPTS) {
                    throw new IOException(MessageFormat.format("Max write attempts ({0}) exceeded ({1})", attempts, MAX_WRITE_ATTEMPTS));
                }

                // Get a write selector, we're going to do some waiting.
                if (writeSelector == null) {
                    try {
                        writeSelector = tcpTransport.getSelector();
                    } catch (InterruptedException woken) {
                        // Preserve the interrupt for code further up the stack;
                        // converting to an IOException would otherwise lose it.
                        Thread.currentThread().interrupt();
                        InterruptedIOException incompleteIO = new InterruptedIOException("Interrupted while acquiring write selector.");
                        incompleteIO.initCause(woken);
                        incompleteIO.bytesTransferred = (int) Math.min(bytesWritten, Integer.MAX_VALUE);
                        throw incompleteIO;
                    }
                    if (writeSelector == null) {
                        continue;
                    }
                    wKey = socketChannel.register(writeSelector, SelectionKey.OP_WRITE);
                }

                // Wait until we are told we can write again.
                int ready = writeSelector.select(TcpTransport.connectionTimeOut);
                if (ready == 0) {
                    throw new SocketTimeoutException("Timeout during socket write");
                }

                // Selected keys are never removed by the selector itself. If
                // our key stayed in the selected-key set, the next select()
                // would find its ready set unchanged and return 0, which the
                // check above would misreport as a timeout.
                writeSelector.selectedKeys().clear();
                attempts--;
            } while (attempts <= MAX_WRITE_ATTEMPTS);
        } finally {
            // cancel the key before returning selector to the pool.
            if (wKey != null) {
                wKey.cancel();
                wKey = null;
            }

            // put the selector back in the pool
            if (writeSelector != null) {
                // clean up the selector
                writeSelector.selectNow();
                tcpTransport.returnSelector(writeSelector);
            }
        }

        return bytesWritten;
    }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -