⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xmlblasterpublisher.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
      if (this.alertSubscribeKey != null && this.alertSubscribeKey.length() < 1)          this.alertSubscribeKey = null;      this.alertSubscribeQos = info.get(MOM_ALERT_SUBSCRIBE_QOS, "<qos/>");      String hardConnectQos  = info.get(MOM_CONNECT_QOS, (String)null);      if (hardConnectQos != null) {         this.connectQos = new ConnectQos(this.glob, this.glob.getConnectQosFactory().readObject(hardConnectQos));      }      else {         this.connectQos = new ConnectQos(this.glob, this.loginName, this.password);         int maxSessions = info.getInt(MOM_MAX_SESSIONS, 100);         this.connectQos.setMaxSessions(maxSessions);         this.connectQos.getAddress().setRetries(-1);         this.connectQos.setSessionTimeout(0L);         CallbackAddress cb = this.connectQos.getData().getCurrentCallbackAddress();         cb.setRetries(-1);         /*         if (isRunningNative) {            Address address = this.connectQos.getAddress();            address.setType("LOCAL");            address.setPingInterval(0L);            address.setCollectTime(0L);            this.connectQos.getClientQueueProperty().setType("RAM");            this.connectQos.getClientQueueProperty().setVersion("1.0");            CallbackAddress cb = this.connectQos.getData().getCurrentCallbackAddress();            cb.setPingInterval(0L);            cb.setCollectTime(0L);            cb.setType("LOCAL");            this.connectQos.getData().getSessionCbQueueProperty().setType("RAM");            this.connectQos.getData().getSessionCbQueueProperty().setVersion("1.0");            this.connectQos.getData().getSubjectQueueProperty().setType("RAM");            this.connectQos.getData().getSubjectQueueProperty().setVersion("1.0");         }         */         this.compressSize = info.getInt(MOM_COMPRESS_SIZE, 0);      }      String propKeysToAdd = info.get(MOM_PROPS_TO_ADD_TO_CONNECT, "").trim();      if (propKeysToAdd.length() > 0) {         if ("*".equals(propKeysToAdd)) { // then all properties are added to the connectQos            if (hardConnectQos != null)               log.warning("The property '" + MOM_PROPS_TO_ADD_TO_CONNECT + "' was is set to '*' and and '" + MOM_CONNECT_QOS + "' was set too (some of the properties could be overwritten");            // fill the client properties of the connectQos with the info object            new ClientPropertiesInfo(this.connectQos.getData().getClientProperties(), info);         }         else {            StringTokenizer tokenizer = new StringTokenizer(propKeysToAdd, ",");            while (tokenizer.hasMoreTokens()) {               String key = tokenizer.nextToken().trim();               String val = info.get(key, null);               if (val == null)                  log.warning("The property '" + key + "' shall be added to the connectQos but was not found among the properties");               else {                  if (this.connectQos.getClientProperty(key) != null)                     log.warning("The property '" + key + "' is already set in the client properties of the connect qos. Will be overwritten with the value '" + val + "'");                  this.connectQos.addClientProperty(key, val);               }                              }         }      }            this.con = this.glob.getXmlBlasterAccess();      this.con.registerConnectionListener(this);      this.con.connect(this.connectQos, this);      this.initCount++;      // Make myself available      info.putObject("org.xmlBlaster.contrib.dbwatcher.mom.XmlBlasterPublisher", this);      info.putObject("org.xmlBlaster.contrib.dbwatcher.mom.I_ChangePublisher", this);      // Add JMX Registration       String jmxName = I_Info.JMX_PREFIX + "xmlBlasterPublisher";      info.putObject(jmxName, this);      log.info("Added object '" + jmxName + "' to I_Info to be added as an MBean");               }      /**    * @see org.xmlBlaster.contrib.dbwatcher.mom.I_ChangePublisher#shutdown    */   public synchronized void shutdown() {      this.initCount--;      if (this.initCount > 0)         return;      log.fine("Closing xmlBlaster connection");      if (this.con != null) {         this.con.disconnect(null);         this.con = null;         this.glob = null;      }   }   private void addStringPropToQos(Map attrMap, MsgQosData qos) {      synchronized (attrMap) {         String[] keys = (String[])attrMap.keySet().toArray(new String[attrMap.size()]);         for (int i=0; i < keys.length; i++) {            Object val = attrMap.get(keys[i]);            if (val != null && val instanceof String)               qos.addClientProperty(keys[i], val);         }      }   }      /**    * The send message is configured with <tt>mom.publishKey</tt> and <tt>mom.publishQos</tt>.     * A DROP command erases the topic.      * @see org.xmlBlaster.contrib.dbwatcher.mom.I_ChangePublisher#publish(String, String, Map)    */   public String publish(String changeKey, byte[] out, Map attrMap) throws Exception {      // this is only for testing purposes      if (this.throwAwayMessages) {         log.fine("The message '" + changeKey + "' has been thrown away (not published)");         return (new Timestamp()).toString() + "thrownAway";      }      long t0 = System.currentTimeMillis();      if (out == null) out = "".getBytes();      out = MomEventEngine.compress(out, attrMap, this.compressSize, null);      String pk = (changeKey.indexOf("${") == -1) ? DbWatcher.replaceVariable(this.publishKey, changeKey) : this.publishKey;      String command = null;      if (attrMap != null)          command = (String)attrMap.get("_command");      else         command = "";            String destLiteral = null;      Destination destination = null;      List additionalDestinations = null;      if (attrMap != null)         destLiteral = (String)attrMap.get("_destination");            if (destLiteral != null) {         if (destLiteral.indexOf(',') < 0) {            destination = new Destination(new SessionName(this.glob, destLiteral));            destination.forceQueuing(true); // to ensure it works even if this comes before manager         }         else {            StringTokenizer tokenizer = new StringTokenizer(destLiteral, ","); // comma separated list            destination = new Destination(new SessionName(this.glob, tokenizer.nextToken().trim()));            destination.forceQueuing(true);            additionalDestinations = new ArrayList();            while (tokenizer.hasMoreTokens()) {               Destination tmp = new Destination(new SessionName(this.glob, tokenizer.nextToken().trim()));               tmp.forceQueuing(true);            }         }      }            // this is used to register the owner of this object (typically the DbWatcher)      if ("INITIAL_DATA_RESPONSE".equals(command) || "STATEMENT".equals(command)) {         PublishQos qos = null;         if (destination != null) {            qos = new PublishQos(this.glob, destination);            if (additionalDestinations != null) {               for (int i=0; i < additionalDestinations.size(); i++)                  qos.addDestination((Destination)additionalDestinations.get(i));            }         }         else            qos = new PublishQos(this.glob);         qos.setSubscribable(true);         // to force to fill the client properties map !!         ClientPropertiesInfo tmpInfo = new ClientPropertiesInfo(attrMap);         new ClientPropertiesInfo(qos.getData().getClientProperties(), tmpInfo);         addStringPropToQos(attrMap, qos.getData());         PublishKey key = null;         if (changeKey != null && changeKey.length() > 0)            key = new PublishKey(this.glob, changeKey);         else             key = new PublishKey(this.glob, "dbWatcherUnspecified");         key.setContentMime("text/xml");         MsgUnit msg = new MsgUnit(key, out, qos);         PublishReturnQos prq = this.con.publish(msg);         String id = (prq.getRcvTimestamp()!=null)?prq.getRcvTimestamp().toString():"queued";         if (log.isLoggable(Level.FINE)) log.fine("Published '" + prq.getKeyOid() + "' '" + id + "'");         return id;      }      if (this.eraseOnDrop && "DROP".equals(command)) {         String oid = this.glob.getMsgKeyFactory().readObject(pk).getOid();         EraseKey ek = new EraseKey(glob, oid);         EraseQos eq = new EraseQos(glob);         con.erase(ek, eq);         log.info("Topic '" + pk + "' is erased:" + out);         return "0";      }      if (this.eraseOnDelete && "DELETE".equals(command)) {         String oid = this.glob.getMsgKeyFactory().readObject(pk).getOid();         EraseKey ek = new EraseKey(glob, oid);         EraseQos eq = new EraseQos(glob);         con.erase(ek, eq);         log.info("Topic '" + pk + "' is erased:" + out);         return "0";      }      if (log.isLoggable(Level.FINER))          log.finer("Topic '" + pk + "' is published: " + out);      try {         String oid = (String)attrMap.remove(ContribConstants.TOPIC_NAME); // consume it since only used to inform this method         if (destination != null) {            pk = this.adminKey;         }         if (oid != null)            pk = "<key oid='" + oid + "'/>";         MsgUnit msgUnit = new MsgUnit(pk, out, this.publishQos);         String tmp = msgUnit.getKeyData().getContentMime();         // FIXME pass this in the map and set only if explicitly set in the map         if (tmp == null || tmp.equals("text/plain")) {            msgUnit.getKeyData().setContentMime("text/xml");         }          if (destination != null)            ((MsgQosData)msgUnit.getQosData()).addDestination(destination);         // to force to fill the client properties map !!         ClientPropertiesInfo tmpInfo = new ClientPropertiesInfo(attrMap);         new ClientPropertiesInfo(msgUnit.getQosData().getClientProperties(), tmpInfo);         addStringPropToQos(attrMap, (MsgQosData)msgUnit.getQosData());         PublishReturnQos prq = this.con.publish(msgUnit);         String id = (prq.getRcvTimestamp()!=null)?prq.getRcvTimestamp().toString():"queued";         if (log.isLoggable(Level.FINE))             log.fine("Published '" + prq.getKeyOid() + "' '" + id + "'");         this.lastPublishTime = System.currentTimeMillis() - t0;         return id;      }      catch (XmlBlasterException e) {         log.severe("Can't publish to xmlBlaster: " + e.getMessage());         throw e;      }   }      /**    * Subscribes on the alert topic as configured with <tt>mom.alertSubscribeKey</tt>.    * @see org.xmlBlaster.contrib.dbwatcher.mom.I_ChangePublisher#registerAlertListener(I_Update)    * @param attrs it currently accepts a null (old behaviour) or if it is not null, then    * the attribute ptp must be set (does not matter to what).    *     * @throws Exception Typically a XmlBlasterException    */   public boolean registerAlertListener(final I_Update momCb, Map attrs) throws Exception {      if (momCb == null) throw new IllegalArgumentException("I_Update==null");      try {         if (attrs == null) { // 'old' behaviour            if (this.alertSubscribeKey == null)               return false;            log.info("Registering on '" + this.alertSubscribeKey + "' for alerts");            SubscribeReturnQos subRet = this.con.subscribe(this.alertSubscribeKey, this.alertSubscribeQos, new I_Callback() {               public String update(String s, UpdateKey k, byte[] c, UpdateQos q) throws XmlBlasterException {                  log.fine("Receiving alert message '" + k.getOid() + "'");                  Map attrMap = clientPropertiesToMap(q.getClientProperties());                  try {                     momCb.update(k.getOid(), new ByteArrayInputStream(c), attrMap);                  }                  catch (Exception e) {                     log.severe("Can't subscribe from xmlBlaster: " + e.getMessage());                     throw new XmlBlasterException(glob, ErrorCode.USER_UPDATE_ERROR, "alertListener", "", e);                  }                  return "";               }             });             this.alertSubscriptionId = subRet.getSubscriptionId();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -