⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 timestampchangedetector.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    */   private final boolean compareTo(String oldTimestamp, String newTimestamp) {      int ret = newTimestamp.length() - oldTimestamp.length();      if (ret == 0)         return oldTimestamp.compareTo(newTimestamp) > 0;      return ret < 0;   }         /**    * Check the observed data for changes.     * @param attrMap Currently "oldTimestamp" can be passed to force a specific scan    * @return true if the observed data has changed    * @see org.xmlBlaster.contrib.dbwatcher.detector.I_ChangeDetector#checkAgain    */   public synchronized int checkAgain(Map attrMap) throws Exception {      if (log.isLoggable(Level.FINE)) log.fine("Checking for Timestamp changes '" + this.changeDetectStatement + "' ...");      if (this.ignoreDetection) {         log.fine("The detection is deactivated");         return 0;      }      int changeCount = 0;      this.changeCommand = null;      // We need the connection for detection and in the same transaction to the queryMeat      Connection conn = null;      boolean reported = false;            if (attrMap != null && attrMap.containsKey("oldTimestamp")) {         this.oldTimestamp = (String)attrMap.get("oldTimestamp");         log.info("Reconfigured oldTimestamp to '" +this.oldTimestamp + "' as given by attrMap");      }      try {         conn = this.dbPool.reserve(); // This has been added 2005-08-27 (Michele Laghi)         // FIXME this !!!!!         // conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); // TOO RESTRICTIVE IN MOST CASES !!!                  conn = this.dbPool.select(conn, this.changeDetectStatement, new I_ResultCb() {            public void result(Connection conn, ResultSet rs) throws Exception {               if (log.isLoggable(Level.FINE)) log.fine("Processing result set");                           // Check for missing/dropped table               if (rs == null) {                  if (!tableExists || oldTimestamp == null) {                     if (log.isLoggable(Level.FINE)) log.fine("Table/view '" + changeDetectStatement + "' does not exist, no changes to report");                  }                  else {                     if (log.isLoggable(Level.FINE)) log.fine("Table/view '" + changeDetectStatement + "' has been deleted");                     changeCommand = "DROP";                     oldTimestamp = null;                  }                  tableExists = false;                  return;               }               // Check if something has changed               newTimestamp = null;               int rowCount = 0;               while (rs.next()) {                  newTimestamp = rs.getString(timestampColNum);                  if (rs.wasNull())                    newTimestamp = MINSTR;                  if (oldTimestamp == null && ignoreExistingDataOnStartup)                     oldTimestamp = newTimestamp;                  if (oldTimestamp == null || !oldTimestamp.equals(newTimestamp)) {                    changeCommand = (tableExists) ? "UPDATE" : "CREATE";                    if (oldTimestamp != null && compareTo(oldTimestamp, newTimestamp)) {                       // The newest entry was removed ->                        //changeCommand=DELETE                       // as other DELETE are not detected, we ignore this one as well                       if (log.isLoggable(Level.FINE)) log.fine("Ignoring DELETE of newest entry to be consistent (as we can't detect other DELETEs): oldTimestamp=" + oldTimestamp + " newTimestamp=" + newTimestamp);                       changeCommand = null;                    }                  }                  rowCount++;               }               if (rowCount > 1)                  throw new IllegalArgumentException("Please correct your change detection SQL statement, it may return max one result set: 'changeDetector.detectStatement="+changeDetectStatement);               if (log.isLoggable(Level.FINE)) log.fine("oldTimestamp=" + oldTimestamp + " newTimestamp=" + newTimestamp);            } // end of callback method         });                if (this.changeCommand != null) {            if (log.isLoggable(Level.FINE)) log.fine("Data has changed");            if (!"DROP".equals(this.changeCommand))                tableExists = true;            if (this.queryMeatStatement != null) { // delegate processing of message meat ...               ChangeEvent changeEvent = new ChangeEvent(groupColName, null, null, this.changeCommand, null);               String stmt = DbWatcher.replaceVariable(this.queryMeatStatement, oldTimestamp==null?MINSTR:oldTimestamp);               try {                  changeCount = changeListener.publishMessagesFromStmt(stmt, groupColName!=null, changeEvent, conn);               }               catch (Exception e) {                  log.severe("Panic: Query meat failed for '" + stmt + "': " + e.toString());                   reported = true;                  throw e;               }            }            else { // send message without meat ...               String resultXml = "";               ChangeEvent changeEvent = new ChangeEvent(groupColName, null, resultXml, this.changeCommand, null);               if (dataConverter != null) { // add some basic meta info ...                  ByteArrayOutputStream bout = new ByteArrayOutputStream();                  BufferedOutputStream out = new BufferedOutputStream(bout);                  dataConverter.setOutputStream(out, this.changeCommand, groupColName, changeEvent);                  dataConverter.done();                  resultXml = bout.toString();                  changeEvent.setXml(resultXml);               }               changeListener.hasChanged(changeEvent);               changeCount++;            }            oldTimestamp = newTimestamp;            persistTimestampIfNecessary();            // TODO rollback in case of an exception and distributed transactions ...         }      }      catch (Exception e) {         if (conn != null) {            try {               conn.rollback();            }            catch (SQLException ex) {            }            this.dbPool.erase(conn);            conn = null;         }         if (!reported) {            log.severe("Panic: Change detection failed for '" +                       this.changeDetectStatement + "': " + e.toString());          }      }      finally {         DbWatcher.releaseWithCommit(conn, this.dbPool);      }      return changeCount;   }      /**    * @see org.xmlBlaster.contrib.dbwatcher.detector.I_ChangeDetector#shutdown    */   public synchronized void shutdown() throws Exception {      if (this.dbPool != null) {         this.dbPool.shutdown();         this.dbPool = null;      }   }   // methods inherited by the MBean   public String getChangeCommand() {      return this.changeCommand;   }   public String getChangeDetectStatement() {      return this.changeDetectStatement;   }   public String getGroupColName() {      return this.groupColName;   }   public String getNewTimestamp() {      return this.newTimestamp;   }   public String getOldTimestamp() {      return this.oldTimestamp;   }   public String getQueryMeatStatement() {      return this.queryMeatStatement;   }   public int getTimestampColNum() {      return this.timestampColNum;   }   public boolean isIgnoreExistingDataOnStartup() {      return this.ignoreExistingDataOnStartup;   }   public boolean isPoolOwner() {      return this.poolOwner;   }   public boolean isTableExists() {      return this.tableExists;   }   public boolean isUseGroupCol() {      return this.useGroupCol;   }   public void setChangeCommand(String changeCommand) {      this.changeCommand = changeCommand;   }   public void setChangeDetectStatement(String changeDetectStatement) {      this.changeDetectStatement = changeDetectStatement;   }   public void setGroupColName(String groupColName) {      this.groupColName = groupColName;   }   public void setOldTimestamp(String oldTimestamp) {      this.oldTimestamp = oldTimestamp;   }   public void setPoolOwner(boolean poolOwner) {      this.poolOwner = poolOwner;   }   public void setQueryMeatStatement(String queryMeatStatement) {      this.queryMeatStatement = queryMeatStatement;   }   public void setTimestampColNum(int timestampColNum) {      this.timestampColNum = timestampColNum;   }   public void setUseGroupCol(boolean useGroupCol) {      this.useGroupCol = useGroupCol;   }   public void stopDetection() {      this.ignoreDetection = true;   }   public void activateDetection() {      this.ignoreDetection = false;   }   public boolean  isIgnoreDetection() {      return this.ignoreDetection;   }   }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -