⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cmpplinker.java

📁 短信系统SMS:支持普通短信、长短信和wap push短信的发送。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
package com.ekun.common.sms;

/**
 * <p>Title: </p>
 *
 * <p>Description: </p>
 *
 * <p>Copyright: Copyright (c) 2006</p>
 *
 * <p>Company: </p>
 *
 * @author ekun
 * @version 1.0
 */
import java.io.*;

import com.ekun.common.log.*;
import com.ekun.sms.cmpp2.*;
import com.ekun.sms.common.*;
import java.util.ArrayList;
import java.util.Properties;

public class CmppLinker
{
  private static Logger logger = LoggerFactory.getLogger(CmppLinker.class);
  //是否返回流量控制错(submitResponse)
  private String iniFileName = ""; //配置文件名
  private static int SUBMIT_RETRY_TIMES = 2; //submit重试次数
  private CmppApis cm;
  private CmppMsgHandler processor;
  private SlipWindowCtrl slipCtrl;
  private int socketTimeOut = 10 * 1000; //socket超时单位:毫秒!
  //用于限定每秒发送多少条短信!
  private static int SubmitPreSecond = 150;
  private RecvTrd recvTrd = null;
  private ReSendTrd reSendTrd = null;
  private boolean linkerTerminal = false;
  private boolean isTooFaster = false;
  private static CmppLinker instance = null;
  private Object sockSendLock = new Object();
  private Object reConnLock = new Object();
  private boolean isWaitForConn = false;
  public String sendPort;
  /**
   * 内部类,实现数据的收取,同时维护Socket连接!
   */
  private class RecvTrd
      extends Thread
  {
    long oldTime = System.currentTimeMillis();
    private int sendActive(int spaceSecond)
    {
      int nRet = 0;
      if ( (System.currentTimeMillis() - oldTime) > spaceSecond)
      {
        nRet = sendOtherPacket(new RecCActiveTest());
        oldTime = System.currentTimeMillis();
      }
      return nRet;
    }

    public void run()
    {
      int nRet = -1;
      RecCCmppPacket packet = new RecCCmppPacket();
      while (!linkerTerminal)
      {
        try
        {
          nRet = cm.nCMPP_RecvPacket(packet);
          if (nRet >= 0)
            parseRcvPacket(packet, nRet);
          else if (0 != sendActive( -1))
          {
            reOpen();
          }
          //如果发送线程还在等待,则通知他!
          if (isWaitForConn)
          {
            synchronized (reConnLock)
            {
              reConnLock.notifyAll();
            }
          }
        }
        catch(Exception ex)
        {
          logger.error(ex);
        }
      }
      logger.error("严重错误:Exit CMPP Recv Thread!!!");
      System.out.println("Exit CMPP Recv Thread!!!");
    }
  }

  private class ReSendTrd
      extends Thread
  {
    public void run()
    {
      while (!linkerTerminal)
      {
        //获取所有需要重发的数据
        ArrayList pends = slipCtrl.getPending();
        if (pends.size() <= 0)
        {
          try
          {
            sleep(1000);
          }
          catch (InterruptedException ie)
          {}
        }
        java.util.Iterator it = pends.iterator();
        while (it.hasNext())
        {
          SmsInfo smsInfo = (SmsInfo) it.next();
          //判断是否无效请求!
          if ( (smsInfo.SeqID != -999) || (smsInfo.DBMsgTo.trim().length() != 0))
          {
            logger.info("CMPP重发阻塞数据:" + smsInfo);
            sendSubmit(smsInfo);
          }
          else
          {
            logger.info("CMPP重发阻塞数据,未匹配的Response:" + smsInfo);
          }
        }
      }
    }
  }

  /**
   * 默认构造函数
   */
  private CmppLinker()
  {
    processor = null;
    cm = new CmppApis();
    linkerTerminal = false;
    slipCtrl = new SlipWindowCtrl();
  }

  public static CmppLinker getInstance()
  {
    synchronized (CmppLinker.class)
    {
      if (instance == null)
      {
        instance = new CmppLinker();
      }
    }
    return instance;
  }

  /**
   * 注册消息处理器,目前先处理一个,以后可添加多个!
   * @param processor:消息处理器
   */
  public void setProcessor(CmppMsgHandler processor)
  {
    this.processor = processor;
  }

  /**
   * 打开连接,开始服务
   */
  public void reOpen()
  {
    logger.debug("CMPPLinker begin reOpen...");
    connectToISMG();
    slipCtrl.setSlipWindowSize(cm.sendSpeed);
    System.out.println("连接ISMG成功!");
    logger.debug("Connect ISMG successed!");
    if (recvTrd == null)
    {
      recvTrd = new RecvTrd();
      recvTrd.setName("CMPPLinker.RecvTrd");
      recvTrd.start();
    }
    if (reSendTrd == null)
    {
      reSendTrd = new ReSendTrd();
      reSendTrd.setName("CMPPLinker.ReSendTrd");
      reSendTrd.start();
    }
    synchronized (reConnLock)
    {
      reConnLock.notifyAll();
    }
    logger.debug("CMPPLinker reOpen Complete!");
  }

  /**
   * 执行清除动作,关闭线程及对象!
   */
  public void cleanup()
  {
    linkerTerminal = true;
    cm.nCMPP_Terminate();
  }

  /**
   * 解析ISMG下发的数据包
   * @param packet
   */
  private void parseRcvPacket(RecCCmppPacket packet, int nRet)
  {
    if (processor == null)
      return;
    try
    {
      switch (packet.nCmdId)
      {
        case CmppPacket.CMPP_Deliver:
          if ( (nRet == 0) && (processor != null))
          {
            RecCDeliverRep recDeliverRep = new RecCDeliverRep();
            recDeliverRep.nSeqId = packet.recDeliver.nSeqId;
            recDeliverRep.usMsgId = new byte[8];
            System.arraycopy(packet.recDeliver.usMsgId, 0,
                             recDeliverRep.usMsgId, 0, 8);
            recDeliverRep.ucResult = 0;
            sendOtherPacket(recDeliverRep);
            processor.onCmppMO(packet.recDeliver);
          }
          else if ( (nRet == 1) && (processor != null))
          {
            processor.onRcvReceipt(packet.recReceipt);
          }
          break;
        case CmppPacket.CMPP_Submit_REP:
          //1、得到response的SeqID,用以匹配等待对象!
          String key = String.valueOf(nRet);
          //2、从滑动窗口中获得等待对象
          SmsInfo respSmsInfo = new SmsInfo();
          respSmsInfo.SMSMsgID = BuffMemoUtil.getMsgId(packet.recSubmitRep.
                         usMsgId);
          respSmsInfo.SendResult = packet.recSubmitRep.ucResult;
          SmsInfo smsInfo = (SmsInfo) slipCtrl.popPending(key, respSmsInfo);
          if ( (processor != null) && (smsInfo != null))
                {
                //3、赋SMSID,以便后续状态报告时匹配之用!
                smsInfo.SMSMsgID = respSmsInfo.SMSMsgID;
                smsInfo.SendResult = respSmsInfo.SendResult;
                //4、触发onMT消息
                processor.onCmppMT(smsInfo);
            }
          if (packet.recSubmitRep.ucResult == 8)
          {
            logger.info("ISMG 返回流量控制错误!");
            isTooFaster = true;
          }
          break;
        case CmppPacket.CMPP_Active_Test:
          RecCActiveTestRep precActiveRep = new RecCActiveTestRep();
          precActiveRep.nSeqId = packet.recActiveTest.nSeqId;
          precActiveRep.ucSuccId = 0;
          synchronized (sockSendLock)
          {
            cm.nCMPP_SendActiveRep(precActiveRep);
          }
          break;
        case CmppPacket.CMPP_Terminate:
          RecCTerminateRep precPkg = new RecCTerminateRep();
          precPkg.nSeqId = packet.recTerminate.nSeqId;
          synchronized (sockSendLock)
          {
            cm.nCMPP_ReplyTerminate(precPkg);
          }
          break;
        default:
          ;
      }
    }
    catch (Exception e)
    {
      logger.error("doRcvPacket Error!", e);
    }
  }

  /**
   * 读取配置参数
   * @return boolean
   */
  private boolean loadConfig()
  {
    Properties prop = new Properties();
    if ( (iniFileName == null) || (iniFileName.equals("")))
    {
      logger.error("找不到CMPP API 配置文件!");
      return false;
    }
    try
    {
      prop.load(new FileInputStream(iniFileName));
    }
    catch (IOException e)
    {
      logger.error("CMPP API 配置文件打开出错!");
      return false;
    }

    cm.SPID = prop.getProperty("SP_ID", "").trim();
    cm.SPPassword = prop.getProperty("SP_Pwd", "").trim();
    cm.CorpID = prop.getProperty("CorpID", "").trim();
    cm.hostIp = prop.getProperty("HostAddr", "").trim();
    sendPort = prop.getProperty("SendPort").trim();
    String sPort = prop.getProperty("HostPort", "").trim();
    String sSendSpeed = prop.getProperty("SendSpeed", "60").trim();

    if (cm.SPID.equals("") || cm.SPPassword.equals("") || cm.CorpID.equals(""))
    {
      logger.error("CMPP API 配置文件读取错误: SP_ID/SP_Pwd/CorpID参数读取失败!");
      return false;
    }
    try
    {
      cm.port = Integer.parseInt(sPort.trim());
      cm.sendSpeed = Integer.parseInt(sSendSpeed.trim());
      cm.hostIp = cm.hostIp.trim();
    }
    catch (NumberFormatException e)
    {
      logger.error("CMPP API 配置文件读取错误: HostAddr/HostPort/SendSpeed参数读取失败!");
      return false;
    }
    String smaxMsgAscii = prop.getProperty("MaxMsgLenAscii", "");
    String smaxMsgOther = prop.getProperty("MaxMsgLenOther", "");
    try
    {
      cm.maxMsgLenAscii = Integer.parseInt(smaxMsgAscii.trim());
      cm.maxMsgLenOther = Integer.parseInt(smaxMsgOther.trim());
      if (cm.maxMsgLenAscii > 160)
        cm.maxMsgLenAscii = 159;
      if (cm.maxMsgLenOther > 140)
        cm.maxMsgLenOther = 140;
    }
    catch (NumberFormatException e)
    {
      cm.maxMsgLenAscii = 159;
      cm.maxMsgLenOther = 140;
      logger.error("CMPP API 配置文件读取错误: MaxMsgLenAscii/MaxMsgLenOther参数读取失败!");
      return false;
    }
    logger.debug("ConnInfo:\nSPID:\t" + cm.SPID +
                 "\nCorpID:\t" + cm.CorpID +
                 "\nSPPWD:\t" + cm.SPPassword +
                 "\nHOSTIP:\t" + cm.hostIp +
                 "\nPORT:\t" + cm.port +
                 "\nSendSpeed:\t" + cm.sendSpeed);
    return true;
  }

  /**
   * 发送CMPP_CONNECT请求
   * @return:连接情况
   */
  private void connectToISMG()
  {
    /*每次读取配置文件,保证动态加载*/
    while (!linkerTerminal)
    {
      disconnectToISMG();
      if (!loadConfig())
      {
        try
        {
          Thread.currentThread().sleep(1000);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -