📄 md5changedetector.java
字号:
/*------------------------------------------------------------------------------Name: MD5ChangeDetector.javaProject: org.xmlBlasterProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.dbwatcher.detector;import java.util.logging.Logger;import java.util.logging.Level;import java.io.ByteArrayOutputStream;import java.io.BufferedOutputStream;import java.util.Set;import java.util.HashSet;import java.util.Map;import java.util.HashMap;import java.util.Iterator;import java.sql.Connection;import java.sql.ResultSet;import java.sql.ResultSetMetaData;import java.security.MessageDigest;import java.security.NoSuchAlgorithmException;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.db.I_DbPool;import org.xmlBlaster.contrib.db.I_ResultCb;import org.xmlBlaster.contrib.dbwatcher.ChangeEvent;import org.xmlBlaster.contrib.dbwatcher.DbWatcher;import org.xmlBlaster.contrib.dbwatcher.I_ChangeListener;import org.xmlBlaster.contrib.dbwatcher.convert.I_DataConverter;/** * Check the database and compare the MD5 of the result set * to the previous one. * <p>Configuration:</p> * <ul> * <li><tt>changeDetector.detectStatement</tt> for example * <tt>"SELECT col1, col2, ICAO_ID FROM TEST_POLL ORDER BY ICAO_ID</tt> * </li> * <li><tt>changeDetector.groupColName</tt> in the above example * <tt>ICAO_ID</tt>, the SELECT must be sorted after this column and must * list it. All distinct <tt>ICAO_ID</tt> values trigger an own publish event. * If not configured, the whole query is MD5 compared and triggers on change exactly one publish event * </li> * </ul> * <p> * If the table does not exist in the DB no event is triggered, if an empty * table comes to existence an empty event with untouched topic name * is triggered: * </p> * <pre> * topic='db.change.event.${ICAO_ID}' * * <?xml version='1.0' encoding='UTF-8' ?> * <sql> * </sql> * </pre> * * <p> * Note that the previous MD5 values are hold in RAM only, after * plugin restart they are lost and a complete set of data is send again. * </p> * <table border="1"> <tr> <th>DB statement</th> <th>Reported change</th> <th>Comment</th> </tr> <tr> <td>CREATE</td> <td>CREATE</td> <td>-</td> </tr> <tr> <td>INSERT</td> <td>INSERT</td> <td>On multiple table entries for a <tt>changeDetector.groupColName</tt> the change is reported as <tt>UPDATE</tt></td> </tr> <tr> <td>UPDATE</td> <td>UPDATE</td> <td>-</td> </tr> <tr> <td>DELETE</td> <td>DELETE</td> <td>-</td> </tr> <tr> <td>DROP</td> <td>DROP</td> <td>see <tt>mom.eraseOnDrop</tt> configuration</td> </tr> </table> * @author Marcel Ruff */public class MD5ChangeDetector implements I_ChangeDetector{ private static Logger log = Logger.getLogger(MD5ChangeDetector.class.getName()); protected I_Info info; protected I_ChangeListener changeListener; protected I_DataConverter dataConverter; protected I_DbPool dbPool; protected boolean poolOwner; protected boolean tableExists=true; protected MessageDigest digest; protected final Map md5Map = new HashMap(); protected final Set touchSet = new HashSet(); protected String changeDetectStatement; protected String groupColName; protected boolean useGroupCol; protected int changeCount; protected String queryMeatStatement; //protected Connection conn; /** * Default constructor, you need to call {@link #init} thereafter. */ public MD5ChangeDetector() { // void } /** * Convenience constructor which calls {@link #init}. * @param info The configuration environment * @param changeListener The listener to notify if something has changed * @param dataConverter A converter or null * @throws Exception */ public MD5ChangeDetector(I_Info info, I_ChangeListener changeListener, I_DataConverter dataConverter) throws Exception { init(info, changeListener, dataConverter); } /** * Needs to be called after construction. * @param info The configuration * @param changeListener The listener to notify if something has changed * @param dataConverter If not null the data will be transformed immediately during change detection * @throws Exception */ public synchronized void init(I_Info info, I_ChangeListener changeListener, I_DataConverter dataConverter) throws Exception { this.changeListener = changeListener; this.info = info; this.dataConverter = dataConverter; this.queryMeatStatement = info.get("db.queryMeatStatement", (String)null); if (this.queryMeatStatement != null && this.queryMeatStatement.length() < 1) this.queryMeatStatement = null; if (this.queryMeatStatement != null) this.queryMeatStatement = this.queryMeatStatement.trim(); // Check if we need to make a data conversion if (this.dataConverter != null && this.queryMeatStatement != null) { this.dataConverter = null; log.info("Ignoring given dataConverter as 'db.queryMeatStatement' is configured"); } try { this.digest = MessageDigest.getInstance("MD5"); } catch(NoSuchAlgorithmException e) { log.severe("'MD5' is not supported: " + e.toString()); throw e; //new Exception(e); } this.changeDetectStatement = this.info.get("changeDetector.detectStatement", ""); if (this.changeDetectStatement == null) { throw new IllegalArgumentException("Please pass a change detection SQL statement, for example 'changeDetector.detectStatement=SELECT col1, col2 FROM TEST_POLL ORDER BY ICAO_ID'"); } this.changeDetectStatement = this.changeDetectStatement.trim(); ClassLoader cl = this.getClass().getClassLoader(); this.dbPool = (I_DbPool)this.info.getObject("db.pool"); if (this.dbPool == null) { String dbPoolClass = this.info.get("dbPool.class", "org.xmlBlaster.contrib.db.DbPool").trim(); if (dbPoolClass.length() > 0) { this.dbPool = (I_DbPool)cl.loadClass(dbPoolClass).newInstance(); this.dbPool.init(info); if (log.isLoggable(Level.FINE)) log.fine(dbPoolClass + " created and initialized"); } else throw new IllegalArgumentException("Couldn't initialize I_DbPool, please configure 'dbPool.class' to provide a valid JDBC access."); this.poolOwner = true; this.info.putObject("db.pool", this.dbPool); } // if null: check the complete table // if != null: check for each groupColName change separately this.groupColName = this.info.get("changeDetector.groupColName", (String)null); if (this.groupColName != null && this.groupColName.length() < 1) this.groupColName = null; this.useGroupCol = this.groupColName != null; if (this.groupColName == null) this.groupColName = this.info.get("mom.topicName", "db.change.event"); } /** * Check the observed data for changes. * <p /> * The method is synchronized so you can call it from the AlertScheduler * or manually from outside simultaneously. * @see org.xmlBlaster.contrib.dbwatcher.detector.I_ChangeDetector#checkAgain(Map) */ public synchronized int checkAgain(Map attrMap) throws Exception { if (log.isLoggable(Level.FINE)) log.fine("Checking for MD5 changes ..."); this.changeCount = 0; Connection conn = null; try { conn = this.dbPool.select(conn, this.changeDetectStatement, new I_ResultCb() { public void result(Connection conn, ResultSet rs) throws Exception { if (log.isLoggable(Level.FINE)) log.fine("Processing result set"); if (rs == null) { changeCount = 0; if (!tableExists || md5Map.size() == 0) { if (log.isLoggable(Level.FINE)) log.fine("Table/view '" + changeDetectStatement + "' does not exist, no changes to report"); } else { if (log.isLoggable(Level.FINE)) log.fine("Table/view '" + changeDetectStatement + "' has been deleted"); Iterator it = md5Map.keySet().iterator(); while (it.hasNext()) { String key = (String)it.next(); String resultXml = ""; if (queryMeatStatement != null) { // delegate processing of message meat ... ChangeEvent changeEvent = new ChangeEvent(groupColName, key, null, "DROP", null); changeCount += changeListener.publishMessagesFromStmt(queryMeatStatement, useGroupCol, changeEvent, conn); } else { if (dataConverter != null) { ByteArrayOutputStream bout = new ByteArrayOutputStream(); BufferedOutputStream out = new BufferedOutputStream(bout); dataConverter.setOutputStream(out, "DROP", key, null); dataConverter.done(); resultXml = bout.toString(); } changeListener.hasChanged( new ChangeEvent(groupColName, key, resultXml, "DROP", null)); changeCount++; } } md5Map.clear(); } tableExists = false; return; } if (useGroupCol) changeCount = checkWithGroupCol(conn, rs); else changeCount = checkComplete(conn, rs); if (log.isLoggable(Level.FINE)) log.fine("Processing result set done"); } }); } catch (Exception e) { e.printStackTrace(); log.severe("Panic: Change detection failed for '" + this.changeDetectStatement + "': " + e.toString()); if (conn != null) { this.dbPool.erase(conn); conn = null; } } finally { conn = DbWatcher.releaseWithCommit(conn, this.dbPool); } return changeCount; } /** * The select statement contains no grouping. * <tt>CREATE</tt> and <tt>DROP</tt> are reliable, all other changes are marked as <tt>UPDATE</tt>. * @param rs The JDBC query result * @return The number of changes detected
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -