📄 platformapi.java
字号:
package com.gctech.sms.sp.cms.core2;
import java.util.*;
import java.net.*;
import java.io.*;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import org.apache.log4j.Logger;
import com.gctech.sms.sp.cms.msg2.*;
import com.gctech.sms.sp.cms.util.*;
/**
* API used to communicate with the SMS platform.
*
* Reference: http://211.147.4.156 linbo linbo
* <p>Title: </p>
* <p>Description: </p>
* <p>Copyright: Copyright (c) 2004</p>
* <p>Company: </p>
* @author lijz@gctech.com.cn
* @version 1.0
*/
public class PlatformAPI {
// Credentials remembered by login() so that relogin() can reuse them.
String spId;
String secret;
// Platform address supplied to the constructor.
String host;
int port;
// Current connection and its streams. login() replaces them while the worker
// threads read them, so they are volatile to make the new values visible.
volatile Socket socket = null;
volatile InputStream in = null;
volatile OutputStream out = null;
// Outgoing messages: addMessage() puts, the MessageSender thread takes.
PersistQueue squeue = new PersistQueue(false);
// PooledExecutor msgExe = new PooledExecutor(5);
// Login request of the most recent login() call.
LoginMessage loginMessage = null;
static int seq_start;
Map seqMap = Collections.synchronizedMap(new HashMap(20));
// Worker threads created by login() and interrupted by logout(); written and read
// from different threads, hence volatile.
volatile ActiveTester activeTester = null;
// Heartbeat message sent by activityTest().
ActiveTestMessage testMessage = new ActiveTestMessage();
volatile MessageSender sender = null;
volatile MessageReveiver receiver = null;
// Listener registries (reconnect / received message / sent message).
List connectlistenerList = Collections.synchronizedList(new ArrayList(3));
List recvListenerList = Collections.synchronizedList(new ArrayList(3));
List sendedListenerList = Collections.synchronizedList(new ArrayList(3));
static Logger logger = Logger.getLogger(PlatformAPI.class);
// Executor that runs the listener tasks built in handMessage() and sendToPlatform().
PooledExecutor pool;
// Both flags are set by login()/relogin()/logout() and polled by the worker threads;
// volatile guarantees the workers observe the change.
private volatile boolean isConnected = false;
public volatile boolean isStop = false;
/**
 * Creates an API instance for the platform at the given address and prepares a
 * default executor pool. No connection is opened here; that happens in
 * login(String, String).
 *
 * @param host platform host name or address
 * @param port platform TCP port
 */
public PlatformAPI(String host, int port) {
    PooledExecutor executor = new PooledExecutor(50);
    executor.runWhenBlocked();
    this.pool = executor;
    this.port = port;
    this.host = host;
}
/**
* Replaces the executor pool that handMessage() and sendToPlatform() submit their
* listener tasks to. The previously configured pool is not shut down here.
* @param pool the executor to use from now on
*/
public void setPooledExecutor(PooledExecutor pool) {
this.pool = pool;
}
/**
 * Logs in again with the credentials remembered from the last
 * login(String, String) call. The stop flag is raised and the connected flag is
 * cleared before the new attempt; login() lowers the stop flag again on success.
 *
 * @return the status code produced by login(String, String)
 */
public int relogin() {
    this.isStop = true;
    this.isConnected = false;
    // The worker threads are deliberately not interrupted here; they are only
    // signalled through the stop flag.
    logger.debug("try to relogin");
    int status = this.login(this.spId, this.secret);
    return status;
}
/**
 * Opens a connection to the platform, performs the login handshake and, on
 * success, starts the heartbeat, sender and receiver threads.
 *
 * @param spId   service provider id sent in the login message
 * @param secret shared secret used for the login digest
 * @return 0 on success; 1 if the digest returned by the platform does not match;
 *         2 if the login response has an unexpected length or the stream ends
 *         early; 3 if any exception occurs (including a failure to connect)
 */
public int login(String spId, String secret) {
    isConnected = false;
    this.spId = spId;
    this.secret = secret;
    // A connection left over from an earlier login would otherwise be orphaned
    // (and its descriptor leaked) when the fields are overwritten below.
    closeSocketQuietly(socket);
    boolean loggedIn = false;
    try {
        socket = new Socket(host, port);
        in = socket.getInputStream();
        out = socket.getOutputStream();
        if (logger.isDebugEnabled())
            logger.debug("connect " + host + ":" + port + " ok!");
        loginMessage = new LoginMessage(spId, secret);
        out.write(loginMessage.toBytes());
        logger.debug("send loginMessage ok");

        // Message length prefix. A single read() may legally return fewer bytes
        // than requested, so keep reading until the buffer is full.
        byte[] lengthBytes = new byte[4];
        if (!readExactly(in, lengthBytes)) {
            return 2;
        }
        int length = TypesTools.byte2int(lengthBytes);
        if (length != LoginResMessage.MESSAGE_LENGTH) {
            return 2;
        }
        byte[] bytes = new byte[LoginResMessage.MESSAGE_LENGTH - 4];
        if (!readExactly(in, bytes)) {
            return 2;
        }
        LoginResMessage resMsg = LoginResMessage.createMessage(bytes);
        byte[] serverMd5 = resMsg.getMd5();

        // Recompute the digest the platform should have sent:
        // MD5(status byte + client digest + secret).
        ProtocolBuffer pb = ProtocolBuffer.create(1 + 16 + secret.length());
        pb.appendByte((byte) resMsg.commandStatus);
        pb.appendBytes(loginMessage.getMD5());
        pb.appendString(secret);
        byte[] material = pb.toBytes();
        // The digest input contains the shared secret, so it is not logged.
        MD5 md5 = new MD5();
        byte[] verifyServerMd5 = md5.getMD5ofBytes(material, material.length);
        logger.debug("after md5 :" + TypesTools.byteArrayToHexString(verifyServerMd5));
        if (!startsWithDigest(serverMd5, verifyServerMd5)) {
            return 1;
        }
        logger.debug("read loginRsMessage ok");

        // Login succeeded: start the workers.
        isStop = false;
        // Heartbeat.
        activeTester = new ActiveTester(this);
        activeTester.start();
        // Takes messages from the queue and sends them to the platform.
        sender = new MessageSender(this);
        sender.start();
        // Receives messages from the platform and hands them to the listeners.
        receiver = new MessageReveiver(this);
        receiver.start();
        isConnected = true;
        loggedIn = true;
        logger.debug("PlatformAPI startup ok");
    } catch (Exception ex) {
        logger.fatal(ex.getMessage(), ex);
        return 3;
    } finally {
        // Every failure path (status 1, 2 or 3) releases the socket.
        if (!loggedIn) {
            closeSocketQuietly(socket);
        }
    }
    return 0;
}

/** Fills the buffer completely; returns false if the stream ends first. */
private static boolean readExactly(InputStream source, byte[] buffer) throws IOException {
    int filled = 0;
    while (filled < buffer.length) {
        int count = source.read(buffer, filled, buffer.length - filled);
        if (count < 0) {
            return false;
        }
        filled += count;
    }
    return true;
}

/** Returns true when the first expected.length bytes of actual equal expected. */
private static boolean startsWithDigest(byte[] actual, byte[] expected) {
    if (actual == null || expected == null || actual.length < expected.length) {
        return false;
    }
    for (int i = 0; i < expected.length; i++) {
        if (actual[i] != expected[i]) {
            return false;
        }
    }
    return true;
}

/** Closes the socket if there is one; a failure to close is only logged. */
private static void closeSocketQuietly(Socket s) {
    if (s == null) {
        return;
    }
    try {
        s.close();
    } catch (IOException ignored) {
        logger.debug("error while closing socket: " + ignored.getMessage());
    }
}
/**
* Registers a listener that reconnect() notifies after every re-login attempt.
* @param l the listener to add
*/
public void addReconnectListener(ReconnectListener l) {
connectlistenerList.add(l);
}
/**
* Removes a listener previously registered with addReconnectListener().
* @param l the listener to remove
*/
public void removeReconnectListener(ReconnectListener l) {
connectlistenerList.remove(l);
}
/**
 * Logs out: tries to send an exit message, waits one second, then marks the API
 * as stopped, closes the socket, shuts the executor pool down and interrupts the
 * worker threads.
 *
 * The exit message is best effort. The cleanup always runs, even when the message
 * cannot be sent or the wait is interrupted. No exception is propagated.
 */
public void logout() {
    try {
        this.sendToPlatform(new ExitMessage());
        // Give the platform a moment to receive the exit message before closing.
        Thread.sleep(1000);
    } catch (InterruptedException ie) {
        // Keep the interrupt for the caller, but still release everything below.
        Thread.currentThread().interrupt();
    } catch (Exception e) {
        logger.warn("could not send exit message before logout", e);
    }

    isConnected = false;
    isStop = true;

    try {
        if (socket != null)
            this.socket.close();
        logger.info("close socket");
    } catch (Exception e) {
        logger.warn("error while closing socket", e);
    }

    try {
        if (pool != null)
            pool.shutdownNow();
    } catch (Exception e) {
        logger.warn("error while shutting down executor pool", e);
    }

    if (activeTester != null)
        activeTester.interrupt();
    if (sender != null)
        this.sender.interrupt();
    if (receiver != null)
        this.receiver.interrupt();
}
/**
* 清除
* @throws PlatformException
*/
// public void clear()
// throws PlatformException
// {
// try
// {
// socket.close();
// }
// catch(Exception ex)
// {
// throw new PlatformException(ex.getMessage()) ;
// }
// }
/**
 * Queues a message for asynchronous sending; the sender thread takes it from the
 * queue later.
 *
 * @param msg the message to queue
 * @throws InterruptedException propagated from the queue's putMessage call
 * @throws IllegalStateException if the API is not currently connected
 */
public void addMessage(Message msg) throws InterruptedException {
    if (isConnected) {
        squeue.putMessage(msg);
        return;
    }
    throw new IllegalStateException("没有连接到短信平台,不能发送消息");
}
/**
* Returns the connected flag, which login() sets after a successful handshake and
* relogin()/logout() clear.
* @return true while the API considers itself connected
*/
public boolean isConnected() {
return this.isConnected;
}
/**
* Registers a listener for received messages; handMessage() submits one task per
* registered listener for every DELIVER message.
* @param rv the listener to add
*/
public void addRecevierListener(RecevierListener rv) {
recvListenerList.add(rv);
}
/**
* Registers a listener for sent messages; sendToPlatform() submits one task per
* registered listener after writing a message.
* @param sl the listener to add
*/
public void addSendedListener(SendedLinstener sl) {
sendedListenerList.add(sl);
}
/**
* Removes a received-message listener previously added with addRecevierListener().
* @param rv the listener to remove
*/
public void removeRecevierListener(RecevierListener rv) {
recvListenerList.remove(rv);
}
/**
 * Writes a message to the platform connection and then submits one notification
 * task per registered sent-message listener to the executor pool.
 *
 * @param msg the message to send
 * @throws PlatformException if there is no output stream yet or the write fails
 */
protected void sendToPlatform(Message msg) throws PlatformException {
    // Read the field once so the null check and the write use the same stream
    // even if login() swaps the connection concurrently.
    OutputStream target = out;
    if (target == null) {
        throw new PlatformException("not connected to platform");
    }
    try {
        byte[] bs = msg.toBytes();
        // The sender thread, the heartbeat thread and logout() all write here;
        // serialise the writes so two messages cannot interleave on the wire.
        synchronized (target) {
            target.write(bs);
        }
        // Iterate over a snapshot so a concurrent listener removal cannot break
        // the index loop.
        Object[] listeners = sendedListenerList.toArray();
        for (int i = 0; i < listeners.length; i++) {
            SendedLinstener sl = (SendedLinstener) listeners[i];
            MessageSndExecutor executor = new MessageSndExecutor(msg, sl);
            try {
                pool.execute(executor);
            } catch (InterruptedException ex) {
                logger.debug("Interrupte :Send Messag to Platform ");
                // Restore the flag so the calling worker loop can see the interrupt.
                Thread.currentThread().interrupt();
                return;
            }
        }
    } catch (IOException ioe) {
        throw new PlatformException(ioe.getMessage());
    }
}
/**
 * Reads the next message from the platform input stream.
 *
 * @return the message built by Message.create, or null when no socket exists
 * @throws InterruptedIOException propagated from Message.create
 */
protected Message readMessageFromPlatform() throws InterruptedIOException {
    if (!this.chekSocket()) {
        return null;
    }
    // Network problems and unexpected data are left to Message.create.
    return Message.create(in);
}
/**
 * Handles a message received from the platform.
 *
 * Heartbeat responses and submit responses are ignored. For a DELIVER message an
 * acknowledgement is queued and one task per registered receive listener is
 * submitted to the executor pool. Other message types are ignored.
 *
 * @param msg the received message
 */
protected void handMessage(Message msg) {
    if (msg.commandId == Message.ACTIVE_TEST_RES || msg.commandId == Message.SUBMIT_RES) {
        return;
    }
    if (msg.commandId != Message.DELIVER) {
        return;
    }

    DeliverResMessage res = new DeliverResMessage();
    // The acknowledgement echoes the sequence id of the DELIVER it answers.
    res.sequenceId = msg.sequenceId;
    res.messageId = ((DeliverMessage) msg).messageId;
    try {
        this.addMessage(res);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
        // Restore the flag so the receiver loop can see the interrupt.
        Thread.currentThread().interrupt();
        return;
    } catch (IllegalStateException ex) {
        // addMessage refuses to queue while not connected. The message has
        // already been read, so it is still passed to the listeners below
        // instead of letting the exception end the receiver thread.
        ex.printStackTrace();
    }

    // Iterate over a snapshot so a concurrent listener removal cannot break the loop.
    Object[] listeners = recvListenerList.toArray();
    for (int i = 0; i < listeners.length; i++) {
        RecevierListener rl = (RecevierListener) listeners[i];
        MessageRevExecutor executor = new MessageRevExecutor(msg, rl);
        try {
            pool.execute(executor);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
            Thread.currentThread().interrupt();
            return;
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
/**
 * Sends the heartbeat message straight to the platform. Any exception raised by
 * the send is printed (class only) and triggers reconnect().
 */
protected void activityTest() {
    boolean delivered;
    try {
        this.sendToPlatform(testMessage);
        delivered = true;
    } catch (Exception ex) {
        System.out.println(ex.getClass());
        delivered = false;
    }
    // chekSocket() is not consulted here: it cannot tell whether the server side
    // is still reachable, so a failed send is the only trigger for reconnecting.
    if (!delivered) {
        reconnect();
    }
}
/**
* Reports whether a socket object exists. This only checks the field for null.
* @return true if the socket field is not null
*/
protected boolean chekSocket() // cannot detect whether the service is connected correctly
{
return socket != null;
}
/**
 * Re-logs in until it succeeds, waiting three seconds after each failed attempt
 * and notifying every registered ReconnectListener with the result of each
 * attempt. Does nothing when the API is already stopped.
 *
 * If the calling thread is interrupted while waiting, the listeners are still
 * notified of the failed attempt, the interrupt flag is restored and the method
 * returns without further attempts.
 */
protected void reconnect() {
    if (isStop) {
        logger.info("gate way hava stoped ,can not reconnect!");
        return;
    }
    System.out.print("try reconnect...");
    while (true) {
        System.out.print("connect...");
        int result = relogin();
        boolean interrupted = false;
        if (result != 0) {
            System.out.println("falure");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                // Somebody (e.g. logout()) wants this thread to stop retrying.
                interrupted = true;
            }
        } else {
            System.out.println("ok");
        }
        // Iterate over a snapshot so a concurrent listener removal cannot break the loop.
        Object[] listeners = connectlistenerList.toArray();
        for (int i = 0; i < listeners.length; i++) {
            ReconnectListener rcl = (ReconnectListener) listeners[i];
            rcl.hand(this, result);
        }
        if (result == 0) {
            break;
        }
        if (interrupted) {
            logger.info("reconnect interrupted, giving up");
            Thread.currentThread().interrupt();
            return;
        }
    }
    isConnected = true;
}
/**
* Manual test entry point; it only writes one log line.
* @param args ignored
*/
public static void main(String[] args) {
// PlatformAPI platformAPI1 = new PlatformAPI();
logger.info("test!");
}
}
/**
 * Worker thread that takes queued messages and sends them to the platform.
 *
 * The loop ends when the client is stopped, when the thread is interrupted, or
 * when the client's sender field no longer refers to this thread. login() stores
 * a new sender in that field on every successful (re-)login, so an older sender
 * must retire rather than run next to its replacement.
 *
 * <p>Copyright: Copyright (c) 2004</p>
 * @author lijz@gctech.com.cn
 * @version 1.0
 */
class MessageSender extends Thread {
    PlatformAPI client = null;

    // public boolean isRun = true;
    public MessageSender(PlatformAPI client) {
        this.client = client;
    }

    public void run() {
        Message msg = null;
        client.logger.info("start sender ok");
        while (isCurrent() && (!interrupted())) {
            if (client.chekSocket()) {
                try {
                    // Wait up to 500 for the next message; null means none arrived.
                    msg = client.squeue.takeMessage(500);
                    if (msg == null)
                        continue;
                    else {
                        client.sendToPlatform(msg);
                        System.out.println("msg:" + msg.commandId);
                    }
                } catch (InterruptedException pe) {
                    client.logger.info("Interrupt MessageSender");
                    break;
                } catch (PlatformException fe) {
                    client.logger.error("could not send message to platform", fe);
                    continue;
                }
            } else {
                try {
                    // No socket yet; the activity tester is responsible for reconnecting.
                    Thread.sleep(5 * 1000);
                } catch (InterruptedException ex) {
                    client.logger.info("Interrupt MessageSender");
                    break;
                }
            }
        }
        client.logger.info("stop sender ok");
    }

    /** True while the client is running and this thread is still its registered sender. */
    private boolean isCurrent() {
        return (!client.isStop) && client.sender == this;
    }
}
/**
 * Worker thread that reads messages from the platform and passes them to
 * PlatformAPI.handMessage.
 *
 * The loop ends when the client is stopped, when the thread is interrupted, or
 * when the client's receiver field no longer refers to this thread. login()
 * stores a new receiver in that field on every successful (re-)login, so an older
 * receiver must retire rather than run next to its replacement.
 */
class MessageReveiver extends Thread {
    PlatformAPI client = null;

    // public boolean isRun = true;
    public MessageReveiver(PlatformAPI client) {
        this.client = client;
    }

    public void run() {
        client.logger.info("start receiver ok");
        Message msg = null;
        while (isCurrent() && (!interrupted())) {
            try {
                msg = client.readMessageFromPlatform();
                if (msg == null) {
                    // Nothing was read; rest for 500 milliseconds.
                    Thread.sleep(500l);
                } else {
                    try {
                        client.handMessage(msg);
                    } catch (RuntimeException re) {
                        // One message that cannot be handled must not end the
                        // receiver thread.
                        client.logger.error("could not handle received message", re);
                    }
                }
            } catch (InterruptedIOException ioe) {
                client.logger.info("Interrupt MessageReceive");
                break;
            } catch (InterruptedException e) {
                client.logger.info("Interrupt MessageReceive");
                break;
            }
        }
        client.logger.info("Stop MessageReceive Ok");
    }

    /** True while the client is running and this thread is still its registered receiver. */
    private boolean isCurrent() {
        return (!client.isStop) && client.receiver == this;
    }
}
/**
 * Heartbeat thread: sleeps for the configured interval and then calls
 * PlatformAPI.activityTest.
 *
 * The loop ends when the client is stopped, when the thread is interrupted, or
 * when the client's activeTester field no longer refers to this thread. A
 * successful reconnect runs on this thread and makes login() start a new tester,
 * so without that last check every reconnect would leave one more heartbeat
 * thread running.
 *
 * <p>Copyright: Copyright (c) 2004</p>
 * @author lijz@gctech.com.cn
 * @version 1.0
 */
class ActiveTester extends Thread {
    PlatformAPI client = null;

    public ActiveTester(PlatformAPI client) {
        this.client = client;
    }

    // Pause between two heartbeats, in milliseconds.
    int interval = 5 * 1000;

    public void run() {
        client.logger.info("start activity test ok");
        while (isCurrent() && (!interrupted())) {
            try {
                Thread.sleep(interval);
            } catch (InterruptedException ex) {
                client.logger.info("Interrupt Activity");
                break;
            }
            // A re-login during the sleep may already have started a replacement.
            if (client.activeTester != this) {
                break;
            }
            client.activityTest();
        }
        client.logger.info("Stop activity test ok");
    }

    /** True while the client is running and this thread is still its registered tester. */
    private boolean isCurrent() {
        return (!client.isStop) && client.activeTester == this;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -