⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 objectdbsender.java

📁 中国联通短信通信协议
💻 JAVA
字号:
package com.wireless.sms.gwif.smsagent.workthread;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.Hashtable;
import java.util.Map;
import java.util.Vector;

import org.apache.log4j.xml.DOMConfigurator;

import com.wireless.gwif.socketconn.Client;
import com.wireless.gwif.socketconn.ClientDefaultSendTemplet;
import com.wireless.gwif.socketconn.ObjectSendAdapter;
import com.wireless.sms.gwif.smsagent.global.LoggerConstant;
import com.wireless.sms.gwif.smsagent.global.SmsGWIFGlobal;
import com.wireless.sms.pub.entity.MT;
import com.wireless.sms.pub.entity.Stat;
import com.wireless.sms.pub.mq.ObjectQueue;

public class ObjectDbSender {

    // Shared queue of outbound objects; the send callback created in start() removes
    // entries from it and puts back any entry whose send attempt reports failure.
    private static ObjectQueue objqueue = ObjectQueue.getInstance();
    // Templet -> Long timestamp (System.currentTimeMillis()) of the last restart request
    // accepted by restart(); used there to discard requests that arrive too soon.
    private static Map map = new Hashtable();
    // Every ObjectSendAdapter created by start(); iterated by restart() and stop().
    private static Vector vosa = new Vector();
    // Set to true by start() and back to false by stop(); makes a repeated start()
    // (or a stop() without a prior start()) return without doing any work.
    private static boolean startFlag = false;
    // Milliseconds a restart thread sleeps before restarting a client; restart() also
    // ignores a repeated request for the same templet within INTERVAL/2 milliseconds.
    private static long INTERVAL = 60000;

    /**
     * Creates one {@code ObjectSendAdapter} per host/port pair and starts it.
     *
     * <p>The start-up log line is written on every call; the actual work is done only
     * on the first call after construction or after {@code stop()}, guarded by
     * {@code startFlag}. Pairs are taken index by index until either array runs out.
     * Each started adapter is appended to {@code vosa}.
     *
     * @param hosts target host names or addresses, paired by index with {@code ports}
     * @param ports target ports as strings, paired by index with {@code hosts}
     */
    public static void start(String[] hosts, String[] ports) {
        LoggerConstant.stat_log.info("启动入库线程....");
        if (startFlag) {
            return;
        }
        startFlag = true;

        for (int idx = 0; idx < hosts.length && idx < ports.length; idx++) {
            ObjectSendAdapter adapter = new ObjectSendAdapter();
            ObjectSendAdapter.Templet templet = adapter.new Templet() {
                /**
                 * Takes at most one object off the shared queue and tries to send it;
                 * an object that could not be sent is put back on the queue.
                 */
                public void processSend() {
                    try {
                        SendMonitor.getInstance().setSendObjFlag(true);

                        // Nothing to do when the queue is empty or this templet is closed.
                        if (objqueue.size() <= 0 || this.isClosed()) {
                            return;
                        }

                        Object pending = objqueue.removeNoWait();
                        if (pending == null) {
                            return;
                        }

                        if (send(pending)) {
                            return;
                        }

                        // Send failed: requeue the object, raise the monitor code and log.
                        objqueue.add(pending);
                        SmsGWIFGlobal.sendMonitor("K000101");
                        log.error("gateway : " + SmsGWIFGlobal.getInstance().GATEWAY +
                                  " fail send obj to db module!");
                    } catch (Exception e) {
                        log.error("Exception : ", e);
                    }
                }

                /**
                 * Logs the failure; for I/O or illegal-state failures it also stops the
                 * send thread and, when {@code idleFlag} is set, requests a restart.
                 *
                 * @param cause the failure reported to this templet
                 */
                public void processException(Throwable cause) {
                    log.error("Exception : ", cause);

                    boolean restartable = cause instanceof java.io.IOException
                            || cause instanceof java.lang.IllegalStateException;
                    if (!restartable) {
                        return;
                    }

                    log.info("Exception occur, restart ... " + this);
                    stopSendT();
                    if (!this.idleFlag) {
                        return;
                    }
                    SmsGWIFGlobal.sendMonitor("K000101");
                    restart(this);
                }

                /**
                 * Stops the send thread and, when {@code idleFlag} is set, logs the
                 * closure, raises the monitor code and requests a restart.
                 */
                public void processClosed() {
                    stopSendT();
                    if (!this.idleFlag) {
                        return;
                    }
                    log.info("Connection closed, restart ... " + this);
                    SmsGWIFGlobal.sendMonitor("K000101");
                    restart(this);
                }
            };

            templet.setLog(LoggerConstant.mo_log);
            adapter.start(templet, hosts[idx], ports[idx]);
            vosa.add(adapter);
        }
    }

    /**
     * Schedules a delayed restart of the client that owns the given templet.
     *
     * <p>A request for a templet whose previous request was accepted less than
     * {@code INTERVAL/2} milliseconds ago is logged and dropped. Otherwise the request
     * time is recorded in {@code map} and a daemon thread is started which looks up the
     * matching client in {@code vosa}, sleeps {@code INTERVAL} milliseconds and then
     * calls {@code restart()} on that client.
     *
     * @param templet the templet whose client should be restarted
     */
    private synchronized static void restart(ClientDefaultSendTemplet templet){
      Object previous = map.get(templet);
      if (previous != null) {
        long elapsed = System.currentTimeMillis() - ((Long) previous).longValue();
        if (elapsed < (INTERVAL/2)) {
          LoggerConstant.mo_log.info("Same templet restart in " + (INTERVAL/2) + " milliseconds, discard ...");
          return;
        }
      }

      map.put(templet, new Long(System.currentTimeMillis()));

      // Final copy so the anonymous worker below can capture the reference.
      final ClientDefaultSendTemplet wanted = templet;

      Thread worker = new Thread(new Runnable() {
        public void run() {
          try {
            for (int idx = 0; idx < vosa.size(); idx++) {
              ObjectSendAdapter adapter = (ObjectSendAdapter) vosa.elementAt(idx);
              Client candidate = adapter.getClient();
              if (candidate.getConnTemplet() != wanted) {
                continue;
              }

              // Wait before reconnecting; a failure of the sleep itself is ignored
              // and the restart goes ahead regardless.
              try {
                Thread.sleep(INTERVAL);
              } catch (Exception ignored) {
              }
              candidate.restart();

              LoggerConstant.mo_log.info("Client witch has templet " + candidate.getConnTemplet() + " restart over ...");
              break;
            }
          } catch (Exception e) {
            LoggerConstant.mo_log.error("DBModule Exception", e);
          }
        }
      });

      worker.setDaemon(true);
      worker.start();
    }

    /**
     * Stops every adapter created by {@code start(...)} and clears the started flag.
     *
     * <p>Does nothing when the sender is not started. Each adapter is stopped inside
     * its own try/catch, so a failure while stopping one adapter is logged and the
     * remaining adapters are still stopped. The adapter list is emptied, so a later
     * {@code start(...)} begins with a clean list instead of accumulating adapters
     * that were already stopped.
     */
    public static void stop(){

        if (!startFlag) {
          return;
        }

        // Take a snapshot and empty the shared list in one step (Vector locks on
        // itself). Emptying it before the adapters are stopped also means a restart
        // worker that starts scanning the list after this point finds no client.
        Object[] adapters;
        synchronized (vosa) {
          adapters = vosa.toArray();
          vosa.clear();
        }

        for (int i = 0; i < adapters.length; i++) {
          ObjectSendAdapter adapter = (ObjectSendAdapter) adapters[i];
          if (adapter == null) {
            continue;
          }
          try{
            adapter.stop();
          }catch(Exception e){
            // Keep going: one adapter failing to stop must not leave the others running.
            LoggerConstant.mo_log.error("Exception : ", e);
          }
        }

        startFlag = false;
    }


    /**
     * Manual test driver.
     *
     * <p>Configures log4j from the file named by the {@code log4j} system property,
     * initialises {@code SmsGWIFGlobal} (a failure is printed and otherwise ignored),
     * starts five senders against a fixed test host, and then fills the shared queue
     * from a separate thread with 1000 {@code MT} objects followed by 1000
     * {@code Stat} objects.
     *
     * <p>Earlier revisions kept a number of alternative {@code start(...)} invocations
     * here as commented-out code (other hosts and port ranges, and
     * {@code SmsGWIFGlobal.OBJOUT_IP}/{@code OBJOUT_PORT}), plus a thread that called
     * {@code stop()} after 30 seconds.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        DOMConfigurator.configure(System.getProperty("log4j"));

        try {
            SmsGWIFGlobal.getInstance().initYD_System();
        } catch (Throwable ex) {
            ex.printStackTrace();
        }

        String[] testHosts = {
            "192.168.1.87", "192.168.1.87", "192.168.1.87", "192.168.1.87", "192.168.1.87"
        };
        String[] testPorts = {"9035", "9036", "9037", "9038", "9039"};
        ObjectDbSender.start(testHosts, testPorts);

        Runnable feeder = new Runnable() {
            public void run() {
                final int mtTotal = 1000;
                for (int n = 0; n < mtTotal; n++) {
                    MT message = new MT();
                    message.setMsgContent("MT 信息 : " + n + " 测试状态");
                    message.setFeeTermID("01011111112");
                    message.setMtType((n % 7) + "");
                    message.setMsgID("TestForInsertDataToDatabase " + n);
                    ObjectQueue.getInstance().add(message);
                }

                // The "error" status pass is currently configured for zero records.
                enqueueStats(0, "999", "TestForInsertErrorDataToDatabase ");
                enqueueStats(1000, "888", "TestForInsertDataToDatabase ");
            }

            /** Queues {@code total} delivered-status records for the given gateway id. */
            private void enqueueStats(int total, String gatewayId, String msgIdPrefix) {
                for (int n = 0; n < total; n++) {
                    Stat status = new Stat();
                    status.setStat("DELIVRD");
                    status.setGatewayID(gatewayId);
                    status.setMsgContent("测试信息 " + n + " 发送成功");
                    status.setMsgID(msgIdPrefix + n);
                    ObjectQueue.getInstance().add(status);
                }
            }
        };
        new Thread(feeder).start();
    }

//  public static void main(String args[]){
//      DOMConfigurator.configure(System.getProperty("log4j"));
//
//      try {
//          SmsGWIFGlobal agent = SmsGWIFGlobal.getInstance();
//          agent.initYD_System();
//      } catch (Throwable ex) {
//          ex.printStackTrace();
//          return;
//      }
//
//      ObjectDbSender sender = new ObjectDbSender();
//      sender.doReSave();
//
//      ObjectDbSender.start(SmsGWIFGlobal.OBJOUT_IP, SmsGWIFGlobal.OBJOUT_PORT);
//  }

  /**
   * Starts replaying the default log file into the shared object queue.
   *
   * <p>Kept for backward compatibility; it uses the fixed path below. Use
   * {@link #doReSave(File)} to replay a file at another location.
   */
  public void doReSave(){
    doReSave(new File("D:\\work\\sms\\javasms\\gwifnewest\\resave\\mtlog.txt"));
  }

  /**
   * Starts replaying the given log file into the shared object queue.
   *
   * <p>Creates a {@code ReSaveThread} for the file with an initial last-modified
   * marker of 0; that thread starts itself from its constructor.
   *
   * @param file the log file to replay; must not be {@code null}
   * @throws IllegalArgumentException if {@code file} is {@code null}
   */
  public void doReSave(File file){
    if( file == null ){
      throw new IllegalArgumentException("file must not be null");
    }
    new ReSaveThread(ObjectQueue.getInstance(), file, 0);
  }

  class ReSaveThread extends Thread{
    private ObjectQueue queue = null;
    private java.io.File file = null;
    private long lastModify = 0;
    private int counter = 0;

    public ReSaveThread(ObjectQueue queue,java.io.File file, long lastModify){
      this.queue = queue;
      this.file = file;
      this.lastModify = lastModify;
      start();
    }

    public void run(){
      while(true){
        if( file.lastModified() > lastModify ){
          BufferedReader reader = null;
          try{
            reader = new BufferedReader(new FileReader(file));
            String msg = null;
            while( (msg = reader.readLine() ) != null ){
              int index = msg.indexOf("<?xml");
              if( index != -1 ){
                msg = msg.substring(index);
                Object obj = null;
                if( msg.indexOf("<mt><mtID>") != -1 ){
                  obj = MT.getInstance(msg);
                }
                else{
                  obj = getStat(msg);
                }

                if( obj != null ){
                  LoggerConstant.mo_log.info("msg " + (++counter) + " : " + obj.toString());
                  queue.add(obj);

                  if( queue.size() > 5000 ){
                    try{
                      Thread.sleep(10000);
                    }catch(Exception e){}
                  }
                }
              }
              else{
                LoggerConstant.mo_log.info("当前匹配失败 : " + msg);
              }
            }

            lastModify = file.lastModified();
          }
          catch(Exception e){
            e.printStackTrace();
          }
          finally{
            if( reader != null ){
              try { reader.close(); } catch (IOException ex) { }
            }
          }
        }

        System.out.println("处理完一轮 ...");

        try{
          Thread.sleep(5000);
        }catch(Exception e){}
      }
    }


    public Stat getStat(String xmlContent){
      Stat stat = null;
      org.apache.commons.betwixt.io.BeanReader beanReader = new org.apache.commons.betwixt.io.BeanReader();
      beanReader.getXMLIntrospector().setAttributesForPrimitives(false);
      beanReader.setMatchIDs(false);
      try {
        beanReader.registerBeanClass("stat", com.wireless.sms.pub.entity.Stat.class);
        stat = (Stat) beanReader.parse(new StringReader(new String(xmlContent)));
        stat.setSubmitTime(null);
        stat.setDoneTime("");
      }
      catch (java.beans.IntrospectionException ex2) {
        ex2.printStackTrace();
      }
      catch (org.xml.sax.SAXException ex) {
        ex.printStackTrace();
      }
      catch (IOException ex) {
        ex.printStackTrace();
      }

      return stat;
    }

  }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -