⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 massivesubtest.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
             assertTrue("numSubscribers not divadable by breakpoint", false);            }                        manyConnections = new I_XmlBlasterAccess[numSubscribers/maxSubPerCon];         } // end of if ()                           long usedBefore = getUsedServerMemory();                  log.info("Setting up " + numSubscribers + " subscriber clients ...");         int startNoThreads = ThreadLister.countThreads();         //ThreadLister.listAllThreads(System.out);         stopWatch = new StopWatch();         for (int ii=0; ii<numSubscribers; ii++) {            Client sub = new Client();            sub.loginName = "Joe-" + ii;            sub.oneConnection = useOneConnection;            if (useOneConnection) {               // Should we distribute among a few connections               if (maxSubPerCon >0) {                  if (  ii % maxSubPerCon == 0) {                     ci++;                     try {                        log.fine("Creating connection no: " +ci);                        Global gg = globalUtil.getClone(glob);                        // Try to reuse the same ORB to avoid too many threads:                        if ("IOR".equals(gg.getProperty().get("protocol","IOR")) && ci > 0) {                           gg.addObjectEntry(Constants.RELATING_CLIENT+":org.xmlBlaster.util.protocol.corba.OrbInstanceWrapper",                                             (org.xmlBlaster.util.protocol.corba.OrbInstanceWrapper)manyConnections[ci-1].getGlobal().getObjectEntry(Constants.RELATING_CLIENT+":org.xmlBlaster.util.protocol.corba.OrbInstanceWrapper"));                        }                        manyConnections[ci] = gg.getXmlBlasterAccess();                        ConnectQos connectQos = new ConnectQos(gg, sub.loginName, passwd); // "<qos></qos>"; During login this is manipulated (callback address added)                        // If we have many subs on one con, we must raise the max size of the callback queue!                        CbQueueProperty cbProp =connectQos.getSessionCbQueueProperty();                        // algo is maxSubPerCon*4                        cbProp.setMaxEntries(maxSubPerCon*1000);//This means we have a backlog of 1000 messages per subscriber as i normal when each con only have one subscriber!                        //cbProp.setMaxBytes(4000);                        //cbProp.setOnOverflow(Constants.ONOVERFLOW_BLOCK);                        //connectQos.setSubjectQueueProperty(cbProp);                        log.fine("Login qos: " +  connectQos.toXml());                        ConnectReturnQos connectReturnQos = manyConnections[ci].connect(connectQos, this);                        log.info("Connected maxSubPerCon=" + maxSubPerCon + " : " + connectReturnQos.toXml());                     }                     catch (Exception e) {                        log.severe("Login failed: " + e.toString());                        assertTrue("Login failed: " + e.toString(), false);                     }                                       } // end of if ()                  sub.connection = manyConnections[ci];               } else {                  sub.connection = oneConnection;               }            }else {               try {                  Global gg = globalUtil.getClone(glob);                  sub.connection = gg.getXmlBlasterAccess();                  ConnectQos connectQos = new ConnectQos(gg, sub.loginName, passwd); // "<qos></qos>"; During login this is manipulated (callback address added)                  ConnectReturnQos connectReturnQos = sub.connection.connect(connectQos, this);                  log.info("Connected: " + connectReturnQos.toXml());               }               catch (Exception e) {                  log.severe("Login failed: " + e.toString());                  assertTrue("Login failed: " + e.toString(), false);               }                                                                    }            try {            sub.subscribeOid = sub.connection.subscribe(subKey, subQos).getSubscriptionId();            log.fine("Client " + sub.loginName + " subscribed to " + subKeyW.getOid());            } catch(XmlBlasterException e) {               log.warning("XmlBlasterException: " + e.getMessage());               assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);            }                        manyClients[ii] = sub;         }         double timeForLogins = (double)stopWatch.elapsed()/1000.; // msec -> sec                  long usedAfter = getUsedServerMemory();         long memPerLogin = (usedAfter - usedBefore)/numSubscribers;         int noThreads = ThreadLister.countThreads();         int tDiff = noThreads - startNoThreads;         int tPerConn = ((ci == 0|| ci == -1) ? tDiff :tDiff/(ci+1));         int subPerT = tDiff != 0 ? numSubscribers/tDiff:0;                  log.info(numSubscribers + " subscriber clients are ready.");         log.info("Server memory per login consumed=" + memPerLogin);         log.info("Time " + (long)(numSubscribers/timeForLogins) + " logins/sec");         log.info("Threads created " + tDiff + ", threads per connection " + tPerConn + ", sub  per thread " + subPerT);         //ThreadLister.listAllThreads(System.out);         //try { Thread.sleep(5000000L); } catch( InterruptedException i) {}               } catch (Error e) {         e.printStackTrace();         log.severe("Could not set up subscribers: " +e);         log.severe("No of threads " + ThreadLister.countThreads() + " for connection no " + ci);         throw e;      } // end of try-catch         }      /**    * Query xmlBlaster for its current memory consumption.     */   long getUsedServerMemory() {      String xmlKey = "<key oid='__cmd:?usedMem' queryType='EXACT'></key>";      String qos = "<qos></qos>";      try {         MsgUnit[] msgArr = oneConnection.get(xmlKey, qos);         String mem = new String(msgArr[0].getContent());         return new Long(mem).longValue();      } catch (XmlBlasterException e) {         log.warning(e.toString());         return 0L;      }   }   /**    * TEST: Publish numToPub messages..    * <p />    * The returned publishOid1 is checked    */   public void publish()   {      if (log.isLoggable(Level.FINE)) log.fine("Publishing a message ...");            numReceived = 0;      String xmlKey = "<?xml version='1.0' encoding='ISO-8859-1' ?>\n" +                      "<key oid='" + publishOid1 + "' contentMime='" + contentMime + "' contentMimeExtended='" + contentMimeExtended + "'>\n" +         "</key>";      String senderContent = "Yeahh, i'm the new content";      try {         stopWatch = new StopWatch();         for (int i = 0; i < noToPub;i++) {            senderContent = senderContent+"-"+i;            MsgUnit msgUnit = new MsgUnit(xmlKey, senderContent.getBytes(), "<qos></qos>");            String tmp = oneConnection.publish(msgUnit).getKeyOid();            assertEquals("Wrong publishOid1", publishOid1, tmp);            log.info("Success: Publishing done for " + i +", returned oid=" + publishOid1);         }      } catch(XmlBlasterException e) {         log.warning("XmlBlasterException: " + e.getMessage());         assertTrue("publishOne - XmlBlasterException: " + e.getMessage(), false);      }   }   /**    * This is the callback method invoked from xmlBlaster    * delivering us a new asynchronous message.     * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)    */   public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos)   {      //log.info("Client " + loginName + " receiving update of message oid=" + updateKey.getOid() + "...");      numReceived++;      if (numReceived == numToRec) {         long avg = 0;         double elapsed = stopWatch.elapsed();         if (elapsed > 0.)            avg = (long)(1000.0 * numReceived / elapsed);         log.info(numReceived + " messages updated, average messages/second = " + avg + stopWatch.nice());      }      return "";   }      /**    * TEST: Construct a message and publish it,    * all clients should receive an update.     */   public void testManyClients()   {      System.out.println("");      log.info("TEST 1, many clients, useOneConnection="+useOneConnection);            subcribeMany();      try { Thread.sleep(1000L); } catch( InterruptedException i) {}                                            // Wait some time for callback to arrive ...      assertEquals("numReceived after subscribe", 0, numReceived);  // there should be no Callback            publish();      long delay = 2000L + 10 * numToRec;      log.info("Waiting long enough for updates ..."+delay);      Util.delay(delay);                          // Wait some time for callback to arrive ...      // !!!! this.updateInterceptor.      if ( numReceived != numToRec ){         // Warn and wait some more         log.warning("Have not yet received more than " +numReceived+"/"+numToRec+" waiting some more");         int midRec=numReceived;         long avg = 0;         double elapsed = stopWatch.elapsed();         if (elapsed > 0.)            avg = (long)(1000.0 * numReceived / elapsed);         log.info(numReceived + " messages updated, average firts round messages/second = " + avg + stopWatch.nice(false));//Don't reset         Util.delay(2L*delay);          //Lastt delay         if ( numReceived != numToRec ){         // Warn and wait some more         log.warning("Have NOT yet received more than " +numReceived+"/"+numToRec+" waiting last round");         avg = 0;         elapsed = stopWatch.elapsed()-elapsed;         if (elapsed > 0.)            avg = (long)(1000.0 *( numReceived -midRec)/ elapsed);         log.info(numReceived-midRec + " messages updated this round, average second round messages/second = " + avg + stopWatch.nice(false));//Don't reset         Util.delay(4L*delay);       }               }      log.info("Got messages:" +numReceived+"/"+numToRec);      assertEquals("Wrong number of updates", numToRec, numReceived);         }      /**    * Method is used by TestRunner to load these tests.    *    * <p>Warning! The default uses the embedded server, to give each round a equal chance. But it is MUCH slower than using a server in another VM.</p>    */   public static Test suite()   {      TestSuite suite= new TestSuite();      String loginName = "Tim";      Global glob = Global.instance();      // Test IOR many on one      setProtoMax(glob,"IOR","0");      suite.addTest(new MassiveSubTest(glob, "testManyClients", loginName,true));      // Test IOR many on few      setProtoMax(glob,"IOR","500");      suite.addTest(new MassiveSubTest(glob, "testManyClients", loginName,true));      // Test RMI many on one      setProtoMax(glob,"RMI","0");      suite.addTest(new MassiveSubTest(glob, "testManyClients", loginName,true));      // Test RMI many on few      setProtoMax(glob,"RMI","500");      suite.addTest(new MassiveSubTest(glob, "testManyClients", loginName,true));      // Test IOR many on many      setProtoMax(glob,"IOR","0");      suite.addTest(new MassiveSubTest(glob, "testManyClients", loginName,false));      // Test RMI many on many      setProtoMax(glob,"RMI","0");      suite.addTest(new MassiveSubTest(glob, "testManyClients", loginName,false));            return suite;   }   private static void setProtoMax(Global glob, String proto, String max) {      try {         glob.getProperty().set("client.protocol",proto);         glob.getProperty().set("maxSubPerCon",max);      }catch(XmlBlasterException ex) {         assertTrue("Could not setup test: " + ex, false);      }   }   public static void main(String[] args) {      Global glob = new Global(args);      setProtoMax(glob, "IOR", "500");      MassiveSubTest m = new MassiveSubTest(glob, "testManyClients", "testManyClients", false);      m.setUp();      m.testManyClients();      m.tearDown();   }   } // MassiveSubTest

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -