joinedupload.java
来自「一个基于NetBeans平台开发的」· Java 代码 · 共 298 行
JAVA
298 行
/**这个类是专门从事join任务上传数据的工作,<br>
*它是在首先启动join任务下载工作后,才得以工作,<br>
*因为只有这时,JoinUploadBuffer缓冲区才会有数据出现。<br>
*/
package com.sinpool.rivercrescent.Task;
import net.jxta.endpoint.*;
import net.jxta.pipe.*;
import net.jxta.document.*;
import net.jxta.protocol.*;
import java.io.*;
import java.util.*;
import java.security.*;
import com.sinpool.rivercrescent.*;
import com.sinpool.rivercrescent.myutil.MyUtil;
import com.sinpool.rivercrescent.io.*;
import com.sinpool.rivercrescent.BufferZone.*;
public class JoinedUpload extends Thread{
private JoinedUploadMessageBuffer joinedUploadMessageBuffer = null;
private JoinedManagerControl joinedManagerControl = null;
private MimeMediaType mimetype = new MimeMediaType("text","xml");
private Message inmsg = new Message();
private Message outmsg = null;
private PipeAdvertisement pipeAdv = null;
private MessageElement me = null;
private TaskInfo taskInfo = null;
private OutputPipe outputPipe = null;
private int meeageBufferIndex = 0;
private String block = null;
//多文件时,文件块的长度,即pl
private int filePieceLength = 0;
//实际读到到字节数
private int readFileDataLength = 0;
private String[] thisStatus = new String[2];
/**构造函数:
*@param joinedUploadMessageBuffer 这是与joinTaskListener公用的缓冲区,<br>
* 此对象是这个缓冲区的消费者。<br>
*
*@param ti 此任务的TaskInfo对象。<br>
*@param joinedManagerControl 控制此线程工作的开关,可以由joinTaskListener关闭此线程。<br>
*/
public JoinedUpload(JoinedUploadMessageBuffer joinedUploadMessageBuffer,TaskInfo ti,JoinedManagerControl joinedManagerControl){
this.joinedUploadMessageBuffer = joinedUploadMessageBuffer;
this.taskInfo = ti;
this.joinedManagerControl = joinedManagerControl;
this.filePieceLength = Integer.valueOf(taskInfo.getTaskPieceLength()).intValue();
}
public void run(){
String messageType = null;
long startTime = 0,endTime = 0;
Date date = new Date();
byte[] fileData; //根据消息请求,从现有文件读取的、用于返回的数据
BufferedReader br = null;
InputStream is = null;
thisStatus[0] = MyUtil.getCurrentTime();
thisStatus[1] = "上传任务开始!";
taskInfo.getTaskStatusTableModel().addRow(thisStatus);
while ( joinedManagerControl.searchJoinedManagerState() ){
try{
inmsg = (Message)joinedUploadMessageBuffer.getMessage(meeageBufferIndex++);
me = inmsg.getMessageElement("Type");
is = me.getStream();
br = new BufferedReader(new InputStreamReader(is) );
messageType = br.readLine();
//确保从缓冲区取出的Message,一定是"请求"
if ( messageType.equals("Res") ) {
joinedUploadMessageBuffer.remove(inmsg);
meeageBufferIndex = 0;
System.out.println("join 放弃了一个消息");
thisStatus[0] = MyUtil.getCurrentTime();
thisStatus[1] = "join 放弃了一个消息!";
taskInfo.getTaskStatusTableModel().addRow(thisStatus);
continue;
}
System.out.println("join upload 开始处理请求");
me = inmsg.getMessageElement("PipeAdv");
is = me.getStream();
pipeAdv = (PipeAdvertisement)AdvertisementFactory.newAdvertisement(mimetype,is);
outputPipe = ((PipeService)taskInfo.getTaskPipes()).createOutputPipe(pipeAdv,5000);
/**如果这个消息的管道没有连接上,
*
*如果索引值等于消息队列的大小时,
*meeageBufferIndex索引回0,
*从头读取消息。
*
*/
if (outputPipe == null) {
if (meeageBufferIndex == joinedUploadMessageBuffer.size() ){
meeageBufferIndex = 0;
}
continue;
}
me = inmsg.getMessageElement("Block");
is = me.getStream();
br = new BufferedReader(new InputStreamReader(is) );
block = br.readLine();
thisStatus[0] = MyUtil.getCurrentTime();
thisStatus[1] = "收到了第" + block + "块的请求。";
taskInfo.getTaskStatusTableModel().addRow(thisStatus);
//读取请求的文件数据。
try{
fileData = this.getFileData(block);
}catch (IOException ioe) {
System.out.println("没能打开源文件!");
ioe.printStackTrace();
joinedUploadMessageBuffer.remove(inmsg);
meeageBufferIndex = 0;
continue;
}
//如果block(请求的文件块)不是"0",
//要进行SHA-1校验,
//从而判断自己是否存在这部分数据。
if ( !block.equals("0") ){
MessageDigest sha;
try{
sha = MessageDigest.getInstance("sha-1");
}catch (NoSuchAlgorithmException e) {System.out.println("JoinedUpload,,Error! 无效编码NoSuchAlgorithm!");return;}
byte[] hash = new byte[20];
sha.update(fileData);
hash = sha.digest();
Hashtable ht = taskInfo.getTaskDictionary();
Byte[] pieces = (Byte[])ht.get("piece");
boolean isFileBlockExist = true;
for (int i=0;i<20;i++) {
if ( hash[i] != pieces[(Integer.valueOf(block).intValue()-1)*20+i].byteValue() ){
isFileBlockExist = false;
break;
}
}
if ( !isFileBlockExist ) {
joinedUploadMessageBuffer.remove(inmsg);
meeageBufferIndex = 0;
System.out.println("没有请求的块,不能提供上传服务,¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥");
thisStatus[0] = MyUtil.getCurrentTime();
thisStatus[1] = "join 没有请求的第" + block + "块,不能提供上传服务!";
taskInfo.getTaskStatusTableModel().addRow(thisStatus);
continue;
}
}
//下面是封装数据
outmsg = new Message();
//第一个数据信息,
//管道通告。
InputStreamMessageElement outMsgPipeElement = new InputStreamMessageElement("PipeAdv",mimetype,taskInfo.getTaskPipeadv().getDocument(mimetype).getStream(),null);
outmsg.addMessageElement(outMsgPipeElement);
//第二个数据信息,
//类型
ByteArrayMessageElement outMsgTypeElement = new ByteArrayMessageElement("Type",MimeMediaType.XMLUTF8,"Res".getBytes(),null);
outmsg.addMessageElement(outMsgTypeElement);
//第三个数据信息,
//块数
ByteArrayMessageElement outMsgBlockElement = new ByteArrayMessageElement("Block",MimeMediaType.XMLUTF8,block.getBytes(),null);
outmsg.addMessageElement(outMsgBlockElement);
//第四个数据信息,
//具体的数据。
ByteArrayInputStream dataPiece = new ByteArrayInputStream(fileData);
InputStreamMessageElement outMsgDataElement = new InputStreamMessageElement("Data",mimetype,dataPiece,null);
outmsg.addMessageElement(outMsgDataElement);
startTime = date.getTime();
boolean b = outputPipe.send(outmsg);
if (b) {
endTime = date.getTime();
int time = (int)((endTime - startTime)/1000);
if (time == 0){
if ( readFileDataLength < filePieceLength )
taskInfo.setTaskUploadSpeed(String.valueOf( readFileDataLength ) );
else
taskInfo.setTaskUploadSpeed(String.valueOf(filePieceLength));
}
else
taskInfo.setTaskUploadSpeed(String.valueOf(filePieceLength/(int)(time)));
}
joinedUploadMessageBuffer.remove(inmsg);
meeageBufferIndex = 0;
outputPipe.close(); //最后要察看是否可以关闭它//////////////
taskInfo.taskUpdata();
}catch (IOException ioe ) {
System.out.println("一个 Peer 没有被连接,放弃与它的连接");
thisStatus[0] = MyUtil.getCurrentTime();
thisStatus[1] = "一个 Peer 没有被连接,放弃与它的连接!";
taskInfo.getTaskStatusTableModel().addRow(thisStatus);
joinedUploadMessageBuffer.remove(inmsg);
meeageBufferIndex = 0;
continue;
}
System.out.println("join 任务上传了第"+block+"块的响应.");
thisStatus[0] = MyUtil.getCurrentTime();
thisStatus[1] = "join 任务上传了第"+block+"块的响应.!";
taskInfo.getTaskStatusTableModel().addRow(thisStatus);
//解决CPU占用率过高的问题。
try{Thread.sleep(5);} catch (Exception e) {}
}
System.out.println("join upload 任务停止!");
thisStatus[0] = MyUtil.getCurrentTime();
thisStatus[1] = "join upload 任务停止!!";
taskInfo.getTaskStatusTableModel().addRow(thisStatus);
}
/**b是"0",说明请求元文件;
*b从"1"开始,说明请求源文件;
*而在文件的索引中以"0"开始,
*所以要"--filePiece",
*使得"1"变成文件的所以"0",
*而"0-1 = -1",当filePiece为"-1"时,
*说明请求元文件。
*
*/
private byte[] getFileData(String b) throws IOException{
File file = null;
BufferedRandomAccessFile raf = null;
FileInputStream fis = null;
int filePiece = Integer.valueOf(b).intValue();
--filePiece;
byte[] data = null;
//请求元文件
if (filePiece == -1){
file = new File((String)taskInfo.getTaskRiverscentFileName());
fis = new FileInputStream(file);
data = new byte[(int)file.length()];
fis.read(data);
fis.close();
}
//请求数据文件
//需要判断是单文件情况还是多文件情况
else {
if(taskInfo.getTaskRiverFileType().equals("single")){
//下载到100%后,从“文件名”中读取数据.
if ( taskInfo.getTaskPercentage().equals("100.0%") )
file = new File(taskInfo.getTaskFileName());
//下载没有到100%时,需要从“文件名.rs”中读取数据.
else
file = new File(taskInfo.getTaskFileName() + ".rs");
raf = new BufferedRandomAccessFile(file,"r");
data = new byte[filePieceLength];
raf.seek((long)(filePiece*filePieceLength));
readFileDataLength = raf.read(data);
raf.close();
}else{
String temp = taskInfo.getFileNameOfMultipleFromPiece(b);
//下载到100%后,从“文件名”中读取数据.
if ( taskInfo.getTaskPercentage().equals("100.0%") )
file = new File(temp);
//下载没有到100%时,需要从“文件名.rs”中读取数据.
else
file = new File(temp + ".rs");
raf = new BufferedRandomAccessFile(file,"r");
data = new byte[filePieceLength];
int pieceindex = Integer.valueOf(taskInfo.getFileDataIndex()).intValue();
raf.seek( (long)(pieceindex * filePieceLength) );
readFileDataLength = raf.read(data);
raf.close();
}
//如果读到了最后一个不足"文件块"长度的字节数组,使用这个方式返回。
if ( readFileDataLength < filePieceLength ){
byte[] lastData = new byte[readFileDataLength];
for ( int i=0;i<readFileDataLength;i++)
lastData[i] = data[i];
return lastData;
}
}
return data;
}
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?