📄 jxtaserversocket.java
字号:
* * <p/>The default timeout is 60 seconds (60000ms). */ public Socket accept() throws IOException { if (isClosed()) { throw new SocketException("Socket is closed"); } if (!isBound()) { throw new SocketException("Socket is not bound yet"); } try { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Waiting for a connection"); } while (true) { Message msg = (Message) queue.poll(timeout, TimeUnit.MILLISECONDS); if (msg == null) { throw new SocketTimeoutException("Timeout reached"); } JxtaSocket socket = processMessage(msg); // make sure we have a socket returning if (socket != null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Waiting for a connection"); } return socket; } else if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("No connection"); } } } catch (InterruptedException ie) { throw new SocketException("interrupted"); } } /** * Gets the group associated with this JxtaServerSocket object * * @return The group value */ public PeerGroup getGroup() { return group; } /** * Gets the PipeAdvertisement associated with this JxtaServerSocket object * * @return The pipeAdv value */ public PipeAdvertisement getPipeAdv() { return pipeadv; } /** * {@inheritDoc} * * <p/>The default timeout is 60 seconds (60000ms). */ public void close() throws IOException { synchronized (closeLock) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Closing ServerSocket"); } if (isClosed()) { return; } if (isBound()) { // close all the pipe serverPipe.close(); } queue.clear(); closed = true; } } /** * Sets the bound attribute of the JxtaServerSocket object */ protected void setBound() { bound = true; } /** * {@inheritDoc} * * <p/>The default timeout is 60 seconds (60000ms). */ public synchronized int getSoTimeout() throws IOException { if (isClosed()) { throw new SocketException("Socket is closed"); } if (timeout > Integer.MAX_VALUE) { return 0; } else { return (int) timeout; } } /** * {@inheritDoc} * * <p/>The default timeout is 60 seconds (60000ms). */ public synchronized void setSoTimeout(int timeout) throws SocketException { if (isClosed()) { throw new SocketException("Socket is closed"); } if (timeout < 0) { throw new IllegalArgumentException("timeout must be >= 0"); } if (0 == timeout) { this.timeout = Long.MAX_VALUE; } else { this.timeout = (long) timeout; } } /** * {@inheritDoc} */ public boolean isClosed() { synchronized (closeLock) { return closed; } } /** * {@inheritDoc} */ public boolean isBound() { return bound; } /** * {@inheritDoc} */ public void pipeMsgEvent(PipeMsgEvent event) { // deal with messages as they come in Message message = event.getMessage(); if (message == null) { return; } boolean pushed = false; try { pushed = queue.offer(message, timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Interrupted", e); } } if (!pushed && LOG.isEnabledFor(Level.WARN)) { LOG.warn("backlog queue full, connect request dropped"); } } /** * processMessage is the main mechanism in establishing bi-directional connections * <p> * It accepts connection messages and constructs a JxtaSocket with a ephemeral * InputPipe and a messenger. * <p> * The ResponseMessage is created and sent. * * @param msg The client connection request (assumed not null) * @return JxtaSocket Which may be null if an error occurs. */ private JxtaSocket processMessage(Message msg) { PipeAdvertisement outputPipeAdv = null; PeerAdvertisement peerAdv = null; StructuredDocument credDoc = null; if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Processing a connection message"); } try { MessageElement el = msg.getMessageElement(nameSpace, credTag); if (el != null) { InputStream in = el.getStream(); credDoc = (StructuredDocument) StructuredDocumentFactory.newStructuredDocument(el.getMimeType(), in); } el = msg.getMessageElement(nameSpace, reqPipeTag); if (el != null) { InputStream in = el.getStream(); outputPipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(el.getMimeType(), in); } el = msg.getMessageElement(nameSpace, remPeerTag); if (el != null) { InputStream in = el.getStream(); peerAdv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(el.getMimeType(), in); } el = msg.getMessageElement(nameSpace, streamTag); boolean isStream = false; if (el != null) { isStream = (el.toString().equals("true")); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Connection request [isStream] :" + isStream); } } Messenger msgr = JxtaSocket.lightweightOutputPipe(group, outputPipeAdv, peerAdv); if (msgr != null) { PipeAdvertisement newpipe = newInputPipe(group, outputPipeAdv); JxtaSocket newsoc = new JxtaSocket(group, msgr, newpipe, credDoc, isStream); sendResponseMessage(group, msgr, newpipe); return newsoc; } } catch (IOException e) { // deal with the error if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("IOException occured", e); } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Connection processing did not result in a connection"); } return null; } /** * Method sendResponseMessage get the createResponseMessage and sends it. * * @param group * @param msgr * @param pipeAd */ protected void sendResponseMessage(PeerGroup group, Messenger msgr, PipeAdvertisement pipeAd) throws IOException { Message msg = new Message(); PeerAdvertisement peerAdv = group.getPeerAdvertisement(); if (myCredentialDoc == null) { myCredentialDoc = JxtaSocket.getCredDoc(group); } if (myCredentialDoc != null) { msg.addMessageElement(JxtaServerSocket.nameSpace, new InputStreamMessageElement(credTag, MimeMediaType.XMLUTF8, myCredentialDoc.getStream(), null)); } msg.addMessageElement(JxtaServerSocket.nameSpace, new TextDocumentMessageElement(remPipeTag, (XMLDocument) pipeAd.getDocument(MimeMediaType.XMLUTF8), null)); msg.addMessageElement(nameSpace, new TextDocumentMessageElement(remPeerTag, (XMLDocument) peerAdv.getDocument(MimeMediaType.XMLUTF8), null)); msgr.sendMessageB(msg, null, null); } /** * Utility method newInputPipe is used to create an ephemeral pipe advertisement (w/random pipe ID) from old one. * <p> * Called by JxtaSocket to make pipe (name -> name.remote) for open message * <p> * Called by JxtaServerSocket to make pipe (name.remote -> name.remote.remote) for response message * * @param group * @param pipeadv to get the basename and type from * @return PipeAdvertisement a new pipe advertisement */ protected static PipeAdvertisement newInputPipe(PeerGroup group, PipeAdvertisement pipeadv) { PipeAdvertisement adv = null; adv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType()); adv.setPipeID(IDFactory.newPipeID((PeerGroupID) group.getPeerGroupID())); adv.setName(pipeadv.getName() + ".remote"); adv.setType(pipeadv.getType()); return adv; } /** * Sets the connection credential doc * If no credentials are set, the default group credential will be used * @param doc Credential StructuredDocument */ public void setCredentialDoc(StructuredDocument doc) { this.myCredentialDoc = doc; } /** * @return the server socket's JxtaSocketAddress * @see java.net.ServerSocket#getLocalSocketAddress() */ public SocketAddress getLocalSocketAddress() { return new JxtaSocketAddress(getGroup(), getPipeAdv(), getGroup().getPeerID()); } /** * {@inheritDoc} * * <p/>Closes the JxtaServerPipe. */ protected synchronized void finalize() throws Throwable { super.finalize(); if (!closed) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("JxtaServerSocket is being finalized without being previously closed. This is likely a users bug."); } } close(); } /** *{@inheritDoc} */ public String toString() { if (!isBound()) { return "JxtaServerSocket[unbound]"; } return "JxtaServerSocket[pipe id=" + pipeadv.getPipeID().toString() + "]"; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -