📄 jxtasocket.java
字号:
/** * Sets our credential to be used by this socket connection. If no * credentials are set, the default group credential will be used. * * @param localCredential The credential to be used for connection responses * or <tt>null</tt> if the default credential is to be used. */ public void setCredential(Credential localCredential) { if (localCredential == null) { this.localCredential = localCredential; } else { try { MembershipService membership = group.getMembershipService(); this.localCredential = membership.getDefaultCredential(); } catch (Exception failed) { this.localCredential = null; } } } /** * Create a connection request/response message * * @param group The group in which the socket is being used. * @param pipeAdv Advertisement for our ephemeral pipe. * @param credential Our credential or null if default credential is to * be used. * @param isReliable The socket is to be reliable (stream). * @param initiator indicates initiator * @return The message. * @throws IOException if an I/O error occurs */ protected Message createConnectMessage(PeerGroup group, PipeAdvertisement pipeAdv, Credential credential, boolean isReliable, boolean initiator) throws IOException { Message msg = new Message(); if (credential == null) { credential = getDefaultCredential(group); } if ((credential == null) && PipeService.UnicastSecureType.equals(pipeAdv.getType())) { throw new IOException("Credentials must be established to initiate a secure connection."); } if (credential != null) { try { XMLDocument credDoc = (XMLDocument) credential.getDocument(MimeMediaType.XMLUTF8); msg.addMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE, new TextDocumentMessageElement(JxtaServerSocket.credTag, credDoc, null)); } catch (Exception failed) { IOException failure = new IOException("Could not generate credential element."); failure.initCause(failed); throw failure; } } msg.addMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE, new TextDocumentMessageElement(initiator ? JxtaServerSocket.reqPipeTag : JxtaServerSocket.remPipeTag, (XMLDocument) pipeAdv.getDocument(MimeMediaType.XMLUTF8), null)); msg.addMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE, new TextDocumentMessageElement(JxtaServerSocket.remPeerTag, (XMLDocument) group.getPeerAdvertisement().getDocument(MimeMediaType.XMLUTF8), null)); msg.addMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE, new StringMessageElement(JxtaServerSocket.streamTag, Boolean.toString(isReliable), null)); return msg; } /** * Create a pipe advertisement for an ephemeral pipe (w/random pipe ID) from an existing pipe advertisement. * The specified pipe adveritsement is only used for the name and type * * @param pipeAdv to get the basename and type from * @return A new pipe advertisement for an ephemeral pipe. */ protected static PipeAdvertisement newEphemeralPipeAdv(PipeAdvertisement pipeAdv) { PipeAdvertisement adv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType()); PeerGroupID gid = (PeerGroupID) ((PipeID) pipeAdv.getPipeID()).getPeerGroupID(); adv.setPipeID(IDFactory.newPipeID(gid)); adv.setName(pipeAdv.getName() + ".remote"); adv.setType(pipeAdv.getType()); return adv; } /** * {@inheritDoc} */ @Override public boolean isBound() { return bound; } /** * Sets whether this socket is currently bound or not. A socket is considered bound if the local resources required * in order to interact with a remote peer are allocated and open. * * @param boundState The new bound state. */ private void setBound(boolean boundState) { bound = boundState; } /** * {@inheritDoc} */ @Override public boolean isConnected() { return connected; } /** * Sets whether this socket is currently connected or not. A socket is * considered connected if is believed that the Socket's remote peer has * the resources required in order to interact with a local peer (us) * allocated and open. * * @param connectedState The new connected state. */ private void setConnected(boolean connectedState) { connected = connectedState; } /** * Opens our ephemeral input pipe enabling us to receive messages. * * @throws IOException Thrown for errors in creating the input pipe. */ private void bind() throws IOException { this.localEphemeralPipeIn = pipeSvc.createInputPipe(localEphemeralPipeAdv, this); // The socket is bound now. setBound(true); } /** * Create an appropriate Outgoing Adaptor. This method exists primarily * so that sub-classes can substitute a different Outgoing sub-class. * * @param msgr The messenger to be wrapped. * @param timeout The timeout value; * @return Outgoing The messenger wrapped in an appropriate adaptor. */ protected Outgoing makeOutgoing(Messenger msgr, long timeout) { return new OutgoingMsgrAdaptor(msgr, (int) timeout); } /** * Opens the ephemeral output pipe for the remote peer. Also opens the * input and output streams. (delaying adds complexity). * * @throws IOException Thrown for errors in opening resources. */ private void connect() throws IOException { remoteEphemeralPipeMsgr = lightweightOutputPipe(group, remoteEphemeralPipeAdv, remotePeerAdv); if (remoteEphemeralPipeMsgr == null) { throw new IOException("Could not create messenger back to connecting peer"); } // Force the buffer size smaller if user set it too high. if (remoteEphemeralPipeMsgr.getMTU() < outputBufferSize) { outputBufferSize = Math.min((int) remoteEphemeralPipeMsgr.getMTU(), DEFAULT_OUTPUT_BUFFER_SIZE); } if (outputBufferSize == -1) { outputBufferSize = Math.min((int) remoteEphemeralPipeMsgr.getMTU(), DEFAULT_OUTPUT_BUFFER_SIZE); } // Force the creation of the inputStream now. Waiting until someone // calls getInputStream() would likely cause us to drop messages. if (isReliable) { outgoing = makeOutgoing(remoteEphemeralPipeMsgr, retryTimeout); ris = new ReliableInputStream(outgoing, soTimeout); ros = new ReliableOutputStream(outgoing, new FixedFlowControl(windowSize)); try { ros.setSendBufferSize(outputBufferSize); } catch (IOException ignored) {// it's only a preference... } } else { nonReliableInputStream = new JxtaSocketInputStream(this, windowSize); nonReliableOutputStream = new JxtaSocketOutputStream(this, outputBufferSize); } // the socket is now connected! setConnected(true); } /** * Returns the internal output stream buffer size * * @return the internal buffer size. * @deprecated Use the standard {@link #getSendBufferSize()} method instead. */ @Deprecated public int getOutputStreamBufferSize() { return (outputBufferSize == -1) ? DEFAULT_OUTPUT_BUFFER_SIZE : outputBufferSize; } /** * Sets the internal output stream buffer size. * * @param size The internal buffer size. * @throws IOException if an I/O error occurs * @deprecated Use the standard {@link #setSendBufferSize(int)} method instead. */ @Deprecated public void setOutputStreamBufferSize(int size) throws IOException { setSendBufferSize(size); } /** * {@inheritDoc} */ @Override public InputStream getInputStream() throws IOException { checkState(); if (isInputShutdown()) { throw new SocketException("Input already shutdown."); } if (isReliable) { return ris; } else { return nonReliableInputStream; } } /** * {@inheritDoc} */ @Override public OutputStream getOutputStream() throws IOException { checkState(); if (isOutputShutdown()) { throw new SocketException("Output already shutdown."); } return isReliable ? ros : nonReliableOutputStream; } /** * {@inheritDoc} * <p/> * We hard-close both the input and output streams. Nothing can be * written or read to the socket after this method. Any queued incoming * data is discarded. Any additional incoming messages will be ACKed but * their content will be discarded. We will attempt to send any data which * has already been written to the OutputStream. * <p/> * Once the output queue is empty we will send a close message to tell * the remote side that no more data is coming. * <p/> * This is the only method in this class which is {@code synchronized}. * All others use internal synchronization. */ @Override public synchronized void close() throws IOException { try { synchronized (closeLock) { long closeEndsAt = System.currentTimeMillis() + timeout; if (closeEndsAt < timeout) { closeEndsAt = Long.MAX_VALUE; } if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Closing " + this + " timeout=" + timeout + "ms."); } if (closed) { return; } closed = true; shutdownOutput(); shutdownInput(); while (isConnected()) { long closingFor = closeEndsAt - System.currentTimeMillis(); if (closingFor <= 0) { break; } if (isReliable) { try { if (ros.isQueueEmpty()) { // Only send a close if the queue is empty. sendClose(); } else { // Reliable Output Stream not empty. Don't send close yet. ros.waitQueueEmpty(1000); continue; } } catch (InterruptedException woken) { Thread.interrupted(); break; } } else { sendClose(); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sent close, awaiting ACK for " + this); } // Don't send our close too many times. try { long nextTry = Math.min(20000, closingFor); if (nextTry > 0 && isConnected()) { closeLock.wait(nextTry); } } catch (InterruptedException woken) { Thread.interrupted(); break; } } if (isConnected()) { // Last ditch close attempt if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Still connected at end of timeout. Forcing closed." + this); } sendClose(); throw new SocketTimeoutException("Failed to receive close ack from remote connection."); } } } finally { // No matter what else happens at the end of close() we are no // longer connected and no longer bound. setConnected(false); unbind(); if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Socket closed : " + this); } } } /** * This is called when closure is initiated on the remote side. By * convention we receive the close message only after we have ACKed the last * data segment. * <p/> * We soft-close the InputStream which allows us to read data already * received. * <p/> * We hard-close our output stream and discard all queued, unACKed data * as the remote side doesn't want to receive it (the remote side has * already unbound themselves, they just want our close ACK in order to clean * up.) * * @throws IOException if an I/O error occurs */ protected void closeFromRemote() throws IOException { synchronized (closeLock) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.INFO)) { LOG.info("Received a remote close request." + this); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -