📄 pipeserviceimpl.java
字号:
if (type == null) { throw new IllegalArgumentException("PipeAdvertisement type may not be null"); } PipeID pipeId = (PipeID) adv.getPipeID(); if (pipeId == null) { throw new IllegalArgumentException("PipeAdvertisement PipeID may not be null"); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Create " + type + " InputPipe for " + pipeId); } InputPipe inputPipe; // create an InputPipe. if (type.equals(PipeService.UnicastType)) { inputPipe = new InputPipeImpl(pipeResolver, adv, listener); } else if (type.equals(PipeService.UnicastSecureType)) { inputPipe = new SecureInputPipeImpl(pipeResolver, adv, listener); } else if (type.equals(PipeService.PropagateType)) { if (wirePipe != null) { inputPipe = wirePipe.createInputPipe(adv, listener); } else { throw new IOException("No propagated pipe servive available"); } } else { // Unknown type if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.severe("Cannot create pipe for unknown type : " + type); } throw new IOException("Cannot create pipe for unknown type : " + type); } return inputPipe; } /** * {@inheritDoc} */ public OutputPipe createOutputPipe(PipeAdvertisement pipeAdv, long timeout) throws IOException { return createOutputPipe(pipeAdv, Collections.<ID>emptySet(), timeout); } /** * {@inheritDoc} */ public OutputPipe createOutputPipe(PipeAdvertisement adv, Set<? extends ID> resolvablePeers, long timeout) throws IOException { // convert zero to max value. if (0 == timeout) { timeout = Long.MAX_VALUE; } long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis(timeout); // Make a listener, start async resolution and then wait until the timeout expires. syncListener localListener = new syncListener(); int queryid = PipeResolver.getNextQueryID(); createOutputPipe(adv, resolvablePeers, localListener, queryid); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Waiting synchronously for " + timeout + "ms to resolve OutputPipe for " + adv.getPipeID()); } try { synchronized (localListener) { while ((null == localListener.event) && (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), absoluteTimeOut) < 0)) { try { localListener.wait(TimeUtils.ASECOND); } catch (InterruptedException woken) { Thread.interrupted(); } } } } finally { // remove the listener we installed. removeOutputPipeListener(adv.getPipeID().toString(), queryid); } if (null != localListener.event) { return localListener.event.getOutputPipe(); } else { throw new IOException("Output Pipe could not be resolved after " + timeout + "ms."); } } /** * {@inheritDoc} */ public void createOutputPipe(PipeAdvertisement pipeAdv, OutputPipeListener listener) throws IOException { createOutputPipe(pipeAdv, Collections.<ID>emptySet(), listener); } /** * {@inheritDoc} */ public void createOutputPipe(PipeAdvertisement pipeAdv, Set<? extends ID> resolvablePeers, OutputPipeListener listener) throws IOException { createOutputPipe(pipeAdv, resolvablePeers, listener, PipeResolver.getNextQueryID()); } private void createOutputPipe(PipeAdvertisement pipeAdv, Set<? extends ID> resolvablePeers, OutputPipeListener listener, int queryid) throws IOException { if (!started) { throw new IOException("Pipe Service has not been started or has been stopped"); } // Recover the PipeId from the PipeServiceImpl Advertisement PipeID pipeId = (PipeID) pipeAdv.getPipeID(); String type = pipeAdv.getType(); if (null == type) { IllegalArgumentException failed = new IllegalArgumentException("Pipe type was not set"); if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, failed.getMessage(), failed); } throw failed; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Create " + type + " OutputPipe for " + pipeId); } if (PipeService.PropagateType.equals(type)) { OutputPipe op; if (resolvablePeers.size() == 1) { op = new BlockingWireOutputPipe(group, pipeAdv, (PeerID) resolvablePeers.iterator().next()); } else { if (wirePipe != null) { op = wirePipe.createOutputPipe(pipeAdv, resolvablePeers); } else { throw new IOException("No propagated pipe service available"); } } if (null != op) { OutputPipeEvent newevent = new OutputPipeEvent(this.getInterface(), op, pipeId.toString(), PipeResolver.ANYQUERY); try { listener.outputPipeEvent(newevent); } catch (Throwable ignored) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "Uncaught Throwable in listener for " + pipeId + " (" + listener.getClass().getName() + ")", ignored); } } } } else if (PipeService.UnicastType.equals(type) || PipeService.UnicastSecureType.equals(type)) { addOutputPipeListener(pipeId, new OutputPipeHolder(pipeAdv, resolvablePeers, listener, queryid)); pipeResolver.addListener(pipeId, this, queryid); pipeResolver.sendPipeQuery(pipeAdv, resolvablePeers, queryid); // look locally for the pipe if (resolvablePeers.isEmpty() || resolvablePeers.contains(group.getPeerID())) { InputPipe local = pipeResolver.findLocal(pipeId); // if we have a local instance, make sure the local instance is of the same type. if (null != local) { if (local.getType().equals(pipeAdv.getType())) { pipeResolver.callListener(queryid, pipeId, local.getType(), group.getPeerID(), false); } else { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning(MessageFormat.format("rejecting local pipe ({0}) because type is not ({1})", local.getType(), pipeAdv.getType())); } } } } } else { // Unknown type if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.severe("createOutputPipe: cannot create pipe for unknown type : " + type); } throw new IOException("cannot create pipe for unknown type : " + type); } } /* * Add an output listener for the given pipeId. */ private void addOutputPipeListener(PipeID pipeId, OutputPipeHolder pipeHolder) { synchronized (outputPipeListeners) { Map<Integer, OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeId); if (perpipelisteners == null) { perpipelisteners = new HashMap<Integer, OutputPipeHolder>(); outputPipeListeners.put(pipeId, perpipelisteners); } if (perpipelisteners.get(pipeHolder.queryid) != null) { LOG.warning("Clobbering output pipe listener for query " + pipeHolder.queryid); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Adding pipe listener for pipe " + pipeId + " and query " + pipeHolder.queryid); } perpipelisteners.put(pipeHolder.queryid, pipeHolder); } } /** * {@inheritDoc} */ public OutputPipeListener removeOutputPipeListener(String pipeID, OutputPipeListener listener) { throw new UnsupportedOperationException("Legacy method not supported. Use interface object if you need this method."); } /** * {@inheritDoc} */ public OutputPipeListener removeOutputPipeListener(ID pipeID, OutputPipeListener listener) { // remove all instances of this listener, regardless of queryid if (pipeResolver == null) { return null; } if (!(pipeID instanceof PipeID)) { throw new IllegalArgumentException("pipeID must be a PipeID."); } synchronized (outputPipeListeners) { Map<Integer, OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeID); if (perpipelisteners != null) { Set<Map.Entry<Integer, OutputPipeHolder>> entries = perpipelisteners.entrySet(); for (Map.Entry<Integer, OutputPipeHolder> entry : entries) { OutputPipeHolder pl = entry.getValue(); if (pl.listener == listener) { pipeResolver.removeListener((PipeID) pipeID, pl.queryid); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Removing listener for query " + pl.queryid); } perpipelisteners.remove(entry.getKey()); } } // clean up the map if there are no more listeners for the pipe if (perpipelisteners.isEmpty()) { outputPipeListeners.remove(pipeID); } } } return listener; } private OutputPipeListener removeOutputPipeListener(String opID, int queryID) { if (pipeResolver == null) { return null; } PipeID pipeID; try { URI aPipeID = new URI(opID); pipeID = (PipeID) IDFactory.fromURI(aPipeID); } catch (URISyntaxException badID) { throw new IllegalArgumentException("Bad pipe ID: " + opID); } catch (ClassCastException badID) { throw new IllegalArgumentException("id was not a pipe id: " + opID); } synchronized (outputPipeListeners) { Map<Integer, OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeID); if (perpipelisteners != null) { OutputPipeHolder pipeHolder = perpipelisteners.get(queryID); perpipelisteners.remove(queryID); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Removing listener for query " + queryID); } // clean up the map if there are no more listeners for the pipe if (perpipelisteners.isEmpty()) { outputPipeListeners.remove(pipeID); } pipeResolver.removeListener(pipeID, queryID); if (pipeHolder != null) { return pipeHolder.listener; } } } return null; } /** * {@inheritDoc} */ public boolean pipeResolveEvent(PipeResolver.Event event) { try { ID peerID = event.getPeerID(); ID pipeID = event.getPipeID(); int queryID = event.getQueryID(); OutputPipeHolder pipeHolder; synchronized (outputPipeListeners) { Map<Integer, OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeID); if (perpipelisteners == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("No listener for event for pipe " + pipeID); } return false; } pipeHolder = perpipelisteners.get(queryID); if (pipeHolder == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("No listener for event for query " + queryID); } return false; } } // check if they wanted a resolve from a specific peer. if (!pipeHolder.peers.isEmpty() && !pipeHolder.peers.contains(peerID)) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Event was for wrong peer \'" + peerID + "\'. Discarding."); } return false; } // create op String type = pipeHolder.adv.getType(); OutputPipe op; if (PipeService.UnicastType.equals(type)) { op = new NonBlockingOutputPipe(group, pipeResolver, pipeHolder.adv, peerID, pipeHolder.peers); } else if (PipeService.UnicastSecureType.equals(type)) { op = new SecureOutputPipe(group, pipeResolver, pipeHolder.adv, peerID, pipeHolder.peers); } else { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Could not create output pipe of type \'" + type + "\'. Discarding."); } return false; } // Generate an event when the output pipe was succesfully opened. OutputPipeEvent newevent = new OutputPipeEvent(this.getInterface(), op, pipeID.toString(), queryID); try { pipeHolder.listener.outputPipeEvent(newevent); } catch (Throwable ignored) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE , "Uncaught Throwable in listener for " + pipeID + "(" + pipeHolder.getClass().getName() + ")", ignored); } } removeOutputPipeListener(pipeID.toString(), queryID); return true; } catch (IOException ie) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "Error creating output pipe " + event.getPipeID(), ie); } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("No listener for event for " + event.getPipeID()); } return false; } /** * {@inheritDoc} * <p/> * We don't do anything with NAKs (yet) */ public boolean pipeNAKEvent(PipeResolver.Event event) { return false; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -