📄 timestampchangedetector.java
字号:
*/ private final boolean compareTo(String oldTimestamp, String newTimestamp) { int ret = newTimestamp.length() - oldTimestamp.length(); if (ret == 0) return oldTimestamp.compareTo(newTimestamp) > 0; return ret < 0; } /** * Check the observed data for changes. * @param attrMap Currently "oldTimestamp" can be passed to force a specific scan * @return true if the observed data has changed * @see org.xmlBlaster.contrib.dbwatcher.detector.I_ChangeDetector#checkAgain */ public synchronized int checkAgain(Map attrMap) throws Exception { if (log.isLoggable(Level.FINE)) log.fine("Checking for Timestamp changes '" + this.changeDetectStatement + "' ..."); if (this.ignoreDetection) { log.fine("The detection is deactivated"); return 0; } int changeCount = 0; this.changeCommand = null; // We need the connection for detection and in the same transaction to the queryMeat Connection conn = null; boolean reported = false; if (attrMap != null && attrMap.containsKey("oldTimestamp")) { this.oldTimestamp = (String)attrMap.get("oldTimestamp"); log.info("Reconfigured oldTimestamp to '" +this.oldTimestamp + "' as given by attrMap"); } try { conn = this.dbPool.reserve(); // This has been added 2005-08-27 (Michele Laghi) // FIXME this !!!!! // conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); // TOO RESTRICTIVE IN MOST CASES !!! conn = this.dbPool.select(conn, this.changeDetectStatement, new I_ResultCb() { public void result(Connection conn, ResultSet rs) throws Exception { if (log.isLoggable(Level.FINE)) log.fine("Processing result set"); // Check for missing/dropped table if (rs == null) { if (!tableExists || oldTimestamp == null) { if (log.isLoggable(Level.FINE)) log.fine("Table/view '" + changeDetectStatement + "' does not exist, no changes to report"); } else { if (log.isLoggable(Level.FINE)) log.fine("Table/view '" + changeDetectStatement + "' has been deleted"); changeCommand = "DROP"; oldTimestamp = null; } tableExists = false; return; } // Check if something has changed newTimestamp = null; int rowCount = 0; while (rs.next()) { newTimestamp = rs.getString(timestampColNum); if (rs.wasNull()) newTimestamp = MINSTR; if (oldTimestamp == null && ignoreExistingDataOnStartup) oldTimestamp = newTimestamp; if (oldTimestamp == null || !oldTimestamp.equals(newTimestamp)) { changeCommand = (tableExists) ? "UPDATE" : "CREATE"; if (oldTimestamp != null && compareTo(oldTimestamp, newTimestamp)) { // The newest entry was removed -> //changeCommand=DELETE // as other DELETE are not detected, we ignore this one as well if (log.isLoggable(Level.FINE)) log.fine("Ignoring DELETE of newest entry to be consistent (as we can't detect other DELETEs): oldTimestamp=" + oldTimestamp + " newTimestamp=" + newTimestamp); changeCommand = null; } } rowCount++; } if (rowCount > 1) throw new IllegalArgumentException("Please correct your change detection SQL statement, it may return max one result set: 'changeDetector.detectStatement="+changeDetectStatement); if (log.isLoggable(Level.FINE)) log.fine("oldTimestamp=" + oldTimestamp + " newTimestamp=" + newTimestamp); } // end of callback method }); if (this.changeCommand != null) { if (log.isLoggable(Level.FINE)) log.fine("Data has changed"); if (!"DROP".equals(this.changeCommand)) tableExists = true; if (this.queryMeatStatement != null) { // delegate processing of message meat ... ChangeEvent changeEvent = new ChangeEvent(groupColName, null, null, this.changeCommand, null); String stmt = DbWatcher.replaceVariable(this.queryMeatStatement, oldTimestamp==null?MINSTR:oldTimestamp); try { changeCount = changeListener.publishMessagesFromStmt(stmt, groupColName!=null, changeEvent, conn); } catch (Exception e) { log.severe("Panic: Query meat failed for '" + stmt + "': " + e.toString()); reported = true; throw e; } } else { // send message without meat ... String resultXml = ""; ChangeEvent changeEvent = new ChangeEvent(groupColName, null, resultXml, this.changeCommand, null); if (dataConverter != null) { // add some basic meta info ... ByteArrayOutputStream bout = new ByteArrayOutputStream(); BufferedOutputStream out = new BufferedOutputStream(bout); dataConverter.setOutputStream(out, this.changeCommand, groupColName, changeEvent); dataConverter.done(); resultXml = bout.toString(); changeEvent.setXml(resultXml); } changeListener.hasChanged(changeEvent); changeCount++; } oldTimestamp = newTimestamp; persistTimestampIfNecessary(); // TODO rollback in case of an exception and distributed transactions ... } } catch (Exception e) { if (conn != null) { try { conn.rollback(); } catch (SQLException ex) { } this.dbPool.erase(conn); conn = null; } if (!reported) { log.severe("Panic: Change detection failed for '" + this.changeDetectStatement + "': " + e.toString()); } } finally { DbWatcher.releaseWithCommit(conn, this.dbPool); } return changeCount; } /** * @see org.xmlBlaster.contrib.dbwatcher.detector.I_ChangeDetector#shutdown */ public synchronized void shutdown() throws Exception { if (this.dbPool != null) { this.dbPool.shutdown(); this.dbPool = null; } } // methods inherited by the MBean public String getChangeCommand() { return this.changeCommand; } public String getChangeDetectStatement() { return this.changeDetectStatement; } public String getGroupColName() { return this.groupColName; } public String getNewTimestamp() { return this.newTimestamp; } public String getOldTimestamp() { return this.oldTimestamp; } public String getQueryMeatStatement() { return this.queryMeatStatement; } public int getTimestampColNum() { return this.timestampColNum; } public boolean isIgnoreExistingDataOnStartup() { return this.ignoreExistingDataOnStartup; } public boolean isPoolOwner() { return this.poolOwner; } public boolean isTableExists() { return this.tableExists; } public boolean isUseGroupCol() { return this.useGroupCol; } public void setChangeCommand(String changeCommand) { this.changeCommand = changeCommand; } public void setChangeDetectStatement(String changeDetectStatement) { this.changeDetectStatement = changeDetectStatement; } public void setGroupColName(String groupColName) { this.groupColName = groupColName; } public void setOldTimestamp(String oldTimestamp) { this.oldTimestamp = oldTimestamp; } public void setPoolOwner(boolean poolOwner) { this.poolOwner = poolOwner; } public void setQueryMeatStatement(String queryMeatStatement) { this.queryMeatStatement = queryMeatStatement; } public void setTimestampColNum(int timestampColNum) { this.timestampColNum = timestampColNum; } public void setUseGroupCol(boolean useGroupCol) { this.useGroupCol = useGroupCol; } public void stopDetection() { this.ignoreDetection = true; } public void activateDetection() { this.ignoreDetection = false; } public boolean isIgnoreDetection() { return this.ignoreDetection; } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -