⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jxtabidipipe.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
        }
    }

    /**
     * Toggles reliability.
     *
     * @param  reliable  the reliability setting to use for this pipe
     * @throws IOException if the pipe is already bound
     */
    public void setReliable(boolean reliable) throws IOException {
        if (isBound()) {
            throw new IOException("Can not set reliability after pipe is bound");
        }
        this.isReliable = reliable;
    }

    /**
     * Obtains the credential document from the group's membership service.
     *
     * Any exception raised while looking up the credential is logged at WARN
     * level and treated as "no credential available".
     *
     * @param  group  group context
     * @return        the default credential rendered as an XML (UTF-8)
     *                document, or null if there is no default credential or
     *                the lookup failed
     */
    protected static StructuredDocument getCredDoc(PeerGroup group) {
        try {
            MembershipService membership = group.getMembershipService();
            Credential credential = membership.getDefaultCredential();
            if (credential != null) {
                return credential.getDocument(MimeMediaType.XMLUTF8);
            }
        } catch (Exception e) {
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("failed to get credential", e);
            }
        }
        return null;
    }

    /**
     * Gets the remote credential doc.
     *
     * Note that this returns the <code>credentialDoc</code> field, which is
     * distinct from the local credential set via
     * {@link #setCredentialDoc(StructuredDocument)}.
     *
     * @return Credential StructuredDocument
     */
    public StructuredDocument getCredentialDoc() {
        return credentialDoc;
    }

    /**
     * Sets the connection credential doc.
     * If no credentials are set, the default group credential will be used.
     *
     * @param doc Credential StructuredDocument
     */
    public void setCredentialDoc(StructuredDocument doc) {
        this.myCredentialDoc = doc;
    }

    /**
     * Creates a connection request message.
     *
     * The message carries, in the JxtaServerPipe namespace: the local
     * credential (when one is available), the given pipe advertisement, the
     * reliability flag and the local peer advertisement. If no credential was
     * set explicitly, the group's default credential is looked up and cached
     * in <code>myCredentialDoc</code>.
     *
     * @param  group   group context
     * @param  pipeAd  pipe advertisement
     * @return         the Message object, never null
     * @throws IOException if the pipe is of the secure unicast type and no
     *                     credential is available, or if the message elements
     *                     could not be built
     */
    protected Message createOpenMessage(PeerGroup group, PipeAdvertisement pipeAd) throws IOException {
        Message msg = new Message();
        PeerAdvertisement peerAdv = group.getPeerAdvertisement();

        if (myCredentialDoc == null) {
            myCredentialDoc = getCredDoc(group);
        }
        if (myCredentialDoc == null && pipeAd.getType().equals(PipeService.UnicastSecureType)) {
            throw new IOException("No credentials established to initiate a secure connection");
        }

        try {
            if (myCredentialDoc != null) {
                msg.addMessageElement(JxtaServerPipe.nameSpace,
                                      new TextDocumentMessageElement(JxtaServerPipe.credTag,
                                                                     (XMLDocument) myCredentialDoc, null));
            }
            msg.addMessageElement(JxtaServerPipe.nameSpace,
                                  new TextDocumentMessageElement(JxtaServerPipe.reqPipeTag,
                                                                 (XMLDocument) pipeAd.getDocument(MimeMediaType.XMLUTF8), null));
            msg.addMessageElement(JxtaServerPipe.nameSpace,
                                  new StringMessageElement(JxtaServerPipe.reliableTag,
                                                           Boolean.toString(isReliable),
                                                           null));
            msg.addMessageElement(JxtaServerPipe.nameSpace,
                                  new TextDocumentMessageElement(JxtaServerPipe.remPeerTag,
                                                                 (XMLDocument) peerAdv.getDocument(MimeMediaType.XMLUTF8), null));
            return msg;
        } catch (Exception e) {
            // Previously every Throwable was swallowed here and null was
            // returned, leaving callers with a null request message. Report
            // the failure through the declared IOException instead and let
            // Errors propagate untouched.
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("error getting element stream", e);
            }
            IOException failure = new IOException("Failed to build connection request message: " + e);
            // initCause keeps the original stack trace without requiring the
            // (String, Throwable) constructor, which older JDKs lack.
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     *  Accepts a connection
     *
     *@param  s                the accepted connection.
     *@exception  IOException  if an I/O error occurs when accepting the
     *      connection.
*/    protected void accept(JxtaBiDiPipe s) throws IOException {        if (closed) {            throw new IOException("Pipe is closed");        }        if (!isBound()) {            throw new IOException("Pipe not bound");        }        try {            synchronized (acceptLock) {                // check connectOutpipe within lock to prevent a race with modification.                if (connectOutpipe == null) {                    acceptLock.wait(timeout);                }            }        } catch (InterruptedException ie) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Interrupted", ie);            }        }    }    /**     *  Sets the bound attribute of the JxtaServerPipe object     */    void setBound() {        bound = true;        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Pipe Bound :true");        }    }    /**     * Returns the binding state of the JxtaServerPipe.     *     * @return    true if the ServerSocket successfully bound to an address     */    public boolean isBound() {        return bound;    }    /**     *  Returns an input stream for this socket.     *     *@return                  a stream for reading from this socket.     *@exception  IOException  if an I/O error occurs when creating the     *      input stream.     
*/    public InputPipe getInputPipe() throws IOException {        return in;    }    protected synchronized void waiter(int timeMilisecs) {        try {            wait(timeMilisecs);        } catch(Exception e) {            LOG.error("error waiting",e);        }    }    /**     * Returns remote PeerAdvertisement     * @return remote PeerAdvertisement     */    public PeerAdvertisement getRemotePeerAdvertisement() {        return remotePeerAdv;    }    /**     * Returns remote PipeAdvertisement     * @return remote PipeAdvertisement     */    public PipeAdvertisement getRemotePipeAdvertisement() {        return remotePipeAdv;    }    /**     * Sets the remote PeerAdvertisement     * @param peer Remote PeerAdvertisement     */    protected void setRemotePeerAdvertisement(PeerAdvertisement peer) {        this.remotePeerAdv = peer;    }    /**     * Sets the remote PipeAdvertisement     * @param pipe PipeAdvertisement     */    protected void setRemotePipeAdvertisement(PipeAdvertisement pipe) {        this.remotePipeAdv = pipe;    }    /**     *  Closes this pipe.     *     *@exception  IOException  if an I/O error occurs when closing this     *      socket.     */    public void close() throws IOException {        sendClose();        closePipe();    }    protected void closePipe() throws IOException {        // close both pipes        synchronized (closeLock) {            if (closed) {                return;            }            closed = true;            bound = false;        }        if (isReliable) {            long quitAt = System.currentTimeMillis() + timeout;            while (true) {                if (ros == null) {                    // Nothing to worry about.                    break;                }                // ros will not take any new message, now.                ros.setClosing();                if (ros.getMaxAck() == ros.getSeqNumber()) {                    break;                }                // By default wait forever.                
long left = 0;                // If timeout is not zero. Then compute the waiting time                // left.                if (timeout != 0) {                    left = quitAt - System.currentTimeMillis();                    if (left < 0) {                        // Too late                        sendClose();                        throw new IOException("Close timeout");                    }                }                try {                    if (!ros.isQueueEmpty()) {                        if (LOG.isEnabledFor(Level.DEBUG)) {                            LOG.debug("Waiting for Output stream queue event");                        }                        ros.waitQueueEvent(left);                    }                    break;                } catch (InterruptedException ie) {                    // give up, then.                    throw new IOException("Close interrupted");                }            }            // We are initiating the close. We do not want to receive            // anything more. So we can close the ris right away.            
ris.close();        }                if (isReliable && ros != null) {            ros.close();        }        // close the pipe        in.close();        msgr.close();        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Pipe close complete");        }        if (eventListener != null) {            try {                eventListener.pipeEvent(PIPE_CLOSED_EVENT);            } catch (Throwable th) {                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("error during pipe event callback", th);                }            }        }    }    /**     *  Sets the inputPipe attribute of the JxtaBiDiPipe object     *     *@param  in  The new inputPipe value     */    protected void setInputPipe(InputPipe in) {        this.in = in;    }    /**     * {@inheritDoc}     */    public void pipeMsgEvent(PipeMsgEvent event) {        Message message = event.getMessage();        if (message == null) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Empty event");            }            return;        }        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Pipe message arrived");        }        MessageElement element = null;        if (!bound) {            // look for a remote pipe answer            element = (MessageElement)                      message.getMessageElement(JxtaServerPipe.nameSpace,                                                JxtaServerPipe.remPipeTag);            if (element != null) {                // connect response                try {                    StructuredDocument CredDoc=null;                    InputStream in = element.getStream();                    remotePipeAdv = (PipeAdvertisement)                                    AdvertisementFactory.newAdvertisement(element.getMimeType(), in);                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("Recevied a pipe Advertisement :" +remotePipeAdv.getName());                    }                    
element = message.getMessageElement(JxtaServerPipe.nameSpace,                                                        JxtaServerPipe.remPeerTag);                    if (element != null) {                        in = element.getStream();                        remotePeerAdv = (PeerAdvertisement)                                        AdvertisementFactory.newAdvertisement(element.getMimeType(), in);                        if (LOG.isEnabledFor(Level.DEBUG)) {                            LOG.debug("Recevied an Peer Advertisement :" +remotePeerAdv.getName());                        }                    } else {                        if (LOG.isEnabledFor(Level.WARN)) {                            LOG.warn(" BAD connect response");                        }                        return;                    }                    element = message.getMessageElement(JxtaServerPipe.nameSpace,                                                        JxtaServerPipe.credTag);                    if (element != null) {                        in = element.getStream();                        CredDoc = (StructuredDocument)                                  StructuredDocumentFactory.newStructuredDocument(element.getMimeType(), in);                    }                    if (pipeAdv.getType().equals(PipeService.UnicastSecureType) && (CredDoc ==null || !checkCred(CredDoc))) {                        // we're done here                        if (LOG.isEnabledFor(Level.ERROR)) {                            LOG.error("Invalid remote credential doc");                        }                        return;                    }                    element = message.getMessageElement (JxtaServerPipe.nameSpace,                                                         JxtaServerPipe.reliableTag);                    if (element != null) {                        isReliable = (Boolean.valueOf(element.toString())).booleanValue();                    }                    msgr = lightweightOutputPipe(group, 
remotePipeAdv, remotePeerAdv);                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("Reliability set to :"+isReliable);                    }                    if (isReliable) {                        createRLib();                    }                    synchronized (finalLock) {                        waiting = false;                        finalLock.notifyAll();                    }                } catch (IOException e) {                    if (LOG.isEnabledFor(Level.ERROR)) {                        LOG.error("failed to process response message", e);                    }                }                return;            }        }        if(isReliable) {            //let reliabilty deal with the message            receiveMessage(message);            return;        }        if (!hasClose(message)) {            push(event);        }    }    private boolean hasClose(Message message) {        // look for close request        MessageElement element = (MessageElement)                                 message.getMessageElement(JxtaServerPipe.nameSpace,                                                           JxtaServerPipe.closeTag);        if (element != null) {            try {                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("Recevied a pipe close request, closing pipes");                }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -