📄 jxtacast.java
字号:
/*
* Copyright (c) 2002 Sun Microsystems, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Sun Microsystems, Inc. for Project JXTA."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
* not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact Project JXTA at http://www.jxta.org.
*
* 5. Products derived from this software may not be called "JXTA",
* nor may "JXTA" appear in their name, without prior written
* permission of Sun.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL SUN MICROSYSTEMS OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of Project JXTA. For more
* information on Project JXTA, please see <http://www.jxta.org/>.
*
* This license is based on the BSD license adopted by the Apache Foundation.
*
* $Id: JxtaCast.java,v 1.1 2009/04/15 14:51:47 ljz Exp $
*
*/
/*****************************************************************************
*
* JxtaCast release history
*
* Version numbers below correspond to the JxtaCast.version string, not to the
* CVS check-in ID.
*
* 1.00 03/18/02 Beta release. The class was named "FileCast", and was a
* part of the PicShare demo. There was an unversioned alpha
* on 03/09/02.
* 1.01 03/20/02 Don't show "duplicate block" message if we were the
* ones that sent the block. Create pipes AFTER advs have
* been published.
* 1.02 03/23/02 Use "average time between blocks" calculation to determine
* wait time before requesting missing blocks.
* Shorten inactive lifetime of input wranglers to 5 minutes.
* Put sending peer name in more log messages.
* 1.03 04/07/02 Change name to JxtaCast, change package location.
* 1.04 10/11/02 Migrate from deprecated Message and PipeService methods.
* 2.00 04/07/03 Tested with JXTA 2.0 platform: JXTA_2_0_Stable_20030301.
* Bumped default block size to 12kb. Tightened time between
* BossCheck checks, from 2 seconds to 1.
*
*****************************************************************************/
/*@ Modified by Hisham Khalil */
package net.jxta.jxtacast;
import java.io.*;
import java.util.*;
import net.jxta.discovery.*;
import net.jxta.document.*;
import net.jxta.endpoint.*;
import net.jxta.id.*;
import net.jxta.peergroup.*;
import net.jxta.pipe.*;
import net.jxta.protocol.*;
import net.jxta.rendezvous.*;
/*
* JxtaCast: Sends data files to all peers in a peer group (those that are
* listening for them with JxtaCast). Receives data files sent
* by other JxtaCast users.
*
* Large files are broken up and sent in blocks. Since blocks may
* arrive out of order, the receivers re-assemble all the blocks in
* memory before writing the file. (JxtaCast is therefore a memory hog
* if used with very large files... Should change it to read/write
* blocks directly from disk files.)
*
* The default block size is set in the public outBlockSize variable.
* Client applications can change this size if they wish. The default
* size is 12kb. The maximum message size that can be sent using IP
* multicasting is 16kb, so we want to stay under that. You can use
* bigger blocks if you are always using a rendezvous.
*
* This class was originally named "FileCast". After finding that the
* name had already been used (oops), we changed it to JxtaCast.
* The string "FileCast" is still used in some places, to maintain
* backwards compatibility with earlier versions.
*/
public class JxtaCast
implements PipeMsgListener, Runnable {
// JxtaCast version number. The version number is placed in the messages that
// JxtaCast sends. Hopefully this will help us orchestrate communication
// between newer and older versions of JxtaCast.
//
// The value follows the numbering used in the release history at the top of
// this file, not the CVS check-in ID.
// NOTE(review): the field is public and not final, so any caller can reassign
// it at runtime - confirm whether that is intentional.
//
public static String version = "2.00";
// JxtaCast supports two types of messages: FILE and CHAT. File messages are
// used to broadcast files to peers through the wire protocol. Chat messages
// can be sent through the same pipes. They allow the peers to carry on a side
// chat while sending and receiving files.
//
// Most of the following message elements are used for file messages. Chat
// messages require only the MESSAGETYPE, SENDERNAME, and CAPTION elements.
// The chat text is contained in the CAPTION element.
//
// All of the elements' data values are stored in the message as strings, except
// for the DATABLOCK element, which contains binary image file data. Numeric values
// such as FILESIZE are converted to strings for storage.
//
// Message element names. These strings are the names written into the
// messages, so changing one changes the wire format. Some values still carry
// older names (e.g. "FileCastVersion"); the class comment says the "FileCast"
// string is kept in places for backwards compatibility.
final static String MESSAGETYPE = "MessageType"; // See list of types below.
final static String SENDERNAME = "JxtaTalkSenderName"; // The sending peer name.
final static String SENDERID = "SenderID"; // Peer ID of the sender.
final static String VERSION = "FileCastVersion"; // JxtaCast version number.
final static String CAPTION = "Caption"; // Description of the file (chat text for CHAT messages).
final static String FILEKEY = "FileKey"; // Unique key for this file transaction.
final static String FILENAME = "FileName"; // File name (no path).
final static String FILESIZE = "FileSize"; // File size.
final static String BLOCKNUM = "BlockNum"; // Large files are sent in blocks.
final static String TOTALBLOCKS = "TotalBlocks"; // Total number of blocks in the file.
final static String BLOCKSIZE = "BlockSize"; // The size of one block.
final static String DATABLOCK = "DataBlock"; // One block of file data.
// REQTOPEER is a message element name, the value will usually be a peer ID.
// REQ_ANYPEER is a value for the REQTOPEER element, requesting from any peer.
//
final static String REQTOPEER = "ReqToPeer"; // Peer ID to which we're addressing a FILE_REQ message.
final static String REQ_ANYPEER = "ReqAnyPeer"; // Addressing the FILE_REQ message to any peer.
// MESSAGETYPE element data values.
final static String MSG_FILE = "FILE"; // File transfer message.
final static String MSG_FILE_ACK = "FILE_ACK"; // Block received acknowledgement.
final static String MSG_FILE_REQ = "FILE_REQ"; // Request a block from another peer.
final static String MSG_FILE_REQ_RESP = "FILE_REQ_RESP"; // Respond to a block request.
final static String MSG_CHAT = "CHAT"; // Chat message.
public final static String DELIM = "]--,',--["; // Delimiter for some pipe name sections.
// Settings. These are public, non-final fields; the class comment states that
// client applications may change outBlockSize, and the others are exposed the
// same way.
//
// logEnabled is static and is the flag tested by logMsg().
// The outBlockSize default of 12288 bytes is the "12kb" mentioned in the class
// comment, chosen to stay under the 16kb IP multicast message limit noted there.
//
public static boolean logEnabled; // Log debug messages if true.
public int outBlockSize = 12288; // Size of the data block to send with each message, in bytes.
public int outWranglerLifetime = 600000; // 10 mins: time to store inactive output wranglers, in millis.
public int inWranglerLifetime = 300000; // 5 mins: time to store inactive input wranglers, in millis.
public int timeTilReq = 60000; // 60 secs: max time that we'll wait before requesting missing file blocks.
public int trailBossPeriod = 1000; // 1 sec: worker thread sleep time between wrangler checks.
public String fileSaveLoc; // Destination directory for saved files.
// We may be receiving several files at once, from multiple peers. Since
// the files are sent in chunks, we need objects that can hold on to what
// we've got so far, while we process a piece of a different file. We'll
// use a hash table of FileWrangler objects. Each FileWrangler will handle
// the incoming messages for one file. The wrangler's composeKey() func will
// supply us with a unique hash key for each file transfer.
//
// This table is also the lock object that setPeerGroup() synchronizes on
// while it swaps the group and recreates the pipes.
//
protected Hashtable wranglers = new Hashtable(40);
// When sending a file, requests are temporarily queued here. Another thread
// reads the queue, loads the file, and starts the transmission out through
// the pipes. This helps keep the GUI thread cleared for action. The vector
// will contain OutputFileWrangler objects.
//
protected Vector sendFileQueue = new Vector(10);
protected DiscoveryService disco; // Discovery service of the current group; assigned in setPeerGroup().
protected PeerAdvertisement myPeer; // Peer advertisement handed to the constructor (presumably the local peer's - confirm).
protected PeerGroup group; // Currently joined peer group; see getPeerGroup()/setPeerGroup().
protected String castName; // Name used in the pipe advertisement ID and pipe name.
protected PipeService pipeServ; // Pipe service of the current group; assigned in setPeerGroup().
protected InputPipe inputPipe; // Public propagation pipe, the "broadcast channel".
protected OutputPipe outputPipe; // Public propagation pipe, paired with the above.
protected InputPipe privInputPipe; // Private unicast pipe, the "back channel".
protected Vector jcListeners; // Registered JxtaCastEventListener objects.
protected Thread trailBossThread; // Worker thread created by start(); reference cleared by stop().
/** Constructor
*
* Stores the peer advertisement and cast name, sets up the default save
* location and the listener collection, and then joins the given group,
* which creates and publishes the pipes.
*
* @param myPeer - peer advertisement to associate with this JxtaCast object;
* stored as given.
* @param group - peergroup that we've joined.
* @param castName - name to use in the pipe advertisement ID, such as an
* application name. This permits the creation of
* multiple JxtaCast channels within a single group.
* Must not be null.
* @throws NullPointerException if castName is null.
*/
public JxtaCast(PeerAdvertisement myPeer, PeerGroup group, String castName) {
    // Strings are immutable, so no defensive copy is needed; just keep
    // rejecting a null name right away.
    if (castName == null) {
        throw new NullPointerException("castName");
    }
    this.myPeer = myPeer;
    this.castName = castName;

    // Default destination for saved files is the current directory.
    fileSaveLoc = "." + File.separator;

    // Create collection to hold JxtaCastEventListener objects.
    jcListeners = new Vector(10);

    // Join the group last. setPeerGroup() calls createPipes(), which
    // registers this object as the listener of the input pipe, so messages
    // may start arriving on another thread from that point on. Everything
    // above must already be initialized by then.
    setPeerGroup(group);
}
/**
 * No-argument constructor. Performs no setup at all: the peer advertisement,
 * peer group, cast name, pipes, save location and listener collection are
 * left null until they are assigned elsewhere.
 */
public JxtaCast() {
}
/**
 * Launches the "TrailBoss" worker thread, which runs this object's run()
 * method. Per the original notes, that thread handles file loading and
 * message output, and also checks through the list of FileWranglers to give
 * any stalled file transactions a kick in the pants.
 */
public void start() {
    Thread boss = new Thread(this, "JxtaCast:TrailBoss");
    trailBossThread = boss;
    boss.start();
}
/**
 * Shuts this JxtaCast object down: clears the worker thread reference and
 * closes every pipe that has been created.
 *
 * NOTE(review): presumably run() watches trailBossThread to know when to
 * exit - confirm against run(), which is not changed here.
 *
 * The pipe fields are closed but not set to null, so they keep referring to
 * the closed pipes afterwards.
 */
public void stop() {
    trailBossThread = null;

    if (inputPipe != null) {
        inputPipe.close();
    }
    if (outputPipe != null) {
        outputPipe.close();
    }
    // The back channel pipe must be released too; createPipes() closes it
    // together with the other two, so shutdown should do the same.
    if (privInputPipe != null) {
        privInputPipe.close();
    }
}
/**
 * Accessor for the peer group this object is currently joined to.
 *
 * @return the current peer group, or null if none has been set yet.
 */
public PeerGroup getPeerGroup() {
    return this.group;
}
/**
 * Switch this object over to a new peer group and recreate its pipes there.
 *
 * @param group - the peer group to switch to. Must not be null; it is
 * dereferenced unconditionally.
 * @return true if the requested group is the one we are already in (nothing
 * is done in that case), or if the pipes were successfully created in
 * the new group; false if pipe creation failed.
 */
public boolean setPeerGroup(PeerGroup group) {
    // Asking for the group we already have is a no-op.
    PeerGroup current = this.group;
    if (current != null) {
        if (group.getPeerGroupID().equals(current.getPeerGroupID())) {
            return true;
        }
    }

    // Hold the wranglers lock while swapping services and pipes, so the
    // trailboss thread cannot be using the current pipes while the new
    // ones are being created.
    synchronized (wranglers) {
        this.group = group;
        disco = group.getDiscoveryService();
        pipeServ = group.getPipeService();
        return createPipes(castName);
    }
}
/**
 * Print a debug message on standard output, but only while logEnabled is
 * true; otherwise the message is dropped.
 *
 * Open question from the original author: maybe this should use Log4J, if
 * it can show our application debug messages while suppressing all the JXTA
 * platform messages.
 *
 * @param msg - the text to print.
 */
public static void logMsg(String msg) {
    if (!logEnabled) {
        return;
    }
    System.out.println(msg);
}
/**
* Create an input and output pipe to handle the file transfers.
* Publish their advertisements so that other peers will find them.
*
* @param castName - name to use in the pipe advertisement ID.
* @return true if successful.
*/
protected boolean createPipes(String castName) {
// Close any existing pipes.
if (inputPipe != null)
inputPipe.close();
if (outputPipe != null)
outputPipe.close();
if (privInputPipe != null)
privInputPipe.close();
// Create the input and output pipes for the many-to-many "broadcast channel",
// using propagation pipes. The broadcast channel is used to send the
// file data out to all listening peers. First we cook up an
// advertisement, and then create the pipes using the adv.
//
PipeAdvertisement pipeAdvt;
pipeAdvt = (PipeAdvertisement)AdvertisementFactory.newAdvertisement(
PipeAdvertisement.getAdvertisementType());
// This is a pre-defined ID for the propagate pipes used for file transfers.
// Using this known ID allows us to start using the pipes immediately,
// without having to discover other peers' pipe advertisements first.
// There is, however, a potential for collision with another app using the
// same ID. We use a prefix given for this JxtaCast object (castName), and
// then append a string and byte array that should be unique to JxtaCast.
//
byte jxtaCastID[] = {
(byte) 0xAA, (byte) 0xAA, (byte) 0xAA, (byte) 0xAA,
(byte) 0xBB, (byte) 0xBB, (byte) 0xBB, (byte) 0xBB,
(byte) 0xBB, (byte) 0xBB, (byte) 0xBB, (byte) 0xBB,
(byte) 0xAA, (byte) 0xAA, (byte) 0xAA, (byte) 0xAA };
String idStr = castName + "-[FileCast Pipe ID]-" + new String(jxtaCastID);
PipeID id = (PipeID)IDFactory.newPipeID(group.getPeerGroupID(), idStr.getBytes());
pipeAdvt.setPipeID(id);
pipeAdvt.setName("JxtaTalkSenderName." + castName);
pipeAdvt.setType(PipeService.PropagateType);
try {
disco.publish(pipeAdvt);
disco.remotePublish(pipeAdvt);
inputPipe = pipeServ.createInputPipe(pipeAdvt, this);
outputPipe = pipeServ.createOutputPipe(pipeAdvt, 2000);
} catch (Exception e) {
e.printStackTrace();
return false;
}
// Create the input pipe for the "back channel", using a unicast pipe.
// Peers use this pipe for one-to-one communication, such as requesting
// a file block from a specific peer.
//
// FIXME - The back channel concept isn't fully implemented yet. We're
// creating the pipe and adv here, but not using the pipe anywhere.
// We want to leave this code active now, even though the pipes aren't
// used, because the advs are useful. JxtaCast apps can do a filtered
// adv discovery to detect other peers running the same JxtaCast app.
//
// TODO - Include this adv in outgoing messages, so that receivers
// can respond thru the back channel pipe.
//
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -