⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jxtasocket.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
    /**
     * Sets our credential to be used by this socket connection. If no
     * credentials are set, the default group credential will be used.
     * <p/>
     * A non-null argument is stored as-is. A {@code null} argument selects
     * the default credential of the group's membership service; if that
     * lookup fails for any reason the credential is left {@code null}.
     *
     * @param localCredential The credential to be used for connection responses
     *                     or <tt>null</tt> if the default credential is to be used.
     */
    public void setCredential(Credential localCredential) {
        if (localCredential != null) {
            // Honour the caller's credential. The test used to be inverted,
            // which replaced every explicit credential with the group default.
            this.localCredential = localCredential;
            return;
        }

        try {
            MembershipService membership = group.getMembershipService();
            this.localCredential = membership.getDefaultCredential();
        } catch (Exception failed) {
            // Best effort: no default credential could be obtained.
            this.localCredential = null;
        }
    }

    /**
     * Create a connection request/response message
     *
     * @param group      The group in which the socket is being used.
     * @param pipeAdv    Advertisement for our ephemeral pipe.
     * @param credential Our credential or null if default credential is to
     *                   be used.
     * @param isReliable The socket is to be reliable (stream).
     * @param initiator  indicates initiator
     * @return The message.
* @throws IOException if an I/O error occurs     */    protected Message createConnectMessage(PeerGroup group, PipeAdvertisement pipeAdv, Credential credential, boolean isReliable, boolean initiator) throws IOException {        Message msg = new Message();        if (credential == null) {            credential = getDefaultCredential(group);        }        if ((credential == null) && PipeService.UnicastSecureType.equals(pipeAdv.getType())) {            throw new IOException("Credentials must be established to initiate a secure connection.");        }        if (credential != null) {            try {                XMLDocument credDoc = (XMLDocument) credential.getDocument(MimeMediaType.XMLUTF8);                msg.addMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE,                        new TextDocumentMessageElement(JxtaServerSocket.credTag, credDoc, null));            } catch (Exception failed) {                IOException failure = new IOException("Could not generate credential element.");                failure.initCause(failed);                throw failure;            }        }        msg.addMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE,                new TextDocumentMessageElement(initiator ? JxtaServerSocket.reqPipeTag : JxtaServerSocket.remPipeTag,                (XMLDocument) pipeAdv.getDocument(MimeMediaType.XMLUTF8), null));        msg.addMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE,                new TextDocumentMessageElement(JxtaServerSocket.remPeerTag,                (XMLDocument) group.getPeerAdvertisement().getDocument(MimeMediaType.XMLUTF8), null));        msg.addMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE,                new StringMessageElement(JxtaServerSocket.streamTag, Boolean.toString(isReliable), null));        return msg;    }    /**     * Create a pipe advertisement for an ephemeral pipe (w/random pipe ID) from an existing pipe advertisement.     
* The specified pipe adveritsement is only used for the name and type     *     * @param pipeAdv to get the basename and type from     * @return A new pipe advertisement for an ephemeral pipe.     */    protected static PipeAdvertisement newEphemeralPipeAdv(PipeAdvertisement pipeAdv) {        PipeAdvertisement adv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());        PeerGroupID gid = (PeerGroupID) ((PipeID) pipeAdv.getPipeID()).getPeerGroupID();        adv.setPipeID(IDFactory.newPipeID(gid));        adv.setName(pipeAdv.getName() + ".remote");        adv.setType(pipeAdv.getType());        return adv;    }    /**     * {@inheritDoc}     */    @Override    public boolean isBound() {        return bound;    }    /**     * Sets whether this socket is currently bound or not. A socket is considered bound if the local resources required     * in order to interact with a remote peer are allocated and open.     *     * @param boundState The new bound state.     */    private void setBound(boolean boundState) {        bound = boundState;    }    /**     * {@inheritDoc}     */    @Override    public boolean isConnected() {        return connected;    }    /**     * Sets whether this socket is currently connected or not. A socket is     * considered connected if is believed that the Socket's remote peer has     * the resources required in order to interact with a local peer (us)     * allocated and open.     *     * @param connectedState The new connected state.     */    private void setConnected(boolean connectedState) {        connected = connectedState;    }    /**     * Opens our ephemeral input pipe enabling us to receive messages.     *     * @throws IOException Thrown for errors in creating the input pipe.     */    private void bind() throws IOException {        this.localEphemeralPipeIn = pipeSvc.createInputPipe(localEphemeralPipeAdv, this);        // The socket is bound now.        
setBound(true);    }    /**     *  Create an appropriate Outgoing Adaptor. This method exists primarily     *  so that sub-classes can substitute a different Outgoing sub-class.     *     *  @param msgr The messenger to be wrapped.     *  @param timeout The timeout value;     *  @return Outgoing The messenger wrapped in an appropriate adaptor.     */    protected Outgoing makeOutgoing(Messenger msgr, long timeout) {        return new OutgoingMsgrAdaptor(msgr, (int) timeout);    }                /**     * Opens the ephemeral output pipe for the remote peer. Also opens the     * input and output streams. (delaying adds complexity).     *     * @throws IOException Thrown for errors in opening resources.     */    private void connect() throws IOException {        remoteEphemeralPipeMsgr = lightweightOutputPipe(group, remoteEphemeralPipeAdv, remotePeerAdv);        if (remoteEphemeralPipeMsgr == null) {            throw new IOException("Could not create messenger back to connecting peer");        }        // Force the buffer size smaller if user set it too high.        if (remoteEphemeralPipeMsgr.getMTU() < outputBufferSize) {            outputBufferSize = Math.min((int) remoteEphemeralPipeMsgr.getMTU(), DEFAULT_OUTPUT_BUFFER_SIZE);        }        if (outputBufferSize == -1) {            outputBufferSize = Math.min((int) remoteEphemeralPipeMsgr.getMTU(), DEFAULT_OUTPUT_BUFFER_SIZE);        }        // Force the creation of the inputStream now. Waiting until someone        // calls getInputStream() would likely cause us to drop messages.        if (isReliable) {            outgoing = makeOutgoing(remoteEphemeralPipeMsgr, retryTimeout);            ris = new ReliableInputStream(outgoing, soTimeout);            ros = new ReliableOutputStream(outgoing, new FixedFlowControl(windowSize));            try {                ros.setSendBufferSize(outputBufferSize);            } catch (IOException ignored) {// it's only a preference...            
}        } else {            nonReliableInputStream = new JxtaSocketInputStream(this, windowSize);            nonReliableOutputStream = new JxtaSocketOutputStream(this, outputBufferSize);        }        // the socket is now connected!        setConnected(true);    }    /**     * Returns the internal output stream buffer size     *     * @return the internal buffer size.     * @deprecated Use the standard {@link #getSendBufferSize()} method instead.     */    @Deprecated    public int getOutputStreamBufferSize() {        return (outputBufferSize == -1) ? DEFAULT_OUTPUT_BUFFER_SIZE : outputBufferSize;    }    /**     * Sets the internal output stream buffer size.     *     * @param size The internal buffer size.     * @throws IOException if an I/O error occurs     * @deprecated Use the standard {@link #setSendBufferSize(int)} method instead.     */    @Deprecated    public void setOutputStreamBufferSize(int size) throws IOException {        setSendBufferSize(size);    }    /**     * {@inheritDoc}     */    @Override    public InputStream getInputStream() throws IOException {        checkState();        if (isInputShutdown()) {            throw new SocketException("Input already shutdown.");        }        if (isReliable) {            return ris;        } else {            return nonReliableInputStream;        }    }    /**     * {@inheritDoc}     */    @Override    public OutputStream getOutputStream() throws IOException {        checkState();        if (isOutputShutdown()) {            throw new SocketException("Output already shutdown.");        }        return isReliable ? ros : nonReliableOutputStream;    }    /**     * {@inheritDoc}     * <p/>     * We hard-close both the input and output streams. Nothing can be     * written or read to the socket after this method. Any queued incoming     * data is discarded. Any additional incoming messages will be ACKed but     * their content will be discarded. 
We will attempt to send any data which     * has already been written to the OutputStream.     * <p/>     * Once the output queue is empty we will send a close message to tell     * the remote side that no more data is coming.     * <p/>     * This is the only method in this class which is {@code synchronized}.     * All others use internal synchronization.     */    @Override    public synchronized void close() throws IOException {        try {            synchronized (closeLock) {                long closeEndsAt = System.currentTimeMillis() + timeout;                if (closeEndsAt < timeout) {                    closeEndsAt = Long.MAX_VALUE;                }                if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                    LOG.info("Closing " + this + " timeout=" + timeout + "ms.");                }                if (closed) {                    return;                }                closed = true;                shutdownOutput();                shutdownInput();                while (isConnected()) {                    long closingFor = closeEndsAt - System.currentTimeMillis();                    if (closingFor <= 0) {                        break;                    }                    if (isReliable) {                        try {                            if (ros.isQueueEmpty()) {                                // Only send a close if the queue is empty.                                sendClose();                            } else {                                // Reliable Output Stream not empty. Don't send close yet.                                
ros.waitQueueEmpty(1000);                                continue;                            }                        } catch (InterruptedException woken) {                            Thread.interrupted();                            break;                        }                    } else {                        sendClose();                    }                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("Sent close, awaiting ACK for " + this);                    }                    // Don't send our close too many times.                    try {                        long nextTry = Math.min(20000, closingFor);                        if (nextTry > 0 && isConnected()) {                            closeLock.wait(nextTry);                        }                    } catch (InterruptedException woken) {                        Thread.interrupted();                        break;                    }                }                if (isConnected()) {                    // Last ditch close attempt                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("Still connected at end of timeout. Forcing closed." + this);                    }                    sendClose();                    throw new SocketTimeoutException("Failed to receive close ack from remote connection.");                }            }        } finally {            // No matter what else happens at the end of close() we are no            // longer connected and no longer bound.            setConnected(false);            unbind();            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                LOG.info("Socket closed : " + this);            }        }    }    /**     * This is called when closure is initiated on the remote side. By     * convention we receive the close message only after we have ACKed the last     * data segment.     
* <p/>     * We soft-close the InputStream which allows us to read data already     * received.     * <p/>     * We hard-close our output stream and discard all queued, unACKed data     * as the remote side doesn't want to receive it (the remote side has     * already unbound themselves, they just want our close ACK in order to clean     * up.)     *     * @throws IOException if an I/O error occurs     */    protected void closeFromRemote() throws IOException {        synchronized (closeLock) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.INFO)) {                LOG.info("Received a remote close request." + this);            }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -