📄 cachequeuetest.java
字号:
package org.xmlBlaster.test.classtest.queue;import java.util.logging.Logger;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_QueueEntry;import org.xmlBlaster.util.queue.I_StorageProblemListener;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.qos.storage.CbQueueProperty;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.queuemsg.DummyEntry;import java.util.ArrayList;import java.util.Hashtable;import junit.framework.*;import java.util.Enumeration;import org.xmlBlaster.util.queue.QueuePluginManager;import org.xmlBlaster.util.plugin.PluginInfo;/** * Test CacheQueueInterceptorPlugin. * <p> * The sorting order is priority,timestamp: * </p> * <pre> * -> 5,100 - 5,98 - 5,50 - 9,3000 - 9,2500 -> * </pre> * <p> * As 9 is highest priority it is the first to be taken out.<br /> * As we need to maintain the timely sequence and * id is a timestamp in (more or less) nano seconds elapsed since 1970) * the id 2500 (it is older) has precedence to the id 3000 * </p> * <p> * Invoke: java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.classtest.queue.CacheQueueTest * </p> * <p> * Configuration example: * </p> * <pre> * JdbcDriver.drivers=org.postgresql.Driver * JdbcDriver.postgresql.mapping=string=text,longint=bigint,int=integer,boolean=boolean * queue.callback.url=jdbc:postgresql://localhost/test * queue.callback.user=postgres * queue.callback.password= * </pre> * <p> * Test database with PostgreSQL: * </p> * <pre> * initdb /tmp/postgres * cp /var/lib/pgsql/data/pg_hba.conf /tmp/postgres (edit host access) * createdb test * postmaster -i -D /tmp/postgres * </pre> * @see org.xmlBlaster.util.queuemsg.MsgQueueEntry#compare(I_QueueEntry) * @see org.xmlBlaster.util.queue.I_Queue * @see org.xmlBlaster.util.queue.ram.RamQueuePlugin * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin */public class CacheQueueTest extends TestCase { private String ME = "CacheQueueTest"; protected Global glob; private static Logger log = Logger.getLogger(CacheQueueTest.class.getName()); private CacheQueueInterceptorPlugin queue = null; private I_Queue[] queues; public ArrayList queueList = null; public CacheQueueTest(String name) { this(Global.instance(), name); } public CacheQueueTest(Global glob, String name) { super(name); this.glob = glob; } protected void setUp() { glob = Global.instance(); QueuePropertyBase cbProp = null; try { glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST"); cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test"); StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SetupQueue"); this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST"); QueuePluginManager pluginManager = new QueuePluginManager(glob); PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0"); java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters(); prop.put("tableNamePrefix", "TEST"); prop.put("entriesTableName", "_entries"); this.glob.getProperty().set("QueuePlugin[JDBC][1.0]", pluginInfo.dumpPluginParameters()); pluginInfo = new PluginInfo(glob, pluginManager, "CACHE", "1.0"); this.queue = (CacheQueueInterceptorPlugin)pluginManager.getPlugin(pluginInfo, queueId, cbProp); this.queues = new I_Queue[3]; pluginInfo = new PluginInfo(glob, pluginManager, "RAM", "1.0"); this.queues[0] = (I_Queue)pluginManager.getPlugin(pluginInfo, queueId, cbProp); pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0"); this.queues[1] = (I_Queue)pluginManager.getPlugin(pluginInfo, queueId, cbProp); this.queues[2] = queue; for (int i=0; i < 3; i++) this.queues[i].shutdown(); // to allow to initialize again } catch (Exception ex) { log.severe("could not propertly set up the database: " + ex.getMessage()); } } public void tearDown() { try { for (int i=0; i < 3; i++) { this.queues[i].clear(); this.queues[i].shutdown(); } } catch (Exception ex) { log.warning("error when tearing down " + ex.getMessage() + " this normally happens when invoquing multiple times cleanUp"); } } public void testConfig() { String queueType = "CACHE"; try { config(20L, 10L, 500L, 200L); } catch (XmlBlasterException ex) { fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType); ex.printStackTrace(); } } public StorageId config(long maxEntries, long maxEntriesCache, long maxBytes, long maxBytesCache) throws XmlBlasterException { // set up the queues .... QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test"); prop.setMaxEntries(maxEntries); prop.setMaxEntriesCache(maxEntriesCache); prop.setMaxBytes(maxBytes); prop.setMaxBytesCache(maxBytesCache); StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "CacheQueueTest/config"); // this.queue = new CacheQueueInterceptorPlugin(); this.queue.initialize(queueId, prop); long persistentSize = this.queue.getPersistentQueue().getMaxNumOfBytes(); long persistentMsg = this.queue.getPersistentQueue().getMaxNumOfEntries(); long transientSize = this.queue.getTransientQueue().getMaxNumOfBytes(); long transientMsg = this.queue.getTransientQueue().getMaxNumOfEntries(); assertEquals("Wrong persistent size", maxBytes, persistentSize); assertEquals("Wrong persistent num of msg", maxEntries, persistentMsg); if (maxBytesCache != transientSize) log.severe("ERROR: Wrong transient size" + this.queue.getTransientQueue().toXml("")); assertEquals("Wrong transient size" + this.queue.getTransientQueue().toXml(""), maxBytesCache, transientSize); assertEquals("Wrong num of transient msg", maxEntriesCache, transientMsg); return queueId; } public void testClearWithSwappedEntries() { String queueType = "CACHE"; try { StorageId id = config(20L, 3L, 500L, 100L); PriorityEnum prio = PriorityEnum.toPriorityEnum(5); for (int i=0; i < 15; i++) { boolean persistent = (i | 1) == 0; // some persistent and some transient long entrySize = 10L; DummyEntry entry = new DummyEntry(glob, prio, id, entrySize, persistent); this.queue.put(entry, true); } long ret = this.queue.clear(); assertEquals("wrong number of entries returned by clear", 15L, ret); long numOfEntries = this.queue.getNumOfEntries(); long numOfBytes = this.queue.getNumOfBytes(); assertEquals("the queue should be empty", 0L, numOfEntries); assertEquals("the size of the queue should be 0", 0L, numOfBytes); } catch (XmlBlasterException ex) { fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType); ex.printStackTrace(); } } public void testPutPeekRemove() { String queueType = this.glob.getProperty().get("queueType", "CACHE"); log.info("testPutPeekRemove will be done with a queue of type '" + queueType + "'"); log.info("if you want to test with another queue type invoke '-queueType $TYPE' on the cmd line where $TYPE is either RAM JDBC or CACHE"); int index = 2; if ("RAM".equalsIgnoreCase(queueType)) index = 0; else if ("JDBC".equalsIgnoreCase(queueType)) index = 1; try { putPeekRemove(this.queues[index]); } catch (XmlBlasterException ex) { fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType); ex.printStackTrace(); } } public void putPeekRemove(I_Queue refQueue) throws XmlBlasterException { // set up the queues .... // every content is 80 bytes which gives an entry size of 100 bytes (80+20) long entrySize = 100; String lastSuccessfulLocation = ""; long maxNumOfBytesCache[] = {700L, 10000L}; long maxNumOfBytes[] = {700L, 50000L}; int numOfTransientEntries[] = { 2, 50, 200}; int numOfPersistentEntries[] = { 0, 2, 50, 200};// int numPrio[] = { 1, 5, 9};// int it=0, id=0, ic=0, is=0;// try { for (int ic=0; ic < maxNumOfBytesCache.length; ic++) { for (int is=0; is < maxNumOfBytes.length; is++) { log.info("**** TEST maxNumOfBytesCache["+ic+"]=" + maxNumOfBytesCache[ic] + " maxNumOfBytes["+is+"]=" + maxNumOfBytes[is]); // a new queue each time here ... QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test"); prop.setMaxEntries(2000L); prop.setMaxEntriesCache(1000L); prop.setMaxBytes(maxNumOfBytes[is]); prop.setMaxBytesCache(maxNumOfBytesCache[ic]); StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "CacheQueueTest/jdbc" + maxNumOfBytes[is] + "/ram" + maxNumOfBytesCache[ic]);// this.queue = new CacheQueueInterceptorPlugin(); refQueue.clear(); refQueue.shutdown();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -