⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jxtabidipipe.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
            } catch (SocketTimeoutException io) {                if (msgr instanceof TcpMessenger) {                    ((TcpMessenger) msgr).sendMessageDirect(msg, null, null, true);                    return true;                } else {                    return msgr.sendMessage(msg, null, null);                }            } catch (IOException io) {                closePipe(true);                IOException exp = new IOException("IO error occured during sendMessage()");                exp.initCause(io);                throw exp;            }        }    }    private void dequeue() {        if (!dequeued && (null != msgListener)) {            while (queue != null && !queue.isEmpty()) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("dequeing messages onto message listener");                }                try {                    msgListener.pipeMsgEvent(queue.take());                } catch (InterruptedException e) {                    //ignored                }            }            dequeued = false;        }    }    /**     * {@inheritDoc}     */    public void outputPipeEvent(OutputPipeEvent event) {        OutputPipe op = event.getOutputPipe();        if (op.getAdvertisement() == null) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning("The output pipe has no internal pipe advertisement. Continueing anyway.");            }        }        if (op.getAdvertisement() == null || pipeAdv.equals(op.getAdvertisement())) {            synchronized (acceptLock) {                // modify op within lock to prevent a race with the if.                if (connectOutpipe == null) {                    connectOutpipe = op;                    // set to null to avoid closure                    op = null;                }                acceptLock.notifyAll();            }            // Ooops one too many, we were too fast re-trying.            if (op != null) {                op.close();            }        } else {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning("Unexpected OutputPipe :" + op);            }        }    }    /**     * A lightweight direct messenger output pipe constructor, note the return type     * Since all the info needed is available, there's no need for to     * use the pipe service to resolve the pipe we have all we need     * to construct a messenger.     *     * @param group   group context     * @param pipeAdv Remote Pipe Advertisement     * @param peer    Remote Peer advertisement     * @return Messenger     */    protected static Messenger getDirectMessenger(PeerGroup group, PipeAdvertisement pipeAdv, PeerAdvertisement peer) {        // Get an endpoint messenger to that address        if (pipeAdv.getType().equals(PipeService.PropagateType)) {            throw new IllegalArgumentException("Invalid pipe type " + pipeAdv.getType());        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Creating a Direct Messenger");        }        if (pipeAdv.getType().equals(PipeService.UnicastType)) {            EndpointService endpoint = group.getEndpointService();            EndpointAddress pipeEndpoint = new EndpointAddress("jxta",                                                       (peer.getPeerID().getUniqueValue()).toString(),                                                       "PipeService",                                                       pipeAdv.getPipeID().toString());            return endpoint.getDirectMessenger(pipeEndpoint, peer, true);        }        return null;    }    /**     * A lightweight output pipe constructor, note the return type     * Since all the info needed is available, there's no need for to     * use the pipe service to resolve the pipe we have all we need     * to construct a messenger.     *     * @param group   group context     * @param pipeAdv Remote Pipe Advertisement     * @param peer    Remote Peer advertisement     * @return Messenger     */    protected static Messenger lightweightOutputPipe(PeerGroup group, PipeAdvertisement pipeAdv, PeerAdvertisement peer) {        EndpointService endpoint = group.getEndpointService();        ID opId = pipeAdv.getPipeID();        String destPeer = (peer.getPeerID().getUniqueValue()).toString();        // Get an endpoint messenger to that address        EndpointAddress addr;        if (pipeAdv.getType().equals(PipeService.UnicastType)) {            addr = new EndpointAddress("jxta", destPeer, "PipeService", opId.toString());        } else if (pipeAdv.getType().equals(PipeService.UnicastSecureType)) {            addr = new EndpointAddress("jxtatls", destPeer, "PipeService", opId.toString());        } else {            // not a supported type            return null;        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Creating a lightweightOutputPipe()");        }        return endpoint.getMessenger(addr);    }    /**     * Not implemented yet     *     * @param cred the credential document     * @return always returns true     */    protected boolean checkCred(StructuredDocument cred) {        // FIXME need to check credentials        return true;    }    /**     * Send a close message to the remote side     */    private void sendClose() {        if (!direct && isReliable && ros.isClosed()) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("ReliableOutputStream is already closed. Skipping close message");            }            return;        }        Message msg = new Message();        msg.addMessageElement(JxtaServerPipe.nameSpace, new StringMessageElement(JxtaServerPipe.closeTag, "close", null));        try {            sendMessage(msg);            // ros will not take any new message, now.            if (!direct && ros != null) {                ros.close();            }        } catch (IOException ie) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.log(Level.SEVERE, "failed during close", ie);            }        }    }    /**     * Returns the message listener for this pipe     *     * @return PipeMsgListener     * @deprecated use getMessageListener instead     */    @Deprecated    public PipeMsgListener getListener() {        return getMessageListener();    }    /**     * Returns the message listener for this pipe     *     * @return PipeMsgListener     */    public PipeMsgListener getMessageListener() {        return msgListener;    }    /**     * Sets message listener for a pipe spawned by the JxtaServerPipe.     * There is a window where a message could arrive prior to listener being     * registered therefore a message queue is created to queue messages, once     * a listener is registered these messages will be dequeued by calling the     * listener until the queue is empty     *     * @param msgListener New value of property listener.     * @deprecated use setMessageListener instead     */    @Deprecated    public void setListener(PipeMsgListener msgListener) {        setMessageListener(msgListener);    }    /**     * Sets message listener for a pipe spawned by the JxtaServerPipe.     * There is a window where a message could arrive prior to listener being     * registered therefore a message queue is created to queue messages, once     * a listener is registered these messages will be dequeued by calling the     * listener until the queue is empty.     * <p/>     * Sending messages vis {@link #sendMessage(Message)} from within a      * {@code PipeMsgListener} may result in a deadlock due to contention     * between the sending and receiving portions of BiDi pipes.      *     * @param msgListener New value of property listener.     */    public void setMessageListener(PipeMsgListener msgListener) {        this.msgListener = msgListener;        // if there are messages enqueued then dequeue them onto the msgListener        dequeue();    }    /**     * Sets a Pipe event listener, set listener to null to unset the listener     *     * @param eventListener New value of property listener.     * @deprecated use setPipeEventListener instead     */    @Deprecated    public void setListener(PipeEventListener eventListener) {        setPipeEventListener(eventListener);    }    /**     * Sets a Pipe event listener, set listener to null to unset the listener     *     * @param eventListener New value of property listener.     */    public void setPipeEventListener(PipeEventListener eventListener) {        this.eventListener = eventListener;    }    /**     * Returns the Pipe event listener for this pipe     *     * @return PipeMsgListener     */    public PipeEventListener getPipeEventListener() {        return eventListener;    }    /**     * Sets a Pipe state listener, set listener to null to unset the listener     *     * @param stateListener New value of property listener.     */    public void setPipeStateListener(PipeStateListener stateListener) {        this.stateListener = stateListener;    }    /**     * Returns the Pipe state listener for this pipe     *     * @return PipeMsgListener     */    public PipeStateListener getPipeStateListener() {        return stateListener;    }    /**     * Gets a message from the queue. If no Object is immediately available,     * then wait the specified amount of time for a message to be inserted.     *     * @param timeout Amount of time to wait in milliseconds for an object to     *                be available. Per Java convention, a timeout of zero (0) means wait an     *                infinite amount of time. Negative values mean do not wait at all.     * @return The next message in the queue. if a listener is registered calls     *         to this method will return null     * @throws InterruptedException if the operation is interrupted before     *                              the timeout interval is completed.     */    public Message getMessage(int timeout) throws InterruptedException {        if (queue == null || msgListener != null) {            return null;        } else {            PipeMsgEvent ev = queue.poll(timeout, TimeUnit.MILLISECONDS);            if (ev != null) {                return ev.getMessage();            } else {                return null;            }        }    }    /**     * Returns the Assigned PipeAdvertisement     *     * @return the Assigned PipeAdvertisement     */    public PipeAdvertisement getPipeAdvertisement() {        return pipeAdv;    }    /**     * {@inheritDoc}     * <p/>     * Closes the JxtaBiDiPipe.     */    @Override    protected synchronized void finalize() throws Throwable {        if (!closed) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning("JxtaBiDiPipe is being finalized without being previously closed. This is likely a users bug.");            }            close();        }        super.finalize();    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -