📄 eventplugintest.java
字号:
String content = "This is test " + i;
conn.publish(new MsgUnit(pubKey, content.getBytes(), pubQos));
}
int ret = msgInterceptor.waitOnUpdate(3000L, 1);
assertEquals("We expected one message for the excess of the history queue", 1, ret);
msgInterceptor.clear();
for (int i=5; i < 8; i++) {
String content = "This is test " + i;
conn.publish(new MsgUnit(pubKey, content.getBytes(), pubQos));
}
ret = msgInterceptor.waitOnUpdate(3000L, 1);
assertEquals("We expected one message", 1, ret);
msgInterceptor.clear();
conn.disconnect(new DisconnectQos(global));
}
{
Global global = new Global(args);
qos = new ConnectQos(global, userName + "/" + sessionId, "secret");
I_XmlBlasterAccess conn = global.getXmlBlasterAccess();
conn.connect(qos, this);
Thread.sleep(1000L);
conn.disconnect(new DisconnectQos(global));
}
conn2.disconnect(new DisconnectQos(secondGlobal));
}
catch (Exception ex) {
ex.printStackTrace();
fail(ex.getMessage());
}
finally {
stopServer();
}
}
/**
* We start an embedded server where we define an EventPlugin to fire on two events:
* <ul>
* <li>on all callback queues (all users) when 70 % of the maximum has been reached (the maximum is 10 entries)</li>
* <li>on all topics when the history queue reaches 4</li>
* </ul>
* We then connect one failsafe client, make a subscription and leave the server (without logging out) to keep
* the entries in the callback queue (and in the history queue).
* <p/>
* The second client subscribes to the configured events (this is the client which will receive
* the events).
* <p/>
* A third client first publishes 5 messages (which hit the subscription of the first client).
* These messages fill the callback queue and the history queue.
* This shall result in one event coming from the history queue. The callback queue shall not
* fire yet since its threshold has not been exceeded.
* <p/>
* The third client then publishes 3 more messages. Now two events shall arrive: one for the excess
* of the callback queue and one for the excess of the second history queue, the one of the
* __sys__Event topic.
* <p/>
*/
public void testQueueEventsWithWildcards() {
   try {
      final String userName = "eventTester";
      final String topicName = "eventTest";
      final String sessionId = "1";
      final String port = "7617";

      // Two wildcard event types, comma separated: callback queue threshold and history queue threshold.
      final String eventTypes = ""
            + "client/*/session/*/queue/callback/event/threshold.70%,"
            + "topic/*/queue/history/event/threshold.4";
      writePluginsFile(port, eventTypes);
      startServer();

      final String[] connectArgs = {
         "-dispatch/connection/plugin/socket/port", port,
         "-dispatch/connection/retries", "-1",
         "-dispatch/callback/retries", "-1"
      };
      final String firstSessionName = userName + "/" + sessionId;

      {
         // First client: callback queue limited to 10 entries, subscribes to the topic,
         // then leaves the server without logging out.
         Global subscriberGlobal = new Global(connectArgs);
         ConnectQos subscriberQos = new ConnectQos(subscriberGlobal, firstSessionName, "secret");
         subscriberQos.getSessionCbQueueProperty().setMaxEntries(10L);
         subscriberQos.getSessionCbQueueProperty().setMaxEntriesCache(10L);
         I_XmlBlasterAccess subscriber = subscriberGlobal.getXmlBlasterAccess();
         subscriber.connect(subscriberQos, this);
         subscriber.subscribe(new SubscribeKey(subscriberGlobal, topicName), new SubscribeQos(subscriberGlobal));
         // conn.leaveServer(null);
         DisconnectQos leaveQos = new DisconnectQos(subscriberGlobal);
         leaveQos.setLeaveServer(true);
         subscriber.disconnect(leaveQos);
      }

      // Second client: receives the events published on the __sys__Event topic.
      Global secondGlobal = new Global(connectArgs);
      MsgInterceptor msgInterceptor = new MsgInterceptor(secondGlobal, log, null);
      ConnectQos listenerQos = new ConnectQos(secondGlobal, "tester/2", "secret");
      I_XmlBlasterAccess conn2 = secondGlobal.getXmlBlasterAccess();
      conn2.connect(listenerQos, msgInterceptor);
      conn2.subscribe(new SubscribeKey(secondGlobal, "__sys__Event"), new SubscribeQos(secondGlobal));
      msgInterceptor.clear();

      {
         // Third client: the publisher.
         Global publisherGlobal = new Global(connectArgs);
         ConnectQos publisherQos = new ConnectQos(publisherGlobal, "testPublisher/1", "secret");
         I_XmlBlasterAccess publisher = publisherGlobal.getXmlBlasterAccess();
         publisher.connect(publisherQos, this);
         PublishKey pubKey = new PublishKey(publisherGlobal, topicName);
         PublishQos pubQos = new PublishQos(publisherGlobal);

         // First batch: messages 0..4.
         int msgNo = 0;
         while (msgNo < 5) {
            String payload = "This is test " + msgNo;
            MsgUnit unit = new MsgUnit(pubKey, payload.getBytes(), pubQos);
            publisher.publish(unit);
            msgNo++;
         }
         final int firstBatchEvents = msgInterceptor.waitOnUpdate(3000L, 1);
         assertEquals("We expected one message for the excess of the history queue", 1, firstBatchEvents);
         msgInterceptor.clear();

         // Second batch: messages 5..7.
         while (msgNo < 8) {
            String payload = "This is test " + msgNo;
            MsgUnit unit = new MsgUnit(pubKey, payload.getBytes(), pubQos);
            publisher.publish(unit);
            msgNo++;
         }
         final int secondBatchEvents = msgInterceptor.waitOnUpdate(3000L, 2);
         assertEquals("We expected two messages: one for the excess of the callback queue and the other for the excess of the history queue of the __sys__Event topic", 2, secondBatchEvents);
         msgInterceptor.clear();
         publisher.disconnect(new DisconnectQos(publisherGlobal));
      }

      {
         // Reconnect under the first client's session name, wait one second and log out normally.
         // NOTE(review): presumably this cleans up the session left on the server - confirm.
         Global cleanupGlobal = new Global(connectArgs);
         ConnectQos cleanupQos = new ConnectQos(cleanupGlobal, firstSessionName, "secret");
         I_XmlBlasterAccess cleanupConn = cleanupGlobal.getXmlBlasterAccess();
         cleanupConn.connect(cleanupQos, this);
         Thread.sleep(1000L);
         cleanupConn.disconnect(new DisconnectQos(cleanupGlobal));
      }

      conn2.disconnect(new DisconnectQos(secondGlobal));
   }
   catch (Exception ex) {
      ex.printStackTrace();
      fail(ex.getMessage());
   }
   finally {
      stopServer();
   }
}
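/**
* Here come the updates for the test client; the QoS of every update is logged.
* @param cbSessionId the callback session id (not used by this implementation)
* @param updateKey the key of the update (not used by this implementation)
* @param content the content of the update (not used by this implementation)
* @param updateQos the QoS of the update, logged as XML
* @return always the string "OK"
* @throws XmlBlasterException declared by the method signature
*/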
public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
   // Only the QoS of the incoming update is logged; session id, key and content are ignored.
   final String qosXml = updateQos.toXml();
   log.info(qosXml);
   return "OK";
}
/**
 * Runs the three tests of this class in sequence without a JUnit runner.
 * <p>
 * Every test is bracketed by {@code setUp()} and {@code tearDown()}. The
 * {@code tearDown()} call sits in a {@code finally} block so that it is also
 * executed when the test throws. An exception stops the remaining tests and
 * its stack trace is printed.
 *
 * @param args command line arguments (not used)
 */
public static void main(String[] args) {
   EventPluginTest test = new EventPluginTest("EventPluginTest");
   try {
      test.setUp();
      try {
         test.testRegex();
      }
      finally {
         // Always clean up, even when the test failed.
         test.tearDown();
      }

      test.setUp();
      try {
         test.testQueueEventsWithWildcards();
      }
      finally {
         test.tearDown();
      }

      test.setUp();
      try {
         test.testQueueEventsWithoutWildcards();
      }
      finally {
         test.tearDown();
      }
   }
   catch (Exception ex) {
      ex.printStackTrace();
   }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -