📄 teststreammessages.java
字号:
XBStreamingMessage msg = session.createStreamingMessage(null); if (doInterrupt) msg.setBooleanProperty("interrupted", true); msg.setIntProperty(XBConnectionMetaData.JMSX_MAX_CHUNK_SIZE, maxChunkSize); msg.setStringProperty("nameOfTest", name); // to recognize it in '__sys__deadMessage' ByteArrayInputStream bais = new ByteArrayInputStream(content); msg.setInputStream(bais); producer.send(msg); } private byte[] createRandomContent(int size) { byte[] ret = new byte[size]; Random random = new Random(); random.nextBytes(ret); return ret; } public void testManyChunks() { int maxChunkSize = 128; byte[] content = createRandomContent(maxChunkSize*5 - 1); try { this.updateInterceptor.clear(); doPublish(content, maxChunkSize, false, "testManyChunks"); int ret = this.updateInterceptor.waitOnUpdate(this.delay, 1); assertEquals("wrong number of updates when testing testManyChunks", 1, ret); Msg[] msgs = this.updateInterceptor.getMsgs(); assertEquals("wrong number of msg entries when testing testManyChunks", 1, msgs.length); assertEquals("Wrong size of returned buffer", content.length, msgs[0].getContent().length); assertTrue("", compareContent(content, msgs[0].getContent())); } catch (XmlBlasterException ex) { ex.printStackTrace(); fail(); } } /** * This test is to check that we don't have a problem in the buffer of the Pipes due to * large chunks of messages. */ public void testManyBigChunks() { String name = "testManyBigChunks"; int maxChunkSize = 1000 * 1000; // since JMS implementation does not allow more byte[] content = createRandomContent(maxChunkSize*3 -1); try { this.updateInterceptor.clear(); doPublish(content, maxChunkSize, false, name); int ret = this.updateInterceptor.waitOnUpdate(this.delay, 1); assertEquals("wrong number of updates when testing " + name, 1, ret); Msg[] msgs = this.updateInterceptor.getMsgs(); assertEquals("wrong number of msg entries when testing " + name, 1, msgs.length); assertEquals("Wrong size of returned buffer", content.length, msgs[0].getContent().length); assertTrue("", compareContent(content, msgs[0].getContent())); } catch (XmlBlasterException ex) { ex.printStackTrace(); fail(); } } public void testManyChunksTwoMessages() { int maxChunkSize = 64; byte[] content = createRandomContent(maxChunkSize*5 - 1); try { this.updateInterceptor.clear(); doPublish(content, maxChunkSize, false, "testManyChunksTwoMessages1"); content = createRandomContent(maxChunkSize*5 - 1); doPublish(content, maxChunkSize, false, "testManyChunksTwoMessages2"); int ret = this.updateInterceptor.waitOnUpdate(this.delay, 2); assertEquals("wrong number of updates when testing testManyChunksTwoMessages", 2, ret); Msg[] msgs = this.updateInterceptor.getMsgs(); assertEquals("wrong number of msg entries when testing testManyChunksTwoMessages", 2, msgs.length); assertEquals("Wrong size of returned buffer", content.length, msgs[1].getContent().length); assertTrue("", compareContent(content, msgs[1].getContent())); } catch (XmlBlasterException ex) { ex.printStackTrace(); fail(); } } public void testSingleChunk() { int maxChunkSize = 200; byte[] content = createRandomContent(maxChunkSize-10); try { this.updateInterceptor.clear(); doPublish(content, maxChunkSize, false, "testSingleChunk"); int ret = this.updateInterceptor.waitOnUpdate(this.delay, 1); assertEquals("wrong number of updates when testing testSingleChunk", 1, ret); Msg[] msgs = this.updateInterceptor.getMsgs(); assertEquals("wrong number of msg entries when testing testSingleChunk", 1, msgs.length); assertEquals("Wrong size of returned buffer", content.length, msgs[0].getContent().length); assertTrue("", compareContent(content, msgs[0].getContent())); } catch (XmlBlasterException ex) { ex.printStackTrace(); fail(); } } public void testException() { int maxChunkSize = 200; byte[] content = createRandomContent(900); try { this.updateInterceptor.clear(); doPublish(content, maxChunkSize, false, "testException"); int ret = this.updateInterceptor.waitOnUpdate(2000L, 1); assertEquals("wrong number of updates when testing testSingleChunk", 0, ret); this.updateInterceptor.clear(); ((XmlBlasterAccess)this.connGlobal.getXmlBlasterAccess()).setCallbackDispatcherActive(true); ret = this.updateInterceptor.waitOnUpdate(this.delay, 1); Msg[] msgs = this.updateInterceptor.getMsgs(); assertEquals("wrong number of msg entries when testing testSingleChunk", 1, msgs.length); assertEquals("Wrong size of returned buffer", content.length, msgs[0].getContent().length); assertTrue("", compareContent(content, msgs[0].getContent())); } catch (XmlBlasterException ex) { ex.printStackTrace(); fail(); } } public void testInterruptedRead() { int maxChunkSize = 256; byte[] content = createRandomContent(maxChunkSize*5-10); try { this.updateInterceptor.clear(); doPublish(content, maxChunkSize, true /* interruped */, "testSingleChunk"); int ret = this.updateInterceptor.waitOnUpdate(this.delay, 1); assertEquals("wrong number of updates when testing testSingleChunk", 1, ret); Msg[] msgs = this.updateInterceptor.getMsgs(); assertEquals("wrong number of msg entries when testing testSingleChunk", 1, msgs.length); assertTrue("", content.length > msgs[0].getContent().length); } catch (XmlBlasterException ex) { ex.printStackTrace(); fail(); } } public void testNormalMessage() { int maxChunkSize = 500; byte[] content = createRandomContent(maxChunkSize); try { this.updateInterceptor.clear(); this.connGlobal.getXmlBlasterAccess().publish(new MsgUnit(new PublishKey(this.connGlobal, this.oid), content, new PublishQos(this.connGlobal))); int ret = this.updateInterceptor.waitOnUpdate(this.delay, 1); assertEquals("wrong number of updates when testing testSingleChunk", 1, ret); Msg[] msgs = this.updateInterceptor.getMsgs(); assertEquals("wrong number of msg entries when testing testSingleChunk", 1, msgs.length); assertEquals("Wrong size of returned buffer", content.length, msgs[0].getContent().length); assertTrue("", compareContent(content, msgs[0].getContent())); } catch (XmlBlasterException ex) { ex.printStackTrace(); fail(); } } private boolean compareContent(byte[] buf1, byte[] buf2) { if (buf1 == null && buf2 == null) return true; if (buf1 == null || buf2 == null) return false; if (buf1.length != buf2.length) return false; for (int i=0; i < buf1.length; i++) { if (buf1[i] != buf2[i]) return false; } return true; } /** * Invoke: java org.xmlBlaster.test.client.TestStreamMessages * <p /> * @deprecated Use the TestRunner from the testsuite to run it:<p /> * <pre> java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestStreamMessages</pre> */ public static void main(String args[]) { Global global = new Global(); if (global.init(args) != 0) { System.out.println(ME + ": Init failed"); System.exit(1); } TestStreamMessages test = new TestStreamMessages(global); test.setUp(); test.testManyBigChunks(); test.tearDown(); test.setUp(); test.testManyChunks(); test.tearDown(); test.setUp(); test.testException(); test.tearDown(); test.setUp(); test.testSingleChunk(); test.tearDown(); test.setUp(); test.testInterruptedRead(); test.tearDown(); test.setUp(); test.testNormalMessage(); test.tearDown(); test.setUp(); test.testManyChunksTwoMessages(); test.tearDown(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -