📄 jxtasocket.java
字号:
long left = 0; // If timeout is not zero. Then compute the waiting time // left. if (timeout != 0) { left = quitAt - System.currentTimeMillis(); if (left < 0) { // Too late closeCommon(); throw new IOException("Close timeout"); } } try { if (!ros.isQueueEmpty()) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Waiting for Output stream queue event"); } ros.waitQueueEvent(left); } break; } catch (InterruptedException ie) { // give up, then. throw new IOException("Close interrupted"); } } } closeCommon(); } /** * In stream mode, closes everything but the input * stream. closeFromRemote() leaves it open until EOF is * reached. That is, until all messages in the input queue have * been read. At which point, ris will close() itself. */ protected void closeCommon() throws IOException { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Shutting down socket"); } queue.interrupt(); if (isStream) { // close the reliable streams. if (ros != null) { ros.close(); } if (ris != null) { ris.close(); } } // close pipe, messenger, and queue if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Closing input pipe"); } in.close(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Closing messenger"); } msgr.close(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Closing message queue"); } queue.close(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Close complete"); } } /** * Sets the inputPipe attribute of the JxtaSocket object * *@param in The new inputPipe value */ protected void setInputPipe(InputPipe in) { this.in = in; } /** * {@inheritDoc} */ public void pipeMsgEvent(PipeMsgEvent event) { Message message = event.getMessage(); if (message == null) { return; } MessageElement element = null; if (!bound) { // look for a remote pipe answer element = (MessageElement) message.getMessageElement(JxtaServerSocket.nameSpace, JxtaServerSocket.remPipeTag); if (element != null) { // connect response try { PeerAdvertisement peerAdv = null; InputStream in = element.getStream(); PipeAdvertisement pa = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(element.getMimeType(), in); element = message.getMessageElement(JxtaServerSocket.nameSpace, JxtaServerSocket.remPeerTag); if (element != null) { in = element.getStream(); peerAdv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(element.getMimeType(), in); } else { return; } element = message.getMessageElement (JxtaServerSocket.nameSpace, JxtaServerSocket.credTag); if (element != null) { in = element.getStream(); credentialDoc = (StructuredDocument) StructuredDocumentFactory.newStructuredDocument(element.getMimeType(), in); } element = message.getMessageElement (JxtaServerSocket.nameSpace, JxtaServerSocket.streamTag); if (element != null) { isStream = (element.toString().equals("true")); } msgr = lightweightOutputPipe(group, pa, peerAdv); if (msgr == null) { // let the connection attempt timeout if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Unable to obtain a back messenger"); } return; } if (isStream) { // Create the input stream right away, otherwise // the first few messages from remote will be lost, unless // we use an intermediate queue. // FIXME: it would be even better if we could create the // input stream BEFORE having the output pipe resolved, but // that would force us to have the MsrgAdaptor block // until we can give it the real pipe or msgr... later. createRis(); } synchronized (finalLock) { waiting = false; finalLock.notifyAll(); } } catch (IOException e) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("failed to process response message", e); } } } } //net.jxta.impl.util.MessageUtil.printMessageStats(message, true); // look for close request element = (MessageElement) message.getMessageElement(JxtaServerSocket.nameSpace, JxtaServerSocket.closeTag); if (element != null) { if (element.toString().equals("close")) { try { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Received a close request"); } closeFromRemote(); } catch (IOException ie) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("failed during closeFromRemote", ie); } } } else if (element.toString().equals("closeACK")) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Received a close acknowledgement"); } synchronized(closeLock) { closeLock.notify(); } } } if (!isStream) { // isthere data ? element = (MessageElement) message.getMessageElement(JxtaServerSocket.nameSpace, JxtaServerSocket.dataTag); if (element == null) { return; } try { queue.push(element, -1); } catch (InterruptedException e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Interrupted", e); } } return; } Iterator i = message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_ACK); if (i != null && i.hasNext()) { if (ros != null) { ros.recv(message); } return; } i = message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_BLOCK); if (i != null && i.hasNext()) { // It can happen that we receive messages for the input stream // while we have not finished creating it. try { synchronized (finalLock) { while (waiting) { finalLock.wait(timeout); } } } catch (InterruptedException ie) {} if (ris != null) { ris.recv(message); } } } /** * {@inheritDoc} */ public void outputPipeEvent(OutputPipeEvent event) { OutputPipe op = event.getOutputPipe(); if (op.getAdvertisement() == null) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("The output pipe has no internal pipe advertisement. Continueing anyway."); } } if (op.getAdvertisement() == null || pipeAdv.equals(op.getAdvertisement())) { synchronized (acceptLock) { // modify op within lock to prevent a race with the if. if (connectOutpipe == null) { connectOutpipe = op; op = null; // if not null, will be closed. } acceptLock.notifyAll(); } // Ooops one too many, we were too fast re-trying. if (op != null) { op.close(); } } else { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Unexpected OutputPipe :"+op); } } } /** * A lightweight output pipe constructor, note the return type * Since all the info needed is available, there's no need for to use the pipe service * to resolve the pipe we have all we need to construct a messenger. * *@param group group context *@param pipeAdv Remote Pipe Advertisement *@param peer Remote Peer Advertisment *@return Messenger */ protected static Messenger lightweightOutputPipe(PeerGroup group, PipeAdvertisement pipeAdv, PeerAdvertisement peer) { EndpointService endpoint = group.getEndpointService(); ID opId = pipeAdv.getPipeID(); String destPeer = (peer.getPeerID().getUniqueValue()).toString(); // Get an endpoint messenger to that address EndpointAddress addr; if (pipeAdv.getType().equals(PipeService.UnicastType)) { addr = new EndpointAddress("jxta", destPeer, "PipeService", opId.toString()); } else if (pipeAdv.getType().equals(PipeService.UnicastSecureType)) { addr = new EndpointAddress("jxtatls", destPeer, "PipeService", opId.toString()); } else { // not a supported type throw new IllegalArgumentException(pipeAdv.getType()+" is not a supported pipe type"); } return endpoint.getMessenger(addr, null); } private void sendClose() { Message msg = new Message(); msg.addMessageElement(JxtaServerSocket.nameSpace, new StringMessageElement(JxtaServerSocket.closeTag, "close", null)); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sending a close request"); } synchronized(closeLock) { try { //allow for any last bits to hit the wire closeLock.wait(200); } catch (InterruptedException ie) {} } msgr.sendMessage(msg, null, null, new CloseListener()); synchronized(closeLock) { try { closeLock.wait(timeout); } catch (InterruptedException ie) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("wait for SendClose interrupted"); } } } } private void sendCloseACK() { Message msg = new Message(); msg.addMessageElement(JxtaServerSocket.nameSpace, new StringMessageElement(JxtaServerSocket.closeTag, "closeACK", null)); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sending a close acknowledgement"); } msgr.sendMessage(msg, null, null, new CloseListener()); } /** * {@inheritDoc} */ public synchronized int getSoTimeout() throws SocketException { if (isClosed()) { throw new SocketException("Socket is closed"); } return soTimeout; } /** * {@inheritDoc} */ public synchronized void setSoTimeout(int soTimeout) throws SocketException { if (soTimeout < 0) { throw new IllegalArgumentException("Invalid Socket timeout :"+soTimeout); } this.soTimeout = soTimeout; if (ris != null) { ris.setTimeout(soTimeout); } } /** * Gets the Maximum Retry Timeout of the reliability layer * * @return The maximum retry Timeout value */ public synchronized int getMaxRetryTimeout() { return maxRetryTimeout; } /** * Gets the Maximum Retry Timeout of the reliability layer * * @param maxRetryTimeout The new maximum retry timeout value * @exception IllegalArgumentException if maxRetryTimeout exceeds jxta platform maximum retry timeout */ public synchronized void setMaxRetryTimeout(int maxRetryTimeout) { if (maxRetryTimeout <= 0 || maxRetryTimeout > MAXRETRYTIMEOUT) { throw new IllegalArgumentException("Invalid Maximum retry timeout :"+maxRetryTimeout+" Exceed Global maximum retry timeout :"+MAXRETRYTIMEOUT); } this.maxRetryTimeout = maxRetryTimeout; } /** * Gets the Retry Timeout of the reliability layer * * @return The retry Timeout value */ public synchronized int getRetryTimeout() { return retryTimeout; } /** * Sets the Retry Timeout of the underlying reliability layer * . * In reliable mode it is possible for this call to block * trying to obtain a lock on reliable input stream * * @param retryTimeout The new retry timeout value * @exception SocketException if an I/O error occurs */ public synchronized void setRetryTimeout(int retryTimeout) throws SocketException { if (retryTimeout <= 0 || retryTimeout > maxRetryTimeout) { throw new IllegalArgumentException("Invalid Retry Socket timeout :"+retryTimeout); } this.retryTimeout = retryTimeout; if (outgoing != null) { outgoing.setTimeout(retryTimeout); } } /** * When in reliable mode, gets the Reliable library window size * * @return The windowSize value * @exception IOException if an I/O error occurs */ public synchronized int getWindowSize() { return windowSize; } /** * When in reliable mode, sets the Reliable library window size * * @param windowSize The new window size value * @exception IOException if an I/O error occurs */ public synchronized void setWindowSize(int windowSize) throws SocketException { if (isBound()) { throw new SocketException("Socket bound. Can not change the window size");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -