📄 i_queuetest.java
字号:
package org.xmlBlaster.test.classtest.queue;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin;import org.xmlBlaster.util.queue.BlockingQueueWrapper;import org.xmlBlaster.util.queue.I_Entry;import org.xmlBlaster.util.queue.I_StorageSizeListener;import org.xmlBlaster.util.queue.I_Storage;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_QueueEntry;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.qos.storage.CbQueueProperty;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.queuemsg.DummyEntry;import java.util.ArrayList;import junit.framework.*;import org.xmlBlaster.util.queue.QueuePluginManager;/** * Test RamQueuePlugin. * <p> * The sorting order is priority,timestamp: * </p> * <pre> * -> 5,100 - 5,98 - 5,50 - 9,3000 - 9,2500 -> * </pre> * <p> * As 9 is highest priority it is the first to be taken out.<br /> * As we need to maintain the timely sequence and * id is a timestamp in (more or less) nano seconds elapsed since 1970) * the id 2500 (it is older) has precedence to the id 3000 * </p> * <p> * Invoke: java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.classtest.queue.I_QueueTest * </p> * @see org.xmlBlaster.util.queuemsg.MsgQueueEntry#compare(I_QueueEntry) * @see org.xmlBlaster.util.queue.I_Queue * @see org.xmlBlaster.util.queue.ram.RamQueuePlugin * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin */public class I_QueueTest extends TestCase { class QueueSizeListener implements I_StorageSizeListener { private long lastNumEntries = 0L, lastNumBytes = 0L, lastIncrementEntries = 0L, lastIncrementBytes = 0L; private int count = 0; public long getLastIncrementEntries() { return this.lastIncrementEntries; } public long getLastIncrementBytes() { return this.lastIncrementBytes; } public int getCount() { return this.count; } public void clear() { this.lastNumEntries = 0L; this.lastNumBytes = 0L; this.lastIncrementEntries = 0L; this.lastIncrementBytes = 0L; this.count = 0; } public void changed(I_Storage storage, long numEntries, long numBytes, boolean isShutdown) { this.lastIncrementEntries = numEntries - this.lastNumEntries; this.lastIncrementBytes = numBytes - this.lastNumBytes; this.lastNumEntries = numEntries; this.lastNumBytes = numBytes; this.count++; } } private String ME = "I_QueueTest"; protected Global glob; private static Logger log = Logger.getLogger(I_QueueTest.class.getName()); private I_Queue queue; private QueueSizeListener queueSizeListener = new QueueSizeListener(); static String[] PLUGIN_TYPES = { new String("RAM"), new String("JDBC"), new String("CACHE") };/* static I_Queue[] IMPL = { new org.xmlBlaster.util.queue.ram.RamQueuePlugin(), new org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin(), new org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin() };*/ public class QueuePutter extends Thread { private I_Queue queue; private long delay; private int numOfEntries; private boolean ignoreInterceptor; public QueuePutter(I_Queue queue, long delay, int numOfEntries, boolean ignoreInterceptor) { this.queue = queue; this.delay = delay; this.numOfEntries = numOfEntries; this.ignoreInterceptor = ignoreInterceptor; } public void run() { try { for (int i=0; i < this.numOfEntries; i++) { Thread.sleep(this.delay); DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true); this.queue.put(queueEntry, this.ignoreInterceptor); } } catch (Exception ex) { ex.printStackTrace(); } } } public I_QueueTest(String name, int currImpl, Global glob) { super(name);// this.queue = IMPL[currImpl]; //this.ME = "I_QueueTest[" + this.queue.getClass().getName() + "]"; if (glob == null) this.glob = Global.instance(); else this.glob = glob; try { String type = PLUGIN_TYPES[currImpl]; this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST"); QueuePluginManager pluginManager = new QueuePluginManager(glob); PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0"); java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters(); prop.put("tableNamePrefix", "TEST"); prop.put("entriesTableName", "_entries"); this.glob.getProperty().set("QueuePlugin[JDBC][1.0]", pluginInfo.dumpPluginParameters()); pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0"); StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SomeQueueId"); this.queue = pluginManager.getPlugin(pluginInfo, queueId, new CbQueueProperty(this.glob, Constants.RELATING_CALLBACK, this.glob.getStrippedId())); this.queue.shutdown(); // to allow to initialize again } catch (Exception ex) { log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'"); } } protected void setUp() { // cleaning up the database from previous runs ... QueuePropertyBase prop = null; try { prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test"); StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SomeQueueId"); queue.initialize(queueId, prop); queue.clear(); queue.shutdown(); } catch (Exception ex) { log.severe("could not propertly set up the database: " + ex.getMessage()); } } /** * Tests QueuePropertyBase() and getStorageId() * @param queueTypeList A space separated list of names for the * implementations to be tested. Valid names are: * RamQueuePlugin JdbcQueuePlugin */ public void testConfig() { config(this.queue); } /** * Tests initialize(), getProperties(), setProperties() and capacity() * @param queue !!!Is not initialized in this case!!!! */ private void config(I_Queue queue) { ME = "I_QueueTest.config(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]"; System.out.println("***" + ME); QueuePropertyBase prop1 = null; QueuePropertyBase prop = null; try { // test initialize() prop1 = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test"); int max = 12; prop1.setMaxEntries(max); prop1.setMaxEntriesCache(max); assertEquals(ME+": Wrong capacity", max, prop1.getMaxEntries()); assertEquals(ME+": Wrong cache capacity", max, prop1.getMaxEntriesCache()); //PluginInfo pluginInfo = new PluginInfo(glob, null, ""); //queue.init(glob, pluginInfo); // Init from pluginloader is first StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SomeQueueId"); queue.initialize(queueId, prop1); assertEquals(ME+": Wrong queue ID", queueId, queue.getStorageId()); try { prop = new CbQueueProperty(glob, Constants.RELATING_SUBJECT, "/node/test"); prop.setMaxEntries(99); prop.setMaxEntriesCache(99); queue.setProperties(prop); } catch(XmlBlasterException e) { fail("Changing properties failed"); } } catch(XmlBlasterException e) { fail(ME + ": Exception thrown: " + e.getMessage()); } long len = prop.getMaxEntries(); assertEquals(ME+": Wrong capacity", prop.getMaxEntries(), queue.getMaxNumOfEntries()); assertEquals(ME+": Wrong capacity", prop.getMaxEntries(), ((QueuePropertyBase)queue.getProperties()).getMaxEntries()); assertEquals(ME+": Wrong size", 0, queue.getNumOfEntries()); try { for (int ii=0; ii<len; ii++) { queue.put(new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true), false); } assertEquals(ME+": Wrong total size", len, queue.getNumOfEntries()); try { DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true); queue.put(queueEntry, false); queue.put(queueEntry, false); fail("Did expect an exception on overflow"); } catch(XmlBlasterException e) { log.info("SUCCESS the exception is OK: " + e.getMessage()); } log.info("toXml() test:" + queue.toXml("")); log.info("usage() test:" + queue.usage()); assertEquals(ME+": should not be shutdown", false, queue.isShutdown()); queue.shutdown(); assertEquals(ME+": should be shutdown", true, queue.isShutdown()); log.info("#2 Success, filled " + queue.getNumOfEntries() + " messages into queue"); System.out.println("***" + ME + " [SUCCESS]"); queue.shutdown(); queue = null; } catch(XmlBlasterException e) { fail(ME + ": Exception thrown: " + e.getMessage()); } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -