⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dbwatcher.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*------------------------------------------------------------------------------Name:      TestResultSetToXmlConverter.javaProject:   org.xmlBlasterProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.dbwatcher;import java.sql.ResultSet;import java.io.ByteArrayOutputStream;import java.io.BufferedOutputStream;import java.util.logging.Logger;import java.util.logging.Level;import java.util.Map;import java.util.Properties;import java.util.StringTokenizer;import java.sql.Connection;import org.xmlBlaster.contrib.I_ChangePublisher;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.InfoHelper;import org.xmlBlaster.contrib.PropertiesInfo;import org.xmlBlaster.contrib.db.I_DbPool;import org.xmlBlaster.contrib.db.I_ResultCb;import org.xmlBlaster.contrib.dbwatcher.convert.I_DataConverter;import org.xmlBlaster.contrib.dbwatcher.detector.I_AlertProducer;import org.xmlBlaster.contrib.dbwatcher.detector.I_ChangeDetector;/** * This is the core processor class to handle database observation.  * <p> * We handle detected changes of the observed target, typically a database table, * and forward them to the {@link I_ChangePublisher} implementation, * typically a message oriented middleware (MoM). * The changes are XML formatted as implemented by the * {@link I_DataConverter} plugin. * <p /> * <p> * This class loads all plugins by reflection and starts them. Each plugin * has its specific configuration paramaters. Descriptions thereof you find * in the plugins documentation. 
* </p>
 * <p>
 * To get going quickly, have a look at <tt>Example.java</tt>
 * </p>
 * Configuration:
 * <ul>
 *   <li><tt>dbPool.class</tt> configures your implementation of interface {@link I_DbPool} which defaults to <tt>org.xmlBlaster.contrib.db.DbPool</tt></li>
 *   <li><tt>db.queryMeatStatement</tt> if given a SQL select string this
 *       is executed on changes and the query result is dumped according
 *       to the configured I_DataConverter plugin and sent as message content
 *       to the MoM</li>
 *    <li><tt>converter.class</tt> configures your implementation of interface {@link I_DataConverter} which defaults to <tt>org.xmlBlaster.contrib.dbwatcher.convert.ResultSetToXmlConverter</tt></li>
 *    <li><tt>mom.class</tt> configures your implementation of interface {@link I_ChangePublisher} which defaults to <tt>org.xmlBlaster.contrib.dbwatcher.mom.XmlBlasterPublisher</tt></li>
 *    <li><tt>changeDetector.class</tt> configures your implementation of interface {@link I_ChangeDetector} which defaults to <tt>org.xmlBlaster.contrib.dbwatcher.detector.MD5ChangeDetector</tt></li>
 *    <li><tt>alertProducer.class</tt> configures your implementation of interface {@link I_AlertProducer} which defaults to <tt>org.xmlBlaster.contrib.dbwatcher.detector.AlertScheduler</tt>.
 *            Here you can configure multiple classes with a comma separated list.
 *            Each of them can trigger a new check in parallel, for example
 *            <tt>alertProducer.class=org.xmlBlaster.contrib.dbwatcher.mom.XmlBlasterPublisher,org.xmlBlaster.contrib.dbwatcher.detector.AlertScheduler</tt> will check regularly via
 *            the AlertScheduler and on message via XmlBlasterPublisher</li>
* </ul> *  * @see org.xmlBlaster.contrib.dbwatcher.test.TestResultSetToXmlConverter * @author Marcel Ruff */public class DbWatcher implements I_ChangeListener {   private static Logger log = Logger.getLogger(DbWatcher.class.getName());   private String queryMeatStatement;   private I_Info info;   private I_DataConverter dataConverter;   private I_ChangePublisher publisher;   private I_ChangeDetector changeDetector;   private I_DbPool dbPool;   private I_AlertProducer[] alertProducerArr;   private int changeCount;   private int collectedMessages = 1;   private int maxCollectedMessages;      /**    * Default constructor, you need to call {@link #init} thereafter.     */   public DbWatcher() {      // void   }   /**    * Convenience constructor, creates a processor for changes, calls {@link #init}.      * @param info Configuration    * @throws Exception Can be of any type    */   public DbWatcher(I_Info info) throws Exception {      init(info);   }   public static I_ChangePublisher getChangePublisher(I_Info info) throws Exception {      synchronized (info) {         I_ChangePublisher publisher = (I_ChangePublisher)info.getObject("mom.publisher");         if (publisher == null) {            ClassLoader cl = DbWatcher.class.getClassLoader();            String momClass = info.get("mom.class", "org.xmlBlaster.contrib.dbwatcher.mom.XmlBlasterPublisher").trim();            if (momClass.length() > 0) {               publisher = (I_ChangePublisher)cl.loadClass(momClass).newInstance();               info.putObject("mom.publisher", publisher);               if (log.isLoggable(Level.FINE))                   log.fine(momClass + " created and initialized");            }            else               log.severe("Couldn't initialize I_ChangePublisher, please configure 'mom.class'.");         }         publisher.init(info);         return publisher;      }   }      public static I_DbPool getDbPool(I_Info info) throws Exception {      synchronized (info) {         I_DbPool dbPool = 
(I_DbPool)info.getObject("db.pool");         if (dbPool == null) {            ClassLoader cl = DbWatcher.class.getClassLoader();            String dbPoolClass = info.get("dbPool.class", "org.xmlBlaster.contrib.db.DbPool");            if (dbPoolClass.length() > 0) {                dbPool = (I_DbPool)cl.loadClass(dbPoolClass).newInstance();                if (log.isLoggable(Level.FINE))                    log.fine(dbPoolClass + " created and initialized");            }            else               throw new IllegalArgumentException("Couldn't initialize I_DbPool, please configure 'dbPool.class' to provide a valid JDBC access.");            info.putObject("db.pool", dbPool);         }         dbPool.init(info);         return dbPool;      }   }      /**    * We scan for dbinfo.* properties and replace it with db.* in a second info object to use for the configuration     * of the DbPool. Defaults to getDbPool if no 'dbinfo.url' is specified in the configuration.     * @param info    * @return    * @throws Exception    */   public static I_DbPool getDbInfoPool(I_Info info) throws Exception {      I_Info infoForDbInfo = (I_Info)info.getObject("infoForDbInfo");       if (infoForDbInfo == null) {         Map dbInfoMap = InfoHelper.getPropertiesStartingWith("dbinfo.", info, null, "db.");         if (dbInfoMap.containsKey("db.url")) {            infoForDbInfo = new PropertiesInfo(new Properties());            InfoHelper.fillInfoWithEntriesFromMap(infoForDbInfo, dbInfoMap);            info.putObject("infoForDbInfo", infoForDbInfo);         }      }      if (infoForDbInfo != null)         return getDbPool(infoForDbInfo);      return getDbPool(info);   }      /**    * Creates a processor for changes.     
* The alert producers need to be started later with a call to    * {@link #startAlertProducers}     * @param info Configuration    * @throws Exception Can be of any type    */   public void init(I_Info info) throws Exception {      if (info == null) throw new IllegalArgumentException("Missing configuration, info is null");      this.info = info;      //this.dataConverter = converter;      //this.publisher = publisher;      ClassLoader cl = this.getClass().getClassLoader();      this.queryMeatStatement = info.get("db.queryMeatStatement", (String)null);      if (this.queryMeatStatement != null && this.queryMeatStatement.length() < 1)         this.queryMeatStatement = null;      if (this.queryMeatStatement != null)         this.dbPool = getDbPool(this.info);      // Now we load all plugins to do the job      this.maxCollectedMessages = this.info.getInt("dbWatcher.maxCollectedStatements", 0);            String converterClass = this.info.get("converter.class", "org.xmlBlaster.contrib.dbwatcher.convert.ResultSetToXmlConverter").trim();      String changeDetectorClass = this.info.get("changeDetector.class", "org.xmlBlaster.contrib.dbwatcher.detector.MD5ChangeDetector").trim();      String alerSchedulerClasses = this.info.get("alertProducer.class", "org.xmlBlaster.contrib.dbwatcher.detector.AlertScheduler").trim(); // comma separated list         if (converterClass.length() > 0) {          this.dataConverter = (I_DataConverter)cl.loadClass(converterClass).newInstance();          this.dataConverter.init(info);          if (log.isLoggable(Level.FINE)) log.fine(converterClass + " created and initialized");      }      else         log.info("Couldn't initialize I_DataConverter, please configure 'converter.class' if you need a conversion.");      // this must be invoked after the converter class has been initialized since the converter class can set properties on the info object      // for example to register applications such as replication (for example the purpose)      
this.publisher = getChangePublisher(this.info);      if (changeDetectorClass.length() > 0) {         this.changeDetector = (I_ChangeDetector)cl.loadClass(changeDetectorClass).newInstance();         this.changeDetector.init(info, this, this.dataConverter);         if (log.isLoggable(Level.FINE)) log.fine(changeDetectorClass + " created and initialized");      }      else         log.severe("Couldn't initialize I_ChangeDetector, please configure 'changeDetector.class'.");      StringTokenizer st = new StringTokenizer(alerSchedulerClasses, ":, ");      int countPlugins = st.countTokens();      this.alertProducerArr = new I_AlertProducer[countPlugins];      for (int i = 0; i < countPlugins; i++) {         String clazz = st.nextToken().trim();         try {            Object o = info.getObject(clazz);            if (o != null) {               this.alertProducerArr[i] = (I_AlertProducer)o;               if (log.isLoggable(Level.FINE)) log.fine("Existing AlertProducer '" + this.alertProducerArr[i] + "' reused.");            }            else {               this.alertProducerArr[i] = (I_AlertProducer)cl.loadClass(clazz).newInstance();               if (log.isLoggable(Level.FINE)) log.fine("AlertProducer '" + this.alertProducerArr[i] + "' created.");            }            this.alertProducerArr[i].init(info, this.changeDetector);            //this.alertProducerArr[i].startProducing();         }         catch (Throwable e) {            this.alertProducerArr[i] = null;            log.severe("Couldn't initialize I_AlertProducer '" + clazz + "', the reason is: " + e.toString());         }      }      if (countPlugins == 0) {         log.warning("No AlertProducers are registered, set 'alertProducer.class' to point to your plugin class name");      }            if (log.isLoggable(Level.FINE)) log.fine("DbWatcher created");   }      /**    * Start the work.     
*/   public void startAlertProducers() {      for (int i=0; i<this.alertProducerArr.length; i++) {         try { this.alertProducerArr[i].startProducing(); } catch(Throwable e) { log.warning(e.toString()); }      }      log.info("DbWatcher is running");   }   /**    * Suspend processing.     */   public void stopAlertProducers() {      for (int i=0; i<this.alertProducerArr.length; i++) {         try { this.alertProducerArr[i].shutdown(); } catch(Throwable e) { log.warning(e.toString()); }      }   }   /**    * Access the MoM handele.     * @return The I_ChangePublisher plugin    */   public I_ChangePublisher getMom() {      return this.publisher;   }      /**    * Access the change detector handele.     * @return The I_ChangeDetector plugin    */   public I_ChangeDetector getChangeDetector() {      return this.changeDetector;   }       /**    * @see org.xmlBlaster.contrib.dbwatcher.I_ChangeListener#hasChanged(ChangeEvent)    */   public void hasChanged(final ChangeEvent changeEvent) {      hasChanged(changeEvent, false);   }      private final void clearMessageAttributesAfterPublish(Map attributeMap) {      // we need to remove this since if several transactions are detected in the same       // detector sweep the same Event is used, so we would not send subsequent messages      attributeMap.remove(I_DataConverter.IGNORE_MESSAGE);      attributeMap.remove(DbWatcherConstants._UNCOMPRESSED_SIZE);      attributeMap.remove(DbWatcherConstants._COMPRESSION_TYPE);   }       /**    * Publishes a message if it has changed and if it has not to be ignored.    * @param changeEvent The event data

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -