📄 jxtasocket.java
字号:
connect(group, pipeAd, timeout); } /** * Connects to a remote JxtaSocket on any peer within a timeout specified in milliseconds * *@param group group context *@param pipeAd PipeAdvertisement *@exception IOException if an io error occurs */ public void connect(PeerGroup group, PipeAdvertisement pipeAd, int timeout) throws IOException { connect(group, null, pipeAd, timeout); } /** * Connects to a remote JxtaSocket on a specific peer within a timeout specified in milliseconds * *@param group group context *@param peerid peer to connect to *@param pipeAd PipeAdvertisement *@param timeout timeout in milliseconds *@exception IOException if an io error occurs */ public void connect(PeerGroup group, PeerID peerid, PipeAdvertisement pipeAd, int timeout) throws IOException { if (pipeAd.getType() != null && pipeAd.getType().equals(PipeService.PropagateType)) { throw new IOException("Propagate pipe advertisements are not supported"); } this.group = group; this.pipeAdv = pipeAd; this.timeout = timeout; pipeSvc = group.getPipeService(); myPipeAdv = JxtaServerSocket.newInputPipe(group, pipeAd); this.in = pipeSvc.createInputPipe(myPipeAdv, this); this.peerid = peerid; Message openMsg = createOpenMessage(group, myPipeAdv); // create the output pipe and send this message // Need to retry the call to createOutputPipe. If there is no // rendezvous yet and the destination is not reachable by mcast, // then createOutputPipe has no effect. // We repeat it with exponential delays. long delay = 2000; if (peerid == null) { pipeSvc.createOutputPipe(pipeAd, this); } else { pipeSvc.createOutputPipe(pipeAd, Collections.singleton(peerid), this); } while(true) { try { synchronized(acceptLock) { // check connectOutpipe within lock to prevent a race with modification. if (connectOutpipe != null) { break; } // If initial TO is exactly 0 we never get to here. // If this delay exceeds remaining timeout, wait // for the remainder. if (delay >= timeout) { delay = timeout; } // We take an initial TO < 0 as meaning. No wait. if (delay < 0) { break; } acceptLock.wait(delay); timeout -= delay; if (timeout <= 0) { // If we've exhausted the initial TO, leave. // So we never end-up with wait(<=0) unless the // initial TO was exactly 0. break; } // Next delay will be twice as long, remaining TO // permitting. delay *= 2; } } catch (InterruptedException ie) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Interrupted", ie); } } } if (connectOutpipe == null) { throw new SocketTimeoutException("connection timeout"); } // send connect message waiting = true; connectOutpipe.send(openMsg); //wait for the second op try { synchronized (finalLock) { if (waiting) { finalLock.wait(timeout); } if (msgr == null) { throw new SocketTimeoutException("connection timeout"); } } } catch (InterruptedException ie) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Interrupted", ie); } } setBound(); } /** * obtain the cred doc from the group object * *@param group group context *@return The credDoc value */ protected static StructuredDocument getCredDoc(PeerGroup group) { try { MembershipService membership = group.getMembershipService(); Credential credential = membership.getDefaultCredential(); if (credential != null) { return credential.getDocument(MimeMediaType.XMLUTF8); } } catch (Exception e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("failed to get credential", e); } } return null; } /** * get the remote credential doc * * @return Credential StructuredDocument */ public StructuredDocument getCredentialDoc() { return credentialDoc; } /** * Sets the connection credential doc * If no credentials are set, the default group credential will be used * * @param doc Credential StructuredDocument */ public void setCredentialDoc(StructuredDocument doc) { this.myCredentialDoc = doc; } /** * Create a connection request message * *@param group group context *@param pipeAd pipe advertisement *@return the Message object */ protected Message createOpenMessage(PeerGroup group, PipeAdvertisement pipeAd) throws IOException { Message msg = new Message(); PeerAdvertisement peerAdv = group.getPeerAdvertisement(); if (myCredentialDoc == null) { myCredentialDoc = getCredDoc(group); } if (myCredentialDoc == null && pipeAd.getType().equals(PipeService.UnicastSecureType)) { throw new IOException("No credentials established to initiate a secure connection"); } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Requesting connection [isStream] :"+isStream); } try { if (myCredentialDoc != null) { msg.addMessageElement(JxtaServerSocket.nameSpace, new TextDocumentMessageElement(JxtaServerSocket.credTag, (XMLDocument) myCredentialDoc, null)); } msg.addMessageElement(JxtaServerSocket.nameSpace, new TextDocumentMessageElement(JxtaServerSocket.reqPipeTag, (XMLDocument) pipeAd.getDocument(MimeMediaType.XMLUTF8), null)); msg.addMessageElement(JxtaServerSocket.nameSpace, new StringMessageElement(JxtaServerSocket.streamTag, Boolean.toString(isStream), null)); msg.addMessageElement(JxtaServerSocket.nameSpace, new TextDocumentMessageElement(JxtaServerSocket.remPeerTag, (XMLDocument) peerAdv.getDocument(MimeMediaType.XMLUTF8), null)); return msg; } catch (Throwable t) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("error getting element stream", t); } return null; } } /** * Unsupported operation, use JxtaServerSocket to listen for connections. * * @deprecated This method is no longer supported. * *@param backlog the maximum length of the queue. *@exception IOException Will always be thrown as unsupported exception */ public void listen(int backlog) throws IOException { throw new IOException("Unsupported operation, use a JxtaServerSocket instead"); } /** * Sets the bound attribute of the JxtaSocket object */ private void setBound() { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Socket Connected"); } bound = true; } /** * {@inheritDoc} */ public boolean isBound() { return bound; } /** * Internal routine. Assumes all the necessary conditions are * fulfilled. Called only by code that knows what it's doing. */ private void createRis() { if (outgoing == null) { outgoing = new OutgoingMsgrAdaptor(msgr, retryTimeout); } if (ris == null) { ris = new ReliableInputStream(outgoing, soTimeout); } } /** * Returns the internal output stream buffer size * @return the internal buffer size */ public synchronized int getOutputStreamBufferSize() { return outputBufferSize; } /** * Sets the internal output stream buffer size * @param size the internal buffer size */ public synchronized void setOutputStreamBufferSize(int size) throws IOException { if (size < 1) { throw new IllegalArgumentException("negative/zero buffer size"); } if (osCreated) { throw new IOException("Can not reset buffersize, OutputStream is already created"); } outputBufferSize = size; } /** * {@inheritDoc} */ public InputStream getInputStream() throws IOException { checkState(); if (isStream) { if (outgoing == null) { outgoing = new OutgoingMsgrAdaptor(msgr, retryTimeout); } if (ris == null) { // isBound should already have returned false, so this is // only a bug detection feature. throw new IOException("Reliable stream not initialized"); } } return new JxtaSocketInputStream(this); } /** * {@inheritDoc} */ public OutputStream getOutputStream() throws IOException { checkState(); if (isStream) { if (outgoing == null) { outgoing = new OutgoingMsgrAdaptor(msgr, retryTimeout); } if (ros == null) { ros = new ReliableOutputStream(outgoing, new FixedFlowControl(windowSize)); } } osCreated = true; return new JxtaSocketOutputStream(this, outputBufferSize); } /** * {@inheritDoc} */ public void close() throws IOException { synchronized (closeLock) { if (closed) { return; } sendClose(); bound = false; closed = true; } // This is called when closure is initiated on this side. // First make sure whatever is in the output queue is sent // If it cannot be sent within the specified timeout, throw an // IOException. // isStream is currently synonymous with isReliable. For reliable // streams, the outgoing side that decides to close, must perform // lingering. For now, we assume that it no-longer cares for incoming // data. In other words, the side that does not decide to close just // tears down everything without a care for the contens of its output // queue. if (isStream) { long quitAt = System.currentTimeMillis() + timeout; while (true) { if (ros == null) { // Nothing to worry about. break; } // ros will not take any new message, now. ros.setClosing(); ris.setClosing(); if (ros.getMaxAck() == ros.getSeqNumber()) { break; } // By default wait forever. long left = 0; // If timeout is not zero. Then compute the waiting time // left. if (timeout != 0) { left = quitAt - System.currentTimeMillis(); if (left < 0) { // Too late closeCommon(); throw new IOException("Close timeout"); } } try { if (!ros.isQueueEmpty()) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Waiting for Output stream queue event"); } ros.waitQueueEvent(left); } break; } catch (InterruptedException ie) { // give up, then. throw new IOException("Close interrupted"); } } // We are initiating the close. We do not want to receive // anything more. So we can close the ris right away. ris.close(); } // Lingering done, if needed. We can tell the other side it can close, // and then tear down everything. closeCommon(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Socket closed"); } } /** This is called when closure is initiated on the remote side. * output is closed without any precaution since the remote side is * no longer interrested, but let the input drain, so that all the packets * the other side flushed before closing have been actually delivered. * If needed, ris will be close itself when read first return -1. */ protected void closeFromRemote() throws IOException { synchronized (closeLock) { sendCloseACK(); if (closed) { return; } bound = false; closed = true; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Received a remote close request, shutting down connection"); } if (isStream) { long quitAt = System.currentTimeMillis() + timeout; while (true) { if (ros == null) { // Nothing to worry about. break; } // ros will not take any new message, now. ros.setClosing(); ris.setClosing(); if (ros.getMaxAck() == ros.getSeqNumber()) { break; } // By default wait forever.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -