⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jxtasocket.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
            if (isConnected()) {                setConnected(false);                if (isReliable) {                    ris.softClose();                    ros.hardClose();                } else {                    nonReliableInputStream.softClose();                    nonReliableOutputStream.hardClose();                }            }            // If we are still bound then send them a close ACK.            if (isBound() && (ros != null && ros.isQueueEmpty())) {                // do not ack until the queue is empty                sendCloseACK();            }            if (closeAckReceived) {                closeLock.notifyAll();            }        }    }    /**     * Closes the input pipe which we use to receive messages and the messenger     * used for sending messages.     */    protected synchronized void unbind() {        if (!isBound()) {            return;        }        if (isReliable) {            try {                ris.close();            } catch (IOException ignored) {// ignored            }            ros.hardClose();        } else {            nonReliableInputStream.close();            nonReliableOutputStream.hardClose();        }        // We are no longer bound        setBound(false);        // close pipe and messenger        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Closing ephemeral input pipe");        }        localEphemeralPipeIn.close();        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Closing remote ephemeral pipe messenger");        }        if(null != outgoing) {            outgoing.close();        }        remoteEphemeralPipeMsgr.close();    }    /**     * {@inheritDoc}     */    public void pipeMsgEvent(PipeMsgEvent event) {        if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {            LOG.log(Level.FINER, "Pipe Message Event for " + this + "\n\t" + event.getMessage() + " for " + event.getPipeID());        }        Message message = event.getMessage();  
      if (message == null) {            return;        }        // look for close request/ack        MessageElement element = message.getMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE, JxtaServerSocket.closeTag);        if (element != null) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Handling a close message " + this + " : " + element.toString());            }            if (JxtaServerSocket.closeReqValue.equals(element.toString())) {                try {                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("Received a close request");                    }                    closeFromRemote();                } catch (IOException ie) {                    if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                        LOG.log(Level.SEVERE, "failed during closeFromRemote", ie);                    }                }            } else if (JxtaServerSocket.closeAckValue.equals(element.toString())) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("Received a close acknowledgement");                }                synchronized (closeLock) {                    closeAckReceived = true;                    setConnected(false);                    closeLock.notifyAll();                }            }            return;        }        if (!isConnected()) {            // connect response            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.log(Level.FINE, "Processing connect response : " + message);            }            // look for a remote pipe answer            element = message.getMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE, JxtaServerSocket.remPipeTag);            PipeAdvertisement incomingPipeAdv = null;            if (element != null) {                try {                    XMLDocument pipeAdvDoc = (XMLDocument) 
StructuredDocumentFactory.newStructuredDocument(element);                    incomingPipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(pipeAdvDoc);                } catch (IOException badPipeAdv) {// ignored                }            }            element = message.getMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE, JxtaServerSocket.remPeerTag);            PeerAdvertisement incomingRemotePeerAdv = null;            if (element != null) {                try {                    XMLDocument peerAdvDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(element);                    incomingRemotePeerAdv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(peerAdvDoc);                } catch (IOException badPeerAdv) {// ignored                }            }            element = message.getMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE, JxtaServerSocket.credTag);            Credential incomingCredential = null;            if (element != null) {                try {                    StructuredDocument incomingCredentialDoc = StructuredDocumentFactory.newStructuredDocument(element);                    incomingCredential = group.getMembershipService().makeCredential(incomingCredentialDoc);                } catch (Exception failed) {                    if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                        LOG.log(Level.WARNING, "Unable to generate credential for " + this, failed);                    }                }            }            element = message.getMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE, JxtaServerSocket.streamTag);            boolean incomingIsReliable = isReliable;            if (element != null) {                incomingIsReliable = Boolean.valueOf(element.toString());            }            if ((null != incomingPipeAdv) && (null != incomingRemotePeerAdv)) {                if ((null != remotePeerID) && (remotePeerID != incomingRemotePeerAdv.getPeerID())) {      
              // let the connection attempt timeout                    if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                        LOG.warning(                                "Connection response from wrong peer! " + remotePeerID + " != "                                + incomingRemotePeerAdv.getPeerID());                    }                    return;                }                synchronized (socketConnectLock) {                    if (!isConnected()) {                        remoteCredential = incomingCredential;                        remotePeerAdv = incomingRemotePeerAdv;                        remotePeerID = incomingRemotePeerAdv.getPeerID();                        remoteEphemeralPipeAdv = incomingPipeAdv;                        isReliable = incomingIsReliable;                        // Force the creation of the inputStream now. Waiting until someone                        // calls getInputStream() would likely cause us to drop messages.                        // FIXME: it would be even better if we could create the                        // input stream BEFORE having the output pipe resolved, but                        // that would force us to have the MsrgAdaptor block                        // until we can give it the real pipe or msgr... later.                        
try {                            connect();                        } catch (IOException failed) {                            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                                LOG.log(Level.WARNING, "Connection failed : " + this, failed);                            }                            return;                        }                        socketConnectLock.notify();                        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                            LOG.log(Level.INFO, "New Socket Connection : " + this);                        }                    }                }                return;            }        }        // Often we are called to handle data before the socket is connected.        synchronized (socketConnectLock) {            long timeoutAt = System.currentTimeMillis() + timeout;            if (timeoutAt < timeout) {                timeoutAt = Long.MAX_VALUE;            }            while (!isClosed() && !isConnected()) {                long waitFor = timeoutAt - System.currentTimeMillis();                if (waitFor <= 0) {                    break;                }                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.log(Level.FINE, "Holding " + message + " for " + timeout);                }                try {                    socketConnectLock.wait(timeout);                } catch (InterruptedException woken) {                    return;                }            }        }        if (!isReliable) {            // is there data ?            
Iterator<MessageElement> dataElements = message.getMessageElements(JxtaServerSocket.MSG_ELEMENT_NAMESPACE,                    JxtaServerSocket.dataTag);            while (dataElements.hasNext()) {                MessageElement anElement = dataElements.next();                nonReliableInputStream.enqueue(anElement);            }        } else {            // Give ACKs to the Reliable Output Stream            if (ros != null) {                ros.recv(message);            }            // Give data blocks to the Reliable Input Stream            if (ris != null) {                ris.recv(message);            }        }    }    /**     * {@inheritDoc}     */    public void outputPipeEvent(OutputPipeEvent event) {        OutputPipe op = event.getOutputPipe();        if (op.getAdvertisement() == null) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning("The output pipe has no internal pipe advertisement. discarding event");            }            return;        }        // name can be different, therefore check the id + type        if (pipeAdv.getID().equals(op.getAdvertisement().getID()) && pipeAdv.getType().equals(op.getAdvertisement().getType())) {            synchronized (pipeResolveLock) {                // modify op within lock to prevent a race with the if.                if (connectOutpipe == null) {                    connectOutpipe = op;                    // if not null, will be closed.                    op = null;                }                pipeResolveLock.notify();            }            // Ooops one too many, we were too fast re-trying.            
if (op != null) {                op.close();            }        } else {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning("Unexpected OutputPipe :" + op);            }        }    }    /**     * A lightweight output pipe constructor, note the return type     * Since all the info needed is available, there's no need for to use the pipe service     * to resolve the pipe we have all we need to construct a messenger.     *     * @param group   group context     * @param pipeAdv Remote Pipe Advertisement     * @param peerAdv Remote Peer Advertisement     * @return Messenger     */    protected static Messenger lightweightOutputPipe(PeerGroup group, PipeAdvertisement pipeAdv, PeerAdvertisement peerAdv) {        EndpointService endpoint = group.getEndpointService();        ID opId = pipeAdv.getPipeID();        String destPeer = peerAdv.getPeerID().getUniqueValue().toString();        // Get an endpoint messenger to that address        EndpointAddress addr;        RouteAdvertisement routeHint = net.jxta.impl.endpoint.EndpointUtils.extractRouteAdv(peerAdv);        if (pipeAdv.getType().equals(PipeService.UnicastType)) {            addr = new EndpointAddress("jxta", destPeer, "PipeService", opId.toString());        } else if (pipeAdv.getType().equals(PipeService.UnicastSecureType)) {            addr = new EndpointAddress("jxtatls", destPeer, "PipeService", opId.toString());        } else {            // not a supported type            throw new IllegalArgumentException(pipeAdv.getType() + " is not a supported pipe type");        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("New pipe lightweight messenger for " + addr);        }        return endpoint.getMessenger(addr, routeHint);    }    /**     * Sends a close message     * @throws IOException if an io error occurs     */    private void sendClose() throws IOException {        Message msg = new Message();        
msg.addMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE                ,                new StringMessageElement(JxtaServerSocket.closeTag, JxtaServerSocket.closeReqValue, null));        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Sending a close request " + this + " : " + msg);        }        remoteEphemeralPipeMsgr.sendMessageN(msg, null, null);    }    /**     * Sends a close ack message     * @throws IOException if an io error occurs     */    private void sendCloseACK() throws IOException {        Message msg = new Message();        msg.addMessageElement(JxtaServerSocket.MSG_ELEMENT_NAMESPACE,                new StringMessageElement(JxtaServerSocket.closeTag, JxtaServerSocket.closeAckValue, null));        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Sending a close ACK " + this + " : " + msg);        }        remoteEphemeralPipeMsgr.sendMessageN(msg, null, null);    }    /**     * {@inheritDoc}     */    @Override    public int getSoTimeout() throws SocketException {        if (isClosed()) {            throw new SocketException("Socket is closed");        }        if (timeout > Integer.MAX_VALUE) {            return 0;        } else {            return (int) timeout;        }    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -