📄 pipeserviceimpl.java
字号:
/* * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved. * * The Sun Project JXTA(TM) Software License * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * 3. The end-user documentation included with the redistribution, if any, must * include the following acknowledgment: "This product includes software * developed by Sun Microsystems, Inc. for JXTA(TM) technology." * Alternately, this acknowledgment may appear in the software itself, if * and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must * not be used to endorse or promote products derived from this software * without prior written permission. For written permission, please contact * Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", nor may * "JXTA" appear in their name, without prior written permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL SUN * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * JXTA is a registered trademark of Sun Microsystems, Inc. in the United * States and other countries. * * Please see the license information page at : * <http://www.jxta.org/project/www/license.html> for instructions on use of * the license in source files. * * ==================================================================== * * This software consists of voluntary contributions made by many individuals * on behalf of Project JXTA. For more information on Project JXTA, please see * http://www.jxta.org. * * This license is based on the BSD license adopted by the Apache Foundation. 
*/package net.jxta.impl.pipe;import net.jxta.document.Advertisement;import net.jxta.id.ID;import net.jxta.id.IDFactory;import net.jxta.impl.util.TimeUtils;import net.jxta.logging.Logging;import net.jxta.peergroup.PeerGroup;import net.jxta.pipe.InputPipe;import net.jxta.pipe.OutputPipe;import net.jxta.pipe.OutputPipeEvent;import net.jxta.pipe.OutputPipeListener;import net.jxta.pipe.PipeID;import net.jxta.pipe.PipeMsgListener;import net.jxta.pipe.PipeService;import net.jxta.platform.Module;import net.jxta.protocol.ModuleImplAdvertisement;import net.jxta.protocol.PipeAdvertisement;import net.jxta.service.Service;import net.jxta.peer.PeerID;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import java.util.Collection;import java.util.Collections;import java.util.HashMap;import java.util.Map;import java.util.Set;import java.util.logging.Level;import java.util.logging.Logger;import java.text.MessageFormat;/** * A JXTA {@link net.jxta.pipe.PipeService} implementation which implements the * standard JXTA Pipe Resolver Protocol (PRP). * <p/> * This class provides implementation for Unicast, unicast secure and * (indirectly) propagate pipes. * * @see net.jxta.pipe.PipeService * @see net.jxta.pipe.InputPipe * @see net.jxta.pipe.OutputPipe * @see net.jxta.endpoint.Message * @see net.jxta.protocol.PipeAdvertisement * @see net.jxta.protocol.PipeResolverMessage * @see <a href="https://jxta-spec.dev.java.net/nonav/JXTAProtocols.html#proto-pbp" target="_blank">JXTA Protocols Specification : Pipe Binding Protocol</a> */public class PipeServiceImpl implements PipeService, PipeResolver.Listener { /** * The Logger */ private final static Logger LOG = Logger.getLogger(PipeServiceImpl.class.getName()); /** * the interval at which we verify that a pipe is still resolved at a * remote peer. */ static final long VERIFYINTERVAL = 20 * TimeUtils.AMINUTE; /** * The group this PipeService is working for. 
*/ private PeerGroup group = null; /** * Our resolver handler. */ private PipeResolver pipeResolver = null; /** * Link to wire pipe impl. */ private WirePipeImpl wirePipe = null; /** * the interface object we will hand out. */ private PipeService myInterface = null; /** * the impl advertisement for this impl. */ private ModuleImplAdvertisement implAdvertisement = null; /** * Table of listeners for asynchronous output pipe creation. * <p/> * <ul> * <li>keys are {@link net.jxta.pipe.PipeID}</li> * <li>values are {@link java.util.Map}</li> * </ul> * Within the value Map: * <ul> * <li>keys are {@link java.lang.Integer} representing queryid</li> * <li>values are {@link OutputPipeHolder}</li> * </ul> */ private final Map<PipeID, Map<Integer, OutputPipeHolder>> outputPipeListeners = new HashMap<PipeID, Map<Integer, OutputPipeHolder>>(); /** * Has the pipe service been started? */ private volatile boolean started = false; /** * holds a pipe adv and a listener which will be called for resolutions * of the pipe. */ private static class OutputPipeHolder { final PipeAdvertisement adv; final Set<? extends ID> peers; final OutputPipeListener listener; final int queryid; OutputPipeHolder(PipeAdvertisement adv, Set<? extends ID> peers, OutputPipeListener listener, int queryid) { this.adv = adv; this.peers = peers; this.listener = listener; this.queryid = queryid; } } /** * A listener useful for implementing synchronous behaviour. */ private static class syncListener implements OutputPipeListener { volatile OutputPipeEvent event = null; syncListener() {} /** * Called when a input pipe has been located for a previously registered * pipe. The event contains an {@link net.jxta.pipe.OutputPipe} which can * be used to communicate with the remote peer. * * @param event <code>net.jxta.pipe.outputPipeEvent</code> event */ public synchronized void outputPipeEvent(OutputPipeEvent event) { // we only accept the first event. 
if (null == this.event) { this.event = event; notifyAll(); } } } /** * Default Constructor (don't delete) */ public PipeServiceImpl() {// What is reason for this constructor??? // the same is automatically generated. } /** * {@inheritDoc} * <p/> * We create only a single interface object and return it over and over * again. */ public synchronized PipeService getInterface() { if (null == myInterface) { myInterface = new PipeServiceInterface(this); } return myInterface; } /** * {@inheritDoc} */ public ModuleImplAdvertisement getImplAdvertisement() { return implAdvertisement; } /** * {@inheritDoc} */ public synchronized void init(PeerGroup group, ID assignedID, Advertisement impl) { this.group = group; implAdvertisement = (ModuleImplAdvertisement) impl; if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) { StringBuilder configInfo = new StringBuilder("Configuring Pipe Service : " + assignedID); if (implAdvertisement != null) { configInfo.append("\n\tImplementation :"); configInfo.append("\n\t\tModule Spec ID: ").append(implAdvertisement.getModuleSpecID()); configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription()); configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri()); configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode()); } configInfo.append("\n\tGroup Params :"); configInfo.append("\n\t\tGroup : ").append(group); configInfo.append("\n\t\tPeer ID : ").append(group.getPeerID()); configInfo.append("\n\tConfiguration :"); configInfo.append("\n\t\tVerify Interval : " + VERIFYINTERVAL + "ms"); LOG.config(configInfo.toString()); } } /** * {@inheritDoc} * <p/> * Currently this service does not expect arguments. 
*/
    public synchronized int startApp(String[] args) {
        // The endpoint, resolver, membership and rendezvous services must all
        // be available in the group before this service can start. While any
        // of them is missing we return START_AGAIN_STALLED.
        Service needed = group.getEndpointService();

        if (null == needed) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("Stalled until there is an endpoint service");
            }
            return START_AGAIN_STALLED;
        }

        needed = group.getResolverService();

        if (null == needed) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("Stalled until there is a resolver service");
            }
            return START_AGAIN_STALLED;
        }

        needed = group.getMembershipService();

        if (null == needed) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("Stalled until there is a membership service");
            }
            return START_AGAIN_STALLED;
        }

        needed = group.getRendezVousService();

        if (null == needed) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("Stalled until there is a rendezvous service");
            }
            return START_AGAIN_STALLED;
        }

        // create our resolver handler; it will register itself w/ the resolver.
        pipeResolver = new PipeResolver(group);

        // Create the WirePipe (propagated pipe)
        wirePipe = new WirePipeImpl(group, pipeResolver);

        // The wire pipe's start status used to be dropped silently. It still
        // does not change the outcome of this method, but a non-OK status is
        // now reported so that a failed wire pipe start can be diagnosed.
        int wireStatus = wirePipe.startApp(args);

        if (Module.START_OK != wireStatus) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("Wire pipe did not report START_OK, status : " + wireStatus);
            }
        }

        started = true;

        return Module.START_OK;
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Marks the service as stopped, stops the wire pipe and the pipe resolver
     * (a failure in either is logged and does not prevent the rest of the
     * shutdown), drops the references held by this service and clears all
     * registered output pipe listeners.
     */
    public synchronized void stopApp() {
        started = false;

        try {
            if (wirePipe != null) {
                wirePipe.stopApp();
            }
        } catch (Throwable failed) {
            LOG.log(Level.SEVERE, "Failed to stop wire pipe", failed);
        } finally {
            wirePipe = null;
        }

        try {
            if (pipeResolver != null) {
                pipeResolver.stop();
            }
        } catch (Throwable failed) {
            LOG.log(Level.SEVERE, "Failed to stop pipe resolver", failed);
        } finally {
            pipeResolver = null;
        }

        // Avoid cross-reference problem with GC
        group = null;
        myInterface = null;

        // clear outputPipeListeners : empty each per-pipe map, then the table.
        for (Map<Integer, OutputPipeHolder> perPipe : outputPipeListeners.values()) {
            perPipe.clear();
        }
        outputPipeListeners.clear();
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Equivalent to calling
     * {@link #createInputPipe(PipeAdvertisement, PipeMsgListener)} with a
     * {@code null} listener.
     */
    public InputPipe createInputPipe(PipeAdvertisement adv) throws IOException {
        return createInputPipe(adv, null);
    }

    /**
     * {@inheritDoc}
     */
    public InputPipe createInputPipe(PipeAdvertisement adv, PipeMsgListener listener) throws IOException {
        if (!started) {
            throw new IllegalStateException("Pipe Service has not been started or has been stopped");
        }
        String type = adv.getType();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -