⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jxtaserversocket.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
     *     *  <p/>The default timeout is 60 seconds (60000ms).     */    public Socket accept() throws IOException {        if (isClosed()) {            throw new SocketException("Socket is closed");        }        if (!isBound()) {            throw new SocketException("Socket is not bound yet");        }        try {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Waiting for a connection");            }            while (true) {                Message msg = (Message) queue.poll(timeout, TimeUnit.MILLISECONDS);                if (msg == null) {                    throw new SocketTimeoutException("Timeout reached");                }                JxtaSocket socket = processMessage(msg);                // make sure we have a socket returning                if (socket != null) {                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("Waiting for a connection");                    }                    return socket;                } else if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("No connection");                }            }        } catch (InterruptedException ie) {            throw new SocketException("interrupted");        }    }    /**     *  Gets the group associated with this JxtaServerSocket object     *     * @return    The group value     */    public PeerGroup getGroup() {        return group;    }    /**     *  Gets the PipeAdvertisement associated with this JxtaServerSocket object     *     * @return    The pipeAdv value     */    public PipeAdvertisement getPipeAdv() {        return pipeadv;    }    /**     *  {@inheritDoc}     *     *  <p/>The default timeout is 60 seconds (60000ms).     */    public void close() throws IOException {        synchronized (closeLock) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Closing ServerSocket");            }            if (isClosed()) {                return;            }            if (isBound()) {                // close all the pipe                serverPipe.close();            }            queue.clear();            closed = true;        }    }    /**     *  Sets the bound attribute of the JxtaServerSocket object     */    protected void setBound() {        bound = true;    }    /**     *  {@inheritDoc}     *     *  <p/>The default timeout is 60 seconds (60000ms).     */    public synchronized int getSoTimeout()    throws IOException {        if (isClosed()) {            throw new SocketException("Socket is closed");        }        if (timeout > Integer.MAX_VALUE) {            return 0;        } else {            return (int) timeout;        }    }    /**     *  {@inheritDoc}     *     *  <p/>The default timeout is 60 seconds (60000ms).     */    public synchronized void setSoTimeout(int timeout)    throws SocketException {        if (isClosed()) {            throw new SocketException("Socket is closed");        }        if (timeout < 0) {            throw new IllegalArgumentException("timeout must be >= 0");        }        if (0 == timeout) {            this.timeout = Long.MAX_VALUE;        } else {            this.timeout = (long) timeout;        }    }    /**     *  {@inheritDoc}     */    public boolean isClosed() {        synchronized (closeLock) {            return closed;        }    }    /**     *  {@inheritDoc}     */    public boolean isBound() {        return bound;    }    /**     *  {@inheritDoc}     */    public void pipeMsgEvent(PipeMsgEvent event) {        // deal with messages as they come in        Message message = event.getMessage();        if (message == null) {            return;        }        boolean pushed = false;        try {            pushed = queue.offer(message, timeout, TimeUnit.MILLISECONDS);        } catch (InterruptedException e) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Interrupted", e);            }        }        if (!pushed && LOG.isEnabledFor(Level.WARN)) {            LOG.warn("backlog queue full, connect request dropped");        }    }    /**     * processMessage is the main mechanism in establishing bi-directional connections     * <p>     * It accepts connection messages and constructs a JxtaSocket with a ephemeral     * InputPipe and a messenger.     * <p>     * The ResponseMessage is created and sent.     *      * @param msg The client connection request (assumed not null)     * @return JxtaSocket Which may be null if an error occurs.     */    private JxtaSocket processMessage(Message msg) {        PipeAdvertisement outputPipeAdv = null;        PeerAdvertisement  peerAdv = null;        StructuredDocument credDoc = null;        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Processing a connection message");        }        try {            MessageElement el = msg.getMessageElement(nameSpace, credTag);            if (el != null) {                InputStream in = el.getStream();                credDoc = (StructuredDocument)                          StructuredDocumentFactory.newStructuredDocument(el.getMimeType(), in);            }            el = msg.getMessageElement(nameSpace, reqPipeTag);            if (el != null) {                InputStream in = el.getStream();                outputPipeAdv = (PipeAdvertisement)                                AdvertisementFactory.newAdvertisement(el.getMimeType(), in);            }            el = msg.getMessageElement(nameSpace, remPeerTag);            if (el != null) {                InputStream in = el.getStream();                peerAdv = (PeerAdvertisement)                          AdvertisementFactory.newAdvertisement(el.getMimeType(), in);            }            el = msg.getMessageElement(nameSpace, streamTag);            boolean isStream = false;            if (el != null) {                isStream = (el.toString().equals("true"));                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("Connection request [isStream] :" + isStream);                }            }            Messenger msgr = JxtaSocket.lightweightOutputPipe(group, outputPipeAdv, peerAdv);            if (msgr != null) {                PipeAdvertisement newpipe = newInputPipe(group, outputPipeAdv);                JxtaSocket newsoc = new JxtaSocket(group, msgr, newpipe, credDoc, isStream);                sendResponseMessage(group, msgr, newpipe);                return newsoc;            }        } catch (IOException e) {            // deal with the error            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("IOException occured", e);            }        }        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Connection processing did not result in a connection");        }        return null;    }    /**     * Method sendResponseMessage get the createResponseMessage and sends it.     *      * @param group     * @param msgr     * @param pipeAd     */    protected void sendResponseMessage(PeerGroup group, Messenger msgr, PipeAdvertisement pipeAd) throws IOException {        Message msg = new Message();        PeerAdvertisement peerAdv = group.getPeerAdvertisement();        if (myCredentialDoc == null) {            myCredentialDoc = JxtaSocket.getCredDoc(group);        }        if (myCredentialDoc != null) {            msg.addMessageElement(JxtaServerSocket.nameSpace,                                  new InputStreamMessageElement(credTag, MimeMediaType.XMLUTF8, myCredentialDoc.getStream(), null));        }        msg.addMessageElement(JxtaServerSocket.nameSpace,                              new TextDocumentMessageElement(remPipeTag, (XMLDocument) pipeAd.getDocument(MimeMediaType.XMLUTF8), null));        msg.addMessageElement(nameSpace, new TextDocumentMessageElement(remPeerTag, (XMLDocument) peerAdv.getDocument(MimeMediaType.XMLUTF8), null));        msgr.sendMessageB(msg, null, null);    }    /**     * Utility method newInputPipe is used to create an ephemeral pipe advertisement (w/random pipe ID) from old one.     * <p>     * Called by JxtaSocket to make pipe (name -> name.remote) for open message     * <p>     * Called by JxtaServerSocket to make pipe (name.remote -> name.remote.remote) for response message     *      * @param group     * @param pipeadv to get the basename and type from     * @return PipeAdvertisement a new pipe advertisement     */    protected static PipeAdvertisement newInputPipe(PeerGroup group, PipeAdvertisement pipeadv) {        PipeAdvertisement adv = null;        adv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());        adv.setPipeID(IDFactory.newPipeID((PeerGroupID) group.getPeerGroupID()));        adv.setName(pipeadv.getName() + ".remote");        adv.setType(pipeadv.getType());        return adv;    }    /**     *  Sets the connection credential doc     *  If no credentials are set, the default group credential will be used     *  @param doc Credential StructuredDocument      */    public void setCredentialDoc(StructuredDocument doc) {        this.myCredentialDoc = doc;    }    /**     * @return the server socket's JxtaSocketAddress     * @see java.net.ServerSocket#getLocalSocketAddress()     */    public SocketAddress getLocalSocketAddress() {        return new JxtaSocketAddress(getGroup(), getPipeAdv(), getGroup().getPeerID());    }    /**     *  {@inheritDoc}     *     *  <p/>Closes the JxtaServerPipe.     */    protected synchronized void finalize()    throws Throwable {        super.finalize();        if (!closed) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("JxtaServerSocket is being finalized without being previously closed. This is likely a users bug.");            }        }        close();    }    /**     *{@inheritDoc}     */    public String toString() {        if (!isBound()) {            return "JxtaServerSocket[unbound]";        }        return "JxtaServerSocket[pipe id=" + pipeadv.getPipeID().toString() + "]";    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -