⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 piperesolver.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
        return processQuery(query, null);    }    /**     * {@inheritDoc}     */    public int processQuery(ResolverQueryMsg query, EndpointAddress srcAddr) {        String queryFrom;        if (null != srcAddr) {            if ("jxta".equals(srcAddr.getProtocolName())) {                queryFrom = ID.URIEncodingName + ":" + ID.URNNamespace + ":" + srcAddr.getProtocolAddress();            } else {                // we don't know who routed us the query. Assume it came from the source.                queryFrom = query.getSrcPeer().toString();            }        } else {            // we don't know who routed us the query. Assume it came from the source.            queryFrom = query.getSrcPeer().toString();        }        String responseDest = query.getSrcPeer().toString();        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Starting for :" + query.getQueryId() + " from " + srcAddr);        }        Reader queryReader = new StringReader(query.getQuery());        StructuredTextDocument doc = null;        try {            doc = (StructuredTextDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, queryReader);        } catch (IOException e) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.log(Level.FINE, "discarding malformed request ", e);            }            // no sense in re-propagation here            return ResolverService.OK;        } finally {            try {                queryReader.close();            } catch (IOException ignored) {                // ignored            }            queryReader = null;        }        PipeResolverMessage pipeQuery;        try {            pipeQuery = new PipeResolverMsg(doc);        } catch (IllegalArgumentException badDoc) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.log(Level.FINE, "discarding malformed request ", badDoc);            }            // no sense in re-propagation here            return ResolverService.OK;        } finally {            doc = null;        }        // is it a query?        if (!pipeQuery.getMsgType().equals(MessageType.QUERY)) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("expected query - discarding.");            }            // no sense in re-propagation here            return ResolverService.OK;        }        // see if it is a query directed at our peer.        Set<ID> destPeers = pipeQuery.getPeerIDs();        boolean directedQuery = !destPeers.isEmpty();        boolean queryForMe = !directedQuery;        if (directedQuery) {            for (Object destPeer : destPeers) {                ID aPeer = (ID) destPeer;                if (aPeer.equals(myGroup.getPeerID())) {                    queryForMe = true;                    break;                }            }            if (!queryForMe) {                // It is an directed query, but request wasn't for this peer.                if (query.getSrcPeer().toString().equals(queryFrom)) {                    // we only respond if the original src was not the query forwarder                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("discarding query. Query not for us.");                    }                    // tell the resolver no further action is needed.                    return ResolverService.OK;                }                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("responding to \'misdirected\' forwarded query.");                }                responseDest = queryFrom;            }        }        PeerID peerID = null;        if (queryForMe) {            // look locally.            InputPipe ip = findLocal((PipeID) pipeQuery.getPipeID());            if ((ip != null) && (ip.getType().equals(pipeQuery.getPipeType()))) {                peerID = myGroup.getPeerID();            }        }        if ((null == peerID) && !directedQuery) {            // This request was sent to everyone.            if (myGroup.isRendezvous()) {                // We are a RDV, allow the ResolverService to repropagate the query                List<PeerID> results = srdiIndex.query(pipeQuery.getPipeType(), PipeAdvertisement.IdTag,                        pipeQuery.getPipeID().toString(), 20);                if (!results.isEmpty()) {                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("forwarding query to " + results.size() + " peers");                    }                    srdi.forwardQuery(results, query);                    // tell the resolver no further action is needed.                    return ResolverService.OK;                }                // we don't know anything, continue the walk.                return ResolverService.Repropagate;            } else {                // We are an edge                if (query.getSrcPeer().toString().equals(queryFrom)) {                    // we only respond if the original src was not the query forwarder                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("discarding query.");                    }                    // tell the resolver no further action is needed.                    return ResolverService.OK;                }                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("responding to query forwarded for \'misdirected\' query.");                }                responseDest = queryFrom;            }        }        // Build the answer        PipeResolverMessage pipeResp = new PipeResolverMsg();        pipeResp.setMsgType(MessageType.ANSWER);        pipeResp.setPipeID(pipeQuery.getPipeID());        pipeResp.setPipeType(pipeQuery.getPipeType());        if (null == peerID) {            // respond negative.            pipeResp.addPeerID(myGroup.getPeerID());            pipeResp.setFound(false);        } else {            pipeResp.addPeerID(peerID);            pipeResp.setFound(true);            pipeResp.setInputPeerAdv(myGroup.getPeerAdvertisement());        }        // make a response from the incoming query        ResolverResponseMsg res = query.makeResponse();        CurrentCredential current = currentCredential;        if (null != current) {            res.setCredential(current.credentialDoc);        }        res.setResponse(pipeResp.getDocument(MimeMediaType.XMLUTF8).toString());        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Sending answer for query \'" + query.getQueryId() + "\' to : " + responseDest);        }        resolver.sendResponse(responseDest, res);        return ResolverService.OK;    }    /**     * {@inheritDoc}     */    public void processResponse(ResolverResponseMsg response) {        processResponse(response, null);    }    /**     * {@inheritDoc}     */    public void processResponse(ResolverResponseMsg response, EndpointAddress srcAddr) {        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("got a response for \'" + response.getQueryId() + "\'");        }        Reader resp = new StringReader(response.getResponse());        StructuredTextDocument doc = null;        try {            doc = (StructuredTextDocument)                    StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, resp);        } catch (Throwable e) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.log(Level.FINE, "malformed response - discard", e);            }            return;        } finally {            try {                resp.close();            } catch (IOException ignored) {// ignored            }            resp = null;        }        PipeResolverMessage pipeResp;        try {            pipeResp = new PipeResolverMsg(doc);        } catch (Throwable caught) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.log(Level.FINE, "malformed response - discarding.", caught);            }            return;        } finally {            doc = null;        }        // check if it's a response.        if (!pipeResp.getMsgType().equals(MessageType.ANSWER)) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("expected response - discarding.");            }            return;        }        PeerAdvertisement padv = pipeResp.getInputPeerAdv();        if ((null != padv) && !(myGroup.getPeerID().equals(padv.getPeerID()))) {            try {                // This is not our own peer adv so we keep it only for the default                // expiration time.                if (null == discovery) {                    discovery = myGroup.getDiscoveryService();                }                if (null != discovery) {                    discovery.publish(padv, DiscoveryService.DEFAULT_EXPIRATION, DiscoveryService.DEFAULT_EXPIRATION);                }            } catch (IOException ignored) {                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                    LOG.warning("could not publish peer adv");                }            }        }        String ipId = pipeResp.getPipeID().toString();        Set<ID> peerRsps = pipeResp.getPeerIDs();        for (Object peerRsp : peerRsps) {            // process each peer for which this response is about.            PeerID peer = (PeerID) peerRsp;            if (!pipeResp.isFound()) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("NACK for pipe \'" + ipId + "\' from peer " + peer);                }                // We have received a NACK. Remove that entry.                srdiIndex.add(pipeResp.getPipeType(), PipeAdvertisement.IdTag, ipId, peer, 0);            } else {                long exp = getEntryExp(pipeResp.getPipeType(), PipeAdvertisement.IdTag, ipId, peer);                if ((PipeServiceImpl.VERIFYINTERVAL / 2) > exp) {                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("Using Expiration " + (PipeServiceImpl.VERIFYINTERVAL / 2) + " which is > " + exp);                    }                    // create antry only if one does not exist,or entry exists with                    // lesser lifetime                    // cache the result for half the verify interval                    srdiIndex.add(pipeResp.getPipeType(), PipeAdvertisement.IdTag, ipId, peer,                            (PipeServiceImpl.VERIFYINTERVAL / 2));                } else {                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("DB Expiration " + exp + " > " + (PipeServiceImpl.VERIFYINTERVAL / 2)                                + " overriding attempt to decrease lifetime");                    }                }            }            // call listener for pipeid            callListener(response.getQueryId(), pipeResp.getPipeID(), pipeResp.getPipeType(), peer, !pipeResp.isFound());        }    }    private long getEntryExp(String pkey, String skey, String value, PeerID peerid) {        List<SrdiIndex.Entry> list = srdiIndex.getRecord(pkey, skey, value);        for (SrdiIndex.Entry entry : list) {            if (entry.peerid.equals(peerid)) {                // exp in millis                return TimeUtils.toRelativeTimeMillis(entry.expiration);            }        }        return -1;    }    /**     * {@inheritDoc}     */    public boolean processSrdi(ResolverSrdiMsg message) {        if (!isRendezvous()) {            // avoid caching in non rendezvous mode            return true;        }        if (message == null) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("no SRDI message");            }            return false;        }        if (message.getPayload() == null) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("empty SRDI message");            }            return false;        }        SrdiMessage srdiMsg;        try {            StructuredTextDocument asDoc = (StructuredTextDocument)                    StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, new StringReader(message.getPayload()));            srdiMsg = new SrdiMessageImpl(asDoc);        } catch (Throwable e) {            // we don't understand this msg, let's skip it            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.log(Level.FINE, "Invalid SRDI message", e);            }            return false;        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Received an SRDI messsage with " + srdiMsg.getEntries().size() + " entries from " + srdiMsg.getPeerID());        }        for (Object o : srdiMsg.getEntries()) {            Entry entry = (Entry) o;            srdiIndex.add(srdiMsg.getPrimaryKey(), entry.key, entry.value, srdiMsg.getPeerID(), entry.expiration);        }        if (!PipeService.PropagateType.equals(srdiMsg.getPrimaryKey())) {            // don't replicate entries for propagate pipes. For unicast type            // pipes the replica is useful in finding pipe instances. Since            // walking rather than searching is done for propagate pipes this            // appropriate.            srdi.replicateEntries(srdiMsg);        }        return true;    }    /**     * {@inheritDoc}     */    public void messageSendFailed(PeerID peerid, OutgoingMessageEvent e) {// so what.    }    /**     * {@inheritDoc}     */    public void pushEntries(boolean all) {        pushSrdi((PeerID) null, all);    }    /**     * unregisters the resolver handler     */    void stop() {        resolver.unregisterHandler(PipeResolverName);        resolver.unregisterSrdiHandler(PipeResolverName);        srdiIndex.stop();        srdiIndex = null;        // stop the srdi thread        if (srdiThread != null) {            srdi.stop();        }        srdiThread = null;        srdi = null;        membership.removePropertyChangeListener("defaultCredential", membershipCredListener);        currentCredential = null;        // Avoid cross-reference problems with GC        myGroup = null;        resolver = null;        discovery = null;        membership = null;        outputpipeListeners.clear();        // close the local pipes        List<InputPipe> openLocalPipes = new ArrayList<InputPipe>(localInputPipes.values());        for (InputPipe aPipe : openLocalPipes) {            try {                aPipe.close();            } catch (Exception failed) {                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                    LOG.warning("Failure closing " + aPipe);                }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -