📄 testpriorizeddispatchplugin.java
字号:
if (expectsNotify) { assertEquals(text, "_PriorizedDispatchPlugin", this.update.getMsgs()[0].getUpdateQos().getSender().getLoginName()); // PtP notification } } else if (action.startsWith("destroy")) { publish(msgOid, priority); destroyCounter++; assertEquals(text, 0, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK)); int count = expectsNotify ? 1 : 0; assertEquals(text, count, this.update.count()); if (expectsNotify) { assertEquals(text, "_PriorizedDispatchPlugin", this.update.getMsgs()[0].getUpdateQos().getSender().getLoginName()); // PtP notification } } else { log.severe(text + ": Action is not supported"); fail(text + ": Action is not supported"); } this.update.clear(); } // for prio } // for states text = "Checking ascending sequence of flushed " + queueCounter + " messages which where hold back"; this.update.clear(); changeStatus(statusOid, NORMAL_LINE); assertEquals(text, queueCounter, this.update.waitOnUpdate(2000L, msgOid, Constants.STATE_OK)); assertEquals(text, queueCounter, this.update.count()); Msg[] msgArr = this.update.getMsgs(); assertEquals(text, queueCounter, msgArr.length); int lastNum = -1; int lastPrio = PriorityEnum.MAX_PRIORITY.getInt() + 1; for (int i=0; i<msgArr.length; i++) { log.info("Received flushed hold back message " + msgArr[i].getUpdateKey().getOid() + " priority=" + msgArr[i].getUpdateQos().getPriority() + " content=" + msgArr[i].getContentStr() + " state=" + msgArr[i].getUpdateQos().getState()); } for (int i=0; i<msgArr.length; i++) { int currPrio = msgArr[i].getUpdateQos().getPriority().getInt(); int currNum = msgArr[i].getContentInt(); if (lastPrio < currPrio || lastPrio == currPrio && lastNum >= currNum) fail(text + " Sequence is not ascending: last=" + lastNum + " curr=" + currNum); lastNum = currNum; lastPrio = currPrio; } this.update.clear(); } catch (XmlBlasterException e) { fail(e.toString()); } log.info("Success in testPriorizedDispatchPlugin()"); } /** * Tests to change the plugin configuration and different status message oids. */ public void testPriorizedDispatchPluginReconfigure() { log.info("testPriorizedDispatchPluginReconfigure() ..."); String statusOid2 = statusOid+"-2"; String config = "<msgDispatch defaultStatus='GO' defaultAction='send'>\n"+ " <onStatus oid='" + statusOid + "' content='GO' defaultAction='send'>\n" + " <action do='send' ifPriority='0-9'/>\n" + " </onStatus>\n" + " <onStatus oid='" + statusOid2 + "' content='" + BACKUP_LINE + "' defaultAction='send'>\n" + " <action do='queue' ifPriority='0-9'/>\n" + " </onStatus>\n" + "</msgDispatch>\n"; publishNewConfig(config); String text = "Testing configuration"; long sleep = 2000L; //try { subscribe(msgOid); int maxPrio = PriorityEnum.MAX_PRIORITY.getInt() + 1; // check normal operation changeStatus(statusOid, "GO"); for (int priority=0; priority < maxPrio; priority++) { publish(msgOid, priority); } assertEquals(text, maxPrio, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK)); log.info("SUCCESS, state=GO"); this.update.clear(); // queue messages changeStatus(statusOid2, BACKUP_LINE); for (int priority=0; priority < maxPrio; priority++) { publish(msgOid, priority); } assertEquals(text, 0, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK)); log.info("SUCCESS, state=" + BACKUP_LINE); this.update.clear(); // flush the before queued messages changeStatus(statusOid, "GO"); assertEquals(text, maxPrio, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK)); log.info("SUCCESS, state=GO"); this.update.clear(); // check unkown message content changeStatus(statusOid, "??YYXX"); for (int priority=0; priority < maxPrio; priority++) { publish(msgOid, priority); } assertEquals(text, maxPrio, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK)); log.info("SUCCESS, state=GO"); this.update.clear(); /* } catch (XmlBlasterException e) { fail(e.toString()); } */ log.info("Success in testPriorizedDispatchPluginReconfigure()"); } /** * Change the configuration of the plugin */ private void publishNewConfig(String config) { String configKey = org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin.CONFIG_PROPERTY_KEY; // "PriorizedDispatchPlugin/config" try { String oid = "__cmd:sysprop/?" + configKey; String contentStr = config; PublishQos pq = new PublishQos(glob); PublishReturnQos rq = con.publish(new MsgUnit("<key oid='"+oid+"'/>", contentStr.getBytes(), pq.toXml())); log.info("SUCCESS publish new configuration '" + oid + "' returned state=" + rq.getState()); assertEquals("Returned oid wrong", oid, rq.getKeyOid()); assertEquals("Return not OK", Constants.STATE_OK, rq.getState()); } catch(XmlBlasterException e) { log.warning("XmlBlasterException: " + e.toString()); fail("publish of configuration data - XmlBlasterException: " + e.getMessage()); } } /** * Test the notifySender message * 1. subscribe to a message * 2. change state to 64k * 3. send a message with prio 6 which should trigger a notify PtP message */ public void testPriorizedDispatchPluginOne() { log.info("testPriorizedDispatchPluginOne() ..."); long sleep = 2000L; String text = "state=" + BACKUP_LINE + " action=queue,notifySender"; // <action do='queue,notifySender' ifPriority='6'/> subscribe(msgOid); changeStatus(statusOid, BACKUP_LINE); try { Thread.sleep(1000L); } catch( InterruptedException i) {} // Wait some time int priority = 6; log.info(text + ": Expecting notify"); this.update.clear(); publish(msgOid, priority); assertEquals(text, 0, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK)); assertEquals(text, 1, this.update.count()); assertEquals(text, "_PriorizedDispatchPlugin", this.update.getMsgs()[0].getUpdateQos().getSender().getLoginName()); // PtP notification this.update.clear(); changeStatus(statusOid, NORMAL_LINE); log.info(text + ": Expecting queued message"); assertEquals(text, 1, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK)); log.info("Success in testPriorizedDispatchPluginOne()"); } /** * Tears down the fixture. * <p /> * cleaning up .... erase() the previous message OID and logout */ protected void tearDown() { try { Thread.sleep(200L); } catch( InterruptedException i) {} // Wait some time this.con.disconnect(null); this.con = null; if (this.startEmbedded) { try { Thread.sleep(500L); } catch( InterruptedException i) {} // Wait some time EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread); this.serverThread = null; } // reset to default server port (necessary if other tests follow in the same JVM). Util.resetPorts(glob); this.glob = null; this.con = null; this.update = null; Global.instance().shutdown(); } /** * Method is used by TestRunner to load these tests */ public static Test suite() { TestSuite suite= new TestSuite(); suite.addTest(new TestPriorizedDispatchPlugin(Global.instance(), "testPriorizedDispatchPluginOne", "PriorizedDispatchPluginOne")); suite.addTest(new TestPriorizedDispatchPlugin(Global.instance(), "testPriorizedDispatchPlugin", "PriorizedDispatchPlugin")); suite.addTest(new TestPriorizedDispatchPlugin(Global.instance(), "testPriorizedDispatchPluginReconfigure", "PriorizedDispatchPluginRecovery")); return suite; } /** * Invoke: * <pre> * java org.xmlBlaster.test.dispatch.TestPriorizedDispatchPlugin -logging/org.xmlBlaster.engine.dispatch FINE -logging/org.xmlBlaster.util.dispatch FINE -logging/org.xmlBlaster.engine FINEST * java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.dispatch.TestPriorizedDispatchPlugin * <pre> */ public static void main(String args[]) { Global glob = new Global(); if (glob.init(args) != 0) { System.exit(0); } TestPriorizedDispatchPlugin testSub = new TestPriorizedDispatchPlugin(glob, "TestPriorizedDispatchPlugin", "TestPriorizedDispatchPlugin"); testSub.setUp(); testSub.testPriorizedDispatchPlugin(); //testSub.testPriorizedDispatchPluginReconfigure(); //testSub.testPriorizedDispatchPluginOne(); testSub.tearDown(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -