⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jxtasocket.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
        connect(group, pipeAd, timeout);    }    /**     *  Connects to a remote JxtaSocket on any peer within a timeout specified in milliseconds     *     *@param  group            group context     *@param  pipeAd             PipeAdvertisement     *@exception  IOException  if an io error occurs     */    public void connect(PeerGroup group, PipeAdvertisement pipeAd, int timeout) throws IOException {        connect(group, null, pipeAd, timeout);    }    /**     *  Connects to a remote JxtaSocket on a specific peer within a timeout specified in milliseconds     *     *@param  group            group context     *@param  peerid           peer to connect to     *@param  pipeAd             PipeAdvertisement     *@param  timeout          timeout in milliseconds     *@exception  IOException  if an io error occurs     */    public void connect(PeerGroup group, PeerID peerid, PipeAdvertisement pipeAd, int timeout) throws IOException {        if (pipeAd.getType() != null && pipeAd.getType().equals(PipeService.PropagateType)) {            throw new IOException("Propagate pipe advertisements are not supported");        }        this.group = group;        this.pipeAdv = pipeAd;        this.timeout = timeout;        pipeSvc = group.getPipeService();        myPipeAdv = JxtaServerSocket.newInputPipe(group, pipeAd);        this.in = pipeSvc.createInputPipe(myPipeAdv, this);        this.peerid = peerid;        Message openMsg = createOpenMessage(group, myPipeAdv);        // create the output pipe and send this message        // Need to retry the call to createOutputPipe. If there is no        // rendezvous yet and the destination is not reachable by mcast,        // then createOutputPipe has no effect.        // We repeat it with exponential delays.        long delay = 2000;        if (peerid == null) {            pipeSvc.createOutputPipe(pipeAd, this);        } else {            pipeSvc.createOutputPipe(pipeAd, Collections.singleton(peerid), this);        }        while(true) {            try {                synchronized(acceptLock) {                    // check connectOutpipe within lock to prevent a race with modification.                    if (connectOutpipe != null) {                        break;                    }                    // If initial TO is exactly 0 we never get to here.                    // If this delay exceeds remaining timeout, wait                    // for the remainder.                    if (delay >= timeout) {                        delay = timeout;                    }                    // We take an initial TO < 0 as meaning. No wait.                    if (delay < 0) {                        break;                    }                    acceptLock.wait(delay);                    timeout -= delay;                    if (timeout <= 0) {                        // If we've exhausted the initial TO, leave.                        // So we never end-up with wait(<=0) unless the                        // initial TO was exactly 0.                        break;                    }                    // Next delay will be twice as long, remaining TO                    // permitting.                    delay *= 2;                }            } catch (InterruptedException ie) {                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("Interrupted", ie);                }            }        }        if (connectOutpipe == null) {            throw new SocketTimeoutException("connection timeout");        }        // send connect message        waiting = true;        connectOutpipe.send(openMsg);        //wait for the second op        try {            synchronized (finalLock) {                if (waiting) {                    finalLock.wait(timeout);                }                if (msgr == null) {                    throw new SocketTimeoutException("connection timeout");                }            }        } catch (InterruptedException ie) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Interrupted", ie);            }        }        setBound();    }    /**     *  obtain the cred doc from the group object     *     *@param  group  group context     *@return        The credDoc value     */    protected static StructuredDocument getCredDoc(PeerGroup group) {        try {            MembershipService membership = group.getMembershipService();            Credential credential = membership.getDefaultCredential();            if (credential != null) {                return credential.getDocument(MimeMediaType.XMLUTF8);            }        } catch (Exception e) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("failed to get credential", e);            }        }        return null;    }    /**     *  get the remote credential doc     *     *  @return Credential StructuredDocument      */    public StructuredDocument getCredentialDoc() {        return credentialDoc;    }    /**     *  Sets the connection credential doc     *  If no credentials are set, the default group credential will be used     *     *  @param doc Credential StructuredDocument      */    public  void setCredentialDoc(StructuredDocument doc) {        this.myCredentialDoc = doc;    }    /**     *  Create a connection request message     *     *@param  group   group context     *@param  pipeAd  pipe advertisement     *@return         the Message  object     */    protected Message createOpenMessage(PeerGroup group, PipeAdvertisement pipeAd) throws IOException {        Message msg = new Message();        PeerAdvertisement peerAdv = group.getPeerAdvertisement();        if (myCredentialDoc == null) {            myCredentialDoc = getCredDoc(group);        }        if (myCredentialDoc == null && pipeAd.getType().equals(PipeService.UnicastSecureType)) {            throw new IOException("No credentials established to initiate a secure connection");        }        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Requesting connection [isStream] :"+isStream);        }        try {            if (myCredentialDoc != null) {                msg.addMessageElement(JxtaServerSocket.nameSpace,                                      new TextDocumentMessageElement(JxtaServerSocket.credTag,                                                                     (XMLDocument) myCredentialDoc, null));            }            msg.addMessageElement(JxtaServerSocket.nameSpace,                                  new TextDocumentMessageElement(JxtaServerSocket.reqPipeTag,                                                                 (XMLDocument) pipeAd.getDocument(MimeMediaType.XMLUTF8), null));            msg.addMessageElement(JxtaServerSocket.nameSpace,                                  new StringMessageElement(JxtaServerSocket.streamTag,                                                           Boolean.toString(isStream),                                                           null));            msg.addMessageElement(JxtaServerSocket.nameSpace,                                  new TextDocumentMessageElement(JxtaServerSocket.remPeerTag,                                                                 (XMLDocument) peerAdv.getDocument(MimeMediaType.XMLUTF8), null));            return msg;        } catch (Throwable t) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("error getting element stream", t);            }            return null;        }    }    /**     *  Unsupported operation, use JxtaServerSocket to listen for connections.     *     *  @deprecated This method is no longer supported.     *     *@param  backlog          the maximum length of the queue.     *@exception  IOException  Will always be thrown as unsupported exception     */    public void listen(int backlog) throws IOException {        throw new IOException("Unsupported operation, use a JxtaServerSocket instead");    }    /**     *  Sets the bound attribute of the JxtaSocket object     */    private void setBound() {        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Socket Connected");        }        bound = true;    }    /**     * {@inheritDoc}     */    public boolean isBound() {        return bound;    }    /**     * Internal routine. Assumes all the necessary conditions are     * fulfilled. Called only by code that knows what it's doing.     */    private void createRis() {        if (outgoing == null) {            outgoing = new OutgoingMsgrAdaptor(msgr, retryTimeout);        }        if (ris == null) {            ris = new ReliableInputStream(outgoing, soTimeout);        }    }    /**     * Returns the internal output stream buffer size     * @return the internal buffer size     */    public synchronized int getOutputStreamBufferSize() {        return outputBufferSize;    }    /**     * Sets the internal output stream buffer size     * @param size the internal buffer size     */    public synchronized void setOutputStreamBufferSize(int size) throws IOException {        if (size < 1) {            throw new IllegalArgumentException("negative/zero buffer size");        }        if (osCreated) {            throw new IOException("Can not reset buffersize, OutputStream is already created");        }        outputBufferSize = size;    }    /**     * {@inheritDoc}     */    public InputStream getInputStream() throws IOException {        checkState();        if (isStream) {            if (outgoing == null) {                outgoing = new OutgoingMsgrAdaptor(msgr, retryTimeout);            }            if (ris == null) {                // isBound should already have returned false, so this is                // only a bug detection feature.                throw new IOException("Reliable stream not initialized");            }        }        return new JxtaSocketInputStream(this);    }    /**     * {@inheritDoc}     */    public OutputStream getOutputStream() throws IOException {        checkState();        if (isStream) {            if (outgoing == null) {                outgoing = new OutgoingMsgrAdaptor(msgr, retryTimeout);            }            if (ros == null) {                ros = new ReliableOutputStream(outgoing, new FixedFlowControl(windowSize));            }        }        osCreated = true;        return new JxtaSocketOutputStream(this, outputBufferSize);    }    /**     * {@inheritDoc}     */    public void close() throws IOException {        synchronized (closeLock) {            if (closed) {                return;            }            sendClose();            bound = false;            closed = true;        }        // This is called when closure is initiated on this side.        // First make sure whatever is in the output queue is sent        // If it cannot be sent within the specified timeout, throw an        // IOException.        // isStream is currently synonymous with isReliable. For reliable        // streams, the outgoing side that decides to close, must perform        // lingering. For now, we assume that it no-longer cares for incoming        // data. In other words, the side that does not decide to close just        // tears down everything without a care for the contens of its output        // queue.        if (isStream) {            long quitAt = System.currentTimeMillis() + timeout;            while (true) {                if (ros == null) {                    // Nothing to worry about.                    break;                }                // ros will not take any new message, now.                ros.setClosing();                ris.setClosing();                if (ros.getMaxAck() == ros.getSeqNumber()) {                    break;                }                // By default wait forever.                long left = 0;                // If timeout is not zero. Then compute the waiting time                // left.                if (timeout != 0) {                    left = quitAt - System.currentTimeMillis();                    if (left < 0) {                        // Too late                        closeCommon();                        throw new IOException("Close timeout");                    }                }                try {                    if (!ros.isQueueEmpty()) {                        if (LOG.isEnabledFor(Level.DEBUG)) {                            LOG.debug("Waiting for Output stream queue event");                        }                        ros.waitQueueEvent(left);                    }                    break;                } catch (InterruptedException ie) {                    // give up, then.                    throw new IOException("Close interrupted");                }            }            // We are initiating the close. We do not want to receive            // anything more. So we can close the ris right away.            ris.close();        }        // Lingering done, if needed. We can tell the other side it can close,        // and then tear down everything.        closeCommon();        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Socket closed");        }    }    /** This is called when closure is initiated on the remote side.     *  output is closed without any precaution since the remote side is     *  no longer interrested, but let the input drain, so that all the packets     *  the other side flushed before closing have been actually delivered.     *  If needed, ris will be close itself when read first return -1.     */    protected void closeFromRemote() throws IOException {        synchronized (closeLock) {            sendCloseACK();            if (closed) {                return;            }            bound = false;            closed = true;        }        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Received a remote close request, shutting down connection");        }        if (isStream) {            long quitAt = System.currentTimeMillis() + timeout;            while (true) {                if (ros == null) {                    // Nothing to worry about.                    break;                }                // ros will not take any new message, now.                ros.setClosing();                ris.setClosing();                if (ros.getMaxAck() == ros.getSeqNumber()) {                    break;                }                // By default wait forever.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -