📄 piperesolver.java
字号:
doc = (StructuredTextDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, queryReader); } catch (IOException e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("discarding malformed request ", e); } // no sense in re-propagation here return ResolverService.OK; } finally { try { queryReader.close(); } catch (IOException ignored) { ; } queryReader = null; } PipeResolverMessage pipeQuery; try { pipeQuery = new PipeResolverMsg(doc); } catch (IllegalArgumentException badDoc) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("discarding malformed request ", badDoc); } // no sense in re-propagation here return ResolverService.OK; } finally { doc = null; } // is it a query? if (!pipeQuery.getMsgType().equals(MessageType.QUERY)) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("expected query - discarding."); } // no sense in re-propagation here return ResolverService.OK; } // see if it is a query directed at our peer. Set destPeers = pipeQuery.getPeerIDs(); boolean directedQuery = !destPeers.isEmpty(); boolean queryForMe = !directedQuery; if (directedQuery) { Iterator eachDestPeer = destPeers.iterator(); while (eachDestPeer.hasNext()) { ID aPeer = (ID) eachDestPeer.next(); if (aPeer.equals(myGroup.getPeerID())) { queryForMe = true; break; } } if (!queryForMe) { // It is an directed query, but request wasn't for this peer. if (query.getSrc().equals(queryFrom)) { // we only respond if the original src was not the query forwarder if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("discarding query. Query not for us."); } // tell the resolver no further action is needed. return ResolverService.OK; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("responding to 'misdirected' forwarded query."); } responseDest = queryFrom; } } PeerID peerID = null; if (queryForMe) { // look locally. InputPipe ip = findLocal((PipeID) pipeQuery.getPipeID()); if ((ip != null) && (ip.getType().equals(pipeQuery.getPipeType()))) { peerID = myGroup.getPeerID(); } } if ((null == peerID) && !directedQuery) { // This request was sent to everyone. if (myGroup.isRendezvous()) { // We are a RDV, allow the ResolverService to repropagate the query List results = srdiIndex.query(pipeQuery.getPipeType(), PipeAdvertisement.IdTag, pipeQuery.getPipeID().toString(), 20); if (results.size() > 0) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("forwarding query to " + results.size() + " peers"); } Iterator eachPeer = results.iterator(); while (eachPeer.hasNext()) { PeerID aPeer = (PeerID) eachPeer.next(); srdi.forwardQuery(aPeer, query); } // tell the resolver no further action is needed. return ResolverService.OK; } // we don't know anything, continue the walk. return ResolverService.Repropagate; } else { // We are an edge if (query.getSrc().equals(queryFrom)) { // we only respond if the original src was not the query forwarder if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("discarding query."); } // tell the resolver no further action is needed. return ResolverService.OK; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("responding to query forwarded for 'misdirected' query."); } responseDest = queryFrom; } } // Build the answer PipeResolverMessage pipeResp = new PipeResolverMsg(); pipeResp.setMsgType(MessageType.ANSWER); pipeResp.setPipeID(pipeQuery.getPipeID()); pipeResp.setPipeType(pipeQuery.getPipeType()); if (null == peerID) { // respond negative. pipeResp.addPeerID(myGroup.getPeerID()); pipeResp.setFound(false); } else { pipeResp.addPeerID(peerID); pipeResp.setFound(true); pipeResp.setInputPeerAdv(myGroup.getPeerAdvertisement()); } // make a response from the incoming query ResolverResponseMsg res = query.makeResponse(); res.setCredential(credentialDoc); res.setResponse(pipeResp.getDocument(MimeMediaType.XMLUTF8).toString()); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sending answer for query '" + query.getQueryId() + "' to : " + responseDest); } resolver.sendResponse(responseDest, res); return ResolverService.OK; } /** * {@inheritDoc} **/ public void processResponse(ResolverResponseMsg response) { processResponse(response, null); } /** * {@inheritDoc} **/ public void processResponse(ResolverResponseMsg response, EndpointAddress srcAddr) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("got a response for '" + response.getQueryId() + "'"); } Reader resp = new StringReader(response.getResponse()); StructuredTextDocument doc = null; try { doc = (StructuredTextDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, resp); } catch (Throwable e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("malformed response - discard", e); } return; } finally { try { resp.close(); } catch (IOException ignored) { ; } resp = null; } PipeResolverMessage pipeResp; try { pipeResp = new PipeResolverMsg(doc); } catch (Throwable caught) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("malformed response - discarding.", caught); } return; } finally { doc = null; } // check if it's a response. if (!pipeResp.getMsgType().equals(MessageType.ANSWER)) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("expected response - discarding."); } return; } PeerAdvertisement padv = pipeResp.getInputPeerAdv(); if ((null != padv) && !(myGroup.getPeerID().equals(padv.getPeerID()))) { try { // This is not our own peer adv so we keep it only for the default // expiration time. if (null == discovery) { discovery = myGroup.getDiscoveryService(); } if (null != discovery) { discovery.publish(padv, DiscoveryService.DEFAULT_EXPIRATION, DiscoveryService.DEFAULT_EXPIRATION); } } catch (IOException ignored) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("could not publish peer adv"); } } } String ipId = pipeResp.getPipeID().toString(); Set peerRsps = pipeResp.getPeerIDs(); Iterator eachResp = peerRsps.iterator(); while (eachResp.hasNext()) { // process each peer for which this response is about. PeerID peer = (PeerID) eachResp.next(); if (!pipeResp.isFound()) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("NACK for pipe '" + ipId + "' from peer " + peer); } // We have received a NACK. Remove that entry. srdiIndex.add(pipeResp.getPipeType(), PipeAdvertisement.IdTag, ipId, peer, 0); } else { long exp = getEntryExp(pipeResp.getPipeType(), PipeAdvertisement.IdTag, ipId, peer); if ((PipeServiceImpl.VERIFYINTERVAL / 2) > exp) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Using Expiration "+(PipeServiceImpl.VERIFYINTERVAL / 2) +" which is > "+ exp); } // create antry only if one does not exist,or entry exists with // lesser lifetime // cache the result for half the verify interval srdiIndex.add(pipeResp.getPipeType(), PipeAdvertisement.IdTag, ipId, peer, (PipeServiceImpl.VERIFYINTERVAL / 2)); } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("DB Expiration "+ exp + " > "+(PipeServiceImpl.VERIFYINTERVAL / 2)+ " overriding attempt to decrease lifetime"); } } } // call listener for pipeid callListener(response.getQueryId(), (PipeID) pipeResp.getPipeID(), pipeResp.getPipeType(), peer, !pipeResp.isFound()); } } /** * Given a list of Entries returns the exp record of peerid * otherwise a -1 */ private long getEntryExp(String pkey, String skey, String value, PeerID peerid){ List list = srdiIndex.getRecord(pkey, skey, value); Iterator it=list.iterator(); while (it.hasNext()){ SrdiIndex.Entry entry = (SrdiIndex.Entry) it.next(); if (entry.peerid.equals(peerid)) { // exp in millis return TimeUtils.toRelativeTimeMillis(entry.expiration); } } return -1; } /** * {@inheritDoc} **/ public boolean processSrdi(ResolverSrdiMsg message) { if (message == null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("no SRDI message"); } return false; } if (message.getPayload() == null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("empty SRDI message"); } return false; } SrdiMessage srdiMsg; try { StructuredTextDocument asDoc = (StructuredTextDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, new StringReader(message.getPayload())); srdiMsg = new SrdiMessageImpl(asDoc); } catch (Throwable e) { // we don't understand this msg, let's skip it if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Invalid SRDI message", e); } return false; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Received an SRDI messsage with " + srdiMsg.getEntries().size() + " entries from " + srdiMsg.getPeerID()); } // add the entries too Iterator eachEntry = srdiMsg.getEntries().iterator(); while (eachEntry.hasNext()) { Entry entry = (Entry) eachEntry.next(); srdiIndex.add(srdiMsg.getPrimaryKey(), entry.key, entry.value, srdiMsg.getPeerID(), entry.expiration); } if (!PipeService.PropagateType.equals(srdiMsg.getPrimaryKey())) { // don't replicate entries for propagate pipes. For unicast type // pipes the replica is useful in finding pipe instances. Since // walking rather than searching is done for propagate pipes this // appropriate. srdi.replicateEntries(srdiMsg); } return true; } /** * {@inheritDoc} **/ public void messageSendFailed(PeerID peerid, OutgoingMessageEvent e) {// so what. } /** * {@inheritDoc} **/ public void pushEntries(boolean all) { pushSrdi((PeerID) null, all); } /** * unregisters the resolver handler **/ void stop() { resolver.unregisterHandler(PipeResolverName); resolver.unregisterSrdiHandler(PipeResolverName); srdiIndex.stop(); srdiIndex = null; // stop the srdi thread if (srdiThread != null) { srdi.stop(); } srdiThread = null; srdi = null; membership.removePropertyChangeListener("defaultCredential", membershipCredListener); membershipCredListener = null; credential = null; credentialDoc = null; // Avoid cross-reference problems with GC myGroup = null; resolver = null; discovery = null; membership = null; outputpipeListeners.clear(); // close the local pipes Iterator eachLocalInputPipe = Arrays.asList(localInputPipes.values().toArray()).iterator(); while (eachLocalInputPipe.hasNext()) { InputPipe aPipe = (InputPipe) eachLocalInputPipe.next(); try { aPipe.close(); } catch (Exception failed) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failure closing " + aPipe); } } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -