asynreader.java
来自「WAP PUSH后台源码,WAP PUSH后台源码」· Java 代码 · 共 383 行
JAVA
383 行
package com.tssx.ebiz.sgip;
import java.util.*;
import java.io.*;
/**
* <p>类名: AsynReader</p>
* <p>功能: 输入流中读取obj</p>
* <p>版权: Copyright (c) 2002</p>
* <p>公司: 深讯信科</p>
* <p>版本: 1.0</p>
* @程序 xuke
* @修改纪录
*/
/**
* - 支持异步和同步两种模式从输入流中读取obj<br>
* - 调用者可指定要读取的obj类型,还可指定要读取的obj的SequenceNumber<br>
* @author: xuke
*/
public class AsynReader implements Runnable {
// private class ListenerRunner implements Runnable {
// private Listener m_listener;
// private SMSData m_obj;
//
// public ListenerRunner(Listener listener, SMSData obj) {
// m_listener = listener;
// m_obj=obj;
// }
//
// public void run() {
// try {
// m_listener.Arrival(m_obj);
// } catch (Exception e) {
// }
// }
// }
/**
* 封装了一个以commandID和seq为下标的Vector
*/
private class ObjVector {
private class Node {
public int commandID;
public int seqAddr;
public int seqDate;
public int sequence;
public Object obj;
public Node(int commandID, int seqAddr,int seqDate,int sequence, Object obj) {
super();
this.commandID = commandID;
this.seqAddr = seqAddr;
this.seqDate = seqDate;
this.sequence = sequence;
this.obj = obj;
}
}
private Vector m_vec;
/**
* 构造函数
*/
public ObjVector() {
m_vec = new Vector();
}
/**
* 加入一个对象
*/
public void add(int commandID, int seqAddr,int seqDate,int sequence, Object obj) {
m_vec.add(new Node(commandID, seqAddr,seqDate,sequence, obj));
}
/**
* 在Vector中查找第一个具有指定commandID的对象
* @param commandId
*/
public Object get(int commandID) {
int len = m_vec.size();
Node node;
for (int i = 0; i < len; i++) {
node = (Node) m_vec.elementAt(i);
if (node.commandID == commandID) {
return node.obj;
}
}
return null;
}
/**
* 在Vector中查找第一个具有指定commandID和seqAddr的对象
* @param commandId
* @param sequenceNo
*/
public Object get(int commandID, int seqAddr,int seqDate,int sequence) {
int len = m_vec.size();
Node node;
for (int i = 0; i < len; i++) {
node = (Node) m_vec.elementAt(i);
if (node.commandID == commandID && node.seqAddr == seqAddr && node.seqDate == seqDate && node.sequence == sequence ) {
return node.obj;
}
}
return null;
}
/**
* 从Vector中删除指定的对象
* @param obj
*/
public void remove(Object obj) {
int len = m_vec.size();
Node node;
for (int i = 0; i < len; i++) {
node = (Node) m_vec.elementAt(i);
if (node.obj == obj) {
m_vec.remove(i);
break;
}
}
}
/**
* Vector中对象的数目
*/
public int size() {
return m_vec.size();
}
public Object elementAt(int idx) {
return ((Node) m_vec.elementAt(idx)).obj;
}
public void removeAll() {
m_vec.removeAllElements();
}
}
/** 队列中最多可以保存的obj数。超过此数,则收到一个新的obj时,最旧的一个obj被丢弃 */
private int MAX_IN_QUEUE = 256;
/** Reader, 从输入流中读取obj */
private Reader m_reader;
/** 保存收到的、未被取走的obj */
private ObjVector m_obj;
/** 保存所有注册的Listener */
// private ObjVector m_listeners;
/** 正在等待obj的线程 */
private ObjVector m_waitingThreads;
/** 接收线程,负责不断的从输入流中读取obj */
private Thread m_receiverThread;
/** 导致接收线程终止的异常 */
private Exception m_receiverException;
public AsynReader(InputStream is) throws IOException {
super();
m_reader = new Reader(is);
m_obj = new ObjVector();
// m_listeners = new ObjVector();
m_waitingThreads = new ObjVector();
//
startReceiver();
}
/**
* 停止接收线程
*/
public void stopReceiver() {
m_receiverThread = null;
}
/**
* 启动接收线程。用stop()方法停止了接收线程后,可以用这个方法重新启动
* 接收线程。但是如果接收线程是由于输入流错误而导致停止的,调用这个方
* 法启动接收线程后接收线程可能马上又遇到错误而停止。
*/
public void startReceiver() {
m_receiverException = null;
m_receiverThread = new Thread(this, "AsynReader Receiver Thread");
m_receiverThread.start();
}
/**
* 接收线程是否正在运行?
* @return true是
*/
public boolean isReceiverRunning() {
try {
checkReceiverState();
} catch (Exception e) {
return false;
}
return true;
}
/**
* 将obj交给Listener运行
* @param listener
* @param obj
*/
// private void invokeListener(Listener listener, SMSData obj) {
// try {
// Thread newThread =new Thread(new ListenerRunner((Listener) listener, obj),"AsynReader Receiver - Run Listener Thread");
// newThread.start();
// }
// catch (Exception e) {
// }
// }
/**
* 接收线程
*/
public void run() {
try {
SMSData smsdata;
Object listener;
Object thread;
while (m_receiverThread == Thread.currentThread()) {
smsdata = m_reader.read();
//检查是否有Listener在等待该obj
// listener = m_listeners.get(smsdata.getCommandID(), smsdata.getSeqAddr(),smsdata.getSeqDate(),smsdata.getSequence());
// if (listener == null) { //没有,检查是否有Listener在等待该类objs
// listener = m_listeners.get(smsdata.getCommandID(), 0,0,0);
// }
// if (listener != null) { //有,将obj交给该Listener
// try {
// m_listeners.remove(listener);
// invokeListener((Listener)listener, smsdata);
// } catch (Exception e) {
// }
// } else { //没有,将obj保存起来
if (m_obj.size() >= MAX_IN_QUEUE) { //obj队列满,丢弃第一个obj
m_obj.remove(m_obj.elementAt(0));
}
m_obj.add(smsdata.getCommandID(), smsdata.getSeqAddr(),smsdata.getSeqDate(),smsdata.getSequence(), smsdata);
//检查是否有线程在等待该obj
thread = m_waitingThreads.get(smsdata.getCommandID(), smsdata.getSeqAddr(),smsdata.getSeqDate(),smsdata.getSequence());
if (thread == null) { //没有,检查是否有线程在等待该类obj
thread = m_waitingThreads.get(smsdata.getCommandID(), 0,0,0);
}
if (thread != null) { //有,唤醒该线程
try {
((Thread) thread).interrupt();
} catch (Exception e) {
}
}
// }
}
} catch (Exception e) {
m_receiverException = e;
} finally {
if (m_receiverException == null) {
m_receiverException = new Exception("Reader stopped.");
}
//唤醒所有正在等待的线程
int len = m_waitingThreads.size();
for (int i = 0; i < len; i++) {
try {
((Thread) m_waitingThreads.elementAt(i)).interrupt();
} catch (Exception exp) {
}
}
//清除所有正在等待的线程
m_waitingThreads.removeAll();
//清除所有Listener
// m_listeners.removeAll();
}
}
/**
* 检查接收线程状态,是否已经异常中止。如果是的话,将中止接收线程的
* 异常抛出。
* @exception IOException
* @exception PduException
*/
private void checkReceiverState() throws IOException {
if (m_receiverException == null) {
return;
}
if (m_receiverException instanceof IOException) {
throw (IOException) m_receiverException;
}
}
/**
* 读取指定的PDU。如果指定PDU还未到达,则等待。
*/
public SMSData blockRead(int commandID, int seqAddr,int seqDate,int sequence)
throws IOException{
SMSData smsdata;
m_waitingThreads.add(commandID, seqAddr, seqDate,sequence,Thread.currentThread());
while (true) {
smsdata = read(commandID, seqAddr, seqDate,sequence);
if (smsdata != null) {
m_waitingThreads.remove(Thread.currentThread());
return smsdata;
}
checkReceiverState();
try {
while (true) {
Thread.sleep(100000);
}
} catch (InterruptedException e) {
}
}
}
/**
* 读取指定的obj。如果指定obj还未到达,则等待。
*/
public SMSData blockRead(int commandID) throws IOException {
SMSData smsdata;
m_waitingThreads.add(commandID, 0,0,0, Thread.currentThread());
while (true) {
smsdata = read(commandID);
if (smsdata != null) {
m_waitingThreads.remove(Thread.currentThread());
return smsdata;
}
checkReceiverState();
try {
while (true) {
Thread.sleep(100000);
}
} catch (InterruptedException e) {
}
}
}
/**
* 读取下一个obj。如果没有obj可读,则返回NULL。
*/
public SMSData read() {
SMSData smsdata;
if (m_obj.size() == 0) {
return null;
}
smsdata = (SMSData) m_obj.elementAt(0);
m_obj.remove(smsdata);
return smsdata;
}
/**
* 读取指定的obj。如果指定obj还未到达,则返回NULL。
*/
public SMSData read(int commandID) {
Object obj = m_obj.get(commandID);
if (obj != null) {
m_obj.remove(obj);
return (SMSData) obj;
}
return null;
}
/**
* 读取指定的obj。如果指定obj还未到达,则返回NULL。
*/
public SMSData read(int commandID, int seqAddr,int seqDate,int sequence) {
Object obj = m_obj.get(commandID,seqAddr ,seqDate,sequence);
if (obj != null) {
m_obj.remove(obj);
return (SMSData)obj;
}
return null;
}
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?