📄 filesender.java
字号:
/*
* LumaQQ - Java QQ Client
*
* Copyright (C) 2004 luma <stubma@163.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
package edu.tsinghua.lumaqq.qq.filetrans;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import edu.tsinghua.lumaqq.qq.QQ;
import edu.tsinghua.lumaqq.qq.QQClient;
import edu.tsinghua.lumaqq.qq.Utils;
/**
* <pre>
* 文件发送类
* </pre>
*
* @author 马若劼
*/
public class FileSender extends FileWatcher implements Runnable {
private DatagramChannel dc;
private DatagramChannel dcLocal;
private DatagramChannel dcDirect;
// 分片缓冲区
protected FragmentBuffer fb;
// 包处理器
private FileSenderPacketProcessor processor;
// heart beat线程
protected HeartBeatThread hbThread;
// 是否暂停发送,如果heart beat长久得不到回应,将暂停发送
private boolean suspend;
// 当前文件数据信息包的顺序号
private char packetSN;
/**
* @param client
*/
public FileSender(QQClient client) {
super(client);
processor = new FileSenderPacketProcessor(this);
suspend = false;
packetSN = 0;
}
/**
* 打开本地文件准备读
* @return true表示成功,false表示失败
*/
public boolean openLocalFile() {
if(localFile != null) return true;
try {
localFile = new RandomAccessFile(localFileName, "rw");
// 如果我是发送者,附加计算一下文件和文件名的MD5
fileMD5 = Utils.getFileMD5(localFile);
fileNameMD5 = Utils.doMD5(Utils.getBytes(fileName, QQ.QQ_CHARSET_DEFAULT));
return true;
} catch (Exception e) {
return false;
}
}
/* (non-Javadoc)
* @see edu.tsinghua.lumaqq.qq.filetrans.FileWatcher#shutdown()
*/
public void shutdown() {
if(localFile != null) {
try {
localFile.close();
} catch (IOException e) {
}
}
if(hbThread != null)
hbThread.setStop(true);
shutdown = true;
if(selector != null)
selector.wakeup();
}
/* (non-Javadoc)
* @see edu.tsinghua.lumaqq.qq.filetrans.FileWatcher#abort()
*/
public void abort() {
if(fileTransferStatus == FT_NONE)
return;
else if(fileTransferStatus == FT_NEGOTIATING)
client.cancelSendFile(hisQQ, sessionSequence);
else
sendFinish(fdp, buffer);
shutdown();
fireFileAbortedEvent();
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
public void run() {
while(true) {
try {
int n = selector.select();
if(n > 0)
processSelectedKeys();
if (shutdown) {
selector.close();
if(useUdp) {
if(dcLocal != null && dcLocal.isOpen()) dcLocal.close();
if(dcDirect != null && dcDirect.isOpen()) dcDirect.close();
}
log.debug("文件守望者正常退出,Session Sequence: " + (int)sessionSequence);
return;
}
} catch (IOException e) {
}
}
}
/**
* 处理Key事件
*/
private void processSelectedKeys() {
for (Iterator i = selector.selectedKeys().iterator(); i.hasNext();) {
// 得到下一个Key
SelectionKey sk = (SelectionKey)i.next();
i.remove();
// 交给processor处理
try {
// 如果当前处理协商阶段,那么我们期望收到0x003C命令,所以特殊处理
// 如果不是,说明连接已经建立,正在传送中,对于不用来传送文件的
// 那条链路我们释放掉
if(fileTransferStatus == FileWatcher.FT_NEGOTIATING)
processNegotiation(sk);
else {
Boolean b = (Boolean)sk.attachment();
DatagramChannel channel = (DatagramChannel)sk.channel();
if(b.booleanValue() == direct) {
buffer.clear();
channel.read(buffer);
} else {
sk.cancel();
channel.close();
return;
}
processor.processSelectedKeys(buffer);
}
} catch (Exception e) {
log.error(e.getMessage());
}
}
}
/**
* @param sk
* @throws IOException
*/
private void processNegotiation(SelectionKey sk) throws Exception {
// 得到channel,读取数据
buffer.clear();
DatagramChannel channel = (DatagramChannel)sk.channel();
InetSocketAddress address = (InetSocketAddress)channel.receive(buffer);
buffer.flip();
// 检查是否控制信息包
if(buffer.get() == QQ.QQ_FILE_CONTROL_PACKET_TAG) {
buffer.rewind();
fcp.parseContent(buffer);
// 检查是否重复包
if(!monitor.checkDuplicate(fcp)) {
switch(fcp.getCommand()) {
case QQ.QQ_FILE_CMD_NOTIFY_IP_ACK:
log.debug("收到Notify IP Ack, 对方真实IP: " + Utils.getIpStringFromBytes(fcp.getLocalIp()) + " 第二个端口为" + fcp.getLocalPort());
// 得到对方的本地ip和端口
hisLocalIp = fcp.getLocalIp();
hisLocalPort = fcp.getLocalPort();
// 检查双方网络处境
checkCondition();
if(condition == QQ.QQ_SAME_LAN || condition == QQ.QQ_NONE_BEHIND_FIREWALL || condition == QQ.QQ_I_AM_BEHIND_FIREWALL) {
/*
* 1. 如果双方位于同一个局域网
* 2. 或者我们都有固定的IP,且都不在防火墙后
* 3. 或者我在防火墙后,对方不在
* 这些情况在收到Ack后都可以直接开始连接
*/
establishConnection(sk, channel, address);
} else if(condition == QQ.QQ_HE_IS_BEHIND_FIREWALL) {
/*
* 现在我是发送方,而对方却在防火墙内,这个时候我需要发一个包
* 通知他主动连接我
*/
client.pleaseConnectMe(hisQQ, myDirectPort, sessionSequence);
}
break;
case QQ.QQ_FILE_CMD_NOTIFY_NAT_PORT:
log.debug("对方收到通知,已经开始主动连接我");
establishConnection(sk, channel, address);
break;
case QQ.QQ_FILE_CMD_TEST_CONNECTION_ACK:
log.debug("收到Init Connection Ack");
// 仅当我在防火墙后时,理会这个Ack
if(!Utils.isIpEquals(myInternetIp, myLocalIp)) {
condition = QQ.QQ_I_AM_BEHIND_FIREWALL;
establishConnection(sk, channel, address);
}
break;
}
}
}
}
/**
* 往对方直接端口发送Init Connection包
*/
public void sendInitConnectionToDirect() {
InetSocketAddress address = new InetSocketAddress(Utils.getIpStringFromBytes(hisInternetIp), hisDirectPort);
fcp.setCommand(QQ.QQ_FILE_CMD_TEST_CONNECTION);
fcp.initContent(buffer);
try {
buffer.flip();
dcDirect.send(buffer, address);
buffer.rewind();
dcDirect.send(buffer, address);
} catch (IOException e) {
}
log.debug("Init Connection 已发送");
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -