⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 testpersistentsession.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
         // (Tail of doSubscribe; the beginning of the method is outside this excerpt.)
         this.updateInterceptors[num] = new MsgInterceptor(this.glob, log, null); // Collect received msgs
         this.updateInterceptors[num].setLogPrefix("interceptor-" + num);
         SubscribeReturnQos subscriptionId = this.glob.getXmlBlasterAccess().subscribe(key, qos, this.updateInterceptors[num]);
         // NOTE(review): subscriptionId is dereferenced here before the null assertion below,
         // so a null return would show up as a NullPointerException instead of the assertion message.
         log.info("Success: Subscribe on subscriptionId=" + subscriptionId.getSubscriptionId() + " done");
         assertTrue("returned null subscriptionId", subscriptionId != null);
      } catch(XmlBlasterException e) {
         // Any XmlBlasterException during subscribe fails the test, with the exception text in the message.
         log.warning("XmlBlasterException: " + e.getMessage());
         assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
      }
   }

   /**
    * TEST: Construct a message and publish it.
    * <p />
    * The message oid is <code>"Message-" + counter</code> and the content is the
    * decimal string of <code>counter</code>.
    *
    * @param counter number used to build both the oid and the content of the message
    * @throws XmlBlasterException passed on from the xmlBlaster client calls made here
    */
   public void doPublish(int counter) throws XmlBlasterException {
      String oid = "Message" + "-" + counter;
      log.info("Publishing a message " + oid + " ...");
      String xmlKey = "<key oid='" + oid + "' contentMime='" + contentMime + "'>\n" +
                      "   <TestPersistentSession-AGENT id='192.168.124.10' subId='1' type='generic'>" +
                      "   </TestPersistentSession-AGENT>" +
                      "</key>";
      String content = "" + counter;
      PublishQos qosWrapper = new PublishQos(glob); // == "<qos></qos>"
      // NOTE(review): getBytes() without a charset uses the platform default encoding.
      MsgUnit msgUnit = new MsgUnit(xmlKey, content.getBytes(), qosWrapper.toXml());
      this.glob.getXmlBlasterAccess().publish(msgUnit);
      log.info("Success: Publishing of " + oid + " done");
   }

   /**
    * TEST: Publishes <code>numPublish</code> messages while the server is taken away
    * at iteration <code>numStop</code> and brought back at iteration <code>numStart</code>,
    * and checks what the interceptors of subscribers 1 and 3 receive.
    * <br />
    * Subscribers 0 and 1 are subscribed before the first publish, subscribers 2 and 3
    * right after the first publish.
    *
    * @param doStop <code>true</code>: stop the embedded xmlBlaster and start it again;
    *               <code>false</code>: change the run level to 0 and later back to 9
    */
   public void persistentSession(boolean doStop) {
      //doSubscribe(); -> see reachedAlive()
      log.info("Going to publish " + numPublish + " messages, xmlBlaster will be down for message 3 and 4");
      // Subscribers 0 and 1 exist before anything is published
      doSubscribe(0, this.exactSubscription, TRANSIENT);
      doSubscribe(1, this.exactSubscription, PERSISTENT);

      for (int i=0; i<numPublish; i++) {
         try {
            if (i == numStop) { // 3
               // Take the server away: either a full stop or a run level change to 0
               if (doStop) {
                  log.info("Stopping xmlBlaster, but continue with publishing ...");
                  EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
                  this.serverThread = null;
               }
               else {
                  log.info("changing run level but continue with publishing ...");
                  this.serverThread.changeRunlevel(0, true);
               }
            }
            if (i == numStart) {
               // Bring the server back the same way it was taken away
               if (doStop) {
                  log.info("Starting xmlBlaster again, expecting the previous published two messages ...");
                  // serverThread = EmbeddedXmlBlaster.startXmlBlaster(serverPort);
                  serverThread = EmbeddedXmlBlaster.startXmlBlaster(this.serverGlobal);
                  log.info("xmlBlaster started, waiting on tail back messsages");
               }
               else {
                  log.info("changing runlevel again to runlevel 9. Expecting the previous published two messages ...");
                  this.serverThread.changeRunlevel(9, true);
                  log.info("xmlBlaster runlevel 9 reached, waiting on tail back messsages");
               }

               // Message-4 We need to wait until the client reconnected (reconnect interval)
               // Message-5
               // Interceptors 1 and 3 must each report the 2 messages published during the outage;
               // the wait is bounded by twice the reconnect delay.
               assertEquals("", 2, this.updateInterceptors[1].waitOnUpdate(reconnectDelay*2L, 2));
               assertEquals("", 2, this.updateInterceptors[3].waitOnUpdate(reconnectDelay*2L, 2));

               // Forget everything received so far so the checks below only see the next message
               for (int j=0; j < this.numSubscribers; j++) this.updateInterceptors[j].clear();
            }
            doPublish(i+1);
            if (i == 0) {
               // Subscribers 2 and 3 join only after the first message was published
               doSubscribe(2, this.exactSubscription, TRANSIENT);
               doSubscribe(3, this.exactSubscription, PERSISTENT);
            }
            if (i < numStop || i >= numStart ) {
               // Updates are only awaited outside the [numStop, numStart) window.
               // n is the number of updates expected on interceptor 3: it subscribed after
               // message 1, so for i == 0 it expects that message only when initialUpdates is set.
               int n = 1;
               if (i == 0 && !this.initialUpdates) n = 0;
               assertEquals("Message nr. " + (i+1), 1, this.updateInterceptors[1].waitOnUpdate(4000L, 1));
               assertEquals("Message nr. " + (i+1), n, this.updateInterceptors[3].waitOnUpdate(4000L, n));
            }
            for (int j=0; j < this.numSubscribers; j++) this.updateInterceptors[j].clear();
         }
         catch(XmlBlasterException e) {
            // Only the "polling" error code is tolerated (logged); every other code fails the test
            if (e.getErrorCode() == ErrorCode.COMMUNICATION_NOCONNECTION_POLLING)
               log.warning("Lost connection, my connection layer is polling: " + e.getMessage());
            else if (e.getErrorCode() == ErrorCode.COMMUNICATION_NOCONNECTION_DEAD)
               assertTrue("Lost connection, my connection layer is NOT polling", false);
            else
               assertTrue("Publishing problems: " + e.getMessage(), false);
         }
      }
      // NOTE(review): subscribers 0 and 1 are subscribed a second time after the loop;
      // the reason is not visible in this excerpt - confirm it is intended.
      doSubscribe(0, this.exactSubscription, TRANSIENT);
      doSubscribe(1, this.exactSubscription, PERSISTENT);
   }

   /**
    * This is the callback method invoked from I_XmlBlasterAccess
    * informing the client in an asynchronous mode if the connection was established.
* <p />    * This method is enforced through interface I_ConnectionStateListener    */   public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {      log.info("I_ConnectionStateListener: We were lucky, reconnected to xmlBlaster");      // doSubscribe();    // initialize on startup and on reconnect   }   public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {      log.warning("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.POLLING);   }   public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {      log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD);   }   public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {      String contentStr = new String(content);      String cont = (contentStr.length() > 10) ? (contentStr.substring(0,10)+"...") : contentStr;      log.info("Receiving update of a message oid=" + updateKey.getOid() +                        " priority=" + updateQos.getPriority() +                        " state=" + updateQos.getState() +                        " content=" + cont);      log.info("further log for receiving update of a message cbSessionId=" + cbSessionId +                     updateKey.toXml() + "\n" + new String(content) + updateQos.toXml());      log.severe("update: should never be invoked (msgInterceptors take care of it since they are passed on subscriptions)");      return "OK";   }   public void testXPathInitialStop() {      this.exactSubscription = false;      this.initialUpdates = true;      persistentSession(true);   }   public void testXPathNoInitialStop() {      this.exactSubscription = false;      this.initialUpdates = false;      persistentSession(true);   }   public void testXPathInitialRunlevelChange() {      this.persistent = true;      this.exactSubscription = false;      
this.initialUpdates = true;      persistentSession(false);   }   // -----------------------------------------------------------------   private Global createConnection(Global parentGlobal, String sessionName, boolean isPersistent, boolean expectEx) {      try {         Global ret = parentGlobal.getClone(null);         I_XmlBlasterAccess con = ret.getXmlBlasterAccess(); // Find orb         ConnectQos connectQos = new ConnectQos(glob); // == "<qos>...</qos>";         connectQos.setSessionName(new SessionName(ret, sessionName));         // set the persistent connection          connectQos.setPersistent(isPersistent);         // Setup fail save handling for connection ...         Address addressProp = new Address(glob);         addressProp.setDelay(reconnectDelay); // retry connecting every 2 sec         addressProp.setRetries(-1);       // -1 == forever         addressProp.setPingInterval(-1L); // switched off         connectQos.setAddress(addressProp);               // setup failsafe handling for callback ...         
if (this.failsafeCallback) {            CallbackAddress cbAddress = new CallbackAddress(this.glob);            cbAddress.setRetries(-1);            cbAddress.setPingInterval(-1);            cbAddress.setDelay(1000L);            connectQos.addCallbackAddress(cbAddress);         }         con.connect(connectQos, this);  // Login to xmlBlaster, register for updates         if (expectEx) assertTrue("an exception was expected here because of overflow: Configuration of session queue probably not working", false);         return ret;      }      catch (XmlBlasterException ex) {         if (expectEx) log.info("createConnection: exception was OK since overflow was expected");         else assertTrue("an exception should not occur here", false);      }      return null; //to make compiler happy   }            /**    * Tests the requirement:    * - If the storage for the sessions is overflown, it should throw an exception    *    */   public void testOverflow() {      // to change the configuration on server side (limit the queue sizes)      tearDown();      setup(true);      Global[] globals = new Global[5];      try {         globals[0] = createConnection(this.origGlobal, "bjoern/1", true , false);         globals[1] = createConnection(this.origGlobal, "fritz/2", false, false);         globals[3] = createConnection(this.origGlobal, "dimitri/3", true , true); // <-- exception (since main connection also persistent)         globals[2] = createConnection(this.origGlobal, "pandora/4", false , false); // OK since transient         globals[4] = createConnection(this.origGlobal, "jonny/5", true, true);      }      finally {         for (int i=0; i < globals.length; i++) {            if (globals[i] != null) globals[i].getXmlBlasterAccess().disconnect(new DisconnectQos(globals[i]));         }      }   }   /**    * Invoke: java org.xmlBlaster.test.client.TestPersistentSession    * <p />    * @deprecated Use the TestRunner from the testsuite to run it:<p />    * <pre>   java 
-Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestPersistentSession</pre>
    */
   public static void main(String args[])
   {
      Global glob = new Global();
      if (glob.init(args) != 0) {
         System.out.println(ME + ": Init failed");
         System.exit(1);
      }
      TestPersistentSession testSub = new TestPersistentSession(glob, "TestPersistentSession/1");

      // Each test runs as setUp / test / tearDown. tearDown sits in a finally block
      // (the same shape JUnit's own runner uses) so that whatever setUp created is
      // released even when a test method throws; the failure still propagates.
      testSub.setUp();
      try {
         testSub.testXPathInitialStop();
      }
      finally {
         testSub.tearDown();
      }

      testSub.setUp();
      try {
         testSub.testXPathNoInitialStop();
      }
      finally {
         testSub.tearDown();
      }

      testSub.setUp();
      try {
         testSub.testXPathInitialRunlevelChange();
      }
      finally {
         testSub.tearDown();
      }

      testSub.setUp();
      try {
         testSub.testOverflow();
      }
      finally {
         testSub.tearDown();
      }
   }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -