📄 .#platformapi.java.1.7
字号:
package com.gctech.sms.sp.cms.core;
import java.util.*;
import java.net.*;
import java.io.*;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import org.apache.log4j.Logger;
import com.gctech.sms.sp.cms.msg.*;
import com.gctech.sms.sp.cms.util.*;
/**
* 用于与平台打交道的API
*
* http://211.147.4.156 linbo linbo 参考
* <p>Title: </p>
* <p>Description: </p>
* <p>Copyright: Copyright (c) 2004</p>
* <p>Company: </p>
* @author lijz@gctech.com.cn
* @version 1.0
*/
public class PlatformAPI
{
String spId;
String secret;
String host;
int port;
Socket socket = null;
InputStream in = null;
OutputStream out = null;
PersistQueue squeue = new PersistQueue(false);
// PooledExecutor msgExe = new PooledExecutor(5);
LoginMessage loginMessage = null;
static int seq_start;
Map seqMap = Collections.synchronizedMap(new HashMap(20));
ActiveTester activeTester = null;
ActiveTestMessage testMessage = new ActiveTestMessage();
MessageSender sender = null;
MessageReveiver receiver = null;
List connectlistenerList = Collections.synchronizedList(new ArrayList(3)) ;
List recvListenerList = Collections.synchronizedList(new ArrayList(3)) ;
static Logger logger = Logger.getLogger(PlatformAPI.class);
PooledExecutor pool ;
private boolean isConnected = false;
public PlatformAPI(String host,int port)
{
this.host = host;
this.port = port;
pool = new PooledExecutor(50);
pool.runWhenBlocked();
}
/**
* 设置pool池,用于处理消息接收
* @param pool
*/
public void setPooledExecutor(PooledExecutor pool)
{
this.pool = pool ;
}
public int relogin()
{
isConnected = false;
if(this.activeTester!=null)
{
activeTester.interrupt();
activeTester.isRun = false ;
}
if(this.sender!=null)
{
sender.interrupt();
sender.isRun = false;
}
if(this.receiver!=null)
{
receiver.interrupt();
receiver.isRun = false;
}
logger.debug("try to relogin");
return login(spId,secret);
}
/**
*
* @param mesg
* @return 0,成功;1,验正没有通过 2,平台不可连 3,其他错误
*/
public int login(String spId,String secret)
{
isConnected = false;
this.spId = spId;
this.secret = secret;
try
{
socket = new Socket(host,port);
in = socket.getInputStream();
out = socket.getOutputStream();
if(logger.isDebugEnabled())logger.debug("connect "+host+":"+port+" ok!");
loginMessage = new LoginMessage(spId,secret);
out.write(loginMessage.toBytes());
byte[] temp = new byte[4];
in.read(temp); //读取消息长度
int length = TypesTools.byte2int(temp);
if(length != LoginResMessage.MESSAGE_LENGTH)
{
socket.close();
return 2;
}
byte[] bytes = new byte[LoginResMessage.MESSAGE_LENGTH - 4];
int expectLength = in.read(bytes);
if(expectLength != bytes.length)
{
socket.close();
return 2;
}
LoginResMessage resMsg = LoginResMessage.createMessage(bytes);
byte[] serverMd5 = resMsg.getMd5();
//generate md5 to verify
ProtocolBuffer pb = ProtocolBuffer.create(1 + 16 + secret.length());
// pb.appendInt(resMsg.commandStatus);
pb.appendByte((byte)resMsg.commandStatus);
byte[] clientMd5 = loginMessage.getMD5();
pb.appendBytes(clientMd5);
pb.appendString(secret);
// logger.debug("before md5 :"+TypesTools.byteArrayToHexString(pb.toBytes()));;
MD5 md5 = new MD5();
byte[] verifyServerMd5 = md5.getMD5ofBytes(pb.toBytes(),
pb.toBytes().length);
// logger.debug("after md5 :"+TypesTools.byteArrayToHexString(verifyServerMd5));
for(int i = 0;i < verifyServerMd5.length;i++)
{
if(verifyServerMd5[i] != serverMd5[i])
{
//shoud tace detail infomation
socket.close();
return 1;
}
}
//login ok
// 心跳包
activeTester = new ActiveTester(this);
activeTester.start();
//从队列中取出消息,发送消息到平台
sender = new MessageSender(this);
sender.start();
//从平台接收消息,然后交给listener处理
receiver = new MessageReveiver(this);
receiver.start();
isConnected = true;
}
catch(Exception ex)
{
logger.fatal(ex.getMessage());
return 3;
}
return 0;
}
/**
* 重新连接
* @param l
* @param max
*/
public void addReconnectListener(ReconnectListener l)
{
connectlistenerList.add(l);
}
public void removeReconnectListener(ReconnectListener l)
{
connectlistenerList.remove(l);
}
/**
* logout,此方法实现的太粗糙
* @throws PlatformException
*/
public void logout()
{
//add "close message" to platform
//close queue
//close sokect until all message are handed
//return ok
try
{
isConnected = false;
if(socket!=null)this.socket.close();
pool.shutdownNow();
}
catch(Exception e)
{
e.printStackTrace();
}
}
/**
* 清除
* @throws PlatformException
*/
// public void clear()
// throws PlatformException
// {
// try
// {
// socket.close();
// }
// catch(Exception ex)
// {
// throw new PlatformException(ex.getMessage()) ;
// }
// }
/**
* 异步发送消息
* @param msg
*/
public void addMessage(Message msg) throws PlatformException
{
if(!isConnected) throw new PlatformException("没有连接到短信平台,不能发送消息");
squeue.putMessage(msg);
}
public boolean isConnected()
{
return this.isConnected;
}
/**
* 增加接收消息listern
* @param rv
*/
public void addRecevierListener(RecevierListener rv)
{
recvListenerList.add(rv);
}
/**
* 删除消息listener
* @param rv
*/
public void removeRecevierListener(RecevierListener rv)
{
recvListenerList.remove(rv);
}
/**
* 发送消息到平台,
* @param msg
* @return
*/
protected void sendToPlatform(Message msg) throws PlatformException
{
try
{
// if(logger.isDebugEnabled())logger.debug("send message to platform type is "+msg.commandId);
// if(msg.commandId==Message.SUBMIT_REQ)
// {
// seqMap.put(new Integer(msg.sequenceId),msg);
// }
byte[] bs = msg.toBytes();
out.write(bs);
}
catch(IOException ioe)
{
throw new PlatformException(ioe.getMessage());
}
}
/**
* 从平台获取消息,如果没有,则返回null
* @return
*/
protected Message readMessageFromPlatform()
{
if(this.chekSocket())
{
//网络出现的各种问题,以及读到各种数据
return Message.create(in);
}
else
{
return null;
}
}
protected void handMessage(Message msg)
{
//if(msg is replyMessage) then ignore
//可能性能问题,可能会导致choke//tbd
//不处理心跳包,submit 的相应信息
if(msg.commandId==Message.ACTIVE_TEST_RES||msg.commandId==Message.SUBMIT_RES) return ;
if(msg.commandId==Message.DELIVER)
{
DeliverResMessage res = new DeliverResMessage();
res.sequenceId = res.sequenceId;
//message id改怎么写
res.messageId = ((DeliverMessage)msg).messageId;
try
{
this.addMessage(res);
}
catch(Exception ex)
{
ex.printStackTrace();
//continue;
}
RecevierListener rl = null;
for(int i = 0;i < this.recvListenerList.size();i++)
{
rl = (RecevierListener) recvListenerList.get(i);
//rl.handMessage(msg);这个语句与下面的语句哪个更好呢
MessageExecutor executor = new MessageExecutor(msg,rl);
try
{
pool.execute(executor);
}
catch(Exception ex)
{
ex.printStackTrace();
}
}
}
}
protected void activityTest()
{
try
{
this.sendToPlatform(testMessage);
}
catch(Exception ex)
{
System.out.println(ex.getClass());
reconnect();
}
// if(chekSocket()) //checkSocket 错误,无法检测到服务器断是否可用
// {
//
// try
// {
// this.addMessage(testMessage);
// }
// catch(Exception ex)
// {
// logger.fatal("can not send testMessage "+ex.getMessage());
// }
//
// }
// else
// {
// System.out.println("网络连接出现问题,无法发送心跳包,");
// reconnect();
// }
}
protected boolean chekSocket() //不能检测服务是否连接正确
{
return socket != null && socket.isConnected();
}
protected void reconnect()
{
isConnected = false;
ReconnectListener rcl = null;
System.out.print("准备重新连接...");
while(true)
{
System.out.print("重新连接...");
int result = relogin();
if(result!=0)
{
try
{
System.out.println("falure");
Thread.sleep(3000);
}
catch(Exception ex)
{
}
}
else
{
System.out.println("ok");
}
for(int i=0;i<this.connectlistenerList.size();i++)
{
rcl = (ReconnectListener)connectlistenerList.get(i);
rcl.hand(this,result);
}
if(result==0)
{
break;
}
}
isConnected = true;
}
public static void main(String[] args)
{
// PlatformAPI platformAPI1 = new PlatformAPI();
}
}
/**
* 发送消息
* <p>Title: </p>
* <p>Description: </p>
* <p>Copyright: Copyright (c) 2004</p>
* <p>Company: </p>
* @author lijz@gctech.com.cn
* @version 1.0
*/
class MessageSender extends Thread
{
PlatformAPI client = null;
public boolean isRun = true;
public MessageSender(PlatformAPI client)
{
this.client = client;
}
public void run()
{
Message msg = null;
Message reply = null;
System.out.println("start sender ok");
while(isRun)
{
if(client.chekSocket())
{
try
{
msg = client.squeue.takeMessage(500);
if(msg==null) continue;
else client.sendToPlatform(msg);
}
catch(PlatformException pe)
{
pe.printStackTrace();
break ;
}
}
else
{
try
{
// activtiy tester will reconnect();
Thread.sleep(5 * 1000);
}
catch(InterruptedException ex)
{
}
}
}
System.out.println("stop sender ok");
}
}
class MessageReveiver extends Thread
{
PlatformAPI client = null;
public boolean isRun = true;
public MessageReveiver(PlatformAPI client)
{
this.client = client;
}
public void run()
{
System.out.println("start receiver ok");
Message msg = null;
while(isRun&&client.chekSocket())
{
msg = client.readMessageFromPlatform();
if(msg==null)
{
try
{
/**
* 没有读到数据,休息500毫秒
*/
Thread.sleep(500l);
}
catch(Exception e)
{
e.printStackTrace();
break ;
}
}
else
{
client.handMessage(msg);
}
}
System.out.println("stop receiver ok");
}
}
/**
* 检测包
* <p>Title: </p>
* <p>Description: </p>
* <p>Copyright: Copyright (c) 2004</p>
* <p>Company: </p>
* @author lijz@gctech.com.cn
* @version 1.0
*/
class ActiveTester
extends Thread
{
PlatformAPI client = null;
public boolean isRun = true;
public ActiveTester(PlatformAPI client)
{
this.client = client;
}
int interval = 5 * 1000;
public void run()
{
System.out.println("start activity test ok");
while(isRun)
{
try
{
Thread.sleep(interval);
}
catch(InterruptedException ex)
{
System.out.println("Activity Error:"+ex.getMessage());
}
try
{
client.activityTest();
}
catch(Exception ex)
{
System.out.println("Activity Error"+ex.getMessage());
}
}
System.out.println("stop activity test ok");
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -