📄 teststreammessages.java
字号:
/*-----t-------------------------------------------------------------------------Name: TestStreamMessages.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.test.client;import java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.IOException;import java.io.InputStream;import java.util.Random;import java.util.logging.Logger;import javax.jms.DeliveryMode;import javax.jms.JMSException;import org.xmlBlaster.client.I_StreamingCallback;import org.xmlBlaster.client.I_XmlBlasterAccess;import org.xmlBlaster.client.XmlBlasterAccess;import org.xmlBlaster.client.key.PublishKey;import org.xmlBlaster.client.key.SubscribeKey;import org.xmlBlaster.client.key.UnSubscribeKey;import org.xmlBlaster.client.key.UpdateKey;import org.xmlBlaster.client.qos.ConnectQos;import org.xmlBlaster.client.qos.ConnectReturnQos;import org.xmlBlaster.client.qos.DisconnectQos;import org.xmlBlaster.client.qos.PublishQos;import org.xmlBlaster.client.qos.SubscribeQos;import org.xmlBlaster.client.qos.UnSubscribeQos;import org.xmlBlaster.client.qos.UpdateQos;import org.xmlBlaster.jms.XBConnectionMetaData;import org.xmlBlaster.jms.XBDestination;import org.xmlBlaster.jms.XBMessageProducer;import org.xmlBlaster.jms.XBSession;import org.xmlBlaster.jms.XBStreamingMessage;import org.xmlBlaster.test.Msg;import org.xmlBlaster.test.MsgInterceptor;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.qos.address.CallbackAddress;import junit.framework.TestCase;/** * <p> * This is an interesting example, since it creates a XmlBlaster server instance * in the same JVM , but in a separate thread, talking over CORBA with it. * <p> * Invoke examples:<br /> * <pre> * java junit.textui.TestRunner -noloading org.xmlBlaster.test.client.TestStreamMessages * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestStreamMessages * </pre> * @see org.xmlBlaster.client.I_XmlBlasterAccess */public class TestStreamMessages extends TestCase implements I_StreamingCallback { private static String ME = "TestStreamMessages"; private Global global; private static Logger log = Logger.getLogger(TestStreamMessages.class.getName()); private Global connGlobal; //private Global publisherGlobal; private String oid = "testStreamMessages"; private MsgInterceptor updateInterceptor; private byte[] msgContent; private long delay = 5000000L; private boolean ignoreException; public TestStreamMessages() { this(null); } public TestStreamMessages(Global global) { super("TestStreamMessages"); this.global = global; if (this.global == null) { this.global = new Global(); this.global.init((String[])null); } } /** * Sets up the fixture. * <p /> * Connect to xmlBlaster and login */ protected void setUp() { try { this.connGlobal = this.global.getClone(null); // this.publisherGlobal = this.global.getClone(null); // this.publisherGlobal.getXmlBlasterAccess().connect(new ConnectQos(this.publisherGlobal, "one/2", "secret"), null); this.updateInterceptor = new MsgInterceptor(this.connGlobal, log, null, this); boolean withQueue = true; // we need failsafe behaviour to enable holdback messages on client update exceptions ConnectQos connectQos = new ConnectQos(this.connGlobal, "streamingMsgTester/1", "secret"); connectQos.getAddress().setDelay(5000L); connectQos.getAddress().setPingInterval(5000L); connectQos.getAddress().setRetries(-1); CallbackAddress cbAddr = new CallbackAddress(this.global); cbAddr.setDelay(5000L); cbAddr.setPingInterval(5000L); cbAddr.setRetries(-1); connectQos.addCallbackAddress(cbAddr); XmlBlasterAccess access = (XmlBlasterAccess)this.connGlobal.getXmlBlasterAccess(); ConnectReturnQos retQos = access.connect(connectQos, this.updateInterceptor, withQueue); log.info("connect return qos: " + retQos.toXml()); SubscribeQos subQos = new SubscribeQos(this.connGlobal); subQos.setWantInitialUpdate(false); subQos.setMultiSubscribe(false); this.connGlobal.getXmlBlasterAccess().subscribe(new SubscribeKey(this.connGlobal, this.oid), subQos); } catch (XmlBlasterException ex) { ex.printStackTrace(); fail("aborting since exception ex: " + ex.getMessage()); } } /** * Tears down the fixture. * <p /> * cleaning up .... erase() the previous message OID and logout */ protected void tearDown() { log.info("Entering tearDown(), test is finished"); try { Thread.sleep(1000L); // since the cb could be too fast this.connGlobal.getXmlBlasterAccess().unSubscribe(new UnSubscribeKey(this.connGlobal, this.oid), new UnSubscribeQos(this.connGlobal)); this.connGlobal.getXmlBlasterAccess().disconnect(new DisconnectQos(this.connGlobal)); this.connGlobal.shutdown(); this.connGlobal = null; // this.publisherGlobal.getXmlBlasterAccess().disconnect(new DisconnectQos(this.publisherGlobal)); // this.publisherGlobal.shutdown(); // this.publisherGlobal = null; } catch (InterruptedException ex) { ex.printStackTrace(); } catch (XmlBlasterException ex) { ex.printStackTrace(); fail("aborting since exception ex: " + ex.getMessage()); } } private final String getMemInfo() { StringBuffer buf = new StringBuffer(256); final int MEGA = 1024 * 1024; buf.append("MEMORY: total='").append(Runtime.getRuntime().totalMemory()/MEGA).append("' "); buf.append("max='").append(Runtime.getRuntime().maxMemory()/MEGA).append("' "); buf.append("free='").append(Runtime.getRuntime().freeMemory()/MEGA).append("' MB"); return buf.toString(); } public String update(String cbSessionId, UpdateKey updateKey, InputStream is, UpdateQos updateQos) throws XmlBlasterException, IOException { ClientProperty prop = updateQos.getClientProperty(Constants.addJmsPrefix("interrupted", log)); boolean doInterrupt = false; if (prop != null) doInterrupt = prop.getBooleanValue(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); byte[] buf = new byte[300]; int count = 0; String name = updateQos.getClientProperty("nameOfTest", ""); boolean isException = "testException".equals(name); log.info("test '" + name + "' before reading: " + getMemInfo()); while(true) { int ret = is.read(buf); if (ret == -1 || doInterrupt) break; baos.write(buf, 0, ret); count += ret; if (isException && count > 600 && !ignoreException) { // it must pass the second time this.ignoreException = true; throw new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_ERROR, "fake exception to be hold back (dispatcher must go to false)", "fake"); } } log.info("test '" + name + "' before closing input stream: " + getMemInfo()); is.close(); log.info("test '" + name + "' after closing: " + getMemInfo()); this.msgContent = baos.toByteArray(); byte[] content = this.msgContent; log.info("Receiving update of a message oid=" + updateKey.getOid() + " priority=" + updateQos.getPriority() + " state=" + updateQos.getState() + " contentSize=" + content.length); this.updateInterceptor.setMsgContent(content); return "OK"; } private void doPublish(byte[] content, int maxChunkSize, boolean doInterrupt, String name) throws XmlBlasterException { log.info("Publishing for '" + name + "'"); // Global glob = this.global.getClone(null); Global glob = this.connGlobal; I_XmlBlasterAccess conn = glob.getXmlBlasterAccess(); PublishKey key = new PublishKey(glob, this.oid); PublishQos qos = new PublishQos(glob); qos.setPersistent(true); if (doInterrupt) qos.addClientProperty("interrupted", true); qos.addClientProperty("nameOfTest", name); qos.addClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_MAX_CHUNK_SIZE, log), maxChunkSize); ByteArrayInputStream bais = new ByteArrayInputStream(content); conn.publishStream(bais, key.getData(), qos.getData(), maxChunkSize, null); } private void doPublishJMS(byte[] content, int maxChunkSize, boolean doInterrupt, String name) throws JMSException { // Global glob = this.global.getClone(null); // XBSession session = new XBSession(this.publisherGlobal, XBSession.AUTO_ACKNOWLEDGE, false); log.info("Publishing for '" + name + "'"); XBSession session = new XBSession(this.connGlobal, XBSession.AUTO_ACKNOWLEDGE, false); XBMessageProducer producer = new XBMessageProducer(session, new XBDestination(this.oid, null)); producer.setDeliveryMode(DeliveryMode.PERSISTENT);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -