📄 jxtabidipipe.java
字号:
if (isReliable && ros != null) { ros.close(); } // close the pipe inputPipe.close(); msgr.close(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Pipe close complete"); } notifyListeners(PIPE_CLOSED_EVENT); } private void notifyListeners(int event) { try { if (eventListener != null) { eventListener.pipeEvent(event); } else if (stateListener != null) { stateListener.stateEvent(this, event); } } catch (Throwable th) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "error during pipe event callback", th); } } } /** * Sets the inputPipe attribute of the JxtaBiDiPipe object * * @param inputPipe The new inputPipe value */ protected void setInputPipe(InputPipe inputPipe) { this.inputPipe = inputPipe; } /** * {@inheritDoc} */ public void pipeMsgEvent(PipeMsgEvent event) { Message message = event.getMessage(); if (message == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Empty event"); } return; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Pipe message arrived"); } MessageElement element; if (!bound) { // look for a remote pipe answer element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.remPipeTag); if (element != null) { // connect response try { XMLDocument CredDoc = null; XMLDocument remotePipeDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(element); remotePipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(remotePipeDoc); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Recevied a pipe Advertisement :" + remotePipeAdv.getName()); } element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.remPeerTag); if (element != null) { XMLDocument remotePeerDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(element); remotePeerAdv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(remotePeerDoc); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Recevied an Peer Advertisement :" + remotePeerAdv.getName()); } } else { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning(" BAD connect response"); } return; } element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.credTag); if (element != null) { CredDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(element); } if (pipeAdv.getType().equals(PipeService.UnicastSecureType) && (CredDoc == null || !checkCred(CredDoc))) { // we're done here if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.severe("Missing remote credential doc"); } return; } element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.reliableTag); if (element != null) { isReliable = Boolean.valueOf(element.toString()); } boolean directSupported = false; element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.directSupportedTag); if (element != null) { directSupported = Boolean.valueOf(element.toString()); } if (directSupported) { msgr = getDirectMessenger(group, remotePipeAdv, remotePeerAdv); if (msgr != null) { this.direct = true; } else { msgr = lightweightOutputPipe(group, remotePipeAdv, remotePeerAdv); } } else { msgr = lightweightOutputPipe(group, remotePipeAdv, remotePeerAdv); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Reliability set to :" + isReliable); } if (isReliable && !direct) { createRLib(); } synchronized (finalLock) { waiting = false; finalLock.notifyAll(); } } catch (IOException e) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "failed to process response message", e); } } return; } } if (isReliable && !direct) { // let reliabilty deal with the message receiveMessage(message); return; } if (!hasClose(message)) { push(event); } } private boolean hasClose(Message message) { // look for close request MessageElement element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.closeTag); if (element != null) { try { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Recevied a pipe close request, closing pipes"); } if (ros != null) { ros.hardClose(); } closePipe(false); } catch (IOException ie) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "failed during close", ie); } } return true; } return false; } private void receiveMessage(Message message) { Iterator<MessageElement> i = message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_ACK); if (i.hasNext()) { if (ros != null) { ros.recv(message); } return; } i = message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_BLOCK); if (i.hasNext()) { // It can happen that we receive messages for the input stream // while we have not finished creating it. try { synchronized (finalLock) { while (waiting) { finalLock.wait(timeout); } } } catch (InterruptedException ie) {// ignored } if (ris != null) { ris.recv(message); } } } /** * Gets the Maximum Retry Timeout of the reliability layer * * @return The maximum retry Timeout value */ public synchronized int getMaxRetryTimeout() { return maxRetryTimeout; } /** * Gets the Maximum Retry Timeout of the reliability layer * * @param maxRetryTimeout The new maximum retry timeout value * @throws IllegalArgumentException if maxRetryTimeout exceeds jxta platform maximum retry timeout */ public synchronized void setMaxRetryTimeout(int maxRetryTimeout) { if (maxRetryTimeout <= 0 || maxRetryTimeout > MAXRETRYTIMEOUT) { throw new IllegalArgumentException( "Invalid Maximum retry timeout :" + maxRetryTimeout + " Exceed Global maximum retry timeout :" + MAXRETRYTIMEOUT); } this.maxRetryTimeout = maxRetryTimeout; } /** * Gets the Retry Timeout of the reliability layer * * @return The retry Timeout value */ public synchronized int getRetryTimeout() { return retryTimeout; } /** * Sets the Retry Timeout of the underlying reliability layer * . * In reliable mode it is possible for this call to block * trying to obtain a lock on reliable input stream * * @param retryTimeout The new retry timeout value * @throws IOException if an I/O error occurs */ public synchronized void setRetryTimeout(int retryTimeout) throws IOException { if (timeout <= 0) { throw new IllegalArgumentException("Invalid Socket timeout :" + retryTimeout); } this.retryTimeout = retryTimeout; if (outgoing != null) { outgoing.setTimeout(retryTimeout); } } /** * When in reliable mode, gets the Reliable library window size * * @return The windowSize value */ public synchronized int getWindowSize() { return windowSize; } /** * When in reliable mode, sets the Reliable library window size * * @param windowSize The new window size value * @throws IOException if an I/O error occurs */ public synchronized void setWindowSize(int windowSize) throws IOException { if (isBound()) { throw new IOException("Socket bound. Can not change the window size"); } this.windowSize = windowSize; } /** * This method is invoked by the Reliablity library for each incoming data message * * @param message Incoming message */ public void processIncomingMessage(Message message) { if (!hasClose(message)) { PipeMsgEvent event = new PipeMsgEvent(this, message, (PipeID) inputPipe.getPipeID()); push(event); } } private void push(PipeMsgEvent event) { if (msgListener == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("push message onto queue"); } queue.offer(event); } else { dequeue(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("calling message listener"); } msgListener.pipeMsgEvent(event); } } /** * Send a message * <p/> * <code>Messenger</code> * * @param msg Message to send to the remote side * @return true if message was successfully enqueued * @throws IOException if the underlying messenger breaks, either due to * a physical address change, reliability issue. * @see net.jxta.endpoint.Message */ public boolean sendMessage(Message msg) throws IOException { if (isReliable && !direct) { int seqn = ros.send(msg); return (seqn > 0); } else { try { if (msgr instanceof TcpMessenger) { ((TcpMessenger) msgr).sendMessageDirect(msg, null, null, true); return true; } else { return msgr.sendMessage(msg, null, null); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -