⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 testpriorizeddispatchwithlostcallback.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
      try {
         SubscribeKey sk = new SubscribeKey(glob, oid);
         SubscribeQos sq = new SubscribeQos(glob);
         SubscribeReturnQos srq = con.subscribe(sk.toXml(), sq.toXml());
         log.info("SUCCESS subscribe to '" + oid + "' returned state=" + srq.getState());
      } catch(XmlBlasterException e) {
         log.warning("XmlBlasterException: " + e.getMessage());
         fail("subscribe - XmlBlasterException: " + e.getMessage());
      }
   }

   /**
    * Tests what happens if the dispatcher framework loses the callback connection to us
    * and starts polling.
    * <p>
    * Sequence of the test:
    * <ol>
    *   <li>Publish a dispatch configuration which sends everything for status
    *       <code>NORMAL_LINE</code> and, for status <code>DEAD_LINE</code> with
    *       <code>connectionState='polling'</code>, queues priorities 4-9 and destroys
    *       priorities 0-3.</li>
    *   <li>Subscribe, publish one message with priority 1 and expect exactly one update.</li>
    *   <li>Shut down our callback server and publish one message for every priority
    *       from 0 up to and including <code>PriorityEnum.MAX_PRIORITY</code>; expect no update.</li>
    *   <li>Start a fresh <code>XmlRpcCallbackServer</code>, wait three seconds and expect
    *       six messages in <code>updateMsgs</code>, delivered with non-increasing priority
    *       and, within the same priority, strictly increasing content number.</li>
    * </ol>
    * The callback server created in the last step is always shut down again.
    */
   public void testPriorizedDispatchPluginConnectionState() {
      log.info("testPriorizedDispatchPluginConnectionState() ...");

      String config =
            "<msgDispatch defaultStatus='" + NORMAL_LINE + "' defaultAction='send'>\n"+
            "  <onStatus oid='" + statusOid + "' content='" + NORMAL_LINE + "' defaultAction='send'>\n" +
            "    <action do='send'  ifPriority='0-9'/>\n" +
            "  </onStatus>\n" +
            "  <onStatus oid='" + statusOid + "' content='" + DEAD_LINE + "' defaultAction='queue' connectionState='polling'>\n" +
            "    <action do='queue'  ifPriority='4-9'/>\n" +
            "    <action do='destroy'  ifPriority='0-3'/>\n" +
            "  </onStatus>\n" +
            "</msgDispatch>\n";

      publishNewConfig(config);

      String text = "Testing configuration";
      long sleep = 2000L;

      subscribe(msgOid);

      // Exclusive upper bound for the priority loop below
      int maxPrio = PriorityEnum.MAX_PRIORITY.getInt() + 1;

      // First check normal operation
      //changeStatus(statusOid, NORMAL_LINE);
      publish(msgOid, 1);
      assertEquals(text, 1, this.updateInterceptor.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
      this.updateInterceptor.clear();

      // Now kill our callback server
      log.info("Shutdown callback, expecting messages to be queued or destroyed depending on the priority");
      try {
         con.getCbServer().shutdown();
      }
      catch (XmlBlasterException e) {
         fail("ShutdownCB: " + e.getMessage());
      }
      this.updateInterceptor.clear();

      // These messages are depending on the priority queued or destroyed
      // as the callback connection is polling ...
      for (int priority=0; priority < maxPrio; priority++) {
         publish(msgOid, priority);
      }
      assertEquals(text, 0, this.updateInterceptor.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
      this.updateInterceptor.clear();

      // Now reestablish the callback server ...
      I_CallbackServer cbServer = null;
      try {
         updateMsgs = new MsgInterceptor(glob, log, null); // just used as message container
         this.updateInterceptor.clear();

         try {
            cbServer = new XmlRpcCallbackServer();
            CallbackAddress cbAddress = new CallbackAddress(glob);
            cbServer.initialize(this.glob, name, cbAddress, new AbstractCallbackExtended(glob) {
               public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
                  try {
                     String contentStr = new String(content);
                     // Only the first ten characters of the content are logged
                     String cont = (contentStr.length() > 10) ? (contentStr.substring(0,10)+"...") : contentStr;
                     log.info("Receiving update of a message oid=" + updateKey.getOid() +
                         " priority=" + updateQos.getPriority() +
                         " state=" + updateQos.getState() +
                         " content=" + cont);
                     if (!updateQos.isErased()) {
                        updateMsgs.add(new Msg(cbSessionId, updateKey, content, updateQos));
                     }
                  }
                  catch (Throwable e) {
                     // Never let a failure escape from the callback, just report it
                     log.severe("Error in update method: " + e.toString());
                     e.printStackTrace();
                  }
                  return "";
               }
               public I_ClientPlugin getSecurityPlugin() { return null; }
            }); // Establish new callback server
         }
         catch (Throwable e) {
            log.severe("Can't restart callback server: " + e.toString());
            fail("Can't restart callback server: " + e.toString());
         }

         log.info("Waiting long enough that xmlBlaster reconnected to us and expecting the 6 queued messages ...");
         try {
            Thread.sleep(3000L);
         }
         catch (InterruptedException e) {
            // Keep the interrupt status visible for the code running this test
            Thread.currentThread().interrupt();
         }

         // The old interceptor must stay empty, everything arrives at the new callback server
         assertEquals(text, 0, this.updateInterceptor.getMsgs().length);
         assertEquals(text, 6, updateMsgs.getMsgs(msgOid, Constants.STATE_OK).length);
         Msg[] msgArr = updateMsgs.getMsgs();
         assertEquals(text, 6, msgArr.length);

         // Priority must never increase from one message to the next; for equal
         // priority the content number must strictly increase.
         int lastNum = -1;
         int lastPrio = PriorityEnum.MAX_PRIORITY.getInt() + 1;
         for (int i=0; i<msgArr.length; i++) {
            int currPrio = msgArr[i].getUpdateQos().getPriority().getInt();
            int currNum = msgArr[i].getContentInt();
            if (lastPrio < currPrio || (lastPrio == currPrio && lastNum >= currNum)) {
               fail(text + " Sequence is not ascending: last=" + lastNum + " curr=" + currNum);
            }
            lastNum = currNum;
            lastPrio = currPrio;
         }

         assertEquals("", PriorityEnum.MAX_PRIORITY, msgArr[0].getUpdateQos().getPriority());
         assertEquals("", 4, msgArr[5].getUpdateQos().getPriority().getInt());
         updateMsgs.clear();
         this.updateInterceptor.clear();
      }
      finally {
         if (cbServer != null) {
            try {
               cbServer.shutdown();
            }
            catch (Exception e) {
               log.severe(e.toString());
            }
         }
      }
      log.info("Success in testPriorizedDispatchPluginConnectionState()");
   }

   /**
    * Tears down the fixture.
    * <p>
    * Disconnects from xmlBlaster, stops the embedded server, resets the ports,
    * drops the references held by this test and shuts down <code>Global.instance()</code>.
    */
   protected void tearDown() {
      // Wait some time. An interrupt is ignored on purpose so that the cleanup below always runs.
      try { Thread.sleep(200L); } catch (InterruptedException ignored) { }
      con.disconnect(null);
      con = null;

      // Wait some time before stopping the server; interrupt ignored for the same reason as above
      try { Thread.sleep(500L); } catch (InterruptedException ignored) { }
      EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
      this.serverThread = null;

      // reset to default server port (necessary if other tests follow in the same JVM).
      Util.resetPorts(glob);

      this.glob = null;
      this.connectQos = null;
      this.con = null;
      this.updateInterceptor = null;
      this.updateMsgs = null;
      Global.instance().shutdown();
   }

   /**
    * Method is used by TestRunner to load these tests
    *
    * @return a suite containing only <code>testPriorizedDispatchPluginConnectionState</code>
    */
   public static Test suite() {
      TestSuite suite= new TestSuite();
      suite.addTest(new TestPriorizedDispatchWithLostCallback(Global.instance(), "testPriorizedDispatchPluginConnectionState", "PriorizedDispatchPluginRecovery"));
      return suite;
   }

   /**
    * Runs the test stand-alone, without a JUnit runner.
    * <p>
    * Invoke:
    * <pre>
    *  java org.xmlBlaster.test.dispatch.TestPriorizedDispatchWithLostCallback  -logging/org.xmlBlaster.engine.dispatch FINE -logging/org.xmlBlaster.util.dispatch FINE -logging/org.xmlBlaster.engine FINEST
    *  java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.dispatch.TestPriorizedDispatchWithLostCallback
    * </pre>
    *
    * @param args command line arguments, handed to <code>Global.init(String[])</code>
    */
   public static void main(String args[]) {
      Global glob = new Global();
      if (glob.init(args) != 0) {
         // NOTE(review): exits with status 0 although init() reported a non-zero result - confirm this is intended
         System.exit(0);
      }
      TestPriorizedDispatchWithLostCallback testSub = new TestPriorizedDispatchWithLostCallback(glob, "TestPriorizedDispatchWithLostCallback", "TestPriorizedDispatchWithLostCallback");
      testSub.setUp();
      try {
         testSub.testPriorizedDispatchPluginConnectionState();
      }
      finally {
         // Always clean up, even if an assertion of the test failed
         testSub.tearDown();
      }
   }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -