⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xmlblasterpublisher.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/*------------------------------------------------------------------------------Name:      XmlBlasterPublisher.javaProject:   org.xmlBlasterProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.dbwatcher.mom;import java.io.ByteArrayInputStream;import java.io.InputStream;import java.util.ArrayList;import java.util.HashSet;import java.util.List;import java.util.Map;import java.util.Set;import java.util.StringTokenizer;import java.util.TreeMap;import java.util.Iterator;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.client.qos.ConnectQos;import org.xmlBlaster.client.I_ConnectionStateListener;import org.xmlBlaster.client.I_XmlBlasterAccess;import org.xmlBlaster.client.I_Callback;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.client.key.UpdateKey;import org.xmlBlaster.client.qos.UpdateQos;import org.xmlBlaster.client.key.PublishKey;import org.xmlBlaster.client.qos.PublishReturnQos;import org.xmlBlaster.client.key.SubscribeKey;import org.xmlBlaster.client.qos.PublishQos;import org.xmlBlaster.client.qos.SubscribeQos;import org.xmlBlaster.client.qos.SubscribeReturnQos;import org.xmlBlaster.client.key.UnSubscribeKey;import org.xmlBlaster.client.qos.UnSubscribeQos;import org.xmlBlaster.client.key.EraseKey;import org.xmlBlaster.client.qos.EraseQos;import org.xmlBlaster.contrib.ClientPropertiesInfo;import org.xmlBlaster.contrib.ContribConstants;import org.xmlBlaster.contrib.I_ChangePublisher;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.I_Update;import org.xmlBlaster.contrib.MomEventEngine;import org.xmlBlaster.contrib.dbwatcher.DbWatcher;import org.xmlBlaster.contrib.dbwatcher.DbWatcherConstants;import 
org.xmlBlaster.contrib.dbwatcher.detector.I_AlertProducer;import org.xmlBlaster.contrib.dbwatcher.detector.I_ChangeDetector;import org.xmlBlaster.jms.XBSession;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.dispatch.ConnectionStateEnum;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.qos.MsgQosData;import org.xmlBlaster.util.qos.address.CallbackAddress;import org.xmlBlaster.util.qos.address.Destination;import org.xmlBlaster.util.MsgUnit;/** * Implementation to send change events to xmlBlaster. * <p> * This plugin plays two roles, first it is the gateway to xmlBlaster and second * if can be configured to listen on a alert topic and use incoming messages as * alerts to check the database again. * </p> * <p>  * Supported configuration: * </p> * <ul> *   <li><tt>mom.topicName</tt> for example <i>db.change.event.${colGroupValue}</i> *       where ${} variables will be replaced by the current *       <tt>colGroupValue</tt>. 
*   </li> *   <li><tt>mom.loginName</tt> the login name *   </li> *   <li><tt>mom.password</tt> the password *   </li> *   <li><tt>mom.publishKey</tt> for example <tt>&lt;key oid='db.change.event.${colGroupValue}'/></tt></li> *   <li><tt>mom.publishQos</tt> for example <tt>&lt;qos/></tt></li> *   <li><tt>mom.alertSubscribeKey</tt> for example <tt>&lt;key oid='db.change.alert'/></tt><br /> *        To use XmlBlasterPublisher as an alert notifier register with <tt>alertProducer.class=org.xmlBlaster.contrib.dbwatcher.mom.XmlBlasterPublisher</tt></li> *   <li><tt>mom.alertSubscribeQos</tt> for example <tt>&lt;qos/></tt></li> *   <li><tt>mom.connectQos</tt> if given it is stronger than the <tt>mom.loginName</tt>  *           and <tt>mom.password</tt> settings</li> * </ul> * * @author Marcel Ruff */public class XmlBlasterPublisher implements       I_ChangePublisher,       I_AlertProducer,       I_Callback,       I_ConnectionStateListener,       DbWatcherConstants,      XmlBlasterPublisherMBean {      private static Logger log = Logger.getLogger(XmlBlasterPublisher.class.getName());   protected I_ChangeDetector changeDetector;   protected Global glob;   protected I_XmlBlasterAccess con;   protected String topicNameTemplate;   protected String loginName;   protected String password;   protected String publishKey;   protected String publishQos;   protected String alertSubscribeKey;   protected String alertSubscribeQos;   protected String alertSubscriptionId;   protected ConnectQos connectQos;   protected boolean eraseOnDrop;   protected boolean eraseOnDelete;   private int initCount = 0;    private I_Update defaultUpdate;   private String adminKey = "<key oid='mom.publisher.adminMsg'/>";   private int compressSize;   private boolean throwAwayMessages;   private long lastPublishTime;      /**     * Can be null, taken out of the info object if the owner of this object has set the    * parameter _connectionStateListener.    
*/   private I_ConnectionStateListener connectionStateListener;      /**    * Default constructor.     * You need to call  {@link #init(I_Info)} thereafter.    */   public XmlBlasterPublisher() {      // void   }   /**    * If called we shall subcribe to xmlBlaster for alert messages    * which notifies us that there may be new changes available, we call    * {@link I_ChangeDetector#checkAgain} in such a case.    * @see org.xmlBlaster.contrib.dbwatcher.detector.I_AlertProducer#init(I_Info,I_ChangeDetector)    */   public void init(I_Info info, I_ChangeDetector changeDetector) throws Exception {      this.changeDetector = changeDetector;   }   /**    * Subscribes on the alert topic as configured with <tt>mom.alertSubscribeKey</tt>.      * @see org.xmlBlaster.contrib.dbwatcher.detector.I_AlertProducer#startProducing    */   public void startProducing() throws Exception {      registerAlertListener(new I_Update() {         public void update(String topic, java.io.InputStream is, Map attrMap) {            try {                if (log.isLoggable(Level.FINE)) log.fine("Alert notification arrived '" + topic + "' with " + ((attrMap==null)?0:attrMap.size()) + " attributes");                changeDetector.checkAgain(attrMap);            }            catch (Exception e) {                log.warning("Ignoring alert notification message '" + topic + "': " + e.toString());            }         }      }, null);   }   /**    * Unsubscribes from the alert topic.     
* Nothing happens when no alert subscription id is currently stored.
    * @see org.xmlBlaster.contrib.dbwatcher.detector.I_AlertProducer#stopProducing
    */
   public void stopProducing() throws Exception {
      if (this.alertSubscriptionId == null) {
         return;
      }
      UnSubscribeKey unSubscribeKey = new UnSubscribeKey(glob, this.alertSubscriptionId);
      UnSubscribeQos unSubscribeQos = new UnSubscribeQos(glob);
      this.con.unSubscribe(unSubscribeKey, unSubscribeQos);
      // only forget the id after unSubscribe() returned without throwing
      this.alertSubscriptionId = null;
   }

   /**
    * Returns a newly created set holding the property keys this plugin declares as used.
    * @see org.xmlBlaster.contrib.I_ContribPlugin#getUsedPropertyKeys()
    */
   public Set getUsedPropertyKeys() {
      Object[] usedKeys = {
         MOM_TOPIC_NAME,
         MOM_LOGIN_NAME,
         MOM_PASSWORD,
         MOM_ERASE_ON_DROP,
         MOM_ERASE_ON_DELETE,
         MOM_PUBLISH_KEY,
         MOM_PUBLISH_QOS,
         MOM_ALERT_SUBSCRIBE_KEY,
         MOM_ALERT_SUBSCRIBE_QOS,
         MOM_CONNECT_QOS,
         MOM_PROPS_TO_ADD_TO_CONNECT,
         MOM_MAX_SESSIONS
      };
      Set keys = new HashSet();
      for (int i = 0; i < usedKeys.length; i++) {
         keys.add(usedKeys[i]);
      }
      return keys;
   }

   /**
    * Configures this publisher directly from an already existing Global
    * instead of an I_Info: stores the passed global, takes the
    * I_XmlBlasterAccess from it and remembers the publish key, the publish
    * qos and the compression size.
    * @param global the Global to use, its getXmlBlasterAccess() delivers the connection
    * @param pubKey stored as publish key
    * @param pubQos stored as publish qos
    * @param compressionSize stored as compress size
    */
   public synchronized void initWithExistingGlob(Global global, String pubKey, String pubQos, int compressionSize) {
      this.glob = global;
      this.con = this.glob.getXmlBlasterAccess();
      this.publishKey = pubKey;
      this.publishQos = pubQos;
      this.compressSize = compressionSize;
   }

   /**
    * If a global is passed with <tt>info.getObject("org.xmlBlaster.engine.Global")</tt>
    * we take a clone and reuse it.
* @see org.xmlBlaster.contrib.dbwatcher.mom.I_ChangePublisher#init(I_Info)    */   public synchronized void init(I_Info info) throws Exception {      // here because if somebody makes it as a second object it still works      if (this.connectionStateListener == null) {         log.info("The connection status listener will be added");         this.connectionStateListener = (I_ConnectionStateListener)info.getObject("_connectionStateListener");      }      else         log.warning("The connection status listener for this info has already been defined, ignoring this new request");      if (this.initCount > 0) {         this.initCount++;         return;      }            Global globOrig = (Global)info.getObject("org.xmlBlaster.engine.Global");      if (globOrig == null) {         this.glob = new Global();      }      else {         if (globOrig instanceof org.xmlBlaster.engine.ServerScope) {            this.glob = globOrig.getClone(globOrig.getNativeConnectArgs());            this.glob.addObjectEntry(Constants.OBJECT_ENTRY_ServerScope, globOrig.getObjectEntry(Constants.OBJECT_ENTRY_ServerScope)); //"ServerNodeScope"         }         else {            this.glob = globOrig;         }      }      this.topicNameTemplate = info.get(MOM_TOPIC_NAME, "db.change.event.${groupColValue}");      this.loginName = info.get(MOM_LOGIN_NAME, "dbWatcher/1");      this.password  = info.get(MOM_PASSWORD, "secret");      this.eraseOnDrop = info.getBoolean(MOM_ERASE_ON_DROP, false);      this.eraseOnDelete = info.getBoolean(MOM_ERASE_ON_DELETE, false);      this.publishKey = info.get(MOM_PUBLISH_KEY, (String)null);      if (this.publishKey != null && this.topicNameTemplate != null) {         log.warning("constructor: since 'mom.publishKey' is defined, 'mom.topicName' will be ignored");      }      if (this.publishKey == null && this.topicNameTemplate == null) {         //throw new XmlBlasterException(this.global, ErrorCode.USER_CONFIGURATION, ME, "at least one of the properties 
'mom.topicName' or 'mom.publishKey' must be defined");         throw new IllegalArgumentException("At least one of the properties 'mom.topicName' or 'mom.publishKey' must be defined");      }      if (this.publishKey == null) {         this.publishKey = (new PublishKey(this.glob, this.topicNameTemplate)).toXml();       }            this.publishQos = info.get(MOM_PUBLISH_QOS, "<qos/>");      this.alertSubscribeKey = info.get(MOM_ALERT_SUBSCRIBE_KEY, (String)null);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -