⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 eventplugintest.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
               String content = "This is test " + i;
               conn.publish(new MsgUnit(pubKey, content.getBytes(), pubQos));
            }
            
            int ret = msgInterceptor.waitOnUpdate(3000L, 1);
            assertEquals("We expected one message for the excess of the history queue", 1, ret);
            msgInterceptor.clear();
            for (int i=5; i < 8; i++) {
               String content = "This is test " + i;
               conn.publish(new MsgUnit(pubKey, content.getBytes(), pubQos));
            }
            ret = msgInterceptor.waitOnUpdate(3000L, 1);
            assertEquals("We expected one message", 1, ret);
            msgInterceptor.clear();
            conn.disconnect(new DisconnectQos(global));
         }

         {
            Global global = new Global(args);
            qos = new ConnectQos(global, userName + "/" + sessionId, "secret");
            I_XmlBlasterAccess conn = global.getXmlBlasterAccess();
            conn.connect(qos, this);
            Thread.sleep(1000L);
            conn.disconnect(new DisconnectQos(global));
         }
         conn2.disconnect(new DisconnectQos(secondGlobal));
      }
      catch (Exception ex) {
         ex.printStackTrace();
         fail(ex.getMessage());
      }
      finally {
         stopServer();
      }
   }

   /**
    * Exercises the EventPlugin with event types that are configured using wildcards.
    * <p/>
    * Before the embedded server is started, the plugins file is written with two event types:
    * <ul>
    *    <li>the callback queue of any session of any client, threshold 70 %
    *        (the subscribing client below limits its callback queue to 10 entries)</li>
    *    <li>the history queue of any topic, threshold 4</li>
    * </ul>
    * The test then proceeds in these steps:
    * <ol>
    *    <li>Client <code>eventTester/1</code> connects, subscribes to the topic
    *        <code>eventTest</code> and disconnects with the leave-server flag set, i.e. it
    *        leaves the server without logging out so that its queue entries are kept.</li>
    *    <li>Client <code>tester/2</code> connects with a MsgInterceptor as callback and
    *        subscribes to <code>__sys__Event</code>; this is the client which receives the
    *        events.</li>
    *    <li>Client <code>testPublisher/1</code> publishes five messages on the topic. Exactly
    *        one event is expected, for the excess of the history queue.</li>
    *    <li>The same publisher sends three more messages. Two events are expected; the
    *        assertion message attributes them to the callback queue and to the history queue
    *        of the <code>__sys__Event</code> topic.</li>
    *    <li><code>eventTester/1</code> reconnects, waits one second and disconnects again,
    *        after which <code>tester/2</code> disconnects as well.</li>
    * </ol>
    * Any exception is printed and turned into a test failure. The server is stopped in all cases.
    */
   public void testQueueEventsWithWildcards() {
      try {
         final String userName = "eventTester";
         final String topicName = "eventTest";
         final String sessionId = "1";
         final String port = "7617";
         final String eventTypes =
               "client/*/session/*/queue/callback/event/threshold.70%,"
               + "topic/*/queue/history/event/threshold.4";
         writePluginsFile(port, eventTypes);
         startServer();

         final String[] args = {
            "-dispatch/connection/plugin/socket/port", port,
            "-dispatch/connection/retries", "-1",
            "-dispatch/callback/retries", "-1"
         };

         {
            // step 1: the subscriber whose session stays behind on the server
            Global subscriberGlobal = new Global(args);
            ConnectQos subscriberQos = new ConnectQos(subscriberGlobal, userName + "/" + sessionId, "secret");
            subscriberQos.getSessionCbQueueProperty().setMaxEntries(10L);
            subscriberQos.getSessionCbQueueProperty().setMaxEntriesCache(10L);
            I_XmlBlasterAccess subscriber = subscriberGlobal.getXmlBlasterAccess();
            subscriber.connect(subscriberQos, this);
            subscriber.subscribe(new SubscribeKey(subscriberGlobal, topicName), new SubscribeQos(subscriberGlobal));
            // leave the server instead of logging out
            DisconnectQos leaveQos = new DisconnectQos(subscriberGlobal);
            leaveQos.setLeaveServer(true);
            subscriber.disconnect(leaveQos);
         }

         // step 2: the client which listens for the events
         Global listenerGlobal = new Global(args);
         MsgInterceptor eventInterceptor = new MsgInterceptor(listenerGlobal, log, null);
         ConnectQos listenerQos = new ConnectQos(listenerGlobal, "tester/2", "secret");
         I_XmlBlasterAccess eventListener = listenerGlobal.getXmlBlasterAccess();
         eventListener.connect(listenerQos, eventInterceptor);
         eventListener.subscribe(new SubscribeKey(listenerGlobal, "__sys__Event"), new SubscribeQos(listenerGlobal));
         eventInterceptor.clear();

         {
            // steps 3 and 4: publish in two bursts and check the events after each burst
            Global publisherGlobal = new Global(args);
            ConnectQos publisherQos = new ConnectQos(publisherGlobal, "testPublisher/1", "secret");
            I_XmlBlasterAccess publisher = publisherGlobal.getXmlBlasterAccess();
            publisher.connect(publisherQos, this);
            PublishKey pubKey = new PublishKey(publisherGlobal, topicName);
            PublishQos pubQos = new PublishQos(publisherGlobal);

            int msgNo = 0;
            while (msgNo < 5) {
               byte[] payload = ("This is test " + msgNo).getBytes();
               publisher.publish(new MsgUnit(pubKey, payload, pubQos));
               msgNo++;
            }
            int received = eventInterceptor.waitOnUpdate(3000L, 1);
            assertEquals("We expected one message for the excess of the history queue", 1, received);
            eventInterceptor.clear();

            // msgNo is 5 here, so this burst carries the messages 5, 6 and 7
            while (msgNo < 8) {
               byte[] payload = ("This is test " + msgNo).getBytes();
               publisher.publish(new MsgUnit(pubKey, payload, pubQos));
               msgNo++;
            }
            received = eventInterceptor.waitOnUpdate(3000L, 2);
            assertEquals("We expected two messages: one for the excess of the callback queue and the other for the excess of the history queue of the __sys__Event topic", 2, received);
            eventInterceptor.clear();
            publisher.disconnect(new DisconnectQos(publisherGlobal));
         }

         {
            // step 5: come back as the subscriber of step 1 and log out
            Global returningGlobal = new Global(args);
            ConnectQos returningQos = new ConnectQos(returningGlobal, userName + "/" + sessionId, "secret");
            I_XmlBlasterAccess returning = returningGlobal.getXmlBlasterAccess();
            returning.connect(returningQos, this);
            Thread.sleep(1000L);
            returning.disconnect(new DisconnectQos(returningGlobal));
         }
         eventListener.disconnect(new DisconnectQos(listenerGlobal));
      }
      catch (Exception e) {
         e.printStackTrace();
         fail(e.getMessage());
      }
      finally {
         stopServer();
      }
   }

   /**
    * Callback for the connections on which this test registers itself as listener.
    * <p/>
    * The XML form of the update QoS is logged at info level; nothing else is
    * inspected or asserted here.
    *
    * @param cbSessionId the callback session id (not used)
    * @param updateKey the key of the received message (not used)
    * @param content the payload of the received message (not used)
    * @param updateQos the QoS of the received message, logged as XML
    * @return always the string "OK"
    * @throws XmlBlasterException declared by the signature, never thrown by this implementation itself
    */
   public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
      final String qosXml = updateQos.toXml();
      log.info(qosXml);
      return "OK";
   }
   
   
   /**
    * Runs the tests of this class one after the other without a JUnit runner.
    * <p/>
    * Every test is bracketed by setUp() and tearDown(). tearDown() is placed in a
    * finally block, the same way JUnit's TestCase.runBare() handles it, so that the
    * fixture is also released when a test method ends abnormally. This matters because
    * the error raised by fail() is not an Exception and therefore bypasses the catch
    * clause below.
    * <p/>
    * An abnormal end of a test still aborts the remaining tests, as before.
    *
    * @param args command line arguments (not used)
    */
   public static void main(String[] args) {
      EventPluginTest test = new EventPluginTest("EventPluginTest");

      try {
         test.setUp();
         try {
            test.testRegex();
         }
         finally {
            test.tearDown();
         }

         test.setUp();
         try {
            test.testQueueEventsWithWildcards();
         }
         finally {
            test.tearDown();
         }

         test.setUp();
         try {
            test.testQueueEventsWithoutWildcards();
         }
         finally {
            test.tearDown();
         }
      }
      catch (Exception ex) {
         ex.printStackTrace();
      }
   }


}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -