⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 specificdefault.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
               "SpecificDefault.incrementReplKey: the DB connection is null");      CallableStatement st = null;      try {         st = conn.prepareCall("{? = call " + this.replPrefix + "increment()}");         st.registerOutParameter(1, Types.INTEGER);         st.executeQuery();         long ret = st.getLong(1);         return ret;      } finally {         try {            if (st != null)               st.close();         } catch (Exception ex) {         }      }   }   /**    * Adds a trigger.    *     * @param conn    * @param tableToWatch    * @param sqlInfo    * @throws Exception    */   private final void addTrigger(Connection conn, TableToWatchInfo tableToWatch, SqlInfo sqlInfo, boolean force) throws Exception {      Statement st = null;      String table = tableToWatch.getTable();      try {         if (!tableToWatch.getStatus().equals(TableToWatchInfo.STATUS_OK) || force) {            String createString = createTableTrigger(sqlInfo.getDescription(), tableToWatch);            if (createString != null && createString.length() > 1) {               log.info("adding triggers to '" + table + "':\n\n" + createString);               st = conn.createStatement();               st.executeUpdate(createString);               st.close();            }            tableToWatch.setStatus(TableToWatchInfo.STATUS_OK);            tableToWatch.storeStatus(this.replPrefix, this.dbPool);         }      }      finally {         if (st != null)            st.close();      }   }         /**    * @see org.xmlBlaster.contrib.replication.I_DbSpecific#addTrigger(java.sql.Connection, java.lang.String, java.lang.String, java.lang.String, org.xmlBlaster.contrib.dbwriter.info.SqlInfo)    */   public void addTrigger(Connection conn, String catalog, String schema, String tableName) throws Exception {      TableToWatchInfo tableToWatch = getTableToWatch(conn, catalog, schema, tableName);      SqlInfo sqlInfo = new SqlInfo(this.info);      if (sqlInfo.fillMetadata(conn, catalog, schema, 
tableName, null, null)) {         final boolean force = true;         addTrigger(conn, tableToWatch, sqlInfo, force);      }      else         log.warning("The table='" + tableName + "' on schema='" + schema + "' and catalog='" + catalog + "' has not been found");   }   /**    * @see I_DbSpecific#readNewTable(String, String, String, Map)    */   public final void readNewTable(String catalog, String schema, String table,         Map attrs, boolean sendInitialContents) throws Exception {      Connection conn = this.dbPool.reserve();      int oldTransIsolation = 0;      boolean oldTransIsolationKnown = false;      try {         conn.setAutoCommit(true);         oldTransIsolation = conn.getTransactionIsolation();         oldTransIsolationKnown = true;         SqlInfo sqlInfo = new SqlInfo(this.info);         if (catalog != null)            catalog = this.dbMetaHelper.getIdentifier(catalog);         if (schema != null)            schema = this.dbMetaHelper.getIdentifier(schema);         table = this.dbMetaHelper.getIdentifier(table);         sqlInfo.fillMetadata(conn, catalog, schema, table, null, null);         SqlDescription description = sqlInfo.getDescription();         description.addAttributes(attrs);         // check if function and trigger are necessary (they are only if the         // table has to be replicated.         // it does not need this if the table only needs an initial synchronization.          
if (this.isDbWriteable) {            TableToWatchInfo tableToWatch = getTableToWatch(conn, catalog, schema, table);                        if (tableToWatch != null) {               boolean addTrigger = tableToWatch.isReplicate();               if (addTrigger) { // create the function and trigger here                  addTrigger(conn, tableToWatch, sqlInfo, false);               }               else                  log.info("trigger will not be added since entry '" + tableToWatch.toXml() + "' will not be replicated");            }            else {               log.info("table to watch '" + table + "' not found");            }         }         conn.commit(); // just to make oracle happy for the next set transaction         conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);         boolean autoCommit = false;         conn.setAutoCommit(autoCommit);         // retrieve the Sequence number here ...         long newReplKey = incrementReplKey(conn);         // publish the structure of the table (need to be done here since we         // must retreive repl key after having added the trigger)                  if (sendInitialContents) {            String destination = null;            if (attrs != null)                destination = (String)attrs.get("_destination");            this.initialUpdater.publishCreate(0, sqlInfo, newReplKey, destination);            if (schema != null)               table = schema + "." 
+ table;            String sql = new String("SELECT * FROM " + table);            I_ResultCb resultHandler = new RsToSqlInfo(this.initialUpdater, sqlInfo, this.cancelledUpdates, this.transformer, newReplKey, this.rowsPerMessage, destination);             this.dbPool.select(conn, sql, autoCommit, resultHandler);         }         conn.commit();      }       catch (Exception ex) {         removeFromPool(conn, ROLLBACK_YES);         throw ex;      }       finally {         if (conn != null) {            if (oldTransIsolationKnown) {               try {                  conn.setAutoCommit(true);                  conn.setTransactionIsolation(oldTransIsolation);               }                catch (Exception e) {                  e.printStackTrace();               }            }            conn = releaseIntoPool(conn, COMMIT_NO);         }      }   }   public void forceTableChangeCheck() throws Exception {      Connection conn = null;      CallableStatement st = null;      try {         String sql = "{? = call " + this.replPrefix + "check_structure()}";         conn = this.dbPool.reserve();         conn.setAutoCommit(true);         st = conn.prepareCall(sql);         st.registerOutParameter(1, Types.VARCHAR);         st.executeQuery();      }      catch (Exception ex) {         conn = removeFromPool(conn, ROLLBACK_NO);      }      finally {         try {            if (st != null)               st.close();         } catch (Exception ex) {            ex.printStackTrace();         }         conn = releaseIntoPool(conn, COMMIT_NO);      }   }   /**    * To use this method the arguments must already have been cleaned.    * @param schema can not be null (use ' ' for null)    * @return true if the table is found among the registered tables, false if not.    
* @throws SQLException    */   private final boolean isSchemaRegistered(Connection conn, String schema) throws SQLException {      Statement st = null;      try {         // check wether the item already exists, if it exists return false         String sql = "SELECT * FROM " + this.replPrefix + "tables WHERE schemaname='" + schema + "'";         st = conn.createStatement();         ResultSet rs = st.executeQuery(sql);         return rs.next();      }      finally {         if (st != null) {            try { st.close(); } catch (SQLException ex) {ex.printStackTrace();}         }      }   }      private final TableToWatchInfo getTableToWatch(Connection conn, String catalog, String schema, String tableName) throws Exception {      final String TABLES_TABLE = this.dbMetaHelper.getIdentifier(this.replPrefix + "TABLES");      return TableToWatchInfo.get(conn, TABLES_TABLE, catalog, schema, tableName, null);   }      /**    * @see I_DbSpecific#addTableToWatch(String, String, String, String, String, boolean, String, boolean)    */   public final boolean addTableToWatch(TableToWatchInfo firstTableToWatch, boolean force, String[] destinations, boolean forceSend) throws Exception {      String catalog = firstTableToWatch.getCatalog();      String schema = firstTableToWatch.getSchema();      String tableName = firstTableToWatch.getTable();      String actions = firstTableToWatch.getActions();      String triggerName = firstTableToWatch.getTrigger();      if (catalog != null && catalog.trim().length() > 0)         catalog = this.dbMetaHelper.getIdentifier(catalog);      else         catalog = " ";      if (schema != null && schema.trim().length() > 0)         schema = this.dbMetaHelper.getIdentifier(schema);      else         schema = " ";      tableName = this.dbMetaHelper.getIdentifier(tableName);      Connection conn = null;      log.info("Checking for addition of '" + tableName + "'");      try {         conn = this.dbPool.reserve();         conn.setAutoCommit(false);       
  long tmp = this.incrementReplKey(conn);         if (!isSchemaRegistered(conn, schema)) {            log.info("schema '" + schema + "' is not registered, going to add it");            addSchemaToWatch(conn, catalog, schema);         }                  TableToWatchInfo tableToWatch = getTableToWatch(conn, catalog, schema, tableName);         if (!conn.getAutoCommit())            conn.commit(); // to be sure it is a new transaction         if (!force && tableToWatch != null && tableToWatch.isStatusOk(this, conn)) {            // send it manually since table exits already and trigger is OK.            log.info("table '" + tableName + "' is already registered, will add directly an entry in the ENTRIES Table");            String destAttrName = "?";            if (destinations == null || destinations.length == 0)               destAttrName = "NULL";            String sql = "{? = call " + this.replPrefix + "check_tables(NULL,?,?,?," + destAttrName + ")}"; // name text, content text)            CallableStatement st = conn.prepareCall(sql);            st.setString(2, schema);            st.setString(3, tableName);            st.setString(4, ReplicationConstants.CREATE_ACTION);            if (destinations != null && destinations.length != 0) {               String post = "</desc>";               if (forceSend)                  post = "<attr id='_forceSend'>true</attr>" + post;               String destinationTxt = "<desc><attr id='_destination'>" + toString(destinations) + "</attr>" + post;               st.setString(5, destinationTxt);            }            st.registerOutParameter(1, Types.VARCHAR);            st.executeQuery();            st.close();            return false;         }         /*         if (force) {            if (isTableRegistered(conn, tableToWatch)) {               log.info("table '" + tableName + "' is already registered and 'force' has been choosed. 
Will set its status to 'REMOVE'");               tableToWatch.setStatus(TableToWatchInfo.STATUS_REMOVE);               tableToWatch.storeStatus(this.replPrefix, this.dbPool);            }         }         */         // then it is either not OK or force true. or null          if (tableToWatch != null) // then it is either not OK or force true. In both cases we need to remove old entry            tableToWatch.removeFromDb(this.replPrefix, this.dbPool);                  if (triggerName == null)            triggerName = this.replPrefix + tmp;         triggerName = this.dbMetaHelper.getIdentifier(triggerName);                  long debug = 0;         TableToWatchInfo finalTableToWatch = new TableToWatchInfo(catalog, schema, tableName);         finalTableToWatch.setActions(actions);         finalTableToWatch.setTrigger(triggerName);         finalTableToWatch.setDebug((int)debug);         finalTableToWatch.setReplKey(tmp);         finalTableToWatch.store(this.replPrefix, this.dbPool, conn);                  return true;      }      catch (Throwable ex) {         conn = removeFromPool(conn, ROLLBACK_YES);         if (ex instanceof Exception)            throw (Exception)ex;         throw new Exception(ex);      }      finally {         conn = releaseIntoPool(conn, COMMIT_YES);      }   }   /**    * Currently made public for testing.    
* @param schema the schema whose three schema level triggers (drtg_, altg_, crtg_) are removed.
    */
   public void removeSchemaTriggers(String schema) {
      // same order as before: drop, alter and create trigger
      final String[] triggerKinds = { "drtg_", "altg_", "crtg_" };
      for (int i = 0; i < triggerKinds.length; i++) {
         String schemaTrigger = this.replPrefix + triggerKinds[i] + schema;
         removeTrigger(schemaTrigger, null, true);
      }
   }

   /**
    * Deletes the entry of the given table from the replication tables table
    * and removes the associated triggers.
    *
    * @param tableToWatch the entry to remove. Empty catalog or schema are
    *        replaced by " " before building the delete statement.
    * @param removeAlsoSchemaTrigger if true the schema triggers are removed too.
    * @throws Exception if the delete statement fails.
    * @see I_DbSpecific#removeTableToWatch(TableToWatchInfo, boolean)
    */
   public final void removeTableToWatch(TableToWatchInfo tableToWatch, boolean removeAlsoSchemaTrigger) throws Exception {
      String rawCatalog = tableToWatch.getCatalog();
      String rawSchema = tableToWatch.getSchema();
      String rawTable = tableToWatch.getTable();

      boolean catalogGiven = rawCatalog != null && rawCatalog.trim().length() > 0;
      String catalog = catalogGiven ? this.dbMetaHelper.getIdentifier(rawCatalog) : " ";

      boolean schemaGiven = rawSchema != null && rawSchema.trim().length() > 0;
      String schema = schemaGiven ? this.dbMetaHelper.getIdentifier(rawSchema) : " ";

      String tableName = this.dbMetaHelper.getIdentifier(rawTable);

      StringBuffer deleteSql = new StringBuffer();
      deleteSql.append("DELETE FROM ").append(this.replPrefix).append("tables WHERE tablename='").append(tableName);
      deleteSql.append("' AND schemaname='").append(schema);
      deleteSql.append("' AND catalogname='").append(catalog);
      deleteSql.append("'");
      this.dbPool.update(deleteSql.toString());

      if (removeAlsoSchemaTrigger)
         removeSchemaTriggers(schema);
      if (tableToWatch.isReplicate()) {
         // the table trigger only exists for replicated tables
         removeTrigger(tableToWatch.getTrigger(), tableName, false);
      }
   }

   /**
    * @see I_DbSpecific#getCreateTableStatement(SqlDescription,
    *      I_Mapper)
    */
   public final String getCreateTableStatement(
         SqlDescription infoDescription, I_Mapper mapper) {
      SqlColumn[] cols = infoDescription
            .getColumns();
      StringBuffer buf = new StringBuffer(1024);
      String originalTableName = infoDescription.getIdentity();
      String originalSchema = infoDescription.getSchema();
      String originalCatalog = infoDescription.getCatalog();
      String completeTableName = null;
      if (mapper != null) {
         String schema = null;
         String tableName = mapper.getMappedTable(originalCatalog, originalSchema, originalTableName, null, originalTableName);
         if (originalSchema != null)
            schema = mapper.getMappedSchema(originalCatalog, originalSchema, originalTableName, null, originalSchema);
         if (schema != null)
            completeTableName = schema + "." + tableName;
         else
            completeTableName = tableName;
      }
      else {
         if (originalSchema != null)
            completeTableName = originalSchema + "." + originalTableName;
         else
            completeTableName = originalTableName;
      }

      buf.append("CREATE TABLE ").append(completeTableName).append(" (");
      StringBuffer pkBuf = new StringBuffer();
      boolean hasPkAlready = false;
      for (int i = 0; i < cols.length; i++) {
         if (i != 0)
            buf.append(",");
         buf.append(getColumnStatement(cols[i]));
         if (cols[i].isPrimaryKey()) {
            buf.append(" NOT NULL");
            if (hasPkAlready)
               pkBuf.append(",");
            pkBuf.append(cols[i].getColName());

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -