📄 piperesolver.java
字号:
} } localInputPipes.clear(); } /** * {@inheritDoc} */ public boolean register(InputPipe ip) { PipeID pipeID = (PipeID) ip.getPipeID(); synchronized (this) { if (localInputPipes.containsKey(pipeID)) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Existing registered InputPipe for " + pipeID); } return false; } // Register this input pipe boolean registered = endpoint.addIncomingMessageListener((EndpointListener) ip, "PipeService", pipeID.toString()); if (!registered) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.severe("Existing registered Endpoint Listener for " + pipeID); } return false; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Registering local InputPipe for " + pipeID); } localInputPipes.put(pipeID, ip); } // Announce the pipe to SRDI so that others will know we are listening. pushSrdi(ip, true); // Call anyone who may be listening for this input pipe. callListener(0, pipeID, ip.getType(), myGroup.getPeerID(), false); return true; } /** * Return the local {@link net.jxta.pipe.InputPipe InputPipe}, if any, for the * specified {@link net.jxta.pipe.PipeID PipeID}. * * @param pipeID the PipeID who's InputPipe is desired. * @return The InputPipe object. */ public InputPipe findLocal(PipeID pipeID) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Find local InputPipe for " + pipeID); } // First look if the pipe is a local InputPipe InputPipe ip = localInputPipes.get(pipeID); // Found it. if ((null != ip) && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("found local InputPipe for " + pipeID); } return ip; } /** * {@inheritDoc} */ public boolean forget(InputPipe pipe) { PipeID pipeID = (PipeID) pipe.getPipeID(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Unregistering local InputPipe for " + pipeID); } // Unconditionally announce the change to SRDI. // FIXME 20040529 bondolo This is overkill, it should be able to wait // until the deltas are pushed. pushSrdi(pipe, false); InputPipe ip; synchronized (this) { ip = localInputPipes.remove(pipeID); } if (pipe != ip) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Pipe removed was not the same as the pipe to be removed!"); } } if (null != ip) { // remove the queue for the general demux EndpointListener removed = endpoint.removeIncomingMessageListener("PipeService", pipeID.toString()); if ((null == removed) || (pipe != removed)) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("removeIncomingMessageListener() did not remove correct pipe!"); } } } return (ip != null); } /** * Add a pipe resolver listener * * @param listener listener * @param queryID The query this callback is being made in response to. * @param pipeID The pipe which is the subject of the event. * @return true if sucessfully added */ synchronized boolean addListener(ID pipeID, Listener listener, int queryID) { Map<Integer, Listener> perpipelisteners = outputpipeListeners.get(pipeID); // if no map for this pipeid, make one and add it to the top map. if (null == perpipelisteners) { perpipelisteners = new HashMap<Integer, Listener>(); outputpipeListeners.put(pipeID, perpipelisteners); } boolean alreadyThere = perpipelisteners.containsKey(queryID); if (!alreadyThere) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("adding listener for " + pipeID + " / " + queryID); } perpipelisteners.put(queryID, listener); } return alreadyThere; } /** * Call the listener for the specified pipe id informing it about the * specified peer. * * @param qid The query this callback is being made in response to. * @param pipeID The pipe which is the subject of the event. * @param type The type of the pipe which is the subject of the event. * @param peer The peer on which the remote input pipe was found. * @param NAK indicate whether the event is a nack */ void callListener(int qid, ID pipeID, String type, PeerID peer, boolean NAK) { Event newevent = new Event(this, peer, pipeID, type, qid); boolean handled = false; while (!handled) { Listener pipeListener; synchronized (this) { Map<Integer, Listener> perpipelisteners = outputpipeListeners.get(pipeID); if (null == perpipelisteners) { if ((ANYQUERY != qid) && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("No listener for pipe " + pipeID); } break; } pipeListener = perpipelisteners.get(qid); } if (null != pipeListener) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Calling Pipe resolver listener " + (NAK ? "NAK " : "") + "for " + pipeID); } try { if (NAK) { handled = pipeListener.pipeNAKEvent(newevent); } else { handled = pipeListener.pipeResolveEvent(newevent); } } catch (Throwable ignored) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING , "Uncaught Throwable in listener for: " + pipeID + "(" + pipeListener.getClass().getName() + ")", ignored); } } } // if we havent tried it already, try it with the ANYQUERY if (ANYQUERY == qid) { break; } qid = ANYQUERY; } } /** * Remove a pipe resolver listener * * @param pipeID listener to remove * @param queryID matching queryid. * @return listener object removed */ synchronized Listener removeListener(ID pipeID, int queryID) { Map<Integer, Listener> perpipelisteners = outputpipeListeners.get(pipeID); if (null == perpipelisteners) { return null; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("removing listener for " + pipeID + " / " + queryID); } Listener removedListener = perpipelisteners.remove(queryID); if (perpipelisteners.isEmpty()) { outputpipeListeners.remove(pipeID); } return removedListener; } /** * Send a request to find an input pipe * * @param adv the advertisement for the pipe we are seeking. * @param acceptablePeers the set of peers at which we wish the pipe to * be resolved. We will not accept responses from peers other than those * in this set. Empty set means all peers are acceptable. * @param queryID the query ID to use for the query. if zero then a query * ID will be generated * @return the query id under which the request was sent */ int sendPipeQuery(PipeAdvertisement adv, Set<? extends ID> acceptablePeers, int queryID) { // choose a query id if non-prechosen. if (0 == queryID) { queryID = getNextQueryID(); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine((acceptablePeers.isEmpty() ? "Undirected" : "Directed") + " query (" + queryID + ") for " + adv.getPipeID()); } Collection<? extends ID> targetPeers = new ArrayList<ID>(acceptablePeers); // check local srdi to see if we have a potential answer List<? extends ID> knownLocations = srdiIndex.query(adv.getType(), PipeAdvertisement.IdTag, adv.getPipeID().toString(), 100); if (!knownLocations.isEmpty()) { // we think we know where the pipe might be... if (!acceptablePeers.isEmpty()) { // only keep those peers which are acceptable. knownLocations.retainAll(acceptablePeers); } // if the known locations contain any of the acceptable peers then // we will send a directed query to ONLY those peers. if (!knownLocations.isEmpty()) { targetPeers = knownLocations; if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Using SRDI cache results for directed query (" + queryID + ") for " + adv.getPipeID()); } } } // build the pipe query message. PipeResolverMessage pipeQry = new PipeResolverMsg(); pipeQry.setMsgType(MessageType.QUERY); pipeQry.setPipeID(adv.getPipeID()); pipeQry.setPipeType(adv.getType()); for (Object targetPeer : targetPeers) { pipeQry.addPeerID((PeerID) targetPeer); } StructuredTextDocument asDoc = (StructuredTextDocument) pipeQry.getDocument(MimeMediaType.XMLUTF8); // build the resolver query ResolverQuery query = new ResolverQuery(); query.setHandlerName(PipeResolverName); query.setQueryId(queryID); query.setSrcPeer(myGroup.getPeerID()); query.setQuery(asDoc.toString()); CurrentCredential current = currentCredential; if (null != current) { query.setCredential(current.credentialDoc); } if (targetPeers.isEmpty()) { // we have no idea, walk the tree if (myGroup.isRendezvous()) { // We are a rdv, then send it to the replica peer. PeerID peer = srdi.getReplicaPeer(pipeQry.getPipeType() + PipeAdvertisement.IdTag + pipeQry.getPipeID().toString()); if (null != peer) { srdi.forwardQuery(peer, query); return queryID; } } resolver.sendQuery(null, query); } else { // send it only to the peers whose result we would accept. for (ID targetPeer : targetPeers) { resolver.sendQuery(targetPeer.toString(), query); } } return queryID; } /** * {@inheritDoc} */ SrdiIndex getSrdiIndex() { return srdiIndex; } /** * {@inheritDoc} * <p/> * This implementation knows nothing of deltas, it just pushes it all. */ private void pushSrdi(PeerID peer, boolean all) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Pushing " + (all ? "all" : "deltas") + " SRDI to " + peer); } Map<String, List<Entry>> types = new HashMap<String, List<Entry>>(); synchronized (this) { for (InputPipe ip : localInputPipes.values()) { Entry entry = new Entry(PipeAdvertisement.IdTag, ip.getPipeID().toString(), Long.MAX_VALUE); String type = ip.getType(); List<Entry> entries = types.get(type); if (null == entries) { entries = new ArrayList<Entry>(); types.put(type, entries); } entries.add(entry); } } for (String type : types.keySet()) { List<Entry> entries = types.get(type); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending a Pipe SRDI messsage in " + myGroup.getPeerGroupID() + " of " + entries.size() + " entries of type " + type); } SrdiMessage srdiMsg = new SrdiMessageImpl(myGroup.getPeerID(), 1, type, entries); if (null == peer) { srdi.pushSrdi(null, srdiMsg); } else { srdi.pushSrdi(peer, srdiMsg); } } } /** * Push SRDI entry for the specified pipe * * @param ip the pipe who's entry we are pushing * @param adding adding an entry for the pipe or expiring the entry? */ private void pushSrdi(InputPipe ip, boolean adding) { srdiIndex.add(ip.getType(), PipeAdvertisement.IdTag, ip.getPipeID().toString(), myGroup.getPeerID(), adding ? Long.MAX_VALUE : 0); SrdiMessage srdiMsg; try { srdiMsg = new SrdiMessageImpl(myGroup.getPeerID(), 1, // ttl ip.getType(), PipeAdvertisement.IdTag, ip.getPipeID().toString(), adding ? Long.MAX_VALUE : 0); if (myGroup.isRendezvous()) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Replicating a" + (adding ? "n add" : " remove") + " Pipe SRDI entry for pipe [" + ip.getPipeID() + "] of type " + ip.getType()); } srdi.replicateEntries(srdiMsg); } else { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending a" + (adding ? "n add" : " remove") + " Pipe SRDI messsage for pipe [" + ip.getPipeID() + "] of type " + ip.getType()); } srdi.pushSrdi(null, srdiMsg); } } catch (Throwable e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Uncaught throwable pushing SRDI entries", e); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -