
// File: platformapi.java

package com.gctech.sms.sp.cms.core2;

import java.util.*;
import java.net.*;
import java.io.*;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import org.apache.log4j.Logger;
import com.gctech.sms.sp.cms.msg2.*;
import com.gctech.sms.sp.cms.util.*;

/**
 * API for communicating with the platform.
 *
 * Reference: http://211.147.4.156 linbo linbo
 * <p>Title: </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2004</p>
 * <p>Company: </p>
 * @author lijz@gctech.com.cn
 * @version 1.0
 */
public class PlatformAPI {
	String spId;
	String secret;
	String host;
	int port;
	Socket socket = null;
	InputStream in = null;
	OutputStream out = null;
	PersistQueue squeue = new PersistQueue(false);
	//  PooledExecutor msgExe = new PooledExecutor(5);
	LoginMessage loginMessage = null;
	static int seq_start;
	Map seqMap = Collections.synchronizedMap(new HashMap(20));
	ActiveTester activeTester = null;
	ActiveTestMessage testMessage = new ActiveTestMessage();
	MessageSender sender = null;
	MessageReveiver receiver = null;
	List connectlistenerList = Collections.synchronizedList(new ArrayList(3));
	List recvListenerList = Collections.synchronizedList(new ArrayList(3));
	List sendedListenerList = Collections.synchronizedList(new ArrayList(3));
	static Logger logger = Logger.getLogger(PlatformAPI.class);
	PooledExecutor pool;
	// Both flags are read and written by several threads, so they are volatile.
	private volatile boolean isConnected = false;
	public volatile boolean isStop = false;

	public PlatformAPI(String host, int port) {
		this.host = host;
		this.port = port;
		pool = new PooledExecutor(50);
		pool.runWhenBlocked();
	}

	/**
	 * Sets the executor pool used to process received messages.
	 * @param pool the executor to use
	 */
	public void setPooledExecutor(PooledExecutor pool) {
		this.pool = pool;
	}

	public int relogin() {

		isStop = true;
		isConnected = false;
		//    logout();
		//    if(this.activeTester != null)
		//    {
		//      activeTester.interrupt();
		//
		//    }
		//    if(this.sender != null)
		//    {
		//      sender.interrupt();
		//
		//    }
		//
		//    if(this.receiver != null)
		//    {
		//      receiver.interrupt();
		//
		//    }

		logger.debug("try to relogin");
		return login(spId, secret);
	}

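	/**
	 * Connects to the platform, logs in, and starts the worker threads.
	 * @param spId the SP id
	 * @param secret the shared secret used for verification
	 * @return 0 on success; 1 if verification failed; 2 if the platform cannot be reached; 3 on any other error
	 */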
	public int login(String spId, String secret) {

		isConnected = false;

		this.spId = spId;
		this.secret = secret;
		try {
			socket = new Socket(host, port);
			in = socket.getInputStream();
			out = socket.getOutputStream();
			if (logger.isDebugEnabled())
				logger.debug("connect " + host + ":" + port + " ok!");

			loginMessage = new LoginMessage(spId, secret);
			out.write(loginMessage.toBytes());
			logger.debug("send loginMessage ok");
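			byte[] temp = new byte[4];
			byte[] bytes = new byte[LoginResMessage.MESSAGE_LENGTH - 4];
			DataInputStream din = new DataInputStream(in);
			try {
				// Read the message length. readFully blocks until the buffer is full;
				// a plain read() may return fewer bytes than requested.
				din.readFully(temp);
				int length = TypesTools.byte2int(temp);
				if (length != LoginResMessage.MESSAGE_LENGTH) {
					socket.close();
					return 2;
				}
				din.readFully(bytes);
			} catch (EOFException eof) {
				// The connection was closed before the full response arrived.
				socket.close();
				return 2;
			}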

			LoginResMessage resMsg = LoginResMessage.createMessage(bytes);
			byte[] serverMd5 = resMsg.getMd5();
			//generate md5 to verify
			ProtocolBuffer pb = ProtocolBuffer.create(1 + 16 + secret.length());
			//      pb.appendInt(resMsg.commandStatus);
			pb.appendByte((byte) resMsg.commandStatus);
			byte[] clientMd5 = loginMessage.getMD5();
			pb.appendBytes(clientMd5);
			pb.appendString(secret);
			logger.debug("before md5 :" + TypesTools.byteArrayToHexString(pb.toBytes()));
			MD5 md5 = new MD5();
			byte[] verifyServerMd5 = md5.getMD5ofBytes(pb.toBytes(), pb.toBytes().length);
			logger.debug("after md5 :" + TypesTools.byteArrayToHexString(verifyServerMd5));
			if (!Arrays.equals(verifyServerMd5, serverMd5)) {
				// TODO: should log detailed information
				socket.close();
				return 1;
			}
			logger.debug("read loginRsMessage ok");
			//login ok
			// start the heartbeat thread
			isStop = false;
			activeTester = new ActiveTester(this);
			activeTester.start();
			// takes messages from the queue and sends them to the platform
			sender = new MessageSender(this);
			sender.start();
			// receives messages from the platform and hands them to the listeners
			receiver = new MessageReveiver(this);
			receiver.start();
			isConnected = true;
			logger.debug("PlatformAPI startup ok");

		} catch (Exception ex) {
			logger.fatal("login failed: " + ex.getMessage(), ex);
			return 3;
		}
		return 0;
	}
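/*
The digest check in login() hashes the status byte, the client's digest, and the shared secret, then compares the result with the digest sent by the platform. The sketch below shows the same check with the JDK's java.security.MessageDigest in place of the project's MD5 and ProtocolBuffer helpers, whose sources are not shown here. LoginDigest, expectedServerDigest, and matches are hypothetical names, and encoding the secret as ISO-8859-1 is an assumption about what appendString does.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not part of the original sources.
class LoginDigest {

    // Computes MD5(status byte, then client digest, then secret bytes).
    static byte[] expectedServerDigest(byte status, byte[] clientMd5, String secret) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            md.update(status);
            md.update(clientMd5);
            // Assumption: the secret is sent as single-byte characters.
            md.update(secret.getBytes(StandardCharsets.ISO_8859_1));
            return md.digest();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the algorithms every Java platform must provide.
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Returns true when the digest sent by the server equals the expected one.
    static boolean matches(byte[] serverMd5, byte status, byte[] clientMd5, String secret) {
        return MessageDigest.isEqual(serverMd5, expectedServerDigest(status, clientMd5, secret));
    }
}
```

MessageDigest.isEqual compares whole arrays, so a digest of the wrong length is rejected rather than partially compared.
*/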

	/**
	 * Registers a listener that is notified after each reconnect attempt.
	 * @param l the listener to add
	 */
	public void addReconnectListener(ReconnectListener l) {
		connectlistenerList.add(l);
	}

	public void removeReconnectListener(ReconnectListener l) {
		connectlistenerList.remove(l);
	}

	/**
	 * Logs out. This implementation is still very rough.
	 */
	public void logout() {
		// TODO: send a "close message" to the platform,
		// close the queue,
		// close the socket once all messages have been handled,
		// then return ok
		try {

			this.sendToPlatform(new ExitMessage());
			Thread.sleep(1000);
			isConnected = false;
			isStop = true;
			if (socket != null)
				this.socket.close();
			logger.info("close socket");
			pool.shutdownNow();
			if (activeTester != null)
				activeTester.interrupt();
			if (sender != null)
				this.sender.interrupt();
			if (receiver != null)
				this.receiver.interrupt();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * Clear
	 * @throws PlatformException
	 */
	//  public void clear()
	//      throws PlatformException
	//  {
	//    try
	//    {
	//      socket.close();
	//    }
	//    catch(Exception ex)
	//    {
	//     throw new PlatformException(ex.getMessage()) ;
	//    }
	//  }

	/**
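	 * Sends a message asynchronously by placing it on the send queue.
	 * @param msg the message to queue
	 */
	public void addMessage(Message msg) throws InterruptedException {
		if (!isConnected)
			throw new IllegalStateException("Not connected to the SMS platform; cannot send messages");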
		squeue.putMessage(msg);
	}

	public boolean isConnected() {
		return this.isConnected;
	}

	/**
	 * Adds a listener for received messages.
	 * @param rv the listener to add
	 */
	public void addRecevierListener(RecevierListener rv) {
		recvListenerList.add(rv);
	}

	public void addSendedListener(SendedLinstener sl) {
		sendedListenerList.add(sl);
	}

	/**
	 * Removes a listener for received messages.
	 * @param rv the listener to remove
	 */
	public void removeRecevierListener(RecevierListener rv) {
		recvListenerList.remove(rv);
	}

	/**
	 * Sends a message to the platform.
	 * @param msg the message to send
	 */
	protected void sendToPlatform(Message msg) throws PlatformException {
		try {
			//      if(logger.isDebugEnabled())logger.debug("send message to platform type is "+msg.commandId);

			//      if(msg.commandId==Message.SUBMIT_REQ)
			//      {
			//        seqMap.put(new Integer(msg.sequenceId),msg);
			//      }
			byte[] bs = msg.toBytes();
			out.write(bs);
			SendedLinstener sl = null;
			for (int i = 0; i < this.sendedListenerList.size(); i++) {
				sl = (SendedLinstener) sendedListenerList.get(i);

				MessageSndExecutor executor = new MessageSndExecutor(msg, sl);
				try {
					pool.execute(executor);
				} catch (InterruptedException ex) {
					logger.debug("Interrupted while sending message to platform");
					Thread.currentThread().interrupt(); // keep the interrupt visible to the calling thread
					return;
				}

			}

		} catch (IOException ioe) {

			throw new PlatformException(ioe.getMessage());
		}

	}

	/**
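	 * Reads a message from the platform; returns null if none is available.
	 * @return the message read, or null
	 */
	protected Message readMessageFromPlatform() throws InterruptedIOException {
		if (this.chekSocket()) {
			// TODO: handle the various network problems and the different kinds of data that may be read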
			return Message.create(in);
		} else {
			return null;
		}
	}
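/*
login() reads a 4-byte length and then the remaining bytes of the message. InputStream.read may return fewer bytes than requested, so a framed read usually loops until the buffer is full. The sketch below shows one way to do that with DataInputStream. FrameReader, readFrame, and MAX_FRAME are hypothetical names; the big-endian length is an assumption about TypesTools.byte2int, and how Message.create actually parses the stream is not shown in this file.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper, not part of the original sources.
class FrameReader {

    // Assumed upper bound that protects against a corrupt length field.
    static final int MAX_FRAME = 64 * 1024;

    // Reads one frame and returns the bytes that follow the length prefix.
    // The length is assumed to count the 4 prefix bytes themselves, as
    // "LoginResMessage.MESSAGE_LENGTH - 4" in login() suggests.
    static byte[] readFrame(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int total = data.readInt(); // big-endian, assumed
        if (total < 4 || total > MAX_FRAME) {
            throw new IOException("bad frame length: " + total);
        }
        byte[] body = new byte[total - 4];
        data.readFully(body); // throws EOFException if the stream ends early
        return body;
    }
}
```

A truncated frame surfaces as an EOFException instead of a silently short buffer.
*/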

	protected void handMessage(Message msg) {
		//if(msg is replyMessage) then ignore
		//possible performance problem; may cause the receiver to choke //tbd
		//heartbeat responses and submit responses are not processed
		if (msg.commandId == Message.ACTIVE_TEST_RES || msg.commandId == Message.SUBMIT_RES) {
			//      logger.debug("receive response "+msg.commandId);
			return;
		}

		if (msg.commandId == Message.DELIVER) {
			DeliverResMessage res = new DeliverResMessage();
			// the response carries the sequence id of the message it acknowledges
			res.sequenceId = msg.sequenceId;
			//TODO: how should the message id be set?
			res.messageId = ((DeliverMessage) msg).messageId;
			try {
				this.addMessage(res);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
				return;
			}

			RecevierListener rl = null;
			for (int i = 0; i < this.recvListenerList.size(); i++) {
				rl = (RecevierListener) recvListenerList.get(i);
				//rl.handMessage(msg); which is better, this statement or the one below?

				MessageRevExecutor executor = new MessageRevExecutor(msg, rl);
				try {
					pool.execute(executor);
				} catch (Exception ex) {
					ex.printStackTrace();
				}

			}

		}

	}

	protected void activityTest() {
		try {

			this.sendToPlatform(testMessage);

		} catch (Exception ex) {
			logger.warn("heartbeat failed (" + ex.getClass() + "), reconnecting");
			reconnect();
		}
		//    if(chekSocket()) //chekSocket is unreliable: it cannot detect whether the server side is available
		//    {
		//
		//      try
		//      {
		//        this.addMessage(testMessage);
		//      }
		//      catch(Exception ex)
		//      {
		//        logger.fatal("can not send testMessage "+ex.getMessage());
		//      }
		//
		//    }
		//    else
		//    {
		//      System.out.println("The network connection has a problem; cannot send the heartbeat packet");
		//      reconnect();
		//    }
	}

	protected boolean chekSocket() //cannot detect whether the service is actually connected
	{
		return socket != null;

	}

	protected void reconnect() {
		if (isStop) {
			logger.info("gateway has stopped, cannot reconnect!");
			return;
		}

		ReconnectListener rcl = null;
		System.out.print("try reconnect...");
		while (true) {
			System.out.print("connect...");
			int result = relogin();
			if (result != 0) {
				try {
					System.out.println("failure");
					Thread.sleep(3000);
				} catch (Exception ex) {
					// ignored: keep retrying
				}

			} else {
				System.out.println("ok");
			}

			for (int i = 0; i < this.connectlistenerList.size(); i++) {
				rcl = (ReconnectListener) connectlistenerList.get(i);
				rcl.hand(this, result);
			}

			if (result == 0) {
				break;
			}
		}
		isConnected = true;
	}
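/*
reconnect() calls relogin() every 3 seconds until it returns 0. The sketch below isolates that retry loop as a standalone helper that also stops when the calling thread is interrupted, which the loop above does not do. RetryLoop and retryUntilZero are hypothetical names, the IntSupplier stands in for relogin(), and Java 8 or later is assumed.

```java
import java.util.function.IntSupplier;

// Hypothetical helper, not part of the original sources.
class RetryLoop {

    // Calls attempt until it returns 0 (success, as in login()).
    // Returns the number of attempts made, or -1 if the thread was
    // interrupted while waiting between attempts.
    static int retryUntilZero(IntSupplier attempt, long delayMillis) {
        int attempts = 0;
        while (true) {
            attempts++;
            if (attempt.getAsInt() == 0) {
                return attempts;
            }
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                // Restore the flag so callers can see the interrupt.
                Thread.currentThread().interrupt();
                return -1;
            }
        }
    }
}
```

With a fixed delay, a long outage keeps retrying at the same rate; a growing delay is a common variation.
*/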

	public static void main(String[] args) {
		//    PlatformAPI platformAPI1 = new PlatformAPI();
		logger.info("test!");
	}

}

/**
 * Sends messages
 * <p>Title: </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2004</p>
 * <p>Company: </p>
 * @author lijz@gctech.com.cn
 * @version 1.0
 */
class MessageSender extends Thread {
	PlatformAPI client = null;
	//  public  boolean isRun = true;
	public MessageSender(PlatformAPI client) {
		this.client = client;
	}

	public void run() {
		Message msg = null;
		Message reply = null;
		client.logger.info("start sender ok");
		while ((!client.isStop) && (!interrupted())) {
			//      System.out.println("test sender ");
			if (client.chekSocket()) {
				try {

					msg = client.squeue.takeMessage(500);

					if (msg == null)
						continue;
					else {
						client.sendToPlatform(msg);
						System.out.println("msg:" + msg.commandId);
					}

				} catch (InterruptedException pe) {

					client.logger.info("Interrupt MessageSender");
					break;
				} catch (PlatformException fe) {
					fe.printStackTrace();
					continue;
				}

			} else {

				try {
					// the activity tester will call reconnect()
					Thread.sleep(5 * 1000);
				} catch (InterruptedException ex) {
					client.logger.info("Interrupt MessageSender");
					break;
				}

			}
		}
		client.logger.info("stop sender ok");

	}
}

class MessageReveiver extends Thread {
	PlatformAPI client = null;
	//  public  boolean isRun = true;
	public MessageReveiver(PlatformAPI client) {
		this.client = client;

	}

	public void run() {
		client.logger.info("start receiver ok");
		Message msg = null;
		while ((!client.isStop) && (!interrupted())) {
			//       System.out.println("test receiver ");
			try {
				msg = client.readMessageFromPlatform();
				if (msg == null) {

					// no data was read; sleep for 500 ms
					Thread.sleep(500L);

				} else {
					client.handMessage(msg);
				}
			} catch (InterruptedIOException ioe) {
				client.logger.info("Interrupt MessageReceive");
				break;
			} catch (InterruptedException e) {
				client.logger.info("Interrupt MessageReceive");
				break;

			}
		}
		client.logger.info("Stop MessageReceive Ok");

	}
}

/**
 * Heartbeat (active test) packets
 * <p>Title: </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2004</p>
 * <p>Company: </p>
 * @author lijz@gctech.com.cn
 * @version 1.0
 */
class ActiveTester extends Thread {

	PlatformAPI client = null;

	public ActiveTester(PlatformAPI client) {
		this.client = client;
	}

	int interval = 5 * 1000;
	public void run() {
		client.logger.info("start activity test ok");
		while ((!client.isStop) && (!interrupted())) {
			//       System.out.println("test activet ");
			try {

				Thread.sleep(interval);
			} catch (InterruptedException ex) {
				client.logger.info("Interrupt Activity");
				break;
			}

			client.activityTest();
		}
		client.logger.info("Stop activity test ok");
	}

}
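/*
ActiveTester sleeps for 5 seconds and then calls activityTest() in a loop. On Java 5 and later the same periodic heartbeat can be expressed with a ScheduledExecutorService. The sketch below is one possible shape, not the original design: Heartbeat, start, and stop are hypothetical names, the Runnable stands in for client.activityTest(), and Java 8 or later is assumed.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original sources.
class Heartbeat {

    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    // Runs ping repeatedly, waiting intervalMillis before the first run
    // and between the end of one run and the start of the next.
    void start(Runnable ping, long intervalMillis) {
        timer.scheduleWithFixedDelay(ping, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    // Cancels pending runs and interrupts a ping that is in progress.
    void stop() {
        timer.shutdownNow();
    }
}
```

If ping throws, the executor cancels all further runs, so a real heartbeat task should catch its own exceptions.
*/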
