asynreader.java

来自「WAP PUSH后台源码,WAP PUSH后台源码」· Java 代码 · 共 383 行

JAVA
383
字号
package com.tssx.ebiz.sgip;

import java.util.*;
import java.io.*;
/**
 * <p>类名: AsynReader</p>
 * <p>功能: 输入流中读取obj</p>
 * <p>版权: Copyright (c) 2002</p>
 * <p>公司: 深讯信科</p>
 * <p>版本: 1.0</p>
 * @程序 xuke
 * @修改纪录
 */

/**
 * - 支持异步和同步两种模式从输入流中读取obj<br>
 * - 调用者可指定要读取的obj类型,还可指定要读取的obj的SequenceNumber<br>
 * @author: xuke
 */
public class AsynReader implements Runnable {

//     private class ListenerRunner implements Runnable {
//          private Listener m_listener;
//          private SMSData m_obj;
//
//          public ListenerRunner(Listener listener, SMSData obj) {
//               m_listener = listener;
//               m_obj=obj;
//          }
//
//          public void run() {
//               try {
//                    m_listener.Arrival(m_obj);
//               } catch (Exception e) {
//               }
//          }
//     }

     /**
      * 封装了一个以commandID和seq为下标的Vector
      */
     private class ObjVector {
          private class Node {
               public int commandID;
               public int seqAddr;
               public int seqDate;
               public int sequence;
               public Object obj;

               public Node(int commandID, int seqAddr,int seqDate,int sequence, Object obj) {
                    super();

                    this.commandID = commandID;
                    this.seqAddr = seqAddr;
                    this.seqDate = seqDate;
                    this.sequence = sequence;
                    this.obj = obj;
               }
          }

          private Vector m_vec;

          /**
           * 构造函数
           */
          public ObjVector() {
               m_vec = new Vector();
          }

          /**
           * 加入一个对象
           */
          public void add(int commandID, int seqAddr,int seqDate,int sequence, Object obj) {
               m_vec.add(new Node(commandID, seqAddr,seqDate,sequence, obj));
          }

          /**
           * 在Vector中查找第一个具有指定commandID的对象
           * @param commandId
           */
          public Object get(int commandID) {
               int len = m_vec.size();
               Node node;

               for (int i = 0; i < len; i++) {
                    node = (Node) m_vec.elementAt(i);
                    if (node.commandID == commandID) {
                         return node.obj;
                    }
               }
               return null;
          }

          /**
           * 在Vector中查找第一个具有指定commandID和seqAddr的对象
           * @param commandId
           * @param sequenceNo
           */
          public Object get(int commandID, int seqAddr,int seqDate,int sequence) {
               int len = m_vec.size();
               Node node;

               for (int i = 0; i < len; i++) {
                    node = (Node) m_vec.elementAt(i);
                    if (node.commandID == commandID && node.seqAddr == seqAddr && node.seqDate == seqDate && node.sequence == sequence ) {
                         return node.obj;
                    }
               }
               return null;
          }

          /**
           * 从Vector中删除指定的对象
           * @param obj
           */
          public void remove(Object obj) {
               int len = m_vec.size();
               Node node;

               for (int i = 0; i < len; i++) {
                    node = (Node) m_vec.elementAt(i);
                    if (node.obj == obj) {
                         m_vec.remove(i);
                         break;
                    }
               }
          }

          /**
           * Vector中对象的数目
           */
          public int size() {
               return m_vec.size();
          }

          public Object elementAt(int idx) {
               return ((Node) m_vec.elementAt(idx)).obj;
          }

          public void removeAll() {
               m_vec.removeAllElements();
          }
     }


     /** 队列中最多可以保存的obj数。超过此数,则收到一个新的obj时,最旧的一个obj被丢弃 */
     private int MAX_IN_QUEUE = 256;
     /** Reader, 从输入流中读取obj */
     private Reader m_reader;
     /** 保存收到的、未被取走的obj */
     private ObjVector m_obj;
     /** 保存所有注册的Listener */
//     private ObjVector m_listeners;
     /** 正在等待obj的线程 */
     private ObjVector m_waitingThreads;
     /** 接收线程,负责不断的从输入流中读取obj */
     private Thread m_receiverThread;
     /** 导致接收线程终止的异常 */
     private Exception m_receiverException;

     public AsynReader(InputStream is) throws IOException {
          super();
          m_reader = new Reader(is);
          m_obj = new ObjVector();
//          m_listeners = new ObjVector();
          m_waitingThreads = new ObjVector();
//
          startReceiver();
     }
     /**
      * 停止接收线程
      */
     public void stopReceiver() {
          m_receiverThread = null;
     }

     /**
      * 启动接收线程。用stop()方法停止了接收线程后,可以用这个方法重新启动
      * 接收线程。但是如果接收线程是由于输入流错误而导致停止的,调用这个方
      * 法启动接收线程后接收线程可能马上又遇到错误而停止。
      */
     public void startReceiver() {
          m_receiverException = null;
          m_receiverThread = new Thread(this, "AsynReader Receiver Thread");
          m_receiverThread.start();
     }

     /**
      * 接收线程是否正在运行?
      * @return true是
      */
     public boolean isReceiverRunning() {
          try {
               checkReceiverState();
          } catch (Exception e) {
               return false;
          }

          return true;
     }
     /**
      * 将obj交给Listener运行
      * @param listener
      * @param obj
      */
//     private void invokeListener(Listener listener, SMSData obj) {
//          try {
//               Thread newThread =new Thread(new ListenerRunner((Listener) listener, obj),"AsynReader Receiver - Run Listener Thread");
//               newThread.start();
//          }
//          catch (Exception e) {
//          }
//     }
     /**
      * 接收线程
      */
     public void run() {
          try {
               SMSData smsdata;
               Object listener;
               Object thread;
               while (m_receiverThread == Thread.currentThread()) {
                    smsdata = m_reader.read();

                    //检查是否有Listener在等待该obj
//                    listener = m_listeners.get(smsdata.getCommandID(), smsdata.getSeqAddr(),smsdata.getSeqDate(),smsdata.getSequence());
//                    if (listener == null) { //没有,检查是否有Listener在等待该类objs
//                         listener = m_listeners.get(smsdata.getCommandID(), 0,0,0);
//                    }
//                    if (listener != null) { //有,将obj交给该Listener
//                         try {
//                              m_listeners.remove(listener);
//                              invokeListener((Listener)listener, smsdata);
//                         } catch (Exception e) {
//                         }
//                    } else { //没有,将obj保存起来
                         if (m_obj.size() >= MAX_IN_QUEUE) { //obj队列满,丢弃第一个obj
                              m_obj.remove(m_obj.elementAt(0));
                         }
                         m_obj.add(smsdata.getCommandID(), smsdata.getSeqAddr(),smsdata.getSeqDate(),smsdata.getSequence(), smsdata);

                         //检查是否有线程在等待该obj
                         thread = m_waitingThreads.get(smsdata.getCommandID(), smsdata.getSeqAddr(),smsdata.getSeqDate(),smsdata.getSequence());
                         if (thread == null) { //没有,检查是否有线程在等待该类obj
                              thread = m_waitingThreads.get(smsdata.getCommandID(), 0,0,0);
                         }
                         if (thread != null) { //有,唤醒该线程
                              try {
                                   ((Thread) thread).interrupt();
                              } catch (Exception e) {
                              }
                         }
//                    }
               }
          } catch (Exception e) {
               m_receiverException = e;
          } finally {
               if (m_receiverException == null) {
                    m_receiverException = new Exception("Reader stopped.");
               }

               //唤醒所有正在等待的线程
               int len = m_waitingThreads.size();
               for (int i = 0; i < len; i++) {
                    try {
                         ((Thread) m_waitingThreads.elementAt(i)).interrupt();
                    } catch (Exception exp) {
                    }
               }

               //清除所有正在等待的线程
               m_waitingThreads.removeAll();
               //清除所有Listener
//               m_listeners.removeAll();
          }
     }
     /**
      * 检查接收线程状态,是否已经异常中止。如果是的话,将中止接收线程的
      * 异常抛出。
      * @exception IOException
      * @exception PduException
      */
     private void checkReceiverState() throws IOException {
          if (m_receiverException == null) {
               return;
          }

          if (m_receiverException instanceof IOException) {
               throw (IOException) m_receiverException;
          }

     }

     /**
      * 读取指定的PDU。如果指定PDU还未到达,则等待。
      */
     public SMSData blockRead(int commandID, int seqAddr,int seqDate,int sequence)
               throws IOException{
          SMSData smsdata;

          m_waitingThreads.add(commandID, seqAddr, seqDate,sequence,Thread.currentThread());

          while (true) {
               smsdata = read(commandID, seqAddr, seqDate,sequence);
               if (smsdata != null) {
                    m_waitingThreads.remove(Thread.currentThread());
                    return smsdata;
               }
               checkReceiverState();

               try {
                    while (true) {
                         Thread.sleep(100000);
                    }
               } catch (InterruptedException e) {
               }
          }
     }
     /**
      * 读取指定的obj。如果指定obj还未到达,则等待。
      */
     public SMSData blockRead(int commandID) throws IOException {
          SMSData smsdata;

          m_waitingThreads.add(commandID, 0,0,0, Thread.currentThread());

          while (true) {
               smsdata = read(commandID);
               if (smsdata != null) {
                    m_waitingThreads.remove(Thread.currentThread());
                    return smsdata;
               }
               checkReceiverState();

               try {
                    while (true) {
                         Thread.sleep(100000);
                    }
               } catch (InterruptedException e) {
               }
          }
     }
     /**
      * 读取下一个obj。如果没有obj可读,则返回NULL。
      */
     public SMSData read() {
          SMSData smsdata;

          if (m_obj.size() == 0) {
               return null;
          }

          smsdata = (SMSData) m_obj.elementAt(0);
          m_obj.remove(smsdata);
          return smsdata;
     }

     /**
      * 读取指定的obj。如果指定obj还未到达,则返回NULL。
      */
     public SMSData read(int commandID) {
          Object obj = m_obj.get(commandID);
          if (obj != null) {
               m_obj.remove(obj);
               return (SMSData) obj;
          }

          return null;
     }
     /**
      * 读取指定的obj。如果指定obj还未到达,则返回NULL。
      */
     public SMSData read(int commandID, int seqAddr,int seqDate,int sequence) {
          Object obj = m_obj.get(commandID,seqAddr ,seqDate,sequence);
          if (obj != null) {
               m_obj.remove(obj);
               return (SMSData)obj;
          }

          return null;
     }

}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?