📄 transceiver.java
字号:
package com.khan.sms.cmpp2control;
import com.khan.sms.*;
import java.net.Socket;
import java.io.*;
import com.khan.sms.cmpp2.*;
import com.khan.datetime.SMPTime;
import com.khan.util.*;
import java.net.SocketTimeoutException;
import com.khan.socket.SocketPoolCon;
import java.net.SocketException;
/**
 * <p>Title: Transceiver</p>
 *
 * <p>Description: Worker loop for the CMPP link. While {@code Cmpp2Login.connected} is set, each
 * iteration sends at most one packet (an active-test keep-alive when one is due, otherwise the next
 * entry of the send queue), then reads one packet from the pooled socket and dispatches it.
 * If an active-test is outstanding for too long, the link is flagged as disconnected.</p>
 *
 * <p>Copyright: Copyright (c) 2006</p>
 *
 * <p>Company: </p>
 *
 * @author not attributable
 * @version 1.0
 */
public class Transceiver implements Runnable {

    /** Size in bytes of the leading length word of a packet. */
    private static final int LENGTH_FIELD_SIZE = 4;

    /** Largest packet accepted from the peer; matches the 4096-byte buffer used historically. */
    private static final int MAX_PACKET_SIZE = 4096;

    /** Minimum number of bytes a packet must have before its command id is read. */
    private static final int HEADER_SIZE = 12;

    /**
     * Loop-control flag shared by all instances. It is cleared by {@link #stopThread()} from another
     * thread, so it is volatile to make the stop request visible to the running loop.
     */
    static volatile boolean flag = true;

    CmppParam cp = CmppParam.getInstance();
    SendQueue sq = SendQueue.getInstance();
    RecvQueue rq = RecvQueue.getInstance();

    /** Time of the last keep-alive reset, as reported by {@code SMPTime.getNow()}. */
    private long sendtime = SMPTime.getNow();

    /** 0 = no active-test outstanding, 1 = an active-test was issued and its response is awaited. */
    private int send_test_flag = 0;

    /** Asks every running Transceiver loop to finish after its current iteration. */
    public static void stopThread() {
        flag = false;
    }

    public Transceiver() {
        common.Assert("debug:Transceiver线程建立");
    }

    /**
     * Decides whether an active-test packet must be sent now.
     *
     * @return {@code true} exactly once per keep-alive cycle, when no test is outstanding and more
     *         than {@code cp.ACTIVE_INTERVAL} has elapsed since the last reset
     */
    private boolean getSendTestStatus() {
        if (0 == send_test_flag) {
            if (SMPTime.getNow() - sendtime > cp.ACTIVE_INTERVAL) {
                common.Assert("debug:Transceiver准备发送Active_resp包!");
                send_test_flag = 1;
                return true;
            }
        }
        return false;
    }

    /** Resets the keep-alive state: no test outstanding, interval timer restarted. */
    private void resetTestStatus() {
        send_test_flag = 0;
        sendtime = SMPTime.getNow();
    }

    /**
     * Checks whether an outstanding active-test has gone unanswered for too long.
     *
     * @return {@code true} when a test is outstanding and more than
     *         {@code cp.ACTIVE_INTERVAL + cp.MAX_ACK_TIMEOUT} has elapsed since the last reset;
     *         the keep-alive state is reset before returning {@code true}
     */
    private boolean getRecvTestStatus() {
        if (1 == send_test_flag) {
            if (SMPTime.getNow() - sendtime > (cp.ACTIVE_INTERVAL + cp.MAX_ACK_TIMEOUT)) {
                common.Assert("debug:Transceiver未收到Active_resp包,超时!");
                send_test_flag = 0;
                sendtime = SMPTime.getNow();
                return true;
            }
        }
        return false;
    }

    /**
     * Writes one encoded packet to the pooled socket.
     *
     * @param data encoded packet bytes
     * @throws IOException if the write fails
     */
    private void Send(byte[] data) throws IOException {
        SocketPoolCon spc = null;
        try {
            spc = cp.Cmpp_Socket.get();
            DataOutputStream dos = new DataOutputStream(spc.getSocket().getOutputStream());
            dos.write(data);
        } finally {
            // Only hand back a connection that was actually obtained, so that a failure in get()
            // is not masked by a second failure while releasing null.
            if (spc != null) {
                cp.Cmpp_Socket.release(spc);
            }
        }
    }

    /**
     * Reads exactly one packet from the pooled socket.
     *
     * <p>The first four bytes are decoded with {@code SocketCommon.getDWord} as the total packet
     * length (length word included); the remaining bytes of that packet are then read in full.
     * Reading exactly one frame keeps a following packet in the stream for the next call and
     * never trusts a partially filled buffer.</p>
     *
     * <p>NOTE(review): if a read timeout fires after part of a packet has been consumed, those
     * bytes are lost and the stream is out of step; this is expected to be rare - confirm the
     * configured socket timeout is generous enough.</p>
     *
     * @return the complete packet, starting with its length word
     * @throws EOFException if the peer closed the connection
     * @throws IOException if the read fails or the announced length is outside
     *         {@code [LENGTH_FIELD_SIZE, MAX_PACKET_SIZE]}
     */
    private byte[] Receive() throws IOException {
        SocketPoolCon spc = null;
        try {
            spc = cp.Cmpp_Socket.get();
            DataInputStream dis = new DataInputStream(spc.getSocket().getInputStream());

            byte[] lengthField = new byte[LENGTH_FIELD_SIZE];
            dis.readFully(lengthField);
            int length = SocketCommon.getDWord(lengthField, 0);
            if (length < LENGTH_FIELD_SIZE || length > MAX_PACKET_SIZE) {
                throw new IOException("invalid CMPP packet length: " + length);
            }

            byte[] result = new byte[length];
            System.arraycopy(lengthField, 0, result, 0, LENGTH_FIELD_SIZE);
            dis.readFully(result, LENGTH_FIELD_SIZE, length - LENGTH_FIELD_SIZE);
            return result;
        } finally {
            if (spc != null) {
                cp.Cmpp_Socket.release(spc);
            }
        }
    }

    /**
     * Builds an encoded active-test packet, consuming the next value of {@code cp.ORIGINATOR}
     * as its sequence id.
     *
     * @return encoded active-test packet
     */
    private byte[] getTestData() {
        Cmpp2ActiveTest cat = new Cmpp2ActiveTest();
        cat.setSeqID(cp.ORIGINATOR++);
        return cat.encodeCmpp();
    }

    /**
     * Extracts the command id stored at byte offset 4 of a packet.
     *
     * @param data raw packet
     * @return the command id, or 0 when the packet is shorter than {@code HEADER_SIZE} bytes
     */
    private int getCommandID(byte[] data) {
        if (data.length < HEADER_SIZE) {
            return 0;
        }
        return SocketCommon.getDWord(data, 4);
    }

    /**
     * Handles one received packet that is not an active-test response.
     *
     * <ul>
     *   <li>CMPP_DELIVER: decoded, logged, acknowledged with a deliver-response carrying the same
     *       sequence id, then placed on the receive queue.</li>
     *   <li>CMPP_SUBMIT_RESP: decoded and placed on the receive queue.</li>
     *   <li>CMPP_TERMINATE_RESP: the link is flagged as disconnected.</li>
     *   <li>Any other command id is ignored.</li>
     * </ul>
     *
     * @param command_id int command id of the packet
     * @param data byte[] raw packet
     * @throws IOException if sending the deliver-response fails
     */
    private void processMO(int command_id, byte[] data) throws IOException {
        if (command_id == Cmpp2Command.CMPP_DELIVER) {
            Cmpp2Deliver cd = (Cmpp2Deliver) new Cmpp2Deliver(data).decodeCmpp();
            cp.LogMain.logOut(cd.toString());
            MsgID msgid = cd.Msg_ID;
            Cmpp2DeliverResp cdr = new Cmpp2DeliverResp(msgid, (byte) 0);
            cdr.setSeqID(cd.getSeqID());
            byte deliver_resp[] = cdr.encodeCmpp();
            Send(deliver_resp);
            rq.put(cd);
        } else if (command_id == Cmpp2Command.CMPP_SUBMIT_RESP) {
            Cmpp2SubmitResp csr = (Cmpp2SubmitResp) new Cmpp2SubmitResp(data).decodeCmpp();
            rq.put(csr);
        } else if (command_id == Cmpp2Command.CMPP_TERMINATE_RESP) {
            Cmpp2Login.connected = false;
        }
    }

    /**
     * Sends at most one packet: an active-test if one is due, otherwise the next queued packet
     * (nothing is sent when the queue returns {@code null}).
     *
     * @throws IOException if the write fails
     */
    private void sendPending() throws IOException {
        if (getSendTestStatus()) {
            Send(getTestData());
            return;
        }
        byte[] queued = sq.get();
        if (queued != null) {
            Send(queued);
        }
    }

    /**
     * Reads one packet and dispatches it: an active-test response restarts the keep-alive timer,
     * a packet without a readable command id is logged as invalid, anything else goes to
     * {@link #processMO(int, byte[])}.
     *
     * @throws IOException if reading, or replying to the packet, fails
     */
    private void receiveAndDispatch() throws IOException {
        byte[] packet = Receive();
        int command_id = getCommandID(packet);
        if (command_id == 0) {
            cp.ErrMain.logOut("error:Command_ID无效!");
            return;
        }
        if (command_id == Cmpp2Command.CMPP_ACTIVE_TEST_RESP) {
            common.Assert("debug:Transceiver取得CMPP_ACTIVE_TEST_RESP包,开始重新计时!");
            resetTestStatus();
            return;
        }
        processMO(command_id, packet);
    }

    /**
     * Main loop. Runs until {@link #stopThread()} is called; each iteration sleeps for
     * {@code cp.SEND_SLEEP} and, while the link is flagged connected, performs one send step,
     * one receive step and the active-test timeout check.
     */
    public void run() {
        common.Assert("debug:Transceiver线程启动");
        while (flag) {
            common.sleep(cp.SEND_SLEEP);
            if (!Cmpp2Login.connected) {
                continue;
            }
            try {
                sendPending();
                receiveAndDispatch();
            } catch (NullPointerException e) {
                e.printStackTrace();
                cp.ErrMain.logOut("空指针异常!", e);
            } catch (SocketTimeoutException e) {
                // Nothing arrived before the read timed out: the normal idle case, not an error.
                // NOTE(review): presumes the pooled socket has a read timeout configured - confirm.
            } catch (EOFException e) {
                // The peer closed the stream; treat it like any other broken connection.
                cp.ErrMain.logOut("socket连接断开!", e);
                Cmpp2Login.connected = false;
            } catch (SocketException e) {
                cp.ErrMain.logOut("socket连接断开!", e);
                Cmpp2Login.connected = false;
            } catch (IOException e) {
                e.printStackTrace();
                cp.ErrMain.logOut("socket通讯io错误!", e);
            } catch (Exception e) {
                e.printStackTrace();
                cp.ErrMain.logOut("数据解包错误!", e);
            }
            // The timeout check must run on every iteration, including those that ended in a read
            // timeout or another exception: a silent peer is exactly what it has to detect.
            if (getRecvTestStatus()) {
                Cmpp2Login.connected = false;
            }
        }
        common.Assert("debug:Transceiver线程停止");
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -