⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 replicationwriter.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/*------------------------------------------------------------------------------Name:      ReplicationWriter.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.replication;import java.io.ByteArrayInputStream;import java.io.File;import java.io.FileInputStream;import java.io.InputStream;import java.sql.Connection;import java.sql.ResultSet;import java.sql.SQLException;import java.sql.Statement;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Set;import java.util.TreeSet;import java.util.logging.Level;import java.util.logging.Logger;import org.xmlBlaster.contrib.I_ChangePublisher;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.I_Update;import org.xmlBlaster.contrib.PropertiesInfo;import org.xmlBlaster.contrib.db.I_DbPool;import org.xmlBlaster.contrib.dbwriter.DbWriter;import org.xmlBlaster.contrib.dbwriter.I_Parser;import org.xmlBlaster.contrib.dbwriter.I_Writer;import org.xmlBlaster.contrib.dbwriter.info.I_PrePostStatement;import org.xmlBlaster.contrib.dbwriter.info.SqlInfo;import org.xmlBlaster.contrib.dbwriter.info.SqlDescription;import org.xmlBlaster.contrib.dbwriter.info.SqlRow;import org.xmlBlaster.contrib.filewriter.FileWriterCallback;import org.xmlBlaster.contrib.replication.impl.ReplManagerPlugin;import org.xmlBlaster.contrib.replication.impl.SearchableConfig;import org.xmlBlaster.contrib.replication.impl.SpecificDefault;import org.xmlBlaster.jms.XBConnectionMetaData;import org.xmlBlaster.jms.XBMessage;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.MsgUnitRaw;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.xbformat.MsgInfo;import org.xmlBlaster.util.xbformat.XmlScriptParser;public class ReplicationWriter implements I_Writer, ReplicationConstants {   private final static String ME = "ReplicationWriter";   private static Logger log = Logger.getLogger(ReplicationWriter.class.getName());   protected I_DbPool pool;   protected I_Info info;   private I_DbSpecific dbSpecific;   I_Mapper mapper;   private boolean overwriteTables;   private boolean recreateTables;   private String importLocation;   private I_Update callback;   private boolean keepDumpFiles;   private boolean doDrop;   private boolean doCreate;   private boolean doAlter;   private boolean doStatement;   // private String sqlTopic;   private String schemaToWipeout;   private I_PrePostStatement prePostStatement;   private boolean hasInitialCmd;   private I_Parser parserForOldInUpdates;   private boolean nirvanaClient; // if true it does not store on the db (just consumes them   private Map sqlInfoCache;      private final static int SQL_INFO_CACHE_MAX_SIZE_DEFAULT = 5000;   private int sqlInfoCacheMaxSize;   private boolean exceptionInTransaction;   private Connection keptConnection; // we need to use the same connection within a transaction      public ReplicationWriter() {      this.sqlInfoCache = new HashMap();      this.sqlInfoCacheMaxSize = SQL_INFO_CACHE_MAX_SIZE_DEFAULT;   }      /**    * @see org.xmlBlaster.contrib.I_ContribPlugin#getUsedPropertyKeys()    */   public Set getUsedPropertyKeys() {      Set set = new HashSet();      set.add(I_DbSpecific.NEEDS_PUBLISHER_KEY);      set.add("replication.mapper.class");      set.add("replication.overwriteTables");      set.add("replication.importLocation");      set.add("dbWriter.prePostStatement.class");      PropertiesInfo.addSet(set, this.mapper.getUsedPropertyKeys());      PropertiesInfo.addSet(set, this.pool.getUsedPropertyKeys());      PropertiesInfo.addSet(set, this.dbSpecific.getUsedPropertyKeys());      return set;   }   public void init(I_Info info) throws Exception {      log.info("init invoked");      this.info = info;      this.pool = (I_DbPool)info.getObject(DbWriter.DB_POOL_KEY);      if (this.pool == null)         throw new Exception(ME + ".init: the pool has not been configured, please check your '" + DbWriter.DB_POOL_KEY + "' configuration settings");      SearchableConfig searchableConfig = new SearchableConfig();      searchableConfig.init(this.info);      this.info.putObject(SearchableConfig.NAME, searchableConfig);            // this avoids the publisher to be instantiated (since we are on the slave side)      this.info.put(I_DbSpecific.NEEDS_PUBLISHER_KEY, "false");      boolean forceCreationAndInit = true;      this.dbSpecific = ReplicationConverter.getDbSpecific(this.info, forceCreationAndInit);       String mapperClass = info.get("replication.mapper.class", "org.xmlBlaster.contrib.replication.impl.DefaultMapper");      if (mapperClass.length() > 0) {         ClassLoader cl = ReplicationConverter.class.getClassLoader();         this.mapper = (I_Mapper)cl.loadClass(mapperClass).newInstance();         this.mapper.init(info);         if (log.isLoggable(Level.FINE))             log.fine(mapperClass + " created and initialized");      }      else         log.info("Couldn't initialize I_DataConverter, please configure 'converter.class' if you need a conversion.");      this.overwriteTables = this.info.getBoolean("replication.overwriteTables", true);      this.recreateTables =  this.info.getBoolean("replication.recreateTables", false);            this.importLocation = this.info.get("replication.importLocation", "${java.io.tmpdir}");      // clean from ending separators      if (this.importLocation.endsWith(File.separator) || this.importLocation.endsWith("/"))         this.importLocation = this.importLocation.substring(0, this.importLocation.length()-1);            String tmpImportLocation = this.info.get("replication.importLocationChunks", this.importLocation + "/chunks");      boolean overwriteDumpFiles = true;      String lockExtention =  null;      this.keepDumpFiles = info.getBoolean("replication.keepDumpFiles", false);      this.callback = new FileWriterCallback(this.importLocation, tmpImportLocation, lockExtention, overwriteDumpFiles, this.keepDumpFiles);      this.info = info;      this.pool = (I_DbPool)info.getObject(DbWriter.DB_POOL_KEY);      if (this.pool == null)         throw new Exception(ME + ".init: the pool has not been configured, please check your '" + DbWriter.DB_POOL_KEY + "' configuration settings");      this.doDrop = info.getBoolean("replication.drops", true);      this.doCreate = info.getBoolean("replication.creates", true);      this.doAlter = info.getBoolean("replication.alters", true);      this.doStatement = info.getBoolean("replication.statements", true);      this.schemaToWipeout = info.get("replication.writer.schemaToWipeout", null);      // if (this.doStatement)      //    this.sqlTopic = this.info.get("replication.sqlTopic", null);      String prePostStatementClass = this.info.get("dbWriter.prePostStatement.class", "");      if (prePostStatementClass.length() > 0) {         ClassLoader cl = ReplicationConverter.class.getClassLoader();         this.prePostStatement = (I_PrePostStatement)cl.loadClass(prePostStatementClass).newInstance();         this.prePostStatement.init(info);         if (log.isLoggable(Level.FINE))             log.fine(prePostStatementClass + " created and initialized");      }      String key = "replication.initialCmd";      String tmp = info.get(key, null);      this.hasInitialCmd = (tmp != null);      // log.info(GlobalInfo.dump(info));            if (tmp == null)         log.fine("The property '" + key  + "' was not found in the configuration");      else         log.fine("The property '" + key  + "' is '" + tmp + "' and hasInitialCmd is '" + this.hasInitialCmd + "'");            String parserClass = this.info.get("parser.class", "org.xmlBlaster.contrib.dbwriter.SqlInfoParser").trim();      if (parserClass.length() > 0) {         ClassLoader cl = this.getClass().getClassLoader();         this.parserForOldInUpdates = (I_Parser)cl.loadClass(parserClass).newInstance();         this.parserForOldInUpdates.init(info);         if (log.isLoggable(Level.FINE))             log.fine(parserClass + " created and initialized");      }      else         log.severe("Couldn't initialize I_Parser, please configure 'parser.class'");      this.nirvanaClient = this.info.getBoolean("replication.nirvanaClient", false);   }   public void shutdown() throws Exception {      if (this.prePostStatement != null) {         this.prePostStatement.shutdown();         this.prePostStatement = null;      }      if (this.dbSpecific != null) {         this.dbSpecific.shutdown();         this.dbSpecific = null;      }      if (this.pool != null) {         this.pool.shutdown();         this.pool = null;      }      if (this.mapper != null) {         this.mapper.shutdown();         this.mapper = null;      }      if (this.parserForOldInUpdates != null) {         this.parserForOldInUpdates.shutdown();         this.parserForOldInUpdates = null;      }   }   /**    * It first searches in the row and if nothing found it searches in the description. Both row and    * description are allowed to be null.    * @param key    * @param row    * @return    */   private String getStringAttribute(String key, SqlRow row, SqlDescription description) {      ClientProperty prop = null;      if (row != null)         prop = row.getAttribute(key);      if (prop == null && description != null) {         prop = description.getAttribute(key);      }      if (prop == null)         return null;      return prop.getStringValue();   }      private boolean isAllowedCommand(String command) {      if (command == null)         return false;      command = command.trim();      if (REPLICATION_CMD.equalsIgnoreCase(command) ||           ALTER_ACTION.equalsIgnoreCase(command) ||          CREATE_ACTION.equalsIgnoreCase(command) ||          DROP_ACTION.equalsIgnoreCase(command) ||          STATEMENT_ACTION.equalsIgnoreCase(command) ||          DUMP_ACTION.equalsIgnoreCase(command))         return true;      return false;   }      /**    * Returns the number of columns modified.    * @param originalCatalog    * @param originalSchema    * @param originalTable    * @param row if null, nothing is changed    * @return    * @throws Exception    */   private final int modifyColumnsIfNecessary(String originalCatalog, String originalSchema, String originalTable, SqlRow row) throws Exception {      if (this.mapper == null || row == null)         return 0;      String[] cols = row.getColumnNames();      Map colsToChange = new HashMap();      for (int i=0; i < cols.length; i++) {         String newCol = this.mapper.getMappedColumn(originalCatalog, originalSchema, originalTable, cols[i], cols[i]);         if (newCol == null)            continue;         if (cols[i].equalsIgnoreCase(newCol))            continue;         colsToChange.put(cols[i], newCol);         log.fine("Renaming '" + cols[i] + "' to '" + newCol + "'");      }      if (colsToChange.size() < 1)         return 0;      Iterator iter = colsToChange.entrySet().iterator();      while (iter.hasNext()) {         Map.Entry entry = (Map.Entry)iter.next();         row.renameColumn((String)entry.getKey(), (String)entry.getValue());      }      return colsToChange.size();   }   /**    * Checks wether an entry has already been processed, in which case it will not be processed anymore    * @param dbInfo    * @return    */   private boolean checkIfAlreadyProcessed(SqlInfo dbInfo) {      ClientProperty prop = dbInfo.getDescription().getAttribute(ReplicationConstants.ALREADY_PROCESSED_ATTR);      if (prop != null)         return true;      List rows = dbInfo.getRows();      for (int i=0; i < rows.size(); i++) {         SqlRow row = (SqlRow)rows.get(i);         prop = row.getAttribute(ReplicationConstants.ALREADY_PROCESSED_ATTR);         if (prop != null)            return true;      }      return false;   }         public void store(SqlInfo dbInfo) throws Exception {      if (checkIfAlreadyProcessed(dbInfo)) {         log.info("Entry '" + dbInfo.toString() + "' already processed, will ignore it");         return;      }      SqlDescription description = dbInfo.getDescription();      if (description == null) {         log.warning("store: The message was a dbInfo but lacked description. " + dbInfo.toString());         return;      }      boolean keepTransactionOpen = false;      ClientProperty keepOpenProp = description.getAttribute(ReplicationConstants.KEEP_TRANSACTION_OPEN);      if (keepOpenProp != null) {         keepTransactionOpen = keepOpenProp.getBooleanValue();         log.fine("Keep transaction open is '" + keepTransactionOpen + "'");      }      if (!keepTransactionOpen)         this.exceptionInTransaction = false; // make sure we reset it here, otherwise it will not store anything anymore      ClientProperty endOfTransition = description.getAttribute(ReplicationConstants.END_OF_TRANSITION);      if (endOfTransition != null && endOfTransition.getBooleanValue()) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -