⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pipeserviceimpl.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi…
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        if (type == null) {            throw new IllegalArgumentException("PipeAdvertisement type may not be null");        }        PipeID pipeId = (PipeID) adv.getPipeID();        if (pipeId == null) {            throw new IllegalArgumentException("PipeAdvertisement PipeID may not be null");        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Create " + type + " InputPipe for " + pipeId);        }        InputPipe inputPipe;        // create an InputPipe.        if (type.equals(PipeService.UnicastType)) {            inputPipe = new InputPipeImpl(pipeResolver, adv, listener);        } else if (type.equals(PipeService.UnicastSecureType)) {            inputPipe = new SecureInputPipeImpl(pipeResolver, adv, listener);        } else if (type.equals(PipeService.PropagateType)) {            if (wirePipe != null) {                inputPipe = wirePipe.createInputPipe(adv, listener);            } else {                throw new IOException("No propagated pipe servive available");            }        } else {            // Unknown type            if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                LOG.severe("Cannot create pipe for unknown type : " + type);            }            throw new IOException("Cannot create pipe for unknown type : " + type);        }        return inputPipe;    }    /**     * {@inheritDoc}     */    public OutputPipe createOutputPipe(PipeAdvertisement pipeAdv, long timeout) throws IOException {        return createOutputPipe(pipeAdv, Collections.<ID>emptySet(), timeout);    }    /**     * {@inheritDoc}     */    public OutputPipe createOutputPipe(PipeAdvertisement adv, Set<? extends ID> resolvablePeers, long timeout) throws IOException {        // convert zero to max value.        
if (0 == timeout) {            timeout = Long.MAX_VALUE;        }        long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis(timeout);        // Make a listener, start async resolution and then wait until the timeout expires.        syncListener localListener = new syncListener();        int queryid = PipeResolver.getNextQueryID();        createOutputPipe(adv, resolvablePeers, localListener, queryid);        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Waiting synchronously for " + timeout + "ms to resolve OutputPipe for " + adv.getPipeID());        }        try {            synchronized (localListener) {                while ((null == localListener.event) && (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), absoluteTimeOut) < 0)) {                    try {                        localListener.wait(TimeUtils.ASECOND);                    } catch (InterruptedException woken) {                        Thread.interrupted();                    }                }            }        } finally {            // remove the listener we installed.            removeOutputPipeListener(adv.getPipeID().toString(), queryid);        }        if (null != localListener.event) {            return localListener.event.getOutputPipe();        } else {            throw new IOException("Output Pipe could not be resolved after " + timeout + "ms.");        }    }    /**     * {@inheritDoc}     */    public void createOutputPipe(PipeAdvertisement pipeAdv, OutputPipeListener listener) throws IOException {        createOutputPipe(pipeAdv, Collections.<ID>emptySet(), listener);    }    /**     * {@inheritDoc}     */    public void createOutputPipe(PipeAdvertisement pipeAdv, Set<? extends ID> resolvablePeers, OutputPipeListener listener) throws IOException {        createOutputPipe(pipeAdv, resolvablePeers, listener, PipeResolver.getNextQueryID());    }    private void createOutputPipe(PipeAdvertisement pipeAdv, Set<? 
extends ID> resolvablePeers, OutputPipeListener listener, int queryid) throws IOException {        if (!started) {            throw new IOException("Pipe Service has not been started or has been stopped");        }        // Recover the PipeId from the PipeServiceImpl Advertisement        PipeID pipeId = (PipeID) pipeAdv.getPipeID();        String type = pipeAdv.getType();        if (null == type) {            IllegalArgumentException failed = new IllegalArgumentException("Pipe type was not set");            if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                LOG.log(Level.SEVERE, failed.getMessage(), failed);            }            throw failed;        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Create " + type + " OutputPipe for " + pipeId);        }        if (PipeService.PropagateType.equals(type)) {            OutputPipe op;            if (resolvablePeers.size() == 1) {                op = new BlockingWireOutputPipe(group, pipeAdv, (PeerID) resolvablePeers.iterator().next());            } else {                if (wirePipe != null) {                    op = wirePipe.createOutputPipe(pipeAdv, resolvablePeers);                } else {                    throw new IOException("No propagated pipe service available");                }            }            if (null != op) {                OutputPipeEvent newevent = new OutputPipeEvent(this.getInterface(), op, pipeId.toString(), PipeResolver.ANYQUERY);                try {                    listener.outputPipeEvent(newevent);                } catch (Throwable ignored) {                    if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                        LOG.log(Level.SEVERE,                                "Uncaught Throwable in listener for " + pipeId + " (" + listener.getClass().getName() + ")",                                ignored);                    }                }            }        } else if 
(PipeService.UnicastType.equals(type) || PipeService.UnicastSecureType.equals(type)) {            addOutputPipeListener(pipeId, new OutputPipeHolder(pipeAdv, resolvablePeers, listener, queryid));            pipeResolver.addListener(pipeId, this, queryid);            pipeResolver.sendPipeQuery(pipeAdv, resolvablePeers, queryid);            // look locally for the pipe            if (resolvablePeers.isEmpty() || resolvablePeers.contains(group.getPeerID())) {                InputPipe local = pipeResolver.findLocal(pipeId);                // if we have a local instance, make sure the local instance is of the same type.                if (null != local) {                    if (local.getType().equals(pipeAdv.getType())) {                        pipeResolver.callListener(queryid, pipeId, local.getType(), group.getPeerID(), false);                    } else {                        if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                            LOG.warning(MessageFormat.format("rejecting local pipe ({0}) because type is not ({1})", local.getType(),                                    pipeAdv.getType()));                        }                    }                }            }        } else {            // Unknown type            if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                LOG.severe("createOutputPipe: cannot create pipe for unknown type : " + type);            }            throw new IOException("cannot create pipe for unknown type : " + type);        }    }    /*     * Add an output listener for the given pipeId.     
*/    private void addOutputPipeListener(PipeID pipeId, OutputPipeHolder pipeHolder) {        synchronized (outputPipeListeners) {            Map<Integer, OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeId);            if (perpipelisteners == null) {                perpipelisteners = new HashMap<Integer, OutputPipeHolder>();                outputPipeListeners.put(pipeId, perpipelisteners);            }            if (perpipelisteners.get(pipeHolder.queryid) != null) {                LOG.warning("Clobbering output pipe listener for query " + pipeHolder.queryid);            }            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Adding pipe listener for pipe " + pipeId + " and query " + pipeHolder.queryid);            }            perpipelisteners.put(pipeHolder.queryid, pipeHolder);        }    }    /**     * {@inheritDoc}     */    public OutputPipeListener removeOutputPipeListener(String pipeID, OutputPipeListener listener) {        throw new UnsupportedOperationException("Legacy method not supported. 
Use interface object if you need this method.");    }    /**     * {@inheritDoc}     */    public OutputPipeListener removeOutputPipeListener(ID pipeID, OutputPipeListener listener) {        // remove all instances of this listener, regardless of queryid        if (pipeResolver == null) {            return null;        }        if (!(pipeID instanceof PipeID)) {            throw new IllegalArgumentException("pipeID must be a PipeID.");        }        synchronized (outputPipeListeners) {            Map<Integer, OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeID);            if (perpipelisteners != null) {                Set<Map.Entry<Integer, OutputPipeHolder>> entries = perpipelisteners.entrySet();                for (Map.Entry<Integer, OutputPipeHolder> entry : entries) {                    OutputPipeHolder pl = entry.getValue();                    if (pl.listener == listener) {                        pipeResolver.removeListener((PipeID) pipeID, pl.queryid);                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.fine("Removing listener for query " + pl.queryid);                        }                        perpipelisteners.remove(entry.getKey());                    }                }                // clean up the map if there are no more listeners for the pipe                if (perpipelisteners.isEmpty()) {                    outputPipeListeners.remove(pipeID);                }            }        }        return listener;    }    private OutputPipeListener removeOutputPipeListener(String opID, int queryID) {        if (pipeResolver == null) {            return null;        }        PipeID pipeID;        try {            URI aPipeID = new URI(opID);            pipeID = (PipeID) IDFactory.fromURI(aPipeID);        } catch (URISyntaxException badID) {            throw new IllegalArgumentException("Bad pipe ID: " + opID);        } catch (ClassCastException badID) {            throw new 
IllegalArgumentException("id was not a pipe id: " + opID);        }        synchronized (outputPipeListeners) {            Map<Integer, OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeID);            if (perpipelisteners != null) {                OutputPipeHolder pipeHolder = perpipelisteners.get(queryID);                perpipelisteners.remove(queryID);                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("Removing listener for query " + queryID);                }                // clean up the map if there are no more listeners for the pipe                if (perpipelisteners.isEmpty()) {                    outputPipeListeners.remove(pipeID);                }                pipeResolver.removeListener(pipeID, queryID);                if (pipeHolder != null) {                    return pipeHolder.listener;                }            }        }        return null;    }    /**     * {@inheritDoc}     */    public boolean pipeResolveEvent(PipeResolver.Event event) {        try {            ID peerID = event.getPeerID();            ID pipeID = event.getPipeID();            int queryID = event.getQueryID();            OutputPipeHolder pipeHolder;            synchronized (outputPipeListeners) {                Map<Integer, OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeID);                if (perpipelisteners == null) {                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("No listener for event for pipe " + pipeID);                    }                    return false;                }                pipeHolder = perpipelisteners.get(queryID);                if (pipeHolder == null) {                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("No listener for event for query " + queryID);                    }                    return false;                }            }            // check if 
they wanted a resolve from a specific peer.            if (!pipeHolder.peers.isEmpty() && !pipeHolder.peers.contains(peerID)) {                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                    LOG.warning("Event was for wrong peer \'" + peerID + "\'. Discarding.");                }                return false;            }            // create op            String type = pipeHolder.adv.getType();            OutputPipe op;            if (PipeService.UnicastType.equals(type)) {                op = new NonBlockingOutputPipe(group, pipeResolver, pipeHolder.adv, peerID, pipeHolder.peers);            } else if (PipeService.UnicastSecureType.equals(type)) {                op = new SecureOutputPipe(group, pipeResolver, pipeHolder.adv, peerID, pipeHolder.peers);            } else {                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                    LOG.warning("Could not create output pipe of type \'" + type + "\'. Discarding.");                }                return false;            }            // Generate an event when the output pipe was succesfully opened.            
OutputPipeEvent newevent = new OutputPipeEvent(this.getInterface(), op, pipeID.toString(), queryID);            try {                pipeHolder.listener.outputPipeEvent(newevent);            } catch (Throwable ignored) {                if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                    LOG.log(Level.SEVERE                            ,                            "Uncaught Throwable in listener for " + pipeID + "(" + pipeHolder.getClass().getName() + ")", ignored);                }            }            removeOutputPipeListener(pipeID.toString(), queryID);            return true;        } catch (IOException ie) {            if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                LOG.log(Level.SEVERE, "Error creating output pipe " + event.getPipeID(), ie);            }        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("No listener for event for " + event.getPipeID());        }        return false;    }    /**     * {@inheritDoc}     * <p/>     * We don't do anything with NAKs (yet)     */    public boolean pipeNAKEvent(PipeResolver.Event event) {        return false;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -