📄 i_queuetest.java
字号:
//------------------------------------ public void testSize1() { String queueType = "unknown"; try { QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test"); int max = 1; prop.setMaxEntries(max); prop.setMaxEntriesCache(max); queueType = this.queue.toString(); StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "QueuePlugin/size1"); this.queue.initialize(queueId, prop); queue.clear(); assertEquals(ME + "wrong size before starting ", 0L, queue.getNumOfEntries()); assertEquals(ME, 1L, queue.getMaxNumOfEntries()); size1(this.queue); } catch (XmlBlasterException ex) { fail("Exception when testing Size1 probably due to failed initialization of the queue of type " + queueType); } } /** * Tests put(MsgQueueEntry[]) and put(MsgQueueEntry) and clear() */ private void size1(I_Queue queue) { this.queue = queue; ME = "I_QueueTest.size1(" + queue.getStorageId() + ")[" + this.queue.getClass().getName() + "]"; System.out.println("***" + ME); int maxEntries = (int)queue.getMaxNumOfEntries(); try { //========== Test 1: put(I_QueueEntry[]) int numLoop = 10; ArrayList list = new ArrayList(); //========== Test 2: put(I_QueueEntry) this.queue.removeStorageSizeListener(null); this.queue.addStorageSizeListener(this.queueSizeListener); this.queueSizeListener.clear(); for (int ii=0; ii<numLoop; ii++) { DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true); try { queue.put(queueEntry, false); assertEquals("number of entries incremented on last invocation", 1, this.queueSizeListener.getLastIncrementEntries()); assertEquals("number of bytes incremented on last invocation", queueEntry.getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes()); if (ii > maxEntries) { // queue allows on overload fail("Didn't expect more than " + maxEntries + " entries" + queue.toXml("")); } else list.add(queueEntry); } catch (XmlBlasterException e) { if (ii <= maxEntries) { fail("Didn't expect exception" + e.getMessage()); } } } assertEquals("number of invocations for queue size listener is wrong", maxEntries+1, this.queueSizeListener.getCount()); // The queues allow temporary oversize (one extra put()) assertEquals(ME+": Wrong total size " + queue.toXml(""), maxEntries+1, queue.getNumOfEntries()); this.checkSizeAndEntries(" put(I_QueueEntry) ", list, queue); log.info("#2 Success, filled " + queue.getNumOfEntries() + " messages into queue"); ArrayList entryList = null; try { entryList = queue.peekLowest(1, -1L, null, false); assertEquals("PEEK #1 failed"+queue.toXml(""), 1, entryList.size()); log.info("curr entries="+queue.getNumOfEntries()); } catch (XmlBlasterException e) { if (e.getErrorCode()!=ErrorCode.INTERNAL_NOTIMPLEMENTED) throw e; } //this.queue.removeStorageSizeListener(null); //this.queue.addStorageSizeListener(this.queueSizeListener); //this.queueSizeListener.clear(); entryList = queue.takeLowest(1, -1L, null, false); long singleSize = ((I_QueueEntry)entryList.get(0)).getSizeInBytes(); assertEquals("TAKE #1 failed"+queue.toXml(""), 1, entryList.size()); log.info("curr entries="+queue.getNumOfEntries()); assertEquals("number of entries incremented on last invocation", -1, this.queueSizeListener.getLastIncrementEntries()); assertEquals("number of bytes incremented on last invocation", -singleSize, this.queueSizeListener.getLastIncrementBytes()); entryList = queue.takeLowest(1, -1L, null, false); assertEquals("TAKE #2 failed"+queue.toXml(""), 1, entryList.size()); assertEquals("number of entries incremented on last invocation", -1, this.queueSizeListener.getLastIncrementEntries()); assertEquals("number of bytes incremented on last invocation", -singleSize, this.queueSizeListener.getLastIncrementBytes()); queue.clear(); assertEquals(ME+": Wrong empty size", 0L, queue.getNumOfEntries()); System.out.println("***" + ME + " [SUCCESS]"); queue.shutdown(); queue = null; } catch(XmlBlasterException e) { fail(ME + ": Exception thrown: " + e.getMessage()); } log.info("SUCCESS"); }//------------------------------------ public void testPutMsg() { String queueType = "unknown"; try { QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test"); queueType = this.queue.toString(); StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "QueuePlugin/putMsg"); this.queue.initialize(queueId, prop); queue.clear(); assertEquals(ME + "wrong size before starting ", 0L, queue.getNumOfEntries()); putMsg(this.queue); } catch (XmlBlasterException ex) { fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType); } } /** * @see checkSizeAndEntries(String, I_QueueEntry[], I_Queue) */ private void checkSizeAndEntries(String txt, ArrayList queueEntries, I_Queue queue) { checkSizeAndEntries(txt, (I_QueueEntry[])queueEntries.toArray(new I_QueueEntry[queueEntries.size()]), queue); } /** * Helper method to do a generic size check (size and number of entries) */ private void checkSizeAndEntries(String txt, I_QueueEntry[] queueEntries, I_Queue queue) { long sizeOfTransients = 0L; long numOfPersistents = 0; long numOfTransients = 0; long sizeOfPersistents = 0L; for (int i=0; i < queueEntries.length; i++) { I_QueueEntry entry = queueEntries[i]; if (entry.isPersistent()) { sizeOfPersistents += entry.getSizeInBytes(); numOfPersistents++; } else { sizeOfTransients += entry.getSizeInBytes(); numOfTransients++; } } long queueNumOfPersistents = queue.getNumOfPersistentEntries(); long queueNumOfTransients = queue.getNumOfEntries() - queueNumOfPersistents; long queueSizeOfPersistents = queue.getNumOfPersistentBytes(); long queueSizeOfTransients = queue.getNumOfBytes() - queueSizeOfPersistents; txt += " NumPersistents=" + queueNumOfPersistents + " NumOfTransients=" + queueNumOfTransients; txt += " SizeOfPersistents=" + queueSizeOfPersistents + " SizeOfTransients=" + queueSizeOfTransients; assertEquals(ME + ": " + txt + " wrong number of persistents ", numOfPersistents, queueNumOfPersistents); assertEquals(ME + ": " + txt + " wrong number of transients ", numOfTransients, queueNumOfTransients); assertEquals(ME + ": " + txt + " wrong size of persistents ", sizeOfPersistents, queueSizeOfPersistents); assertEquals(ME + ": " + txt + " wrong size of transients ", sizeOfTransients, queueSizeOfTransients); } /** * Tests put(MsgQueueEntry[]) and put(MsgQueueEntry) and clear() */ private void putMsg(I_Queue queue) { ME = "I_QueueTest.putMsg(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]"; System.out.println("***" + ME); try { //========== Test 1: put(I_QueueEntry[]) int numLoop = 10; ArrayList list = new ArrayList(); this.queue.removeStorageSizeListener(null); this.queue.addStorageSizeListener(this.queueSizeListener); this.queueSizeListener.clear(); for (int ii=0; ii<numLoop; ii++) { DummyEntry[] queueEntries = { new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true), new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true), new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true)}; queue.put(queueEntries, false); assertEquals("number of entries incremented on last invocation", 3, this.queueSizeListener.getLastIncrementEntries()); assertEquals("number of bytes incremented on last invocation", 3*queueEntries[0].getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes()); for (int i=0; i < 3; i++) list.add(queueEntries[i]); this.checkSizeAndEntries(" put(I_QueueEntry[]) ", list, queue); assertEquals(ME+": Wrong size", (ii+1)*queueEntries.length, queue.getNumOfEntries()); } assertEquals("number of invocations for queue size listener is wrong", numLoop, this.queueSizeListener.getCount()); int total = numLoop*3; assertEquals(ME+": Wrong total size", total, queue.getNumOfEntries()); log.info("#1 Success, filled " + queue.getNumOfEntries() + " messages into queue"); //========== Test 2: put(I_QueueEntry) for (int ii=0; ii<numLoop; ii++) { DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true); list.add(queueEntry); queue.put(queueEntry, false); } assertEquals(ME+": Wrong total size", numLoop+total, queue.getNumOfEntries()); this.checkSizeAndEntries(" put(I_QueueEntry) ", list, queue); log.info("#2 Success, filled " + queue.getNumOfEntries() + " messages into queue"); queue.clear(); assertEquals(ME+": Wrong empty size", 0L, queue.getNumOfEntries()); System.out.println("***" + ME + " [SUCCESS]"); queue.shutdown(); queue = null; } catch(XmlBlasterException e) { fail(ME + ": Exception thrown: " + e.getMessage()); } }// ------------------------------------ public void testPeekMsg() { String queueType = "unknown"; try { QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test"); queueType = this.queue.toString(); StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "QueuePlugin/peekMsg"); this.queue.initialize(queueId, prop); queue.clear(); assertEquals(ME + "wrong size before starting ", 0, queue.getNumOfEntries()); peekMsg(this.queue); } catch (XmlBlasterException ex) { log.severe("Exception when testing peekMsg probably due to failed initialization of the queue " + queueType); } }// ------------------------------------ public void testPeekMsgBlocking() { String queueType = "unknown"; try { QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test"); queueType = this.queue.toString(); StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "QueuePlugin/peekMsg"); this.queue.initialize(queueId, prop); queue.clear(); assertEquals(ME + "wrong size before starting ", 0, queue.getNumOfEntries()); // fill the queue: DummyEntry[] queueEntries = { new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true), new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true), new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true) }; queue.put(queueEntries, false); BlockingQueueWrapper wrapper = new BlockingQueueWrapper(200L); wrapper.init(queue); int numOfEntries = 2; ArrayList ret = wrapper.blockingPeek(numOfEntries, 1000L); assertEquals("Wrong number of entries found", 2, ret.size());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -