
📄 tcpmessenger.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
            startMessenger();
        } catch (IOException io) {
            if (TransportMeterBuildSettings.TRANSPORT_METERING) {
                transportBindingMeter = this.tcpTransport.getUnicastTransportBindingMeter(null, dstAddress);
                if (transportBindingMeter != null) {
                    transportBindingMeter.connectionFailed(initiator, TimeUtils.timeNow() - createdAt);
                }
            }
            // If we failed for any reason, make sure the socket is closed. This is the only place it is known.
            if (socketChannel != null) {
                socketChannel.close();
            }
            throw io;
        }
        if (TransportMeterBuildSettings.TRANSPORT_METERING) {
            transportBindingMeter = this.tcpTransport.getUnicastTransportBindingMeter((PeerID) getDestinationPeerID(), dstAddress);
            if (transportBindingMeter != null) {
                transportBindingMeter.connectionEstablished(initiator, TimeUtils.timeNow() - createdAt);
            }
            // TODO: We need to add the bytes from the Welcome Messages to the transportBindingMeter, iam@jxta.org
        }
        if (!isConnected()) {
            throw new IOException("Failed to establish connection to " + dstAddress);
        }
    }

    /**
     * The cost of just having a finalize routine is high. The finalizer is
     * a bottleneck and can delay garbage collection all the way to heap
     * exhaustion. Leave this comment as a reminder to future maintainers.
     * Below is the reason why finalize is not needed here.
     * <p/>
     * These messengers are never given to application layers. Endpoint code
     * always calls close when needed.
     * <p/>
     * There used to be an incoming special case in order to *prevent*
     * closure because the inherited finalize used to call close. This is
     * no longer the case. For the outgoing case, we do not need to call close
     * for the reason explained above.
     */
    protected void finalize() throws Throwable {
        closeImpl();
        super.finalize();
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Now everyone knows it's closed and the connection can no longer be
     * obtained. So, we can go about our business of closing it.
     * It can happen that a redundant close() is done but it does not matter.
     * close() is idempotent.
     */
    public synchronized void closeImpl() {
        super.close();
        if (closed) {
            return;
        }
        closed = true;
        // we are idle now. Way idle.
        setLastUsed(0);
        if (socketChannel != null) {
            // unregister from selector.
            tcpTransport.unregister(socketChannel);
            try {
                socketChannel.close();
            } catch (IOException e) {
                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.log(Level.WARNING, "Failed to close messenger " + toString(), e);
                }
            }
            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
                LOG.info((closingDueToFailure ?
                        "Failure" : "Normal") + " close (open "
                        + TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), createdAt) + "ms) of socket to : " + dstAddress + " / "
                        + inetAddress.getHostAddress() + ":" + port);
                if (closingDueToFailure && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.log(Level.FINE, "stack trace", new Throwable("stack trace"));
                }
            }
        }
        if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {
            if (closingDueToFailure) {
                transportBindingMeter.connectionDropped(initiator, TimeUtils.timeNow() - createdAt);
            } else {
                transportBindingMeter.connectionClosed(initiator, TimeUtils.timeNow() - createdAt);
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    public boolean isClosed() {
        // FIXME - jice 20040413: Warning. this is overloading the standard
        // isClosed(). Things were arranged so that it
        // should still work, but it's a stretch. Transports should get a deeper
        // retrofit eventually.
        if (isConnected()) {
            return false;
        }
        // Ah, this connection is broken. So, we weren't closed, but now
        // we are. That could happen redundantly since two threads could
        // find that holdIt.isConnected() is false before one of them
        // first zeroes conn. But it does not matter. super.close() is
        // idempotent (and does pretty much nothing in our case, anyway).
        super.close();
        return true;
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Since we probe the connection status, we'll keep a messenger as long
     * as the connection is active, even if only on the incoming side.
     * So we're being a bit nice to the other side. Anyway, incoming
     * connections do not go away when the messenger does. There's a receive
     * timeout for that.
     */
    public boolean isIdleImpl() {
        return (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), getLastUsed()) > 15 * TimeUtils.AMINUTE);
    }

    /**
     * {@inheritDoc}
     */
    public EndpointAddress getLogicalDestinationImpl() {
        // FIXME 20070127 bondolo THIS IS BEING CALLED BEFORE IT IS INITED.
        return logicalDestAddress;
    }

    /**
     * {@inheritDoc}
     */
    public void sendMessageBImpl(Message message, String service, String serviceParam) throws IOException {
        sendMessageDirect(message, service, serviceParam, false);
    }

    public void sendMessageDirect(Message message, String service, String serviceParam, boolean direct) throws IOException {
        if (isClosed()) {
            IOException failure = new IOException("Messenger was closed, it cannot be used to send messages.");
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, failure.getMessage(), failure);
            }
            throw failure;
        }
        // Set the message with the appropriate src and dest address
        message.replaceMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS, srcAddressElement);
        EndpointAddress destAddressToUse;
        if (direct) {
            destAddressToUse = origAddress;
        } else {
            destAddressToUse = getDestAddressToUse(service, serviceParam);
        }
        MessageElement dstAddressElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NAME, destAddressToUse.toString(), null);
        message.replaceMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS, dstAddressElement);
        // send it
        try {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Sending " + message + " to " + destAddressToUse + " on connection " + getDestinationAddress());
            }
            xmitMessage(message);
        } catch (IOException caught) {
            if (Logging.SHOW_WARNING &&
                    LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Message send failed for " + message, caught);
            }
            closeImpl();
            throw caught;
        }
    }

    private void startMessenger() throws IOException {
        socketChannel.configureBlocking(true);
        // Send the welcome message
        WelcomeMessage myWelcome = new WelcomeMessage(fullDstAddress,
                                                      tcpTransport.getPublicAddress(),
                                                      tcpTransport.group.getPeerID(), false);
        long written = write(new ByteBuffer[]{myWelcome.getByteBuffer()});
        tcpTransport.incrementBytesSent(written);
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("welcome message sent");
        }
        while (state.get() == readState.WELCOME) {
            if (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), this.createdAt) > (TcpTransport.connectionTimeOut)) {
                throw new SocketTimeoutException("Failed to receive remote welcome message before timeout.");
            }
            read();
            processBuffer();
        }
        if (!closed) {
            socketChannel.configureBlocking(false);
            tcpTransport.register(socketChannel, this);
        }
    }

    /**
     * Send message to the remote peer.
     *
     * @param msg the message to send.
     * @throws java.io.IOException For errors sending the message.
     */
    private void xmitMessage(Message msg) throws IOException {
        if (closed) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("Connection was closed to : " + dstAddress);
            }
            throw new IOException("Connection was closed to : " + dstAddress);
        }
        long sendBeginTime = TimeUtils.timeNow();
        long size = 0;
        try {
            // todo 20020730 bondolo@jxta.org Do something with content-coding here
            // serialize the message.
            WireFormatMessage serialed = WireFormatMessageFactory.toWire(msg, WireFormatMessageFactory.DEFAULT_WIRE_MIME, null);
            // Build the package header
            MessagePackageHeader header = new MessagePackageHeader();
            header.setContentTypeHeader(serialed.getMimeType());
            size = serialed.getByteLength();
            header.setContentLengthHeader(size);
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Sending " + msg + " (" + size + ") to " + dstAddress + " via " + inetAddress.getHostAddress() + ":" + port);
            }
            List<ByteBuffer> partBuffers = new ArrayList<ByteBuffer>();
            partBuffers.add(header.getByteBuffer());
            partBuffers.addAll(Arrays.asList(serialed.getByteBuffers()));
            long written;
            writeLock.lock();
            try {
                written = write(partBuffers.toArray(new ByteBuffer[partBuffers.size()]));
            } finally {
                writeLock.unlock();
            }
            if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {
                transportBindingMeter.messageSent(initiator, msg, TimeUtils.timeNow() - sendBeginTime, written);
            }
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(MessageFormat.format("Sent {0} bytes {1} successfully via {2}:{3}", written, msg,
                        inetAddress.getHostAddress(), port));
            }
            tcpTransport.incrementBytesSent(written);
            tcpTransport.incrementMessagesSent();
            setLastUsed(TimeUtils.timeNow());
        } catch (SocketTimeoutException failed) {
            SocketTimeoutException failure = new SocketTimeoutException("Failed sending " + msg + " to : " + inetAddress.getHostAddress() + ":" + port);
            failure.initCause(failed);
            throw failure;
        } catch (IOException failed) {
            if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {
                transportBindingMeter.sendFailure(initiator, msg, TimeUtils.timeNow() - sendBeginTime, size);
            }
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Message send failed for " + inetAddress.getHostAddress() + ":" + port, failed);
            }
            closingDueToFailure = true;
            close();
            IOException failure = new IOException("Failed sending " + msg + " to : " + inetAddress.getHostAddress() + ":" + port);
            failure.initCause(failed);
            throw failure;
        }
    }

    /**
     * Blocking write of byte buffers to the socket channel.
     *
     * @param byteBuffers The bytes to write.
     * @return The number of bytes written.
     * @throws IOException Thrown for errors while writing message.
     */
    private long write(final ByteBuffer[] byteBuffers) throws IOException {
        // Determine how many bytes there are to be written in the buffers.
        long bytesToWrite = 0;
        for (ByteBuffer byteBuffer : byteBuffers) {
            bytesToWrite += byteBuffer.remaining();
        }
        if (bytesToWrite == 0L) {
            return 0L;
        }
        long bytesWritten = 0;
        Selector writeSelector = null;
        SelectionKey wKey = null;
        int attempts = 1;
        try {
            do {
                long wroteBytes;
                // Write from the buffers until we write nothing.
                do {
                    wroteBytes = socketChannel.write(byteBuffers);
                    bytesWritten += wroteBytes;
                    if (wroteBytes < 0) {
                        throw new EOFException();
                    }
                    if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
                        LOG.finer(MessageFormat.format("Wrote {0} bytes", wroteBytes));
                    }
                } while (wroteBytes != 0);
                // Are we done?
                if (bytesWritten == bytesToWrite) {
                    break;
                }
                // This attempt failed, we may try again.
                attempts++;
                if (attempts > MAX_WRITE_ATTEMPTS) {
                    throw new IOException(MessageFormat.format("Max write attempts ({0}) exceeded ({1})", attempts, MAX_WRITE_ATTEMPTS));
                }
                // Get a write selector, we're going to do some waiting.
                if (writeSelector == null) {
                    try {
                        writeSelector = tcpTransport.getSelector();
                    } catch (InterruptedException woken) {
                        InterruptedIOException incompleteIO = new InterruptedIOException("Interrupted while acquiring write selector.");
                        incompleteIO.initCause(woken);
                        incompleteIO.bytesTransferred = (int) Math.min(bytesWritten, Integer.MAX_VALUE);
                        throw incompleteIO;
                    }
                    if (writeSelector == null) {
                        continue;
                    }
                    wKey = socketChannel.register(writeSelector, SelectionKey.OP_WRITE);
                }
                // Wait until we are told we can write again.
                int ready = writeSelector.select(TcpTransport.connectionTimeOut);
                if (ready == 0) {
                    throw new SocketTimeoutException("Timeout during socket write");
                } else {
                    attempts--;
                }
            } while (attempts <= MAX_WRITE_ATTEMPTS);
        } finally {
            // cancel the key before returning selector to the pool.
            if (wKey != null) {
                wKey.cancel();
                wKey = null;
            }
            // put the selector back in the pool
            if (writeSelector != null) {
                // clean up the selector
                writeSelector.selectNow();
                tcpTransport.returnSelector(writeSelector);
            }
        }
        return bytesWritten;
    }
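
The write() method above drains the buffers until the channel accepts nothing more, then waits on a selector for OP_WRITE before trying again. The standalone sketch below illustrates that pattern outside JXTA. It is not the JXTA implementation: the names WriteFullySketch, writeFully and demo are hypothetical, a java.nio Pipe stands in for the socket channel, the selector is opened locally instead of being borrowed from a pool, and the MAX_WRITE_ATTEMPTS bookkeeping is left out.

```java
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class WriteFullySketch {

    // Hypothetical helper: writes every remaining byte of the buffers to a
    // non-blocking channel, waiting for OP_WRITE whenever a write makes no progress.
    static <C extends SelectableChannel & GatheringByteChannel> long writeFully(
            C channel, ByteBuffer[] buffers, long timeoutMillis) throws IOException {
        long toWrite = 0;
        for (ByteBuffer b : buffers) {
            toWrite += b.remaining();
        }
        long written = 0;
        Selector selector = null;
        SelectionKey key = null;
        try {
            while (written < toWrite) {
                long n = channel.write(buffers);
                written += n;
                if (n > 0) {
                    continue; // made progress, keep writing
                }
                // Nothing was accepted: lazily set up a selector and wait.
                if (selector == null) {
                    selector = Selector.open();
                    key = channel.register(selector, SelectionKey.OP_WRITE);
                }
                if (selector.select(timeoutMillis) == 0) {
                    throw new SocketTimeoutException("Timeout during write");
                }
                // Reset the selected-key set so the next select() reports readiness again.
                selector.selectedKeys().clear();
            }
        } finally {
            if (key != null) {
                key.cancel();
            }
            if (selector != null) {
                selector.close();
            }
        }
        return written;
    }

    // Pushes a small header buffer plus a 1 MiB body through a pipe while a
    // reader thread drains the other end; returns the number of bytes written.
    static long demo() {
        try {
            Pipe pipe = Pipe.open();
            Pipe.SinkChannel sink = pipe.sink();
            Pipe.SourceChannel source = pipe.source();
            sink.configureBlocking(false); // required before registering with a selector

            final long[] drained = {0};
            Thread reader = new Thread(() -> {
                ByteBuffer buf = ByteBuffer.allocate(8192);
                try {
                    int n;
                    while ((n = source.read(buf)) >= 0) {
                        drained[0] += n;
                        buf.clear();
                    }
                } catch (IOException e) {
                    drained[0] = -1; // signal failure to the caller
                }
            });
            reader.start();

            ByteBuffer[] parts = {ByteBuffer.allocate(16), ByteBuffer.allocate(1 << 20)};
            long written = writeFully(sink, parts, 5000);
            sink.close(); // lets the reader see end-of-stream
            reader.join();
            source.close();
            if (drained[0] != written) {
                throw new IllegalStateException("reader saw " + drained[0] + " bytes, wrote " + written);
            }
            return written;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes written: " + demo());
    }
}
```

Clearing the selected-key set after each select() is a choice made for this sketch so that repeated waits keep reporting readiness. The original instead runs selectNow() on the selector before returning it to the transport's pool.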
