📄 handler.java
字号:
package net;
import java.nio.channels.*;
import java.nio.*;
import java.io.*;
import java.util.*;
/**
* 通信处理器,负责对一个客户的收发进行管理<br>
* 他在反应器Reactor 接收accept到一个连接时,产生一个此类的实例<br>
*
* 此包中的send()和receive()以及,两个内部实现方法不用加synchronized同步,因为Vector本身是线程安全的<br>
* 在原send()方法中,原来设计为当需要加入OP_WRITE项,但发现在对interest进行操作时,会对调用send方法的线程造成阻塞<br>
* 接收/发送的网络的数据格式为:<br>
*
* ++++ ++++++++++... ++++ +++++++++++...<br>
* 4B包长度 包内容 4B包长度 包内容
*
* <p>Title: </p>
* <p>Description: </p>
* <p>Copyright: Copyright (c) 2006</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/
final public class Handler
implements Runnable {
private final SocketChannel socketChannel;
private final SelectionKey selectionKey;
//字节缓冲器
static public final int RCV_BUF_SIZE = 5; //此类的设计中,此两个缓冲的尺寸必须>=4
static public final int SND_BUF_SIZE = 5;
private ByteBuffer inputBuffer = ByteBuffer.allocate(RCV_BUF_SIZE);
private ByteBuffer outputBuffer = ByteBuffer.allocate(SND_BUF_SIZE);
//发送和接收用包的池
static public final int POOL_SIZE = 10;
private Vector rPool = new Vector(POOL_SIZE);
private Vector sPool = new Vector(POOL_SIZE);
//此连接是否仍有效
public boolean exit = false;
/**
* 构造方法
* @param sel Selector
* @param sc SocketChannel
* @throws IOException
*/
public Handler(Selector sel, SocketChannel sc) throws IOException {
socketChannel = sc;
selectionKey = socketChannel.register(sel, 0); //重新将这个socketChannel注册成read
selectionKey.attach(this);
selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); //防止主线程的select()方法阻塞
sel.wakeup();
}
/**
* 反应器回调接口
*/
public void run() {
try {
if (exit) {
close();
return;
}
if (selectionKey.isReadable()) {
receiveImpl();
}
if (selectionKey.isWritable()) {
sendImpl();
}
}
catch (Exception e) {
close();
// e.printStackTrace();
}
}
/**
* 发送,用户只需把包压入发送池里
*
* @param buf int
*/
public final void send(byte[] buf) {
if (selectionKey.isValid()) {
sPool.addElement(buf);
//标注为可发送
//注意:这种方法当selector在进行某个select操作时,对interest的任何操作都会导致调用send方法的线程阻塞
//解决办法就是,把selectionKey 在一开始就加上OP_WRITER项,让系统每个周期进行轮询
//selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);//会被阻塞
}
}
/**
* 接收
* @return DataPkg
*/
public byte[] receive() {
byte[] tmpdp = null;
try {
if (rPool.size() > 0) {
tmpdp = (byte[]) rPool.firstElement();
rPool.removeElement(tmpdp);
//System.out.println("commID="+tmpdp.commID);
}
}
catch (Exception e) {
}
return tmpdp;
}
/**
* 关闭连接,清清后事
*/
public void close() {
exit = true; //做好标志
selectionKey.cancel(); //取消掉键
}
//----------------------------------------------------------------------------
// 内部实现方法
//----------------------------------------------------------------------------
//---------------------------- 收实现 ---------------------------------------
/**
* 实现接收过程(包层级)
*
* 此方法复杂地方在于,可能是多次接收后,才能组装成一个DataPkg
* 这里通过向一个二进制数组流写数据来实现
*
* @throws IOException
*/
private void receiveImpl() throws IOException {
//用inPkgBytes来判断,上一个包是否完接收完整,如果为null,则说明上个包已收完整
//如果不为null,则说明上个包还有部分数据未到达
inputIsComplete();
getNextPkg();
}
/**
* 缓冲层面的读操作
*
* @throws IOException
*/
private void inputIsComplete() throws IOException {
if (socketChannel.read(inputBuffer) < 0) {
exit = true;
}
}
/**
* 用于存放大于缓冲才能收到的包,即需要多次接收
*/
private byte[] inPkgBytes = null;
private int inPkgBytesPosition = -1; //包计数器
/**
* 从缓冲区里返回下一个包,支持大于缓冲区的包
* <br>
*
* @throws IOException
*/
private void getNextPkg() throws IOException {
//准备get
inputBuffer.flip();
while (inputBuffer.remaining() >= 4 //循环接收包,4为一个整型,表示包长度
|| (inPkgBytesPosition >= 0 && inputBuffer.hasRemaining()) //如果上一个包未接收完成时,继续接收
) {
//如果上个包已收完整,则创建新的包
if (inPkgBytesPosition == -1) {
int pkgLen = inputBuffer.getInt(); //得到下一个包的长度
//初始化数组
inPkgBytes = new byte[pkgLen];
inPkgBytesPosition = 0;
}
int need = inPkgBytes.length - inPkgBytesPosition;
if (inputBuffer.remaining() >= need) {
inputBuffer.get(inPkgBytes, inPkgBytesPosition, need); //复制缓冲区中的数据到tb中
//处理接收到一个完整的包数据后,把包添加到池中
rPool.addElement(inPkgBytes);
//System.out.println("------------- Rcv -------------\n" + new String(inPkgBytes));
inPkgBytes = null;
inPkgBytesPosition = -1;
}
else {
//如果剩下的字节数,不够一个包则
int remainBytes = inputBuffer.remaining();
inputBuffer.get(inPkgBytes, inPkgBytesPosition, remainBytes);
inPkgBytesPosition += remainBytes;
}
}
//重新整理包位置
//把缓冲区剩下的小于四个字节的内容放在缓冲区首部,以便下个周期连续接收
if (inputBuffer.remaining() > 0) {
byte[] tb = new byte[inputBuffer.remaining()];
inputBuffer.get(tb);
inputBuffer.clear();
inputBuffer.put(tb);
}
else {
inputBuffer.clear();
}
}
//---------------------------- 发实现 ---------------------------------------
/**
* 实现发送过程(包层级)
*
* 此方法可能需要多次写缓冲才能把一个大包发出去
*
* @throws IOException
*/
private void sendImpl() throws IOException {
//发送池中的所有包
for (; sPool.size() > 0; ) {
byte[] tmpb = (byte[]) sPool.firstElement();
sPool.removeElement(tmpb);
putNextPkg(tmpb);
}
//发送完成后,把写焦点移除
//因为会出现阻塞问题,所以要rem这行
//selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
}
/**
* 缓冲层面的写操作,非阴塞模式时,此方法可能不会完全写入缓冲区的数据
*
* @throws IOException
*/
private void outputIsComplete() throws IOException {
//check outputstream
while (outputBuffer.hasRemaining()) {
socketChannel.write(outputBuffer);
}
}
/**
* 把下一个包发到网络上去,支持大于缓冲区的包
*
* @param tmpb byte[]
* @throws IOException
*/
private void putNextPkg(byte[] tmpb) throws IOException {
int outPkgBytesPosition = 0;
//当tmpb还有数据未发完时
while (outPkgBytesPosition < tmpb.length) {
//如果此包第一次写向缓冲区时
if (outPkgBytesPosition == 0) {
outputBuffer.putInt(tmpb.length); //包长度
}
//计算包与缓冲区的差异
int arrRemainBytes = tmpb.length - outPkgBytesPosition;
int bufRemainBytes = outputBuffer.remaining();
if (arrRemainBytes > bufRemainBytes) { //如果需发送的数据比缓冲大
outputBuffer.put(tmpb, outPkgBytesPosition, bufRemainBytes);
outPkgBytesPosition += bufRemainBytes;
}
else {
//如果需发送的数据比缓冲小
outputBuffer.put(tmpb, outPkgBytesPosition, arrRemainBytes); //包内容
outPkgBytesPosition += arrRemainBytes;
}
outputBuffer.flip();
outputIsComplete(); //发送
outputBuffer.clear(); //清除发缓冲
}
//System.out.println("------------- Snt -------------\n" + new String(tmpb));
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -