⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pipeserviceimpl.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* *  $Id: PipeServiceImpl.java,v 1.43 2006/07/31 23:08:12 hamada Exp $ * *  Copyright (c) 2001 Sun Microsystems, Inc.  All rights reserved. * *  Redistribution and use in source and binary forms, with or without *  modification, are permitted provided that the following conditions *  are met: * *  1. Redistributions of source code must retain the above copyright *  notice, this list of conditions and the following disclaimer. * *  2. Redistributions in binary form must reproduce the above copyright *  notice, this list of conditions and the following disclaimer in *  the documentation and/or other materials provided with the *  distribution. * *  3. The end-user documentation included with the redistribution, *  if any, must include the following acknowledgment: *  "This product includes software developed by the *  Sun Microsystems, Inc. for Project JXTA." *  Alternately, this acknowledgment may appear in the software itself, *  if and wherever such third-party acknowledgments normally appear. * *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" *  must not be used to endorse or promote products derived from this *  software without prior written permission. For written *  permission, please contact Project JXTA at http://www.jxta.org. * *  5. Products derived from this software may not be called "JXTA", *  nor may "JXTA" appear in their name, without prior written *  permission of Sun. * *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED *  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES *  OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE *  DISCLAIMED.  
IN NO EVENT SHALL SUN MICROSYSTEMS OR *  ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF *  USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND *  ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, *  OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT *  OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF *  SUCH DAMAGE. * *  ==================================================================== * *  This software consists of voluntary contributions made by many *  individuals on behalf of Project JXTA.  For more *  information on Project JXTA, please see *  <http://www.jxta.org/>. * *  This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.impl.pipe;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import java.util.ArrayList;import java.util.Collection;import java.util.Collections;import java.util.Enumeration;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.Map;import java.util.Set;import net.jxta.document.Advertisement;import net.jxta.endpoint.Message;import net.jxta.id.ID;import net.jxta.id.IDFactory;import net.jxta.impl.util.TimeUtils;import net.jxta.peer.PeerID;import net.jxta.peergroup.PeerGroup;import net.jxta.pipe.InputPipe;import net.jxta.pipe.OutputPipe;import net.jxta.pipe.OutputPipeEvent;import net.jxta.pipe.OutputPipeListener;import net.jxta.pipe.PipeID;import net.jxta.pipe.PipeMsgListener;import net.jxta.pipe.PipeService;import net.jxta.protocol.ModuleImplAdvertisement;import net.jxta.protocol.PipeAdvertisement;import net.jxta.service.Service;import org.apache.log4j.Level;import org.apache.log4j.Logger;/** * A JXTA {@link net.jxta.pipe.PipeService} implementation which implements the * standard JXTA Pipe Resolver Protocol (PRP). 
* * <p/>This class provides implementation for Unicast, unicast secure and *  (indirectly) propagate pipes. * * @see net.jxta.pipe.PipeService * @see net.jxta.pipe.InputPipe * @see net.jxta.pipe.OutputPipe * @see net.jxta.endpoint.Message * @see net.jxta.protocol.PipeAdvertisement * @see net.jxta.protocol.PipeResolverMessage * @see <a href="http://spec.jxta.org/nonav/v1.0/docbook/JXTAProtocols.html#proto-pbp" target="_blank">JXTA Protocols Specification : Pipe Binding Protocol</a> */public class PipeServiceImpl implements PipeService, PipeResolver.Listener {    /**     *  The log4J Logger     */    private final static Logger LOG = Logger.getLogger(PipeServiceImpl.class.getName());    /**     *  the interval at which we verify that a pipe is still resolved at a     *  remote peer.     */    static final long VERIFYINTERVAL = 20 * TimeUtils.AMINUTE;    /**     *  The group this PipeService is working for.     */    private PeerGroup myGroup = null;    /**     *  Our resolver handler.     */    private PipeResolver pipeResolver = null;    /**     *  Link to wire pipe impl.     */    private WirePipeImpl wirePipe = null;    /**     *  the interface object we will hand out.     */    private PipeService myInterface = null;    /**     *  the impl advertisement for this impl.     */    private ModuleImplAdvertisement implAdvertisement = null;    /**     *  Table of listeners for asynchronous output pipe creation.     *     *  <p/><ul>     *      <li>keys are {@link net.jxta.pipe.PipeID}</li>     *      <li>values are {@link java.util.Map}</li>     *  </ul>     *  Within the value Map:     *  <ul>     *    <li>keys are {@link java.lang.Integer} representing queryid</li>     *    <li>values are {@link OutputPipeHolder}</li>     *  </ul>     */    private Map<PipeID,Map<Integer,OutputPipeHolder>> outputPipeListeners = new HashMap<PipeID,Map<Integer,OutputPipeHolder>>();    /**     *  Has the pipe service been started?     
*/    private volatile boolean started = false;    /**     *  holds a pipe adv and a listener which will be called for resolutions     *  of the pipe.     */    private static class OutputPipeHolder {        PipeAdvertisement   adv;        Set                 peers;        OutputPipeListener  listener;        int queryid;        OutputPipeHolder(PipeAdvertisement adv, Set peers, OutputPipeListener listener, int queryid) {            this.adv = adv;            this.peers = peers;            this.listener = listener;            this.queryid = queryid;        }    }    /**     * A listener useful for implementing synchronous behaviour.     */    private static class syncListener implements OutputPipeListener {        volatile OutputPipeEvent event = null;        syncListener() {}        /**         * Called when a input pipe has been located for a previously registered         * pipe. The event contains an {@link net.jxta.pipe.OutputPipe} which can         * be used to communicate with the remote peer.         *         * @param outputPipeEvent event         */        public synchronized void outputPipeEvent(OutputPipeEvent event) {            // we only accept the first event.            if(null == this.event) {                this.event = event;                notifyAll();            }        }    }    /**     * Default Constructor (don't delete)     */    public PipeServiceImpl() { }    /**     * {@inheritDoc}     *     * <p/>We create only a single interface object and return it over and over     * again.     
*/
    public synchronized Service getInterface() {
        // Build the shared interface object lazily, on the first request only.
        if (myInterface == null) {
            myInterface = new PipeServiceInterface(this);
        }

        return myInterface;
    }

    /**
     *  {@inheritDoc}
     */
    public Advertisement getImplAdvertisement() {
        return implAdvertisement;
    }

    /**
     *  {@inheritDoc}
     *
     *  <p/>Remembers the implementation advertisement and the peer group,
     *  then, if INFO logging is enabled, logs a summary of the
     *  configuration. {@code assignedID} is only used in that log message.
     */
    public synchronized void init(PeerGroup pg, ID assignedID, Advertisement impl) {
        implAdvertisement = (ModuleImplAdvertisement) impl;
        myGroup = pg;

        // Nothing more to do unless the configuration summary will be logged.
        if (!LOG.isEnabledFor(Level.INFO)) {
            return;
        }

        StringBuffer summary = new StringBuffer("Configuring Pipe Service : " + assignedID);

        summary.append("\n\tImplementation :")
               .append("\n\t\tModule Spec ID: " + implAdvertisement.getModuleSpecID())
               .append("\n\t\tImpl Description : " + implAdvertisement.getDescription())
               .append("\n\t\tImpl URI : " + implAdvertisement.getUri())
               .append("\n\t\tImpl Code : " + implAdvertisement.getCode());

        summary.append("\n\tGroup Params :")
               .append("\n\t\tGroup : " + myGroup.getPeerGroupName())
               .append("\n\t\tGroup ID : " + myGroup.getPeerGroupID())
               .append("\n\t\tPeer ID : " + myGroup.getPeerID());

        summary.append("\n\tConfiguration :")
               .append("\n\t\tVerify Interval : " + VERIFYINTERVAL + "ms");

        LOG.info(summary);
    }

    /**
     *  {@inheritDoc}
     *
     * <p/>Currently this service does not expect arguments.
*/    public synchronized int startApp(String[] args) {        Service needed = myGroup.getResolverService();        if(null == needed) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("Stalled until there is a resolver service");            }            return START_AGAIN_STALLED;        }        needed = myGroup.getMembershipService();        if(null == needed) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("Stalled until there is a membership service");            }            return START_AGAIN_STALLED;        }        needed = myGroup.getRendezVousService();        if(null == needed) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("Stalled until there is a rendezvous service");            }            return START_AGAIN_STALLED;        }        // create our resolver handler; it will register itself w/ the resolver.        pipeResolver = new PipeResolver(myGroup);        needed = myGroup.getRendezVousService();        if(null == needed) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("Stalled until there is a rendezvous service");            }            return START_AGAIN_STALLED;        }        // Create the WirePipe (propagated pipe)        wirePipe = new WirePipeImpl(myGroup, pipeResolver);        wirePipe.startApp(args);        started = true;        return 0;    }    /**     *  {@inheritDoc}     */    public synchronized void stopApp() {        started = false;        try {            if (wirePipe != null) {                wirePipe.stopApp();            }        } catch (Throwable failed) {            LOG.error("Failed to stop wire pipe", failed);        } finally {            wirePipe = null;        }        try {            if (pipeResolver != null) {                pipeResolver.stop();            }        } catch (Throwable failed) {            LOG.error("Failed to stop pipe resolver", failed);        } finally {            pipeResolver = null;        
}        // Avoid cross-reference problem with GC        myGroup = null;        myInterface = null;        // clear outputPipeListeners        Collection<Map<Integer,OutputPipeHolder>> values = outputPipeListeners.values();        for (Map<Integer,OutputPipeHolder> value: values) {            value.clear();        }        outputPipeListeners.clear();    }    /**     *  {@inheritDoc}     */    public InputPipe createInputPipe(PipeAdvertisement adv) throws IOException {        return createInputPipe(adv, null);    }    /**     *  {@inheritDoc}     */    public InputPipe createInputPipe(PipeAdvertisement adv, PipeMsgListener listener) throws IOException {        if (!started) {            throw new IllegalStateException("Pipe Service has not been started or has been stopped");        }        String type = adv.getType();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -