⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 bidirectionalpipeservice.java

📁 jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,程序编译需要Ant支持
💻 JAVA
字号:
/*
 * Copyright (c) 2001 Sun Microsystems, Inc.  All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 *
 * 3. The end-user documentation included with the redistribution,
 *    if any, must include the following acknowledgment:
 *       "This product includes software developed by the
 *       Sun Microsystems, Inc. for Project JXTA."
 *    Alternately, this acknowledgment may appear in the software itself,
 *    if and wherever such third-party acknowledgments normally appear.
 *
 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
 *    not be used to endorse or promote products derived from this
 *    software without prior written permission. For written
 *    permission, please contact Project JXTA at http://www.jxta.org.
 *
 * 5. Products derived from this software may not be called "JXTA",
 *    nor may "JXTA" appear in their name, without prior written
 *    permission of Sun.
 *
 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED.  IN NO EVENT SHALL SUN MICROSYSTEMS OR
 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of Project JXTA.  For more
 * information on Project JXTA, please see
 * <http://www.jxta.org/>.
 *
 * This license is based on the BSD license adopted by the Apache Foundation.
 *
 * $Id: BidirectionalPipeService.java,v 1.2 2002/03/04 21:43:01 echtcherbina Exp $
 */
package net.jxta.impl.util;

import net.jxta.pipe.PipeService;
import net.jxta.pipe.PipeID;
import net.jxta.pipe.InputPipe;
import net.jxta.pipe.OutputPipe;
import net.jxta.pipe.PipeMsgEvent;
import net.jxta.pipe.PipeMsgListener;
import net.jxta.discovery.DiscoveryService;
import net.jxta.document.Advertisement;
import net.jxta.document.AdvertisementFactory;
import net.jxta.document.Document;
import net.jxta.document.MimeMediaType;
import net.jxta.endpoint.Message;
import net.jxta.id.ID;
import net.jxta.id.IDFactory;
import net.jxta.peergroup.PeerGroup;
import net.jxta.peergroup.PeerGroupID;
import net.jxta.peergroup.PeerGroupFactory;
import net.jxta.protocol.PipeAdvertisement;

import java.io.*;
import java.util.ResourceBundle;
import java.lang.reflect.InvocationTargetException;


/**
 * The BidirectionalPipeService simulates bi-directional pipes
 * by using unidirectional pipes.
 */
public class BidirectionalPipeService {

    PeerGroup peerGroup;
    DiscoveryService discoveryService;
    net.jxta.pipe.PipeService pipeService;

    /**
     * Create a BidirectionalPipeService for a given peer group object.
     * The service will work with the group's PipeService and DiscoveryService.
     *
     * @param peerGroup  the peer group for which to create the service
     */
    public BidirectionalPipeService (PeerGroup peerGroup) {
	
	this.peerGroup        = peerGroup;
	this.pipeService      = peerGroup.getPipeService ();
	this.discoveryService = peerGroup.getDiscoveryService ();
    }

    
    
    /**
     * Create an "AcceptPipe" - the interface using which you can
     * accept incoming bi-directional pipe connections.  This method
     * will also publish the AcceptPipe's advertisement into the 
     * peer group.
     *
     * @param pipeName the name of the accepting pipe to create
     */
    public AcceptPipe bind (String pipeName) throws IOException {
	PipeAdvertisement inputPipeAdv;
	InputPipe inputPipe;
	
	inputPipeAdv = (PipeAdvertisement)
	    AdvertisementFactory.newAdvertisement
	    ("jxta:PipeAdvertisement");
	
	inputPipeAdv.setPipeID (IDFactory.newPipeID
				((PeerGroupID) peerGroup.getPeerGroupID ()));
	inputPipeAdv.setName (pipeName + ".end1");

	inputPipe = pipeService.createInputPipe (inputPipeAdv);

	discoveryService.publish (inputPipeAdv, DiscoveryService.ADV);
	discoveryService.remotePublish (inputPipeAdv, DiscoveryService.ADV);
	
	System.out.println ("Published bidir pipe " + inputPipeAdv.getPipeID ());

	return new AcceptPipe (inputPipeAdv, inputPipe);
    }


    /**
     * An AcceptPipe can be used to accept incoming bi-directional
     * pipe connections.
     * 
     * @see BidirectionalPipeService.AcceptPipe#accept
     */
    public class AcceptPipe {

	PipeAdvertisement acceptInputPipeAdv;
	InputPipe         acceptInputPipe;
	boolean           done;
	
	AcceptPipe (PipeAdvertisement acceptInputPipeAdv,
		    InputPipe         acceptInputPipe)
	{
	    this.acceptInputPipeAdv = acceptInputPipeAdv;
	    this.acceptInputPipe    = acceptInputPipe;
	    this.done               = false;
	}


	/**
	 * Get the advertisement that was published for this AcceptPipe.
	 */
	public PipeAdvertisement getAdvertisement () {
	    return acceptInputPipeAdv;
	}

	/**
	 * Accept an incoming bi-directional pipe connection.
	 *
	 * @param timeout the maximum time to wait for an incoming connection.
	 */
	public BidirectionalPipeService.Pipe accept (int timeout) 
	    throws IOException, InterruptedException
	{
	    return accept (timeout, null);
	}

	/**
	 * Accept an incoming bi-directional pipe connection.  All
	 * messages arriving on that pipe will be sent to the 
	 * MessageListener.
	 *
	 * @param timeout the maximum time to wait for an incoming connection.
	 * @param ml      the message listener that will be called for each
	 *                arriving message.
	 */
	public BidirectionalPipeService.Pipe accept (int             timeout,
						     MessageListener ml) 
	    throws IOException, InterruptedException
	{
	    // read an InputPipeAdv message from accepting pipe
	    // create an output pipe from advertisement in InputPipeAdv
	    // create an input pipe to which new connection can write to
	    // send the advertisement for this input pipe to whoever
	    // is trying to connect in an InputPipeAdvAck message, so
	    // that they can write
	    
	    Message           msg;
	    PipeAdvertisement inputPipeAdv;
	    PipeAdvertisement outputPipeAdv;
	    InputPipe         inputPipe;
	    OutputPipe        outputPipe;
	    Document          inputPipeAdvDoc;

	    msg = acceptInputPipe.poll (timeout/2);
	    
	    if (msg == null) {
		throw new InterruptedException ("Timed out in accept.");
	    } else if (done && msg.hasElement ("Close")) {
		
		throw new InterruptedException ();
	    } else if (! done) {
		InputStream in = msg.getElement ("InputPipeAdv").getStream();

		    outputPipeAdv = (PipeAdvertisement)
			AdvertisementFactory.newAdvertisement
			(new MimeMediaType ("text", "xml"), 
			 in);	

		outputPipe = 
		    pipeService.createOutputPipe (outputPipeAdv,
						  timeout/2);
		// create input pipe to which new connection can write to
		    inputPipeAdv = (PipeAdvertisement)
			AdvertisementFactory.newAdvertisement
			("jxta:PipeAdvertisement");
		
		inputPipeAdv.setPipeID(
                    IDFactory.newPipeID( (PeerGroupID)
                                         peerGroup.getPeerGroupID () ) );
		inputPipeAdv.setName (outputPipeAdv.getName () + ".out");
		
		if (ml == null)
		    inputPipe = pipeService.createInputPipe (inputPipeAdv);
		else
		    inputPipe = pipeService.createInputPipe
			(inputPipeAdv, 
			 new PipeMsgListenerImpl (ml, outputPipe));
		

		try {
		    inputPipeAdvDoc = 
			inputPipeAdv.getDocument (new MimeMediaType
						  ("text/xml"));
		} catch (Exception e) {
		    throw new IOException ("Unable to create input pipe " +
					   "advertisement document.");
		}

		
		msg = pipeService.createMessage ();
		msg.addElement (msg.newMessageElement("InputPipeAdvAck",
						      null,
						      inputPipeAdvDoc.getStream ()));
		System.out.println ("Sending bidir pipe ack.");
		outputPipe.send (msg);

		return new Pipe (inputPipe, outputPipe);
	    } else {
		throw new IOException ("Pipe closed.");
	    }
	}

	/**
	 * Stop accepting incoming bi-directional pipe connections.
	 */
	public synchronized void close () {
	    
	    done = true;

	    // will unblock the thread in inputPipe.read () ?

	    acceptInputPipe.close ();
	}
    }

    /*
    public Pipe connect (PipeAdvertisement adv, int timeout) 
	throws IOException
    {
	connect (adv, timeout, null);
    }
    */

    /**
     * Connect to an accepting pipe published under the given advertisement.
     *
     * @param adv     the advertisement of the pipe to connect to.
     * @param timeout the maximum time to wait for a connection to be set up.
     */
    public Pipe connect (PipeAdvertisement adv, 
			 int               timeout)
	throws IOException
    {
	// open output pipe
	// create my own adv
	// create my own pipe
	// send my adv to output pipe
	// receive ack message on my own pipe
	
	OutputPipe        connectToPipe;
	OutputPipe        outputPipe;
	InputPipe         inputPipe;

	PipeAdvertisement outputPipeAdv;
	PipeAdvertisement inputPipeAdv;
	Document          inputPipeAdvDoc;
	Message           msg;
	InputStream       in;

	connectToPipe = pipeService.createOutputPipe (adv,
						      (int)timeout/2);

	inputPipeAdv = (PipeAdvertisement)
	    AdvertisementFactory.newAdvertisement
	    ("jxta:PipeAdvertisement");
	
	inputPipeAdv.setPipeID (IDFactory.newPipeID
				((PeerGroupID) peerGroup.getPeerGroupID ()));
	inputPipeAdv.setName (adv.getName () + ".in");
	
	// if (ml == null)
	    inputPipe = pipeService.createInputPipe (inputPipeAdv);
	    /*
	else
	    inputPipe = 
		pipeService.createInputPipe (inputPipeAdv,
	    			     new PipeMsgListenerImpl (ml));
	    */
	msg = pipeService.createMessage( );

	try {
	    inputPipeAdvDoc = 
		inputPipeAdv.getDocument (new MimeMediaType ("text/xml"));
	} catch (Exception e) {
	    throw new IOException ("Unable to create input pipe " +
				   "advertisement document.");
	}

	msg.addElement (msg.newMessageElement ("InputPipeAdv",
					       null,
					       inputPipeAdvDoc.getStream ()));

	connectToPipe.send (msg);

	try {
	    msg = inputPipe.poll ((int)timeout/2);
	    
	    if (msg == null || ((in = msg.getElement ("InputPipeAdvAck").getStream()) == null))
		throw new IOException ("Expecting bi-directional pipe " +
				       "confirmation.");
	} catch (InterruptedException e) {
	    throw new IOException ("Did not receive expected bi-directional "+
				   "confirmation.");
	}
	
	outputPipeAdv = (PipeAdvertisement)
	    AdvertisementFactory.newAdvertisement
		(new MimeMediaType ("text", "xml"), 
		 in);	

	outputPipe = pipeService.createOutputPipe (outputPipeAdv,
						   timeout/2);

	return new Pipe (inputPipe, outputPipe);
    }


    /**
     * This class represents a bi-directional pipe connection.
     */
    public class Pipe {

	InputPipe       inputPipe;
	OutputPipe      outputPipe;
	MessageListener msgListener;

	Pipe (InputPipe inputPipe, OutputPipe outputPipe) {
	    this.inputPipe = inputPipe;
	    this.outputPipe = outputPipe;
	}
	
	/**
	 * Get the input pipe that can be used to poll for messages.
	 */
	public InputPipe getInputPipe () {
	    return inputPipe;
	}
	
	/**
	 * Get the output pipe that can be used to send messages.
	 */
	public OutputPipe getOutputPipe () {
	    return outputPipe;
	}


    }

    /**
     * Implementations of this interface, when passed to the
     * AcceptPipe's accept() method, will be called back with
     * messages that arrive to that pipe.
     *
     * @see BidirectionalPipeService.MessageListener#messageReceived
     * @see BidirectionalPipeService.AcceptPipe#accept(long,MessageListener)
     */
    public interface MessageListener {

	/**
	 * The metdod invoked for each incoming message received by
	 * a pipe created with AcceptPipe's accept() method.
	 * 
	 * @param msg          the received message
	 * @param responsePipe the output pipe to which responses can be
	 *                     written
	 *  @see BidirectionalPipeService.AcceptPipe#accept(long,MessageListener)
	 */
	public void messageReceived (Message msg, OutputPipe responsePipe);
    }


    class PipeMsgListenerImpl implements PipeMsgListener {
	
	MessageListener msgListener;
	OutputPipe      outputPipe;
	
	PipeMsgListenerImpl (MessageListener msgListener,
			     OutputPipe      outputPipe)
	{
	    this.msgListener = msgListener;
	    this.outputPipe  = outputPipe;
	}

	public void pipeMsgEvent (PipeMsgEvent ev) {
	    msgListener.messageReceived (ev.getMessage (),
					 outputPipe);
	}
    }
    
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -