⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dbwatcher.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    * @param recursion To detect recursion    * @return true if publishing of message succeeded or if the message did not need to be published.    */   private boolean hasChanged(final ChangeEvent changeEvent, boolean recursion) {      try {         if (log.isLoggable(Level.FINEST))            log.fine("invoked with '" + changeEvent.getXml());         if (changeEvent.getAttributeMap() == null || changeEvent.getAttributeMap().get(I_DataConverter.IGNORE_MESSAGE) == null) {            if (log.isLoggable(Level.FINE))                log.fine("hasChanged() invoked for groupColValue=" + changeEvent.getGroupColValue());            this.publisher.publish(changeEvent.getGroupColValue(),                  changeEvent.getXml().getBytes(), changeEvent.getAttributeMap());         }         else            log.fine("Message not sent (published) because the attribute '" + I_DataConverter.IGNORE_MESSAGE + "' was set");         clearMessageAttributesAfterPublish(changeEvent.getAttributeMap());         return true;      }      catch(Exception e) {         e.printStackTrace();         log.severe("Can't publish change event " + e.toString() +                     " Event was: " + changeEvent.toString());         return false;      }   }               /**    * Convenience Method which executes a post statement if needed.    *    */   private final void doPostStatement() {      if (this.dataConverter != null) {         String postStatement = this.dataConverter.getPostStatement();         this.collectedMessages = 1;         if (postStatement != null) {            try {               log.fine("executing the post statement '" + postStatement + "'");               this.dbPool.update(postStatement);            }            catch (Exception ex) {               log.severe("An exception occured when cleaning up invocation with post statement '" + postStatement + ": " + ex.getMessage());               ex.printStackTrace();            }         }         else {            if (log.isLoggable(Level.FINEST))               log.finest("No post statement defined after having published, for example: converter.postStatement='INSERT ...'");         }      }   }         /**    * @see I_ChangeListener#publishMessagesFromStmt    */   public int publishMessagesFromStmt(final String stmt, final boolean useGroupCol,                               final ChangeEvent changeEvent,                               Connection conn) throws Exception {      boolean autoCommit = (conn == null);      this.changeCount = 0;      final String command = (changeEvent.getCommand() == null) ? "UPDATE" : changeEvent.getCommand();      Connection connRet = null;      if ("DROP".equals(command) && dataConverter != null) {         ByteArrayOutputStream bout = new ByteArrayOutputStream();         BufferedOutputStream out = new BufferedOutputStream(bout);         dataConverter.setOutputStream(out, command, changeEvent.getGroupColValue(), changeEvent);         dataConverter.done();         String resultXml = bout.toString();         boolean published = hasChanged(new ChangeEvent(changeEvent.getGroupColName(), changeEvent.getGroupColValue(), resultXml, command, changeEvent.getAttributeMap()), true);         if (published)            doPostStatement();         return 1;      }      try {          connRet = this.dbPool.select(conn, stmt, autoCommit, new I_ResultCb() {             public void result(Connection conn, ResultSet rs) throws Exception {                if (log.isLoggable(Level.FINE)) log.fine("Processing result set for '" + stmt + "'");                String groupColName = changeEvent.getGroupColName();                try {                   ByteArrayOutputStream bout = null;                   BufferedOutputStream out = null;                   // default if no grouping is configured                   if (groupColName == null)                      groupColName = info.get("mom.topicName", "db.change");                   String groupColValue = "DELETE".equals(command) ? changeEvent.getGroupColValue() : "${"+groupColName+"}";                   String newGroupColValue = null;                   boolean first = true;                       while (rs != null && rs.next()) {                      if (useGroupCol) {                          newGroupColValue = rs.getString(groupColName);                          if (rs.wasNull()) newGroupColValue = "__NULL__";                      }                      if (log.isLoggable(Level.FINEST))                          log.finest("useGroupCol="+useGroupCol+", groupColName="+groupColName+", groupColValue="+groupColValue+", newGroupColValue="+newGroupColValue);                                            if (!first && !groupColValue.equals(newGroupColValue)) {                         if (log.isLoggable(Level.FINE))                             log.fine("Processing " + groupColName + "=" + groupColValue + " has changed to '" + newGroupColValue + "'");                         if (maxCollectedMessages < 1 || collectedMessages >= maxCollectedMessages) {                            String resultXml = "";                            first = false;                            if (dataConverter != null) {                               dataConverter.done();                               resultXml = bout.toString();                            }                            boolean published = hasChanged(new ChangeEvent(groupColName, groupColValue, resultXml, command, changeEvent.getAttributeMap()), true);                            if (published)                               doPostStatement();                            changeCount++;                            bout = null;                         }                         else                            collectedMessages++;                      }                          groupColValue = newGroupColValue;                                            if (bout == null && dataConverter != null) {                         bout = new ByteArrayOutputStream();                         out = new BufferedOutputStream(bout);                         dataConverter.setOutputStream(out, command, newGroupColValue, changeEvent);                      }                                            if (dataConverter != null) dataConverter.addInfo(conn, rs, I_DataConverter.ALL); // collect data                          first = false;                   } // end while                                      if (bout == null && dataConverter != null) {                      bout = new ByteArrayOutputStream();                      out = new BufferedOutputStream(bout);                      dataConverter.setOutputStream(out, command, groupColValue, changeEvent);                   }                   String resultXml = "";                   if (dataConverter != null) {                      dataConverter.done();                      resultXml = bout.toString();                   }                   boolean published = hasChanged(new ChangeEvent(groupColName, groupColValue, resultXml, command, changeEvent.getAttributeMap()), true);                   if (published)                      doPostStatement();                   changeCount++;                }                catch (Exception e) {                   e.printStackTrace();                   log.severe("Can't publish change event meat for groupColName='" +                       groupColName + "': " + e.toString() + " Query was: " + stmt);                }             }          });      }      finally {         if (conn == null) { // If given conn was null we need to take care ourself            if (connRet != null) this.dbPool.release(connRet);         }      }      return this.changeCount;   }      /**    * Replace e.g. ${XY} with the given token.     * @param text The complete string which may contain zero to many ${...}    *             variables, if null we return null    * @param token The replacement token, if null the original text is returned    * @return The new value where all ${} are replaced.    */   public static String replaceVariable(String text, String token) {      if (text == null) return null;      if (token == null) return text;      //if (token.indexOf("${") >= 0) return text; // Protect against recursion      //while (true) {      int lastFrom = -1;      for (int i=0; i<10; i++) {         int from = text.indexOf("${");         if (from == -1) {            from = text.indexOf("$_{"); // jutils suppresses replacement of such variables            if (from == -1) return text;         }         if (lastFrom != -1 && lastFrom == from) return text; // recursion         int to = text.indexOf("}", from);         if (to == -1) return text;         text = text.substring(0,from) + token + text.substring(to+1);         lastFrom = from;      }      return text;   }   /*   public static String replaceVariable(String text, Map replacements) throws Exception {      if (replacements == null || replacements.size() < 1) return text;      int i;      for (i = 0; i<50; i++) {         int offset = 2;         int from = text.indexOf("${");         if (from == -1) {            from = text.indexOf("$_{");            offset = 3;            if (from == -1) {               break;            }         }         int to = text.indexOf("}", from);         if (to == -1) {            throw new Exception("Invalid variable '" + text.substring(from) + "', expecting ${} syntax in '" + text + "'");         }         String sub = text.substring(from, to + 1); // "${XY}" or $_{XY}         String subKey = sub.substring(offset, sub.length() - 1); // "XY"         String subValue = (String)replacements.get(subKey);         if (subValue == null) {            throw new Exception("Unknown variable '" + subKey + "' in '" + text + "'");         }         text = text.substring(0, from) + subValue + text.substring(to+1);      }            if (i>49)        log.warning("Max. recursion depth reached, please check ${} replacement in '" + text + "'");          return text;   }   */   /**    * Cleanup resources.    * @throws Exception Can be of any type     */   public void shutdown() throws Exception {      for (int i=0; i<this.alertProducerArr.length; i++) {         try { this.alertProducerArr[i].shutdown(); } catch(Throwable e) { e.printStackTrace(); log.warning(e.toString()); }      }      try { if (this.changeDetector != null) this.changeDetector.shutdown(); } catch(Throwable e) { e.printStackTrace(); log.warning(e.toString()); }      try { if (this.dataConverter != null) this.dataConverter.shutdown(); } catch(Throwable e) { e.printStackTrace(); log.warning(e.toString()); }            try {         if (this.publisher != null) {             this.publisher.shutdown();            this.publisher = null;         }      }       catch(Throwable e) {          e.printStackTrace(); log.warning(e.toString());       }      if (this.dbPool != null) {         this.dbPool.shutdown();         this.dbPool = null;         // this.info.putObject("db.pool", null);      }   }         /**    * Helper method used for cleanup:    * invoke conn = DbWatcher.    * @param conn    * @param pool    * @return always null (to be set to the connection)     * @throws Exception    */   public static Connection releaseWithCommit(Connection conn, I_DbPool pool) throws Exception {      if (conn != null) {         try {            conn.commit();         }         catch (Throwable ex) {            ex.printStackTrace();            if (conn != null) {               pool.erase(conn);               conn = null;            }         }         finally {            if (conn != null)               pool.release(conn);            conn = null;         }      }      return conn;   }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -