📄 pipeserviceimpl.java
字号:
if (type == null) { throw new IllegalArgumentException("PipeAdvertisement type may not be null"); } PipeID pipeId = (PipeID) adv.getPipeID(); if (pipeId == null) { throw new IllegalArgumentException("PipeAdvertisement PipeID may not be null"); } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Create " + type + " InputPipe for " + pipeId); } InputPipe ip = null; // create an InputPipe. if(type.equals(PipeService.UnicastType)) { ip = new UnicastInputPipeImpl(pipeResolver, adv, listener); } else if(type.equals(PipeService.UnicastSecureType)) { ip = new SecureInputPipeImpl(pipeResolver, adv, listener); } else if(type.equals(PipeService.PropagateType)) { if (wirePipe != null) { ip = wirePipe.createInputPipe(adv, listener); } else { throw new IOException("No propagated pipe servive available"); } } else { // Unknown type if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Cannot create pipe for unknown type : " + type); } throw new IOException("Cannot create pipe for unknown type : " + type); } return ip; } /** * {@inheritDoc} */ public OutputPipe createOutputPipe(PipeAdvertisement pipeAdv, long timeout) throws IOException { return createOutputPipe(pipeAdv, Collections.EMPTY_SET, timeout); } /** * {@inheritDoc} */ public OutputPipe createOutputPipe(PipeAdvertisement adv, Set resolvablePeers, long timeout) throws IOException { // convert zero to max value. if(0 == timeout) { timeout = Long.MAX_VALUE; } long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis(timeout); // Make a listener, start async resolution and then wait until the timeout expires. syncListener localListener = new syncListener(); int queryid = PipeResolver.getNextQueryID(); createOutputPipe(adv, resolvablePeers, localListener, queryid); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Waiting synchronously for " + timeout + "ms to resolve OutputPipe for " + adv.getPipeID()); } try { synchronized(localListener) { while((null == localListener.event) && (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), absoluteTimeOut) < 0)) { try { localListener.wait(TimeUtils.ASECOND); } catch (InterruptedException woken) { Thread.interrupted(); } } } } finally { // remove the listener we installed. removeOutputPipeListener(adv.getPipeID().toString(), queryid); } if (null != localListener.event) { return localListener.event.getOutputPipe(); } else { throw new IOException("Output Pipe could not be resolved after " + timeout + "ms."); } } /** * {@inheritDoc} */ public void createOutputPipe(PipeAdvertisement pipeAdv, OutputPipeListener listener) throws IOException { createOutputPipe(pipeAdv, Collections.EMPTY_SET, listener); } /** * {@inheritDoc} */ public void createOutputPipe(PipeAdvertisement pipeAdv, Set resolvablePeers, OutputPipeListener listener) throws IOException { createOutputPipe(pipeAdv, resolvablePeers, listener, PipeResolver.getNextQueryID()); } private void createOutputPipe(PipeAdvertisement pipeAdv, Set resolvablePeers, OutputPipeListener listener, int queryid) throws IOException { if (!started) { throw new IllegalStateException("Pipe Service has not been started or has been stopped"); } // Recover the PipeId from the PipeServiceImpl Advertisement PipeID pipeId = (PipeID) pipeAdv.getPipeID(); String type = pipeAdv.getType(); if(null == type) { IllegalArgumentException failed = new IllegalArgumentException("Pipe type was not set"); if (LOG.isEnabledFor(Level.ERROR)) { LOG.error(failed, failed); } throw failed; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Create " + type + " OutputPipe for " + pipeId); } if (PipeService.PropagateType.equals(type)) { OutputPipe op = null; if (wirePipe != null) { op = wirePipe.createOutputPipe(pipeAdv, resolvablePeers); } else { throw new IOException("No propagated pipe service available"); } if(null != op) { OutputPipeEvent newevent = new OutputPipeEvent( this.getInterface(), op, pipeId.toString(), PipeResolver.ANYQUERY); try { listener.outputPipeEvent(newevent); } catch(Throwable ignored) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Uncaught Throwable in listener for " + pipeId + " (" + listener.getClass().getName() + ")" , ignored); } } } return; } else if (PipeService.UnicastType.equals(type) || PipeService.UnicastSecureType.equals(type)) { addOutputPipeListener(pipeId, new OutputPipeHolder(pipeAdv, resolvablePeers, listener, queryid)); pipeResolver.addListener(pipeId, this, queryid); pipeResolver.sendPipeQuery(pipeAdv, resolvablePeers, queryid); // look locally for the pipe if(resolvablePeers.isEmpty() || resolvablePeers.contains(myGroup.getPeerID())) { InputPipe local = pipeResolver.findLocal(pipeId); // if we have a local instance, make sure the local instance is of the same type. if(null != local) { if (local.getType().equals(pipeAdv.getType())) { pipeResolver.callListener(queryid, pipeId, local.getType(), myGroup.getPeerID(), false ); } else { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("rejecting local pipe (" + local.getType() + ") because type is not (" + pipeAdv.getType() + ")"); } } } } } else { // Unknown type if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("createOutputPipe: cannot create pipe for unknown type : " + type); } throw new IOException("cannot create pipe for unknown type : " + type); } } /* * Add an output listener for the given pipeId. */ private void addOutputPipeListener(PipeID pipeId, OutputPipeHolder pipeHolder) { synchronized (outputPipeListeners) { Map<Integer,OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeId); if (perpipelisteners == null) { perpipelisteners = new HashMap<Integer,OutputPipeHolder>(); outputPipeListeners.put(pipeId, perpipelisteners); } if (perpipelisteners.get(new Integer(pipeHolder.queryid)) != null) { LOG.warn("Clobbering output pipe listener for query " + pipeHolder.queryid); } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Adding pipe listener for pipe " + pipeId.toString() + " and query " + pipeHolder.queryid); } perpipelisteners.put(new Integer(pipeHolder.queryid), pipeHolder); } } /** * {@inheritDoc} */ public OutputPipeListener removeOutputPipeListener(String opID, OutputPipeListener listener) { // remove all instances of this listener, regardless of queryid if (pipeResolver == null) { return null; } PipeID pipeID; try { URI aPipeID = new URI(opID); pipeID = (PipeID) IDFactory.fromURI(aPipeID); } catch (URISyntaxException badID) { throw new IllegalArgumentException("Bad pipe ID: " + opID); } catch (ClassCastException badID) { throw new IllegalArgumentException("id was not a pipe id: " + opID); } synchronized (outputPipeListeners) { Map<Integer,OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeID); if (perpipelisteners != null) { Set<Map.Entry<Integer,OutputPipeHolder>> entries = perpipelisteners.entrySet(); for (Map.Entry<Integer,OutputPipeHolder> entry : entries) { OutputPipeHolder pl = entry.getValue(); if (pl.listener == listener) { pipeResolver.removeListener(pipeID, pl.queryid); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Removing listener for query " + pl.queryid); } perpipelisteners.remove(entry.getKey()); } } // clean up the map if there are no more listeners for the pipe if (perpipelisteners.isEmpty()) { outputPipeListeners.remove(pipeID); } } } return listener; } private OutputPipeListener removeOutputPipeListener(String opID, int queryID) { if (pipeResolver == null) { return null; } PipeID pipeID; try { URI aPipeID = new URI(opID); pipeID = (PipeID) IDFactory.fromURI(aPipeID); } catch (URISyntaxException badID) { throw new IllegalArgumentException("Bad pipe ID: " + opID); } catch (ClassCastException badID) { throw new IllegalArgumentException("id was not a pipe id: " + opID); } synchronized (outputPipeListeners) { Map<Integer,OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeID); if (perpipelisteners != null) { Integer queryid = new Integer(queryID); OutputPipeHolder pl = perpipelisteners.get(queryid); perpipelisteners.remove(queryid); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Removing listener for query " + queryID); } // clean up the map if there are no more listeners for the pipe if (perpipelisteners.isEmpty()) { outputPipeListeners.remove(pipeID); } pipeResolver.removeListener(pipeID, queryID); if (pl!= null) { return pl.listener; } } } return null; } /** * {@inheritDoc} */ public boolean pipeResolveEvent(PipeResolver.Event e) { try { PeerID peerID = e.getPeerID(); PipeID pipeID = e.getPipeID(); int queryID = e.getQueryID(); OutputPipeHolder pl = null; synchronized (outputPipeListeners) { Map<Integer,OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeID); if (perpipelisteners == null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("No listener for event for pipe " + pipeID); } return false; } pl = perpipelisteners.get(new Integer(queryID)); if (pl == null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("No listener for event for query " + queryID); } return false; } } // check if they wanted a resolve from a specific peer. if(!pl.peers.isEmpty() && !pl.peers.contains(peerID)) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Event was for wrong peer '" + peerID + "'. Discarding."); } return false; } // create op String type = pl.adv.getType(); OutputPipe op = null; if (PipeService.UnicastType.equals(type)) { op = new NonBlockingOutputPipe(myGroup, pipeResolver, pl.adv, peerID, pl.peers); } else if (PipeService.UnicastSecureType.equals(type)) { op = new SecureOutputPipe(myGroup, pipeResolver, pl.adv, peerID, pl.peers); } else { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Could not create output pipe of type '" + type + "'. Discarding."); } return false; } // Generate an event when the output pipe was succesfully opened. OutputPipeEvent newevent = new OutputPipeEvent( this.getInterface(), op, pipeID.toString(), queryID); try { pl.listener.outputPipeEvent(newevent); } catch(Throwable ignored) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Uncaught Throwable in listener for " + pipeID + "(" + pl.getClass().getName() + ")" , ignored); } } removeOutputPipeListener(pipeID.toString(), queryID); return true; } catch (IOException ie) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Error creating output pipe " + e.getPipeID(), ie); } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("No listener for event for " + e.getPipeID()); } return false; } /** * {@inheritDoc} * * <p/>We don't do anything with NAKs (yet) */ public boolean pipeNAKEvent(PipeResolver.Event e) { return false; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -