📄 msginterceptor.java
字号:
/*------------------------------------------------------------------------------Name: MsgInterceptor.javaProject: org.xmlBlasterProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.test;import java.util.logging.Logger;import org.xmlBlaster.client.I_Callback;import org.xmlBlaster.client.I_StreamingCallback;import org.xmlBlaster.client.qos.UpdateQos;import org.xmlBlaster.client.qos.UpdateReturnQos;import org.xmlBlaster.client.qos.PublishReturnQos;import org.xmlBlaster.client.key.UpdateKey;import org.xmlBlaster.contrib.I_Update;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.def.ErrorCode;import java.io.IOException;import java.io.InputStream;import java.lang.InterruptedException;import java.util.Map;import java.util.Vector;import junit.framework.Assert;import java.lang.ref.WeakReference;/** * Intercepts incoming message in update() and collects them in a Vector for nice handling. */public class MsgInterceptor extends Assert implements I_Callback, I_StreamingCallback, I_Update { private final WeakReference weakglob; private final WeakReference weaklog; private I_Callback testsuite; //private Msgs msgs = null; private int verbosity = 2; private boolean countErased; private I_StreamingCallback streamTestsuite; private I_Update contribTestsuite; private byte[] msgContent; /** * @param testsuite If != null your update() variant will be called as well */ public MsgInterceptor(Global glob, Logger log, I_Callback testsuite, I_StreamingCallback streamTestsuite) { this(glob, log, testsuite); this.streamTestsuite = streamTestsuite; } /** * @param testsuite If != null your update() variant will be called as well */ public MsgInterceptor(Global glob, Logger log, I_Callback testsuite, I_Update contribTestsuite) { this(glob, log, testsuite); this.contribTestsuite = contribTestsuite; } /** * @param testsuite If != null your update() variant will be called as well */ public MsgInterceptor(Global glob, Logger log, I_Callback testsuite) { this.weakglob = new WeakReference(glob); this.weaklog = new WeakReference(log); this.testsuite = testsuite; //this.msgs = new Msgs(); } public final Global getGlobal() { return (Global)this.weakglob.get(); } public final Logger getLog() { return (Logger)this.weaklog.get(); } public void setLogPrefix(String prefix) { } /** * 0: no logging * 1: simple logging * 2: dump messages on arrival */ public void setVerbosity(int val) { this.verbosity = val; } /* * Contains all update() messages in a Vector, but not erase events. public Msgs getMsgs() { return this.msgs; } */ /** * @param countErased Set to true to count the erased notifications as well */ public void countErased(boolean countErased) { this.countErased = countErased; } /** * This is the callback method (I_Callback) invoked from xmlBlaster * It directly calls the update method from the testsuite (delegation) */ public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException { String contentStr = new String(content); if (this.verbosity == 1) { String cont = (contentStr.length() > 10) ? (contentStr.substring(0,10)+"...") : contentStr; getLog().info("Receiving update of a message oid=" + updateKey.getOid() + " priority=" + updateQos.getPriority() + " state=" + updateQos.getState() + " content=" + cont); } else if (this.verbosity == 2) { getLog().info("Receiving update #" + (count()+1) + " of a message cbSessionId=" + cbSessionId + updateKey.toXml() + "\n" + new String(content) + updateQos.toXml()); } if (this.countErased || !updateQos.isErased()) { add(new Msg(cbSessionId, updateKey, content, updateQos)); } if (testsuite != null) return testsuite.update(cbSessionId, updateKey, content, updateQos); else { UpdateReturnQos qos = new UpdateReturnQos(getGlobal()); return qos.toXml(); } } /** * This is the callback method (I_StreamingCallback) invoked from xmlBlaster * It directly calls the update method from the testsuite (delegation) */ public String update(String cbSessionId, UpdateKey updateKey, InputStream is, UpdateQos updateQos) throws XmlBlasterException { String ret = null; if (this.streamTestsuite != null) { try { ret = this.streamTestsuite.update(cbSessionId, updateKey, is, updateQos); } catch (IOException ex) { throw new XmlBlasterException(Global.instance(), ErrorCode.INTERNAL_ILLEGALARGUMENT, "update", "update", ex); } } else { fail("The testsuite instance has not been defined"); } if (this.countErased || !updateQos.isErased()) { add(new Msg(cbSessionId, updateKey, msgContent, updateQos)); } return ret; } /** * This is the callback method (I_StreamingCallback) invoked from xmlBlaster * It directly calls the update method from the testsuite (delegation) */ public void update(String topic, InputStream is, Map attrMap) throws Exception { if (this.contribTestsuite != null) { try { this.contribTestsuite.update(topic, is, attrMap); } catch (IOException ex) { throw new XmlBlasterException(Global.instance(), ErrorCode.INTERNAL_ILLEGALARGUMENT, "update", "update", ex); } } add(new Msg(null, null, new byte[0], null)); } /** * @see #waitOnUpdate(long, String, String, int) */ public int waitOnUpdate(final long timeout, int countExpected) { return waitOnUpdate(timeout, null, null, countExpected); } /** * Waits until the given number of messages arrived, * the messages must match the given oid and state. * It is not checked if more messages would arrive as we return after * countExpected are here. * <p> * ERASE notifies are not returned * </p> * <p> * This method does not assert() it return the number of messages arrived * which you can use to assert yourself. * </p> * @param timeout in milliseconds * @param oid The expected message oid, if null the oid is not checked (all oids are OK) * @param state The expected state, if null the state is not checked (all states are OK) * * @return Number of messages arrived */ public int waitOnUpdate(final long timeout, String oid, String state, int countExpected) { long pollingInterval = 50L; // check every 0.05 seconds if (timeout < 50) pollingInterval = timeout / 10L; long sum = 0L; int countArrived = 0; while (true) { countArrived = getMsgs(oid, state).length; if (countArrived >= countExpected) return countArrived; // OK, no timeout try { Thread.sleep(pollingInterval); } catch( InterruptedException i) {} sum += pollingInterval; if (sum > timeout) { getLog().severe("timeout=" + timeout + " occurred for " + oid + " state=" + state + " countExpected=" + countExpected + " countArrived=" + countArrived); return countArrived; // Timeout occurred } } } /** * Sleeps until timeout and returns the arrived messages. * <p> * ERASE notifies are not returned * </p> * @see #waitOnUpdate(long, String, String) */ public int waitOnUpdate(final long timeout) { return waitOnUpdate(timeout, null, null); } /** * Sleeps until timeout and returns the number of arrived messages filtered by oid and state. * <p> * ERASE notifies are not returned * </p> * @param timeout in milliseconds * @param oid The expected message oid, if null the oid is not checked (all oids are OK) * @param state The expected state, if null the state is not checked (all states are OK) * * @return Number of messages arrived */ public int waitOnUpdate(final long timeout, String oid, String state) { try { Thread.sleep(timeout); } catch( InterruptedException i) {} return getMsgs(oid, state).length; } // Holding all messages private Vector updateVec = new Vector(); public void add(Msg msg) { this.updateVec.addElement(msg); } public void remove(Msg msg) { this.updateVec.removeElement(msg); } /** * Clears all arrived messages AND the countErased flag to false */ public void clear() { this.updateVec.clear(); this.countErased = false; } /** * Access the updated message filtered by the given oid and state. * @param oid if null the oid is not checked * @param state if null the state is not checked */ public Msg[] getMsgs(String oid, String state) { Vector ret = new Vector(); for (int i=0; i<this.updateVec.size(); i++) { Msg msg = (Msg)this.updateVec.elementAt(i); //System.out.println("MsgInterceptor: Checking msg oid='" + msg.getOid() + "' with state='" + msg.getState() + "' against '" + oid + "' '" + state + "'"); if ( (oid == null || oid.equals(msg.getOid())) && (state == null || state.equals(msg.getState())) ) { ret.addElement(msg); //System.out.println("MsgInterceptor: FOUND: Checking msg oid='" + msg.getOid() + "' with state='" + msg.getState() + "' against '" + oid + "' '" + state + "'"); } } return (Msg[])ret.toArray(new Msg[ret.size()]); } public Msg[] getMsgs() { return getMsgs(null, null); } /** * Access the updated message filtered by the given oid and state. * @return null or the message * @exception If more than one message is available */ public Msg getMsg(String oid, String state) throws XmlBlasterException { Msg[] msgs = getMsgs(oid, state); //System.out.println("MsgInterceptor: FOUND " + msgs.length + " entries for msg oid='" + oid + "' with state='" + state); if (msgs.length > 1) throw new XmlBlasterException("Msgs", "update(oid=" + oid + ", state=" + state + ") " + msgs.length + " arrived instead of zero or one"); if (msgs.length == 0) return null; return msgs[0]; } public int count() { return this.updateVec.size(); } /** * Compares all messages given by parameter 'expectedArr' and compare * them with the received ones. On failure a junit - assert() is thrown. * <p> * The correct sequence and the message data is checked. * </p> * @param expectedArr The published messages which we expect here as updates * @param secretCbSessionId If not null it is checked as well */ public void compareToReceived(MsgUnit[] expectedArr, String secretCbSessionId) { assertEquals("We have received " + count() + " messages only", expectedArr.length, count()); for(int i=0; i<expectedArr.length; i++) { MsgUnit expected = expectedArr[i]; Msg msg = (Msg)this.updateVec.elementAt(i); if (secretCbSessionId != null) { assertEquals("The secretCbSessionId is wrong", secretCbSessionId, msg.getCbSessionId()); } msg.compareMsg(expected); } } /** * Compares all messages given by parameter 'expectedArr' and compare * them with the received ones. On failure a junit - assert() is thrown. * <p> * Especially the sequence and the rcvTimestamp is checked. * </p> * @param expectedArr The published messages which we expect here as updates */ public void compareToReceived(PublishReturnQos[] expectedArr) { assertEquals("We have received " + count() + " messages only", expectedArr.length, count()); for(int i=0; i<expectedArr.length; i++) { Msg msg = (Msg)this.updateVec.elementAt(i); msg.compareMsg(expectedArr[i]); } } public void setMsgContent(byte[] msgContent) { this.msgContent = msgContent; } } // MsgInterceptor
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -