⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 testreplication.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
/*------------------------------------------------------------------------------ Name:      TestReplication.java Project:   org.xmlBlasterProject:   xmlBlaster.org Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file ------------------------------------------------------------------------------*/package org.xmlBlaster.test.contrib.replication;import java.io.InputStream;import java.io.OutputStream;import java.io.PipedInputStream;import java.io.PipedOutputStream;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.Statement;import java.util.Properties;import java.util.Random;import java.util.logging.Logger;import org.custommonkey.xmlunit.XMLTestCase;import org.custommonkey.xmlunit.XMLUnit;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.I_Update;import org.xmlBlaster.contrib.PropertiesInfo;import org.xmlBlaster.contrib.db.DbInfo;import org.xmlBlaster.contrib.db.DbMetaHelper;import org.xmlBlaster.contrib.db.I_DbPool;import org.xmlBlaster.contrib.dbwriter.DbWriter;import org.xmlBlaster.contrib.replication.I_DbSpecific;import org.xmlBlaster.contrib.replication.ReplicationAgent;import org.xmlBlaster.contrib.replication.ReplicationConverter;import org.xmlBlaster.contrib.replication.TableToWatchInfo;import org.xmlBlaster.test.MsgInterceptor;import org.xmlBlaster.util.Global;/** * Test basic functionality for the replication. This test needs an instance of xmlBlaster running. * <p> * To run most of the tests you need to have a database (for example Postgres). * </p> * <p> * The connection configuration (url, password etc.) is configured as JVM * property or in {@link #createTest(I_Info, Map)} and * {@link #setUpDbPool(I_Info)} * </p> * * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a> */public class TestReplication extends XMLTestCase {   private static Logger log = Logger.getLogger(TestReplication.class.getName());   private I_Info readerInfo;   private I_Info writerInfo;   private ReplicationAgent agent;   private SpecificHelper specificHelper;   private DbMetaHelper dbHelper;   private String tableName = "TEST_REPLICATION";   private String tableName2 = "TEST_REPLICATION2";   private String replPrefix = "repl_";   private long sleepDelay;   private MsgInterceptor interceptor;   /**    * Start the test.    * <pre>    *  java -Ddb=oracle junit.swingui.TestRunner -noloading org.xmlBlaster.test.contrib.replication.TestReplication    * </pre>    * @param args  Command line settings    */   public static void main(String[] args) {      // junit.swingui.TestRunner.run(TestReplication.class);      TestReplication test = new TestReplication();      try {         test.setUp();         test.testBigBlobs();         test.tearDown();         test.setUp();         test.testUntrimmedSpaces();         test.tearDown();         test.setUp();         test.testNullEntriesOnTable();         test.tearDown();         test.setUp();         test.testCreateAndInsert();         test.tearDown();         test.setUp();         test.testPerformAllOperationsOnTable();         test.tearDown();         // test.setUp();         // test.testMultiTransaction();         // test.tearDown();      }      catch (Exception ex) {         ex.printStackTrace();         fail();      }   }   /**    * Default ctor.    */   public TestReplication() {      super();      XMLUnit.setIgnoreWhitespace(true);      this.interceptor = new MsgInterceptor(new Global(), log, null, (I_Update)null);   }   /**    * Constructor for TestReplication.    *    * @param arg0    */   public TestReplication(String arg0) {      super(arg0);      XMLUnit.setIgnoreWhitespace(true);      this.interceptor = new MsgInterceptor(new Global(), log, null, (I_Update)null);   }   /**    * Helper method to fill the properties. If an entry is found in the system properties it is left as is.    *    * @param info    * @param key    * @param val    */   private void setProp(I_Info info, String key, String val, boolean force) {      String tmp = info.get(key, null);      if (tmp == null || force)         info.put(key, val);   }   /**    * This method is invoked directly by setUp.    * @param readerInfo The info object for the reader (the dbWatcher).    * @param writerInfo The info object for the writer (the DbWriter).    */   private String setupProperties(I_Info readerInfo, I_Info writerInfo, boolean extraUser) {      String ret = null;      if (readerInfo != null) {         ret = readerInfo.get("replication.prefix", "repl_");         setProp(readerInfo, "mom.loginName", "DbWatcherPlugin.testPoll/session/1", true);         setProp(readerInfo, "mom.topicName", "testRepl", true);         setProp(readerInfo, "changeDetector.groupColName", "repl_key", true);         setProp(readerInfo, "alertScheduler.pollInterval", "1000", false);         setProp(readerInfo, "changeDetector.class", "org.xmlBlaster.contrib.dbwatcher.detector.TimestampChangeDetector", false);         setProp(readerInfo, "changeDetector.detectStatement", "SELECT MAX(repl_key) from " + this.replPrefix + "items", false);         setProp(readerInfo, "db.queryMeatStatement", "SELECT * FROM " + this.replPrefix + "items ORDER BY repl_key", false);         // setProp(readerInfo, "changeDetector.postUpdateStatement", "DELETE from " + this.replPrefix + "items", false);         setProp(readerInfo, "converter.addMeta", "false", false);         setProp(readerInfo, "converter.class", "org.xmlBlaster.contrib.replication.ReplicationConverter", false);         setProp(readerInfo, "alertProducer.class", "org.xmlBlaster.contrib.replication.ReplicationScheduler", false);         setProp(readerInfo, "replication.doBootstrap", "true", false);         setProp(readerInfo, "xmlBlaster/useSessionMarker", "true", false);      }      if (writerInfo != null) {  // and here for the dbWriter ...         // ---- Database settings -----         if (extraUser)            setProp(writerInfo, "mom.loginName", "DbWriterExtra/session/1", true);         else            setProp(writerInfo, "mom.loginName", "DbWriter/session/1", true);         // setProp(writerInfo, "replication.mapper.tables", "test_replication=test_replication2,test1=test1_replica,test2=test2_replica,test3=test3_replica", false);         setProp(writerInfo, "replication.mapper.table.test_replication", "test_replication2", false);         setProp(writerInfo, "replication.mapper.table.test1", "test1_replica", false);         setProp(writerInfo, "replication.mapper.table.test2", "test2_replica", false);         setProp(writerInfo, "replication.mapper.table.test3", "test3_replica", false);         String subscribeKey = System.getProperty("mom.subscribeKey", "<key oid='testRepl'/>");         setProp(writerInfo, "mom.subscribeKey", subscribeKey, true);         setProp(writerInfo, "mom.subscribeQos", "<qos><initialUpdate>false</initialUpdate><multiSubscribe>false</multiSubscribe><persistent>true</persistent></qos>", false);         setProp(writerInfo, "dbWriter.writer.class", "org.xmlBlaster.contrib.replication.ReplicationWriter", false);         // these are pure xmlBlaster specific properties         setProp(writerInfo, "dispatch/callback/retries", "-1", true);         setProp(writerInfo, "dispatch/callback/delay", "10000", true);         setProp(writerInfo, "queue/callback/maxEntries", "10000", true);         // we need this since we are in testing, otherwise it would loop after the second tearDown         // because there would be two XmlBlasterAccess having the same session name (and only one session is allowed)         setProp(writerInfo, "dbWriter.shutdownMom", "true", true);         setProp(writerInfo, "xmlBlaster/useSessionMarker", "true", false);      }      return ret;   }   private I_DbSpecific getDbSpecific() {      assertTrue("The agent must not be null", this.agent != null);      DbWriter writer = this.agent.getDbWriter();      assertTrue("The writer must not be null", writer != null);      I_Info tmpInfo = writer.getInfo();      assertTrue("The writer info must not be null", tmpInfo != null);      String dbSpecificClass = tmpInfo.get("replication.dbSpecific.class", "org.xmlBlaster.contrib.replication.impl.SpecificOracle");      I_DbSpecific dbSpecific = (I_DbSpecific)tmpInfo.getObject(dbSpecificClass + ".object");      assertTrue("The dbSpecific for the writer must not be null", dbSpecific != null);      return dbSpecific;   }   /**    * Configure database access.    * @see TestCase#setUp()    */   protected void setUp() throws Exception {      super.setUp();      this.specificHelper = new SpecificHelper(System.getProperties());      this.readerInfo = new PropertiesInfo((Properties)this.specificHelper.getProperties().clone());      this.writerInfo = new PropertiesInfo((Properties)this.specificHelper.getProperties().clone());      boolean extraUser = false;      this.replPrefix = setupProperties(this.readerInfo, this.writerInfo, extraUser);      extraUser = true; // to make it easy to recognize by the session name      // we use the writerInfo since this will not instantiate an publisher      I_Info tmpInfo =  new PropertiesInfo((Properties)this.specificHelper.getProperties().clone());      setupProperties(null, tmpInfo, extraUser);      boolean forceCreationAndInit = true;      I_DbSpecific dbSpecific = ReplicationConverter.getDbSpecific(tmpInfo, forceCreationAndInit);      I_DbPool pool = (I_DbPool)tmpInfo.getObject("db.pool");      DbInfo persistentInfo = new DbInfo(pool, "replication", tmpInfo);      String name = readerInfo.get("replication.prefix", "repl_") + ".oldReplKey";      persistentInfo.put(name, "0");      Connection conn = null;      try {         this.dbHelper = new DbMetaHelper(pool);         conn = pool.reserve();         boolean doWarn = false; // we don't want warnings on SQL Exceptions here.         log.info("setUp: going to cleanup now ...");         this.sleepDelay = this.readerInfo.getLong("test.sleepDelay", -1L);         if (this.sleepDelay < 0L)            this.sleepDelay = 25000L;         log.info("setUp: The sleep delay will be '" + this.sleepDelay + "' ms");         dbSpecific.cleanup(conn, doWarn);         try {            pool.update("DROP TABLE " + this.tableName);         }         catch (Exception e) {         }         try {            pool.update("DROP TABLE " + this.tableName2);         }         catch (Exception e) {         }         for (int i=1; i < 5; i++) { // make sure we have deleted all triggers            try {               pool.update("DROP TRIGGER " + this.replPrefix + i);            }            catch (Exception ex) {            }         }         log.info("setUp: cleanup done, going to bootstrap now ...");         boolean force = true;         dbSpecific.bootstrap(conn, doWarn, force);         dbSpecific.shutdown();         pool.shutdown();         pool = null;         dbSpecific = null;         tmpInfo = null;      }      catch (Exception ex) {         if (conn != null && pool != null)            pool.release(conn);      }      log.info("setUp: Instantiating");      this.agent = new ReplicationAgent();      this.agent.init(this.readerInfo, this.writerInfo);      this.interceptor.clear();      this.agent.registerForUpdates(this.interceptor);      log.info("setUp: terminated");   }   /*    * @see TestCase#tearDown()    */   protected void tearDown() throws Exception {      super.tearDown();      // here we should also cleanup all resources on the database : TODO      this.agent.registerForUpdates(null);      this.agent.shutdown();      this.agent = null;   }   /**    *    * If the table does not exist we expect a null ResultSet    *    * @throws Exception Any type is possible    */   public final void testCreateAndInsert() throws Exception {      log.info("Start");      I_DbPool pool = (I_DbPool)this.readerInfo.getObject("db.pool");      assertNotNull("pool must be instantiated", pool);      Connection conn = null;      try {         conn  = pool.reserve();         try {            pool.update("DROP TABLE " + this.tableName + this.specificHelper.getCascade());         }         catch (Exception ex) {            conn = pool.reserve();         }         try {            pool.update("DROP TABLE " + this.tableName2 + this.specificHelper.getCascade()); // This is the replica         }         catch (Exception ex) {            conn = pool.reserve();         }         // verifying         try {            ResultSet rs = conn.getMetaData().getTables(null, this.specificHelper.getOwnSchema(pool), this.dbHelper.getIdentifier(this.tableName), null);            assertFalse("Testing if the tables have been cleaned up. The table '" + this.tableName + "' is still here", rs.next()); // should be empty            rs.close();         }         catch (Exception ex) {            ex.printStackTrace();            assertTrue("Testing that the tables have been cleaned up correctly. An exception shall not occur here", false);         }         try {            ResultSet rs = conn.getMetaData().getTables(null, this.specificHelper.getOwnSchema(pool), this.dbHelper.getIdentifier(this.tableName2), null);            assertFalse("Testing if the tables have been cleaned up. The table '" + tableName2 + "' is still here", rs.next()); // should be empty            rs.close();         }         catch (Exception ex) {            ex.printStackTrace();            assertTrue("Testing that the tables have been cleaned up correctly. An exception shall not occur here", false);         }         try {            boolean force = false;            String destination = null;            boolean forceSend = false;            TableToWatchInfo tableToWatch = new TableToWatchInfo(null, this.specificHelper.getOwnSchema(pool), tableName);            tableToWatch.setActions("IDU");            getDbSpecific().addTableToWatch(tableToWatch, force, new String[] { destination }, forceSend);         }         catch (Exception ex) {            ex.printStackTrace();            assertTrue("Testing if addition of table '" + tableName + "' to tables to replicate (" + this.replPrefix + "tables) succeeded: An exception should not occur here", false);         }         this.interceptor.clear();         String sql = "CREATE TABLE " + tableName + "(name VARCHAR(20), city VARCHAR(15), PRIMARY KEY (name))";         pool.update(sql);         sql = "INSERT INTO " + tableName + " (name, city) VALUES ('michele', 'caslano')";         pool.update(sql);         /* The DbWatcher shall now detect a table creation and an insert (after maximum two seconds)          * here the xmlBlaster must run to allow the DbWatcher publish the messages.          * The DbWriter shall receive the messages it subscribed to and the replica shall be created and filled.          */         this.interceptor.waitOnUpdate(this.sleepDelay, 2);         // a new table must have been created ...         conn = pool.reserve();         ResultSet rs = conn.getMetaData().getTables(null, this.specificHelper.getOwnSchema(pool), this.dbHelper.getIdentifier(this.tableName2), null);         boolean isThere = rs.next();         log.info("the replicated table is " + (isThere ? "" : "not ") + "there");

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -