📄 specificdefault.java
字号:
"SpecificDefault.incrementReplKey: the DB connection is null"); CallableStatement st = null; try { st = conn.prepareCall("{? = call " + this.replPrefix + "increment()}"); st.registerOutParameter(1, Types.INTEGER); st.executeQuery(); long ret = st.getLong(1); return ret; } finally { try { if (st != null) st.close(); } catch (Exception ex) { } } } /** * Adds a trigger. * * @param conn * @param tableToWatch * @param sqlInfo * @throws Exception */ private final void addTrigger(Connection conn, TableToWatchInfo tableToWatch, SqlInfo sqlInfo, boolean force) throws Exception { Statement st = null; String table = tableToWatch.getTable(); try { if (!tableToWatch.getStatus().equals(TableToWatchInfo.STATUS_OK) || force) { String createString = createTableTrigger(sqlInfo.getDescription(), tableToWatch); if (createString != null && createString.length() > 1) { log.info("adding triggers to '" + table + "':\n\n" + createString); st = conn.createStatement(); st.executeUpdate(createString); st.close(); } tableToWatch.setStatus(TableToWatchInfo.STATUS_OK); tableToWatch.storeStatus(this.replPrefix, this.dbPool); } } finally { if (st != null) st.close(); } } /** * @see org.xmlBlaster.contrib.replication.I_DbSpecific#addTrigger(java.sql.Connection, java.lang.String, java.lang.String, java.lang.String, org.xmlBlaster.contrib.dbwriter.info.SqlInfo) */ public void addTrigger(Connection conn, String catalog, String schema, String tableName) throws Exception { TableToWatchInfo tableToWatch = getTableToWatch(conn, catalog, schema, tableName); SqlInfo sqlInfo = new SqlInfo(this.info); if (sqlInfo.fillMetadata(conn, catalog, schema, tableName, null, null)) { final boolean force = true; addTrigger(conn, tableToWatch, sqlInfo, force); } else log.warning("The table='" + tableName + "' on schema='" + schema + "' and catalog='" + catalog + "' has not been found"); } /** * @see I_DbSpecific#readNewTable(String, String, String, Map) */ public final void readNewTable(String catalog, String schema, String table, Map attrs, boolean sendInitialContents) throws Exception { Connection conn = this.dbPool.reserve(); int oldTransIsolation = 0; boolean oldTransIsolationKnown = false; try { conn.setAutoCommit(true); oldTransIsolation = conn.getTransactionIsolation(); oldTransIsolationKnown = true; SqlInfo sqlInfo = new SqlInfo(this.info); if (catalog != null) catalog = this.dbMetaHelper.getIdentifier(catalog); if (schema != null) schema = this.dbMetaHelper.getIdentifier(schema); table = this.dbMetaHelper.getIdentifier(table); sqlInfo.fillMetadata(conn, catalog, schema, table, null, null); SqlDescription description = sqlInfo.getDescription(); description.addAttributes(attrs); // check if function and trigger are necessary (they are only if the // table has to be replicated. // it does not need this if the table only needs an initial synchronization. if (this.isDbWriteable) { TableToWatchInfo tableToWatch = getTableToWatch(conn, catalog, schema, table); if (tableToWatch != null) { boolean addTrigger = tableToWatch.isReplicate(); if (addTrigger) { // create the function and trigger here addTrigger(conn, tableToWatch, sqlInfo, false); } else log.info("trigger will not be added since entry '" + tableToWatch.toXml() + "' will not be replicated"); } else { log.info("table to watch '" + table + "' not found"); } } conn.commit(); // just to make oracle happy for the next set transaction conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); boolean autoCommit = false; conn.setAutoCommit(autoCommit); // retrieve the Sequence number here ... long newReplKey = incrementReplKey(conn); // publish the structure of the table (need to be done here since we // must retreive repl key after having added the trigger) if (sendInitialContents) { String destination = null; if (attrs != null) destination = (String)attrs.get("_destination"); this.initialUpdater.publishCreate(0, sqlInfo, newReplKey, destination); if (schema != null) table = schema + "." + table; String sql = new String("SELECT * FROM " + table); I_ResultCb resultHandler = new RsToSqlInfo(this.initialUpdater, sqlInfo, this.cancelledUpdates, this.transformer, newReplKey, this.rowsPerMessage, destination); this.dbPool.select(conn, sql, autoCommit, resultHandler); } conn.commit(); } catch (Exception ex) { removeFromPool(conn, ROLLBACK_YES); throw ex; } finally { if (conn != null) { if (oldTransIsolationKnown) { try { conn.setAutoCommit(true); conn.setTransactionIsolation(oldTransIsolation); } catch (Exception e) { e.printStackTrace(); } } conn = releaseIntoPool(conn, COMMIT_NO); } } } public void forceTableChangeCheck() throws Exception { Connection conn = null; CallableStatement st = null; try { String sql = "{? = call " + this.replPrefix + "check_structure()}"; conn = this.dbPool.reserve(); conn.setAutoCommit(true); st = conn.prepareCall(sql); st.registerOutParameter(1, Types.VARCHAR); st.executeQuery(); } catch (Exception ex) { conn = removeFromPool(conn, ROLLBACK_NO); } finally { try { if (st != null) st.close(); } catch (Exception ex) { ex.printStackTrace(); } conn = releaseIntoPool(conn, COMMIT_NO); } } /** * To use this method the arguments must already have been cleaned. * @param schema can not be null (use ' ' for null) * @return true if the table is found among the registered tables, false if not. * @throws SQLException */ private final boolean isSchemaRegistered(Connection conn, String schema) throws SQLException { Statement st = null; try { // check wether the item already exists, if it exists return false String sql = "SELECT * FROM " + this.replPrefix + "tables WHERE schemaname='" + schema + "'"; st = conn.createStatement(); ResultSet rs = st.executeQuery(sql); return rs.next(); } finally { if (st != null) { try { st.close(); } catch (SQLException ex) {ex.printStackTrace();} } } } private final TableToWatchInfo getTableToWatch(Connection conn, String catalog, String schema, String tableName) throws Exception { final String TABLES_TABLE = this.dbMetaHelper.getIdentifier(this.replPrefix + "TABLES"); return TableToWatchInfo.get(conn, TABLES_TABLE, catalog, schema, tableName, null); } /** * @see I_DbSpecific#addTableToWatch(String, String, String, String, String, boolean, String, boolean) */ public final boolean addTableToWatch(TableToWatchInfo firstTableToWatch, boolean force, String[] destinations, boolean forceSend) throws Exception { String catalog = firstTableToWatch.getCatalog(); String schema = firstTableToWatch.getSchema(); String tableName = firstTableToWatch.getTable(); String actions = firstTableToWatch.getActions(); String triggerName = firstTableToWatch.getTrigger(); if (catalog != null && catalog.trim().length() > 0) catalog = this.dbMetaHelper.getIdentifier(catalog); else catalog = " "; if (schema != null && schema.trim().length() > 0) schema = this.dbMetaHelper.getIdentifier(schema); else schema = " "; tableName = this.dbMetaHelper.getIdentifier(tableName); Connection conn = null; log.info("Checking for addition of '" + tableName + "'"); try { conn = this.dbPool.reserve(); conn.setAutoCommit(false); long tmp = this.incrementReplKey(conn); if (!isSchemaRegistered(conn, schema)) { log.info("schema '" + schema + "' is not registered, going to add it"); addSchemaToWatch(conn, catalog, schema); } TableToWatchInfo tableToWatch = getTableToWatch(conn, catalog, schema, tableName); if (!conn.getAutoCommit()) conn.commit(); // to be sure it is a new transaction if (!force && tableToWatch != null && tableToWatch.isStatusOk(this, conn)) { // send it manually since table exits already and trigger is OK. log.info("table '" + tableName + "' is already registered, will add directly an entry in the ENTRIES Table"); String destAttrName = "?"; if (destinations == null || destinations.length == 0) destAttrName = "NULL"; String sql = "{? = call " + this.replPrefix + "check_tables(NULL,?,?,?," + destAttrName + ")}"; // name text, content text) CallableStatement st = conn.prepareCall(sql); st.setString(2, schema); st.setString(3, tableName); st.setString(4, ReplicationConstants.CREATE_ACTION); if (destinations != null && destinations.length != 0) { String post = "</desc>"; if (forceSend) post = "<attr id='_forceSend'>true</attr>" + post; String destinationTxt = "<desc><attr id='_destination'>" + toString(destinations) + "</attr>" + post; st.setString(5, destinationTxt); } st.registerOutParameter(1, Types.VARCHAR); st.executeQuery(); st.close(); return false; } /* if (force) { if (isTableRegistered(conn, tableToWatch)) { log.info("table '" + tableName + "' is already registered and 'force' has been choosed. Will set its status to 'REMOVE'"); tableToWatch.setStatus(TableToWatchInfo.STATUS_REMOVE); tableToWatch.storeStatus(this.replPrefix, this.dbPool); } } */ // then it is either not OK or force true. or null if (tableToWatch != null) // then it is either not OK or force true. In both cases we need to remove old entry tableToWatch.removeFromDb(this.replPrefix, this.dbPool); if (triggerName == null) triggerName = this.replPrefix + tmp; triggerName = this.dbMetaHelper.getIdentifier(triggerName); long debug = 0; TableToWatchInfo finalTableToWatch = new TableToWatchInfo(catalog, schema, tableName); finalTableToWatch.setActions(actions); finalTableToWatch.setTrigger(triggerName); finalTableToWatch.setDebug((int)debug); finalTableToWatch.setReplKey(tmp); finalTableToWatch.store(this.replPrefix, this.dbPool, conn); return true; } catch (Throwable ex) { conn = removeFromPool(conn, ROLLBACK_YES); if (ex instanceof Exception) throw (Exception)ex; throw new Exception(ex); } finally { conn = releaseIntoPool(conn, COMMIT_YES); } } /** * Currently made public for testing. * @param schema */ public void removeSchemaTriggers(String schema) { removeTrigger(this.replPrefix + "drtg_" + schema, null, true); removeTrigger(this.replPrefix + "altg_" + schema, null, true); removeTrigger(this.replPrefix + "crtg_" + schema, null, true); } /** * @see I_DbSpecific#removeTableToWatch(String) */ public final void removeTableToWatch(TableToWatchInfo tableToWatch, boolean removeAlsoSchemaTrigger) throws Exception { String catalog = tableToWatch.getCatalog(); String schema = tableToWatch.getSchema(); String tableName = tableToWatch.getTable(); if (catalog != null && catalog.trim().length() > 0) catalog = this.dbMetaHelper.getIdentifier(catalog); else catalog = " "; if (schema != null && schema.trim().length() > 0) schema = this.dbMetaHelper.getIdentifier(schema); else schema = " "; tableName = this.dbMetaHelper.getIdentifier(tableName); String sql = "DELETE FROM " + this.replPrefix + "tables WHERE tablename='" + tableName + "' AND schemaname='" + schema + "' AND catalogname='" + catalog + "'"; this.dbPool.update(sql); if (removeAlsoSchemaTrigger) removeSchemaTriggers(schema); if (tableToWatch.isReplicate()) { String triggerName = tableToWatch.getTrigger(); removeTrigger(triggerName, tableName, false); } } /** * @see I_DbSpecific#getCreateTableStatement(SqlDescription, * I_Mapper) */ public final String getCreateTableStatement( SqlDescription infoDescription, I_Mapper mapper) { SqlColumn[] cols = infoDescription .getColumns(); StringBuffer buf = new StringBuffer(1024); String originalTableName = infoDescription.getIdentity(); String originalSchema = infoDescription.getSchema(); String originalCatalog = infoDescription.getCatalog(); String completeTableName = null; if (mapper != null) { String schema = null; String tableName = mapper.getMappedTable(originalCatalog, originalSchema, originalTableName, null, originalTableName); if (originalSchema != null) schema = mapper.getMappedSchema(originalCatalog, originalSchema, originalTableName, null, originalSchema); if (schema != null) completeTableName = schema + "." + tableName; else completeTableName = tableName; } else { if (originalSchema != null) completeTableName = originalSchema + "." + originalTableName; else completeTableName = originalTableName; } buf.append("CREATE TABLE ").append(completeTableName).append(" ("); StringBuffer pkBuf = new StringBuffer(); boolean hasPkAlready = false; for (int i = 0; i < cols.length; i++) { if (i != 0) buf.append(","); buf.append(getColumnStatement(cols[i])); if (cols[i].isPrimaryKey()) { buf.append(" NOT NULL"); if (hasPkAlready) pkBuf.append(","); pkBuf.append(cols[i].getColName());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -