📄 specificdefault.java
字号:
conn.setAutoCommit(true); try { if (triggerExists(conn, name)) { log.info("trigger '" + name + "' exists, will not create it"); return 0; } else { log.info("trigger '" + name + "' does not exist, will create it"); return 1; } //incrementReplKey(conn); //log.info("sequence '" + name + "' exists, will not create it"); //return 0; } catch (Exception ex) { log.info("trigger '" + name + "' does not exist (an exception occured), will create it"); return 1; } } catch (Exception ex) { conn = removeFromPool(conn, ROLLBACK_NO); throw ex; } finally { conn = releaseIntoPool(conn, COMMIT_NO); } } /** * Convenience method for nice output, also used to set the _destination property in the * Client properties of a message. * @param str * @return */ public static String toString(String[] str) { if (str == null) return ""; StringBuffer buf = new StringBuffer(); for (int i=0; i < str.length; i++) { if (i != 0) buf.append(","); buf.append(str[i]); } return buf.toString(); } /** * Reads the content to be executed from a file. * * @param conn The connection on which to operate. Must not be null. * @param method The method which uses this invocation (used for logging * purposes). * @param propKey The name (or key) of the property to retrieve. The content of * this property is the bootstrap file name * @param propDefault The default of the property. * @param force if force is true it will add it no matter what (overwrites * existing stuff), otherwise it will check for existence. * @throws Exception if an exception occurs when reading the bootstrap file. Note * that in case of an exception you need to erase the connection from the pool (if you * are using a pool) */ protected void updateFromFile(Connection conn, String method, String propKey, String propDefault, boolean doWarn, boolean force, Replacer repl) throws Exception { Statement st = null; String fileName = this.info.get(propKey, propDefault); final String FLUSH_SEPARATOR = "-- FLUSH"; final String CMD_SEPARATOR = "-- EOC"; List sqls = getContentFromClasspath(fileName, method, FLUSH_SEPARATOR, CMD_SEPARATOR); for (int i = 0; i < sqls.size(); i++) { String[] cmds = (String[]) sqls.get(i); String cmd = ""; try { st = conn.createStatement(); boolean doExecuteBatch = false; for (int j = 0; j < cmds.length; j++) { cmd = replaceTokens(cmds[j], repl); if (!force) { if (checkTableForCreation(cmd) == 0) continue; if (checkSequenceForCreation(cmd) == 0) continue; if (checkTriggerForCreation(cmd) == 0) continue; } if (cmd.trim().length() > 0) { doExecuteBatch = true; st.addBatch(cmd); } } if (doExecuteBatch) { st.executeBatch(); if (!conn.getAutoCommit()) conn.commit(); } } catch (SQLException ex) { if (doWarn /*|| log.isLoggable(Level.FINE)*/) { StringBuffer buf = new StringBuffer(); for (int j = 0; j < cmds.length; j++) buf.append(cmd).append("\n"); log.warning("operation:\n" + buf.toString() + "\n failed: " + ex.getMessage()); } if (conn != null && !conn.getAutoCommit()) conn.rollback(); } catch (Throwable ex) { StringBuffer buf = new StringBuffer(); for (int j = 0; j < cmds.length; j++) buf.append(cmd).append("\n"); log.severe("operation:\n" + buf.toString() + "\n failed: " + ex.getMessage()); if (conn != null && !conn.getAutoCommit()) conn.rollback(); if (ex instanceof Exception) throw (Exception)ex; else throw new Exception(ex); } finally { if (st != null) { st.close(); st = null; } } } } /** * @see I_DbSpecific#bootstrap(Connection). * In case of an exception you need to cleanup the connection yourself. */ public void bootstrap(Connection conn, boolean doWarn, boolean force) throws Exception { updateFromFile(conn, "bootstrap", "replication.bootstrapFile", "org/xmlBlaster/contrib/replication/setup/postgres/bootstrap.sql", doWarn, force, this.replacer); } /** * @see I_DbSpecific#cleanup(Connection). In case of an exception you need to cleanup * the connection yourself. */ public void cleanup(Connection conn, boolean doWarn) throws Exception { /* * This cleans up the triggers on the own schema by oracle. It is needed * since if there is an 'unclean' zombie trigger, then no operation is * possible anymore on the schema and cleanup will fail. */ removeTrigger(null, null, true); String replTables = this.dbMetaHelper.getIdentifier(this.replPrefix + "TABLES"); TableToWatchInfo[] tables = TableToWatchInfo.getAll(conn, replTables); HashSet set = new HashSet(); // to remember removed schema triggers for (int i=0; i < tables.length; i++) { String schema = tables[i].getSchema(); boolean doRemove = !set.contains(schema); set.add(schema); removeTableToWatch(tables[i], doRemove); } updateFromFile(conn, "cleanup", "replication.cleanupFile", "org/xmlBlaster/contrib/replication/setup/postgres/cleanup.sql", doWarn, true, this.replacer); } /** * @see org.xmlBlaster.contrib.I_ContribPlugin#getUsedPropertyKeys() */ public final Set getUsedPropertyKeys() { Set set = new HashSet(); set.add("replication.prefix"); set.add("maxRowsOnCreate"); PropertiesInfo.addSet(set, this.dbPool.getUsedPropertyKeys()); PropertiesInfo.addSet(set, this.initialUpdater.getUsedPropertyKeys()); return set; } /** * Returns a name identifying this SpecificDefault. This is the replication.prefix. * @return */ public final String getName() { return this.replPrefix; } /** * @see I_DbSpecific#init(I_Info) * */ public synchronized void init(I_Info info) throws Exception { if (this.initCount > 0) { this.initCount++; return; } log.info("going to initialize the resources"); this.replaceVariable = new ReplaceVariable(); this.info = info; this.replPrefix = SpecificDefault.getReplPrefix(this.info); this.replVersion = this.info.get("replication.version", "0.0"); Map map = new HashMap(); map.put("replVersion", this.replVersion); map.put("replPrefix", this.replPrefix); map.put("charWidth", this.info.get("replication.charWidth", "50")); map.put("charWidthSmall", this.info.get("replication.charWidthSmall", "10")); this.replacer = new Replacer(this.info, map); this.initialUpdater = new InitialUpdater(this); this.initialUpdater.init(info); this.dbPool = DbWatcher.getDbPool(this.info); this.dbMetaHelper = new DbMetaHelper(this.dbPool); this.rowsPerMessage = this.info.getInt("replication.maxRowsOnCreate", 250); if (this.isDbWriteable) { Connection conn = this.dbPool.reserve(); try { // just to check that the configuration is OK (better soon than later) TableToWatchInfo.getTablesToWatch(conn, this.info); } catch (Exception ex) { log.severe("The syntax of one of the 'tables' attributes in the configuration is wrong. " + ex.getMessage()); throw ex; } finally { if (conn != null) this.dbPool.release(conn); } } if (this.isDbWriteable) { boolean needsPublisher = this.info.getBoolean(NEEDS_PUBLISHER_KEY, true); if (needsPublisher) { this.isInMaster = true; this.bootstrapWarnings = this.info.getBoolean("replication.bootstrapWarnings", false); doBootstrapIfNeeded(); } } this.initCount++; } protected String getOwnSchema() { return null; } /** * Checks the consistency of the triggers. If an entry is found in the TABLES table, and the * table does not exist, nothing is done. */ public void checkTriggerConsistency(boolean doFix) throws Exception { Connection conn = this.dbPool.reserve(); try { conn.setAutoCommit(true); TableToWatchInfo[] tables = TableToWatchInfo.getAll(conn, this.replPrefix + "TABLES"); for (int i=0; i < tables.length; i++) { if (!triggerExists(conn, tables[i])) { String txt = "Trigger '" + tables[i].getTrigger() + "' on table '" + tables[i].getTable() + "' does in fact not exist."; if (doFix) { // check first if the table really exists ResultSet rs = conn.getMetaData().getTables(null, null, tables[i].getTable(), null); try { if (!rs.next()) { log.info(txt + " and the table does not exist either. Will not do anything"); continue; } } finally { if (rs != null) rs.close(); } log.info(txt + " Will add it now"); tables[i].setStatus(TableToWatchInfo.STATUS_REMOVE); tables[i].storeStatus(this.replPrefix, this.dbPool); // addTrigger(conn, tables[i], null); String catalog = tables[i].getCatalog(); String schema = tables[i].getSchema(); String table = tables[i].getTable(); readNewTable(catalog, schema, table, null, false); } else log.info(txt); } } } catch (Exception ex) { conn = removeFromPool(conn, ROLLBACK_NO); throw ex; } finally { conn = releaseIntoPool(conn, COMMIT_NO); } } /** * Checks wheter a bootstrapping is needed. If it is needed it will do first * a complete cleanup and therafter a bootstrap. * The criteria to decide wether it is needed or not is if the table * ${replPrefix}tables exists or not. It it does not exist, then it will do a * bootstrap. * * @return * @throws Exception */ private final void doBootstrapIfNeeded() throws Exception { Connection conn = null; try { conn = this.dbPool.reserve(); conn.setAutoCommit(true); boolean noForce = false; bootstrap(conn, this.bootstrapWarnings, noForce); /* ResultSet rs = conn.getMetaData().getTables(null, null, this.dbMetaHelper.getIdentifier(this.replPrefix + "TABLES"), null); if (!rs.next()) { rs.close(); boolean noWarn = false; boolean noForce = false; boolean doWarn = true; log.warning("A BOOTSTRAP IS NEEDED SINCE THE TABLE '" + this.replPrefix + "TABLES' has not been found"); cleanup(conn, noWarn); bootstrap(conn, doWarn, noForce); return true; } else rs.close(); return false; */ } catch (Exception ex) { conn = removeFromPool(conn, ROLLBACK_NO); throw ex; } finally { conn = releaseIntoPool(conn, COMMIT_NO); } } /** * @see I_DbSpecific#shutdown() */ public final synchronized void shutdown() throws Exception { this.initCount--; if (this.initCount > 0) return; try { log.info("going to shutdown: cleaning up resources"); // registering this instance to the Replication Manager this.initialUpdater.shutdown(); this.initialUpdater = null; } catch (Throwable e) { e.printStackTrace(); log.warning(e.toString()); } if (this.dbPool != null) { this.dbPool.shutdown(); } } /** * Increments and retreives the ${replPrefix}key sequence counter. The connection * must not be null. * * Description of sequences for oracle: * http://www.lc.leidenuniv.nl/awcourse/oracle/server.920/a96540/statements_615a.htm#2067095 * * * @param conn * @return * @throws Exception * @see I_DbSpecific#incrementReplKey(Connection) * */ public long incrementReplKey(Connection conn) throws Exception { if (conn == null) throw new Exception(
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -