📄 tabletowatchinfo.java
字号:
/*------------------------------------------------------------------------------Name: TableToWatchInfo.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.replication;import java.sql.Connection;import java.sql.ResultSet;import java.sql.SQLException;import java.sql.Statement;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.Set;import java.util.StringTokenizer;import java.util.TreeMap;import java.util.logging.Logger;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.PropertiesInfo;import org.xmlBlaster.contrib.db.I_DbPool;import org.xmlBlaster.contrib.dbwriter.info.SqlColumn;import org.xmlBlaster.contrib.dbwriter.info.SqlDescription;import org.xmlBlaster.contrib.dbwriter.info.SqlInfo;import org.xmlBlaster.util.StringPairTokenizer;/** * * TableToWatchInfo is a place holder (as an ejb) for data which is * stored in the ${replPrefix}tables table. * It also offers facility to retrieve the data from info objects. * * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a> */public class TableToWatchInfo { private static Logger log = Logger.getLogger(TableToWatchInfo.class.getName()); public final static String ACTION_KEY = "actions"; public final static String TRIGGER_KEY = "trigger"; public final static String SEQUENCE_KEY = "sequence"; public final static String STATUS_CREATING = "CREATING"; public final static String STATUS_OK = "OK"; public final static String STATUS_REMOVE = "REMOVE"; private String catalog; private String schema; private String table; private String status; private long replKey = -1L; private String trigger; private long debug; /** * flags which are set mean the replication does happen for these flags. * For example 'IDU' means everything will be replicated: (I)nserts, (D)eletes, and (U)pdates. */ private String actions = ""; public final static String TABLE_PREFIX = "table"; public final static String SCHEMA_PREFIX = "schema"; public final static String KEY_SEP = "."; public final static String VAL_SEP = ","; public final static String EMPTY = " "; public final static String ALL_TOKEN = "*"; /** this is used as the prefix for all tables to replicate */ public final static String TABLE_PREFIX_WITH_SEP = TABLE_PREFIX + KEY_SEP; /** * Checks if there are foreign keys which are not resolved yet * @return true if all foreign keys are resolved or if there was no foreign key, false otherwise. */ private static boolean checkIfForeignKeysAreResolved(SqlDescription desc, Set setOfProcessedTables, Map allTables) throws Exception { SqlColumn[] cols = desc.getColumns(); for (int i=0; i < cols.length; i++) { SqlColumn col = cols[i]; if (col.isFk()) { String fkTable = col.getFkTable(); if (fkTable == null) throw new Exception("The column '" + cols[i].getTable() + "' has a column '" + cols[i].getColName() + "' which is a foreign key but no associated table name was found"); if (!setOfProcessedTables.contains(fkTable)) { if (!allTables.containsKey(fkTable)) log.warning("The column '" + cols[i].getTable() + "' has a column '" + cols[i].getColName() + "' which is a foreign key. It is associated with a table which is not replicated (remember to make sure that this table is available on the destination also."); else return false; } } } return true; } /** * Returns all table names for the given catalog and schema. It only returns tables (not views), * and it uses the MetaData of the connection. * * @param prefixToAdd the prefix to be added to the table names, if null nothing is added. * @param conn the connection to be used. * @param tableToWatch The tableToWatch object containing the name of the catalog and schema * @return a String[] containing the names of the tables. The name of the tables is the absolute name. * @throws Exception if an exception on the backend occurs. */ private final static String[] getTablesForSchema(String prefixToAdd, Connection conn, TableToWatchInfo tableToWatch) throws SQLException { if (tableToWatch == null) throw new SQLException("TableToWatchInfo.getTablesForSchema: table to watch is null"); String table = tableToWatch.getTable(); if (table != null) { table = table.trim(); if (table.length() > 0 && !table.equals(ALL_TOKEN)) log.warning("The table '" + table + "' should either be empty or '" + ALL_TOKEN + "' (we ignore it here but may be mis-configuration)"); } String catalog = tableToWatch.getCatalog(); if (catalog != null && catalog.trim().length() < 1) catalog = null; String schema = tableToWatch.getSchema(); if (schema != null && schema.trim().length() < 1) schema = null; ResultSet rs = null; ArrayList list = new ArrayList(); try { rs = conn.getMetaData().getTables(catalog, schema, null, new String[] {"TABLE"}); while (rs.next()) { String tableName = rs.getString("TABLE_NAME"); String completeTableName = ""; if (prefixToAdd != null) completeTableName += prefixToAdd; if (catalog != null) completeTableName += catalog + KEY_SEP; if (schema != null) completeTableName += schema + KEY_SEP; completeTableName += tableName; list.add(completeTableName); } } finally { if (rs != null) rs.close(); } return (String[])list.toArray(new String[list.size()]); } public static String getSortedTablesToWatch(Connection conn, I_Info info, List outputSequence) throws Exception { TableToWatchInfo[] tables = getTablesToWatch(conn, info); List nonExisting = new ArrayList(); List toProcess = new ArrayList(); Map tableMap = new HashMap(); for (int i=0; i < tables.length; i++) { try { SqlInfo sqlInfo = new SqlInfo(info); String catalog = tables[i].getCatalog(); String schema = tables[i].getSchema(); String tableName = tables[i].getTable(); tableMap.put(tableName, tables[i]); sqlInfo.fillMetadata(conn, catalog, schema, tableName, null, null); toProcess.add(sqlInfo.getDescription()); } catch (Throwable ex) { ex.printStackTrace(); nonExisting.add(tables[i]); } } Set set = new HashSet(); // better performance StringBuffer buf = new StringBuffer(); int sweepCount = 0; int maxSweepCount = toProcess.size(); if (maxSweepCount < 2) maxSweepCount = 2; while (toProcess.size() > 0) { int count = 0; SqlDescription[] sqlDesc = (SqlDescription[])toProcess.toArray(new SqlDescription[toProcess.size()]); for (int i=0; i < sqlDesc.length; i++) { if (checkIfForeignKeysAreResolved(sqlDesc[i], set, tableMap)) { String tableName = sqlDesc[i].getIdentity(); set.add(tableName); outputSequence.add(sqlDesc[i]); SqlDescription removed = (SqlDescription)toProcess.remove(i-count); count++; TableToWatchInfo tableToWatch = (TableToWatchInfo)tableMap.get(tableName); if (tableToWatch == null) throw new Exception("Table '" + tableToWatch + "' was not found in the list of tables to be processed"); tableToWatch.setReplKey((long)i); buf.append(" <attribute id='").append(tableToWatch.getConfigKey()).append("'>").append(tableToWatch.getConfigValue()).append("</attribute>\n"); if (!removed.getIdentity().equalsIgnoreCase(tableName)) throw new Exception("An inconsistency aroze when trying to determine the correct loading sequence for tables. Failed for *" + sqlDesc[i-count].toXml("") + "' but has removed '" + removed.toXml("")); } } sweepCount++; if (sweepCount >= maxSweepCount) { StringBuffer exBuf = new StringBuffer(); for (int i=0; i < toProcess.size(); i++) { SqlDescription desc = (SqlDescription)toProcess.get(i); exBuf.append(desc.getIdentity()).append(" "); } throw new Exception("Still entries to be processed after '" + sweepCount + "' sweeps. Still to be processed: '" + exBuf.toString() + "'"); } } if (nonExisting.size() > 0) { buf.append(" <!-- THE FOLLOWING ENTRIES WHERE NOT FOUND ON THE DATABASE. THEIR SEQUENCE CAN THEREFORE NOT BE GUARANTEED -->\n"); for (int i=0; i < nonExisting.size(); i++) { TableToWatchInfo tableToWatch = (TableToWatchInfo)nonExisting.get(i); if (tableToWatch == null) throw new Exception("Table '" + tableToWatch + "' was not found in the list of tables to be processed"); tableToWatch.setReplKey(-1L); buf.append(" <attribute id='").append(tableToWatch.getConfigKey()).append("'>").append(tableToWatch.getConfigValue()).append("</attribute>\n"); } } return buf.toString(); } /** * Gets an array containing all the tables to watch found in this configuration * info object. * * @param info * @return * @throws Exception */ public static TableToWatchInfo[] getTablesToWatch(Connection conn, I_Info originalInfo) throws Exception { synchronized (originalInfo) { Iterator iter = originalInfo.getKeys().iterator(); // prepare defaults defined with a '*' token I_Info ownInfo = new PropertiesInfo(new Properties()); while (iter.hasNext()) { String key = ((String)iter.next()).trim(); if (!key.startsWith(TABLE_PREFIX_WITH_SEP)) continue; String val = originalInfo.get(key, null); if (key.indexOf(ALL_TOKEN) < 0L) { ownInfo.put(key, val); continue; } TableToWatchInfo tableToWatch = new TableToWatchInfo(); tableToWatch.assignFromInfoPair(key, val); String[] tableNames = getTablesForSchema(TABLE_PREFIX_WITH_SEP, conn, tableToWatch); for (int i=0; i < tableNames.length; i++) ownInfo.put(tableNames[i], ""); } TreeMap map = new TreeMap(); int count = 0; iter = ownInfo.getKeys().iterator(); while (iter.hasNext()) { String key = ((String)iter.next()).trim(); if (!key.startsWith(TABLE_PREFIX_WITH_SEP)) continue; count++; String val = ownInfo.get(key, null); TableToWatchInfo tableToWatch = new TableToWatchInfo(); tableToWatch.assignFromInfoPair(key, val); Long mapKey = new Long(tableToWatch.getReplKey()); ArrayList list = (ArrayList)map.get(mapKey); if (list == null) { list = new ArrayList(); map.put(mapKey, list); } list.add(tableToWatch); } // handle here allt tables which have been assigned as default: for example // <attribute id='table.XMLBLASTER.*'></attribute> TableToWatchInfo[] tables = new TableToWatchInfo[count]; count = 0; iter = map.keySet().iterator(); while (iter.hasNext()) { Object mapKey = iter.next(); ArrayList list = (ArrayList)map.get(mapKey); for (int i=0; i < list.size(); i++) { TableToWatchInfo obj = (TableToWatchInfo)list.get(i); tables[count] = obj; count++; } } return tables; } } public TableToWatchInfo() { } public TableToWatchInfo(String catalog, String schema, String table) { this.catalog = catalog; this.schema = schema; this.table = table; if (this.catalog == null || this.catalog.trim().length() < 1) this.catalog = EMPTY; } /** * Parses the key and fills this object appropriately. The * syntax to be parsed is of the kind: <br/> * table.[${catalog}.][${schema}.]${table}<br/> * examples: * <ul> * <li>table.table1</li> * <li>table.schema1.table1</li> * <li>table.catalog1.schema1.table1</li> * </ul> * @param key * @throws Exception */ private final void parseKey(String key) throws Exception { if (key == null) throw new Exception("TableToWatchInfo.assignFromInfoPair: The key is empty, you must assign one"); key = key.trim(); if (!key.startsWith(TABLE_PREFIX_WITH_SEP)) throw new Exception("TableToWatchInfo.assignFromInfoPair: The key '" + key + "' does not start with '" + TABLE_PREFIX + "'"); key = key.substring(TABLE_PREFIX_WITH_SEP.length()); String tmp1 = null; String tmp2 = null; String tmp3 = null; StringTokenizer tokenizer = new StringTokenizer(key, KEY_SEP); if (tokenizer.hasMoreTokens()) { tmp1 = tokenizer.nextToken(); if (tokenizer.hasMoreTokens()) { tmp2 = tokenizer.nextToken(); if (tokenizer.hasMoreTokens()) tmp3 = tokenizer.nextToken(); } } if (tmp3 == null) { this.catalog = null; if (tmp2 == null) { this.schema = null; this.table = tmp1.trim(); } else { this.schema = tmp1.trim(); this.table = tmp2.trim(); } } else { this.catalog = tmp1.trim(); this.schema = tmp2.trim(); this.table = tmp3.trim(); } } public String getConfigKey() { StringBuffer buf = new StringBuffer(); buf.append("table").append(KEY_SEP); if (this.catalog != null && this.catalog.length() > 0) buf.append(this.catalog).append(KEY_SEP); if (this.schema != null && this.schema.length() > 0) buf.append(this.schema).append(KEY_SEP); else if (this.catalog != null && this.catalog.length() > 0) buf.append(KEY_SEP); buf.append(this.table); return buf.toString(); } public String getConfigValue() { StringBuffer buf = new StringBuffer(); boolean isFirst = true; if (this.actions != null && this.actions.length() > 0) { buf.append(ACTION_KEY).append("=").append(this.actions); isFirst = false; } if (this.trigger != null && this.trigger.length() > 0) { if (!isFirst) buf.append(VAL_SEP); buf.append(TRIGGER_KEY).append("=").append(this.trigger); isFirst = false; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -