📄 jxtacast.java
字号:
}
/** Process a file transfer message.
*/
public void processMsg(Message msg) {
String msgType = jc.getMsgString(msg, jc.MESSAGETYPE);
if (msgType.equals(jc.MSG_FILE) || msgType.equals(jc.MSG_FILE_REQ_RESP))
processMsgFile(msg);
else if (msgType.equals(jc.MSG_FILE_ACK))
processMsgAck(msg);
else if (msgType.equals(jc.MSG_FILE_REQ))
processMsgReq(msg);
}
/** Receive a regular 'maintainence' check-in from the TrailBoss.
*/
public void bossCheck() {
// If this wrangler has been inactive for a long time, remove it from
// JxtaCast's collection.
if (System.currentTimeMillis() - lastActivity > jc.inWranglerLifetime)
jc.wranglers.remove(key);
// Calculate the time of inactivity that we'll endure before
// requesting missing blocks. First determine the average amount
// of time between blocks for the blocks we've received so far.
// We'll wait either thrice that amount, or the time contained in
// JxtaCast's timeTilReq member, whichever is shorter. Let's also
// impose a minimum of a few seconds.
//
// This calculation should help us find an optimal time to wait,
// based on current network conditions. We want to give any missing
// blocks an adequate amount of time to reach us before we give up and
// start requesting them. But this amount of time can be very different
// depending on the network topology. It is very short when the sending
// and receiving peers are on the same subnet. It can be long, over 30
// seconds, if the peers are separated by an HTTP relay.
//
// The minimum time to wait starts out at a few seconds, and grows each
// time we send a REQ, until a new file block comes in. Then it is
// reset. This will keep a single peer from spewing out too many requests.
//
long timeToWait = jc.timeTilReq;
long avgTimeTweenBlocks;
if (blocksReceived > 1) {
avgTimeTweenBlocks = (latestBlockTime - firstBlockTime) / blocksReceived;
if ((avgTimeTweenBlocks * 3) < timeToWait)
timeToWait = avgTimeTweenBlocks * 2;
if (timeToWait < minTimeToWait)
timeToWait = minTimeToWait;
}
// Are we missing any blocks? We'll request missing blocks, but don't
// want to do it too often, or we'll queue up a bunch requests and then
// receive a bunch of duplicate blocks.
//
if (blocksReceived < totalBlocks &&
System.currentTimeMillis() - lastReqTime > timeToWait &&
System.currentTimeMillis() - lastActivity > timeToWait) {
requestNextMissingBlock();
}
}
/** Process one incoming block of file data.
*/
public void processMsgFile(Message msg) {
lastActivity = System.currentTimeMillis();
try {
int blockNum = Integer.parseInt(jc.getMsgString(msg, jc.BLOCKNUM));
String msgSender = jc.getMsgString(msg, jc.SENDERNAME);
String msgSenderId = jc.getMsgString(msg, jc.SENDERID);
// Have we already processed this block? If we've received a
// duplicate message, ignore it.
//
if (blockIn[blockNum] == true) {
// Log a msg, unless we were the sender.
if (!msgSenderId.equals(jc.myPeer.getPeerID().toString()));
jc.logMsg("Duplicate block: " + filename + " block: " + (blockNum+1));
return;
}
// Record some timestamps to be used later. bossCheck() uses these
// to calculate an average time between blocks.
latestBlockTime = lastActivity;
if (blocksReceived == 0)
firstBlockTime = lastActivity;
minTimeToWait = 4000;
// The caption is stored with the first block.
if (blockNum == 0)
caption = jc.getMsgString(msg, jc.CAPTION);
jc.logMsg("From " + sender + " - " + " < " + filename +
" > block: " + (blockNum+1) + " of " + totalBlocks);
// Get the file data block, place it in our data array.
MessageElement elem = msg.getMessageElement(jc.DATABLOCK);
if (elem == null)
return;
byte dataBlock[] = elem.getBytes(false);
System.arraycopy(dataBlock, 0, fdata, blockNum * myBlockSize, dataBlock.length);
// Record that we've processed this block.
blockIn[blockNum] = true;
blocksReceived++;
// Acknowledge receipt of the block, so the sender will send the next.
// This also serves to notify other peers that we have received this block;
// they may request it from us.
//
sendAck(msg);
// Are we done? Then write the file. Otherwise, if this was a response to
// a missing block request, ask for the next one.
//
if (blocksReceived == totalBlocks) {
writeFile();
}
else if (jc.getMsgString(msg, jc.MESSAGETYPE).equals(jc.MSG_FILE_REQ_RESP)) {
// The REQ_RESP may not have been in response to a request from this
// peer. But assume that if one peer is already requesting dropped
// block messages, we should be too.
//
requestNextMissingBlock();
}
// Notify listeners of file progress.
JxtaCastEvent e = new JxtaCastEvent();
e.transType = e.RECV;
e.filename = new String(filename);
e.filepath = new String(jc.fileSaveLoc);
e.senderId = new String(senderId);
if (sender == null)
sender = "<anonymous>";
e.sender = new String(sender);
if (caption != null)
e.caption = new String(caption);
e.percentDone = ((float)blocksReceived / totalBlocks) * 100;
jc.sendJxtaCastEvent(e);
} catch (Exception ex) {
ex.printStackTrace();
}
}
/** Send an acknowledgement that we've received a file data block.
*/
private void sendAck(Message msg) {
try {
// Create and send and ACK message.
Message ackMsg = new Message();
jc.setMsgString(ackMsg, jc.MESSAGETYPE, jc.MSG_FILE_ACK);
jc.setMsgString(ackMsg, jc.SENDERNAME, jc.myPeer.getName());
jc.setMsgString(ackMsg, jc.SENDERID, jc.myPeer.getPeerID().toString());
jc.setMsgString(ackMsg, jc.VERSION, jc.version);
jc.setMsgString(ackMsg, jc.FILEKEY, jc.getMsgString(msg, jc.FILEKEY));
jc.setMsgString(ackMsg, jc.FILENAME, filename);
jc.setMsgString(ackMsg, jc.BLOCKNUM, jc.getMsgString(msg, jc.BLOCKNUM));
// Send the ACK message.
int blockNum = Integer.parseInt(jc.getMsgString(msg, jc.BLOCKNUM));
jc.logMsg("Sending ACK: " + filename + " block " + (blockNum+1));
jc.sendMessage(ackMsg);
} catch (Exception ex) {
ex.printStackTrace();
}
}
/** Send a request for specific file data block.
*
* @param blockNum the block to request.
*/
private void sendReq(int blockNum) {
// Increase the wait time every time we send a request. It's reset
// when we get a response. The longer we go without getting a response,
// the less often we'll send requests.
minTimeToWait += 4000;
try {
// Create a message, fill it with key info, and the block number.
Message reqMsg = new Message();
jc.setMsgString(reqMsg, jc.MESSAGETYPE, jc.MSG_FILE_REQ);
jc.setMsgString(reqMsg, jc.SENDERNAME, jc.myPeer.getName());
jc.setMsgString(reqMsg, jc.SENDERID, jc.myPeer.getPeerID().toString());
jc.setMsgString(reqMsg, jc.VERSION, jc.version);
jc.setMsgString(reqMsg, jc.FILEKEY, key);
jc.setMsgString(reqMsg, jc.FILENAME, filename);
jc.setMsgString(reqMsg, jc.BLOCKNUM, String.valueOf(blockNum));
// Who are we requesting it from?
// If we've gotten an ACK for this block, ask that peer. Then clear
// him from the lastAck array, so we don't ask the same peer again.
//
String reqTo = "last ACK";
if (lastAck[blockNum] != null) {
jc.setMsgString(reqMsg, jc.REQTOPEER, lastAck[blockNum]);
lastAck[blockNum] = null;
} else if (!askedOrig[blockNum]) {
// We haven't asked the original sender for this block yet, so
// ask him now.
jc.setMsgString(reqMsg, jc.REQTOPEER, senderId);
askedOrig[blockNum] = true;
reqTo = "orig sender";
} else {
// Ask any peer to respond.
jc.setMsgString(reqMsg, jc.REQTOPEER, jc.REQ_ANYPEER);
reqTo = "ANYONE!";
}
// Send the REQ message. It'd be nice to send this through a
// "back channel" unicast pipe directly to the peer we're requesting
// from. For now we'll send it out over the wire. Everyone else
// can just ignore it.
//
jc.logMsg("Sending REQ to " + reqTo + ": " + filename +
" block " + (blockNum+1));
jc.sendMessage(reqMsg);
} catch (Exception ex) {
ex.printStackTrace();
}
}
/**
* Request the next missing block of file data. We request the block from
* the latest peer known to have received that block. If none are known, or
* we've already requested from that peer and not gotten a response, we request
* from the original sender of the file. If we've already done that, then
* we request from anyone. (We hope not to have to do that, since it may
* result in many peers responding at once with the same data block.)
*/
private void requestNextMissingBlock() {
// Find and request the next missing block. We just request one block.
// We'll request the next after it comes in, or when the TrailBoss triggers
// the next bossCheck().
//
while (currReqBlock < blockIn.length) {
if (blockIn[currReqBlock] == false) {
sendReq(currReqBlock);
currReqBlock++;
lastReqTime = System.currentTimeMillis();
break;
}
currReqBlock++;
}
// If we've reached the end of the array, start over.
//
if (currReqBlock == blockIn.length)
currReqBlock = 0;
}
/** Process a file transfer ACK message.
*
* Peers will send out an ACK message when they've received a block.
* We'll keep track of the latest peer that sent an ACK for each block.
* If we don't get that block ourselves, we can request it from a peer
* that has it.
*/
private void processMsgAck(Message msg) {
// Ignore the ACK if it's from us.
String senderId = jc.getMsgString(msg, jc.SENDERID);
if (senderId.equals(jc.myPeer.getPeerID().toString()))
return;
int blockNum = Integer.parseInt(jc.getMsgString(msg, jc.BLOCKNUM));
lastAck[blockNum] = jc.getMsgString(msg, jc.SENDERID);
jc.logMsg("Received ACK: " + filename + " block " +
(blockNum+1) + ", from " + jc.getMsgString(msg, jc.SENDERNAME));
}
/** Process a file transfer REQ message.
*
* A peer has requested a block of this file. If the request is addressed
* to us, or to any peer, send the block out as a REQ_RESP 'request response'
* message.
*/
private void processMsgReq(Message msg) {
// If it's not addressed to us, or to "any peer", bail out.
String reqToPeer = jc.getMsgString(msg, jc.REQTOPEER);
if (reqToPeer == null)
return;
if (!reqToPeer.equals(jc.myPeer.getPeerID().toString()) &&
!reqToPeer.equals(jc.REQ_ANYPEER))
return;
// If it's FROM us, bail out. (It's an any peer req that we sent.)
String reqSender = jc.getMsgString(msg, jc.SENDERID);
if (reqSender == null)
return;
if (reqSender.equals(jc.myPeer.getPeerID().toString()))
return;
int blockNum = Integer.parseInt(jc.getMsgString(msg, jc.BLOCKNUM));
if (reqToPeer.equals(jc.REQ_ANYPEER))
jc.logMsg("Received REQ_ANYPEER: " + filename + " block " +
(blockNum+1) + ", from " + jc.getMsgString(msg, jc.SENDERNAME));
else
jc.logMsg("Received FILE_REQ: " + filename + " block " +
(blockNum+1) + ", from " + jc.getMsgString(msg, jc.SENDERNAME));
// Send the block, if we actually do have it.
if (blockNum > 0 && blockNum < blockIn.length && blockIn[blockNum] == true)
sendBlock(blockNum, jc.MSG_FILE_REQ_RESP);
}
/** Write the file data to a disk file.
*/
private void writeFile() {
jc.logMsg("*** WRITING FILE *** " + jc.fileSaveLoc + filename);
try {
FileOutputStream fos = new FileOutputStream(jc.fileSaveLoc + filename);
BufferedOutputStream bos = new BufferedOutputStream(fos);
bos.write(fdata, 0, fdata.length);
bos.flush();
fos.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -