⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jxtabidipipe.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
        this.group = group;        this.msgListener = msgListener;        if (msgListener != null) {            dequeued = true;        }        this.isReliable = reliable;        pipeSvc = group.getPipeService();        this.timeout = timeout;        myPipeAdv = JxtaServerPipe.newInputPipe(group, pipeAd);        this.inputPipe = pipeSvc.createInputPipe(myPipeAdv, this);        this.credentialDoc = credentialDoc != null ? credentialDoc : getCredDoc(group);        Message openMsg = createOpenMessage(group, myPipeAdv);        // create the output pipe and send this message        if (peerid == null) {            pipeSvc.createOutputPipe(pipeAd, this);        } else {            pipeSvc.createOutputPipe(pipeAd, Collections.singleton(peerid), this);        }        try {            synchronized (acceptLock) {                // check connectOutpipe within lock to prevent a race with modification.                if (connectOutpipe == null) {                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("Waiting for " + timeout + " msec");                    }                    acceptLock.wait(timeout);                }            }        } catch (InterruptedException ie) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.log(Level.FINE, "Interrupted", ie);            }            Thread.interrupted();            IOException exp = new IOException("Interrupted");            exp.initCause(ie);            throw exp;        }        if (connectOutpipe == null) {            throw new IOException("connection timeout");        }        // send connect message        waiting = true;        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Sending a backchannel message");        }        connectOutpipe.send(openMsg);        // wait for the second op        try {            synchronized (finalLock) {                if (waiting) {                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("Waiting for " + timeout + " msec for back channel to be established");                    }                    finalLock.wait(timeout);                    // Need to check for creation                    if (msgr == null) {                        throw new IOException("connection timeout");                    }                }            }        } catch (InterruptedException ie) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.log(Level.FINE, "Interrupted", ie);            }            Thread.interrupted();            IOException exp = new IOException("Interrupted");            exp.initCause(ie);            throw exp;        }        setBound();        notifyListeners(PipeStateListener.PIPE_OPENED_EVENT);    }    /**     * creates all the reliability objects     */    private void createRLib() {        if (isReliable) {            if (outgoing == null) {                outgoing = new OutgoingMsgrAdaptor(msgr, retryTimeout);            }            if (ros == null) {                ros = new ReliableOutputStream(outgoing, new FixedFlowControl(windowSize));            }            if (ris == null) {                ris = new ReliableInputStream(outgoing, retryTimeout, this);            }        }    }    /**     * Toggles reliability     *     * @param reliable Toggles reliability to reliable     * @throws IOException if pipe is bound     */    public void setReliable(boolean reliable) throws IOException {        if (isBound()) {            throw new IOException("Can not set reliability after pipe is bound");        }        this.isReliable = reliable;    }    /**     * Obtain the cred doc from the group object.     *     * @param group group context     * @return The credDoc value     */    protected static StructuredDocument getCredDoc(PeerGroup group) {        try {            MembershipService membership = group.getMembershipService();            Credential credential = membership.getDefaultCredential();            if (credential != null) {                return credential.getDocument(MimeMediaType.XMLUTF8);            }        } catch (Exception e) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.log(Level.WARNING, "failed to get credential", e);            }        }        return null;    }    /**     * get the remote credential doc     *     * @return Credential StructuredDocument     */    public StructuredDocument getCredentialDoc() {        return credentialDoc;    }    /**     * Sets the connection credential doc.     * If no credentials are set, the default group credential are used.     *     * @param doc Credential StructuredDocument     */    public void setCredentialDoc(StructuredDocument doc) {        this.credentialDoc = doc;    }    /**     * Creates a connection request message     *     * @param group  group context     * @param pipeAd pipe advertisement     * @return the Message  object     * @throws IOException if an io error occurs     */    protected Message createOpenMessage(PeerGroup group, PipeAdvertisement pipeAd) throws IOException {        Message msg = new Message();        PeerAdvertisement peerAdv = group.getPeerAdvertisement();        if (credentialDoc == null) {            credentialDoc = getCredDoc(group);        }        if (credentialDoc == null && pipeAd.getType().equals(PipeService.UnicastSecureType)) {            throw new IOException("No credentials established to initiate a secure connection");        }        try {            if (credentialDoc != null) {                msg.addMessageElement(JxtaServerPipe.nameSpace,                        new TextDocumentMessageElement(JxtaServerPipe.credTag, (XMLDocument) credentialDoc, null));            }            msg.addMessageElement(JxtaServerPipe.nameSpace,                    new TextDocumentMessageElement(JxtaServerPipe.reqPipeTag,                            (XMLDocument) pipeAd.getDocument(MimeMediaType.XMLUTF8), null));            msg.addMessageElement(JxtaServerPipe.nameSpace,                    new StringMessageElement(JxtaServerPipe.reliableTag, Boolean.toString(isReliable), null));            msg.addMessageElement(JxtaServerPipe.nameSpace,                    new StringMessageElement(JxtaServerPipe.directSupportedTag, Boolean.toString(true), null));            msg.addMessageElement(JxtaServerPipe.nameSpace,                    new TextDocumentMessageElement(JxtaServerPipe.remPeerTag,                            (XMLDocument) peerAdv.getDocument(MimeMediaType.XMLUTF8), null));            return msg;        } catch (Throwable t) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.log(Level.FINE, "error getting element stream", t);            }            return null;        }    }    /**     * Sets the bound attribute of the JxtaServerPipe object     */    void setBound() {        bound = true;        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Pipe Bound :true");        }    }    /**     * Returns the binding state of the JxtaServerPipe.     *     * @return true if the ServerSocket successfully bound to an address     */    public boolean isBound() {        return bound;    }    /**     * Returns an input stream for this socket.     *     * @return a stream for reading from this socket.     * @throws IOException if an I/O error occurs when creating the     *                     input stream.     */    public InputPipe getInputPipe() throws IOException {        return inputPipe;    }    /**     * Returns remote PeerAdvertisement     *     * @return remote PeerAdvertisement     */    public PeerAdvertisement getRemotePeerAdvertisement() {        return remotePeerAdv;    }    /**     * Returns remote PipeAdvertisement     *     * @return remote PipeAdvertisement     */    public PipeAdvertisement getRemotePipeAdvertisement() {        return remotePipeAdv;    }    /**     * Sets the remote PeerAdvertisement     *     * @param peer Remote PeerAdvertisement     */    protected void setRemotePeerAdvertisement(PeerAdvertisement peer) {        this.remotePeerAdv = peer;    }    /**     * Sets the remote PipeAdvertisement     *     * @param pipe PipeAdvertisement     */    protected void setRemotePipeAdvertisement(PipeAdvertisement pipe) {        this.remotePipeAdv = pipe;    }    /**     * Closes this pipe.     *     * @throws IOException if an I/O error occurs when closing this     *                     socket.     */    public void close() throws IOException {        sendClose();        closePipe(false);        bound = false;    }    protected void closePipe(boolean fastClose) throws IOException {        // close both pipes        synchronized (closeLock) {            if (closed) {                return;            }            closed = true;            bound = false;        }        if (!fastClose && isReliable && !direct) {            /*             *  This implements linger!             */            long quitAt = System.currentTimeMillis() + timeout;            while (true) {                //FIXME hamada this does not loop                if (ros == null || ros.getMaxAck() == ros.getSeqNumber()) {                    // Nothing to worry about.                    break;                }                // By default wait forever.                long left = 0;                // If timeout is not zero. Then compute the waiting time                // left.                if (timeout != 0) {                    left = quitAt - System.currentTimeMillis();                    if (left < 0) {                        // Too late                        sendClose();                        throw new IOException("Close timeout");                    }                }                try {                    if (!ros.isQueueEmpty()) {                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.fine("Waiting for Output stream queue event");                        }                        ros.waitQueueEvent(left);                    }                    break;                } catch (InterruptedException ie) {                    // give up, then.                    throw new IOException("Close interrupted");                }            }            // We are initiating the close. We do not want to receive            // anything more. So we can close the ris right away.            ris.close();        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -