📄 jdbcqueuetest.java
字号:
assertTrue(me + ": Timed out when waiting to regain the connection to the DB", success); log.info("successfully ended"); } public void testInitialEntries() { if (this.suppressTest) { log.severe("JDBC test is not driven as no database was found"); return; } try { initialEntries(); } catch (XmlBlasterException ex) { fail("Exception when testing InitialEntries probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() ); ex.printStackTrace(); } } public void initialEntries() throws XmlBlasterException { // set up the queues .... log.info("initialEntries test starts"); QueuePropertyBase cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test"); cbProp.setMaxEntries(10000L); cbProp.setMaxBytes(200000L); StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "initialEntries"); try { String type = PLUGIN_TYPES[this.count]; this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST"); QueuePluginManager pluginManager = new QueuePluginManager(glob); PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0"); java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters(); prop.put("tableNamePrefix", "TEST"); prop.put("entriesTableName", "_entries"); I_Queue tmpQueue = pluginManager.getPlugin(pluginInfo, queueId, cbProp); tmpQueue.clear(); // add some persistent entries and then shutdown ... DummyEntry entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), 100, true); tmpQueue.put(entry, false); entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), 100, true); tmpQueue.put(entry, false); entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), 100, true); tmpQueue.put(entry, false); tmpQueue.shutdown(); // to allow to initialize again I_Queue tmpQueue2 = pluginManager.getPlugin(pluginInfo, queueId, cbProp); long numOfEntries = tmpQueue2.getNumOfEntries(); assertEquals("Wrong number of entries in queue", 3L, numOfEntries); ArrayList lst = tmpQueue2.peek(-1, -1L); assertEquals("Wrong number of entries retrieved from queue", 3, lst.size()); queue.shutdown(); } catch (Exception ex) { log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'"); ex.printStackTrace(); assertTrue("exception occured when testing initialEntries", false); } log.info("initialEntries test successfully ended"); } public void testMultiplePut() { try { multiplePut(); } catch (XmlBlasterException ex) { fail("Exception when testing multiplePut probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() ); ex.printStackTrace(); } } public void multiplePut() throws XmlBlasterException { // set up the queues .... log.info("initialEntries test starts"); QueuePropertyBase cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test"); cbProp.setMaxEntries(10000L); cbProp.setMaxBytes(200000L); StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "initialEntries"); try { String type = PLUGIN_TYPES[this.count]; this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST"); QueuePluginManager pluginManager = new QueuePluginManager(glob); PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0"); java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters(); prop.put("tableNamePrefix", "TEST"); prop.put("entriesTableName", "_entries"); I_Queue tmpQueue = pluginManager.getPlugin(pluginInfo, queueId, cbProp); tmpQueue.clear(); // add some persistent entries and then shutdown ... int nmax = 1; int size = 100; for (int j=0; j < 4; j++) { DummyEntry[] entries = new DummyEntry[nmax]; for (int i=0; i < nmax; i++) { entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), size, true); } long time1 = System.currentTimeMillis(); tmpQueue.put(entries, false); long delta = System.currentTimeMillis() - time1; log.info("multiple put '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry"); ArrayList list = tmpQueue.peek(-1, -1L); assertEquals("Wrong number of entries in queue", nmax, list.size()); time1 = System.currentTimeMillis(); tmpQueue.removeRandom(entries); delta = System.currentTimeMillis() - time1; log.info("multiple remove '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry"); tmpQueue.clear(); time1 = System.currentTimeMillis(); for (int i=0; i < nmax; i++) { tmpQueue.put(entries[i], false); } delta = System.currentTimeMillis() - time1; log.info("repeated single put '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry"); time1 = System.currentTimeMillis(); for (int i=0; i < nmax; i++) tmpQueue.removeRandom(entries[i]); delta = System.currentTimeMillis() - time1; log.info("repeated single remove '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry"); nmax *= 10; } tmpQueue.shutdown(); // to allow to initialize again } catch (Exception ex) { log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'"); ex.printStackTrace(); assertTrue("exception occured when testing initialEntries", false); } log.info("initialEntries test successfully ended"); } public void testConnectionPool() { try { String me = ME + "-testConnectionPool"; log.info(" starting "); int numConn = 3; int maxWaitingThreads = 10; Global ownGlobal = this.glob.getClone(null); QueuePluginManager pluginManager = new QueuePluginManager(ownGlobal); PluginInfo pluginInfo = new PluginInfo(ownGlobal, pluginManager, "JDBC", "1.0"); pluginInfo.getParameters().put("connectionBusyTimeout", "10000"); pluginInfo.getParameters().put("maxWaitingThreads", "" + maxWaitingThreads); pluginInfo.getParameters().put("connectionPoolSize", "" + numConn); JdbcConnectionPool pool = new JdbcConnectionPool(); pool.initialize(ownGlobal, pluginInfo.getParameters()); Connection[] conn = new Connection[numConn]; for (int i=0; i < numConn; i++) { log.info(" getting connection " + i); conn[i] = pool.getConnection(); assertNotNull("The connection " + i + " shall not be null", conn[i]); } log.info(" getting extra connection"); Connection extraConn = null; try { extraConn = pool.getConnection(); assertTrue("An Exception should have occured here: ", false); } catch (Exception ex) { } // should wait 10 seconds and then return null assertNull("the extra connection should be null", extraConn); boolean success = true; pool.releaseConnection(conn[0], success); extraConn = pool.getConnection(); assertNotNull("the extra connection should not be null", extraConn); //pool.releaseConnection(extraConn); this.exceptionCount = 0; int expectedEx = 4; for (int i=0; i < maxWaitingThreads + expectedEx; i++) { ConnectionConsumer cc = new ConnectionConsumer(pool, i); } try { Thread.sleep(15000L); } catch (InterruptedException ex) { } assertEquals("Number of exceptions due to too many waiting threads is wrong", expectedEx, this.exceptionCount); log.info(" successfully ended "); } catch (Exception ex) { fail("Exception when testing multiplePut probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() ); ex.printStackTrace(); } } /** * Method is used by TestRunner to load these tests */ public static Test suite() { TestSuite suite= new TestSuite(); Global glob = new Global(); for (int i=0; i < PLUGIN_TYPES.length; i++) { suite.addTest(new JdbcQueueTest(glob, "testConnectionPool", i, true)); suite.addTest(new JdbcQueueTest(glob, "testMultiplePut", i, true)); suite.addTest(new JdbcQueueTest(glob, "testPutWithBreak", i, false)); suite.addTest(new JdbcQueueTest(glob, "testInitialEntries", i, true)); } return suite; } /** * <pre> * java org.xmlBlaster.test.classtest.queue.JdbcQueueTest * </pre> */ public static void main(String args[]) { Global glob = new Global(args); for (int i=0; i < PLUGIN_TYPES.length; i++) { JdbcQueueTest testSub = new JdbcQueueTest(glob, "JdbcQueueTest", i, true); testSub.setUp(); testSub.testConnectionPool(); testSub.tearDown(); testSub.setUp(); testSub.testMultiplePut(); testSub.tearDown(); testSub.setUp(); testSub.testPutWithBreak(); testSub.tearDown(); testSub.setUp(); testSub.testInitialEntries(); testSub.tearDown(); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -