⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 teststreammessages.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*-----t-------------------------------------------------------------------------Name:      TestStreamMessages.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.test.client;import java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.IOException;import java.io.InputStream;import java.util.Random;import java.util.logging.Logger;import javax.jms.DeliveryMode;import javax.jms.JMSException;import org.xmlBlaster.client.I_StreamingCallback;import org.xmlBlaster.client.I_XmlBlasterAccess;import org.xmlBlaster.client.XmlBlasterAccess;import org.xmlBlaster.client.key.PublishKey;import org.xmlBlaster.client.key.SubscribeKey;import org.xmlBlaster.client.key.UnSubscribeKey;import org.xmlBlaster.client.key.UpdateKey;import org.xmlBlaster.client.qos.ConnectQos;import org.xmlBlaster.client.qos.ConnectReturnQos;import org.xmlBlaster.client.qos.DisconnectQos;import org.xmlBlaster.client.qos.PublishQos;import org.xmlBlaster.client.qos.SubscribeQos;import org.xmlBlaster.client.qos.UnSubscribeQos;import org.xmlBlaster.client.qos.UpdateQos;import org.xmlBlaster.jms.XBConnectionMetaData;import org.xmlBlaster.jms.XBDestination;import org.xmlBlaster.jms.XBMessageProducer;import org.xmlBlaster.jms.XBSession;import org.xmlBlaster.jms.XBStreamingMessage;import org.xmlBlaster.test.Msg;import org.xmlBlaster.test.MsgInterceptor;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.qos.address.CallbackAddress;import junit.framework.TestCase;/** * <p> * This is an interesting example, since it creates a XmlBlaster server instance * in the same JVM , but in a separate thread, talking over CORBA with it. * <p> * Invoke examples:<br /> * <pre> *   java junit.textui.TestRunner -noloading org.xmlBlaster.test.client.TestStreamMessages *   java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestStreamMessages * </pre> * @see org.xmlBlaster.client.I_XmlBlasterAccess */public class TestStreamMessages extends TestCase implements I_StreamingCallback {   private static String ME = "TestStreamMessages";   private Global global;   private static Logger log = Logger.getLogger(TestStreamMessages.class.getName());   private Global connGlobal;   //private Global publisherGlobal;   private String oid = "testStreamMessages";   private MsgInterceptor updateInterceptor;   private byte[] msgContent;   private long delay = 5000000L;   private boolean ignoreException;      public TestStreamMessages() {      this(null);   }   public TestStreamMessages(Global global) {      super("TestStreamMessages");      this.global = global;      if (this.global == null) {         this.global = new Global();         this.global.init((String[])null);      }   }   /**    * Sets up the fixture.    * <p />    * Connect to xmlBlaster and login    */   protected void setUp() {      try {         this.connGlobal = this.global.getClone(null);         // this.publisherGlobal = this.global.getClone(null);         // this.publisherGlobal.getXmlBlasterAccess().connect(new ConnectQos(this.publisherGlobal, "one/2", "secret"), null);                  this.updateInterceptor = new MsgInterceptor(this.connGlobal, log, null, this);         boolean withQueue = true;         // we need failsafe behaviour to enable holdback messages on client update exceptions         ConnectQos connectQos = new ConnectQos(this.connGlobal, "streamingMsgTester/1", "secret");         connectQos.getAddress().setDelay(5000L);         connectQos.getAddress().setPingInterval(5000L);         connectQos.getAddress().setRetries(-1);         CallbackAddress cbAddr = new CallbackAddress(this.global);         cbAddr.setDelay(5000L);         cbAddr.setPingInterval(5000L);         cbAddr.setRetries(-1);         connectQos.addCallbackAddress(cbAddr);         XmlBlasterAccess access = (XmlBlasterAccess)this.connGlobal.getXmlBlasterAccess();         ConnectReturnQos retQos = access.connect(connectQos, this.updateInterceptor, withQueue);         log.info("connect return qos: " + retQos.toXml());                  SubscribeQos subQos = new SubscribeQos(this.connGlobal);         subQos.setWantInitialUpdate(false);         subQos.setMultiSubscribe(false);         this.connGlobal.getXmlBlasterAccess().subscribe(new SubscribeKey(this.connGlobal, this.oid), subQos);      }      catch (XmlBlasterException ex) {         ex.printStackTrace();         fail("aborting since exception ex: " + ex.getMessage());      }   }         /**    * Tears down the fixture.    * <p />    * cleaning up .... erase() the previous message OID and logout    */   protected void tearDown() {      log.info("Entering tearDown(), test is finished");      try {         Thread.sleep(1000L); // since the cb could be too fast         this.connGlobal.getXmlBlasterAccess().unSubscribe(new UnSubscribeKey(this.connGlobal, this.oid), new UnSubscribeQos(this.connGlobal));         this.connGlobal.getXmlBlasterAccess().disconnect(new DisconnectQos(this.connGlobal));         this.connGlobal.shutdown();         this.connGlobal = null;         // this.publisherGlobal.getXmlBlasterAccess().disconnect(new DisconnectQos(this.publisherGlobal));         // this.publisherGlobal.shutdown();         // this.publisherGlobal = null;      }      catch (InterruptedException ex) {         ex.printStackTrace();      }      catch (XmlBlasterException ex) {         ex.printStackTrace();         fail("aborting since exception ex: " + ex.getMessage());      }   }   private final String getMemInfo() {      StringBuffer buf = new StringBuffer(256);      final int MEGA = 1024 * 1024;      buf.append("MEMORY: total='").append(Runtime.getRuntime().totalMemory()/MEGA).append("' ");      buf.append("max='").append(Runtime.getRuntime().maxMemory()/MEGA).append("' ");      buf.append("free='").append(Runtime.getRuntime().freeMemory()/MEGA).append("' MB");      return buf.toString();   }      public String update(String cbSessionId, UpdateKey updateKey, InputStream is, UpdateQos updateQos) throws XmlBlasterException, IOException {            ClientProperty prop = updateQos.getClientProperty(Constants.addJmsPrefix("interrupted", log));      boolean doInterrupt = false;      if (prop != null)         doInterrupt = prop.getBooleanValue();      ByteArrayOutputStream baos = new ByteArrayOutputStream();      byte[] buf = new byte[300];      int count = 0;      String name = updateQos.getClientProperty("nameOfTest", "");      boolean isException = "testException".equals(name);      log.info("test '" + name + "' before reading: " + getMemInfo());      while(true) {         int ret = is.read(buf);         if (ret == -1 || doInterrupt)            break;         baos.write(buf, 0, ret);         count += ret;         if (isException && count > 600 && !ignoreException) { // it must pass the second time            this.ignoreException = true;            throw new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_ERROR, "fake exception to be hold back (dispatcher must go to false)", "fake");         }      }      log.info("test '" + name + "' before closing input stream: " + getMemInfo());      is.close();      log.info("test '" + name + "' after closing: " + getMemInfo());      this.msgContent = baos.toByteArray();      byte[] content = this.msgContent;      log.info("Receiving update of a message oid=" + updateKey.getOid() +                        " priority=" + updateQos.getPriority() +                        " state=" + updateQos.getState() +                        " contentSize=" + content.length);      this.updateInterceptor.setMsgContent(content);      return "OK";   }   private void doPublish(byte[] content, int maxChunkSize, boolean doInterrupt, String name) throws XmlBlasterException {      log.info("Publishing for '" + name + "'");      // Global glob = this.global.getClone(null);      Global glob = this.connGlobal;      I_XmlBlasterAccess conn = glob.getXmlBlasterAccess();      PublishKey key = new PublishKey(glob, this.oid);      PublishQos qos = new PublishQos(glob);      qos.setPersistent(true);      if (doInterrupt)         qos.addClientProperty("interrupted", true);      qos.addClientProperty("nameOfTest", name);      qos.addClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_MAX_CHUNK_SIZE, log), maxChunkSize);      ByteArrayInputStream bais = new ByteArrayInputStream(content);      conn.publishStream(bais, key.getData(), qos.getData(), maxChunkSize, null);   }      private void doPublishJMS(byte[] content, int maxChunkSize, boolean doInterrupt, String name) throws JMSException {      // Global glob = this.global.getClone(null);      // XBSession session = new XBSession(this.publisherGlobal, XBSession.AUTO_ACKNOWLEDGE, false);      log.info("Publishing for '" + name + "'");      XBSession session = new XBSession(this.connGlobal, XBSession.AUTO_ACKNOWLEDGE, false);      XBMessageProducer producer = new XBMessageProducer(session, new XBDestination(this.oid, null));      producer.setDeliveryMode(DeliveryMode.PERSISTENT);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -