⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jxtaserverpipe.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    }    /**     * Gets the Timeout attribute of the JxtaServerPipe     *     * @return The soTimeout value     * @throws IOException if an I/O error occurs     */    public synchronized int getPipeTimeout() throws IOException {        if (isClosed()) {            throw new SocketException("Server Pipe is closed");        }        if (timeout > Integer.MAX_VALUE) {            return 0;        } else {            return (int) timeout;        }    }    /**     * Sets the Timeout attribute of the JxtaServerPipe a timeout of 0 blocks forever.     *     * @param timeout The new soTimeout value     * @throws SocketException if an I/O error occurs     */    public synchronized void setPipeTimeout(int timeout) throws SocketException {        if (isClosed()) {            throw new SocketException("Server Pipe is closed");        }        if (timeout < 0) {            throw new IllegalArgumentException("Negative timeout values are not allowed.");        }        if (0 == timeout) {            this.timeout = Long.MAX_VALUE;        } else {            this.timeout = (long) timeout;        }    }    /**     * Returns the closed state of the JxtaServerPipe.     *     * @return true if the socket has been closed     */    public boolean isClosed() {        synchronized (closeLock) {            return closed;        }    }    /**     * Returns the binding state of the JxtaServerPipe.     *     * @return true if the ServerSocket successfully bound to an address     */    public boolean isBound() {        return bound;    }    /**     * {@inheritDoc}     */    public void pipeMsgEvent(PipeMsgEvent event) {        Message message = event.getMessage();        if (message == null) {            return;        }        ConnectionProcessor processor = new ConnectionProcessor(message);        executor.execute(processor);    }    /**     * Method processMessage is the heart of this class.     * <p/>     * This takes new incoming connect messages and constructs the JxtaBiDiPipe     * to talk to the new client.     * <p/>     * The ResponseMessage is created and sent.     *     * @param msg The client connection request (assumed not null)     * @return JxtaBiDiPipe Which may be null if an error occurs.     */    private JxtaBiDiPipe processMessage(Message msg) {        PipeAdvertisement outputPipeAdv = null;        PeerAdvertisement peerAdv = null;        StructuredDocument credDoc = null;        try {            MessageElement el = msg.getMessageElement(nameSpace, credTag);            if (el != null) {                credDoc = StructuredDocumentFactory.newStructuredDocument(el);            }            el = msg.getMessageElement(nameSpace, reqPipeTag);            if (el != null) {                XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(el);                outputPipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(asDoc);            }            el = msg.getMessageElement(nameSpace, remPeerTag);            if (el != null) {                XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(el);                peerAdv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(asDoc);            }            el = msg.getMessageElement(nameSpace, reliableTag);            boolean isReliable = false;            if (el != null) {                isReliable = Boolean.valueOf((el.toString()));                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("Connection request [isReliable] :" + isReliable);                }            }            el = msg.getMessageElement(nameSpace, directSupportedTag);            boolean directSupported = false;            if (el != null) {                directSupported = Boolean.valueOf((el.toString()));                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("Connection request [directSupported] :" + directSupported);                }            }            Messenger msgr;            boolean direct = false;            if (directSupported) {                msgr = JxtaBiDiPipe.getDirectMessenger(group, outputPipeAdv, peerAdv);                if (msgr == null) {                    msgr = JxtaBiDiPipe.lightweightOutputPipe(group, outputPipeAdv, peerAdv);                } else {                    direct = true;                }            } else {                msgr = JxtaBiDiPipe.lightweightOutputPipe(group, outputPipeAdv, peerAdv);            }            if (msgr != null) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("Reliability set to :" + isReliable);                }                PipeAdvertisement newpipe = newInputPipe(group, outputPipeAdv);                JxtaBiDiPipe pipe = new JxtaBiDiPipe(group, msgr, newpipe, credDoc, isReliable, direct);                pipe.setRemotePeerAdvertisement(peerAdv);                pipe.setRemotePipeAdvertisement(outputPipeAdv);                sendResponseMessage(group, msgr, newpipe);                return pipe;            }        } catch (IOException e) {            // deal with the error            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.log(Level.FINE, "IOException occured", e);            }        }        return null;    }    /**     * Method sendResponseMessage get the createResponseMessage and sends it.     *     * @param group  the peer group     * @param msgr   the remote node messenger     * @param pipeAd the pipe advertisement     * @throws IOException for failures sending the response message.     */    protected void sendResponseMessage(PeerGroup group, Messenger msgr, PipeAdvertisement pipeAd) throws IOException {        Message msg = new Message();        PeerAdvertisement peerAdv = group.getPeerAdvertisement();        if (myCredentialDoc == null) {            myCredentialDoc = JxtaBiDiPipe.getCredDoc(group);        }        if (myCredentialDoc != null) {            msg.addMessageElement(JxtaServerPipe.nameSpace,                    new TextDocumentMessageElement(credTag, (XMLDocument) myCredentialDoc, null));        }        msg.addMessageElement(JxtaServerPipe.nameSpace,                new StringMessageElement(JxtaServerPipe.directSupportedTag, Boolean.toString(true), null));        msg.addMessageElement(JxtaServerPipe.nameSpace,                new TextDocumentMessageElement(remPipeTag, (XMLDocument) pipeAd.getDocument(MimeMediaType.XMLUTF8), null));        msg.addMessageElement(nameSpace,                new TextDocumentMessageElement(remPeerTag, (XMLDocument) peerAdv.getDocument(MimeMediaType.XMLUTF8), null));        if (msgr instanceof TcpMessenger) {            ((TcpMessenger) msgr).sendMessageDirect(msg, null, null, true);        } else {            msgr.sendMessage(msg);        }    }    /**     * Utility method newInputPipe is used to get new pipe advertisement (w/random pipe ID) from old one.     * <p/>     * Called by JxtaSocket to make pipe (name -> name.remote) for open message     * <p/>     * Called by JxtaServerSocket to make pipe (name.remote -> name.remote.remote) for response message     *     * @param group   the peer group     * @param pipeadv to get the basename and type from     * @return PipeAdvertisement a new pipe advertisement     */    protected static PipeAdvertisement newInputPipe(PeerGroup group, PipeAdvertisement pipeadv) {        PipeAdvertisement adv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());        adv.setPipeID(IDFactory.newPipeID(group.getPeerGroupID()));        adv.setName(pipeadv.getName());        adv.setType(pipeadv.getType());        return adv;    }    /**     * get the credential doc     *     * @return Credential StructuredDocument     */    public StructuredDocument getCredentialDoc() {        return myCredentialDoc;    }    /**     * Sets the connection credential doc     * If no credentials are set, the default group credential will be used     *     * @param doc Credential StructuredDocument     */    public void setCredentialDoc(StructuredDocument doc) {        this.myCredentialDoc = doc;    }    /**     * {@inheritDoc}     * <p/>     * Closes the JxtaServerPipe.     */    @Override    protected synchronized void finalize() throws Throwable {        if (!closed) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning("JxtaServerPipe is being finalized without being previously closed. This is likely a user's bug.");            }        }        close();        super.finalize();    }    /**     * A small class for processing individual messages.     */    private class ConnectionProcessor implements Runnable {        private Message message;        ConnectionProcessor(Message message) {            this.message = message;        }        public void run() {            JxtaBiDiPipe bidi = processMessage(message);            // make sure we have a socket returning            if (bidi != null) {                try {                    connectionQueue.offer(bidi, timeout, TimeUnit.MILLISECONDS);                } catch (InterruptedException e) {                    Thread.interrupted();                }            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -