📄 piperesolver.java
字号:
return processQuery(query, null); } /** * {@inheritDoc} */ public int processQuery(ResolverQueryMsg query, EndpointAddress srcAddr) { String queryFrom; if (null != srcAddr) { if ("jxta".equals(srcAddr.getProtocolName())) { queryFrom = ID.URIEncodingName + ":" + ID.URNNamespace + ":" + srcAddr.getProtocolAddress(); } else { // we don't know who routed us the query. Assume it came from the source. queryFrom = query.getSrcPeer().toString(); } } else { // we don't know who routed us the query. Assume it came from the source. queryFrom = query.getSrcPeer().toString(); } String responseDest = query.getSrcPeer().toString(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Starting for :" + query.getQueryId() + " from " + srcAddr); } Reader queryReader = new StringReader(query.getQuery()); StructuredTextDocument doc = null; try { doc = (StructuredTextDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, queryReader); } catch (IOException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "discarding malformed request ", e); } // no sense in re-propagation here return ResolverService.OK; } finally { try { queryReader.close(); } catch (IOException ignored) { // ignored } queryReader = null; } PipeResolverMessage pipeQuery; try { pipeQuery = new PipeResolverMsg(doc); } catch (IllegalArgumentException badDoc) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "discarding malformed request ", badDoc); } // no sense in re-propagation here return ResolverService.OK; } finally { doc = null; } // is it a query? if (!pipeQuery.getMsgType().equals(MessageType.QUERY)) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("expected query - discarding."); } // no sense in re-propagation here return ResolverService.OK; } // see if it is a query directed at our peer. Set<ID> destPeers = pipeQuery.getPeerIDs(); boolean directedQuery = !destPeers.isEmpty(); boolean queryForMe = !directedQuery; if (directedQuery) { for (Object destPeer : destPeers) { ID aPeer = (ID) destPeer; if (aPeer.equals(myGroup.getPeerID())) { queryForMe = true; break; } } if (!queryForMe) { // It is an directed query, but request wasn't for this peer. if (query.getSrcPeer().toString().equals(queryFrom)) { // we only respond if the original src was not the query forwarder if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("discarding query. Query not for us."); } // tell the resolver no further action is needed. return ResolverService.OK; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("responding to \'misdirected\' forwarded query."); } responseDest = queryFrom; } } PeerID peerID = null; if (queryForMe) { // look locally. InputPipe ip = findLocal((PipeID) pipeQuery.getPipeID()); if ((ip != null) && (ip.getType().equals(pipeQuery.getPipeType()))) { peerID = myGroup.getPeerID(); } } if ((null == peerID) && !directedQuery) { // This request was sent to everyone. if (myGroup.isRendezvous()) { // We are a RDV, allow the ResolverService to repropagate the query List<PeerID> results = srdiIndex.query(pipeQuery.getPipeType(), PipeAdvertisement.IdTag, pipeQuery.getPipeID().toString(), 20); if (!results.isEmpty()) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("forwarding query to " + results.size() + " peers"); } srdi.forwardQuery(results, query); // tell the resolver no further action is needed. return ResolverService.OK; } // we don't know anything, continue the walk. return ResolverService.Repropagate; } else { // We are an edge if (query.getSrcPeer().toString().equals(queryFrom)) { // we only respond if the original src was not the query forwarder if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("discarding query."); } // tell the resolver no further action is needed. return ResolverService.OK; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("responding to query forwarded for \'misdirected\' query."); } responseDest = queryFrom; } } // Build the answer PipeResolverMessage pipeResp = new PipeResolverMsg(); pipeResp.setMsgType(MessageType.ANSWER); pipeResp.setPipeID(pipeQuery.getPipeID()); pipeResp.setPipeType(pipeQuery.getPipeType()); if (null == peerID) { // respond negative. pipeResp.addPeerID(myGroup.getPeerID()); pipeResp.setFound(false); } else { pipeResp.addPeerID(peerID); pipeResp.setFound(true); pipeResp.setInputPeerAdv(myGroup.getPeerAdvertisement()); } // make a response from the incoming query ResolverResponseMsg res = query.makeResponse(); CurrentCredential current = currentCredential; if (null != current) { res.setCredential(current.credentialDoc); } res.setResponse(pipeResp.getDocument(MimeMediaType.XMLUTF8).toString()); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending answer for query \'" + query.getQueryId() + "\' to : " + responseDest); } resolver.sendResponse(responseDest, res); return ResolverService.OK; } /** * {@inheritDoc} */ public void processResponse(ResolverResponseMsg response) { processResponse(response, null); } /** * {@inheritDoc} */ public void processResponse(ResolverResponseMsg response, EndpointAddress srcAddr) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("got a response for \'" + response.getQueryId() + "\'"); } Reader resp = new StringReader(response.getResponse()); StructuredTextDocument doc = null; try { doc = (StructuredTextDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, resp); } catch (Throwable e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "malformed response - discard", e); } return; } finally { try { resp.close(); } catch (IOException ignored) {// ignored } resp = null; } PipeResolverMessage pipeResp; try { pipeResp = new PipeResolverMsg(doc); } catch (Throwable caught) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "malformed response - discarding.", caught); } return; } finally { doc = null; } // check if it's a response. if (!pipeResp.getMsgType().equals(MessageType.ANSWER)) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("expected response - discarding."); } return; } PeerAdvertisement padv = pipeResp.getInputPeerAdv(); if ((null != padv) && !(myGroup.getPeerID().equals(padv.getPeerID()))) { try { // This is not our own peer adv so we keep it only for the default // expiration time. if (null == discovery) { discovery = myGroup.getDiscoveryService(); } if (null != discovery) { discovery.publish(padv, DiscoveryService.DEFAULT_EXPIRATION, DiscoveryService.DEFAULT_EXPIRATION); } } catch (IOException ignored) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("could not publish peer adv"); } } } String ipId = pipeResp.getPipeID().toString(); Set<ID> peerRsps = pipeResp.getPeerIDs(); for (Object peerRsp : peerRsps) { // process each peer for which this response is about. PeerID peer = (PeerID) peerRsp; if (!pipeResp.isFound()) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("NACK for pipe \'" + ipId + "\' from peer " + peer); } // We have received a NACK. Remove that entry. srdiIndex.add(pipeResp.getPipeType(), PipeAdvertisement.IdTag, ipId, peer, 0); } else { long exp = getEntryExp(pipeResp.getPipeType(), PipeAdvertisement.IdTag, ipId, peer); if ((PipeServiceImpl.VERIFYINTERVAL / 2) > exp) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Using Expiration " + (PipeServiceImpl.VERIFYINTERVAL / 2) + " which is > " + exp); } // create antry only if one does not exist,or entry exists with // lesser lifetime // cache the result for half the verify interval srdiIndex.add(pipeResp.getPipeType(), PipeAdvertisement.IdTag, ipId, peer, (PipeServiceImpl.VERIFYINTERVAL / 2)); } else { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("DB Expiration " + exp + " > " + (PipeServiceImpl.VERIFYINTERVAL / 2) + " overriding attempt to decrease lifetime"); } } } // call listener for pipeid callListener(response.getQueryId(), pipeResp.getPipeID(), pipeResp.getPipeType(), peer, !pipeResp.isFound()); } } private long getEntryExp(String pkey, String skey, String value, PeerID peerid) { List<SrdiIndex.Entry> list = srdiIndex.getRecord(pkey, skey, value); for (SrdiIndex.Entry entry : list) { if (entry.peerid.equals(peerid)) { // exp in millis return TimeUtils.toRelativeTimeMillis(entry.expiration); } } return -1; } /** * {@inheritDoc} */ public boolean processSrdi(ResolverSrdiMsg message) { if (!isRendezvous()) { // avoid caching in non rendezvous mode return true; } if (message == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("no SRDI message"); } return false; } if (message.getPayload() == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("empty SRDI message"); } return false; } SrdiMessage srdiMsg; try { StructuredTextDocument asDoc = (StructuredTextDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, new StringReader(message.getPayload())); srdiMsg = new SrdiMessageImpl(asDoc); } catch (Throwable e) { // we don't understand this msg, let's skip it if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Invalid SRDI message", e); } return false; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Received an SRDI messsage with " + srdiMsg.getEntries().size() + " entries from " + srdiMsg.getPeerID()); } for (Object o : srdiMsg.getEntries()) { Entry entry = (Entry) o; srdiIndex.add(srdiMsg.getPrimaryKey(), entry.key, entry.value, srdiMsg.getPeerID(), entry.expiration); } if (!PipeService.PropagateType.equals(srdiMsg.getPrimaryKey())) { // don't replicate entries for propagate pipes. For unicast type // pipes the replica is useful in finding pipe instances. Since // walking rather than searching is done for propagate pipes this // appropriate. srdi.replicateEntries(srdiMsg); } return true; } /** * {@inheritDoc} */ public void messageSendFailed(PeerID peerid, OutgoingMessageEvent e) {// so what. } /** * {@inheritDoc} */ public void pushEntries(boolean all) { pushSrdi((PeerID) null, all); } /** * unregisters the resolver handler */ void stop() { resolver.unregisterHandler(PipeResolverName); resolver.unregisterSrdiHandler(PipeResolverName); srdiIndex.stop(); srdiIndex = null; // stop the srdi thread if (srdiThread != null) { srdi.stop(); } srdiThread = null; srdi = null; membership.removePropertyChangeListener("defaultCredential", membershipCredListener); currentCredential = null; // Avoid cross-reference problems with GC myGroup = null; resolver = null; discovery = null; membership = null; outputpipeListeners.clear(); // close the local pipes List<InputPipe> openLocalPipes = new ArrayList<InputPipe>(localInputPipes.values()); for (InputPipe aPipe : openLocalPipes) { try { aPipe.close(); } catch (Exception failed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Failure closing " + aPipe); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -