📄 rdtunit.java
字号:
package org.tuna.net.rdt;
import java.io.*;
import java.net.*;
import java.util.*;
import java.awt.event.*;
/**
* RDT传输单元,在传输结构中类似于TCP传输中的Socket类,
* 可单独同远程主机进程通信,可独立应用于客户端,
* 而在服务器端,则模拟由欢迎套接字产生连接套接字的进程
* 由服务端接收连接请求,进而生成一个RDT单元,与特定的
* 客户端RDT通信。
*/
public class RdtUnit implements DataSort
{
String sort; //应用属性为客户端或服务端 InetAddress addr; //
int port; //
int localPort;
DatagramSocket sock; //
ReceiveThread receiveThread; //接收数据的线程,可将收到的数据按类型定向至不同的处理过程
SendWindow sendWindow;
DataBuffer receiveBuffer; //存储接收数据的缓存
RcvWindow rcvWindow;
boolean blockFlag = false;
/**
* 定义一个同指定地址和端口的远端通信的RDT传输单元
*
* @param a 远程地址
* @param p 远程端口
*/
public RdtUnit(InetAddress a, int p) throws Exception
{
sort = "client";
addr = a;
port = p;
sock = new DatagramSocket();
localPort = sock.getLocalPort();
}
/**
* 定义一个同指定地址和端口的远端通信的RDT传输单元
* 并将此RDT传输单元绑定到本地端口
*
* @param a 远程地址
* @param p 远程端口
*/
public RdtUnit(InetAddress a, int p, int sp) throws Exception
{
sort = "client";
addr = a;
port = p;
sock = new DatagramSocket(sp);
localPort = sp;
}
/**
* 由欢迎套接字生成传输单元使用
* 由RdtUnitManager调用,对外不可见
*
* @param a 远程地址
* @param p 远程端口
* @param s 用于通信的UDP socket
* @param clt 客户端分组的起始序号
* @param srv 服务端分组的起始序号
*/
RdtUnit(InetAddress a, int p, DatagramSocket s)
{
sort = "server";
addr = a;
port = p;
sock = s;
//------- 初始化数据发送部分 -----------
receiveThread = new ReceiveThread(this);
sendWindow = new SendWindow(this);
receiveBuffer = new DataBuffer();
rcvWindow = new RcvWindow(this);
receiveThread.start();
}
// ------ Public Methods ------
/**
* 关闭此传输单元,释放资源
*/
public void close() throws Exception
{
//向对方发送关闭连接请求
byte[] data = new byte[] {PKT_SHUT};
RdtPacket rp = new RdtPacket((int)PKT_SHUT, -1, data);
byte[] msg = rp.getPostBytes();
DatagramPacket pkt = new DatagramPacket(msg, msg.length, this.addr, this.port);
try{
this.sock.send(pkt);
}
catch(Exception e){e.printStackTrace();}
stopBlock();
receiveThread.tstop();
byte[] end = new byte[] {-1};
DatagramPacket endpkt = new DatagramPacket(end, end.length, InetAddress.getLocalHost(), localPort);
this.sock.send(endpkt);
sendWindow.destroy();
rcvWindow.destroy();
if ( sort.equals("client") )
sock.close();
}
/**
* 同远程主机进程建立连接
* @return 若成功建立连接,返回true,否则返回false
*/
public boolean connect() throws Exception
{
if (sort.equals("server")) return false;
//随机生成一个128以内的开始序号
byte[] sh1 = new byte[] {PKT_SH_1};
byte[] sh2 = new byte[2];
byte[] sh3 = new byte[] {PKT_SH_3};
try{
DatagramPacket sndpkt1 = new DatagramPacket(sh1, sh1.length, addr, port);
DatagramPacket rcvpkt = new DatagramPacket(sh2, sh2.length);
DatagramPacket sndpkt3 = new DatagramPacket(sh3, sh3.length, addr, port);
//sock.setSoTimeout(500);
sock.send(sndpkt1); //第一次握手
sock.receive(rcvpkt); //第二次握手
if ( !rcvpkt.getAddress().getHostAddress().equals(addr.getHostAddress())
|| rcvpkt.getPort() != port || sh2[0] != PKT_SH_2 ){
//第二次握手若来自不同地址,则连接失败
return false;
}
else{
//进行一系列初始化
//------- 初始化数据发送部分 -----------
receiveThread = new ReceiveThread(this);
sendWindow = new SendWindow(this);
//------- 初始化数据接收部分 -----------
rcvWindow = new RcvWindow(this);
receiveBuffer = new DataBuffer();
}
sock.send(sndpkt3); //第三次握手
receiveThread.start(); //启动接收线程,侦听服务器发来的信息
//sock.setSoTimeout(0);
}
catch(SocketTimeoutException ste){
return false;
}
catch(Exception e){
e.printStackTrace();
return false;
}
return true;
}
/**
* 得到与之通信的远程主机地址
* @return 远程主机地址
*/
public InetAddress getAddress()
{
return addr;
}
/**
* 得到可以读取的字节数
*/
public int getAvailable()
{
return receiveBuffer.getUsed();
}
/**
* 得到与之通信的远程主机端口
* @return 远程主机端口
*/
public int getPort()
{
return port;
}
/**
* 将缓冲区中指定数量的字节读出来
*/
public byte[] receive(int len)
{
while(receiveBuffer.getUsed() < len && !blockFlag){
try{
Thread.sleep(10);
}
catch(Exception e) {}
}
return receiveBuffer.read(len);
}
/**
* 将提交的字节数组发送出去
* @param data 待发送的字节数组
*/
public void send(byte[] data)
{
int pos = 0;
while(pos < data.length){
int len = 512 < (data.length - pos) ? 512 : (data.length - pos);
byte[] snd = new byte[len];
System.arraycopy(data, pos, snd, 0, len);
pos += len;
try{
while( !sendWindow.send(snd) && !blockFlag ){
Thread.sleep(100);
}
}
catch(Exception e) { e.printStackTrace(); }
}
int wait = 1;
do{ //一直等发送完毕再返回
try{
Thread.sleep(wait);
if (wait < 128)
wait = wait * 2;
}
catch(Exception ex) {}
}while( !sendWindow.isEmpty() && !blockFlag );
}
// ------ ------
void stopBlock()
{
blockFlag = true; //使发送、接收等可能出现等待的方法返回
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -