⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 testpriorizeddispatchplugin.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
                  if (expectsNotify) {                     assertEquals(text, "_PriorizedDispatchPlugin", this.update.getMsgs()[0].getUpdateQos().getSender().getLoginName()); // PtP notification                  }               }               else if (action.startsWith("destroy")) {                  publish(msgOid, priority);                  destroyCounter++;                  assertEquals(text, 0, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));                  int count = expectsNotify ? 1 : 0;                  assertEquals(text, count, this.update.count());                  if (expectsNotify) {                     assertEquals(text, "_PriorizedDispatchPlugin", this.update.getMsgs()[0].getUpdateQos().getSender().getLoginName()); // PtP notification                  }               }               else {                  log.severe(text + ": Action is not supported");                  fail(text + ": Action is not supported");               }               this.update.clear();            } // for prio         } // for states         text = "Checking ascending sequence of flushed " + queueCounter + " messages which where hold back";         this.update.clear();         changeStatus(statusOid, NORMAL_LINE);         assertEquals(text, queueCounter, this.update.waitOnUpdate(2000L, msgOid, Constants.STATE_OK));         assertEquals(text, queueCounter, this.update.count());         Msg[] msgArr = this.update.getMsgs();         assertEquals(text, queueCounter, msgArr.length);         int lastNum = -1;         int lastPrio = PriorityEnum.MAX_PRIORITY.getInt() + 1;         for (int i=0; i<msgArr.length; i++) {            log.info("Received flushed hold back message " + msgArr[i].getUpdateKey().getOid() +                          " priority=" + msgArr[i].getUpdateQos().getPriority() +                         " content=" + msgArr[i].getContentStr() +                         " state=" + msgArr[i].getUpdateQos().getState());         }         for (int i=0; i<msgArr.length; i++) {            int currPrio = msgArr[i].getUpdateQos().getPriority().getInt();            int currNum = msgArr[i].getContentInt();            if (lastPrio < currPrio || lastPrio == currPrio && lastNum >= currNum)               fail(text + " Sequence is not ascending: last=" + lastNum + " curr=" + currNum);            lastNum = currNum;            lastPrio = currPrio;         }         this.update.clear();      }      catch (XmlBlasterException e) {         fail(e.toString());      }      log.info("Success in testPriorizedDispatchPlugin()");   }   /**    * Tests to change the plugin configuration and different status message oids.     */   public void testPriorizedDispatchPluginReconfigure() {      log.info("testPriorizedDispatchPluginReconfigure() ...");      String statusOid2 = statusOid+"-2";      String config =             "<msgDispatch defaultStatus='GO' defaultAction='send'>\n"+            "  <onStatus oid='" + statusOid + "' content='GO' defaultAction='send'>\n" +            "    <action do='send'  ifPriority='0-9'/>\n" +            "  </onStatus>\n" +            "  <onStatus oid='" + statusOid2 + "' content='" + BACKUP_LINE + "' defaultAction='send'>\n" +            "     <action do='queue'  ifPriority='0-9'/>\n" +            "  </onStatus>\n" +            "</msgDispatch>\n";      publishNewConfig(config);      String text = "Testing configuration";      long sleep = 2000L;      //try {         subscribe(msgOid);         int maxPrio = PriorityEnum.MAX_PRIORITY.getInt() + 1;         // check normal operation         changeStatus(statusOid, "GO");         for (int priority=0; priority < maxPrio; priority++) {            publish(msgOid, priority);         }         assertEquals(text, maxPrio, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));         log.info("SUCCESS, state=GO");         this.update.clear();         // queue messages         changeStatus(statusOid2, BACKUP_LINE);         for (int priority=0; priority < maxPrio; priority++) {            publish(msgOid, priority);         }         assertEquals(text, 0, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));         log.info("SUCCESS, state=" + BACKUP_LINE);         this.update.clear();         // flush the before queued messages         changeStatus(statusOid, "GO");         assertEquals(text, maxPrio, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));         log.info("SUCCESS, state=GO");         this.update.clear();         // check unkown message content         changeStatus(statusOid, "??YYXX");         for (int priority=0; priority < maxPrio; priority++) {            publish(msgOid, priority);         }         assertEquals(text, maxPrio, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));         log.info("SUCCESS, state=GO");         this.update.clear();         /*      }      catch (XmlBlasterException e) {         fail(e.toString());      }    */      log.info("Success in testPriorizedDispatchPluginReconfigure()");   }   /**    * Change the configuration of the plugin    */   private void publishNewConfig(String config) {      String configKey = org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin.CONFIG_PROPERTY_KEY; // "PriorizedDispatchPlugin/config"      try {         String oid = "__cmd:sysprop/?" + configKey;         String contentStr = config;         PublishQos pq = new PublishQos(glob);         PublishReturnQos rq = con.publish(new MsgUnit("<key oid='"+oid+"'/>", contentStr.getBytes(), pq.toXml()));         log.info("SUCCESS publish new configuration '" + oid + "' returned state=" + rq.getState());         assertEquals("Returned oid wrong", oid, rq.getKeyOid());         assertEquals("Return not OK", Constants.STATE_OK, rq.getState());      } catch(XmlBlasterException e) {         log.warning("XmlBlasterException: " + e.toString());         fail("publish of configuration data - XmlBlasterException: " + e.getMessage());      }   }   /**    * Test the notifySender message    * 1. subscribe to a message    * 2. change state to 64k    * 3. send a message with prio 6 which should trigger a notify PtP message    */   public void testPriorizedDispatchPluginOne() {      log.info("testPriorizedDispatchPluginOne() ...");      long sleep = 2000L;      String text = "state=" + BACKUP_LINE + " action=queue,notifySender";      // <action do='queue,notifySender'  ifPriority='6'/>      subscribe(msgOid);      changeStatus(statusOid, BACKUP_LINE);      try { Thread.sleep(1000L); } catch( InterruptedException i) {} // Wait some time      int priority = 6;      log.info(text + ": Expecting notify");      this.update.clear();      publish(msgOid, priority);      assertEquals(text, 0, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));      assertEquals(text, 1, this.update.count());      assertEquals(text, "_PriorizedDispatchPlugin", this.update.getMsgs()[0].getUpdateQos().getSender().getLoginName()); // PtP notification      this.update.clear();      changeStatus(statusOid, NORMAL_LINE);      log.info(text + ": Expecting queued message");      assertEquals(text, 1, this.update.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));      log.info("Success in testPriorizedDispatchPluginOne()");   }   /**    * Tears down the fixture.    * <p />    * cleaning up .... erase() the previous message OID and logout    */   protected void tearDown() {      try { Thread.sleep(200L); } catch( InterruptedException i) {} // Wait some time      this.con.disconnect(null);      this.con = null;      if (this.startEmbedded) {         try { Thread.sleep(500L); } catch( InterruptedException i) {} // Wait some time         EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);         this.serverThread = null;      }      // reset to default server port (necessary if other tests follow in the same JVM).      Util.resetPorts(glob);      this.glob = null;           this.con = null;      this.update = null;      Global.instance().shutdown();   }   /**    * Method is used by TestRunner to load these tests    */   public static Test suite() {       TestSuite suite= new TestSuite();       suite.addTest(new TestPriorizedDispatchPlugin(Global.instance(), "testPriorizedDispatchPluginOne", "PriorizedDispatchPluginOne"));       suite.addTest(new TestPriorizedDispatchPlugin(Global.instance(), "testPriorizedDispatchPlugin", "PriorizedDispatchPlugin"));       suite.addTest(new TestPriorizedDispatchPlugin(Global.instance(), "testPriorizedDispatchPluginReconfigure", "PriorizedDispatchPluginRecovery"));       return suite;   }   /**    * Invoke:     * <pre>    *  java org.xmlBlaster.test.dispatch.TestPriorizedDispatchPlugin  -logging/org.xmlBlaster.engine.dispatch FINE -logging/org.xmlBlaster.util.dispatch FINE -logging/org.xmlBlaster.engine FINEST    *  java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.dispatch.TestPriorizedDispatchPlugin    * <pre>    */   public static void main(String args[]) {      Global glob = new Global();      if (glob.init(args) != 0) {         System.exit(0);      }      TestPriorizedDispatchPlugin testSub = new TestPriorizedDispatchPlugin(glob, "TestPriorizedDispatchPlugin", "TestPriorizedDispatchPlugin");      testSub.setUp();      testSub.testPriorizedDispatchPlugin();      //testSub.testPriorizedDispatchPluginReconfigure();      //testSub.testPriorizedDispatchPluginOne();      testSub.tearDown();   }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -