// Source listing: cmpp3transceiver.java
// (Web code-viewer control label: font size)
package com.khan.sms.cmpp3control;
import com.khan.datetime.*;
import com.khan.sms.MsgID;
import java.net.*;
import java.io.*;
import com.khan.sms.cmpp3.*;
import com.khan.util.*;
/**
* <p>Title: </p>
*
* <p>Description: </p>
*
* <p>Copyright: Copyright (c) 2006</p>
*
* <p>Company: </p>
*
* @author not attributable
* @version 1.0
*/
public class Cmpp3Transceiver implements Runnable {
static boolean flag = true;
private Socket spc = null;
private CmppParam cp = null;
private SendQueue sq = null;
private RecvQueue rq = null;
private Cmpp3MT mt = null;
private Alarm alsend = null;
private Alarm alrecv = null;
public static void stopThread() {
flag = false;
}
public Cmpp3Transceiver(CmppParam cp, RecvQueue rq) {
this.cp = cp;
this.rq = rq;
sq = new SendQueue(cp.MAX_TRANSMIT);
mt = new Cmpp3MT(cp, sq);
alsend = new Alarm(cp.ACTIVE_INTERVAL, cp.ACTIVE_TIMEOUT_INTERVAL);
alrecv = new Alarm(cp.ACTIVE_RECV_TIMEOUT, cp.ACTIVE_RECV_TIMEOUT);
common.Assert("debug:Cmpp3Transceiver处理线程建立");
}
/**产生一个active_test包*/
private byte[] getTestData() {
Cmpp3ActiveTest cat = new Cmpp3ActiveTest();
cat.setSeqID(cp.ORIGINATOR++);
byte[] data = cat.encodeCmpp();
return data;
}
/**获得包的commandID*/
private int getCommandID(byte[] data) {
if (data.length < 12) {
return 0;
}
return SocketCommon.getDWord(data, 4);
}
/**
* 处理消息
* @param command_id int 命令号
* @param data byte[] 数据包
* @throws IOException
*/
private void processMO(int command_id, byte[] data) throws IOException {
if (command_id == Cmpp3Command.CMPP_ACTIVE_TEST) {
alrecv.resetAlarmStatus();
Cmpp3ActiveTest cat = (Cmpp3ActiveTest)new Cmpp3ActiveTest(data).decodeCmpp();
Cmpp3ActiveTestResp car = new Cmpp3ActiveTestResp();
car.setSeqID(cat.getSeqID());
byte active_resp[] = car.encodeCmpp();
spc = cp.Cmpp_Socket.get();
SocketCommon.Send(spc, active_resp);
} else if (command_id == Cmpp3Command.CMPP_DELIVER) {
Cmpp3Deliver cd = (Cmpp3Deliver)new Cmpp3Deliver(data).decodeCmpp();
//cp.LogMain.logOut(cd.toString());
MsgID msgid = cd.Msg_ID;
Cmpp3DeliverResp cdr = new Cmpp3DeliverResp(msgid, 0);
cdr.setSeqID(cd.getSeqID());
byte deliver_resp[] = cdr.encodeCmpp();
common.Assert("-------------------");
SocketCommon.PrintDataHex(data);//
SocketCommon.PrintDataHex(deliver_resp);//
common.Assert("-------------------");
spc = cp.Cmpp_Socket.get();
SocketCommon.Send(spc, deliver_resp);
rq.put(cd);
} else if (command_id == Cmpp3Command.CMPP_SUBMIT_RESP) {
Cmpp3SubmitResp csr = (Cmpp3SubmitResp)new Cmpp3SubmitResp(data).decodeCmpp();
rq.put(csr);
} else if (command_id == Cmpp3Command.CMPP_TERMINATE_RESP) {
cp.Cmpp_Socket.setConnect(false);
}
}
/**发送数据(ActiveTest , submit)
* */
private void SendMT() throws IOException {
if (cp.TRANSCEIVER_MODE % 2 == 1) { //如果是发送模式
byte[] data = null;
spc = cp.Cmpp_Socket.get();
if (alsend.getSendTestStatus()) {
SocketCommon.Send(spc, getTestData());
} else {
data = sq.get();
if (data != null) { //如果有普通下行发送,则不发送Active_test
//SocketCommon.PrintDataHex(data);//
SocketCommon.Send(spc, data);
alsend.resetAlarmStatus();
}
}
}
}
public void run() {
common.Assert("debug:" + Thread.currentThread().getName() + "线程启动");
while (flag) {
//common.Assert("debug:" + Thread.currentThread().getName() + " 连接状态:"+cp.Cmpp_Socket.getConnected());
if (cp.Cmpp_Socket.getConnected()) {
if (cp.TRANSCEIVER_MODE % 2 == 1) { //如果是发送模式
mt.run();
//从数据库装入记录到队列
}
try {
SendMT();
/*接收数据,并检测是否为CMPP_ACTIVE_TEST_RESP, 否则直接加入到接收队列*/
byte[] data = null;
spc = cp.Cmpp_Socket.get();
data = SocketCommon.Receive(spc);
//common.Assert("debug:" + Thread.currentThread().getName() + ":");
//SocketCommon.PrintDataHex(data);//
if (data != null) {
int command_id = getCommandID(data);
if (command_id != 0) {
if (command_id == Cmpp3Command.CMPP_ACTIVE_TEST_RESP) {
//common.Assert("debug:" + Thread.currentThread().getName() + "取得CMPP3_ACTIVE_TEST_RESP包,开始重新计时!");
alsend.resetAlarmStatus();
continue;
} else {
processMO(command_id, data);
}
} else {
cp.ErrMain.logOut("error:Command_ID无效!");
}
}
if (cp.TRANSCEIVER_MODE % 2 == 0) { //如果是接收模式
if (alrecv.getSendTestStatus()) {
cp.Cmpp_Socket.setConnect(false);
}
}
/**超时未收到CMPP_ACTIVE_TEST_RESP 则退出线程并停止发送*/
if (alsend.getTimeoutStatus()) {
cp.Cmpp_Socket.setConnect(false);
}
common.sleep(cp.SEND_SLEEP);
} catch (NullPointerException e) {
e.printStackTrace();
cp.ErrMain.logOut("空指针异常!", e);
} catch (SocketTimeoutException e) {
} catch (SocketException e) {
cp.ErrMain.logOut("socket连接断开!", e);
cp.Cmpp_Socket.setConnect(false);
} catch (IOException e) {
e.printStackTrace();
cp.ErrMain.logOut("socket通讯io错误!", e);
} catch (Exception e) {
e.printStackTrace();
cp.ErrMain.logOut("数据解包错误!", e);
}
}
}
common.Assert("debug:" + Thread.currentThread().getName() + "线程停止");
}
}
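// --- Web code-viewer residue (not part of the program): keyboard shortcut help ---
// Copy code: Ctrl + C
// Search code: Ctrl + F
// Full-screen mode: F11
// Toggle theme: Ctrl + Shift + D
// Show shortcuts: ?
// Increase font size: Ctrl + =
// Decrease font size: Ctrl + -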