⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cachequeuetest.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
package org.xmlBlaster.test.classtest.queue;import java.util.logging.Logger;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_QueueEntry;import org.xmlBlaster.util.queue.I_StorageProblemListener;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.qos.storage.CbQueueProperty;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.queuemsg.DummyEntry;import java.util.ArrayList;import java.util.Hashtable;import junit.framework.*;import java.util.Enumeration;import org.xmlBlaster.util.queue.QueuePluginManager;import org.xmlBlaster.util.plugin.PluginInfo;/** * Test CacheQueueInterceptorPlugin. * <p> * The sorting order is priority,timestamp: * </p> * <pre> *   ->    5,100 - 5,98 - 5,50 - 9,3000 - 9,2500   -> * </pre> * <p> * As 9 is highest priority it is the first to be taken out.<br /> * As we need to maintain the timely sequence and * id is a timestamp in (more or less) nano seconds elapsed since 1970) * the id 2500 (it is older) has precedence to the id 3000 * </p> * <p> * Invoke: java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.classtest.queue.CacheQueueTest * </p> * <p> * Configuration example: * </p> * <pre> * JdbcDriver.drivers=org.postgresql.Driver * JdbcDriver.postgresql.mapping=string=text,longint=bigint,int=integer,boolean=boolean * queue.callback.url=jdbc:postgresql://localhost/test * queue.callback.user=postgres * queue.callback.password= * </pre> * <p> * Test database with PostgreSQL: * </p> * <pre> * initdb /tmp/postgres * cp /var/lib/pgsql/data/pg_hba.conf /tmp/postgres    (edit host access) * createdb test * postmaster -i -D /tmp/postgres * </pre> * @see org.xmlBlaster.util.queuemsg.MsgQueueEntry#compare(I_QueueEntry) * @see org.xmlBlaster.util.queue.I_Queue * @see org.xmlBlaster.util.queue.ram.RamQueuePlugin * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin */public class CacheQueueTest extends TestCase {   private String ME = "CacheQueueTest";   protected Global glob;   private static Logger log = Logger.getLogger(CacheQueueTest.class.getName());   private CacheQueueInterceptorPlugin queue = null;   private I_Queue[] queues;   public ArrayList queueList = null;   public CacheQueueTest(String name) {      this(Global.instance(), name);   }   public CacheQueueTest(Global glob, String name) {      super(name);      this.glob = glob;   }   protected void setUp() {      glob = Global.instance();      QueuePropertyBase cbProp = null;      try {         glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");         cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");         StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SetupQueue");         this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");         QueuePluginManager pluginManager = new QueuePluginManager(glob);         PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0");         java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();         prop.put("tableNamePrefix", "TEST");         prop.put("entriesTableName", "_entries");         this.glob.getProperty().set("QueuePlugin[JDBC][1.0]", pluginInfo.dumpPluginParameters());         pluginInfo = new PluginInfo(glob, pluginManager, "CACHE", "1.0");         this.queue = (CacheQueueInterceptorPlugin)pluginManager.getPlugin(pluginInfo, queueId, cbProp);         this.queues = new I_Queue[3];         pluginInfo = new PluginInfo(glob, pluginManager, "RAM", "1.0");         this.queues[0] = (I_Queue)pluginManager.getPlugin(pluginInfo, queueId, cbProp);         pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0");         this.queues[1] = (I_Queue)pluginManager.getPlugin(pluginInfo, queueId, cbProp);         this.queues[2] = queue;         for (int i=0; i < 3; i++) this.queues[i].shutdown(); // to allow to initialize again      }      catch (Exception ex) {         log.severe("could not propertly set up the database: " + ex.getMessage());      }   }   public void tearDown() {      try {         for (int i=0; i < 3; i++) {            this.queues[i].clear();            this.queues[i].shutdown();         }      }      catch (Exception ex) {         log.warning("error when tearing down " + ex.getMessage() + " this normally happens when invoquing multiple times cleanUp");      }   }   public void testConfig() {      String queueType = "CACHE";      try {         config(20L, 10L, 500L, 200L);      }      catch (XmlBlasterException ex) {         fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);         ex.printStackTrace();      }   }   public StorageId config(long maxEntries, long maxEntriesCache, long maxBytes, long maxBytesCache)      throws XmlBlasterException {      // set up the queues ....      QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");      prop.setMaxEntries(maxEntries);      prop.setMaxEntriesCache(maxEntriesCache);      prop.setMaxBytes(maxBytes);      prop.setMaxBytesCache(maxBytesCache);      StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "CacheQueueTest/config");      // this.queue = new CacheQueueInterceptorPlugin();      this.queue.initialize(queueId, prop);      long persistentSize = this.queue.getPersistentQueue().getMaxNumOfBytes();      long persistentMsg  = this.queue.getPersistentQueue().getMaxNumOfEntries();      long transientSize  = this.queue.getTransientQueue().getMaxNumOfBytes();      long transientMsg   = this.queue.getTransientQueue().getMaxNumOfEntries();      assertEquals("Wrong persistent size", maxBytes, persistentSize);      assertEquals("Wrong persistent num of msg", maxEntries, persistentMsg);      if (maxBytesCache != transientSize)         log.severe("ERROR: Wrong transient size" + this.queue.getTransientQueue().toXml(""));      assertEquals("Wrong transient size" + this.queue.getTransientQueue().toXml(""), maxBytesCache, transientSize);      assertEquals("Wrong num of transient msg", maxEntriesCache, transientMsg);      return queueId;   }   public void testClearWithSwappedEntries() {      String queueType = "CACHE";      try {         StorageId id = config(20L, 3L, 500L, 100L);         PriorityEnum prio = PriorityEnum.toPriorityEnum(5);         for (int i=0; i < 15; i++) {            boolean persistent =  (i | 1) == 0; // some persistent and some transient            long entrySize = 10L;            DummyEntry entry = new DummyEntry(glob, prio, id, entrySize, persistent);            this.queue.put(entry, true);         }                  long ret = this.queue.clear();         assertEquals("wrong number of entries returned by clear", 15L, ret);                  long numOfEntries = this.queue.getNumOfEntries();         long numOfBytes = this.queue.getNumOfBytes();         assertEquals("the queue should be empty", 0L, numOfEntries);         assertEquals("the size of the queue should be 0", 0L, numOfBytes);      }      catch (XmlBlasterException ex) {         fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);         ex.printStackTrace();      }   }   public void testPutPeekRemove() {      String queueType = this.glob.getProperty().get("queueType", "CACHE");      log.info("testPutPeekRemove will be done with a queue of type '" + queueType + "'");      log.info("if you want to test with another queue type invoke '-queueType $TYPE' on the cmd line where $TYPE is either RAM JDBC or CACHE");      int index = 2;      if ("RAM".equalsIgnoreCase(queueType)) index = 0;      else if ("JDBC".equalsIgnoreCase(queueType)) index = 1;      try {         putPeekRemove(this.queues[index]);      }      catch (XmlBlasterException ex) {         fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);         ex.printStackTrace();      }   }   public void putPeekRemove(I_Queue refQueue) throws XmlBlasterException {      // set up the queues ....      // every content is 80 bytes which gives an entry size of 100 bytes (80+20)      long entrySize = 100;      String lastSuccessfulLocation = "";      long maxNumOfBytesCache[] = {700L, 10000L};      long maxNumOfBytes[] = {700L, 50000L};      int numOfTransientEntries[] = { 2, 50, 200};      int numOfPersistentEntries[] =  { 0, 2, 50, 200};//      int numPrio[] = { 1, 5, 9};//      int it=0, id=0, ic=0, is=0;//      try {         for (int ic=0; ic < maxNumOfBytesCache.length; ic++) {            for (int is=0; is < maxNumOfBytes.length; is++) {               log.info("**** TEST maxNumOfBytesCache["+ic+"]=" + maxNumOfBytesCache[ic] + " maxNumOfBytes["+is+"]=" + maxNumOfBytes[is]);               // a new queue each time here ...               QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");               prop.setMaxEntries(2000L);               prop.setMaxEntriesCache(1000L);               prop.setMaxBytes(maxNumOfBytes[is]);               prop.setMaxBytesCache(maxNumOfBytesCache[ic]);               StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "CacheQueueTest/jdbc" + maxNumOfBytes[is] + "/ram" + maxNumOfBytesCache[ic]);//               this.queue = new CacheQueueInterceptorPlugin();               refQueue.clear();               refQueue.shutdown();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -