⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 replicationagent.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*------------------------------------------------------------------------------
 Name:      ReplicationAgent.java
 Project:   org.xmlBlasterProject:   xmlBlaster.org
 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
 ------------------------------------------------------------------------------*/
package org.xmlBlaster.contrib.replication;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.logging.Logger;

import org.xmlBlaster.contrib.GlobalInfo;
import org.xmlBlaster.contrib.I_Info;
import org.xmlBlaster.contrib.I_Update;
import org.xmlBlaster.contrib.PropertiesInfo;
import org.xmlBlaster.contrib.db.I_DbPool;
import org.xmlBlaster.contrib.dbwatcher.DbWatcher;
import org.xmlBlaster.contrib.dbwriter.DbWriter;
import org.xmlBlaster.contrib.replication.I_DbSpecific;
import org.xmlBlaster.contrib.replication.ReplicationConverter;
import org.xmlBlaster.contrib.replication.impl.SpecificDefault;
import org.xmlBlaster.util.Global;
import org.xmlBlaster.util.XmlBlasterException;
import org.xmlBlaster.util.plugin.PluginInfo;

/**
 * Test basic functionality.
 * <p>
 * To run most of the tests you need to have a database (for example Postgres).
 * </p>
 * <p>
 * The connection configuration (url, password etc.) is configured as JVM
 * property or in {@link #createTest(I_Info, Map)} and
 * {@link #setUpDbPool(I_Info)}
 * </p>
 * 
 * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a>
 */
public class ReplicationAgent {
   
   private static Logger log = Logger.getLogger(ReplicationAgent.class.getName());

   // private I_Info readerInfo;
   // private I_Info writerInfo;
   
   private DbWatcher dbWatcher;
   private DbWriter dbWriter;
   // private static String replPrefix = "repl_";

   public class OwnGlobalInfo extends GlobalInfo {
      
      private final static boolean ON_SERVER = false;
      
      public OwnGlobalInfo(Global global, I_Info additionalInfo, String infoId) throws Exception {
         super(global, additionalInfo, ON_SERVER);
         put(ID, infoId);
      }

      public OwnGlobalInfo(GlobalInfo globalInfo, I_Info additionalInfo, String infoId) throws Exception {
         super(globalInfo, additionalInfo, ON_SERVER);
         put(ID, infoId);
      }

      protected void doInit(Global global, PluginInfo pluginInfo) throws XmlBlasterException {
      }
      
   }
   
   private GlobalInfo createOwnGlobalInfo(Global global, I_Info additionalInfo, String infoId) throws Exception {
      return new OwnGlobalInfo(global, additionalInfo, infoId);
   }
   
   
   
   /**
    * Keys are the info objects and values are maps containing the used properties as key/value pairs.
    */
   public static void main(String[] args) {
      try {
         // I_Info cfgInfo = new PropertiesInfo(new Properties());
         
         ReplicationAgent agent = new ReplicationAgent();
         Global global = new Global(args);
         GlobalInfo cfgInfo = agent.createOwnGlobalInfo(global, null, "configuration");
         
         agent.fillInfoWithCommandLine(args, cfgInfo);
         
         I_Info readerInfo = agent.createReaderInfo(cfgInfo);
         I_Info writerInfo = agent.createWriterInfo(cfgInfo);
         
         if (ReplicationAgent.needsHelp(args)) {
            agent.displayHelp(readerInfo, writerInfo);
            System.exit(-1);
         }

         boolean isInteractive = cfgInfo.getBoolean("interactive", false);
         agent.init(readerInfo, writerInfo);

         log.info("REPLICATION AGENT IS NOW READY");
         if (isInteractive)
            agent.process(readerInfo, writerInfo);
         else {
            while (true) {
               try {
                  Thread.sleep(5000L);
               }
               catch (Exception ex) {
                  
               }
            }
         }
         agent.shutdown();

      } 
      catch (Throwable ex) {
         log.severe("An exception occured when starting '" + ex.getMessage() + "'");
         ex.printStackTrace();
      }
   }

   public DbWatcher getDbWatcher() {
      return this.dbWatcher;
   }

   public DbWriter getDbWriter() {
      return this.dbWriter;
   }

   /**
    * Default ctor.
    */
   public ReplicationAgent() {
   }

   private static final Map getCommonDefaultMap(I_Info subInfo) {
      String driversDefault = "org.hsqldb.jdbcDriver:" +
      "oracle.jdbc.driver.OracleDriver:" +
      "com.microsoft.jdbc.sqlserver.SQLServerDriver:" + 
      "com.microsoft.sqlserver.jdbc.SQLServerDriver:" +
      "org.postgresql.Driver";
      if (subInfo != null)
         driversDefault = subInfo.get("JdbcDriver.drivers", driversDefault);
      Map defaultMap = new HashMap();
      defaultMap.put("jdbc.drivers", driversDefault);
      defaultMap.put("db.url", "jdbc:postgresql:test//localhost");
      defaultMap.put("db.user", "postgres");
      defaultMap.put("db.password", "");
      return defaultMap;
   }
   
   
   private static final Map getReaderDefaultMap(I_Info readerInfo) {
      Map defaultMap = getCommonDefaultMap(readerInfo);
      String prefix = readerInfo.get("replication.prefix", "repl_");
      defaultMap.put("mom.loginName", "DbWatcherPlugin.testPoll/1");
      defaultMap.put("mom.topicName", "trans_key");
      defaultMap.put("alertScheduler.pollInterval", "2000");
      defaultMap.put("changeDetector.class", "org.xmlBlaster.contrib.dbwatcher.detector.TimestampChangeDetector");
      defaultMap.put("changeDetector.detectStatement", "SELECT MAX(repl_key) from " + prefix + "items");
      defaultMap.put("db.queryMeatStatement", "SELECT * FROM " + prefix + "items ORDER BY repl_key");
      defaultMap.put("converter.addMeta", "false");
      defaultMap.put("converter.class", "org.xmlBlaster.contrib.replication.ReplicationConverter");
      defaultMap.put("alertProducer.class", "org.xmlBlaster.contrib.replication.ReplicationScheduler");
      defaultMap.put("replication.doBootstrap", "true");
      return defaultMap;
   }

   private static final Map getWriterDefaultMap(I_Info writerInfo) {
      Map defaultMap = getCommonDefaultMap(writerInfo);
      defaultMap.put("mom.loginName", "DbWriter/1");
      defaultMap.put("replication.mapper.tables", "test1=test1_replica,test2=test2_replica,test3=test3_replica");
      defaultMap.put("dbWriter.writer.class", "org.xmlBlaster.contrib.replication.ReplicationWriter");
      defaultMap.put("dispatch/callback/retries", "-1");
      defaultMap.put("dispatch/callback/delay", "10000");
      defaultMap.put("dispatch/connection/retries", "-1");
      defaultMap.put("dispatch/connection/delay", "10000");
      return defaultMap;
   }

   private final static void showSubHelp(I_Info info, Map defaultMap, PrintStream out) {
      String[] keys = (String[])defaultMap.keySet().toArray(new String[defaultMap.size()]);
      out.println("" + keys.length + " default properties displayed: ");
      for (int i=0; i < keys.length; i++) {
         String value = info.get(keys[i], "");
         out.println("  " + keys[i] + "=" + value);
      }
   }
   
   private I_Info createReaderInfo(GlobalInfo cfgInfo) throws Exception {
      String masterFilename = cfgInfo.get("masterFilename", null);
      if (masterFilename == null)
         return null;
      Properties props = new Properties();
      if (!masterFilename.equalsIgnoreCase("default")) {
         InputStream is = getFileFromClasspath(masterFilename);
         props.load(is);
         is.close();
      }
      I_Info readerInfo = new OwnGlobalInfo(cfgInfo, new PropertiesInfo(props), "reader");
      Map defaultMap = getReaderDefaultMap(readerInfo);
      
      String[] keys = (String[])defaultMap.keySet().toArray(new String[defaultMap.size()]);
      for (int i=0; i < keys.length; i++) {
         if (readerInfo.get(keys[i], null) == null)
            readerInfo.put(keys[i], (String)defaultMap.get(keys[i]));
      }
      return readerInfo;
   }
   
   
   public I_Info createWriterInfo(GlobalInfo cfgInfo) throws Exception {
      String slaveFilename = cfgInfo.get("slaveFilename", null);

      if (slaveFilename == null)
         return null;

      Properties props = new Properties();
      if (!slaveFilename.equalsIgnoreCase("default")) {
         InputStream is = getFileFromClasspath(slaveFilename);
         props.load(is);
         is.close();
      }
      I_Info writerInfo = new OwnGlobalInfo(cfgInfo, new PropertiesInfo(props), "writer");
      
      Map defaultMap = getWriterDefaultMap(writerInfo);
      String[] keys = (String[])defaultMap.keySet().toArray(new String[defaultMap.size()]);
      for (int i=0; i < keys.length; i++) {
         if (writerInfo.get(keys[i], null) == null)
            writerInfo.put(keys[i], (String)defaultMap.get(keys[i]));
      }
      return writerInfo;
      
   }
   
   private static InputStream getFileFromClasspath(String filename) throws IOException {
      Class clazz = ReplicationAgent.class;
      Enumeration enm = clazz.getClassLoader().getResources(filename);
      if(enm.hasMoreElements()) {
         URL url = (URL)enm.nextElement();
         log.fine(" loading file '" + url.getFile() + "'");
         while(enm.hasMoreElements()) {
            url = (URL)enm.nextElement();
            log.warning("init: an additional matching file has been found in the classpath at '"
               + url.getFile() + "' please check that the correct one has been loaded (see info above)"
            );
         }
         return clazz.getClassLoader().getResourceAsStream(filename); 
      }
      else {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -