⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jdbcqueuetest.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
package org.xmlBlaster.test.classtest.queue;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.StopWatch;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.qos.storage.CbQueueProperty;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.queuemsg.DummyEntry;import java.sql.Connection;import java.util.ArrayList;import junit.framework.*;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.QueuePluginManager;import org.xmlBlaster.util.queue.jdbc.JdbcConnectionPool;import org.xmlBlaster.util.plugin.PluginInfo;/** * Test JdbcQueuePlugin failover when persistent store disappears.  * <p> * Invoke: java org.xmlBlaster.test.classtest.queue.JdbcQueueTest * </p> * <p> * Test database with PostgreSQL: * </p> * <pre> * initdb /tmp/postgres * cp /var/lib/pgsql/data/pg_hba.conf /tmp/postgres    (edit host access) * createdb test * postmaster -i -D /tmp/postgres * </pre> * @see org.xmlBlaster.util.queue.I_Queue * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin */public class JdbcQueueTest extends TestCase {         public class ConnectionConsumer extends Thread {      private JdbcConnectionPool pool;      private int count;            public ConnectionConsumer(JdbcConnectionPool pool, int count) {         this.pool = pool;         this.count = count;         start();      }            public void run() {         boolean success = true;         try {            log.info("connectionConsumer " + this.count + " starting");            Connection conn = this.pool.getConnection();            log.info("connectionConsumer " + this.count + " got the connection " + conn);            if (conn != null)                this.pool.releaseConnection(conn, success);         }         catch (XmlBlasterException ex) {            log.info("connectionConsumer exception " + ex.getMessage());            if (ex.getErrorCode().getErrorCode().equals(ErrorCode.RESOURCE_TOO_MANY_THREADS.getErrorCode())) {               synchronized(JdbcQueueTest.class) {                  exceptionCount++;               }            }         }      }         }      int exceptionCount = 0;      private String ME = "JdbcQueueTest";   protected Global glob;   private static Logger log = Logger.getLogger(JdbcQueueTest.class.getName());   private StopWatch stopWatch = new StopWatch();   private int numOfQueues = 10;   private int numOfMsg    = 10000;   private long sizeOfMsg  = 100L;   private I_Queue queue   = null;   public ArrayList queueList = null;//   public static String[] PLUGIN_TYPES = { new String("JDBC"), new String("CACHE") };   public static String[] PLUGIN_TYPES = { new String("JDBC") };   public int count = 0;   boolean suppressTest = false;   boolean doExecute = true;   /** Constructor for junit not possible since we need to run it 3 times   public JdbcQueueTest(String name) {      super(name);      for (int i=0; i < NUM_IMPL; i++)         initialize(new Global(), name, i);   }   */   public JdbcQueueTest(Global glob, String name, int currImpl, boolean doExecute) {      super(name);      this.doExecute = doExecute;      initialize(glob, name, currImpl);   }   private void initialize(Global glob, String name, int currImpl) {      this.glob = Global.instance();      this.numOfQueues = glob.getProperty().get("queues", 2);      this.numOfMsg = glob.getProperty().get("entries", 100);      this.sizeOfMsg = glob.getProperty().get("sizes", 10L);      this.suppressTest = false;      this.count = currImpl;      try {         String type = PLUGIN_TYPES[currImpl];         this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");         QueuePluginManager pluginManager = new QueuePluginManager(glob);         PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");         java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();         prop.put("tableNamePrefix", "TEST");         prop.put("entriesTableName", "_entries");         CbQueueProperty cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");         StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SetupQueue");         this.queue = pluginManager.getPlugin(pluginInfo, queueId, cbProp);         this.queue.shutdown(); // to allow to initialize again      }      catch (Exception ex) {         log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'");      }   }   protected void setUp() {      try {         glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");         ME = "JdbcQueueTest with class: " + PLUGIN_TYPES[this.count];      }      catch (Exception ex) {         log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'" + ex.getMessage());      }      // cleaning up the database from previous runs ...      try {         // test initialize()//         this.queue.destroy();         this.queue.shutdown();      }      catch (Exception ex) {         log.severe("could not propertly set up the database: " + ex.getMessage());         this.suppressTest = true;      }   }   public void tearDown() {      try {         this.queue.clear();         this.queue.shutdown();      }      catch (Exception ex) {         log.warning("error when tearing down " + ex.getMessage() + " this normally happens when invoquing multiple times cleanUp " + ex.getMessage());      }   }      public void testPutWithBreak() {      if (this.suppressTest) {         log.severe("JDBC test is not driven as no database was found");         return;      }      try {         if (this.doExecute) putWithBreak();         else {            log.warning("test desactivated since needs to be run manually");            log.warning("please invoke it as 'java org.xmlBlaster.test.classtest.queue.JdbcQueueTest'");         }      }      catch (XmlBlasterException ex) {         fail("Exception when testing PutWithBreak probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() );         ex.printStackTrace();      }   }   public void putWithBreak() throws XmlBlasterException {      String me = ME + ".putWithBreak";      // set up the queues ....      QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");      prop.setMaxEntries(10000);      StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "putWithBreak");      queue.initialize(queueId, prop);      queue.clear();      int num = 30;      boolean success = false;      for (int i=0; i < num; i++) {         try {            log.info("put with break entry " + i + "/" + num + " please kill the DB manually to test reconnect");            DummyEntry entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), sizeOfMsg, true);            queue.put(entry, false);            try {               Thread.sleep(5000L);            }            catch (Exception ex) {            }         }         catch (XmlBlasterException ex) {            if (log.isLoggable(Level.FINE))  log.fine(ex.getMessage());            if ("resource.db.unavailable".equalsIgnoreCase(ex.getErrorCodeStr())) {               log.info("the communication to the db has been lost");               success = true;               break;            }            else throw ex;         }      }            assertTrue(me + ": Timed out when waiting to loose the connection to the DB", success);      success = false; // reset the flag      log.info("preparing to reconnect again ...");      for (int i=0; i < num; i++) {         try {            log.info("put with break entry " + i + "/" + num + " please restart the the DB to test reconnect");            DummyEntry entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), sizeOfMsg, true);            queue.put(entry, false);            log.info("the communication to the db has been reestablished");            success = true;            break;         }         catch (XmlBlasterException ex) {            if (log.isLoggable(Level.FINE))  log.fine(ex.getMessage());            if ("resource.db.unavailable".equalsIgnoreCase(ex.getErrorCodeStr())) {               try {                  Thread.sleep(5000L);               }               catch (Exception e) {               }            }            else throw ex;         }      }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -