⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 replicationconverter.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*------------------------------------------------------------------------------Name:      ReplicationConverter.javaProject:   org.xmlBlasterProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.replication;import java.util.logging.Level;import java.util.logging.Logger;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.List;import java.util.Map;import java.util.Set;import java.util.StringTokenizer;import java.io.OutputStream;import java.sql.Clob;import java.sql.Connection;import java.sql.ResultSet;import java.sql.ResultSetMetaData;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.db.DbInfo;import org.xmlBlaster.contrib.db.I_DbPool;import org.xmlBlaster.contrib.dbwatcher.ChangeEvent;import org.xmlBlaster.contrib.dbwatcher.DbWatcher;import org.xmlBlaster.contrib.dbwatcher.DbWatcherConstants;import org.xmlBlaster.contrib.dbwatcher.convert.I_AttributeTransformer;import org.xmlBlaster.contrib.dbwatcher.convert.I_DataConverter;import org.xmlBlaster.contrib.dbwriter.info.SqlInfo;import org.xmlBlaster.contrib.dbwriter.info.SqlDescription;import org.xmlBlaster.contrib.dbwriter.info.SqlRow;import org.xmlBlaster.contrib.replication.impl.SpecificDefault;import org.xmlBlaster.util.ReplaceVariable;/** * Creates a standardized XML dump from the given ResultSets. * Note this class is not thread safe, in other words you must make sure the same instance of * this class can not be invoked concurently from more than one thread. * @see org.xmlBlaster.contrib.dbwatcher.convert.I_DataConverter * @see org.xmlBlaster.contrib.dbwatcher.convert.ResultSetToXmlConverter * @author Michele Laghi */public class ReplicationConverter implements I_DataConverter, ReplicationConstants {   private static Logger log = Logger.getLogger(ReplicationConverter.class.getName());      private I_DbSpecific dbSpecific;   private SqlInfo sqlInfo;   private I_Info info;   private I_AttributeTransformer transformer;   private OutputStream out;   private boolean sendInitialTableContent = true;   private long oldReplKey = -1L;   private I_Info persistentInfo;   private String oldReplKeyPropertyName;   private ChangeEvent event;   private String transactionId;   /** All transactions in this message (needed to delete entries after publishing) */   private List allTransactions;   private String replPrefix;   private I_DbPool dbPool;   private String transSeqPropertyName;   private long transSeq;   private String messageSeqPropertyName;   private long messageSeq;   private long newReplKey;   private boolean sendUnchangedUpdates = true;      /**    * Default constructor, you need to call <tt>init(info)</tt> thereafter.     */   public ReplicationConverter() {      this.allTransactions = new ArrayList();   }   /**    * Create this plugin.     * @param info Possible configuration parameters you find in the class description    * @throws Exception If transformer instantiation fails    */   public ReplicationConverter(I_Info info) throws Exception {      this();      init(info);   }      /**    * This method creates every time a new instance    * @param info    * @param forceNewIfNeeded if true and the entry is not found in the registry, a new object is created, initialized and added    * to the registry, otherwise it only returns entries found in the registry (without initializing the object) or null if none is found.    * @return    * @throws Exception    */   public static I_DbSpecific getDbSpecific(I_Info info, boolean forceNewIfNeeded) throws Exception {      String dbSpecificClass = info.get("replication.dbSpecific.class", "org.xmlBlaster.contrib.replication.impl.SpecificOracle");      I_DbSpecific dbSpecific = (I_DbSpecific)info.getObject(dbSpecificClass + ".object");      if (!forceNewIfNeeded)         return dbSpecific;      if (dbSpecific == null) {         if (dbSpecificClass.length() > 0) {            ClassLoader cl = ReplicationConverter.class.getClassLoader();            dbSpecific = (I_DbSpecific)cl.loadClass(dbSpecificClass).newInstance();            if (log.isLoggable(Level.FINE))                log.fine(dbSpecificClass + " created and initialized");            info.putObject(dbSpecificClass + ".object" ,dbSpecific);        }        else           log.info("Couldn't initialize I_DataConverter, please configure 'converter.class' if you need a conversion.");      }      dbSpecific.init(info);      return dbSpecific;   }      /**    * @see org.xmlBlaster.contrib.dbwatcher.convert.I_DataConverter#init(I_Info)    */   public synchronized void init(I_Info info) throws Exception {      this.info = info;      String propsToConnect = this.info.get(DbWatcherConstants.MOM_PROPS_TO_ADD_TO_CONNECT, null);      if (propsToConnect == null) {         // String val = REPL_PREFIX_KEY + "," + REPL_VERSION;         String val = "*";         log.info("Adding the property '" + DbWatcherConstants.MOM_PROPS_TO_ADD_TO_CONNECT + "' with value '" + val + "'");         this.info.put(DbWatcherConstants.MOM_PROPS_TO_ADD_TO_CONNECT, val);      }      else {         Set set = getKeys(propsToConnect, ",");         if (!set.contains(REPL_PREFIX_KEY)) {            String txt = "the property '" + DbWatcherConstants.MOM_PROPS_TO_ADD_TO_CONNECT + "' has been explicitly set to '" + propsToConnect + "' but it must contain '" + REPL_PREFIX_KEY + "'";            throw new Exception(txt);          }         if (!set.contains(REPL_VERSION)) {            String txt = "the property '" + DbWatcherConstants.MOM_PROPS_TO_ADD_TO_CONNECT + "' has been explicitly set to '" + propsToConnect + "' but it must contain '" + REPL_VERSION + "'";            throw new Exception(txt);          }      }      ClassLoader cl = this.getClass().getClassLoader();      String transformerClassName = info.get("transformer.class", "");      if (transformerClassName != null && transformerClassName.length() > 0) {         this.transformer = (I_AttributeTransformer)cl.loadClass(transformerClassName).newInstance();         this.transformer.init(info);         log.info("Loaded transformer pluging '" + transformerClassName + "'");      }      this.replPrefix = SpecificDefault.getReplPrefix(this.info);      boolean forceCreationAndInit = true;      this.dbSpecific = getDbSpecific(info, forceCreationAndInit);      this.dbSpecific.setAttributeTransformer(this.transformer);      final boolean doFix = true;      this.dbSpecific.checkTriggerConsistency(doFix);      this.sendInitialTableContent = this.info.getBoolean("replication.sendInitialTableContent", true);      // this.persistentMap = new PersistentMap(CONTRIB_PERSISTENT_MAP);      // this.persistentMap = new Info(CONTRIB_PERSISTENT_MAP);      this.dbPool = DbWatcher.getDbPool(this.info);      this.persistentInfo = new DbInfo(DbWatcher.getDbInfoPool(this.info), "replication", this.info);      // we now recreate the triggers if the version has changed      String oldVersionName = this.dbSpecific.getName() + ".previousVersion";      String oldVersion = this.persistentInfo.get(oldVersionName, null);      String currentVersion = this.info.get("replication.version", "0.0");      if (oldVersion == null || !currentVersion.equals(oldVersion))         this.persistentInfo.put(oldVersionName, currentVersion);      boolean versionHasChanged = false;      if (oldVersion != null && !currentVersion.equals(oldVersion))         versionHasChanged = true;      if (versionHasChanged) {         final boolean force = true;         final boolean forceSend = false;         this.dbSpecific.addTriggersIfNeeded(force, null, forceSend);      }      // end of recreating the triggers (if neeed)            this.oldReplKeyPropertyName = this.dbSpecific.getName() + ".oldReplKey";      this.transSeqPropertyName = this.dbSpecific.getName() + ".transactionSequence";      this.messageSeqPropertyName = this.dbSpecific.getName() + ".messageSequence";      this.transSeq = this.persistentInfo.getLong(this.transSeqPropertyName, 0L);      this.messageSeq = this.persistentInfo.getLong(this.messageSeqPropertyName, 0L);      this.info.put(MESSAGE_SEQ, "" + this.messageSeq);      this.sendUnchangedUpdates = this.info.getBoolean(REPLICATION_SEND_UNCHANGED_UPDATES, true);      long tmp = this.persistentInfo.getLong(this.oldReplKeyPropertyName, -1L);      if (tmp > -1L) {         this.oldReplKey = tmp;         log.info("One entry found in persistent map '" + CONTRIB_PERSISTENT_MAP + "' with key '" + this.oldReplKeyPropertyName + "' found. Will start with '" + this.oldReplKey + "'");         // the following to fix the situation where the peristent DBINFO has not been cleaned up but the sequence has been reset         Connection conn = null;         try {            conn = this.dbPool.reserve();            conn.setAutoCommit(true);            long realCounter = this.dbSpecific.incrementReplKey(conn);            if (this.oldReplKey > realCounter) {               log.warning("The counter from persistence is '" + this.oldReplKey + "' while the sequence is '" + realCounter + "' which is lower. Supposly you have reset the sequence but not the persistence. This is now fixed by initially setting the old replKey to be '" + realCounter + "'");               this.oldReplKey = realCounter;            }         }         catch (Throwable ex) {            log.severe("An exception occured when verifying the intial status of the counter against the old replKeY: " + ex.getMessage());            ex.printStackTrace();            SpecificDefault.removeFromPool(conn, SpecificDefault.ROLLBACK_NO, this.dbPool);            conn = null;         }         finally {            SpecificDefault.releaseIntoPool(conn, SpecificDefault.COMMIT_NO, this.dbPool);         }      }      else {         log.info("No entry found in persistent map '" + CONTRIB_PERSISTENT_MAP + "' with key '" + this.oldReplKeyPropertyName + "' found. Starting by 0'");         this.oldReplKey = 0L;      }   }   private static Set getKeys(String val, String sep) {      Set ret = new HashSet();      if (val == null || val.trim().length() < 1)         return ret;      StringTokenizer tokenizer = new StringTokenizer(val, sep);      while (tokenizer.hasMoreTokens())         ret.add(tokenizer.nextToken().trim());      return ret;   }      /**    * @see org.xmlBlaster.contrib.dbwatcher.convert.I_DataConverter#shutdown    */   public synchronized void shutdown() throws Exception {      try {         if (this.dbSpecific != null)            this.dbSpecific.shutdown();      }      finally {         this.dbPool.shutdown();         this.dbSpecific = null;      }   }            /**    * Add another result set to the XML string.    * This method is invoked for each SQL Operation. Each transaction, i.e. each message    * can contain several such operations.    *     * @see org.xmlBlaster.contrib.dbwatcher.convert.I_DataConverter#addInfo(ResultSet, int)    */   public void addInfo(Connection conn, ResultSet rs, int what) throws Exception {      if (rs == null)         throw new IllegalArgumentException("ReplicationConverter: Given ResultSet is null");            if (this.dbSpecific.isDatasourceReadonly()) {         this.sqlInfo.fillOneRowWithObjects(rs, this.transformer);         return;      }      ResultSetMetaData meta = rs.getMetaData();      int numberOfColumns = meta.getColumnCount();           if (numberOfColumns != 11)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -