⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pipeserviceimpl.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        if (type == null) {            throw new IllegalArgumentException("PipeAdvertisement type may not be null");        }        PipeID pipeId = (PipeID) adv.getPipeID();        if (pipeId == null) {            throw new IllegalArgumentException("PipeAdvertisement PipeID may not be null");        }        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Create " + type + " InputPipe for " + pipeId);        }        InputPipe ip = null;        // create an InputPipe.        if(type.equals(PipeService.UnicastType)) {            ip = new UnicastInputPipeImpl(pipeResolver, adv, listener);        } else if(type.equals(PipeService.UnicastSecureType)) {            ip = new SecureInputPipeImpl(pipeResolver, adv, listener);        } else if(type.equals(PipeService.PropagateType)) {            if (wirePipe != null) {                ip = wirePipe.createInputPipe(adv, listener);            } else {                throw new IOException("No propagated pipe servive available");            }        } else {            // Unknown type            if (LOG.isEnabledFor(Level.ERROR)) {                LOG.error("Cannot create pipe for unknown type : " + type);            }            throw new IOException("Cannot create pipe for unknown type : " + type);        }        return ip;    }    /**     *  {@inheritDoc}     */    public OutputPipe createOutputPipe(PipeAdvertisement pipeAdv, long timeout) throws IOException {        return createOutputPipe(pipeAdv, Collections.EMPTY_SET, timeout);    }    /**     *  {@inheritDoc}     */    public OutputPipe createOutputPipe(PipeAdvertisement adv, Set resolvablePeers, long timeout) throws IOException {        // convert zero to max value.        if(0 == timeout) {            timeout = Long.MAX_VALUE;        }        long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis(timeout);        // Make a listener, start async resolution and then wait until the timeout expires.        syncListener localListener = new syncListener();        int queryid = PipeResolver.getNextQueryID();        createOutputPipe(adv, resolvablePeers, localListener, queryid);        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Waiting synchronously for " + timeout + "ms to resolve OutputPipe for " + adv.getPipeID());        }        try {            synchronized(localListener) {                while((null == localListener.event) &&                       (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), absoluteTimeOut) < 0)) {                    try {                        localListener.wait(TimeUtils.ASECOND);                    } catch (InterruptedException woken) {                        Thread.interrupted();                    }                }            }        } finally {            // remove the listener we installed.            removeOutputPipeListener(adv.getPipeID().toString(), queryid);        }        if (null != localListener.event) {            return localListener.event.getOutputPipe();        } else {            throw new IOException("Output Pipe could not be resolved after " + timeout + "ms.");        }    }    /**     *  {@inheritDoc}     */    public void createOutputPipe(PipeAdvertisement pipeAdv, OutputPipeListener listener) throws IOException {        createOutputPipe(pipeAdv, Collections.EMPTY_SET, listener);    }    /**     *  {@inheritDoc}     */    public void createOutputPipe(PipeAdvertisement pipeAdv, Set resolvablePeers, OutputPipeListener listener) throws IOException {        createOutputPipe(pipeAdv, resolvablePeers, listener, PipeResolver.getNextQueryID());    }    private void createOutputPipe(PipeAdvertisement pipeAdv, Set resolvablePeers, OutputPipeListener listener, int queryid) throws IOException {        if (!started) {            throw new IllegalStateException("Pipe Service has not been started or has been stopped");        }        // Recover the PipeId from the PipeServiceImpl Advertisement        PipeID pipeId = (PipeID) pipeAdv.getPipeID();        String type = pipeAdv.getType();        if(null == type) {            IllegalArgumentException failed = new IllegalArgumentException("Pipe type was not set");            if (LOG.isEnabledFor(Level.ERROR)) {                LOG.error(failed, failed);            }            throw failed;        }        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Create " + type + " OutputPipe for " + pipeId);        }        if (PipeService.PropagateType.equals(type)) {            OutputPipe op = null;            if (wirePipe != null) {                op = wirePipe.createOutputPipe(pipeAdv, resolvablePeers);            } else {                throw new IOException("No propagated pipe service available");            }            if(null != op) {                OutputPipeEvent newevent = new OutputPipeEvent(                                               this.getInterface(), op, pipeId.toString(), PipeResolver.ANYQUERY);                try {                    listener.outputPipeEvent(newevent);                } catch(Throwable ignored) {                    if (LOG.isEnabledFor(Level.ERROR)) {                        LOG.error("Uncaught Throwable in listener for " + pipeId + " (" + listener.getClass().getName() + ")" , ignored);                    }                }            }            return;        } else if (PipeService.UnicastType.equals(type) || PipeService.UnicastSecureType.equals(type)) {            addOutputPipeListener(pipeId, new OutputPipeHolder(pipeAdv, resolvablePeers, listener, queryid));            pipeResolver.addListener(pipeId, this, queryid);            pipeResolver.sendPipeQuery(pipeAdv, resolvablePeers, queryid);            // look locally for the pipe            if(resolvablePeers.isEmpty() || resolvablePeers.contains(myGroup.getPeerID())) {                InputPipe local = pipeResolver.findLocal(pipeId);                // if we have a local instance, make sure the local instance is of the same type.                if(null != local) {                    if (local.getType().equals(pipeAdv.getType())) {                        pipeResolver.callListener(queryid, pipeId, local.getType(), myGroup.getPeerID(), false );                    } else {                        if (LOG.isEnabledFor(Level.WARN)) {                            LOG.warn("rejecting local pipe (" + local.getType() + ") because type is not (" + pipeAdv.getType() + ")");                        }                    }                }            }        } else {            // Unknown type            if (LOG.isEnabledFor(Level.ERROR)) {                LOG.error("createOutputPipe: cannot create pipe for unknown type : " + type);            }            throw new IOException("cannot create pipe for unknown type : " + type);        }    }    /*     * Add an output listener for the given pipeId.     */    private void addOutputPipeListener(PipeID pipeId, OutputPipeHolder pipeHolder) {        synchronized (outputPipeListeners) {            Map<Integer,OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeId);            if (perpipelisteners == null) {                perpipelisteners = new HashMap<Integer,OutputPipeHolder>();                outputPipeListeners.put(pipeId, perpipelisteners);            }            if (perpipelisteners.get(new Integer(pipeHolder.queryid)) != null) {                LOG.warn("Clobbering output pipe listener for query " + pipeHolder.queryid);            }            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Adding pipe listener for pipe " + pipeId.toString() +                          " and query " + pipeHolder.queryid);            }            perpipelisteners.put(new Integer(pipeHolder.queryid), pipeHolder);        }    }    /**     *  {@inheritDoc}     */    public OutputPipeListener removeOutputPipeListener(String opID, OutputPipeListener listener) {        // remove all instances of this listener, regardless of queryid        if (pipeResolver == null) {            return null;        }        PipeID pipeID;        try {            URI aPipeID = new URI(opID);            pipeID = (PipeID) IDFactory.fromURI(aPipeID);        } catch (URISyntaxException badID) {            throw new IllegalArgumentException("Bad pipe ID: " + opID);        } catch (ClassCastException badID) {            throw new IllegalArgumentException("id was not a pipe id: " + opID);        }        synchronized (outputPipeListeners) {            Map<Integer,OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeID);            if (perpipelisteners != null) {                Set<Map.Entry<Integer,OutputPipeHolder>> entries = perpipelisteners.entrySet();                for (Map.Entry<Integer,OutputPipeHolder> entry : entries) {                    OutputPipeHolder pl = entry.getValue();                    if (pl.listener == listener) {                        pipeResolver.removeListener(pipeID, pl.queryid);                        if (LOG.isEnabledFor(Level.DEBUG)) {                            LOG.debug("Removing listener for query " + pl.queryid);                        }                        perpipelisteners.remove(entry.getKey());                    }                }                // clean up the map if there are no more listeners for the pipe                if (perpipelisteners.isEmpty()) {                    outputPipeListeners.remove(pipeID);                }            }        }        return listener;    }    private OutputPipeListener removeOutputPipeListener(String opID, int queryID) {        if (pipeResolver == null) {            return null;        }        PipeID pipeID;        try {            URI aPipeID = new URI(opID);            pipeID = (PipeID) IDFactory.fromURI(aPipeID);        } catch (URISyntaxException badID) {            throw new IllegalArgumentException("Bad pipe ID: " + opID);        } catch (ClassCastException badID) {            throw new IllegalArgumentException("id was not a pipe id: " + opID);        }        synchronized (outputPipeListeners) {            Map<Integer,OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeID);            if (perpipelisteners != null) {                Integer queryid = new Integer(queryID);                OutputPipeHolder pl = perpipelisteners.get(queryid);                perpipelisteners.remove(queryid);                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("Removing listener for query " + queryID);                }                // clean up the map if there are no more listeners for the pipe                if (perpipelisteners.isEmpty()) {                    outputPipeListeners.remove(pipeID);                }                pipeResolver.removeListener(pipeID, queryID);                if (pl!= null) {                    return pl.listener;                }            }        }        return null;    }    /**     *  {@inheritDoc}     */    public boolean pipeResolveEvent(PipeResolver.Event e) {        try {            PeerID peerID = e.getPeerID();            PipeID pipeID = e.getPipeID();            int queryID = e.getQueryID();            OutputPipeHolder pl = null;            synchronized (outputPipeListeners) {                Map<Integer,OutputPipeHolder> perpipelisteners =                    outputPipeListeners.get(pipeID);                if (perpipelisteners == null) {                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("No listener for event for pipe " + pipeID);                    }                    return false;                }                pl = perpipelisteners.get(new Integer(queryID));                if (pl == null) {                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("No listener for event for query " + queryID);                    }                    return false;                }            }            // check if they wanted a resolve from a specific peer.            if(!pl.peers.isEmpty() && !pl.peers.contains(peerID)) {                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("Event was for wrong peer '" + peerID + "'. Discarding.");                }                return false;            }            // create op            String type = pl.adv.getType();            OutputPipe op = null;            if (PipeService.UnicastType.equals(type)) {                op = new NonBlockingOutputPipe(myGroup, pipeResolver,                                               pl.adv, peerID, pl.peers);            } else if (PipeService.UnicastSecureType.equals(type)) {                op = new SecureOutputPipe(myGroup, pipeResolver, pl.adv,                                          peerID, pl.peers);            } else {                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("Could not create output pipe of type '" +                              type + "'. Discarding.");                }                return false;            }            // Generate an event when the output pipe was succesfully opened.            OutputPipeEvent newevent = new OutputPipeEvent(                                           this.getInterface(), op, pipeID.toString(), queryID);            try {                pl.listener.outputPipeEvent(newevent);            } catch(Throwable ignored) {                if (LOG.isEnabledFor(Level.ERROR)) {                    LOG.error("Uncaught Throwable in listener for " +                              pipeID + "(" + pl.getClass().getName() + ")" ,                              ignored);                }            }            removeOutputPipeListener(pipeID.toString(), queryID);            return true;        } catch (IOException ie) {            if (LOG.isEnabledFor(Level.ERROR)) {                LOG.error("Error creating output pipe " + e.getPipeID(), ie);            }        }        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("No listener for event for " + e.getPipeID());        }        return false;    }    /**     *  {@inheritDoc}     *     *  <p/>We don't do anything with NAKs (yet)     */    public boolean pipeNAKEvent(PipeResolver.Event e) {        return false;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -