📄 jxtacast.java
字号:
String senderId; // Sender's peer ID.
String filename;
String caption = "";
byte fdata[]; // The file data.
int myBlockSize; // This file's block size.
int totalBlocks; // Number of blocks in the file.
long lastActivity; // Timestamp when the most recent message was processed.
/** Process a file transfer message.
*/
public abstract void processMsg(Message msg);
/** Receive a regular 'maintainence' check-in from the TrailBoss thread.
*/
public abstract void bossCheck();
/** Create a unique key for this file transfer. */
public static String composeKey(String senderId, String filename) {
// The key is a combination of the sender's PeerId, the file name,
// and a timestamp.
String keyStr = senderId + "+" + filename + "+" +
String.valueOf(System.currentTimeMillis());
return keyStr;
}
/** Send the specified block of file data out over the wire.
*
* @param blockNum The block to send.
* @param msgType Message type: MSG_FILE or MSG_FILE_REQ_RESP.
*/
synchronized void sendBlock(int blockNum, String msgType) {
// Make sure it's a valid block.
if (blockNum < 0 || blockNum >= totalBlocks)
return;
try {
lastActivity = System.currentTimeMillis();
// Create a message, fill it with our standard headers.
Message msg = new Message();
jc.setMsgString(msg, jc.MESSAGETYPE, msgType);
jc.setMsgString(msg, jc.SENDERNAME, jc.myPeer.getName());
jc.setMsgString(msg, jc.SENDERID, jc.myPeer.getPeerID().toString());
jc.setMsgString(msg, jc.VERSION, jc.version);
jc.setMsgString(msg, jc.FILEKEY, key);
jc.setMsgString(msg, jc.FILENAME, filename);
jc.setMsgString(msg, jc.FILESIZE, String.valueOf(fdata.length));
// If we've got a caption, store it in the first message.
if (blockNum == 0 && caption != null)
jc.setMsgString(msg, jc.CAPTION, caption);
// Place the block info in the message.
jc.setMsgString(msg, jc.BLOCKNUM, String.valueOf(blockNum));
jc.setMsgString(msg, jc.TOTALBLOCKS, String.valueOf(totalBlocks));
jc.setMsgString(msg, jc.BLOCKSIZE, String.valueOf(myBlockSize));
// Place the block of file data in the message.
// If this is the last block, it's probably smaller than a full block.
//
int bSize = myBlockSize;
if (blockNum == totalBlocks - 1)
bSize = fdata.length - (blockNum * myBlockSize);
ByteArrayMessageElement elem = new ByteArrayMessageElement(
jc.DATABLOCK, null, fdata, blockNum * myBlockSize, bSize, null);
msg.replaceMessageElement(elem);
// Send the message.
jc.logMsg("Sending: " + filename + " block: " + (blockNum+1) +
" of: " + totalBlocks);
jc.sendMessage(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* Class for sending a file.
*
* Files are split up and sent in blocks of data. This class loads the
* file into memory, and sends out data blocks. It re-send blocks in response
* to requests from other peers.
*
*/
class OutputFileWrangler extends FileWrangler {
File file;
int blocksSent; // Number of outgoing blocks processed so far.
/**
* Constructor - Build a wrangler to process an outgoing file.
*
*/
public OutputFileWrangler(JxtaCast jc, File file, String caption) {
this.jc = jc;
this.file = file;
lastActivity = System.currentTimeMillis();
// Get some header data that we only need once.
sender = jc.myPeer.getName();
senderId = jc.myPeer.getPeerID().toString();
filename = file.getName();
this.caption = caption;
key = composeKey(senderId, filename);
// Get info about the blocks.
blocksSent = 0;
myBlockSize = jc.outBlockSize;
int lastBlockSize = (int)file.length() % myBlockSize;
totalBlocks = (int)file.length() / myBlockSize;
if (lastBlockSize != 0)
totalBlocks++;
}
/** Process a file transfer message.
*/
public void processMsg(Message msg) {
lastActivity = System.currentTimeMillis();
// Since this is an output wrangler, we ignore messages of MSG_FILE.
// They came from us! Respond to ACK and REQ messages from peers
// that are receiving this file from us.
//
String msgType = jc.getMsgString(msg, jc.MESSAGETYPE);
if (msgType.equals(jc.MSG_FILE_ACK))
processMsgAck(msg);
else if (msgType.equals(jc.MSG_FILE_REQ))
processMsgReq(msg);
}
/** Receive a regular 'maintainence' check-in from the TrailBoss.
*/
public void bossCheck() {
// If there's been no activity since our last check-in, and we still have
// blocks to send, send the next one now. (But make sure we've sent at
// least one block. If we haven't, then the other thread hasn't gotten
// thru the sendFile() function yet, and the wrangler is not initialized.)
//
if (blocksSent > 0 &&
blocksSent < totalBlocks &&
System.currentTimeMillis() - lastActivity > jc.trailBossPeriod + 500) {
jc.logMsg("bossCheck sending block.");
sendBlock(blocksSent++, jc.MSG_FILE);
updateProgress();
}
// If this wrangler has been inactive for a long time, remove it from
// JxtaCast's collection.
if (System.currentTimeMillis() - lastActivity > jc.outWranglerLifetime)
jc.wranglers.remove(key);
}
/** Process a file transfer ACK message.
*
* Peers will send us an ACK message when they've received a block.
* When we get one (from any peer), for the last block we've sent,
* then we can send the next block.
*/
private void processMsgAck(Message msg) {
int blockNum = Integer.parseInt(jc.getMsgString(msg, jc.BLOCKNUM));
jc.logMsg("Received ACK: " + filename + " block " + (blockNum+1) +
", from " + jc.getMsgString(msg, jc.SENDERNAME));
// If there are more blocks to send, send the next one now.
int nextBlock = blockNum + 1;
if (nextBlock == blocksSent && nextBlock < totalBlocks) {
blocksSent++;
sendBlock(nextBlock, jc.MSG_FILE);
updateProgress();
}
}
/** Process a file transfer REQ message.
*
* A peer has requested a block of this file. Send it out as a
* REQ_RESP 'request response' message.
*/
private void processMsgReq(Message msg) {
int blockNum = Integer.parseInt(jc.getMsgString(msg, jc.BLOCKNUM));
// If this is a request for a block we haven't sent yet, send the
// next block as a normal MSG_FILE message, instead of as a REQ_RESP
// (request response). We want to keep to the "push" protocol of
// MSG_FILE/MSG_FILE_ACK messages until we've sent all the blocks
// one time. The peers will use the "pull" REQ/REQ_RESP protocol
// to fill in their missing blocks.
//
if (blockNum >= blocksSent) {
jc.logMsg("Received " + jc.getMsgString(msg, jc.MESSAGETYPE) +
": " + filename + " block " + (blockNum+1) +
", from " + jc.getMsgString(msg, jc.SENDERNAME));
sendBlock(blocksSent++, jc.MSG_FILE);
updateProgress();
return;
}
// Send out the block, but only if the request was addressed to us or
// to "any peer".
String reqToPeer = jc.getMsgString(msg, jc.REQTOPEER);
if (reqToPeer.equals(jc.myPeer.getPeerID().toString()) ||
reqToPeer.equals(jc.REQ_ANYPEER)) {
if (reqToPeer.equals(jc.REQ_ANYPEER))
jc.logMsg("Received REQ_ANYPEER: " + filename + " block " +
(blockNum+1) + ", from " + jc.getMsgString(msg, jc.SENDERNAME));
else
jc.logMsg("Received FILE_REQ: " + filename + " block " +
(blockNum+1) + ", from " + jc.getMsgString(msg, jc.SENDERNAME));
sendBlock(blockNum, jc.MSG_FILE_REQ_RESP);
}
}
/** Start the file transfer process.
*
* We read the file into memory here, instead of in the constructor,
* so that the operation will take place in the desired thread. (See the
* JxtaCast.sendFile() method.)
*
* We'll send out the file's first data block. Additional blocks will be
* sent in response to acknowledgement messages from the peers, or in response
* to bossCheck() calls from the TrailBoss (whichever comes faster).
*
* Why? Because if we tried to send all the blocks at once, we'd overload
* the capabilities of the propagate pipes, and lots of messages would be
* dropped. So we use the ACK in order to send blocks at a rate that can
* be managed.
*/
public void sendFile() {
blocksSent = 0;
// Allocate space to store the file data.
fdata = new byte[(int)file.length()];
// Read the file into memory.
try {
FileInputStream fis = new FileInputStream(file);
BufferedInputStream bis = new BufferedInputStream(fis);
bis.read(fdata, 0, fdata.length);
fis.close();
} catch (Exception ex) {
ex.printStackTrace();
}
// Send out the first block;
sendBlock(blocksSent++, jc.MSG_FILE);
updateProgress();
}
/** Notify listeners of our transmission progress.
*/
private void updateProgress() {
// Notify listeners of file progress.
JxtaCastEvent e = new JxtaCastEvent();
e.transType = e.SEND;
e.filename = new String(filename);
e.filepath = new String(jc.fileSaveLoc);
e.sender = new String(sender);
e.senderId = new String(senderId);
if (caption != null)
e.caption = new String(caption);
e.percentDone = ((float)blocksSent / totalBlocks) * 100;
jc.sendJxtaCastEvent(e);
}
}
/**
* Class for receiving a file.
*
* Files are split up and sent in blocks of data. This class gathers
* the data blocks for a file as they come in, and writes the file
* to disk when it is complete.
*
* It's ok for blocks to arrive out of order, and ok for duplicate blocks
* to arrive. The wrangler can send out requests for missing blocks, and
* also provide blocks for other peers that are missing them.
*/
class InputFileWrangler extends FileWrangler {
boolean blockIn[]; // Each slot is true when we've received the corresponding data block.
String lastAck[]; // Parallel to block array, ID of last peer to send an ACK for each block.
boolean askedOrig[]; // Parallel to block array, true if we've asked the original sender for this block.
int blocksReceived; // Number of incoming blocks processed so far.
String reqLevel; // Method of requesting missing blocks: from original sender or from anyone.
int currReqBlock; // Block to request if it's missing.
long lastReqTime; // Timestamp when we last requested a missing block.
long firstBlockTime; // Timestamp when we received the first block message.
long latestBlockTime; // Timestamp when we received the most recent block.
long minTimeToWait; // Minimum time to wait with no activity before requesting a block.
/**
* Constructor - Build a wrangler to process an incoming file.
*
* The message used in the constructor doesn't have to be the first
* message in the sequence. Any will do. The message is not
* processed from the constructor, so be sure to call processMsg() as
* well.
*/
public InputFileWrangler(JxtaCast jc, Message msg) {
this.jc = jc;
lastActivity = System.currentTimeMillis();
// Get some header data that we only need once.
sender = jc.getMsgString(msg, jc.SENDERNAME);
senderId = jc.getMsgString(msg, jc.SENDERID);
key = jc.getMsgString(msg, jc.FILEKEY);
filename = jc.getMsgString(msg, jc.FILENAME);
// Get info about the blocks.
blocksReceived = 0;
totalBlocks = Integer.parseInt(jc.getMsgString(msg, jc.TOTALBLOCKS));
myBlockSize = Integer.parseInt(jc.getMsgString(msg, jc.BLOCKSIZE));
// Allocate space to store the file data. We also create an array
// to check off the blocks as we process them, and a couple parallel arrays
// to track who to ask for missing blocks.
//
fdata = new byte[Integer.parseInt(jc.getMsgString(msg, jc.FILESIZE))];
blockIn = new boolean[totalBlocks];
lastAck = new String[totalBlocks];
askedOrig = new boolean[totalBlocks];
// Initialize tracking info for blocks to request.
currReqBlock = 0;
lastReqTime = System.currentTimeMillis();
minTimeToWait = 4000;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -