⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 timestampchangedetector.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*------------------------------------------------------------------------------Name:      TimestampChangeDetector.javaProject:   org.xmlBlasterProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.dbwatcher.detector;import java.io.BufferedOutputStream;import java.io.ByteArrayOutputStream;import java.sql.ResultSet;import java.sql.SQLException;import java.util.Date;import java.util.Map;import java.util.logging.Logger;import java.util.logging.Level;import java.sql.Connection;import java.text.SimpleDateFormat;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.db.DbInfo;import org.xmlBlaster.contrib.db.I_DbPool;import org.xmlBlaster.contrib.db.I_ResultCb;import org.xmlBlaster.contrib.dbwatcher.ChangeEvent;import org.xmlBlaster.contrib.dbwatcher.DbWatcher;import org.xmlBlaster.contrib.dbwatcher.I_ChangeListener;import org.xmlBlaster.contrib.dbwatcher.convert.I_DataConverter;/** * Check the database and compare the change timestamp of a table to the previous one.  * <h2>Configuration</h2> * <ul> *  <li><tt>changeDetector.detectStatement</tt> the SQL statement which detects that a change has occurred, for example *      <tt>SELECT MAX(TO_CHAR(ts, 'YYYY-MM-DD HH24:MI:SSXFF')) FROM TEST_TS</tt> *  </li> *  <li><tt>changeDetector.timestampColNum</tt> is set to 1 and specifies the column number *       where the above <tt>changeDetector.detectStatement</tt> delivers the max result. *       Usually you don't need to change this. *  </li> *  <li><tt>db.queryMeatStatement</tt> is executed when a change was detected, it collects *      the wanted data to send as a message, for example *      <tt>SELECT * FROM TEST_POLL WHERE TO_CHAR(TS, 'YYYY-MM-DD HH24:MI:SSXFF') > '${oldTimestamp}' ORDER BY CAR</tt>. *      It should be order by the <tt>changeDetector.groupColName</tt> value (if such is given). *  </li> *  <li><tt>changeDetector.groupColName</tt> in the above example *      <tt>ICAO_ID</tt>, the SELECT must be sorted after this column and must *       list it. All distinct <tt>ICAO_ID</tt> values trigger an own publish event. *       If not configured, this plugin triggers on change exactly one publish event. *  </li> *  <li><tt>changeDetector.ignoreExistingDataOnStartup</tt> defaults to false, *       like this all SQL data in the observed table are delivered on DbWatcher startup. *       If you set this to <tt>true</tt> only the future changes are detected after *       a new xmlBlaster restart. *  </li> * </ul> * * <h2>Limitations</h2> * <p>The nature of this plugin is based on a timestamp comparison, * as such it does not detect <b>DELETE</b> changes of database rows, as this * will not create a new timestamp. All other commands (CREATE, INSERT, UPDATE) will * touch the timestamp and are therefor detected. Additionally a DROP is detected.</p>   <table border="1">     <tr>       <th>DB statement</th>       <th>Reported change</th>       <th>Comment</th>     </tr>     <tr>       <td>CREATE</td>       <td>CREATE</td>       <td>-</td>     </tr>     <tr>       <td>INSERT</td>       <td>UPDATE</td>       <td>SQL <tt>INSERT</tt> statement are reported as <tt>UPDATE</tt></td>     </tr>     <tr>       <td>UPDATE</td>       <td>UPDATE</td>       <td>-</td>     </tr>     <tr>       <td>DELETE</td>       <td>-</td>       <td>Is not detected</td>     </tr>     <tr>       <td>DROP</td>       <td>DROP</td>       <td>see <tt>mom.eraseOnDrop</tt> configuration</td>     </tr>   </table> * * <p> * Note that the previous timestamp value is hold in RAM only, after * plugin restart it is lost and a complete set of data is send again. * </p> * @author Marcel Ruff * @author Michele Laghi  */public class TimestampChangeDetector implements I_ChangeDetector, TimestampChangeDetectorMBean{   private static Logger log = Logger.getLogger(TimestampChangeDetector.class.getName());   protected I_Info info;   protected I_ChangeListener changeListener;   protected I_DataConverter dataConverter;   protected I_DbPool dbPool;   protected boolean poolOwner;   protected String changeDetectStatement;   protected int timestampColNum;   protected String groupColName;   protected boolean useGroupCol;   protected boolean tableExists=true;   protected boolean ignoreExistingDataOnStartup;   protected String changeCommand;   protected String oldTimestamp;   protected String newTimestamp;   String MINSTR = " ";   protected String queryMeatStatement;   private final static String LAST_TIMESTAMP_KEY = "lastTimestamp";   private final static String PERSIST_KEY = "changeDetector.persist";         private I_Info persistentInfo;      private boolean ignoreDetection;      final private void persistTimestampIfNecessary() {      if (this.persistentInfo == null)         return;      log.fine("storing the timestamp '" + this.oldTimestamp + "' on persistence");      this.persistentInfo.put(LAST_TIMESTAMP_KEY, this.oldTimestamp);   }   /**    * Modifies a string if it contains the special token '${currentDate}'.    * The correct syntax would be :    * ${currentDate}=yyyy-MM-dd HH:mm:ss.0    * Note that spaces after the equality sign count.    * If you only specify ${currentDate} without equality Sign, then     * the current time returned as System.getCurrentTime() is returned.    * This method is made public for testing.     * @param in    * @return    */   public String modifyMinStrIfDateWithPersistence(String in, long time) throws Exception {      // '2005-11-25 12:48:00.0' "yyyy-MM-dd HH:mm:ss.0"      // detect if it has to be processed:      boolean persist = this.info.getBoolean(PERSIST_KEY, false);      if (persist) {         String id = this.info.get(I_Info.ID, null);         if (id == null)            log.severe("No 'id' has been defined for this detector. Can not use persistent data");         this.persistentInfo = new DbInfo(DbWatcher.getDbInfoPool(this.info), id, this.info);         String timestamp = this.persistentInfo.get(LAST_TIMESTAMP_KEY, null);         log.warning("The time id='" + id + "' name='" + LAST_TIMESTAMP_KEY + "' has not been found. Will take current system time");         if (timestamp != null)            in = timestamp;      }      try {         return modifyMinStrIfDate(in, time);      }      finally {         if (this.persistentInfo != null && in != null && in.length() > 0)            persistentInfo.put(LAST_TIMESTAMP_KEY, in);      }   }      public static String modifyMinStrIfDate(String in, long time) throws Exception {      // '2005-11-25 12:48:00.0' "yyyy-MM-dd HH:mm:ss.0"      // detect if it has to be processed:      if (in == null)         return null;      if (in.length() < 1)         return in;      if (time < 1)         time = System.currentTimeMillis();      int pos0 = in.indexOf("${currentDate}");       if (pos0 > -1) { // then it is for us         int pos = in.indexOf("=");         if (pos < pos0 + "${currentDate}".length())            return "" + time;         String formatString = in.substring(pos+1);         SimpleDateFormat format = new SimpleDateFormat(formatString);         in = format.format(new Date(time));      }      return in;   }   // '2005-11-25 12:48:00.0' "yyyy-MM-dd HH:mm:ss.0"   /**    * @param info    * @param changeListener    * @param dataConverter    * @see I_ChangeDetector#init    * @throws Exception    */   public synchronized void init(I_Info info, I_ChangeListener changeListener, I_DataConverter dataConverter) throws Exception {      this.changeListener = changeListener;      this.info = info;      this.dataConverter = dataConverter;            this.queryMeatStatement = this.info.get("db.queryMeatStatement", (String)null);      if (this.queryMeatStatement != null && this.queryMeatStatement.length() < 1)         this.queryMeatStatement = null;      // Check if we need to make a data conversion      if (this.dataConverter != null && this.queryMeatStatement != null) {         this.dataConverter = null;         log.info("Ignoring given dataConverter as 'db.queryMeatStatement' is configured");      }      this.changeDetectStatement = this.info.get("changeDetector.detectStatement", "");      if (this.changeDetectStatement == null || this.changeDetectStatement.length() < 1) {         throw new IllegalArgumentException("Please pass a change detection SQL statement, for example 'changeDetector.detectStatement=SELECT col1, col2 FROM TEST_POLL ORDER BY ICAO_ID'");      }      this.timestampColNum = this.info.getInt("changeDetector.timestampColNum", 1);      if (this.timestampColNum < 1) {         throw new IllegalArgumentException("Please pass the JDBC index (starts with 1) of the column containing the timestamp, for example 'changeDetector.timestampColNum=1'");      }            this.ignoreExistingDataOnStartup = this.info.getBoolean("changeDetector.ignoreExistingDataOnStartup", this.ignoreExistingDataOnStartup);      this.dbPool = DbWatcher.getDbPool(this.info);      this.MINSTR = this.info.get("changeDetector.MINSTR", this.MINSTR);      // '2005-11-25 12:48:00.0' "yyyy-MM-dd HH:mm:ss.0"      this.MINSTR = modifyMinStrIfDateWithPersistence(this.MINSTR, 0);            //String colGroupingName = xmlBlasterPlugin.getType(); // or something else if not inside xmlBlaster       //this.persistentInfo = new DbInfo(this.dbPool, colGroupingName);      //this.oldTimestamp = this.persistentInfo.get("I_ChangeDetector.oldTimestamp");            // if null: check the complete table      // if != null: check for each groupColName change separately      this.groupColName = this.info.get("changeDetector.groupColName", (String)null);      if (this.groupColName != null && this.groupColName.length() < 1)         this.groupColName = null;      this.useGroupCol = this.groupColName != null;      if (this.groupColName == null)         this.groupColName = this.info.get("mom.topicName", "db.change.event");      // to add this to JMX      String jmxName = I_Info.JMX_PREFIX + "timestampChangeDetector";      info.putObject(jmxName, this);      log.info("Added object '" + jmxName + "' to I_Info to be added as an MBean");   }      /**    * Compares the two strings as numerical values. If the newTimestamp is really newer than the oldTimestamp,    * then it returns true, false otherwise.    *     * @param oldTimestamp    * @param newTimestamp    * @return

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -