📄 replicationconverter.java
字号:
/*------------------------------------------------------------------------------Name: ReplicationConverter.javaProject: org.xmlBlasterProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.replication;import java.util.logging.Level;import java.util.logging.Logger;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.List;import java.util.Map;import java.util.Set;import java.util.StringTokenizer;import java.io.OutputStream;import java.sql.Clob;import java.sql.Connection;import java.sql.ResultSet;import java.sql.ResultSetMetaData;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.db.DbInfo;import org.xmlBlaster.contrib.db.I_DbPool;import org.xmlBlaster.contrib.dbwatcher.ChangeEvent;import org.xmlBlaster.contrib.dbwatcher.DbWatcher;import org.xmlBlaster.contrib.dbwatcher.DbWatcherConstants;import org.xmlBlaster.contrib.dbwatcher.convert.I_AttributeTransformer;import org.xmlBlaster.contrib.dbwatcher.convert.I_DataConverter;import org.xmlBlaster.contrib.dbwriter.info.SqlInfo;import org.xmlBlaster.contrib.dbwriter.info.SqlDescription;import org.xmlBlaster.contrib.dbwriter.info.SqlRow;import org.xmlBlaster.contrib.replication.impl.SpecificDefault;import org.xmlBlaster.util.ReplaceVariable;/** * Creates a standardized XML dump from the given ResultSets. * Note this class is not thread safe, in other words you must make sure the same instance of * this class can not be invoked concurently from more than one thread. 
* @see org.xmlBlaster.contrib.dbwatcher.convert.I_DataConverter
 * @see org.xmlBlaster.contrib.dbwatcher.convert.ResultSetToXmlConverter
 * @author Michele Laghi
 */
public class ReplicationConverter implements I_DataConverter, ReplicationConstants {
   private static Logger log = Logger.getLogger(ReplicationConverter.class.getName());

   /** Database specific helper, obtained in init(I_Info) via getDbSpecific(I_Info, boolean). */
   private I_DbSpecific dbSpecific;
   private SqlInfo sqlInfo;
   /** The configuration passed to init(I_Info). */
   private I_Info info;
   /** Optional attribute transformer, only loaded when 'transformer.class' is configured. */
   private I_AttributeTransformer transformer;
   private OutputStream out;
   /** Read in init(I_Info) from 'replication.sendInitialTableContent' (defaults to true). */
   private boolean sendInitialTableContent = true;
   /** Loaded in init(I_Info) from the persistent info under oldReplKeyPropertyName; set to 0 when no entry exists. */
   private long oldReplKey = -1L;
   /** Persistent storage (a DbInfo created in init(I_Info)) for the version, replKey and sequence properties. */
   private I_Info persistentInfo;
   /** Key used in persistentInfo for oldReplKey: dbSpecific name + ".oldReplKey". */
   private String oldReplKeyPropertyName;
   private ChangeEvent event;
   private String transactionId;
   /** All transactions in this message (needed to delete entries after publishing) */
   private List allTransactions;
   /** Assigned in init(I_Info) from SpecificDefault.getReplPrefix(info). */
   private String replPrefix;
   /** Connection pool obtained in init(I_Info) from DbWatcher.getDbPool(info). */
   private I_DbPool dbPool;
   /** Key used in persistentInfo for transSeq: dbSpecific name + ".transactionSequence". */
   private String transSeqPropertyName;
   /** Loaded in init(I_Info) from persistentInfo (defaults to 0). */
   private long transSeq;
   /** Key used in persistentInfo for messageSeq: dbSpecific name + ".messageSequence". */
   private String messageSeqPropertyName;
   /** Loaded in init(I_Info) from persistentInfo (defaults to 0) and published into info under MESSAGE_SEQ. */
   private long messageSeq;
   private long newReplKey;
   /** Read in init(I_Info) from REPLICATION_SEND_UNCHANGED_UPDATES (defaults to true). */
   private boolean sendUnchangedUpdates = true;

   /**
    * Default constructor, you need to call <tt>init(info)</tt> thereafter.
    * It only creates the (empty) list of transactions.
    */
   public ReplicationConverter() {
      this.allTransactions = new ArrayList();
   }

   /**
    * Create this plugin and initialize it right away by calling <tt>init(info)</tt>.
    * @param info Possible configuration parameters you find in the class description
    * @throws Exception If the initialization (for example the transformer instantiation) fails
    */
   public ReplicationConverter(I_Info info) throws Exception {
      this();
      init(info);
   }

   /**
    * Returns the I_DbSpecific implementation configured with 'replication.dbSpecific.class'.
    * The instance is looked up in the object registry of the passed info under the key
    * '&lt;className&gt;.object'; it is not created on every invocation.
    * @param info the configuration, also used as the registry for the instance
    * @param forceNewIfNeeded if true and the entry is not found in the registry, a new object is created and added
    * to the registry; in this mode <tt>init(info)</tt> is invoked on the returned object on every call, also when it
    * was already registered. If false it only returns entries found in the registry (without initializing the object)
    * or null if none is found.
* @return * @throws Exception */ public static I_DbSpecific getDbSpecific(I_Info info, boolean forceNewIfNeeded) throws Exception { String dbSpecificClass = info.get("replication.dbSpecific.class", "org.xmlBlaster.contrib.replication.impl.SpecificOracle"); I_DbSpecific dbSpecific = (I_DbSpecific)info.getObject(dbSpecificClass + ".object"); if (!forceNewIfNeeded) return dbSpecific; if (dbSpecific == null) { if (dbSpecificClass.length() > 0) { ClassLoader cl = ReplicationConverter.class.getClassLoader(); dbSpecific = (I_DbSpecific)cl.loadClass(dbSpecificClass).newInstance(); if (log.isLoggable(Level.FINE)) log.fine(dbSpecificClass + " created and initialized"); info.putObject(dbSpecificClass + ".object" ,dbSpecific); } else log.info("Couldn't initialize I_DataConverter, please configure 'converter.class' if you need a conversion."); } dbSpecific.init(info); return dbSpecific; } /** * @see org.xmlBlaster.contrib.dbwatcher.convert.I_DataConverter#init(I_Info) */ public synchronized void init(I_Info info) throws Exception { this.info = info; String propsToConnect = this.info.get(DbWatcherConstants.MOM_PROPS_TO_ADD_TO_CONNECT, null); if (propsToConnect == null) { // String val = REPL_PREFIX_KEY + "," + REPL_VERSION; String val = "*"; log.info("Adding the property '" + DbWatcherConstants.MOM_PROPS_TO_ADD_TO_CONNECT + "' with value '" + val + "'"); this.info.put(DbWatcherConstants.MOM_PROPS_TO_ADD_TO_CONNECT, val); } else { Set set = getKeys(propsToConnect, ","); if (!set.contains(REPL_PREFIX_KEY)) { String txt = "the property '" + DbWatcherConstants.MOM_PROPS_TO_ADD_TO_CONNECT + "' has been explicitly set to '" + propsToConnect + "' but it must contain '" + REPL_PREFIX_KEY + "'"; throw new Exception(txt); } if (!set.contains(REPL_VERSION)) { String txt = "the property '" + DbWatcherConstants.MOM_PROPS_TO_ADD_TO_CONNECT + "' has been explicitly set to '" + propsToConnect + "' but it must contain '" + REPL_VERSION + "'"; throw new Exception(txt); } } ClassLoader cl = 
this.getClass().getClassLoader(); String transformerClassName = info.get("transformer.class", ""); if (transformerClassName != null && transformerClassName.length() > 0) { this.transformer = (I_AttributeTransformer)cl.loadClass(transformerClassName).newInstance(); this.transformer.init(info); log.info("Loaded transformer pluging '" + transformerClassName + "'"); } this.replPrefix = SpecificDefault.getReplPrefix(this.info); boolean forceCreationAndInit = true; this.dbSpecific = getDbSpecific(info, forceCreationAndInit); this.dbSpecific.setAttributeTransformer(this.transformer); final boolean doFix = true; this.dbSpecific.checkTriggerConsistency(doFix); this.sendInitialTableContent = this.info.getBoolean("replication.sendInitialTableContent", true); // this.persistentMap = new PersistentMap(CONTRIB_PERSISTENT_MAP); // this.persistentMap = new Info(CONTRIB_PERSISTENT_MAP); this.dbPool = DbWatcher.getDbPool(this.info); this.persistentInfo = new DbInfo(DbWatcher.getDbInfoPool(this.info), "replication", this.info); // we now recreate the triggers if the version has changed String oldVersionName = this.dbSpecific.getName() + ".previousVersion"; String oldVersion = this.persistentInfo.get(oldVersionName, null); String currentVersion = this.info.get("replication.version", "0.0"); if (oldVersion == null || !currentVersion.equals(oldVersion)) this.persistentInfo.put(oldVersionName, currentVersion); boolean versionHasChanged = false; if (oldVersion != null && !currentVersion.equals(oldVersion)) versionHasChanged = true; if (versionHasChanged) { final boolean force = true; final boolean forceSend = false; this.dbSpecific.addTriggersIfNeeded(force, null, forceSend); } // end of recreating the triggers (if neeed) this.oldReplKeyPropertyName = this.dbSpecific.getName() + ".oldReplKey"; this.transSeqPropertyName = this.dbSpecific.getName() + ".transactionSequence"; this.messageSeqPropertyName = this.dbSpecific.getName() + ".messageSequence"; this.transSeq = 
this.persistentInfo.getLong(this.transSeqPropertyName, 0L); this.messageSeq = this.persistentInfo.getLong(this.messageSeqPropertyName, 0L); this.info.put(MESSAGE_SEQ, "" + this.messageSeq); this.sendUnchangedUpdates = this.info.getBoolean(REPLICATION_SEND_UNCHANGED_UPDATES, true); long tmp = this.persistentInfo.getLong(this.oldReplKeyPropertyName, -1L); if (tmp > -1L) { this.oldReplKey = tmp; log.info("One entry found in persistent map '" + CONTRIB_PERSISTENT_MAP + "' with key '" + this.oldReplKeyPropertyName + "' found. Will start with '" + this.oldReplKey + "'"); // the following to fix the situation where the peristent DBINFO has not been cleaned up but the sequence has been reset Connection conn = null; try { conn = this.dbPool.reserve(); conn.setAutoCommit(true); long realCounter = this.dbSpecific.incrementReplKey(conn); if (this.oldReplKey > realCounter) { log.warning("The counter from persistence is '" + this.oldReplKey + "' while the sequence is '" + realCounter + "' which is lower. Supposly you have reset the sequence but not the persistence. This is now fixed by initially setting the old replKey to be '" + realCounter + "'"); this.oldReplKey = realCounter; } } catch (Throwable ex) { log.severe("An exception occured when verifying the intial status of the counter against the old replKeY: " + ex.getMessage()); ex.printStackTrace(); SpecificDefault.removeFromPool(conn, SpecificDefault.ROLLBACK_NO, this.dbPool); conn = null; } finally { SpecificDefault.releaseIntoPool(conn, SpecificDefault.COMMIT_NO, this.dbPool); } } else { log.info("No entry found in persistent map '" + CONTRIB_PERSISTENT_MAP + "' with key '" + this.oldReplKeyPropertyName + "' found. 
Starting by 0'"); this.oldReplKey = 0L; } } private static Set getKeys(String val, String sep) { Set ret = new HashSet(); if (val == null || val.trim().length() < 1) return ret; StringTokenizer tokenizer = new StringTokenizer(val, sep); while (tokenizer.hasMoreTokens()) ret.add(tokenizer.nextToken().trim()); return ret; } /** * @see org.xmlBlaster.contrib.dbwatcher.convert.I_DataConverter#shutdown */ public synchronized void shutdown() throws Exception { try { if (this.dbSpecific != null) this.dbSpecific.shutdown(); } finally { this.dbPool.shutdown(); this.dbSpecific = null; } } /** * Add another result set to the XML string. * This method is invoked for each SQL Operation. Each transaction, i.e. each message * can contain several such operations. * * @see org.xmlBlaster.contrib.dbwatcher.convert.I_DataConverter#addInfo(ResultSet, int) */ public void addInfo(Connection conn, ResultSet rs, int what) throws Exception { if (rs == null) throw new IllegalArgumentException("ReplicationConverter: Given ResultSet is null"); if (this.dbSpecific.isDatasourceReadonly()) { this.sqlInfo.fillOneRowWithObjects(rs, this.transformer); return; } ResultSetMetaData meta = rs.getMetaData(); int numberOfColumns = meta.getColumnCount(); if (numberOfColumns != 11)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -