⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pipeserviceimpl.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * Copyright (c) 2001-2007 Sun Microsystems, Inc.  All rights reserved. *   *  The Sun Project JXTA(TM) Software License *   *  Redistribution and use in source and binary forms, with or without  *  modification, are permitted provided that the following conditions are met: *   *  1. Redistributions of source code must retain the above copyright notice, *     this list of conditions and the following disclaimer. *   *  2. Redistributions in binary form must reproduce the above copyright notice,  *     this list of conditions and the following disclaimer in the documentation  *     and/or other materials provided with the distribution. *   *  3. The end-user documentation included with the redistribution, if any, must  *     include the following acknowledgment: "This product includes software  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology."  *     Alternately, this acknowledgment may appear in the software itself, if  *     and wherever such third-party acknowledgments normally appear. *   *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must  *     not be used to endorse or promote products derived from this software  *     without prior written permission. For written permission, please contact  *     Project JXTA at http://www.jxta.org. *   *  5. Products derived from this software may not be called "JXTA", nor may  *     "JXTA" appear in their name, without prior written permission of Sun. *   *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL SUN  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *   *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United  *  States and other countries. *   *  Please see the license information page at : *  <http://www.jxta.org/project/www/license.html> for instructions on use of  *  the license in source files. *   *  ==================================================================== *   *  This software consists of voluntary contributions made by many individuals  *  on behalf of Project JXTA. For more information on Project JXTA, please see  *  http://www.jxta.org. *   *  This license is based on the BSD license adopted by the Apache Foundation.  
*/package net.jxta.impl.pipe;import net.jxta.document.Advertisement;import net.jxta.id.ID;import net.jxta.id.IDFactory;import net.jxta.impl.util.TimeUtils;import net.jxta.logging.Logging;import net.jxta.peergroup.PeerGroup;import net.jxta.pipe.InputPipe;import net.jxta.pipe.OutputPipe;import net.jxta.pipe.OutputPipeEvent;import net.jxta.pipe.OutputPipeListener;import net.jxta.pipe.PipeID;import net.jxta.pipe.PipeMsgListener;import net.jxta.pipe.PipeService;import net.jxta.platform.Module;import net.jxta.protocol.ModuleImplAdvertisement;import net.jxta.protocol.PipeAdvertisement;import net.jxta.service.Service;import net.jxta.peer.PeerID;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import java.util.Collection;import java.util.Collections;import java.util.HashMap;import java.util.Map;import java.util.Set;import java.util.logging.Level;import java.util.logging.Logger;import java.text.MessageFormat;/** * A JXTA {@link net.jxta.pipe.PipeService} implementation which implements the * standard JXTA Pipe Resolver Protocol (PRP). * <p/> * This class provides implementation for Unicast, unicast secure and * (indirectly) propagate pipes. * * @see net.jxta.pipe.PipeService * @see net.jxta.pipe.InputPipe * @see net.jxta.pipe.OutputPipe * @see net.jxta.endpoint.Message * @see net.jxta.protocol.PipeAdvertisement * @see net.jxta.protocol.PipeResolverMessage * @see <a href="https://jxta-spec.dev.java.net/nonav/JXTAProtocols.html#proto-pbp" target="_blank">JXTA Protocols Specification : Pipe Binding Protocol</a> */public class PipeServiceImpl implements PipeService, PipeResolver.Listener {    /**     * The Logger     */    private final static Logger LOG = Logger.getLogger(PipeServiceImpl.class.getName());    /**     * the interval at which we verify that a pipe is still resolved at a     * remote peer.     */    static final long VERIFYINTERVAL = 20 * TimeUtils.AMINUTE;    /**     * The group this PipeService is working for.     
*/    private PeerGroup group = null;    /**     * Our resolver handler.     */    private PipeResolver pipeResolver = null;    /**     * Link to wire pipe impl.     */    private WirePipeImpl wirePipe = null;    /**     * the interface object we will hand out.     */    private PipeService myInterface = null;    /**     * the impl advertisement for this impl.     */    private ModuleImplAdvertisement implAdvertisement = null;    /**     * Table of listeners for asynchronous output pipe creation.     * <p/>     * <ul>     * <li>keys are {@link net.jxta.pipe.PipeID}</li>     * <li>values are {@link java.util.Map}</li>     * </ul>     * Within the value Map:     * <ul>     * <li>keys are {@link java.lang.Integer} representing queryid</li>     * <li>values are {@link OutputPipeHolder}</li>     * </ul>     */    private final Map<PipeID, Map<Integer, OutputPipeHolder>> outputPipeListeners = new HashMap<PipeID, Map<Integer, OutputPipeHolder>>();    /**     * Has the pipe service been started?     */    private volatile boolean started = false;    /**     * holds a pipe adv and a listener which will be called for resolutions     * of the pipe.     */    private static class OutputPipeHolder {        final PipeAdvertisement adv;        final Set<? extends ID> peers;        final OutputPipeListener listener;        final int queryid;        OutputPipeHolder(PipeAdvertisement adv, Set<? extends ID> peers, OutputPipeListener listener, int queryid) {            this.adv = adv;            this.peers = peers;            this.listener = listener;            this.queryid = queryid;        }    }    /**     * A listener useful for implementing synchronous behaviour.     */    private static class syncListener implements OutputPipeListener {        volatile OutputPipeEvent event = null;        syncListener() {}        /**         * Called when a input pipe has been located for a previously registered         * pipe. 
The event contains an {@link net.jxta.pipe.OutputPipe} which can         * be used to communicate with the remote peer.         *         * @param event <code>net.jxta.pipe.outputPipeEvent</code> event         */        public synchronized void outputPipeEvent(OutputPipeEvent event) {            // we only accept the first event.            if (null == this.event) {                this.event = event;                notifyAll();            }        }    }    /**     * Default Constructor (don't delete)     */    public PipeServiceImpl() {// What is reason for this constructor???        // the same is automatically generated.    }    /**     * {@inheritDoc}     * <p/>     * We create only a single interface object and return it over and over     * again.     */    public synchronized PipeService getInterface() {        if (null == myInterface) {            myInterface = new PipeServiceInterface(this);        }        return myInterface;    }    /**     * {@inheritDoc}     */    public ModuleImplAdvertisement getImplAdvertisement() {        return implAdvertisement;    }    /**     * {@inheritDoc}     */    public synchronized void init(PeerGroup group, ID assignedID, Advertisement impl) {        this.group = group;        implAdvertisement = (ModuleImplAdvertisement) impl;        if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) {            StringBuilder configInfo = new StringBuilder("Configuring Pipe Service : " + assignedID);            if (implAdvertisement != null) {                configInfo.append("\n\tImplementation :");                configInfo.append("\n\t\tModule Spec ID: ").append(implAdvertisement.getModuleSpecID());                configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription());                configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri());                configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode());            }            
configInfo.append("\n\tGroup Params :");            configInfo.append("\n\t\tGroup : ").append(group);            configInfo.append("\n\t\tPeer ID : ").append(group.getPeerID());            configInfo.append("\n\tConfiguration :");            configInfo.append("\n\t\tVerify Interval : " + VERIFYINTERVAL + "ms");            LOG.config(configInfo.toString());        }    }    /**     * {@inheritDoc}     * <p/>     * Currently this service does not expect arguments.     */    public synchronized int startApp(String[] args) {        Service needed = group.getEndpointService();        if (null == needed) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning("Stalled until there is an endpoint service");            }            return START_AGAIN_STALLED;        }        needed = group.getResolverService();        if (null == needed) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning("Stalled until there is a resolver service");            }            return START_AGAIN_STALLED;        }        needed = group.getMembershipService();        if (null == needed) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning("Stalled until there is a membership service");            }            return START_AGAIN_STALLED;        }        needed = group.getRendezVousService();        if (null == needed) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning("Stalled until there is a rendezvous service");            }            return START_AGAIN_STALLED;        }        // create our resolver handler; it will register itself w/ the resolver.        pipeResolver = new PipeResolver(group);        // Create the WirePipe (propagated pipe)        wirePipe = new WirePipeImpl(group, pipeResolver);        // XXX 20061221 We could check the result of this.        
wirePipe.startApp(args);        started = true;        return Module.START_OK;    }    /**     * {@inheritDoc}     */    public synchronized void stopApp() {        started = false;        try {            if (wirePipe != null) {                wirePipe.stopApp();            }        } catch (Throwable failed) {            LOG.log(Level.SEVERE, "Failed to stop wire pipe", failed);        } finally {            wirePipe = null;        }        try {            if (pipeResolver != null) {                pipeResolver.stop();            }        } catch (Throwable failed) {            LOG.log(Level.SEVERE, "Failed to stop pipe resolver", failed);        } finally {            pipeResolver = null;        }        // Avoid cross-reference problem with GC        group = null;        myInterface = null;        // clear outputPipeListeners        Collection<Map<Integer, OutputPipeHolder>> values = outputPipeListeners.values();        for (Map<Integer, OutputPipeHolder> value : values) {            value.clear();        }        outputPipeListeners.clear();    }    /**     * {@inheritDoc}     */    public InputPipe createInputPipe(PipeAdvertisement adv) throws IOException {        return createInputPipe(adv, null);    }    /**     * {@inheritDoc}     */    public InputPipe createInputPipe(PipeAdvertisement adv, PipeMsgListener listener) throws IOException {        if (!started) {            throw new IllegalStateException("Pipe Service has not been started or has been stopped");        }        String type = adv.getType();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -