⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 piperesolver.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
            doc = (StructuredTextDocument)                    StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, queryReader);        } catch (IOException e) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("discarding malformed request ", e);            }            // no sense in re-propagation here            return ResolverService.OK;        }        finally {            try {                queryReader.close();            } catch (IOException ignored) {                ;            }            queryReader = null;        }                PipeResolverMessage pipeQuery;        try {            pipeQuery = new PipeResolverMsg(doc);        } catch (IllegalArgumentException badDoc) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("discarding malformed request ", badDoc);            }            // no sense in re-propagation here            return ResolverService.OK;        }        finally {            doc = null;        }                // is it a query?        if (!pipeQuery.getMsgType().equals(MessageType.QUERY)) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("expected query - discarding.");            }            // no sense in re-propagation here            return ResolverService.OK;        }                // see if it is a query directed at our peer.        Set destPeers = pipeQuery.getPeerIDs();        boolean directedQuery = !destPeers.isEmpty();        boolean queryForMe = !directedQuery;        if (directedQuery) {            Iterator eachDestPeer = destPeers.iterator();                        while (eachDestPeer.hasNext()) {                ID aPeer = (ID) eachDestPeer.next();                if (aPeer.equals(myGroup.getPeerID())) {                    queryForMe = true;                    break;                }            }                        if (!queryForMe) {                // It is an directed query, but request wasn't for this peer.                if (query.getSrc().equals(queryFrom)) {                    // we only respond if the original src was not the query forwarder                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("discarding query. Query not for us.");                    }                                        // tell the resolver no further action is needed.                    return ResolverService.OK;                }                                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("responding to 'misdirected' forwarded query.");                }                                responseDest = queryFrom;            }        }                PeerID peerID = null;        if (queryForMe) {            // look locally.            InputPipe ip = findLocal((PipeID) pipeQuery.getPipeID());                        if ((ip != null) && (ip.getType().equals(pipeQuery.getPipeType()))) {                peerID = myGroup.getPeerID();            }        }                if ((null == peerID) && !directedQuery) {            // This request was sent to everyone.            if (myGroup.isRendezvous()) {                // We are a RDV, allow the ResolverService to repropagate the query                List results = srdiIndex.query(pipeQuery.getPipeType(), PipeAdvertisement.IdTag, pipeQuery.getPipeID().toString(), 20);                                if (results.size() > 0) {                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("forwarding query to " + results.size() + " peers");                    }                                        Iterator eachPeer = results.iterator();                    while (eachPeer.hasNext()) {                        PeerID aPeer = (PeerID) eachPeer.next();                        srdi.forwardQuery(aPeer, query);                    }                                        // tell the resolver no further action is needed.                    return ResolverService.OK;                }                                // we don't know anything, continue the walk.                return ResolverService.Repropagate;            } else {                // We are an edge                if (query.getSrc().equals(queryFrom)) {                    // we only respond if the original src was not the query forwarder                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("discarding query.");                    }                                        // tell the resolver no further action is needed.                    return ResolverService.OK;                }                                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("responding to query forwarded for 'misdirected' query.");                }                                responseDest = queryFrom;            }        }                // Build the answer        PipeResolverMessage pipeResp = new PipeResolverMsg();                pipeResp.setMsgType(MessageType.ANSWER);        pipeResp.setPipeID(pipeQuery.getPipeID());        pipeResp.setPipeType(pipeQuery.getPipeType());        if (null == peerID) {            // respond negative.            pipeResp.addPeerID(myGroup.getPeerID());            pipeResp.setFound(false);        } else {            pipeResp.addPeerID(peerID);            pipeResp.setFound(true);            pipeResp.setInputPeerAdv(myGroup.getPeerAdvertisement());        }                // make a response from the incoming query        ResolverResponseMsg res = query.makeResponse();        res.setCredential(credentialDoc);        res.setResponse(pipeResp.getDocument(MimeMediaType.XMLUTF8).toString());                if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Sending answer for query '" + query.getQueryId() + "' to : " + responseDest);        }        resolver.sendResponse(responseDest, res);                return ResolverService.OK;    }        /**     *  {@inheritDoc}     **/    public void processResponse(ResolverResponseMsg response) {        processResponse(response, null);    }        /**     *  {@inheritDoc}     **/    public void processResponse(ResolverResponseMsg response, EndpointAddress srcAddr) {                if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("got a response for '" + response.getQueryId() + "'");        }                Reader resp = new StringReader(response.getResponse());                StructuredTextDocument doc = null;        try {            doc = (StructuredTextDocument)                    StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, resp);        } catch (Throwable e) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("malformed response - discard", e);            }            return;        }        finally {            try {                resp.close();            } catch (IOException ignored) {                ;            }            resp = null;        }                PipeResolverMessage pipeResp;        try {            pipeResp = new PipeResolverMsg(doc);        } catch (Throwable caught) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("malformed response - discarding.", caught);            }            return;        }        finally {            doc = null;        }                // check if it's a response.        if (!pipeResp.getMsgType().equals(MessageType.ANSWER)) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("expected response - discarding.");            }            return;        }                PeerAdvertisement padv = pipeResp.getInputPeerAdv();        if ((null != padv) && !(myGroup.getPeerID().equals(padv.getPeerID()))) {            try {                // This is not our own peer adv so we keep it only for the default                // expiration time.                if (null == discovery) {                    discovery = myGroup.getDiscoveryService();                }                                if (null != discovery) {                    discovery.publish(padv, DiscoveryService.DEFAULT_EXPIRATION, DiscoveryService.DEFAULT_EXPIRATION);                }            } catch (IOException ignored) {                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("could not publish peer adv");                }            }        }                String ipId = pipeResp.getPipeID().toString();                Set peerRsps = pipeResp.getPeerIDs();                Iterator eachResp = peerRsps.iterator();                while (eachResp.hasNext()) {            // process each peer for which this response is about.            PeerID peer = (PeerID) eachResp.next();            if (!pipeResp.isFound()) {                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("NACK for pipe '" + ipId + "' from peer " + peer);                }                                // We have received a NACK. Remove that entry.                srdiIndex.add(pipeResp.getPipeType(), PipeAdvertisement.IdTag, ipId, peer, 0);            } else {                long exp = getEntryExp(pipeResp.getPipeType(),                                        PipeAdvertisement.IdTag,                                       ipId,                                       peer);                if ((PipeServiceImpl.VERIFYINTERVAL / 2) > exp) {                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("Using Expiration "+(PipeServiceImpl.VERIFYINTERVAL / 2) +" which is > "+ exp);                    }                    // create antry only if one does not exist,or entry exists with                    // lesser lifetime                    // cache the result for half the verify interval                    srdiIndex.add(pipeResp.getPipeType(), PipeAdvertisement.IdTag, ipId, peer, (PipeServiceImpl.VERIFYINTERVAL / 2));                } else {                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("DB Expiration "+ exp + " > "+(PipeServiceImpl.VERIFYINTERVAL / 2)+ " overriding attempt to decrease lifetime");                    }                }            }                        // call listener for pipeid            callListener(response.getQueryId(), (PipeID) pipeResp.getPipeID(), pipeResp.getPipeType(), peer, !pipeResp.isFound());        }    }        /**     * Given a list of Entries returns the exp record of peerid     * otherwise a -1     */    private long getEntryExp(String pkey, String skey, String value, PeerID peerid){        List list = srdiIndex.getRecord(pkey, skey, value);        Iterator it=list.iterator();        while (it.hasNext()){            SrdiIndex.Entry entry = (SrdiIndex.Entry) it.next();            if (entry.peerid.equals(peerid)) {                // exp in millis                return TimeUtils.toRelativeTimeMillis(entry.expiration);            }        }        return -1;    }        /**     *  {@inheritDoc}     **/    public boolean processSrdi(ResolverSrdiMsg message) {                if (message == null) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("no SRDI message");            }            return false;        }                if (message.getPayload() == null) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("empty SRDI message");            }            return false;        }                SrdiMessage srdiMsg;        try {            StructuredTextDocument asDoc = (StructuredTextDocument)                    StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, new StringReader(message.getPayload()));                        srdiMsg = new SrdiMessageImpl(asDoc);        } catch (Throwable e) {            // we don't understand this msg, let's skip it            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Invalid SRDI message", e);            }            return false;        }                if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Received an SRDI messsage with " + srdiMsg.getEntries().size() + " entries from " + srdiMsg.getPeerID());        }                // add the entries too        Iterator eachEntry = srdiMsg.getEntries().iterator();                while (eachEntry.hasNext()) {            Entry entry = (Entry) eachEntry.next();            srdiIndex.add(srdiMsg.getPrimaryKey(), entry.key, entry.value, srdiMsg.getPeerID(), entry.expiration);        }                if (!PipeService.PropagateType.equals(srdiMsg.getPrimaryKey())) {            // don't replicate entries for propagate pipes. For unicast type            // pipes the replica is useful in finding pipe instances. Since            // walking rather than searching is done for propagate pipes this            // appropriate.            srdi.replicateEntries(srdiMsg);        }                return true;    }        /**     *  {@inheritDoc}     **/    public void messageSendFailed(PeerID peerid, OutgoingMessageEvent e) {// so what.    }        /**     *  {@inheritDoc}     **/    public void pushEntries(boolean all) {                pushSrdi((PeerID) null, all);    }        /**     * unregisters the resolver handler     **/    void stop() {                resolver.unregisterHandler(PipeResolverName);                resolver.unregisterSrdiHandler(PipeResolverName);                srdiIndex.stop();        srdiIndex = null;        // stop the srdi thread        if (srdiThread != null) {            srdi.stop();        }        srdiThread = null;        srdi = null;                membership.removePropertyChangeListener("defaultCredential", membershipCredListener);        membershipCredListener = null;        credential = null;        credentialDoc = null;                // Avoid cross-reference problems with GC        myGroup = null;        resolver = null;        discovery = null;        membership = null;                outputpipeListeners.clear();                // close the local pipes        Iterator eachLocalInputPipe = Arrays.asList(localInputPipes.values().toArray()).iterator();        while (eachLocalInputPipe.hasNext()) {            InputPipe aPipe = (InputPipe) eachLocalInputPipe.next();            try {                aPipe.close();            } catch (Exception failed) {                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("Failure closing " + aPipe);                }            }        }        

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -