⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 .#platformapi.java.1.7

📁 采用JAVA开发
💻 7
字号:
package com.gctech.sms.sp.cms.core;

import java.util.*;
import java.net.*;
import java.io.*;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import org.apache.log4j.Logger;
import com.gctech.sms.sp.cms.msg.*;
import com.gctech.sms.sp.cms.util.*;



/**
 * Client API used to talk to the SMS platform over one TCP connection.
 *
 * <p>A successful {@link #login(String, String)} starts three worker threads:
 * an {@code ActiveTester} (heartbeat), a {@code MessageSender} (drains
 * {@link #squeue} to the socket) and a {@code MessageReveiver} (reads
 * messages and hands them to {@link #handMessage(Message)}).
 *
 * <p>Reference noted by the original author: http://211.147.4.156 linbo linbo
 *
 * <p>Copyright: Copyright (c) 2004</p>
 * @author lijz@gctech.com.cn
 * @version 1.0
 */
public class PlatformAPI
{
  String spId;
  String secret;
  String host;
  int port;
  Socket socket = null;
  InputStream in = null;
  OutputStream out = null;
  /** Outgoing messages waiting to be written by the sender thread. */
  PersistQueue squeue = new PersistQueue(false);
  LoginMessage loginMessage = null;
  /** Not referenced in this file; kept for code elsewhere in the package. */
  static int seq_start;
  /** Not referenced in this file; kept for code elsewhere in the package. */
  Map seqMap = Collections.synchronizedMap(new HashMap(20));
  ActiveTester activeTester = null;
  /** Heartbeat message; the same instance is sent on every test. */
  ActiveTestMessage testMessage = new ActiveTestMessage();
  MessageSender sender  = null;
  MessageReveiver receiver = null;
  List connectlistenerList = Collections.synchronizedList(new ArrayList(3));
  List recvListenerList = Collections.synchronizedList(new ArrayList(3));
  static Logger logger = Logger.getLogger(PlatformAPI.class);
  /** Executes listener callbacks for received DELIVER messages. */
  PooledExecutor pool;
  /** Written by login/reconnect/logout and read by other threads, hence volatile. */
  private volatile boolean isConnected = false;
  /** Serialises socket writes: the heartbeat and sender threads both write. */
  private final Object writeLock = new Object();

  /**
   * Creates a client for the given platform address. No connection is opened
   * until {@link #login(String, String)} is called.
   *
   * @param host platform host name or address
   * @param port platform TCP port
   */
  public PlatformAPI(String host,int port)
  {
    this.host = host;
    this.port = port;
    pool = new PooledExecutor(50);
    // NOTE(review): in the oswego library this selects the "run in the calling
    // thread when the pool is saturated" policy - confirm against the version in use.
    pool.runWhenBlocked();
  }

  /**
   * Replaces the executor used to run receive-listener callbacks.
   *
   * @param pool executor to use from now on
   */
  public void setPooledExecutor(PooledExecutor pool)
  {
    this.pool = pool;
  }

  /**
   * Stops the current worker threads and logs in again with the credentials
   * given to the last {@link #login(String, String)} call.
   *
   * @return the result of {@link #login(String, String)}
   */
  public int relogin()
  {
    isConnected = false;
    stopWorkers();
    logger.debug("try to relogin");
    return login(spId,secret);
  }

  /**
   * Opens a connection to the platform, performs the login handshake and, on
   * success, starts the heartbeat, sender and receiver threads. Any socket and
   * worker threads left over from an earlier login are shut down first.
   *
   * @param spId   service-provider id sent in the login message
   * @param secret shared secret used for the login digest
   * @return 0 on success; 1 if the server digest does not verify; 2 if the
   *         login response is missing, truncated or has an unexpected length;
   *         3 if any exception occurs (including failure to connect)
   */
  public int login(String spId,String secret)
  {
    isConnected = false;
    this.spId = spId;
    this.secret = secret;
    // Release whatever a previous login left behind so that repeated logins do
    // not leak sockets or leave duplicate worker threads running.
    stopWorkers();
    closeSocket();
    try
    {
      socket = new Socket(host,port);
      in = socket.getInputStream();
      out = socket.getOutputStream();
      if(logger.isDebugEnabled())logger.debug("connect "+host+":"+port+" ok!");

      loginMessage = new LoginMessage(spId,secret);
      out.write(loginMessage.toBytes());

      // The first four bytes carry the message length. A single read() may
      // return fewer bytes than requested, so read until the buffer is full.
      byte[] lengthField = new byte[4];
      if(!readFully(in,lengthField)
         || TypesTools.byte2int(lengthField) != LoginResMessage.MESSAGE_LENGTH)
      {
        closeSocket();
        return 2;
      }
      byte[] body = new byte[LoginResMessage.MESSAGE_LENGTH - 4];
      if(!readFully(in,body))
      {
        closeSocket();
        return 2;
      }

      LoginResMessage resMsg = LoginResMessage.createMessage(body);
      byte[] serverMd5 = resMsg.getMd5();
      // Recompute the digest the server should have produced:
      // MD5(status byte + client digest + secret).
      ProtocolBuffer pb = ProtocolBuffer.create(1 + 16 + secret.length());
      pb.appendByte((byte)resMsg.commandStatus);
      pb.appendBytes(loginMessage.getMD5());
      pb.appendString(secret);
      byte[] digestInput = pb.toBytes();
      byte[] expectedMd5 = new MD5().getMD5ofBytes(digestInput,digestInput.length);
      if(!digestMatches(expectedMd5,serverMd5))
      {
        closeSocket();
        return 1;
      }

      // Login succeeded. Mark the connection usable before the receiver starts
      // so that an immediate DELIVER can be acknowledged through addMessage().
      isConnected = true;
      // heartbeat
      activeTester = new ActiveTester(this);
      activeTester.start();
      // takes messages from the queue and writes them to the platform
      sender = new MessageSender(this);
      sender.start();
      // reads messages from the platform and passes them to the listeners
      receiver = new MessageReveiver(this);
      receiver.start();
    }
    catch(Exception ex)
    {
      isConnected = false;
      logger.fatal(ex.getMessage(),ex);
      closeSocket();
      return 3;
    }
    return 0;
  }

  /**
   * Registers a listener that is told the result of every reconnect attempt.
   *
   * @param l listener to add
   */
  public void addReconnectListener(ReconnectListener l)
  {
    connectlistenerList.add(l);
  }

  /**
   * Unregisters a reconnect listener.
   *
   * @param l listener to remove
   */
  public void removeReconnectListener(ReconnectListener l)
  {
    connectlistenerList.remove(l);
  }

  /**
   * Disconnects from the platform: stops the worker threads, closes the socket
   * and shuts the callback pool down. No "close" message is sent and queued
   * messages are not flushed first (the original author marked this method as
   * a rough implementation).
   */
  public void logout()
  {
    isConnected = false;
    // Stop the heartbeat first; otherwise it would notice the closed socket
    // and log straight back in.
    stopWorkers();
    closeSocket();
    try
    {
      pool.shutdownNow();
    }
    catch(Exception e)
    {
      logger.warn("error shutting down listener pool: "+e.getMessage(),e);
    }
  }

  /**
   * Queues a message for asynchronous sending.
   *
   * @param msg message to send
   * @throws PlatformException if the client is not currently connected
   */
  public void addMessage(Message msg) throws PlatformException
  {
    if(!isConnected) throw new PlatformException("没有连接到短信平台,不能发送消息");
    squeue.putMessage(msg);
  }

  /**
   * @return true between a successful login/reconnect and the next
   *         disconnect, relogin or logout
   */
  public boolean isConnected()
  {
    return this.isConnected;
  }

  /**
   * Registers a listener for received DELIVER messages.
   *
   * @param rv listener to add
   */
  public void addRecevierListener(RecevierListener rv)
  {
    recvListenerList.add(rv);
  }

  /**
   * Unregisters a receive listener.
   *
   * @param rv listener to remove
   */
  public void removeRecevierListener(RecevierListener rv)
  {
    recvListenerList.remove(rv);
  }

  /**
   * Writes one message to the platform socket. Writes are serialised because
   * both the sender thread and the heartbeat thread call this method.
   *
   * @param msg message to write
   * @throws PlatformException if no connection has been opened yet or the
   *         write fails
   */
  protected void sendToPlatform(Message msg) throws PlatformException
  {
    OutputStream target = this.out;
    if(target == null)
    {
      throw new PlatformException("not connected to the platform: no output stream");
    }
    try
    {
      byte[] bs = msg.toBytes();
      synchronized(writeLock)
      {
        target.write(bs);
      }
    }
    catch(IOException ioe)
    {
      throw new PlatformException(ioe.getMessage());
    }
  }

  /**
   * Reads the next message from the platform.
   *
   * @return the value returned by {@code Message.create(in)}, or null when
   *         {@link #chekSocket()} reports the socket unusable
   */
  protected Message readMessageFromPlatform()
  {
    if(!this.chekSocket())
    {
      return null;
    }
    // Network failures and malformed data are left to Message.create to deal with.
    return Message.create(in);
  }

  /**
   * Handles a message received from the platform. Only DELIVER messages are
   * processed: a DeliverResMessage acknowledgement is queued and the message
   * is dispatched to every receive listener through the pool. All other
   * message types (including heartbeat and submit responses) are ignored.
   *
   * @param msg message read from the platform
   */
  protected void handMessage(Message msg)
  {
    if(msg.commandId != Message.DELIVER) return;

    DeliverMessage deliver = (DeliverMessage)msg;
    DeliverResMessage res = new DeliverResMessage();
    // Echo the request's sequence id; the original assigned the field to
    // itself, so the acknowledgement never carried the incoming id.
    res.sequenceId = deliver.sequenceId;
    res.messageId = deliver.messageId;
    try
    {
      this.addMessage(res);
    }
    catch(Exception ex)
    {
      // Still deliver to the listeners even if the ack could not be queued.
      logger.error("cannot queue deliver response: "+ex.getMessage(),ex);
    }

    // Iterate over a snapshot so a listener removed concurrently cannot
    // shift the indices under the loop.
    Object[] listeners = recvListenerList.toArray();
    for(int i = 0;i < listeners.length;i++)
    {
      MessageExecutor executor =
          new MessageExecutor(msg,(RecevierListener)listeners[i]);
      try
      {
        pool.execute(executor);
      }
      catch(Exception ex)
      {
        if(ex instanceof InterruptedException)
        {
          // keep the stop request visible to the calling worker thread
          Thread.currentThread().interrupt();
        }
        logger.error("cannot dispatch message to listener: "+ex.getMessage(),ex);
      }
    }
  }

  /**
   * Sends one heartbeat directly to the platform; if the send fails for any
   * reason, {@link #reconnect()} is invoked on the calling thread.
   */
  protected void activityTest()
  {
    try
    {
      this.sendToPlatform(testMessage);
    }
    catch(Exception ex)
    {
      logger.warn("heartbeat failed ("+ex.getClass()+"), reconnecting",ex);
      reconnect();
    }
  }

  /**
   * Checks the local state of the socket. This cannot detect that the remote
   * side has gone away; it only reports whether a socket was connected and
   * has not been closed locally.
   *
   * @return true if a socket exists, was connected and is not closed
   */
  protected boolean  chekSocket()
  {
    Socket s = this.socket;
    return s != null && s.isConnected() && !s.isClosed();
  }

  /**
   * Retries {@link #relogin()} until it succeeds, waiting three seconds after
   * each failed attempt and reporting every attempt's result to the reconnect
   * listeners. If the calling thread is interrupted while waiting, the
   * interrupt status is restored and the method returns without reconnecting.
   */
  protected void reconnect()
  {
    isConnected = false;
    System.out.print("准备重新连接...");
    while(true)
    {
      System.out.print("重新连接...");
      int result = relogin();
      boolean interrupted = false;
      if(result != 0)
      {
        System.out.println("falure");
        try
        {
          Thread.sleep(3000);
        }
        catch(InterruptedException ex)
        {
          interrupted = true;
        }
      }
      else
      {
        System.out.println("ok");
      }

      // Snapshot for the same reason as in handMessage.
      Object[] listeners = connectlistenerList.toArray();
      for(int i = 0;i < listeners.length;i++)
      {
        ((ReconnectListener)listeners[i]).hand(this,result);
      }

      if(result == 0)
      {
        break;
      }
      if(interrupted)
      {
        // Another thread asked this one to stop (relogin or logout elsewhere);
        // do not compete with it for the connection.
        Thread.currentThread().interrupt();
        return;
      }
    }
    isConnected = true;
  }

  /** Empty entry point left in place by the original author. */
  public static void main(String[] args)
  {
  }

  /**
   * Asks the three worker threads to stop. The stop flag is set before the
   * interrupt so a woken thread always sees it, and the calling thread is
   * never interrupted: reconnect() runs on the heartbeat thread, and
   * interrupting it would make its back-off sleep return immediately.
   */
  private void stopWorkers()
  {
    Thread self = Thread.currentThread();

    ActiveTester tester = this.activeTester;
    if(tester != null)
    {
      tester.isRun = false;
      if(tester != self) tester.interrupt();
    }

    MessageSender snd = this.sender;
    if(snd != null)
    {
      snd.isRun = false;
      if(snd != self) snd.interrupt();
    }

    MessageReveiver rcv = this.receiver;
    if(rcv != null)
    {
      rcv.isRun = false;
      if(rcv != self) rcv.interrupt();
    }
  }

  /** Closes the current socket, if any, logging rather than throwing on failure. */
  private void closeSocket()
  {
    Socket s = this.socket;
    if(s == null) return;
    try
    {
      s.close();
    }
    catch(IOException ioe)
    {
      logger.debug("error closing socket: "+ioe.getMessage());
    }
  }

  /**
   * Fills the whole buffer from the stream.
   *
   * @return false if the stream ended before the buffer was full
   */
  private static boolean readFully(InputStream source,byte[] buffer)
      throws IOException
  {
    int filled = 0;
    while(filled < buffer.length)
    {
      int n = source.read(buffer,filled,buffer.length - filled);
      if(n < 0) return false;
      filled += n;
    }
    return true;
  }

  /**
   * Compares the locally computed digest with the leading bytes of the digest
   * sent by the server, without stopping at the first difference.
   *
   * @return false if the server digest is missing, shorter than expected or
   *         differs in any compared byte
   */
  private static boolean digestMatches(byte[] expected,byte[] actual)
  {
    if(expected == null || actual == null || actual.length < expected.length)
    {
      return false;
    }
    int diff = 0;
    for(int i = 0;i < expected.length;i++)
    {
      diff |= expected[i] ^ actual[i];
    }
    return diff == 0;
  }

}
/**
 * 发送消息
 * <p>Title: </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2004</p>
 * <p>Company: </p>
 * @author lijz@gctech.com.cn
 * @version 1.0
 */
class MessageSender extends Thread
{
  PlatformAPI client = null;
  public  boolean isRun = true;
  public MessageSender(PlatformAPI client)
  {
    this.client = client;
  }

  public void run()
  {
    Message msg = null;
    Message reply = null;
    System.out.println("start sender ok");
    while(isRun)
    {
      if(client.chekSocket())
      {
        try
        {

          msg = client.squeue.takeMessage(500);
          if(msg==null) continue;
          else client.sendToPlatform(msg);
        }
        catch(PlatformException pe)
        {
          pe.printStackTrace();
          break ;
        }

      }
      else
      {

        try
        {
          // activtiy tester will reconnect();
          Thread.sleep(5 * 1000);
        }
        catch(InterruptedException ex)
        {

        }

      }
    }
    System.out.println("stop sender  ok");

  }
}

/**
 * Worker thread that reads messages from the platform and passes each one to
 * {@code PlatformAPI.handMessage}.
 *
 * <p>The loop ends when {@link #isRun} is cleared, when the client reports the
 * socket unusable, or when the thread is interrupted while idle.
 */
class MessageReveiver extends Thread
{
  PlatformAPI client = null;
  /** Stop flag; cleared by another thread, hence volatile. */
  public volatile boolean isRun = true;

  /**
   * @param client the API instance to read from and dispatch to
   */
  public MessageReveiver(PlatformAPI client)
  {
    this.client = client;
  }

  public void run()
  {
    System.out.println("start receiver ok");
    while(isRun&&client.chekSocket())
    {
      Message msg = client.readMessageFromPlatform();
      if(msg != null)
      {
        client.handMessage(msg);
        continue;
      }

      // Nothing was read: pause for 500 ms before trying again.
      try
      {
        Thread.sleep(500L);
      }
      catch(InterruptedException e)
      {
        // An interrupt here is the normal stop request, not an error:
        // restore the status for any outer code and leave the loop quietly.
        Thread.currentThread().interrupt();
        break;
      }
    }
    System.out.println("stop receiver  ok");
  }
}

/**
 * Heartbeat thread: every {@link #interval} milliseconds it calls
 * {@code PlatformAPI.activityTest()}, which sends a test message and
 * reconnects if the send fails.
 *
 * <p>Copyright: Copyright (c) 2004</p>
 * @author lijz@gctech.com.cn
 * @version 1.0
 */
class ActiveTester
     extends Thread
{

  PlatformAPI client = null;
  /** Stop flag; cleared by another thread, hence volatile. */
  public volatile boolean isRun = true;

  /**
   * @param client the API instance whose connection is tested
   */
  public ActiveTester(PlatformAPI client)
  {
    this.client = client;
  }

  /** Pause between two heartbeats, in milliseconds. */
  int interval = 5 * 1000;

  public void run()
  {
    System.out.println("start activity test ok");
    while(isRun)
    {
      try
      {
        Thread.sleep(interval);
      }
      catch(InterruptedException ex)
      {
        // The interrupt is a stop request. Sending one more heartbeat here
        // could hit the connection that is being replaced and start a second,
        // competing reconnect, so leave the loop instead.
        break;
      }
      if(!isRun)
      {
        // Stopped while sleeping without being interrupted.
        break;
      }
      try
      {
        client.activityTest();
      }
      catch(Exception ex)
      {
        System.out.println("Activity Error"+ex.getMessage());
      }
    }
    System.out.println("stop activity test ok");
  }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -