📄 testpriorizeddispatchwithlostcallback.java
字号:
try {
            // Tail of the subscribe helper; its beginning lies outside this excerpt, logic left unchanged.
            SubscribeKey sk = new SubscribeKey(glob, oid);
            SubscribeQos sq = new SubscribeQos(glob);
            SubscribeReturnQos srq = con.subscribe(sk.toXml(), sq.toXml());
            log.info("SUCCESS subscribe to '" + oid + "' returned state=" + srq.getState());
        } catch (XmlBlasterException e) {
            log.warning("XmlBlasterException: " + e.getMessage());
            fail("subscribe - XmlBlasterException: " + e.getMessage());
        }
    }

    /**
     * Tests what happens if the dispatcher framework loses the callback connection to us
     * and starts polling.
     * <p>
     * Steps performed:
     * <ol>
     *   <li>Publish a dispatch configuration which sends everything on the normal status and,
     *       for connectionState='polling', queues priorities 4-9 and destroys priorities 0-3.</li>
     *   <li>Subscribe and verify that a single message is delivered while the callback is alive.</li>
     *   <li>Shut down our callback server and publish one message per priority;
     *       no update may arrive.</li>
     *   <li>Start a new callback server, wait, and verify that exactly the queued messages
     *       arrive, highest priority first and in publish order within one priority.</li>
     * </ol>
     */
    public void testPriorizedDispatchPluginConnectionState() {
        log.info("testPriorizedDispatchPluginConnectionState() ...");

        String config =
            "<msgDispatch defaultStatus='" + NORMAL_LINE + "' defaultAction='send'>\n" +
            " <onStatus oid='" + statusOid + "' content='" + NORMAL_LINE + "' defaultAction='send'>\n" +
            " <action do='send' ifPriority='0-9'/>\n" +
            " </onStatus>\n" +
            " <onStatus oid='" + statusOid + "' content='" + DEAD_LINE + "' defaultAction='queue' connectionState='polling'>\n" +
            " <action do='queue' ifPriority='4-9'/>\n" +
            " <action do='destroy' ifPriority='0-3'/>\n" +
            " </onStatus>\n" +
            "</msgDispatch>\n";
        publishNewConfig(config);

        String text = "Testing configuration";
        long sleep = 2000L;

        subscribe(msgOid);

        // Exclusive upper bound for the "one message per priority" loop below
        int maxPrio = PriorityEnum.MAX_PRIORITY.getInt() + 1;

        // Messages matched by <action do='queue' ifPriority='4-9'/> in the config above.
        // NOTE(review): assumes MAX_PRIORITY is 9, as implied by the assertions at the end - confirm.
        int expectedQueued = 6;

        // First check normal operation
        //changeStatus(statusOid, NORMAL_LINE);
        publish(msgOid, 1);
        assertEquals(text, 1, this.updateInterceptor.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
        this.updateInterceptor.clear();

        // Now kill our callback server
        log.info("Shutdown callback, expecting messages to be queued or destroyed depending on the priority");
        try {
            con.getCbServer().shutdown();
        } catch (XmlBlasterException e) {
            fail("ShutdownCB: " + e.getMessage());
        }
        this.updateInterceptor.clear();

        // These messages are depending on the priority queued or destroyed
        // as the callback connection is polling ...
        for (int priority = 0; priority < maxPrio; priority++) {
            publish(msgOid, priority);
        }
        assertEquals(text, 0, this.updateInterceptor.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
        this.updateInterceptor.clear();

        // Now reestablish the callback server ...
        I_CallbackServer cbServer = null;
        try {
            updateMsgs = new MsgInterceptor(glob, log, null); // just used as message container
            this.updateInterceptor.clear();
            try {
                cbServer = new XmlRpcCallbackServer();
                CallbackAddress cbAddress = new CallbackAddress(glob);
                cbServer.initialize(this.glob, name, cbAddress, new AbstractCallbackExtended(glob) {
                    /**
                     * Logs every received update and stores it in updateMsgs unless it is an erase event.
                     * Errors are logged and not propagated; an empty string is always returned.
                     */
                    public String update(String cbSessionId, UpdateKey updateKey, byte[] content,
                                         UpdateQos updateQos) throws XmlBlasterException {
                        try {
                            String contentStr = new String(content);
                            // Only the first 10 characters of the content are logged
                            String cont = (contentStr.length() > 10)
                                ? (contentStr.substring(0,10)+"...")
                                : contentStr;
                            log.info("Receiving update of a message oid=" + updateKey.getOid() +
                                     " priority=" + updateQos.getPriority() +
                                     " state=" + updateQos.getState() +
                                     " content=" + cont);
                            if (!updateQos.isErased()) {
                                updateMsgs.add(new Msg(cbSessionId, updateKey, content, updateQos));
                            }
                        } catch (Throwable e) {
                            log.severe("Error in update method: " + e.toString());
                            e.printStackTrace();
                        }
                        return "";
                    }

                    public I_ClientPlugin getSecurityPlugin() {
                        return null;
                    }
                }); // Establish new callback server
            } catch (Throwable e) {
                log.severe("Can't restart callback server: " + e.toString());
                fail("Can't restart callback server: " + e.toString());
            }

            log.info("Waiting long enough that xmlBlaster reconnected to us and expecting the 6 queued messages ...");
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException ie) {
                // Do not lose the interruption request, the assertions below still run
                Thread.currentThread().interrupt();
            }

            // The old interceptor must stay empty, everything arrives at the new callback server
            assertEquals(text, 0, this.updateInterceptor.getMsgs().length);
            assertEquals(text, expectedQueued, updateMsgs.getMsgs(msgOid, Constants.STATE_OK).length);
            Msg[] msgArr = updateMsgs.getMsgs();
            assertEquals(text, expectedQueued, msgArr.length);

            // Verify delivery order: priority must never increase and, within the same
            // priority, the numeric message content must strictly increase.
            int lastNum = -1;
            int lastPrio = PriorityEnum.MAX_PRIORITY.getInt() + 1;
            for (int i = 0; i < msgArr.length; i++) {
                int currPrio = msgArr[i].getUpdateQos().getPriority().getInt();
                int currNum = msgArr[i].getContentInt();
                if (lastPrio < currPrio || (lastPrio == currPrio && lastNum >= currNum)) {
                    fail(text + " Sequence is not ascending: last=" + lastNum + " curr=" + currNum);
                }
                lastNum = currNum;
                lastPrio = currPrio;
            }

            // Highest priority arrives first, the lowest queued priority (4) arrives last
            assertEquals("", PriorityEnum.MAX_PRIORITY, msgArr[0].getUpdateQos().getPriority());
            assertEquals("", 4, msgArr[expectedQueued - 1].getUpdateQos().getPriority().getInt());
            updateMsgs.clear();
            this.updateInterceptor.clear();
        } finally {
            // Always release the temporary callback server, even if an assertion failed
            if (cbServer != null) {
                try {
                    cbServer.shutdown();
                } catch (Exception e) {
                    log.severe(e.toString());
                }
            }
        }
        log.info("Success in testPriorizedDispatchPluginConnectionState()");
    }

    /**
     * Tears down the fixture.
     * <p>
     * Disconnects the client connection, stops the embedded xmlBlaster server, resets the
     * ports and releases all references held by this test. Connection and server thread are
     * only touched if they exist, so a partially initialized fixture does not end in a
     * NullPointerException before the remaining cleanup has run.
     */
    protected void tearDown() {
        // Remember an interruption and restore it at the end, so the cleanup calls
        // below still run with a clear interrupt flag.
        boolean interrupted = false;
        try {
            try {
                Thread.sleep(200L); // Wait some time
            } catch (InterruptedException ie) {
                interrupted = true;
            }

            if (this.con != null) {
                this.con.disconnect(null);
                this.con = null;
            }

            try {
                Thread.sleep(500L); // Wait some time
            } catch (InterruptedException ie) {
                interrupted = true;
            }

            if (this.serverThread != null) {
                EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
                this.serverThread = null;
            }

            // reset to default server port (necessary if other tests follow in the same JVM).
            Util.resetPorts(glob);
            this.glob = null;
            this.connectQos = null;
            this.updateInterceptor = null;
            this.updateMsgs = null;
            Global.instance().shutdown();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Method is used by TestRunner to load these tests.
     *
     * @return a suite containing the single test testPriorizedDispatchPluginConnectionState
     */
    public static Test suite() {
        TestSuite suite = new TestSuite();
        suite.addTest(new TestPriorizedDispatchWithLostCallback(Global.instance(),
                                                                "testPriorizedDispatchPluginConnectionState",
                                                                "PriorizedDispatchPluginRecovery"));
        return suite;
    }

    /**
     * Runs the test standalone, without a JUnit runner.
     * <p>
     * Invoke:
     * <pre>
     *  java org.xmlBlaster.test.dispatch.TestPriorizedDispatchWithLostCallback -logging/org.xmlBlaster.engine.dispatch FINE -logging/org.xmlBlaster.util.dispatch FINE -logging/org.xmlBlaster.engine FINEST
     *  java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.dispatch.TestPriorizedDispatchWithLostCallback
     * </pre>
     *
     * @param args command line arguments, handed to Global.init(); the JVM exits
     *             if init() returns a value other than 0
     */
    public static void main(String args[]) {
        Global glob = new Global();
        if (glob.init(args) != 0) {
            System.exit(0);
        }
        TestPriorizedDispatchWithLostCallback testSub =
            new TestPriorizedDispatchWithLostCallback(glob,
                                                      "TestPriorizedDispatchWithLostCallback",
                                                      "TestPriorizedDispatchWithLostCallback");
        testSub.setUp();
        try {
            testSub.testPriorizedDispatchPluginConnectionState();
        } finally {
            // Same lifecycle as under a JUnit runner: clean up even if the test failed
            testSub.tearDown();
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -