📄 dbwatcher.java
字号:
* @param recursion To detect recursion * @return true if publishing of message succeeded or if the message did not need to be published. */ private boolean hasChanged(final ChangeEvent changeEvent, boolean recursion) { try { if (log.isLoggable(Level.FINEST)) log.fine("invoked with '" + changeEvent.getXml()); if (changeEvent.getAttributeMap() == null || changeEvent.getAttributeMap().get(I_DataConverter.IGNORE_MESSAGE) == null) { if (log.isLoggable(Level.FINE)) log.fine("hasChanged() invoked for groupColValue=" + changeEvent.getGroupColValue()); this.publisher.publish(changeEvent.getGroupColValue(), changeEvent.getXml().getBytes(), changeEvent.getAttributeMap()); } else log.fine("Message not sent (published) because the attribute '" + I_DataConverter.IGNORE_MESSAGE + "' was set"); clearMessageAttributesAfterPublish(changeEvent.getAttributeMap()); return true; } catch(Exception e) { e.printStackTrace(); log.severe("Can't publish change event " + e.toString() + " Event was: " + changeEvent.toString()); return false; } } /** * Convenience Method which executes a post statement if needed. * */ private final void doPostStatement() { if (this.dataConverter != null) { String postStatement = this.dataConverter.getPostStatement(); this.collectedMessages = 1; if (postStatement != null) { try { log.fine("executing the post statement '" + postStatement + "'"); this.dbPool.update(postStatement); } catch (Exception ex) { log.severe("An exception occured when cleaning up invocation with post statement '" + postStatement + ": " + ex.getMessage()); ex.printStackTrace(); } } else { if (log.isLoggable(Level.FINEST)) log.finest("No post statement defined after having published, for example: converter.postStatement='INSERT ...'"); } } } /** * @see I_ChangeListener#publishMessagesFromStmt */ public int publishMessagesFromStmt(final String stmt, final boolean useGroupCol, final ChangeEvent changeEvent, Connection conn) throws Exception { boolean autoCommit = (conn == null); this.changeCount = 0; final String command = (changeEvent.getCommand() == null) ? "UPDATE" : changeEvent.getCommand(); Connection connRet = null; if ("DROP".equals(command) && dataConverter != null) { ByteArrayOutputStream bout = new ByteArrayOutputStream(); BufferedOutputStream out = new BufferedOutputStream(bout); dataConverter.setOutputStream(out, command, changeEvent.getGroupColValue(), changeEvent); dataConverter.done(); String resultXml = bout.toString(); boolean published = hasChanged(new ChangeEvent(changeEvent.getGroupColName(), changeEvent.getGroupColValue(), resultXml, command, changeEvent.getAttributeMap()), true); if (published) doPostStatement(); return 1; } try { connRet = this.dbPool.select(conn, stmt, autoCommit, new I_ResultCb() { public void result(Connection conn, ResultSet rs) throws Exception { if (log.isLoggable(Level.FINE)) log.fine("Processing result set for '" + stmt + "'"); String groupColName = changeEvent.getGroupColName(); try { ByteArrayOutputStream bout = null; BufferedOutputStream out = null; // default if no grouping is configured if (groupColName == null) groupColName = info.get("mom.topicName", "db.change"); String groupColValue = "DELETE".equals(command) ? changeEvent.getGroupColValue() : "${"+groupColName+"}"; String newGroupColValue = null; boolean first = true; while (rs != null && rs.next()) { if (useGroupCol) { newGroupColValue = rs.getString(groupColName); if (rs.wasNull()) newGroupColValue = "__NULL__"; } if (log.isLoggable(Level.FINEST)) log.finest("useGroupCol="+useGroupCol+", groupColName="+groupColName+", groupColValue="+groupColValue+", newGroupColValue="+newGroupColValue); if (!first && !groupColValue.equals(newGroupColValue)) { if (log.isLoggable(Level.FINE)) log.fine("Processing " + groupColName + "=" + groupColValue + " has changed to '" + newGroupColValue + "'"); if (maxCollectedMessages < 1 || collectedMessages >= maxCollectedMessages) { String resultXml = ""; first = false; if (dataConverter != null) { dataConverter.done(); resultXml = bout.toString(); } boolean published = hasChanged(new ChangeEvent(groupColName, groupColValue, resultXml, command, changeEvent.getAttributeMap()), true); if (published) doPostStatement(); changeCount++; bout = null; } else collectedMessages++; } groupColValue = newGroupColValue; if (bout == null && dataConverter != null) { bout = new ByteArrayOutputStream(); out = new BufferedOutputStream(bout); dataConverter.setOutputStream(out, command, newGroupColValue, changeEvent); } if (dataConverter != null) dataConverter.addInfo(conn, rs, I_DataConverter.ALL); // collect data first = false; } // end while if (bout == null && dataConverter != null) { bout = new ByteArrayOutputStream(); out = new BufferedOutputStream(bout); dataConverter.setOutputStream(out, command, groupColValue, changeEvent); } String resultXml = ""; if (dataConverter != null) { dataConverter.done(); resultXml = bout.toString(); } boolean published = hasChanged(new ChangeEvent(groupColName, groupColValue, resultXml, command, changeEvent.getAttributeMap()), true); if (published) doPostStatement(); changeCount++; } catch (Exception e) { e.printStackTrace(); log.severe("Can't publish change event meat for groupColName='" + groupColName + "': " + e.toString() + " Query was: " + stmt); } } }); } finally { if (conn == null) { // If given conn was null we need to take care ourself if (connRet != null) this.dbPool.release(connRet); } } return this.changeCount; } /** * Replace e.g. ${XY} with the given token. * @param text The complete string which may contain zero to many ${...} * variables, if null we return null * @param token The replacement token, if null the original text is returned * @return The new value where all ${} are replaced. */ public static String replaceVariable(String text, String token) { if (text == null) return null; if (token == null) return text; //if (token.indexOf("${") >= 0) return text; // Protect against recursion //while (true) { int lastFrom = -1; for (int i=0; i<10; i++) { int from = text.indexOf("${"); if (from == -1) { from = text.indexOf("$_{"); // jutils suppresses replacement of such variables if (from == -1) return text; } if (lastFrom != -1 && lastFrom == from) return text; // recursion int to = text.indexOf("}", from); if (to == -1) return text; text = text.substring(0,from) + token + text.substring(to+1); lastFrom = from; } return text; } /* public static String replaceVariable(String text, Map replacements) throws Exception { if (replacements == null || replacements.size() < 1) return text; int i; for (i = 0; i<50; i++) { int offset = 2; int from = text.indexOf("${"); if (from == -1) { from = text.indexOf("$_{"); offset = 3; if (from == -1) { break; } } int to = text.indexOf("}", from); if (to == -1) { throw new Exception("Invalid variable '" + text.substring(from) + "', expecting ${} syntax in '" + text + "'"); } String sub = text.substring(from, to + 1); // "${XY}" or $_{XY} String subKey = sub.substring(offset, sub.length() - 1); // "XY" String subValue = (String)replacements.get(subKey); if (subValue == null) { throw new Exception("Unknown variable '" + subKey + "' in '" + text + "'"); } text = text.substring(0, from) + subValue + text.substring(to+1); } if (i>49) log.warning("Max. recursion depth reached, please check ${} replacement in '" + text + "'"); return text; } */ /** * Cleanup resources. * @throws Exception Can be of any type */ public void shutdown() throws Exception { for (int i=0; i<this.alertProducerArr.length; i++) { try { this.alertProducerArr[i].shutdown(); } catch(Throwable e) { e.printStackTrace(); log.warning(e.toString()); } } try { if (this.changeDetector != null) this.changeDetector.shutdown(); } catch(Throwable e) { e.printStackTrace(); log.warning(e.toString()); } try { if (this.dataConverter != null) this.dataConverter.shutdown(); } catch(Throwable e) { e.printStackTrace(); log.warning(e.toString()); } try { if (this.publisher != null) { this.publisher.shutdown(); this.publisher = null; } } catch(Throwable e) { e.printStackTrace(); log.warning(e.toString()); } if (this.dbPool != null) { this.dbPool.shutdown(); this.dbPool = null; // this.info.putObject("db.pool", null); } } /** * Helper method used for cleanup: * invoke conn = DbWatcher. * @param conn * @param pool * @return always null (to be set to the connection) * @throws Exception */ public static Connection releaseWithCommit(Connection conn, I_DbPool pool) throws Exception { if (conn != null) { try { conn.commit(); } catch (Throwable ex) { ex.printStackTrace(); if (conn != null) { pool.erase(conn); conn = null; } } finally { if (conn != null) pool.release(conn); conn = null; } } return conn; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -