📄 md5changedetector.java
字号:
* @throws Exception of any type */ private int checkComplete(Connection conn, ResultSet rs) throws Exception { int count = 0; String resultXml = ""; ByteArrayOutputStream bout = null; BufferedOutputStream out = null; StringBuffer buf = new StringBuffer(2048); int cols = rs.getMetaData().getColumnCount(); while (rs.next()) { for (int i=1; i<=cols; i++) { // Add cols for later MD5 calculation buf.append(rs.getString(i)); } if (dataConverter != null) { // Create XML dump on demand if (bout == null) { bout = new ByteArrayOutputStream(); out = new BufferedOutputStream(bout); String command = "UPDATE"; // (md5Map.size() == 0) ? "INSERT" : "UPDATE"; dataConverter.setOutputStream(out, command, this.groupColName, null); } dataConverter.addInfo(conn, rs, I_DataConverter.ALL); } } String newMD5 = getMD5(buf.toString()); String old = (String)md5Map.get(this.groupColName); if (old == null || !old.equals(newMD5)) { String command = (this.tableExists) ? "UPDATE" : "CREATE"; this.tableExists = true; if (this.queryMeatStatement != null) { // delegate processing of message meat ... ChangeEvent changeEvent = new ChangeEvent(this.groupColName, null, null, command, null); String stmt = this.queryMeatStatement; count = changeListener.publishMessagesFromStmt(stmt, false, changeEvent, conn); } else { // send message without meat ... if (dataConverter != null) { if (bout == null) { bout = new ByteArrayOutputStream(); out = new BufferedOutputStream(bout); dataConverter.setOutputStream(out, command, this.groupColName, null); dataConverter.addInfo(conn, rs, I_DataConverter.META_ONLY); // Add the meta info for a CREATE } dataConverter.done(); resultXml = bout.toString(); } changeListener.hasChanged( new ChangeEvent(this.groupColName, this.groupColName, resultXml, command, null)); count++; } } md5Map.put(this.groupColName, newMD5); buf.setLength(0); return count; } /** * The select statement with grouping. * <p> * "CREATE" and "DROP" are reliable, * all other changes "UPDATE", "INSERT" * and "DELETE" are guessed and not reliable. * </p> * <p> * Note that for each removed groupColValue a "DROP" is issued * even so the table still may exist. * </p> * @param rs The JDBC query result * @return The number of changes detected * @throws Exception of any type */ private int checkWithGroupCol(Connection conn, ResultSet rs) throws Exception { int count = 0; int rowCount = 0; String resultXml = ""; ByteArrayOutputStream bout = null; BufferedOutputStream out = null; String command = "UPDATE"; ResultSetMetaData rsmd = rs.getMetaData(); int cols = rsmd.getColumnCount(); StringBuffer buf = new StringBuffer(2048); // default if no grouping is configured String groupColValue = "${"+groupColName+"}"; String newGroupColValue = null; boolean first = true; try { while (rs.next()) { newGroupColValue = rs.getString(groupColName); if (rs.wasNull()) newGroupColValue = "__NULL__"; touchSet.add(newGroupColValue); if (rowCount == 0) { touchSet.add(groupColValue); // Add the CREATE table name: ${ICAO_ID} itself if (!this.tableExists) { command = "CREATE"; if (this.queryMeatStatement != null) { // delegate processing of message meat ... ChangeEvent changeEvent = new ChangeEvent(groupColName, groupColValue, null, command, null); String stmt = org.xmlBlaster.contrib.dbwatcher.DbWatcher.replaceVariable(this.queryMeatStatement, groupColValue); count = changeListener.publishMessagesFromStmt(stmt, true, changeEvent, conn); } else { // send message directly if (dataConverter != null && bout == null) { bout = new ByteArrayOutputStream(); out = new BufferedOutputStream(bout); dataConverter.setOutputStream(out, command, groupColValue, null); dataConverter.done(); resultXml = bout.toString(); bout = null; } changeListener.hasChanged( new ChangeEvent(groupColName, groupColValue, resultXml, command, null)); } this.tableExists = true; } } if (first) { command = (md5Map.get(newGroupColValue) != null) ? "UPDATE" : "INSERT"; } rowCount++; if (dataConverter != null && bout == null) { bout = new ByteArrayOutputStream(); out = new BufferedOutputStream(bout); dataConverter.setOutputStream(out, command, newGroupColValue, null); } if (!first && !groupColValue.equals(newGroupColValue)) { first = false; if (log.isLoggable(Level.FINE)) log.fine("Processing " + groupColName + "=" + groupColValue + " next one to check is '" + newGroupColValue + "'"); String newMD5 = getMD5(buf.toString()); String old = (String)md5Map.get(groupColValue); if (old == null || !old.equals(newMD5)) { if (this.queryMeatStatement != null) { // delegate processing of message meat ... ChangeEvent changeEvent = new ChangeEvent(groupColName, groupColValue, null, command, null); String stmt = org.xmlBlaster.contrib.dbwatcher.DbWatcher.replaceVariable(this.queryMeatStatement, groupColValue); count += changeListener.publishMessagesFromStmt(stmt, true, changeEvent, conn); } else { // send message directly if (dataConverter != null) { dataConverter.done(); resultXml = bout.toString(); } changeListener.hasChanged( new ChangeEvent(groupColName, groupColValue, resultXml, command, null)); count++; } } buf.setLength(0); command = (md5Map.get(newGroupColValue) != null) ? "UPDATE" : "INSERT"; if (dataConverter != null) { bout = new ByteArrayOutputStream(); out = new BufferedOutputStream(bout); dataConverter.setOutputStream(out, command, newGroupColValue, null); } md5Map.put(groupColValue, newMD5); } groupColValue = newGroupColValue; for (int i=1; i<=cols; i++) { // Add cols for later MD5 calculation //System.out.println(">"+rs.getObject(i).toString()+"<"); ">oracle.sql.TIMESTAMP@157b46f<" //System.out.println(">"+rs.getString(i)+"<"); ">2005-1-31.23.0. 47. 236121000<" // -> getObject is not useful as it returns for same timestamp another object instance. buf.append(rs.getString(i)); } if (dataConverter != null) { // Create XML dump on demand dataConverter.addInfo(conn, rs, I_DataConverter.ALL); } first = false; } String newMD5 = getMD5(buf.toString()); String old = (String)md5Map.get(groupColValue); if (old == null || !old.equals(newMD5)) { if (!this.tableExists) { command = "CREATE"; this.tableExists = true; } else if (old == null) command = "INSERT"; else command = "UPDATE"; if (this.queryMeatStatement != null) { // delegate processing of message meat ... ChangeEvent changeEvent = new ChangeEvent(groupColName, groupColValue, null, command, null); String stmt = org.xmlBlaster.contrib.dbwatcher.DbWatcher.replaceVariable(this.queryMeatStatement, groupColValue); count += changeListener.publishMessagesFromStmt(stmt, true, changeEvent, conn); } else { // send message directly if (dataConverter != null) { if (bout == null) { bout = new ByteArrayOutputStream(); out = new BufferedOutputStream(bout); dataConverter.setOutputStream(out, command, newGroupColValue, null); dataConverter.addInfo(conn, rs, I_DataConverter.META_ONLY); // Add the meta info for a CREATE } dataConverter.done(); resultXml = bout.toString(); } changeListener.hasChanged( new ChangeEvent(groupColName, groupColValue, resultXml, command, null)); count++; } } touchSet.add(groupColValue); md5Map.put(groupColValue, newMD5); buf.setLength(0); // Check for DELETEd entries ... String[] arr = (String[])md5Map.keySet().toArray(new String[md5Map.size()]); for (int i=0; i<arr.length; i++) { if (!touchSet.contains(arr[i])) { String key = arr[i]; md5Map.remove(key); command = "DELETE"; if (this.queryMeatStatement != null) { // delegate processing of message meat ... ChangeEvent changeEvent = new ChangeEvent(groupColName, key, null, command, null); String stmt = org.xmlBlaster.contrib.dbwatcher.DbWatcher.replaceVariable(this.queryMeatStatement, key); count += changeListener.publishMessagesFromStmt(stmt, true, changeEvent, conn); } else { // send message directly if (dataConverter != null) { bout = new ByteArrayOutputStream(); out = new BufferedOutputStream(bout); dataConverter.setOutputStream(out, command, key, null); dataConverter.done(); resultXml = bout.toString(); } changeListener.hasChanged( new ChangeEvent(groupColName, key, resultXml, command, null)); } count++; } } //if (md5Map.size() == 0) // this.tableExists = false; } finally { touchSet.clear(); } return count; } /** * Calculate the MD5 value. * @param value The accumulated string to check * @return The MD5 value */ private String getMD5(String value) { this.digest.update(value.getBytes()); return new String(this.digest.digest()); // Resets digest } /** * @see org.xmlBlaster.contrib.dbwatcher.db.I_DbPool#shutdown * @throws Exception Typicall XmlBlasterException */ public synchronized void shutdown() throws Exception { if (this.poolOwner) { this.dbPool.shutdown(); this.dbPool = null; this.info.putObject("db.pool", null); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -