📄 testpriorizeddispatchplugin.java
字号:
/*------------------------------------------------------------------------------Name: TestPriorizedDispatchPlugin.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.test.dispatch;import java.util.logging.Logger;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.qos.address.CallbackAddress;import org.xmlBlaster.client.qos.ConnectQos;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.client.I_XmlBlasterAccess;import org.xmlBlaster.client.qos.PublishQos;import org.xmlBlaster.client.qos.PublishReturnQos;import org.xmlBlaster.client.qos.SubscribeQos;import org.xmlBlaster.client.qos.SubscribeReturnQos;import org.xmlBlaster.client.key.SubscribeKey;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.EmbeddedXmlBlaster;import org.xmlBlaster.test.Util;import org.xmlBlaster.test.Msg;import org.xmlBlaster.test.MsgInterceptor;import junit.framework.*;/** * This client tests the * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/dispatch.control.plugin.html">dispatch.control.plugin requirement</a> * <p /> * We start our own xmlBlaster server in a thread. * This client may be invoked multiple time on the same xmlBlaster server, * as it cleans up everything after his tests are done. * <p> * Invoke examples:<br /> * <pre> * java junit.textui.TestRunner -noloading org.xmlBlaster.test.dispatch.TestPriorizedDispatchPlugin * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.dispatch.TestPriorizedDispatchPlugin * </pre> * @see org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin */public class TestPriorizedDispatchPlugin extends TestCase { private Global glob; private static Logger log = Logger.getLogger(TestPriorizedDispatchPlugin.class.getName()); private I_XmlBlasterAccess con = null; private String name; private String passwd = "secret"; private EmbeddedXmlBlaster serverThread; private int serverPort = 9560; private boolean startEmbedded = true; private MsgInterceptor update; // collects updated messages private final String msgOid = "dispatchTestMessage"; private int msgSequenceNumber = 0; private String statusOid = "_bandwidth.status"; private String NORMAL_LINE = "2M"; private String BACKUP_LINE = "64k"; private String DEAD_LINE = "DOWN"; private String[] states = { NORMAL_LINE, BACKUP_LINE, DEAD_LINE }; private String[][] expectedActions = { {"send", "send", "send", "send", "send", "send", "send", "send", "send", "send"}, {"destroy", "destroy", "destroy", "destroy", "queue", "queue", "queue,notifySender", "send", "send", "send"}, {"destroy", "destroy", "destroy", "destroy", "queue", "queue", "queue", "queue", "queue", "queue"} }; /** * Constructs the TestPriorizedDispatchPlugin object. * <p /> * @param testName The name used in the test suite * @param name The name to login to the xmlBlaster */ public TestPriorizedDispatchPlugin(Global glob, String testName, String name) { super(testName); this.glob = glob; this.name = name; } /** * Sets up the fixture. * <p /> * We start an own xmlBlaster server in a separate thread, * it is configured to load our demo dispatch plugin. * <p /> * Then we connect as a client */ protected void setUp() { //Global embeddedGlobal = glob.getClone(null); this.startEmbedded = glob.getProperty().get("startEmbedded", this.startEmbedded); // We register here the demo plugin with xmlBlaster server, supplying an argument to the plugin String[] args = { "-DispatchPlugin[Priority][1.0]", "org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin", "-DispatchPlugin/defaultPlugin", "undef", "-PriorizedDispatchPlugin/user", "_PriorizedDispatchPlugin", "-"+org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin.CONFIG_PROPERTY_KEY, //"-PriorizedDispatchPlugin/config", "<msgDispatch defaultStatus='" + BACKUP_LINE + "' defaultAction='send'>\n"+ " <onStatus oid='" + statusOid + "' content='" + NORMAL_LINE + "' defaultAction='send'>\n" + //" <action do='send' ifPriority='0-9'/>\n" + " </onStatus>\n" + " <onStatus oid='" + statusOid + "' content='" + BACKUP_LINE + "' defaultAction='send'>\n" + " <action do='send' ifPriority='7'/>\n" + " <action do='queue,notifySender' ifPriority='6'/>\n" + " <action do='queue' ifPriority='4-5'/>\n" + " <action do='destroy' ifPriority='0-3'/>\n" + " </onStatus>\n" + " <onStatus oid='" + statusOid + "' content='" + DEAD_LINE + "' defaultAction='queue'>\n" + " <action do='destroy' ifPriority='0-3'/>\n" + " </onStatus>\n" + "</msgDispatch>\n" }; glob.init(args); if (this.startEmbedded) { glob.init(Util.getOtherServerPorts(serverPort)); serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob); log.info("XmlBlaster is ready for testing the priority dispatch plugin"); } try { log.info("Connecting ..."); this.con = glob.getXmlBlasterAccess(); // Activate plugin for callback only: ConnectQos qos = new ConnectQos(glob, name, passwd); CallbackAddress cbAddress = new CallbackAddress(glob); cbAddress.setDispatchPlugin("Priority,1.0"); qos.addCallbackAddress(cbAddress); this.update = new MsgInterceptor(glob, log, null); this.con.connect(qos, update); } catch (Exception e) { Thread.dumpStack(); log.severe("Can't connect to xmlBlaster: " + e.toString()); } this.update.clear(); } /** * @param The oid of the status message * @param state Choose one of "2M" or "64k" */ private void changeStatus(String oid, String state) { log.info("Changing band width state to '" + state + "'"); try { PublishReturnQos rq = con.publish(new MsgUnit(glob, "<key oid='" + oid + "'/>", state, null)); log.info("SUCCESS for state change to '" + state + "', " + rq.getState()); // Sleep to be shure the plugin has got and processed the message try { Thread.sleep(1000L); } catch( InterruptedException i) {} } catch(XmlBlasterException e) { log.warning("XmlBlasterException: " + e.getMessage()); fail("publish bandwidth state - XmlBlasterException: " + e.getMessage()); } } private void publish(String oid, int priority) { PriorityEnum prio = PriorityEnum.toPriorityEnum(priority); try { msgSequenceNumber++; String content = "" + msgSequenceNumber; PublishQos pq = new PublishQos(glob); pq.setPriority(prio); PublishReturnQos rq = con.publish(new MsgUnit("<key oid='"+oid+"'/>", content.getBytes(), pq.toXml())); log.info("SUCCESS publish '" + oid + "' with prio=" + prio.toString() + " content=" + content + " returned state=" + rq.getState()); assertEquals("Returned oid wrong", oid, rq.getKeyOid()); assertEquals("Return not OK", Constants.STATE_OK, rq.getState()); } catch(XmlBlasterException e) { log.warning("XmlBlasterException: " + e.getMessage()); fail("publish prio=" + prio.toString() + " - XmlBlasterException: " + e.getMessage()); } } private void subscribe(String oid) { try { SubscribeKey sk = new SubscribeKey(glob, oid); SubscribeQos sq = new SubscribeQos(glob); SubscribeReturnQos srq = con.subscribe(sk.toXml(), sq.toXml()); log.info("SUCCESS subscribe to '" + oid + "' returned state=" + srq.getState()); } catch(XmlBlasterException e) { log.warning("XmlBlasterException: " + e.getMessage()); fail("subscribe - XmlBlasterException: " + e.getMessage()); } } /** * Test all tuples of possibilities */ public void testPriorizedDispatchPlugin() { log.info("testPriorizedDispatchPlugin() ..."); long sleep = 1000L; String text; subscribe(msgOid); int queueCounter = 0; int destroyCounter = 0; try { for (int i=0; i<states.length; i++) { changeStatus(statusOid, states[i]); log.info("========================state=" + states[i]); for (int priority=0; priority<expectedActions[i].length; priority++) { String action = expectedActions[i][priority]; text = "state=" + states[i] + " action=" + action; log.info("Doing " + text + " queueCounter=" + queueCounter); boolean expectsNotify = false; if (action.indexOf("notifySender") >= 0) { expectsNotify = true; log.info(text + ": Expecting notify"); } if (action.startsWith("send")) { publish(msgOid, priority); assertEquals(text, 1, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK)); int count = expectsNotify ? 2 : 1; assertEquals(text, count, this.update.count()); if (expectsNotify) { String expectedState = "send,notifySender"; Msg msg = this.update.getMsg(msgOid, expectedState); // PtP notification assertTrue("send,notifySender PtP not arrived", msg != null); } } else if (action.startsWith("queue")) { publish(msgOid, priority); queueCounter++; assertEquals(text, 0, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK)); int count = expectsNotify ? 1 : 0; assertEquals(text, count, this.update.count());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -