📄 dbwatcher.java
字号:
/*------------------------------------------------------------------------------Name: TestResultSetToXmlConverter.javaProject: org.xmlBlasterProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.dbwatcher;import java.sql.ResultSet;import java.io.ByteArrayOutputStream;import java.io.BufferedOutputStream;import java.util.logging.Logger;import java.util.logging.Level;import java.util.Map;import java.util.Properties;import java.util.StringTokenizer;import java.sql.Connection;import org.xmlBlaster.contrib.I_ChangePublisher;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.InfoHelper;import org.xmlBlaster.contrib.PropertiesInfo;import org.xmlBlaster.contrib.db.I_DbPool;import org.xmlBlaster.contrib.db.I_ResultCb;import org.xmlBlaster.contrib.dbwatcher.convert.I_DataConverter;import org.xmlBlaster.contrib.dbwatcher.detector.I_AlertProducer;import org.xmlBlaster.contrib.dbwatcher.detector.I_ChangeDetector;/** * This is the core processor class to handle database observation. * <p> * We handle detected changes of the observed target, typically a database table, * and forward them to the {@link I_ChangePublisher} implementation, * typically a message oriented middleware (MoM). * The changes are XML formatted as implemented by the * {@link I_DataConverter} plugin. * <p /> * <p> * This class loads all plugins by reflection and starts them. Each plugin * has its specific configuration paramaters. Descriptions thereof you find * in the plugins documentation. 
* </p>
 * <p>
 * To get going quickly, have a look at <tt>Example.java</tt>
 * </p>
 * Configuration:
 * <ul>
 *   <li><tt>dbPool.class</tt> configures your implementation of interface {@link I_DbPool} which defaults to <tt>org.xmlBlaster.contrib.db.DbPool</tt></li>
 *   <li><tt>db.queryMeatStatement</tt> if given an SQL select string, this
 *       is executed on changes and the query result is dumped according
 *       to the configured I_DataConverter plugin and sent as message content
 *       to the MoM</li>
 *   <li><tt>converter.class</tt> configures your implementation of interface {@link I_DataConverter} which defaults to <tt>org.xmlBlaster.contrib.dbwatcher.convert.ResultSetToXmlConverter</tt></li>
 *   <li><tt>mom.class</tt> configures your implementation of interface {@link I_ChangePublisher} which defaults to <tt>org.xmlBlaster.contrib.dbwatcher.mom.XmlBlasterPublisher</tt></li>
 *   <li><tt>changeDetector.class</tt> configures your implementation of interface {@link I_ChangeDetector} which defaults to <tt>org.xmlBlaster.contrib.dbwatcher.detector.MD5ChangeDetector</tt></li>
 *   <li><tt>alertProducer.class</tt> configures your implementation of interface {@link I_AlertProducer} which defaults to <tt>org.xmlBlaster.contrib.dbwatcher.detector.AlertScheduler</tt>.
 *       Here you can configure multiple classes with a comma separated list.
 *       Each of them can trigger a new check in parallel, for example
 *       <tt>alertProducer.class=org.xmlBlaster.contrib.dbwatcher.mom.XmlBlasterPublisher,org.xmlBlaster.contrib.dbwatcher.detector.AlertScheduler</tt> will check regularly via
 *       the AlertScheduler and on message via XmlBlasterPublisher.</li>
* </ul> * * @see org.xmlBlaster.contrib.dbwatcher.test.TestResultSetToXmlConverter * @author Marcel Ruff */public class DbWatcher implements I_ChangeListener { private static Logger log = Logger.getLogger(DbWatcher.class.getName()); private String queryMeatStatement; private I_Info info; private I_DataConverter dataConverter; private I_ChangePublisher publisher; private I_ChangeDetector changeDetector; private I_DbPool dbPool; private I_AlertProducer[] alertProducerArr; private int changeCount; private int collectedMessages = 1; private int maxCollectedMessages; /** * Default constructor, you need to call {@link #init} thereafter. */ public DbWatcher() { // void } /** * Convenience constructor, creates a processor for changes, calls {@link #init}. * @param info Configuration * @throws Exception Can be of any type */ public DbWatcher(I_Info info) throws Exception { init(info); } public static I_ChangePublisher getChangePublisher(I_Info info) throws Exception { synchronized (info) { I_ChangePublisher publisher = (I_ChangePublisher)info.getObject("mom.publisher"); if (publisher == null) { ClassLoader cl = DbWatcher.class.getClassLoader(); String momClass = info.get("mom.class", "org.xmlBlaster.contrib.dbwatcher.mom.XmlBlasterPublisher").trim(); if (momClass.length() > 0) { publisher = (I_ChangePublisher)cl.loadClass(momClass).newInstance(); info.putObject("mom.publisher", publisher); if (log.isLoggable(Level.FINE)) log.fine(momClass + " created and initialized"); } else log.severe("Couldn't initialize I_ChangePublisher, please configure 'mom.class'."); } publisher.init(info); return publisher; } } public static I_DbPool getDbPool(I_Info info) throws Exception { synchronized (info) { I_DbPool dbPool = (I_DbPool)info.getObject("db.pool"); if (dbPool == null) { ClassLoader cl = DbWatcher.class.getClassLoader(); String dbPoolClass = info.get("dbPool.class", "org.xmlBlaster.contrib.db.DbPool"); if (dbPoolClass.length() > 0) { dbPool = 
(I_DbPool)cl.loadClass(dbPoolClass).newInstance(); if (log.isLoggable(Level.FINE)) log.fine(dbPoolClass + " created and initialized"); } else throw new IllegalArgumentException("Couldn't initialize I_DbPool, please configure 'dbPool.class' to provide a valid JDBC access."); info.putObject("db.pool", dbPool); } dbPool.init(info); return dbPool; } } /** * We scan for dbinfo.* properties and replace it with db.* in a second info object to use for the configuration * of the DbPool. Defaults to getDbPool if no 'dbinfo.url' is specified in the configuration. * @param info * @return * @throws Exception */ public static I_DbPool getDbInfoPool(I_Info info) throws Exception { I_Info infoForDbInfo = (I_Info)info.getObject("infoForDbInfo"); if (infoForDbInfo == null) { Map dbInfoMap = InfoHelper.getPropertiesStartingWith("dbinfo.", info, null, "db."); if (dbInfoMap.containsKey("db.url")) { infoForDbInfo = new PropertiesInfo(new Properties()); InfoHelper.fillInfoWithEntriesFromMap(infoForDbInfo, dbInfoMap); info.putObject("infoForDbInfo", infoForDbInfo); } } if (infoForDbInfo != null) return getDbPool(infoForDbInfo); return getDbPool(info); } /** * Creates a processor for changes. 
* The alert producers need to be started later with a call to * {@link #startAlertProducers} * @param info Configuration * @throws Exception Can be of any type */ public void init(I_Info info) throws Exception { if (info == null) throw new IllegalArgumentException("Missing configuration, info is null"); this.info = info; //this.dataConverter = converter; //this.publisher = publisher; ClassLoader cl = this.getClass().getClassLoader(); this.queryMeatStatement = info.get("db.queryMeatStatement", (String)null); if (this.queryMeatStatement != null && this.queryMeatStatement.length() < 1) this.queryMeatStatement = null; if (this.queryMeatStatement != null) this.dbPool = getDbPool(this.info); // Now we load all plugins to do the job this.maxCollectedMessages = this.info.getInt("dbWatcher.maxCollectedStatements", 0); String converterClass = this.info.get("converter.class", "org.xmlBlaster.contrib.dbwatcher.convert.ResultSetToXmlConverter").trim(); String changeDetectorClass = this.info.get("changeDetector.class", "org.xmlBlaster.contrib.dbwatcher.detector.MD5ChangeDetector").trim(); String alerSchedulerClasses = this.info.get("alertProducer.class", "org.xmlBlaster.contrib.dbwatcher.detector.AlertScheduler").trim(); // comma separated list if (converterClass.length() > 0) { this.dataConverter = (I_DataConverter)cl.loadClass(converterClass).newInstance(); this.dataConverter.init(info); if (log.isLoggable(Level.FINE)) log.fine(converterClass + " created and initialized"); } else log.info("Couldn't initialize I_DataConverter, please configure 'converter.class' if you need a conversion."); // this must be invoked after the converter class has been initialized since the converter class can set properties on the info object // for example to register applications such as replication (for example the purpose) this.publisher = getChangePublisher(this.info); if (changeDetectorClass.length() > 0) { this.changeDetector = 
(I_ChangeDetector)cl.loadClass(changeDetectorClass).newInstance(); this.changeDetector.init(info, this, this.dataConverter); if (log.isLoggable(Level.FINE)) log.fine(changeDetectorClass + " created and initialized"); } else log.severe("Couldn't initialize I_ChangeDetector, please configure 'changeDetector.class'."); StringTokenizer st = new StringTokenizer(alerSchedulerClasses, ":, "); int countPlugins = st.countTokens(); this.alertProducerArr = new I_AlertProducer[countPlugins]; for (int i = 0; i < countPlugins; i++) { String clazz = st.nextToken().trim(); try { Object o = info.getObject(clazz); if (o != null) { this.alertProducerArr[i] = (I_AlertProducer)o; if (log.isLoggable(Level.FINE)) log.fine("Existing AlertProducer '" + this.alertProducerArr[i] + "' reused."); } else { this.alertProducerArr[i] = (I_AlertProducer)cl.loadClass(clazz).newInstance(); if (log.isLoggable(Level.FINE)) log.fine("AlertProducer '" + this.alertProducerArr[i] + "' created."); } this.alertProducerArr[i].init(info, this.changeDetector); //this.alertProducerArr[i].startProducing(); } catch (Throwable e) { this.alertProducerArr[i] = null; log.severe("Couldn't initialize I_AlertProducer '" + clazz + "', the reason is: " + e.toString()); } } if (countPlugins == 0) { log.warning("No AlertProducers are registered, set 'alertProducer.class' to point to your plugin class name"); } if (log.isLoggable(Level.FINE)) log.fine("DbWatcher created"); } /** * Start the work. */ public void startAlertProducers() { for (int i=0; i<this.alertProducerArr.length; i++) { try { this.alertProducerArr[i].startProducing(); } catch(Throwable e) { log.warning(e.toString()); } } log.info("DbWatcher is running"); } /** * Suspend processing. */ public void stopAlertProducers() { for (int i=0; i<this.alertProducerArr.length; i++) { try { this.alertProducerArr[i].shutdown(); } catch(Throwable e) { log.warning(e.toString()); } } } /** * Access the MoM handele. 
* @return The I_ChangePublisher plugin */ public I_ChangePublisher getMom() { return this.publisher; } /** * Access the change detector handele. * @return The I_ChangeDetector plugin */ public I_ChangeDetector getChangeDetector() { return this.changeDetector; } /** * @see org.xmlBlaster.contrib.dbwatcher.I_ChangeListener#hasChanged(ChangeEvent) */ public void hasChanged(final ChangeEvent changeEvent) { hasChanged(changeEvent, false); } private final void clearMessageAttributesAfterPublish(Map attributeMap) { // we need to remove this since if several transactions are detected in the same // detector sweep the same Event is used, so we would not send subsequent messages attributeMap.remove(I_DataConverter.IGNORE_MESSAGE); attributeMap.remove(DbWatcherConstants._UNCOMPRESSED_SIZE); attributeMap.remove(DbWatcherConstants._COMPRESSION_TYPE); } /** * Publishes a message if it has changed and if it has not to be ignored. * @param changeEvent The event data
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -