📄 jxtabidipipe.java
字号:
} } /** * Toggles reliability * *@param reliable Toggles reliability to reliable * @throws IOEXecption if pipe is bound */ public void setReliable(boolean reliable) throws IOException { if (isBound()) { throw new IOException("Can not set reliability after pipe is bound"); } this.isReliable = reliable; } /** * obtain the cred doc from the group object * *@param group group context *@return The credDoc value */ protected static StructuredDocument getCredDoc(PeerGroup group) { try { MembershipService membership = group.getMembershipService(); Credential credential = membership.getDefaultCredential(); if (credential != null) { return credential.getDocument(MimeMediaType.XMLUTF8); } } catch (Exception e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("failed to get credential", e); } } return null; } /** * get the remote credential doc * @return Credential StructuredDocument */ public StructuredDocument getCredentialDoc() { return credentialDoc; } /** * Sets the connection credential doc * If no credentials are set, the default group credential will be used * @param doc Credential StructuredDocument */ public void setCredentialDoc(StructuredDocument doc) { this.myCredentialDoc = doc; } /** * Create a connection request message * *@param group group context *@param pipeAd pipe advertisement *@return the Message object */ protected Message createOpenMessage(PeerGroup group, PipeAdvertisement pipeAd) throws IOException { Message msg = new Message(); PeerAdvertisement peerAdv = group.getPeerAdvertisement(); if (myCredentialDoc == null) { myCredentialDoc = getCredDoc(group); } if (myCredentialDoc == null && pipeAd.getType().equals(PipeService.UnicastSecureType)) { throw new IOException("No credentials established to initiate a secure connection"); } try { if (myCredentialDoc != null) { msg.addMessageElement(JxtaServerPipe.nameSpace, new TextDocumentMessageElement(JxtaServerPipe.credTag, (XMLDocument) myCredentialDoc, null)); } msg.addMessageElement(JxtaServerPipe.nameSpace, new TextDocumentMessageElement(JxtaServerPipe.reqPipeTag, (XMLDocument) pipeAd.getDocument(MimeMediaType.XMLUTF8), null)); msg.addMessageElement(JxtaServerPipe.nameSpace, new StringMessageElement(JxtaServerPipe.reliableTag, Boolean.toString(isReliable), null)); msg.addMessageElement(JxtaServerPipe.nameSpace, new TextDocumentMessageElement(JxtaServerPipe.remPeerTag, (XMLDocument) peerAdv.getDocument(MimeMediaType.XMLUTF8), null)); return msg; } catch (Throwable t) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("error getting element stream", t); } return null; } } /** * Accepts a connection * *@param s the accepted connection. *@exception IOException if an I/O error occurs when accepting the * connection. */ protected void accept(JxtaBiDiPipe s) throws IOException { if (closed) { throw new IOException("Pipe is closed"); } if (!isBound()) { throw new IOException("Pipe not bound"); } try { synchronized (acceptLock) { // check connectOutpipe within lock to prevent a race with modification. if (connectOutpipe == null) { acceptLock.wait(timeout); } } } catch (InterruptedException ie) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Interrupted", ie); } } } /** * Sets the bound attribute of the JxtaServerPipe object */ void setBound() { bound = true; if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Pipe Bound :true"); } } /** * Returns the binding state of the JxtaServerPipe. * * @return true if the ServerSocket successfully bound to an address */ public boolean isBound() { return bound; } /** * Returns an input stream for this socket. * *@return a stream for reading from this socket. *@exception IOException if an I/O error occurs when creating the * input stream. */ public InputPipe getInputPipe() throws IOException { return in; } protected synchronized void waiter(int timeMilisecs) { try { wait(timeMilisecs); } catch(Exception e) { LOG.error("error waiting",e); } } /** * Returns remote PeerAdvertisement * @return remote PeerAdvertisement */ public PeerAdvertisement getRemotePeerAdvertisement() { return remotePeerAdv; } /** * Returns remote PipeAdvertisement * @return remote PipeAdvertisement */ public PipeAdvertisement getRemotePipeAdvertisement() { return remotePipeAdv; } /** * Sets the remote PeerAdvertisement * @param peer Remote PeerAdvertisement */ protected void setRemotePeerAdvertisement(PeerAdvertisement peer) { this.remotePeerAdv = peer; } /** * Sets the remote PipeAdvertisement * @param pipe PipeAdvertisement */ protected void setRemotePipeAdvertisement(PipeAdvertisement pipe) { this.remotePipeAdv = pipe; } /** * Closes this pipe. * *@exception IOException if an I/O error occurs when closing this * socket. */ public void close() throws IOException { sendClose(); closePipe(); } protected void closePipe() throws IOException { // close both pipes synchronized (closeLock) { if (closed) { return; } closed = true; bound = false; } if (isReliable) { long quitAt = System.currentTimeMillis() + timeout; while (true) { if (ros == null) { // Nothing to worry about. break; } // ros will not take any new message, now. ros.setClosing(); if (ros.getMaxAck() == ros.getSeqNumber()) { break; } // By default wait forever. long left = 0; // If timeout is not zero. Then compute the waiting time // left. if (timeout != 0) { left = quitAt - System.currentTimeMillis(); if (left < 0) { // Too late sendClose(); throw new IOException("Close timeout"); } } try { if (!ros.isQueueEmpty()) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Waiting for Output stream queue event"); } ros.waitQueueEvent(left); } break; } catch (InterruptedException ie) { // give up, then. throw new IOException("Close interrupted"); } } // We are initiating the close. We do not want to receive // anything more. So we can close the ris right away. ris.close(); } if (isReliable && ros != null) { ros.close(); } // close the pipe in.close(); msgr.close(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Pipe close complete"); } if (eventListener != null) { try { eventListener.pipeEvent(PIPE_CLOSED_EVENT); } catch (Throwable th) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("error during pipe event callback", th); } } } } /** * Sets the inputPipe attribute of the JxtaBiDiPipe object * *@param in The new inputPipe value */ protected void setInputPipe(InputPipe in) { this.in = in; } /** * {@inheritDoc} */ public void pipeMsgEvent(PipeMsgEvent event) { Message message = event.getMessage(); if (message == null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Empty event"); } return; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Pipe message arrived"); } MessageElement element = null; if (!bound) { // look for a remote pipe answer element = (MessageElement) message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.remPipeTag); if (element != null) { // connect response try { StructuredDocument CredDoc=null; InputStream in = element.getStream(); remotePipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(element.getMimeType(), in); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Recevied a pipe Advertisement :" +remotePipeAdv.getName()); } element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.remPeerTag); if (element != null) { in = element.getStream(); remotePeerAdv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(element.getMimeType(), in); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Recevied an Peer Advertisement :" +remotePeerAdv.getName()); } } else { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn(" BAD connect response"); } return; } element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.credTag); if (element != null) { in = element.getStream(); CredDoc = (StructuredDocument) StructuredDocumentFactory.newStructuredDocument(element.getMimeType(), in); } if (pipeAdv.getType().equals(PipeService.UnicastSecureType) && (CredDoc ==null || !checkCred(CredDoc))) { // we're done here if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Invalid remote credential doc"); } return; } element = message.getMessageElement (JxtaServerPipe.nameSpace, JxtaServerPipe.reliableTag); if (element != null) { isReliable = (Boolean.valueOf(element.toString())).booleanValue(); } msgr = lightweightOutputPipe(group, remotePipeAdv, remotePeerAdv); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Reliability set to :"+isReliable); } if (isReliable) { createRLib(); } synchronized (finalLock) { waiting = false; finalLock.notifyAll(); } } catch (IOException e) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("failed to process response message", e); } } return; } } if(isReliable) { //let reliabilty deal with the message receiveMessage(message); return; } if (!hasClose(message)) { push(event); } } private boolean hasClose(Message message) { // look for close request MessageElement element = (MessageElement) message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.closeTag); if (element != null) { try { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Recevied a pipe close request, closing pipes"); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -