📄 replicationwriter.java
字号:
/*------------------------------------------------------------------------------Name: ReplicationWriter.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.replication;import java.io.ByteArrayInputStream;import java.io.File;import java.io.FileInputStream;import java.io.InputStream;import java.sql.Connection;import java.sql.ResultSet;import java.sql.SQLException;import java.sql.Statement;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Set;import java.util.TreeSet;import java.util.logging.Level;import java.util.logging.Logger;import org.xmlBlaster.contrib.I_ChangePublisher;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.I_Update;import org.xmlBlaster.contrib.PropertiesInfo;import org.xmlBlaster.contrib.db.I_DbPool;import org.xmlBlaster.contrib.dbwriter.DbWriter;import org.xmlBlaster.contrib.dbwriter.I_Parser;import org.xmlBlaster.contrib.dbwriter.I_Writer;import org.xmlBlaster.contrib.dbwriter.info.I_PrePostStatement;import org.xmlBlaster.contrib.dbwriter.info.SqlInfo;import org.xmlBlaster.contrib.dbwriter.info.SqlDescription;import org.xmlBlaster.contrib.dbwriter.info.SqlRow;import org.xmlBlaster.contrib.filewriter.FileWriterCallback;import org.xmlBlaster.contrib.replication.impl.ReplManagerPlugin;import org.xmlBlaster.contrib.replication.impl.SearchableConfig;import org.xmlBlaster.contrib.replication.impl.SpecificDefault;import org.xmlBlaster.jms.XBConnectionMetaData;import org.xmlBlaster.jms.XBMessage;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.MsgUnitRaw;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.xbformat.MsgInfo;import org.xmlBlaster.util.xbformat.XmlScriptParser;public class ReplicationWriter implements I_Writer, ReplicationConstants { private final static String ME = "ReplicationWriter"; private static Logger log = Logger.getLogger(ReplicationWriter.class.getName()); protected I_DbPool pool; protected I_Info info; private I_DbSpecific dbSpecific; I_Mapper mapper; private boolean overwriteTables; private boolean recreateTables; private String importLocation; private I_Update callback; private boolean keepDumpFiles; private boolean doDrop; private boolean doCreate; private boolean doAlter; private boolean doStatement; // private String sqlTopic; private String schemaToWipeout; private I_PrePostStatement prePostStatement; private boolean hasInitialCmd; private I_Parser parserForOldInUpdates; private boolean nirvanaClient; // if true it does not store on the db (just consumes them private Map sqlInfoCache; private final static int SQL_INFO_CACHE_MAX_SIZE_DEFAULT = 5000; private int sqlInfoCacheMaxSize; private boolean exceptionInTransaction; private Connection keptConnection; // we need to use the same connection within a transaction public ReplicationWriter() { this.sqlInfoCache = new HashMap(); this.sqlInfoCacheMaxSize = SQL_INFO_CACHE_MAX_SIZE_DEFAULT; } /** * @see org.xmlBlaster.contrib.I_ContribPlugin#getUsedPropertyKeys() */ public Set getUsedPropertyKeys() { Set set = new HashSet(); set.add(I_DbSpecific.NEEDS_PUBLISHER_KEY); set.add("replication.mapper.class"); set.add("replication.overwriteTables"); set.add("replication.importLocation"); set.add("dbWriter.prePostStatement.class"); PropertiesInfo.addSet(set, this.mapper.getUsedPropertyKeys()); PropertiesInfo.addSet(set, this.pool.getUsedPropertyKeys()); PropertiesInfo.addSet(set, this.dbSpecific.getUsedPropertyKeys()); return set; } public void init(I_Info info) throws Exception { log.info("init invoked"); this.info = info; this.pool = (I_DbPool)info.getObject(DbWriter.DB_POOL_KEY); if (this.pool == null) throw new Exception(ME + ".init: the pool has not been configured, please check your '" + DbWriter.DB_POOL_KEY + "' configuration settings"); SearchableConfig searchableConfig = new SearchableConfig(); searchableConfig.init(this.info); this.info.putObject(SearchableConfig.NAME, searchableConfig); // this avoids the publisher to be instantiated (since we are on the slave side) this.info.put(I_DbSpecific.NEEDS_PUBLISHER_KEY, "false"); boolean forceCreationAndInit = true; this.dbSpecific = ReplicationConverter.getDbSpecific(this.info, forceCreationAndInit); String mapperClass = info.get("replication.mapper.class", "org.xmlBlaster.contrib.replication.impl.DefaultMapper"); if (mapperClass.length() > 0) { ClassLoader cl = ReplicationConverter.class.getClassLoader(); this.mapper = (I_Mapper)cl.loadClass(mapperClass).newInstance(); this.mapper.init(info); if (log.isLoggable(Level.FINE)) log.fine(mapperClass + " created and initialized"); } else log.info("Couldn't initialize I_DataConverter, please configure 'converter.class' if you need a conversion."); this.overwriteTables = this.info.getBoolean("replication.overwriteTables", true); this.recreateTables = this.info.getBoolean("replication.recreateTables", false); this.importLocation = this.info.get("replication.importLocation", "${java.io.tmpdir}"); // clean from ending separators if (this.importLocation.endsWith(File.separator) || this.importLocation.endsWith("/")) this.importLocation = this.importLocation.substring(0, this.importLocation.length()-1); String tmpImportLocation = this.info.get("replication.importLocationChunks", this.importLocation + "/chunks"); boolean overwriteDumpFiles = true; String lockExtention = null; this.keepDumpFiles = info.getBoolean("replication.keepDumpFiles", false); this.callback = new FileWriterCallback(this.importLocation, tmpImportLocation, lockExtention, overwriteDumpFiles, this.keepDumpFiles); this.info = info; this.pool = (I_DbPool)info.getObject(DbWriter.DB_POOL_KEY); if (this.pool == null) throw new Exception(ME + ".init: the pool has not been configured, please check your '" + DbWriter.DB_POOL_KEY + "' configuration settings"); this.doDrop = info.getBoolean("replication.drops", true); this.doCreate = info.getBoolean("replication.creates", true); this.doAlter = info.getBoolean("replication.alters", true); this.doStatement = info.getBoolean("replication.statements", true); this.schemaToWipeout = info.get("replication.writer.schemaToWipeout", null); // if (this.doStatement) // this.sqlTopic = this.info.get("replication.sqlTopic", null); String prePostStatementClass = this.info.get("dbWriter.prePostStatement.class", ""); if (prePostStatementClass.length() > 0) { ClassLoader cl = ReplicationConverter.class.getClassLoader(); this.prePostStatement = (I_PrePostStatement)cl.loadClass(prePostStatementClass).newInstance(); this.prePostStatement.init(info); if (log.isLoggable(Level.FINE)) log.fine(prePostStatementClass + " created and initialized"); } String key = "replication.initialCmd"; String tmp = info.get(key, null); this.hasInitialCmd = (tmp != null); // log.info(GlobalInfo.dump(info)); if (tmp == null) log.fine("The property '" + key + "' was not found in the configuration"); else log.fine("The property '" + key + "' is '" + tmp + "' and hasInitialCmd is '" + this.hasInitialCmd + "'"); String parserClass = this.info.get("parser.class", "org.xmlBlaster.contrib.dbwriter.SqlInfoParser").trim(); if (parserClass.length() > 0) { ClassLoader cl = this.getClass().getClassLoader(); this.parserForOldInUpdates = (I_Parser)cl.loadClass(parserClass).newInstance(); this.parserForOldInUpdates.init(info); if (log.isLoggable(Level.FINE)) log.fine(parserClass + " created and initialized"); } else log.severe("Couldn't initialize I_Parser, please configure 'parser.class'"); this.nirvanaClient = this.info.getBoolean("replication.nirvanaClient", false); } public void shutdown() throws Exception { if (this.prePostStatement != null) { this.prePostStatement.shutdown(); this.prePostStatement = null; } if (this.dbSpecific != null) { this.dbSpecific.shutdown(); this.dbSpecific = null; } if (this.pool != null) { this.pool.shutdown(); this.pool = null; } if (this.mapper != null) { this.mapper.shutdown(); this.mapper = null; } if (this.parserForOldInUpdates != null) { this.parserForOldInUpdates.shutdown(); this.parserForOldInUpdates = null; } } /** * It first searches in the row and if nothing found it searches in the description. Both row and * description are allowed to be null. * @param key * @param row * @return */ private String getStringAttribute(String key, SqlRow row, SqlDescription description) { ClientProperty prop = null; if (row != null) prop = row.getAttribute(key); if (prop == null && description != null) { prop = description.getAttribute(key); } if (prop == null) return null; return prop.getStringValue(); } private boolean isAllowedCommand(String command) { if (command == null) return false; command = command.trim(); if (REPLICATION_CMD.equalsIgnoreCase(command) || ALTER_ACTION.equalsIgnoreCase(command) || CREATE_ACTION.equalsIgnoreCase(command) || DROP_ACTION.equalsIgnoreCase(command) || STATEMENT_ACTION.equalsIgnoreCase(command) || DUMP_ACTION.equalsIgnoreCase(command)) return true; return false; } /** * Returns the number of columns modified. * @param originalCatalog * @param originalSchema * @param originalTable * @param row if null, nothing is changed * @return * @throws Exception */ private final int modifyColumnsIfNecessary(String originalCatalog, String originalSchema, String originalTable, SqlRow row) throws Exception { if (this.mapper == null || row == null) return 0; String[] cols = row.getColumnNames(); Map colsToChange = new HashMap(); for (int i=0; i < cols.length; i++) { String newCol = this.mapper.getMappedColumn(originalCatalog, originalSchema, originalTable, cols[i], cols[i]); if (newCol == null) continue; if (cols[i].equalsIgnoreCase(newCol)) continue; colsToChange.put(cols[i], newCol); log.fine("Renaming '" + cols[i] + "' to '" + newCol + "'"); } if (colsToChange.size() < 1) return 0; Iterator iter = colsToChange.entrySet().iterator(); while (iter.hasNext()) { Map.Entry entry = (Map.Entry)iter.next(); row.renameColumn((String)entry.getKey(), (String)entry.getValue()); } return colsToChange.size(); } /** * Checks wether an entry has already been processed, in which case it will not be processed anymore * @param dbInfo * @return */ private boolean checkIfAlreadyProcessed(SqlInfo dbInfo) { ClientProperty prop = dbInfo.getDescription().getAttribute(ReplicationConstants.ALREADY_PROCESSED_ATTR); if (prop != null) return true; List rows = dbInfo.getRows(); for (int i=0; i < rows.size(); i++) { SqlRow row = (SqlRow)rows.get(i); prop = row.getAttribute(ReplicationConstants.ALREADY_PROCESSED_ATTR); if (prop != null) return true; } return false; } public void store(SqlInfo dbInfo) throws Exception { if (checkIfAlreadyProcessed(dbInfo)) { log.info("Entry '" + dbInfo.toString() + "' already processed, will ignore it"); return; } SqlDescription description = dbInfo.getDescription(); if (description == null) { log.warning("store: The message was a dbInfo but lacked description. " + dbInfo.toString()); return; } boolean keepTransactionOpen = false; ClientProperty keepOpenProp = description.getAttribute(ReplicationConstants.KEEP_TRANSACTION_OPEN); if (keepOpenProp != null) { keepTransactionOpen = keepOpenProp.getBooleanValue(); log.fine("Keep transaction open is '" + keepTransactionOpen + "'"); } if (!keepTransactionOpen) this.exceptionInTransaction = false; // make sure we reset it here, otherwise it will not store anything anymore ClientProperty endOfTransition = description.getAttribute(ReplicationConstants.END_OF_TRANSITION); if (endOfTransition != null && endOfTransition.getBooleanValue()) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -