📄 joineddownload.java
字号:
/**这个类是专门从事join任务下载数据的工作,<br>
*它应该先于上传工作启动,只有这样才得以工作,<br>
*因为只有这时,JoinUploadBuffer缓冲区才会有数据出现。<br>
*
*
*/
package com.sinpool.rivercrescent.Task;
import net.jxta.endpoint.*;
import net.jxta.pipe.*;
import net.jxta.document.*;
import net.jxta.protocol.*;
import java.io.*;
import java.util.*;
import java.security.*;
import com.sinpool.rivercrescent.*;
import com.sinpool.rivercrescent.myutil.MyUtil;
import com.sinpool.rivercrescent.Listener.*;
import com.sinpool.rivercrescent.io.*;
import com.sinpool.rivercrescent.BufferZone.*;
public class JoinedDownload extends Thread{
private JoinedManagerControl joinedManagerControl = null;
private JoinedDownloadMessageBuffer joinedDownloadMessageBuffer = null;
private JoinedRequestBuffer joinedRequestBuffer = null;
private TaskInfo taskInfo = null;
private MimeMediaType mimetype = new MimeMediaType("text","xml");
private Message inmsg = new Message();
private Message outmsg = null;
private MessageElement me = null;
private String block = null;
private JoinedDownloadFindRequestListener jdfrl = null; //控制Request队列的监听器
private int filePieceLength = 0;
//实际向文件中写出的字节数。
private int writeFileDataLength = 0;
private String[] thisStatus = new String[2];
public JoinedDownload(JoinedDownloadMessageBuffer joinedDownloadMessageBuffer,
TaskInfo ti,JoinedManagerControl joinedManagerControl,
JoinedRequestBuffer joinedRequestBuffer,
JoinedDownloadFindRequestListener jdfrl){
this.joinedManagerControl = joinedManagerControl;
this.joinedDownloadMessageBuffer = joinedDownloadMessageBuffer;
this.taskInfo = ti;
this.joinedRequestBuffer = joinedRequestBuffer;
this.jdfrl = jdfrl;
}
public void run(){
String messageType = null;
byte[] fileData; //从消息中读取的数据
BufferedReader br = null;
InputStream is = null;
System.out.println("任务开始!");
thisStatus[0] = MyUtil.getCurrentTime();
thisStatus[1] = "任务开始!";
taskInfo.getTaskStatusTableModel().addRow(thisStatus);
while ( joinedManagerControl.searchJoinedManagerState() ){
label: try{
//如果下载完成,跳出循环
if ( taskInfo.getTaskPercentage().equals("100.0%")) {
do100();
break;
}
//如果第一个文件块没有下载,就请求元文件
//即,文件名是“?”
if ( taskInfo.getTaskFileName().equals("?") ){
boolean b = false;
for ( int i=0;i<joinedRequestBuffer.size();i++){
b = this.sendRequest("0",i);
if ( b ) break;
--i;
}
if ( !b )
try {
System.out.println("没有可请求的Peer,10 秒钟后再尝试连接。");
thisStatus[0] = MyUtil.getCurrentTime();
thisStatus[1] = "没有可请求的Peer,10 秒钟后再尝试连接。";
taskInfo.getTaskStatusTableModel().addRow(thisStatus);
Thread.sleep(10 * 1000);
jdfrl.find_More();
continue;
} catch(Exception e) {};
}
//如果joinedDownloadMessageBuffer缓冲区没有数据了
//并且当前任务的第一个(已用时间是 null )
//向joinedRequestBuffer 请求队列里的所有peers发送请求。
if ( (joinedDownloadMessageBuffer.size() == 0) &&
(taskInfo.getTaskUsedTime() != null) ){
Vector list = new Vector(5);
String request_ = null;
boolean bb = false; //至少有一个成功了吗?,假设有一个成功"为假"
int t = 0;
boolean bbb[] = taskInfo.getTaskFinishedPieces();
for ( int i=0;i<bbb.length;i++){
if ( !bbb[i] ) ++t;
}
//如果没有要下载的文件块了,退出。
if ( t ==0 ) break;
for ( int i=0;i<joinedRequestBuffer.size();i++){
while ( true ){
if ( !joinedManagerControl.searchJoinedManagerState() ) { try {this.finalize(); }catch (Throwable tttt) {} }
boolean exist = false; //假设当前没有与此次申请的"数据块"相同的"数据块"
request_ = this.getRandomDataIndex();
for ( int j=0;j<list.size();j++){
//条件为真,设置f 为真,表示有一样的
if ( ((String)list.get(j)).equals(request_) )
exist = true;
}
if ( !exist ) break;
if ( list.size() == t ) {
if ( t == 1 )
try {System.out.println("核心区开始请求最后一个");Thread.sleep(2 * 1000);break;} catch(Exception e) {};
System.out.println("Request队列中的个数(n+4)>没下载的文件块个数(n)");
break label;
}
}
list.addElement(request_);
if ( this.sendRequest(request_ , i) ) bb = true;
else --i;
}
//如果发送的是最后一个请求,稍等片刻以便收到响应,避免连续发送最后的请求。
// if ( t == 1 ) try {System.out.println("发送了最后一个请求");Thread.sleep(2 * 1000);continue;} catch(Exception e) {};
if ( !bb )
try {
if ( joinedRequestBuffer.size() != 0 )
continue;
System.out.println("没有可请求的Peer,5 秒钟后再尝试连接。");
thisStatus[0] = MyUtil.getCurrentTime();
thisStatus[1] = "没有可请求的Peer,5 秒钟后再尝试连接。";
taskInfo.getTaskStatusTableModel().addRow(thisStatus);
jdfrl.find_More();
Thread.sleep(5 * 1000);
continue;
} catch(Exception e) {};
list.removeAllElements();
}
inmsg = (Message)joinedDownloadMessageBuffer.getMessage( 0 );
if ( inmsg == null ) {
jdfrl.find_More();
continue;
}//说明是被WakeUp唤醒。
me = inmsg.getMessageElement("Type");
is = me.getStream();
br = new BufferedReader(new InputStreamReader(is) );
messageType = br.readLine();
//确保从缓冲区取出的Message,一定是"请求"
if ( messageType.equals("Req") ) {
joinedDownloadMessageBuffer.remove(inmsg);
System.out.println("join 放弃了一个消息");
thisStatus[0] = MyUtil.getCurrentTime();
thisStatus[1] = "join 放弃了一个消息。";
taskInfo.getTaskStatusTableModel().addRow(thisStatus);
continue;
}
//从消息中读取Block,看看是传回来的哪个块
me = inmsg.getMessageElement("Block");
is = me.getStream();
br = new BufferedReader(new InputStreamReader(is) );
block = br.readLine();
//从消息中读取具体数据Data
me = inmsg.getMessageElement("Data");
fileData = me.getBytes(true);
//如果返回的是元文件
if (block.equals("0")) {
String s = taskInfo.getTaskGroupID(); //临时文件名,以组ID命名
//写临时文件
new File("riverfile").mkdir();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -