📄 pipeserviceimpl.java
字号:
/* * $Id: PipeServiceImpl.java,v 1.43 2006/07/31 23:08:12 hamada Exp $ * * Copyright (c) 2001 Sun Microsystems, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Sun Microsystems, Inc. for Project JXTA." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" * must not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", * nor may "JXTA" appear in their name, without prior written * permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. 
IN NO EVENT SHALL SUN MICROSYSTEMS OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA. For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.impl.pipe;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import java.util.ArrayList;import java.util.Collection;import java.util.Collections;import java.util.Enumeration;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.Map;import java.util.Set;import net.jxta.document.Advertisement;import net.jxta.endpoint.Message;import net.jxta.id.ID;import net.jxta.id.IDFactory;import net.jxta.impl.util.TimeUtils;import net.jxta.peer.PeerID;import net.jxta.peergroup.PeerGroup;import net.jxta.pipe.InputPipe;import net.jxta.pipe.OutputPipe;import net.jxta.pipe.OutputPipeEvent;import net.jxta.pipe.OutputPipeListener;import net.jxta.pipe.PipeID;import net.jxta.pipe.PipeMsgListener;import net.jxta.pipe.PipeService;import net.jxta.protocol.ModuleImplAdvertisement;import net.jxta.protocol.PipeAdvertisement;import net.jxta.service.Service;import org.apache.log4j.Level;import org.apache.log4j.Logger;/** * A JXTA {@link net.jxta.pipe.PipeService} implementation which implements the * standard JXTA Pipe Resolver Protocol (PRP). 
 *
 * <p/>This class provides implementation for Unicast, unicast secure and
 * (indirectly) propagate pipes.
 *
 * @see net.jxta.pipe.PipeService
 * @see net.jxta.pipe.InputPipe
 * @see net.jxta.pipe.OutputPipe
 * @see net.jxta.endpoint.Message
 * @see net.jxta.protocol.PipeAdvertisement
 * @see net.jxta.protocol.PipeResolverMessage
 * @see <a href="http://spec.jxta.org/nonav/v1.0/docbook/JXTAProtocols.html#proto-pbp" target="_blank">JXTA Protocols Specification : Pipe Binding Protocol</a>
 */
public class PipeServiceImpl implements PipeService, PipeResolver.Listener {

    /**
     * The log4J Logger for this class; its name is the fully qualified
     * class name of {@code PipeServiceImpl}.
     */
    private final static Logger LOG = Logger.getLogger(PipeServiceImpl.class.getName());

    /**
     * The interval at which we verify that a pipe is still resolved at a
     * remote peer. Defined as twenty times {@code TimeUtils.AMINUTE};
     * {@code init()} reports it with an "ms" suffix, so the unit is
     * presumably milliseconds (confirm against {@code TimeUtils}).
     */
    static final long VERIFYINTERVAL = 20 * TimeUtils.AMINUTE;

    /**
     * The group this PipeService is working for. Assigned by
     * {@code init()} and reset to {@code null} by {@code stopApp()}.
     */
    private PeerGroup myGroup = null;

    /**
     * Our resolver handler. Created by {@code startApp()} and stopped and
     * released by {@code stopApp()}.
     */
    private PipeResolver pipeResolver = null;

    /**
     * Link to the wire pipe implementation (propagated pipes). Created and
     * started by {@code startApp()}; stopped and released by
     * {@code stopApp()}.
     */
    private WirePipeImpl wirePipe = null;

    /**
     * The interface object we will hand out. Created on the first call to
     * {@code getInterface()} and reused afterwards; reset to {@code null}
     * by {@code stopApp()}.
     */
    private PipeService myInterface = null;

    /**
     * The implementation advertisement for this implementation, as passed
     * to {@code init()}.
     */
    private ModuleImplAdvertisement implAdvertisement = null;

    /**
     * Table of listeners for asynchronous output pipe creation.
     *
     * <p/><ul>
     *     <li>keys are {@link net.jxta.pipe.PipeID}</li>
     *     <li>values are {@link java.util.Map}</li>
     * </ul>
     * Within the value Map:
     * <ul>
     *     <li>keys are {@link java.lang.Integer} representing queryid</li>
     *     <li>values are {@link OutputPipeHolder}</li>
     * </ul>
     *
     * <p/>Every inner map and the table itself are emptied by
     * {@code stopApp()}.
     */
    private Map<PipeID,Map<Integer,OutputPipeHolder>> outputPipeListeners = new HashMap<PipeID,Map<Integer,OutputPipeHolder>>();

    /**
     * Has the pipe service been started? Set to {@code true} at the end of
     * a successful {@code startApp()} and back to {@code false} by
     * {@code stopApp()}. Declared {@code volatile} because it is also read
     * by methods that are not {@code synchronized} (for example
     * {@code createInputPipe}).
     */
    private volatile boolean started = false;

    /**
     * Holds a pipe adv and a listener which will be called for resolutions
     * of the pipe.
*/ private static class OutputPipeHolder { PipeAdvertisement adv; Set peers; OutputPipeListener listener; int queryid; OutputPipeHolder(PipeAdvertisement adv, Set peers, OutputPipeListener listener, int queryid) { this.adv = adv; this.peers = peers; this.listener = listener; this.queryid = queryid; } } /** * A listener useful for implementing synchronous behaviour. */ private static class syncListener implements OutputPipeListener { volatile OutputPipeEvent event = null; syncListener() {} /** * Called when a input pipe has been located for a previously registered * pipe. The event contains an {@link net.jxta.pipe.OutputPipe} which can * be used to communicate with the remote peer. * * @param outputPipeEvent event */ public synchronized void outputPipeEvent(OutputPipeEvent event) { // we only accept the first event. if(null == this.event) { this.event = event; notifyAll(); } } } /** * Default Constructor (don't delete) */ public PipeServiceImpl() { } /** * {@inheritDoc} * * <p/>We create only a single interface object and return it over and over * again. 
*/ public synchronized Service getInterface() { if (null == myInterface) { myInterface = new PipeServiceInterface(this); } return myInterface; } /** * {@inheritDoc} */ public Advertisement getImplAdvertisement() { return implAdvertisement; } /** * {@inheritDoc} */ public synchronized void init(PeerGroup pg, ID assignedID, Advertisement impl) { implAdvertisement = (ModuleImplAdvertisement) impl; myGroup = pg; if (LOG.isEnabledFor(Level.INFO)) { StringBuffer configInfo = new StringBuffer("Configuring Pipe Service : " + assignedID); configInfo.append("\n\tImplementation :"); configInfo.append("\n\t\tModule Spec ID: " + implAdvertisement.getModuleSpecID()); configInfo.append("\n\t\tImpl Description : " + implAdvertisement.getDescription()); configInfo.append("\n\t\tImpl URI : " + implAdvertisement.getUri()); configInfo.append("\n\t\tImpl Code : " + implAdvertisement.getCode()); configInfo.append("\n\tGroup Params :"); configInfo.append("\n\t\tGroup : " + myGroup.getPeerGroupName()); configInfo.append("\n\t\tGroup ID : " + myGroup.getPeerGroupID()); configInfo.append("\n\t\tPeer ID : " + myGroup.getPeerID()); configInfo.append("\n\tConfiguration :"); configInfo.append("\n\t\tVerify Interval : " + VERIFYINTERVAL + "ms"); LOG.info(configInfo); } } /** * {@inheritDoc} * * <p/>Currently this service does not expect arguments. 
*/ public synchronized int startApp(String[] args) { Service needed = myGroup.getResolverService(); if(null == needed) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Stalled until there is a resolver service"); } return START_AGAIN_STALLED; } needed = myGroup.getMembershipService(); if(null == needed) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Stalled until there is a membership service"); } return START_AGAIN_STALLED; } needed = myGroup.getRendezVousService(); if(null == needed) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Stalled until there is a rendezvous service"); } return START_AGAIN_STALLED; } // create our resolver handler; it will register itself w/ the resolver. pipeResolver = new PipeResolver(myGroup); needed = myGroup.getRendezVousService(); if(null == needed) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Stalled until there is a rendezvous service"); } return START_AGAIN_STALLED; } // Create the WirePipe (propagated pipe) wirePipe = new WirePipeImpl(myGroup, pipeResolver); wirePipe.startApp(args); started = true; return 0; } /** * {@inheritDoc} */ public synchronized void stopApp() { started = false; try { if (wirePipe != null) { wirePipe.stopApp(); } } catch (Throwable failed) { LOG.error("Failed to stop wire pipe", failed); } finally { wirePipe = null; } try { if (pipeResolver != null) { pipeResolver.stop(); } } catch (Throwable failed) { LOG.error("Failed to stop pipe resolver", failed); } finally { pipeResolver = null; } // Avoid cross-reference problem with GC myGroup = null; myInterface = null; // clear outputPipeListeners Collection<Map<Integer,OutputPipeHolder>> values = outputPipeListeners.values(); for (Map<Integer,OutputPipeHolder> value: values) { value.clear(); } outputPipeListeners.clear(); } /** * {@inheritDoc} */ public InputPipe createInputPipe(PipeAdvertisement adv) throws IOException { return createInputPipe(adv, null); } /** * {@inheritDoc} */ public InputPipe createInputPipe(PipeAdvertisement adv, PipeMsgListener 
listener) throws IOException { if (!started) { throw new IllegalStateException("Pipe Service has not been started or has been stopped"); } String type = adv.getType();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -