⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 i_queuetest.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
package org.xmlBlaster.test.classtest.queue;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin;import org.xmlBlaster.util.queue.BlockingQueueWrapper;import org.xmlBlaster.util.queue.I_Entry;import org.xmlBlaster.util.queue.I_StorageSizeListener;import org.xmlBlaster.util.queue.I_Storage;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_QueueEntry;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.qos.storage.CbQueueProperty;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.queuemsg.DummyEntry;import java.util.ArrayList;import junit.framework.*;import org.xmlBlaster.util.queue.QueuePluginManager;/** * Test RamQueuePlugin. * <p> * The sorting order is priority,timestamp: * </p> * <pre> *   ->    5,100 - 5,98 - 5,50 - 9,3000 - 9,2500   -> * </pre> * <p> * As 9 is highest priority it is the first to be taken out.<br /> * As we need to maintain the timely sequence and * id is a timestamp in (more or less) nano seconds elapsed since 1970) * the id 2500 (it is older) has precedence to the id 3000 * </p> * <p> * Invoke: java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.classtest.queue.I_QueueTest * </p> * @see org.xmlBlaster.util.queuemsg.MsgQueueEntry#compare(I_QueueEntry) * @see org.xmlBlaster.util.queue.I_Queue * @see org.xmlBlaster.util.queue.ram.RamQueuePlugin * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin */public class I_QueueTest extends TestCase {   class QueueSizeListener  implements I_StorageSizeListener {      private long lastNumEntries = 0L,                    lastNumBytes = 0L,                   lastIncrementEntries = 0L,                   lastIncrementBytes = 0L;      private int count = 0;            public long getLastIncrementEntries() {         return this.lastIncrementEntries;      }            public long getLastIncrementBytes() {         return this.lastIncrementBytes;      }            public int getCount() {         return this.count;      }            public void clear() {         this.lastNumEntries = 0L;          this.lastNumBytes = 0L;         this.lastIncrementEntries = 0L;         this.lastIncrementBytes = 0L;         this.count = 0;      }            public void changed(I_Storage storage, long numEntries, long numBytes, boolean isShutdown) {         this.lastIncrementEntries = numEntries - this.lastNumEntries;         this.lastIncrementBytes = numBytes - this.lastNumBytes;         this.lastNumEntries = numEntries;         this.lastNumBytes = numBytes;         this.count++;      }   }      private String ME = "I_QueueTest";   protected Global glob;   private static Logger log = Logger.getLogger(I_QueueTest.class.getName());   private I_Queue queue;   private QueueSizeListener queueSizeListener = new QueueSizeListener();      static String[] PLUGIN_TYPES = {                   new String("RAM"),                   new String("JDBC"),                   new String("CACHE")                 };/*   static I_Queue[] IMPL = {                   new org.xmlBlaster.util.queue.ram.RamQueuePlugin(),                   new org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin(),                   new org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin()                 };*/   public class QueuePutter extends Thread {            private I_Queue queue;      private long delay;      private int numOfEntries;      private boolean ignoreInterceptor;            public QueuePutter(I_Queue queue, long delay, int numOfEntries, boolean ignoreInterceptor) {         this.queue = queue;         this.delay = delay;         this.numOfEntries = numOfEntries;         this.ignoreInterceptor = ignoreInterceptor;      }            public void run() {         try {            for (int i=0; i < this.numOfEntries; i++) {               Thread.sleep(this.delay);               DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);               this.queue.put(queueEntry, this.ignoreInterceptor);            }         }         catch (Exception ex) {            ex.printStackTrace();         }      }         }         public I_QueueTest(String name, int currImpl, Global glob) {      super(name);//      this.queue = IMPL[currImpl];      //this.ME = "I_QueueTest[" + this.queue.getClass().getName() + "]";      if (glob == null) this.glob = Global.instance();      else this.glob = glob;      try {         String type = PLUGIN_TYPES[currImpl];         this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");         QueuePluginManager pluginManager = new QueuePluginManager(glob);         PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0");         java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();         prop.put("tableNamePrefix", "TEST");         prop.put("entriesTableName", "_entries");         this.glob.getProperty().set("QueuePlugin[JDBC][1.0]", pluginInfo.dumpPluginParameters());         pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");         StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SomeQueueId");         this.queue = pluginManager.getPlugin(pluginInfo, queueId, new CbQueueProperty(this.glob, Constants.RELATING_CALLBACK, this.glob.getStrippedId()));         this.queue.shutdown(); // to allow to initialize again      }      catch (Exception ex) {         log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'");      }   }   protected void setUp() {      // cleaning up the database from previous runs ...      QueuePropertyBase prop = null;      try {         prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");         StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SomeQueueId");         queue.initialize(queueId, prop);         queue.clear();         queue.shutdown();      }      catch (Exception ex) {         log.severe("could not propertly set up the database: " + ex.getMessage());      }   }   /**    * Tests QueuePropertyBase() and getStorageId()    * @param queueTypeList A space separated list of names for the    *        implementations to be tested. Valid names are:    *        RamQueuePlugin JdbcQueuePlugin    */   public void testConfig() {      config(this.queue);   }   /**    * Tests initialize(), getProperties(), setProperties() and capacity()    * @param queue !!!Is not initialized in this case!!!!    */   private void config(I_Queue queue) {      ME = "I_QueueTest.config(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";      System.out.println("***" + ME);      QueuePropertyBase prop1 = null;      QueuePropertyBase prop = null;      try {         // test initialize()         prop1 = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");         int max = 12;         prop1.setMaxEntries(max);         prop1.setMaxEntriesCache(max);         assertEquals(ME+": Wrong capacity", max, prop1.getMaxEntries());         assertEquals(ME+": Wrong cache capacity", max, prop1.getMaxEntriesCache());         //PluginInfo pluginInfo = new PluginInfo(glob, null, "");         //queue.init(glob, pluginInfo);     // Init from pluginloader is first         StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SomeQueueId");         queue.initialize(queueId, prop1);         assertEquals(ME+": Wrong queue ID", queueId, queue.getStorageId());         try {            prop = new CbQueueProperty(glob, Constants.RELATING_SUBJECT, "/node/test");            prop.setMaxEntries(99);            prop.setMaxEntriesCache(99);            queue.setProperties(prop);         }         catch(XmlBlasterException e) {            fail("Changing properties failed");         }      }      catch(XmlBlasterException e) {         fail(ME + ": Exception thrown: " + e.getMessage());      }      long len = prop.getMaxEntries();      assertEquals(ME+": Wrong capacity", prop.getMaxEntries(), queue.getMaxNumOfEntries());      assertEquals(ME+": Wrong capacity", prop.getMaxEntries(), ((QueuePropertyBase)queue.getProperties()).getMaxEntries());      assertEquals(ME+": Wrong size", 0, queue.getNumOfEntries());      try {         for (int ii=0; ii<len; ii++) {            queue.put(new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true), false);         }         assertEquals(ME+": Wrong total size", len, queue.getNumOfEntries());         try {            DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);            queue.put(queueEntry, false);            queue.put(queueEntry, false);            fail("Did expect an exception on overflow");         }         catch(XmlBlasterException e) {            log.info("SUCCESS the exception is OK: " + e.getMessage());         }         log.info("toXml() test:" + queue.toXml(""));         log.info("usage() test:" + queue.usage());         assertEquals(ME+": should not be shutdown", false, queue.isShutdown());         queue.shutdown();         assertEquals(ME+": should be shutdown", true, queue.isShutdown());         log.info("#2 Success, filled " + queue.getNumOfEntries() + " messages into queue");         System.out.println("***" + ME + " [SUCCESS]");         queue.shutdown();         queue = null;      }      catch(XmlBlasterException e) {         fail(ME + ": Exception thrown: " + e.getMessage());      }   }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -