📄 jxtasocket.java
字号:
if (isConnected()) { setConnected(false); if (isReliable) { ris.softClose(); ros.hardClose(); } else { nonReliableInputStream.softClose(); nonReliableOutputStream.hardClose(); } } // If we are still bound then send them a close ACK. if (isBound() && (ros != null && ros.isQueueEmpty())) { // do not ack until the queue is empty sendCloseACK(); } if (closeAckReceived) { closeLock.notifyAll(); } } } /** * Closes the input pipe which we use to receive messages and the messenger * used for sending messages. */ protected synchronized void unbind() { if (!isBound()) { return; } if (isReliable) { try { ris.close(); } catch (IOException ignored) {// ignored } ros.hardClose(); } else { nonReliableInputStream.close(); nonReliableOutputStream.hardClose(); } // We are no longer bound setBound(false); // close pipe and messenger if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Closing ephemeral input pipe"); } localEphemeralPipeIn.close(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Closing remote ephemeral pipe messenger"); } if(null != outgoing) { outgoing.close(); } remoteEphemeralPipeMsgr.close(); } /** * {@inheritDoc} */ public void pipeMsgEvent(PipeMsgEvent event) { if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) { LOG.log(Level.FINER, "Pipe Message Event for " + this + "\n\t" + event.getMessage() + " for " + event.getPipeID()); } Message message = event.getMessage(); if (message == null) { return; } // look for close request/ack MessageElement element = message.getMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE, JxtaServerSocket.closeTag); if (element != null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Handling a close message " + this + " : " + element.toString()); } if (JxtaServerSocket.closeReqValue.equals(element.toString())) { try { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Received a close request"); } closeFromRemote(); } catch (IOException ie) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "failed during closeFromRemote", ie); } } } else if (JxtaServerSocket.closeAckValue.equals(element.toString())) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Received a close acknowledgement"); } synchronized (closeLock) { closeAckReceived = true; setConnected(false); closeLock.notifyAll(); } } return; } if (!isConnected()) { // connect response if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Processing connect response : " + message); } // look for a remote pipe answer element = message.getMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE, JxtaServerSocket.remPipeTag); PipeAdvertisement incomingPipeAdv = null; if (element != null) { try { XMLDocument pipeAdvDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(element); incomingPipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(pipeAdvDoc); } catch (IOException badPipeAdv) {// ignored } } element = message.getMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE, JxtaServerSocket.remPeerTag); PeerAdvertisement incomingRemotePeerAdv = null; if (element != null) { try { XMLDocument peerAdvDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(element); incomingRemotePeerAdv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(peerAdvDoc); } catch (IOException badPeerAdv) {// ignored } } element = message.getMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE, JxtaServerSocket.credTag); Credential incomingCredential = null; if (element != null) { try { StructuredDocument incomingCredentialDoc = StructuredDocumentFactory.newStructuredDocument(element); incomingCredential = group.getMembershipService().makeCredential(incomingCredentialDoc); } catch (Exception failed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Unable to generate credential for " + this, failed); } } } element = message.getMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE, JxtaServerSocket.streamTag); boolean incomingIsReliable = isReliable; if (element != null) { incomingIsReliable = Boolean.valueOf(element.toString()); } if ((null != incomingPipeAdv) && (null != incomingRemotePeerAdv)) { if ((null != remotePeerID) && (remotePeerID != incomingRemotePeerAdv.getPeerID())) { // let the connection attempt timeout if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning( "Connection response from wrong peer! " + remotePeerID + " != " + incomingRemotePeerAdv.getPeerID()); } return; } synchronized (socketConnectLock) { if (!isConnected()) { remoteCredential = incomingCredential; remotePeerAdv = incomingRemotePeerAdv; remotePeerID = incomingRemotePeerAdv.getPeerID(); remoteEphemeralPipeAdv = incomingPipeAdv; isReliable = incomingIsReliable; // Force the creation of the inputStream now. Waiting until someone // calls getInputStream() would likely cause us to drop messages. // FIXME: it would be even better if we could create the // input stream BEFORE having the output pipe resolved, but // that would force us to have the MsrgAdaptor block // until we can give it the real pipe or msgr... later. try { connect(); } catch (IOException failed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Connection failed : " + this, failed); } return; } socketConnectLock.notify(); if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.log(Level.INFO, "New Socket Connection : " + this); } } } return; } } // Often we are called to handle data before the socket is connected. synchronized (socketConnectLock) { long timeoutAt = System.currentTimeMillis() + timeout; if (timeoutAt < timeout) { timeoutAt = Long.MAX_VALUE; } while (!isClosed() && !isConnected()) { long waitFor = timeoutAt - System.currentTimeMillis(); if (waitFor <= 0) { break; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Holding " + message + " for " + timeout); } try { socketConnectLock.wait(timeout); } catch (InterruptedException woken) { return; } } } if (!isReliable) { // is there data ? Iterator<MessageElement> dataElements = message.getMessageElements(JxtaServerSocket.MSG_ELEMENT_NAMESPACE, JxtaServerSocket.dataTag); while (dataElements.hasNext()) { MessageElement anElement = dataElements.next(); nonReliableInputStream.enqueue(anElement); } } else { // Give ACKs to the Reliable Output Stream if (ros != null) { ros.recv(message); } // Give data blocks to the Reliable Input Stream if (ris != null) { ris.recv(message); } } } /** * {@inheritDoc} */ public void outputPipeEvent(OutputPipeEvent event) { OutputPipe op = event.getOutputPipe(); if (op.getAdvertisement() == null) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("The output pipe has no internal pipe advertisement. discarding event"); } return; } // name can be different, therefore check the id + type if (pipeAdv.getID().equals(op.getAdvertisement().getID()) && pipeAdv.getType().equals(op.getAdvertisement().getType())) { synchronized (pipeResolveLock) { // modify op within lock to prevent a race with the if. if (connectOutpipe == null) { connectOutpipe = op; // if not null, will be closed. op = null; } pipeResolveLock.notify(); } // Ooops one too many, we were too fast re-trying. if (op != null) { op.close(); } } else { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Unexpected OutputPipe :" + op); } } } /** * A lightweight output pipe constructor, note the return type * Since all the info needed is available, there's no need for to use the pipe service * to resolve the pipe we have all we need to construct a messenger. * * @param group group context * @param pipeAdv Remote Pipe Advertisement * @param peerAdv Remote Peer Advertisement * @return Messenger */ protected static Messenger lightweightOutputPipe(PeerGroup group, PipeAdvertisement pipeAdv, PeerAdvertisement peerAdv) { EndpointService endpoint = group.getEndpointService(); ID opId = pipeAdv.getPipeID(); String destPeer = peerAdv.getPeerID().getUniqueValue().toString(); // Get an endpoint messenger to that address EndpointAddress addr; RouteAdvertisement routeHint = net.jxta.impl.endpoint.EndpointUtils.extractRouteAdv(peerAdv); if (pipeAdv.getType().equals(PipeService.UnicastType)) { addr = new EndpointAddress("jxta", destPeer, "PipeService", opId.toString()); } else if (pipeAdv.getType().equals(PipeService.UnicastSecureType)) { addr = new EndpointAddress("jxtatls", destPeer, "PipeService", opId.toString()); } else { // not a supported type throw new IllegalArgumentException(pipeAdv.getType() + " is not a supported pipe type"); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("New pipe lightweight messenger for " + addr); } return endpoint.getMessenger(addr, routeHint); } /** * Sends a close message * @throws IOException if an io error occurs */ private void sendClose() throws IOException { Message msg = new Message(); msg.addMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE , new StringMessageElement(JxtaServerSocket.closeTag, JxtaServerSocket.closeReqValue, null)); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending a close request " + this + " : " + msg); } remoteEphemeralPipeMsgr.sendMessageN(msg, null, null); } /** * Sends a close ack message * @throws IOException if an io error occurs */ private void sendCloseACK() throws IOException { Message msg = new Message(); msg.addMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE, new StringMessageElement(JxtaServerSocket.closeTag, JxtaServerSocket.closeAckValue, null)); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending a close ACK " + this + " : " + msg); } remoteEphemeralPipeMsgr.sendMessageN(msg, null, null); } /** * {@inheritDoc} */ @Override public int getSoTimeout() throws SocketException { if (isClosed()) { throw new SocketException("Socket is closed"); } if (timeout > Integer.MAX_VALUE) { return 0; } else { return (int) timeout; } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -