📄 testpersistentsession.java
字号:
this.updateInterceptors[num] = new MsgInterceptor(this.glob, log, null); // Collect received msgs this.updateInterceptors[num].setLogPrefix("interceptor-" + num); SubscribeReturnQos subscriptionId = this.glob.getXmlBlasterAccess().subscribe(key, qos, this.updateInterceptors[num]); log.info("Success: Subscribe on subscriptionId=" + subscriptionId.getSubscriptionId() + " done"); assertTrue("returned null subscriptionId", subscriptionId != null); } catch(XmlBlasterException e) { log.warning("XmlBlasterException: " + e.getMessage()); assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false); } } /** * TEST: Construct a message and publish it. * <p /> */ public void doPublish(int counter) throws XmlBlasterException { String oid = "Message" + "-" + counter; log.info("Publishing a message " + oid + " ..."); String xmlKey = "<key oid='" + oid + "' contentMime='" + contentMime + "'>\n" + " <TestPersistentSession-AGENT id='192.168.124.10' subId='1' type='generic'>" + " </TestPersistentSession-AGENT>" + "</key>"; String content = "" + counter; PublishQos qosWrapper = new PublishQos(glob); // == "<qos></qos>" MsgUnit msgUnit = new MsgUnit(xmlKey, content.getBytes(), qosWrapper.toXml()); this.glob.getXmlBlasterAccess().publish(msgUnit); log.info("Success: Publishing of " + oid + " done"); } /** * TEST: <br /> */ public void persistentSession(boolean doStop) { //doSubscribe(); -> see reachedAlive() log.info("Going to publish " + numPublish + " messages, xmlBlaster will be down for message 3 and 4"); // doSubscribe(0, this.exactSubscription, TRANSIENT); doSubscribe(1, this.exactSubscription, PERSISTENT); for (int i=0; i<numPublish; i++) { try { if (i == numStop) { // 3 if (doStop) { log.info("Stopping xmlBlaster, but continue with publishing ..."); EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread); this.serverThread = null; } else { log.info("changing run level but continue with publishing ..."); this.serverThread.changeRunlevel(0, true); } } if (i == numStart) { if (doStop) { log.info("Starting xmlBlaster again, expecting the previous published two messages ..."); // serverThread = EmbeddedXmlBlaster.startXmlBlaster(serverPort); serverThread = EmbeddedXmlBlaster.startXmlBlaster(this.serverGlobal); log.info("xmlBlaster started, waiting on tail back messsages"); } else { log.info("changing runlevel again to runlevel 9. Expecting the previous published two messages ..."); this.serverThread.changeRunlevel(9, true); log.info("xmlBlaster runlevel 9 reached, waiting on tail back messsages"); } // Message-4 We need to wait until the client reconnected (reconnect interval) // Message-5 assertEquals("", 2, this.updateInterceptors[1].waitOnUpdate(reconnectDelay*2L, 2)); assertEquals("", 2, this.updateInterceptors[3].waitOnUpdate(reconnectDelay*2L, 2)); for (int j=0; j < this.numSubscribers; j++) this.updateInterceptors[j].clear(); } doPublish(i+1); if (i == 0) { doSubscribe(2, this.exactSubscription, TRANSIENT); doSubscribe(3, this.exactSubscription, PERSISTENT); } if (i < numStop || i >= numStart ) { int n = 1; if (i == 0 && !this.initialUpdates) n = 0; assertEquals("Message nr. " + (i+1), 1, this.updateInterceptors[1].waitOnUpdate(4000L, 1)); assertEquals("Message nr. " + (i+1), n, this.updateInterceptors[3].waitOnUpdate(4000L, n)); } for (int j=0; j < this.numSubscribers; j++) this.updateInterceptors[j].clear(); } catch(XmlBlasterException e) { if (e.getErrorCode() == ErrorCode.COMMUNICATION_NOCONNECTION_POLLING) log.warning("Lost connection, my connection layer is polling: " + e.getMessage()); else if (e.getErrorCode() == ErrorCode.COMMUNICATION_NOCONNECTION_DEAD) assertTrue("Lost connection, my connection layer is NOT polling", false); else assertTrue("Publishing problems: " + e.getMessage(), false); } } doSubscribe(0, this.exactSubscription, TRANSIENT); doSubscribe(1, this.exactSubscription, PERSISTENT); } /** * This is the callback method invoked from I_XmlBlasterAccess * informing the client in an asynchronous mode if the connection was established. * <p /> * This method is enforced through interface I_ConnectionStateListener */ public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) { log.info("I_ConnectionStateListener: We were lucky, reconnected to xmlBlaster"); // doSubscribe(); // initialize on startup and on reconnect } public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) { log.warning("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.POLLING); } public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) { log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD); } public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException { String contentStr = new String(content); String cont = (contentStr.length() > 10) ? (contentStr.substring(0,10)+"...") : contentStr; log.info("Receiving update of a message oid=" + updateKey.getOid() + " priority=" + updateQos.getPriority() + " state=" + updateQos.getState() + " content=" + cont); log.info("further log for receiving update of a message cbSessionId=" + cbSessionId + updateKey.toXml() + "\n" + new String(content) + updateQos.toXml()); log.severe("update: should never be invoked (msgInterceptors take care of it since they are passed on subscriptions)"); return "OK"; } public void testXPathInitialStop() { this.exactSubscription = false; this.initialUpdates = true; persistentSession(true); } public void testXPathNoInitialStop() { this.exactSubscription = false; this.initialUpdates = false; persistentSession(true); } public void testXPathInitialRunlevelChange() { this.persistent = true; this.exactSubscription = false; this.initialUpdates = true; persistentSession(false); } // ----------------------------------------------------------------- private Global createConnection(Global parentGlobal, String sessionName, boolean isPersistent, boolean expectEx) { try { Global ret = parentGlobal.getClone(null); I_XmlBlasterAccess con = ret.getXmlBlasterAccess(); // Find orb ConnectQos connectQos = new ConnectQos(glob); // == "<qos>...</qos>"; connectQos.setSessionName(new SessionName(ret, sessionName)); // set the persistent connection connectQos.setPersistent(isPersistent); // Setup fail save handling for connection ... Address addressProp = new Address(glob); addressProp.setDelay(reconnectDelay); // retry connecting every 2 sec addressProp.setRetries(-1); // -1 == forever addressProp.setPingInterval(-1L); // switched off connectQos.setAddress(addressProp); // setup failsafe handling for callback ... if (this.failsafeCallback) { CallbackAddress cbAddress = new CallbackAddress(this.glob); cbAddress.setRetries(-1); cbAddress.setPingInterval(-1); cbAddress.setDelay(1000L); connectQos.addCallbackAddress(cbAddress); } con.connect(connectQos, this); // Login to xmlBlaster, register for updates if (expectEx) assertTrue("an exception was expected here because of overflow: Configuration of session queue probably not working", false); return ret; } catch (XmlBlasterException ex) { if (expectEx) log.info("createConnection: exception was OK since overflow was expected"); else assertTrue("an exception should not occur here", false); } return null; //to make compiler happy } /** * Tests the requirement: * - If the storage for the sessions is overflown, it should throw an exception * */ public void testOverflow() { // to change the configuration on server side (limit the queue sizes) tearDown(); setup(true); Global[] globals = new Global[5]; try { globals[0] = createConnection(this.origGlobal, "bjoern/1", true , false); globals[1] = createConnection(this.origGlobal, "fritz/2", false, false); globals[3] = createConnection(this.origGlobal, "dimitri/3", true , true); // <-- exception (since main connection also persistent) globals[2] = createConnection(this.origGlobal, "pandora/4", false , false); // OK since transient globals[4] = createConnection(this.origGlobal, "jonny/5", true, true); } finally { for (int i=0; i < globals.length; i++) { if (globals[i] != null) globals[i].getXmlBlasterAccess().disconnect(new DisconnectQos(globals[i])); } } } /** * Invoke: java org.xmlBlaster.test.client.TestPersistentSession * <p /> * @deprecated Use the TestRunner from the testsuite to run it:<p /> * <pre> java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestPersistentSession</pre> */ public static void main(String args[]) { Global glob = new Global(); if (glob.init(args) != 0) { System.out.println(ME + ": Init failed"); System.exit(1); } TestPersistentSession testSub = new TestPersistentSession(glob, "TestPersistentSession/1"); testSub.setUp(); testSub.testXPathInitialStop(); testSub.tearDown(); testSub.setUp(); testSub.testXPathNoInitialStop(); testSub.tearDown(); testSub.setUp(); testSub.testXPathInitialRunlevelChange(); testSub.tearDown(); testSub.setUp(); testSub.testOverflow(); testSub.tearDown(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -