joinedupload.java

来自「一个基于NetBeans平台开发的」· Java 代码 · 共 298 行

JAVA
298
字号
/**这个类是专门从事join任务上传数据的工作,<br>
 *它是在首先启动join任务下载工作后,才得以工作,<br>
 *因为只有这时,JoinUploadBuffer缓冲区才会有数据出现。<br>
 */

package com.sinpool.rivercrescent.Task;

import net.jxta.endpoint.*;
import net.jxta.pipe.*;
import net.jxta.document.*;
import net.jxta.protocol.*;

import java.io.*;
import java.util.*;
import java.security.*;

import com.sinpool.rivercrescent.*;
import com.sinpool.rivercrescent.myutil.MyUtil;
import com.sinpool.rivercrescent.io.*;
import com.sinpool.rivercrescent.BufferZone.*;

public class JoinedUpload extends Thread{
	private JoinedUploadMessageBuffer joinedUploadMessageBuffer = null;
	private JoinedManagerControl joinedManagerControl = null;

	private MimeMediaType mimetype = new MimeMediaType("text","xml");
	private Message inmsg = new Message();
	private Message outmsg = null;
	private PipeAdvertisement pipeAdv = null;
	private MessageElement me = null;
	
	private TaskInfo taskInfo = null;
	private OutputPipe outputPipe = null;
	private int meeageBufferIndex = 0;
	private String block = null;

	//多文件时,文件块的长度,即pl
	private int filePieceLength = 0;
        //实际读到到字节数
        private int readFileDataLength = 0;
        
        private String[] thisStatus = new String[2];
        
	/**构造函数:
	 *@param joinedUploadMessageBuffer 这是与joinTaskListener公用的缓冲区,<br>
	 *									此对象是这个缓冲区的消费者。<br>
	 *
	 *@param ti 此任务的TaskInfo对象。<br>
	 *@param joinedManagerControl 控制此线程工作的开关,可以由joinTaskListener关闭此线程。<br>
	 */
	public JoinedUpload(JoinedUploadMessageBuffer joinedUploadMessageBuffer,TaskInfo ti,JoinedManagerControl joinedManagerControl){
		this.joinedUploadMessageBuffer = joinedUploadMessageBuffer;
		this.taskInfo = ti;
		this.joinedManagerControl = joinedManagerControl;
                this.filePieceLength = Integer.valueOf(taskInfo.getTaskPieceLength()).intValue();
	}
	
	public void run(){
		String messageType = null;
		long startTime = 0,endTime = 0;
		Date date = new Date();
		byte[] fileData;					//根据消息请求,从现有文件读取的、用于返回的数据
		BufferedReader br = null;
		InputStream is = null;
                
thisStatus[0] = MyUtil.getCurrentTime();
thisStatus[1] = "上传任务开始!";
taskInfo.getTaskStatusTableModel().addRow(thisStatus);

		while ( joinedManagerControl.searchJoinedManagerState() ){
			try{
				inmsg = (Message)joinedUploadMessageBuffer.getMessage(meeageBufferIndex++);
				
				me = inmsg.getMessageElement("Type");
				is = me.getStream();
				br = new BufferedReader(new InputStreamReader(is) );
				messageType = br.readLine();
				
				//确保从缓冲区取出的Message,一定是"请求"
				if ( messageType.equals("Res") ) {
                                    joinedUploadMessageBuffer.remove(inmsg);
                                    meeageBufferIndex = 0;
System.out.println("join 放弃了一个消息");
thisStatus[0] = MyUtil.getCurrentTime();
thisStatus[1] = "join 放弃了一个消息!";
taskInfo.getTaskStatusTableModel().addRow(thisStatus);
                                    continue;
                                }
System.out.println("join upload 开始处理请求");
				me = inmsg.getMessageElement("PipeAdv");
				is = me.getStream();
				pipeAdv = (PipeAdvertisement)AdvertisementFactory.newAdvertisement(mimetype,is);
				outputPipe = ((PipeService)taskInfo.getTaskPipes()).createOutputPipe(pipeAdv,5000);
				
				/**如果这个消息的管道没有连接上,
				  *
				  *如果索引值等于消息队列的大小时,
				  *meeageBufferIndex索引回0,
				  *从头读取消息。
				  *
				  */
				if (outputPipe == null)	{
					if (meeageBufferIndex == joinedUploadMessageBuffer.size() ){
						meeageBufferIndex = 0;
					}
					continue;
				}
	 			
				me = inmsg.getMessageElement("Block");
				is = me.getStream();
                                br = new BufferedReader(new InputStreamReader(is) );
				block = br.readLine();
thisStatus[0] = MyUtil.getCurrentTime();
thisStatus[1] = "收到了第" +  block + "块的请求。";
taskInfo.getTaskStatusTableModel().addRow(thisStatus);
				//读取请求的文件数据。
				try{
					fileData = this.getFileData(block);
				}catch (IOException ioe) {
                                    System.out.println("没能打开源文件!");
                                    ioe.printStackTrace();
                                    joinedUploadMessageBuffer.remove(inmsg);
                                    meeageBufferIndex = 0;
                                    continue;
                                }
				
				//如果block(请求的文件块)不是"0",
				//要进行SHA-1校验,
				//从而判断自己是否存在这部分数据。
				if ( !block.equals("0") ){
					MessageDigest sha;
					
					try{
						sha = MessageDigest.getInstance("sha-1");
					}catch (NoSuchAlgorithmException e) {System.out.println("JoinedUpload,,Error! 无效编码NoSuchAlgorithm!");return;}
					byte[] hash = new byte[20];

					sha.update(fileData);
					hash = sha.digest();
					
					Hashtable ht = taskInfo.getTaskDictionary();
					Byte[] pieces = (Byte[])ht.get("piece");

					boolean isFileBlockExist = true;
					for (int i=0;i<20;i++) {
						if ( hash[i] != pieces[(Integer.valueOf(block).intValue()-1)*20+i].byteValue() ){
							isFileBlockExist = false;
							break;
						}
					}
					if ( !isFileBlockExist ) {
                                            joinedUploadMessageBuffer.remove(inmsg);
                                            meeageBufferIndex = 0;
System.out.println("没有请求的块,不能提供上传服务,¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥");
thisStatus[0] = MyUtil.getCurrentTime();
thisStatus[1] = "join 没有请求的第" +  block + "块,不能提供上传服务!";
taskInfo.getTaskStatusTableModel().addRow(thisStatus);
                                            continue;
                                        }
				}
				
				//下面是封装数据
				outmsg = new Message();
				//第一个数据信息,
				//管道通告。
				InputStreamMessageElement outMsgPipeElement = new InputStreamMessageElement("PipeAdv",mimetype,taskInfo.getTaskPipeadv().getDocument(mimetype).getStream(),null);
				outmsg.addMessageElement(outMsgPipeElement);
				
				//第二个数据信息,
				//类型
				ByteArrayMessageElement outMsgTypeElement = new ByteArrayMessageElement("Type",MimeMediaType.XMLUTF8,"Res".getBytes(),null);
				outmsg.addMessageElement(outMsgTypeElement);
				
				//第三个数据信息,
				//块数
				ByteArrayMessageElement outMsgBlockElement = new ByteArrayMessageElement("Block",MimeMediaType.XMLUTF8,block.getBytes(),null);
				outmsg.addMessageElement(outMsgBlockElement);
				
				//第四个数据信息,
				//具体的数据。
				ByteArrayInputStream dataPiece = new ByteArrayInputStream(fileData);
				InputStreamMessageElement outMsgDataElement = new InputStreamMessageElement("Data",mimetype,dataPiece,null);
				outmsg.addMessageElement(outMsgDataElement);
				
				startTime = date.getTime();
				boolean b = outputPipe.send(outmsg);
				if (b) {
					endTime = date.getTime();
					int time = (int)((endTime - startTime)/1000);
					if (time == 0){
                                            if ( readFileDataLength < filePieceLength )
                                                taskInfo.setTaskUploadSpeed(String.valueOf( readFileDataLength ) );
                                            else
						taskInfo.setTaskUploadSpeed(String.valueOf(filePieceLength));
                                        }
					else
						taskInfo.setTaskUploadSpeed(String.valueOf(filePieceLength/(int)(time)));
				}
                                
				joinedUploadMessageBuffer.remove(inmsg);
				meeageBufferIndex = 0;
				outputPipe.close();				//最后要察看是否可以关闭它//////////////
                                taskInfo.taskUpdata();
			}catch (IOException ioe ) {
System.out.println("一个 Peer 没有被连接,放弃与它的连接");
thisStatus[0] = MyUtil.getCurrentTime();
thisStatus[1] = "一个 Peer 没有被连接,放弃与它的连接!";
taskInfo.getTaskStatusTableModel().addRow(thisStatus);
                            joinedUploadMessageBuffer.remove(inmsg);
                            meeageBufferIndex = 0;
                            continue;
                        }
System.out.println("join 任务上传了第"+block+"块的响应.");
thisStatus[0] = MyUtil.getCurrentTime();
thisStatus[1] = "join 任务上传了第"+block+"块的响应.!";
taskInfo.getTaskStatusTableModel().addRow(thisStatus);
			//解决CPU占用率过高的问题。
			try{Thread.sleep(5);} catch (Exception e) {}
		}
System.out.println("join upload 任务停止!");
thisStatus[0] = MyUtil.getCurrentTime();
thisStatus[1] = "join upload 任务停止!!";
taskInfo.getTaskStatusTableModel().addRow(thisStatus);
	}

	/**b是"0",说明请求元文件;
	 *b从"1"开始,说明请求源文件;
	 *而在文件的索引中以"0"开始,
	 *所以要"--filePiece",
	 *使得"1"变成文件的所以"0",
	 *而"0-1 = -1",当filePiece为"-1"时,
	 *说明请求元文件。
	 *
	 */
	private byte[] getFileData(String b) throws IOException{
		File file = null;
		BufferedRandomAccessFile raf = null;
		FileInputStream fis = null;
		
		int filePiece = Integer.valueOf(b).intValue();
		--filePiece;
		byte[] data = null;
		
		//请求元文件
		if (filePiece == -1){
			file = new File((String)taskInfo.getTaskRiverscentFileName());
			fis = new FileInputStream(file);
			data = new byte[(int)file.length()];
			fis.read(data);
                        
                        fis.close();
		}
		//请求数据文件
		//需要判断是单文件情况还是多文件情况
		else {
                    if(taskInfo.getTaskRiverFileType().equals("single")){
                        //下载到100%后,从“文件名”中读取数据.
                        if ( taskInfo.getTaskPercentage().equals("100.0%") )
                            file = new File(taskInfo.getTaskFileName());
                        //下载没有到100%时,需要从“文件名.rs”中读取数据.
                        else
                            file = new File(taskInfo.getTaskFileName() + ".rs");
                        
                                raf = new BufferedRandomAccessFile(file,"r");
                                data = new byte[filePieceLength];
				raf.seek((long)(filePiece*filePieceLength));
				readFileDataLength = raf.read(data);
                                
                                raf.close();
			}else{
				String temp = taskInfo.getFileNameOfMultipleFromPiece(b);
                                
                                //下载到100%后,从“文件名”中读取数据.
                                if ( taskInfo.getTaskPercentage().equals("100.0%") )
                                    file = new File(temp);
                                 //下载没有到100%时,需要从“文件名.rs”中读取数据.
                                else
                                    file = new File(temp + ".rs");
                                
				raf = new BufferedRandomAccessFile(file,"r");
				data = new byte[filePieceLength];
				int pieceindex = Integer.valueOf(taskInfo.getFileDataIndex()).intValue();
				raf.seek( (long)(pieceindex * filePieceLength) );
				readFileDataLength = raf.read(data);
                                
                                raf.close();
			}
                    //如果读到了最后一个不足"文件块"长度的字节数组,使用这个方式返回。
                    if ( readFileDataLength < filePieceLength ){
                        byte[] lastData = new byte[readFileDataLength];
                        for ( int i=0;i<readFileDataLength;i++)
                            lastData[i] = data[i];
                        return lastData;
                    }
                }
		return data;
	}
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?