📄 initialupdater.java
字号:
/*------------------------------------------------------------------------------ Name: InitialUpdater.java Project: xmlBlaster.org Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file ------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.replication.impl;import java.io.BufferedInputStream;import java.io.File;import java.io.FileInputStream;import java.io.FileOutputStream;import java.io.InputStream;import java.sql.Connection;import java.sql.SQLException;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.List;import java.util.Map;import java.util.Set;import java.util.TreeSet;import java.util.logging.Level;import java.util.logging.Logger;import javax.jms.DeliveryMode;import javax.jms.JMSException;import javax.jms.TextMessage;import org.xmlBlaster.util.ReplaceVariable;import org.xmlBlaster.client.I_ConnectionStateListener;import org.xmlBlaster.client.I_XmlBlasterAccess;import org.xmlBlaster.contrib.ContribConstants;import org.xmlBlaster.contrib.I_ChangePublisher;import org.xmlBlaster.contrib.I_ContribPlugin;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.I_Update;import org.xmlBlaster.contrib.InfoHelper;import org.xmlBlaster.contrib.PropertiesInfo;import org.xmlBlaster.contrib.VersionTransformerCache;import org.xmlBlaster.contrib.dbwatcher.DbWatcher;import org.xmlBlaster.contrib.dbwriter.info.SqlInfo;import org.xmlBlaster.contrib.dbwriter.info.SqlDescription;import org.xmlBlaster.contrib.replication.I_DbSpecific;import org.xmlBlaster.contrib.replication.I_ReplSource;import org.xmlBlaster.contrib.replication.ReplSourceEngine;import org.xmlBlaster.contrib.replication.ReplicationConstants;import org.xmlBlaster.jms.XBConnectionMetaData;import org.xmlBlaster.jms.XBDestination;import org.xmlBlaster.jms.XBMessage;import org.xmlBlaster.jms.XBMessageProducer;import org.xmlBlaster.jms.XBSession;import org.xmlBlaster.jms.XBStreamingMessage;import org.xmlBlaster.util.Execute;import org.xmlBlaster.util.I_ExecuteListener;import org.xmlBlaster.util.I_ReplaceContent;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.dispatch.ConnectionStateEnum;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.qos.MsgQosData;public class InitialUpdater implements I_Update, I_ContribPlugin, I_ConnectionStateListener, I_ReplaceContent, ReplicationConstants, I_ReplSource { public class ConnectionInfo { private Connection connection; private boolean committed; public ConnectionInfo(Connection conn) { this.connection = conn; } /** * @return Returns the connection. */ public Connection getConnection() { return connection; } /** * @return Returns the committed. */ public boolean isCommitted() { return committed; } /** * @param committed The committed to set. */ public synchronized void commit() { if (this.connection == null) return; try { this.connection.commit(); this.committed = true; } catch (SQLException ex) { ex.printStackTrace(); } } } class ExecuteListener implements I_ExecuteListener { StringBuffer errors = new StringBuffer(); long sleepTime = 0L; ConnectionInfo connInfo; final String stringToCheck; public ExecuteListener(String stringToCheck, ConnectionInfo connInfo) { this.stringToCheck = stringToCheck; this.connInfo = connInfo; } public void stderr(String data) { log.warning(data); // log.info(data); // this.errors.append(data).append("\n"); // sleep(this.sleepTime); // checkForCommit(data); } public void stdout(String data) { log.info(data); // sleep(this.sleepTime); // checkForCommit(data); } String getErrors() { return this.errors.toString(); } } class ExecutionThread extends Thread { private String replTopic; private I_DbSpecific dbSpecific; private String initialFilesLocation; private List slaveNamesList; private String replManagerAddress; private String version; /** * * @param replTopic The topic to use to publish the initial data * @param replManagerAddress The address to which to send the end-of-data message * @param dbSpecific * @param initialFilesLocation */ public ExecutionThread(String replTopic, String replManagerAddress, I_DbSpecific dbSpecific, String initialFilesLocation) { this.slaveNamesList = new ArrayList(); this.replManagerAddress = replManagerAddress; this.replTopic = replTopic; this.dbSpecific = dbSpecific; this.initialFilesLocation = initialFilesLocation; } /** * Adds a destination to this initial update (so that it is possible to perform several I.U. * with the same data. * * @param destination The destination (PtP) of this initial Update * @param slaveName The name of the slave * @param version The version for this update. * @return true if the entry has the correct version, false otherwise (in which case it * will not be added). */ public boolean add(String slaveName, String replManagerAddress, String version) { if (this.version == null) this.version = version; else { if (!this.version.equals(version)) { return false; } } if (this.replManagerAddress == null) this.replManagerAddress = replManagerAddress; else { if (!this.replManagerAddress.equals(replManagerAddress)) { return false; } } this.slaveNamesList.add(slaveName); return true; } public void process() { start(); } public void run() { String[] slaveNames = (String[])this.slaveNamesList.toArray(new String[this.slaveNamesList.size()]); try { this.dbSpecific.initiateUpdate(replTopic, this.replManagerAddress, slaveNames, this.version, this.initialFilesLocation); } catch (Exception ex) { log.severe("An Exception occured when running intial update for '" + replTopic + "' for '" + this.replManagerAddress + "' as slave '" + SpecificDefault.toString(slaveNames) + "'"); ex.printStackTrace(); } } }; private String CREATE_COUNTER_KEY = "_createCounter"; private static Logger log = Logger.getLogger(InitialUpdater.class.getName()); /** used to publish CREATE changes */ protected I_ChangePublisher publisher; protected I_Info info; private String initialCmd; private String initialCmdPre; private String initialCmdPath; private boolean keepDumpFiles; private String replPrefix; private I_DbSpecific dbSpecific; private String stringToCheck; private Map runningExecutes = new HashMap(); private String initialDataTopic; /** Contains updates to be executed where the key is the version */ private Map preparedUpdates = new HashMap(); private boolean collectInitialUpdates; private boolean initialDumpAsXml; private int initialDumpMaxSize = 1048576; private long initialCmdSleepDelay = 10L; private ReplSourceEngine replSourceEngine; /** * Not doing anything. */ public InitialUpdater(I_DbSpecific dbSpecific) { this.dbSpecific = dbSpecific; } /** * @see org.xmlBlaster.contrib.I_ContribPlugin#getUsedPropertyKeys() */ public final Set getUsedPropertyKeys() { Set set = new HashSet(); set.add("replication.prefix"); set.add("maxRowsOnCreate"); if (this.publisher != null) PropertiesInfo.addSet(set, this.publisher.getUsedPropertyKeys()); return set; } public ConnectionInfo getConnectionInfo(Connection conn) { return new ConnectionInfo(conn); } /** * @see I_DbSpecific#init(I_Info) * */ public final void init(I_Info info) throws Exception { log.info("going to initialize the resources"); this.info = info; this.replPrefix = SpecificDefault.getReplPrefix(this.info);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -