⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 handler.java

📁 java nio 服务器范例及j2me代码连接服务器的测试代码。
💻 JAVA
字号:
package net;

import java.nio.channels.*;
import java.nio.*;
import java.io.*;
import java.util.*;

/**
 * 通信处理器,负责对一个客户的收发进行管理<br>
 *  他在反应器Reactor 接收accept到一个连接时,产生一个此类的实例<br>
 *
 *  此包中的send()和receive()以及,两个内部实现方法不用加synchronized同步,因为Vector本身是线程安全的<br>
*  在原send()方法中,原来设计为当需要加入OP_WRITE项,但发现在对interest进行操作时,会对调用send方法的线程造成阻塞<br>
 * 接收/发送的网络的数据格式为:<br>
 *
 * ++++   ++++++++++...   ++++   +++++++++++...<br>
 * 4B包长度  包内容         4B包长度   包内容
 *
 * <p>Title: </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2006</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */
final public class Handler
    implements Runnable {

  private final SocketChannel socketChannel;
  private final SelectionKey selectionKey;

  //字节缓冲器
  static public final int RCV_BUF_SIZE = 5; //此类的设计中,此两个缓冲的尺寸必须>=4
  static public final int SND_BUF_SIZE = 5;
  private ByteBuffer inputBuffer = ByteBuffer.allocate(RCV_BUF_SIZE);
  private ByteBuffer outputBuffer = ByteBuffer.allocate(SND_BUF_SIZE);

  //发送和接收用包的池
  static public final int POOL_SIZE = 10;
  private Vector rPool = new Vector(POOL_SIZE);
  private Vector sPool = new Vector(POOL_SIZE);

  //此连接是否仍有效
  public boolean exit = false;

  /**
   * 构造方法
   * @param sel Selector
   * @param sc SocketChannel
   * @throws IOException
   */
  public Handler(Selector sel, SocketChannel sc) throws IOException {
    socketChannel = sc;
    selectionKey = socketChannel.register(sel, 0); //重新将这个socketChannel注册成read

    selectionKey.attach(this);

    selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); //防止主线程的select()方法阻塞

    sel.wakeup();
  }

  /**
   * 反应器回调接口
   */
  public void run() {
    try {
      if (exit) {
        close();
        return;
      }
      if (selectionKey.isReadable()) {
        receiveImpl();
      }
      if (selectionKey.isWritable()) {
        sendImpl();
      }
    }
    catch (Exception e) {
      close();
//      e.printStackTrace();
    }

  }

  /**
   * 发送,用户只需把包压入发送池里
   *
   * @param buf int
   */
  public final void send(byte[] buf) {
    if (selectionKey.isValid()) {
      sPool.addElement(buf);

      //标注为可发送
      //注意:这种方法当selector在进行某个select操作时,对interest的任何操作都会导致调用send方法的线程阻塞
      //解决办法就是,把selectionKey 在一开始就加上OP_WRITER项,让系统每个周期进行轮询
      //selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);//会被阻塞
    }

  }

  /**
   * 接收
   * @return DataPkg
   */
  public byte[] receive() {
    byte[] tmpdp = null;
    try {
      if (rPool.size() > 0) {
        tmpdp = (byte[]) rPool.firstElement();
        rPool.removeElement(tmpdp);
        //System.out.println("commID="+tmpdp.commID);
      }
    }
    catch (Exception e) {
    }
    return tmpdp;

  }

  /**
   * 关闭连接,清清后事
   */
  public void close() {
    exit = true; //做好标志
    selectionKey.cancel(); //取消掉键
  }

  //----------------------------------------------------------------------------
  //                      内部实现方法
  //----------------------------------------------------------------------------


  //----------------------------  收实现  ---------------------------------------
  /**
   * 实现接收过程(包层级)
   *
   * 此方法复杂地方在于,可能是多次接收后,才能组装成一个DataPkg
   * 这里通过向一个二进制数组流写数据来实现
   *
   * @throws IOException
   */

  private void receiveImpl() throws IOException {
    //用inPkgBytes来判断,上一个包是否完接收完整,如果为null,则说明上个包已收完整
    //如果不为null,则说明上个包还有部分数据未到达
    inputIsComplete();

    getNextPkg();

  }

  /**
   * 缓冲层面的读操作
   *
   * @throws IOException
   */
  private void inputIsComplete() throws IOException {
    if (socketChannel.read(inputBuffer) < 0) {
      exit = true;
    }
  }

  /**
   * 用于存放大于缓冲才能收到的包,即需要多次接收
   */
  private byte[] inPkgBytes = null;
  private int inPkgBytesPosition = -1; //包计数器

  /**
   * 从缓冲区里返回下一个包,支持大于缓冲区的包
   * <br>
   *
   * @throws IOException
   */
  private void getNextPkg() throws IOException {

    //准备get
    inputBuffer.flip();

    while (inputBuffer.remaining() >= 4 //循环接收包,4为一个整型,表示包长度
           || (inPkgBytesPosition >= 0 && inputBuffer.hasRemaining()) //如果上一个包未接收完成时,继续接收
           ) {
      //如果上个包已收完整,则创建新的包

      if (inPkgBytesPosition == -1) {

        int pkgLen = inputBuffer.getInt(); //得到下一个包的长度

        //初始化数组
        inPkgBytes = new byte[pkgLen];
        inPkgBytesPosition = 0;
      }
      int need = inPkgBytes.length - inPkgBytesPosition;
      if (inputBuffer.remaining() >= need) {
        inputBuffer.get(inPkgBytes, inPkgBytesPosition, need); //复制缓冲区中的数据到tb中

        //处理接收到一个完整的包数据后,把包添加到池中
        rPool.addElement(inPkgBytes);
        //System.out.println("------------- Rcv  -------------\n" + new String(inPkgBytes));
        inPkgBytes = null;
        inPkgBytesPosition = -1;
      }
      else {
        //如果剩下的字节数,不够一个包则
        int remainBytes = inputBuffer.remaining();
        inputBuffer.get(inPkgBytes, inPkgBytesPosition, remainBytes);
        inPkgBytesPosition += remainBytes;
      }
    }

    //重新整理包位置
    //把缓冲区剩下的小于四个字节的内容放在缓冲区首部,以便下个周期连续接收
    if (inputBuffer.remaining() > 0) {
      byte[] tb = new byte[inputBuffer.remaining()];
      inputBuffer.get(tb);
      inputBuffer.clear();
      inputBuffer.put(tb);
    }
    else {
      inputBuffer.clear();
    }

  }

  //----------------------------  发实现  ---------------------------------------
  /**
   * 实现发送过程(包层级)
   *
   * 此方法可能需要多次写缓冲才能把一个大包发出去
   *
   * @throws IOException
   */
  private void sendImpl() throws IOException {

    //发送池中的所有包
    for (; sPool.size() > 0; ) {
      byte[] tmpb = (byte[]) sPool.firstElement();
      sPool.removeElement(tmpb);
      putNextPkg(tmpb);
    }

    //发送完成后,把写焦点移除
    //因为会出现阻塞问题,所以要rem这行
    //selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
  }

  /**
   * 缓冲层面的写操作,非阴塞模式时,此方法可能不会完全写入缓冲区的数据
   *
   * @throws IOException
   */
  private void outputIsComplete() throws IOException {
    //check outputstream

    while (outputBuffer.hasRemaining()) {
      socketChannel.write(outputBuffer);

    }
  }

  /**
   * 把下一个包发到网络上去,支持大于缓冲区的包
   *
   * @param tmpb byte[]
   * @throws IOException
   */
  private void putNextPkg(byte[] tmpb) throws IOException {

    int outPkgBytesPosition = 0;

    //当tmpb还有数据未发完时
    while (outPkgBytesPosition < tmpb.length) {

      //如果此包第一次写向缓冲区时
      if (outPkgBytesPosition == 0) {
        outputBuffer.putInt(tmpb.length); //包长度
      }

      //计算包与缓冲区的差异
      int arrRemainBytes = tmpb.length - outPkgBytesPosition;
      int bufRemainBytes = outputBuffer.remaining();
      if (arrRemainBytes > bufRemainBytes) { //如果需发送的数据比缓冲大

        outputBuffer.put(tmpb, outPkgBytesPosition, bufRemainBytes);
        outPkgBytesPosition += bufRemainBytes;
      }
      else {
        //如果需发送的数据比缓冲小
        outputBuffer.put(tmpb, outPkgBytesPosition, arrRemainBytes); //包内容
        outPkgBytesPosition += arrRemainBytes;
      }
      outputBuffer.flip();
      outputIsComplete(); //发送
      outputBuffer.clear(); //清除发缓冲
    }
    //System.out.println("------------- Snt  -------------\n" + new String(tmpb));

  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -