⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 md5changedetector.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*------------------------------------------------------------------------------Name:      MD5ChangeDetector.javaProject:   org.xmlBlasterProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.dbwatcher.detector;import java.util.logging.Logger;import java.util.logging.Level;import java.io.ByteArrayOutputStream;import java.io.BufferedOutputStream;import java.util.Set;import java.util.HashSet;import java.util.Map;import java.util.HashMap;import java.util.Iterator;import java.sql.Connection;import java.sql.ResultSet;import java.sql.ResultSetMetaData;import java.security.MessageDigest;import java.security.NoSuchAlgorithmException;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.db.I_DbPool;import org.xmlBlaster.contrib.db.I_ResultCb;import org.xmlBlaster.contrib.dbwatcher.ChangeEvent;import org.xmlBlaster.contrib.dbwatcher.DbWatcher;import org.xmlBlaster.contrib.dbwatcher.I_ChangeListener;import org.xmlBlaster.contrib.dbwatcher.convert.I_DataConverter;/** * Check the database and compare the MD5 of the result set * to the previous one.  * <p>Configuration:</p> * <ul> *  <li><tt>changeDetector.detectStatement</tt> for example *      <tt>"SELECT col1, col2, ICAO_ID FROM TEST_POLL ORDER BY ICAO_ID</tt> *  </li> *  <li><tt>changeDetector.groupColName</tt> in the above example *      <tt>ICAO_ID</tt>, the SELECT must be sorted after this column and must *       list it. All distinct <tt>ICAO_ID</tt> values trigger an own publish event. *       If not configured, the whole query is MD5 compared and triggers on change exactly one publish event *  </li> * </ul> * <p> * If the table does not exist in the DB no event is triggered, if an empty * table comes to existence an empty event with untouched topic name * is triggered: * </p> * <pre> * topic='db.change.event.${ICAO_ID}' *  * &lt;?xml version='1.0' encoding='UTF-8' ?> * &lt;sql> * &lt;/sql> * </pre> * * <p> * Note that the previous MD5 values are hold in RAM only, after * plugin restart they are lost and a complete set of data is send again. * </p> *    <table border="1">     <tr>       <th>DB statement</th>       <th>Reported change</th>       <th>Comment</th>     </tr>     <tr>       <td>CREATE</td>       <td>CREATE</td>       <td>-</td>     </tr>     <tr>       <td>INSERT</td>       <td>INSERT</td>       <td>On multiple table entries for a <tt>changeDetector.groupColName</tt> the change is reported as <tt>UPDATE</tt></td>     </tr>     <tr>       <td>UPDATE</td>       <td>UPDATE</td>       <td>-</td>     </tr>     <tr>       <td>DELETE</td>       <td>DELETE</td>       <td>-</td>     </tr>     <tr>       <td>DROP</td>       <td>DROP</td>       <td>see <tt>mom.eraseOnDrop</tt> configuration</td>     </tr>   </table> * @author Marcel Ruff */public class MD5ChangeDetector implements I_ChangeDetector{   private static Logger log = Logger.getLogger(MD5ChangeDetector.class.getName());   protected I_Info info;   protected I_ChangeListener changeListener;   protected I_DataConverter dataConverter;   protected I_DbPool dbPool;   protected boolean poolOwner;   protected boolean tableExists=true;   protected MessageDigest digest;   protected final Map md5Map = new HashMap();   protected final Set touchSet = new HashSet();   protected String changeDetectStatement;   protected String groupColName;   protected boolean useGroupCol;   protected int changeCount;   protected String queryMeatStatement;   //protected Connection conn;   /**    * Default constructor, you need to call {@link #init} thereafter.     */   public MD5ChangeDetector() {      // void   }      /**    * Convenience constructor which calls {@link #init}.    * @param info The configuration environment    * @param changeListener The listener to notify if something has changed    * @param dataConverter A converter or null    * @throws Exception    */   public MD5ChangeDetector(I_Info info, I_ChangeListener changeListener, I_DataConverter dataConverter) throws Exception {      init(info, changeListener, dataConverter);   }      /**    * Needs to be called after construction.     * @param info The configuration    * @param changeListener The listener to notify if something has changed    * @param dataConverter If not null the data will be transformed immediately during change detection    * @throws Exception    */   public synchronized void init(I_Info info, I_ChangeListener changeListener, I_DataConverter dataConverter) throws Exception {      this.changeListener = changeListener;      this.info = info;      this.dataConverter = dataConverter;            this.queryMeatStatement = info.get("db.queryMeatStatement", (String)null);      if (this.queryMeatStatement != null && this.queryMeatStatement.length() < 1)         this.queryMeatStatement = null;      if (this.queryMeatStatement != null)         this.queryMeatStatement = this.queryMeatStatement.trim();      // Check if we need to make a data conversion      if (this.dataConverter != null && this.queryMeatStatement != null) {         this.dataConverter = null;         log.info("Ignoring given dataConverter as 'db.queryMeatStatement' is configured");      }      try {         this.digest = MessageDigest.getInstance("MD5");      }      catch(NoSuchAlgorithmException e) {         log.severe("'MD5' is not supported: " + e.toString());         throw e; //new Exception(e);      }      this.changeDetectStatement = this.info.get("changeDetector.detectStatement", "");      if (this.changeDetectStatement == null) {         throw new IllegalArgumentException("Please pass a change detection SQL statement, for example 'changeDetector.detectStatement=SELECT col1, col2 FROM TEST_POLL ORDER BY ICAO_ID'");      }      this.changeDetectStatement = this.changeDetectStatement.trim();      ClassLoader cl = this.getClass().getClassLoader();      this.dbPool = (I_DbPool)this.info.getObject("db.pool");      if (this.dbPool == null) {         String dbPoolClass = this.info.get("dbPool.class", "org.xmlBlaster.contrib.db.DbPool").trim();         if (dbPoolClass.length() > 0) {            this.dbPool = (I_DbPool)cl.loadClass(dbPoolClass).newInstance();            this.dbPool.init(info);            if (log.isLoggable(Level.FINE)) log.fine(dbPoolClass + " created and initialized");         }         else            throw new IllegalArgumentException("Couldn't initialize I_DbPool, please configure 'dbPool.class' to provide a valid JDBC access.");         this.poolOwner = true;         this.info.putObject("db.pool", this.dbPool);      }            // if null: check the complete table      // if != null: check for each groupColName change separately      this.groupColName = this.info.get("changeDetector.groupColName", (String)null);      if (this.groupColName != null && this.groupColName.length() < 1)         this.groupColName = null;      this.useGroupCol = this.groupColName != null;      if (this.groupColName == null)         this.groupColName = this.info.get("mom.topicName", "db.change.event");   }   /**    * Check the observed data for changes.      * <p />    * The method is synchronized so you can call it from the AlertScheduler    * or manually from outside simultaneously.     * @see org.xmlBlaster.contrib.dbwatcher.detector.I_ChangeDetector#checkAgain(Map)    */   public synchronized int checkAgain(Map attrMap) throws Exception {      if (log.isLoggable(Level.FINE)) log.fine("Checking for MD5 changes ...");      this.changeCount = 0;      Connection conn = null;      try {         conn = this.dbPool.select(conn, this.changeDetectStatement, new I_ResultCb() {            public void result(Connection conn, ResultSet rs) throws Exception {               if (log.isLoggable(Level.FINE)) log.fine("Processing result set");                           if (rs == null) {                  changeCount = 0;                  if (!tableExists || md5Map.size() == 0) {                     if (log.isLoggable(Level.FINE)) log.fine("Table/view '" + changeDetectStatement + "' does not exist, no changes to report");                  }                  else {                     if (log.isLoggable(Level.FINE)) log.fine("Table/view '" + changeDetectStatement + "' has been deleted");                     Iterator it = md5Map.keySet().iterator();                     while (it.hasNext()) {                        String key = (String)it.next();                        String resultXml = "";                        if (queryMeatStatement != null) { // delegate processing of message meat ...                           ChangeEvent changeEvent = new ChangeEvent(groupColName, key, null, "DROP", null);                           changeCount += changeListener.publishMessagesFromStmt(queryMeatStatement, useGroupCol, changeEvent, conn);                        }                        else {                           if (dataConverter != null) {                              ByteArrayOutputStream bout = new ByteArrayOutputStream();                              BufferedOutputStream out = new BufferedOutputStream(bout);                              dataConverter.setOutputStream(out, "DROP", key, null);                              dataConverter.done();                              resultXml = bout.toString();                           }                           changeListener.hasChanged(                                 new ChangeEvent(groupColName, key, resultXml, "DROP", null));                           changeCount++;                        }                     }                     md5Map.clear();                  }                  tableExists = false;                  return;               }               if (useGroupCol)                  changeCount = checkWithGroupCol(conn, rs);               else                  changeCount = checkComplete(conn, rs);               if (log.isLoggable(Level.FINE)) log.fine("Processing result set done");            }         });      }      catch (Exception e) {         e.printStackTrace();         log.severe("Panic: Change detection failed for '" +                    this.changeDetectStatement + "': " + e.toString());         if (conn != null) {            this.dbPool.erase(conn);            conn = null;         }      }      finally {         conn = DbWatcher.releaseWithCommit(conn, this.dbPool);      }      return changeCount;   }   /**    * The select statement contains no grouping.      * <tt>CREATE</tt> and <tt>DROP</tt> are reliable, all other changes are marked as <tt>UPDATE</tt>.    * @param rs The JDBC query result    * @return The number of changes detected 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -