📄 jxtabidipipe.java
字号:
this.group = group; this.msgListener = msgListener; if (msgListener != null) { dequeued = true; } this.isReliable = reliable; pipeSvc = group.getPipeService(); this.timeout = timeout; myPipeAdv = JxtaServerPipe.newInputPipe(group, pipeAd); this.inputPipe = pipeSvc.createInputPipe(myPipeAdv, this); this.credentialDoc = credentialDoc != null ? credentialDoc : getCredDoc(group); Message openMsg = createOpenMessage(group, myPipeAdv); // create the output pipe and send this message if (peerid == null) { pipeSvc.createOutputPipe(pipeAd, this); } else { pipeSvc.createOutputPipe(pipeAd, Collections.singleton(peerid), this); } try { synchronized (acceptLock) { // check connectOutpipe within lock to prevent a race with modification. if (connectOutpipe == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Waiting for " + timeout + " msec"); } acceptLock.wait(timeout); } } } catch (InterruptedException ie) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Interrupted", ie); } Thread.interrupted(); IOException exp = new IOException("Interrupted"); exp.initCause(ie); throw exp; } if (connectOutpipe == null) { throw new IOException("connection timeout"); } // send connect message waiting = true; if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending a backchannel message"); } connectOutpipe.send(openMsg); // wait for the second op try { synchronized (finalLock) { if (waiting) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Waiting for " + timeout + " msec for back channel to be established"); } finalLock.wait(timeout); // Need to check for creation if (msgr == null) { throw new IOException("connection timeout"); } } } } catch (InterruptedException ie) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Interrupted", ie); } Thread.interrupted(); IOException exp = new IOException("Interrupted"); exp.initCause(ie); throw exp; } setBound(); notifyListeners(PipeStateListener.PIPE_OPENED_EVENT); } /** * creates all the reliability objects */ private void createRLib() { if (isReliable) { if (outgoing == null) { outgoing = new OutgoingMsgrAdaptor(msgr, retryTimeout); } if (ros == null) { ros = new ReliableOutputStream(outgoing, new FixedFlowControl(windowSize)); } if (ris == null) { ris = new ReliableInputStream(outgoing, retryTimeout, this); } } } /** * Toggles reliability * * @param reliable Toggles reliability to reliable * @throws IOException if pipe is bound */ public void setReliable(boolean reliable) throws IOException { if (isBound()) { throw new IOException("Can not set reliability after pipe is bound"); } this.isReliable = reliable; } /** * Obtain the cred doc from the group object. * * @param group group context * @return The credDoc value */ protected static StructuredDocument getCredDoc(PeerGroup group) { try { MembershipService membership = group.getMembershipService(); Credential credential = membership.getDefaultCredential(); if (credential != null) { return credential.getDocument(MimeMediaType.XMLUTF8); } } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "failed to get credential", e); } } return null; } /** * get the remote credential doc * * @return Credential StructuredDocument */ public StructuredDocument getCredentialDoc() { return credentialDoc; } /** * Sets the connection credential doc. * If no credentials are set, the default group credential are used. * * @param doc Credential StructuredDocument */ public void setCredentialDoc(StructuredDocument doc) { this.credentialDoc = doc; } /** * Creates a connection request message * * @param group group context * @param pipeAd pipe advertisement * @return the Message object * @throws IOException if an io error occurs */ protected Message createOpenMessage(PeerGroup group, PipeAdvertisement pipeAd) throws IOException { Message msg = new Message(); PeerAdvertisement peerAdv = group.getPeerAdvertisement(); if (credentialDoc == null) { credentialDoc = getCredDoc(group); } if (credentialDoc == null && pipeAd.getType().equals(PipeService.UnicastSecureType)) { throw new IOException("No credentials established to initiate a secure connection"); } try { if (credentialDoc != null) { msg.addMessageElement(JxtaServerPipe.nameSpace, new TextDocumentMessageElement(JxtaServerPipe.credTag, (XMLDocument) credentialDoc, null)); } msg.addMessageElement(JxtaServerPipe.nameSpace, new TextDocumentMessageElement(JxtaServerPipe.reqPipeTag, (XMLDocument) pipeAd.getDocument(MimeMediaType.XMLUTF8), null)); msg.addMessageElement(JxtaServerPipe.nameSpace, new StringMessageElement(JxtaServerPipe.reliableTag, Boolean.toString(isReliable), null)); msg.addMessageElement(JxtaServerPipe.nameSpace, new StringMessageElement(JxtaServerPipe.directSupportedTag, Boolean.toString(true), null)); msg.addMessageElement(JxtaServerPipe.nameSpace, new TextDocumentMessageElement(JxtaServerPipe.remPeerTag, (XMLDocument) peerAdv.getDocument(MimeMediaType.XMLUTF8), null)); return msg; } catch (Throwable t) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "error getting element stream", t); } return null; } } /** * Sets the bound attribute of the JxtaServerPipe object */ void setBound() { bound = true; if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Pipe Bound :true"); } } /** * Returns the binding state of the JxtaServerPipe. * * @return true if the ServerSocket successfully bound to an address */ public boolean isBound() { return bound; } /** * Returns an input stream for this socket. * * @return a stream for reading from this socket. * @throws IOException if an I/O error occurs when creating the * input stream. */ public InputPipe getInputPipe() throws IOException { return inputPipe; } /** * Returns remote PeerAdvertisement * * @return remote PeerAdvertisement */ public PeerAdvertisement getRemotePeerAdvertisement() { return remotePeerAdv; } /** * Returns remote PipeAdvertisement * * @return remote PipeAdvertisement */ public PipeAdvertisement getRemotePipeAdvertisement() { return remotePipeAdv; } /** * Sets the remote PeerAdvertisement * * @param peer Remote PeerAdvertisement */ protected void setRemotePeerAdvertisement(PeerAdvertisement peer) { this.remotePeerAdv = peer; } /** * Sets the remote PipeAdvertisement * * @param pipe PipeAdvertisement */ protected void setRemotePipeAdvertisement(PipeAdvertisement pipe) { this.remotePipeAdv = pipe; } /** * Closes this pipe. * * @throws IOException if an I/O error occurs when closing this * socket. */ public void close() throws IOException { sendClose(); closePipe(false); bound = false; } protected void closePipe(boolean fastClose) throws IOException { // close both pipes synchronized (closeLock) { if (closed) { return; } closed = true; bound = false; } if (!fastClose && isReliable && !direct) { /* * This implements linger! */ long quitAt = System.currentTimeMillis() + timeout; while (true) { //FIXME hamada this does not loop if (ros == null || ros.getMaxAck() == ros.getSeqNumber()) { // Nothing to worry about. break; } // By default wait forever. long left = 0; // If timeout is not zero. Then compute the waiting time // left. if (timeout != 0) { left = quitAt - System.currentTimeMillis(); if (left < 0) { // Too late sendClose(); throw new IOException("Close timeout"); } } try { if (!ros.isQueueEmpty()) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Waiting for Output stream queue event"); } ros.waitQueueEvent(left); } break; } catch (InterruptedException ie) { // give up, then. throw new IOException("Close interrupted"); } } // We are initiating the close. We do not want to receive // anything more. So we can close the ris right away. ris.close(); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -