⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 specificoracle.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/*------------------------------------------------------------------------------ Name:      SpecificDefault.java Project:   xmlBlaster.org Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file ------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.replication.impl;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import java.sql.Statement;import java.sql.Types;import java.util.ArrayList;import java.util.List;import java.util.Map;import java.util.logging.Logger;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.PropertiesInfo;import org.xmlBlaster.contrib.db.I_DbPool;import org.xmlBlaster.contrib.dbwatcher.convert.I_AttributeTransformer;import org.xmlBlaster.contrib.dbwriter.info.SqlColumn;import org.xmlBlaster.contrib.dbwriter.info.SqlDescription;import org.xmlBlaster.contrib.dbwriter.info.SqlInfo;import org.xmlBlaster.contrib.dbwriter.info.SqlRow;import org.xmlBlaster.contrib.replication.TableToWatchInfo;public class SpecificOracle extends SpecificDefault {   private static Logger log = Logger.getLogger(SpecificOracle.class.getName());   private String ownSchema;      private boolean debug;   private String debugFunction;   private boolean wipeoutTriggers;   private boolean wipeoutSequences;   private boolean wipeoutFunctions;   private boolean wipeoutPackages;   private boolean wipeoutProcedures;    private boolean wipeoutViews;    private boolean wipeoutTables;   private boolean wipeoutSynonyms;    private boolean wipeoutIndexes;   private boolean wipeoutExIfConnected;   private boolean createDropAlterDetection;      /**    * Not doing anything.    */   public SpecificOracle() {      super();   }   public void init(I_Info info) throws Exception {      super.init(info);      this.ownSchema = this.dbMetaHelper.getIdentifier(this.info.get("db.user", null));            this.debug = this.info.getBoolean("replication.plsql.debug", false);      this.debugFunction = this.info.get("replication.plsql.debugFunction", null);      this.wipeoutExIfConnected = this.info.getBoolean("replication.oracle.wipeoutExIfConnected", false);      this.wipeoutTriggers = this.info.getBoolean("replication.oracle.wipeoutTriggers", true);      this.wipeoutSequences = this.info.getBoolean("replication.oracle.wipeoutSequences", true);      this.wipeoutFunctions = this.info.getBoolean("replication.oracle.wipeoutFunctions", true);      this.wipeoutPackages = this.info.getBoolean("replication.oracle.wipeoutPackages", true);      this.wipeoutProcedures = this.info.getBoolean("replication.oracle.wipeoutProcedures", true);      this.wipeoutViews = this.info.getBoolean("replication.oracle.wipeoutViews", true);      this.wipeoutTables = this.info.getBoolean("replication.oracle.wipeoutTables", true);      this.wipeoutSynonyms = this.info.getBoolean("replication.oracle.wipeoutSynonyms", true);      this.wipeoutIndexes = this.info.getBoolean("replication.oracle.wipeoutIndexes", true);      this.createDropAlterDetection = this.info.getBoolean("replication.createDropAlterDetection", true);   }      /**    * Adds a schema to be watched. By Oracle it would add triggers to the    * schema.    *     * @param catalog    * @param schema    * @throws Exception. Thrown if an exception occurs on the backend. Note that if an    * exception occurs you must cleanup the connection since it might become stale.    */   public void addSchemaToWatch(Connection conn, String catalog, String schema) throws Exception {      if (schema == null || schema.length() < 1) return;      Map map = this.replacer.getAdditionalMapClone();      map.put("schemaName", schema);      Replacer tmpReplacer = new Replacer(this.info, map);      boolean doWarn = true;      boolean force = true; // overwrites existing ones      if (this.createDropAlterDetection)         updateFromFile(conn, "createDropAlter", "replication.createDropAlterFile",               "org/xmlBlaster/contrib/replication/setup/oracle/createDropAlter.sql", doWarn, force, tmpReplacer);   }   /**    *     * @param description    * @return    */   private boolean checkIfContainsLongs(SqlDescription description) {      SqlColumn[] cols = description.getColumns();      for (int i = 0; i < cols.length; i++) {         int type = cols[i].getSqlType();         if (type == Types.LONGVARCHAR || type == Types.LONGVARBINARY)            return true;      }      return false;   }      /**    *     * @param col    * @param prefix    *           can be 'old' or 'new'    * @return    */   protected String createVariableSqlPart(SqlDescription description, String prefix, boolean containsLongs, boolean isInsert) {      String newOldPrefix = ":"; // ":" on ora10 ?      SqlColumn[] cols = description.getColumns();      String contName = prefix + "Cont"; // will be newCont or oldCont      StringBuffer buf = new StringBuffer();      String tablePrefix = newOldPrefix + prefix;      buf.append("       ").append(contName).append(" := NULL;\n");      buf.append("       oid := ROWIDTOCHAR(").append(tablePrefix).append(".rowid);\n");      boolean isNew = "new".equals(prefix);      if (isNew) {         if (containsLongs)            return buf.toString();      }      // note when using LONGS the newCont must be NULL (not EMPTY_CLOB)      buf.append("       ").append(contName).append(" := EMPTY_CLOB;\n");      buf.append("       dbms_lob.createtemporary(").append(contName).append(", TRUE);\n");      buf.append("       dbms_lob.open(").append(contName).append(", dbms_lob.lob_readwrite);\n");      for (int i = 0; i < cols.length; i++) {         String colName = cols[i].getColName();         String typeName = cols[i].getTypeName();         int type = cols[i].getSqlType();         String varName = tablePrefix + "." + colName; // for example :new.colname'                  if (this.debug) {            buf.append("       IF debug != 0 THEN\n");            if (this.debugFunction != null)               buf.append("          ").append(this.debugFunction).append("('   col ").append(colName).append(" type ").append(typeName).append(" typeNr ").append(type).append(" prefix ").append(prefix).append("');\n");            buf.append("       END IF;\n");         }         boolean doLongWorkaround = type != Types.LONGVARCHAR && type != Types.LONGVARBINARY; // used also at the end of loop         if (doLongWorkaround)            buf.append("          IF ").append(varName).append(" IS NOT NULL THEN\n");                  boolean doProcess = isNew || cols[i].isSearchable();         if (doProcess) { // we don't want it on old entries since these are only used to find the entry to process            // this could be a problem if the type of a col is searchable on the source(master) but not on the slave.            if (type == Types.LONGVARCHAR || type == Types.LONGVARBINARY) {               /*                * don't use the LONGS since they can not be used inside a trigger. For INSERT                * the RAWID will be used and the entry will be read when processing the data, for                * DELETE and UPDATE the LONGS will not be used to search.                */            }            else if (type == Types.BINARY || type == Types.BLOB || type == Types.JAVA_OBJECT || type == Types.VARBINARY                  || type == Types.STRUCT) {               buf.append("             blobCont := EMPTY_BLOB;\n");               buf.append("             dbms_lob.createtemporary(blobCont, TRUE);\n");               buf.append("             dbms_lob.open(blobCont, dbms_lob.lob_readwrite);\n");               if (type == Types.BLOB) {                  buf.append("             dbms_lob.append(blobCont,").append(varName).append(");\n");               }               else {                  buf.append("             dbms_lob.writeappend(blobCont,").append("length(");                  buf.append(varName).append("),").append(varName).append(");\n");               }               buf.append("             fake := ").append(this.replPrefix).append("col2xml_base64('").append(colName).append("', blobCont,").append(contName).append(");\n");               buf.append("             dbms_lob.close(blobCont);\n");               buf.append("             dbms_lob.freetemporary(blobCont);\n");            }            else if (type == Types.DATE || type == Types.TIMESTAMP || typeName.equals("TIMESTAMP")) {               buf.append("             tmpCont := EMPTY_CLOB;\n");               buf.append("             dbms_lob.createtemporary(tmpCont, TRUE);\n");               buf.append("             dbms_lob.open(tmpCont, dbms_lob.lob_readwrite);\n");               // on new oracle data coming from old versions could be sqlType=TIMESTAMP but type='DATE'               if (typeName.equals("DATE") || type == Types.DATE)                   buf.append("             tmpNum := TO_CHAR(").append(varName).append(",'YYYY-MM-DD HH24:MI:SS');\n");               else // then timestamp                  buf.append("             tmpNum := TO_CHAR(").append(varName).append(",'YYYY-MM-DD HH24:MI:SS.FF');\n");               // do not use 'YYYY-MM-DD HH24:MI:SSXFF' since it would take the local punctuator of the env or                // invoking session               buf.append("             dbms_lob.writeappend(tmpCont, LENGTH(tmpNum), tmpNum);\n");               buf.append("             fake := ").append(this.replPrefix).append("col2xml('");               buf.append(colName).append("', tmpCont,").append(contName).append(");\n");               buf.append("             dbms_lob.close(tmpCont);\n");               buf.append("             dbms_lob.freetemporary(tmpCont);\n");            }            else {               if (type == Types.INTEGER || type == Types.NUMERIC || type == Types.DECIMAL || type == Types.FLOAT                     || type == Types.DOUBLE || type == Types.DATE || type == Types.TIMESTAMP || type == Types.OTHER) {                  buf.append("             tmpNum := TO_CHAR(").append(varName).append(");\n");                  buf.append("             fake := ").append(this.replPrefix);                  buf.append("fill_blob_char(tmpNum, '").append(colName).append("',").append(contName).append(");\n");               }               else {                  // buf.append("             tmpNum := ").append(varName).append(";\n");                  buf.append("             fake := ").append(this.replPrefix);                  buf.append("fill_blob_char(").append(varName).append(", '").append(colName).append("',");                  buf.append(contName).append(");\n");               }                           }         }         else            buf.append("             fake := 0;\n");                  if (doLongWorkaround) {            if (!isInsert) { // on inserts we want to avoid writing unnecessary null entries               buf.append("          ELSE\n");               buf.append("             fake := ").append(this.replPrefix).append("col2xml_null('");               buf.append(colName).append("', ").append(contName).append(");\n");            }            buf.append("          END IF;\n");         }      }      return buf.toString();   }   public String createTableTrigger(SqlDescription infoDescription, TableToWatchInfo tableToWatch) {      String triggerName = tableToWatch.getTrigger();      String replFlags = tableToWatch.getActions();      if (replFlags == null)         replFlags = "";      boolean doDeletes = replFlags.indexOf('D') > -1;      boolean doInserts = replFlags.indexOf('I') > -1;      boolean doUpdates = replFlags.indexOf('U') > -1;            String tableName = infoDescription.getIdentity(); // should be the table                                                         // name      String completeTableName = tableName;      String schemaName = infoDescription.getSchema();      if (schemaName != null && schemaName.trim().length() > 0) {         completeTableName = schemaName + "." + tableName;      }      String dbName = "NULL"; // still unsure on how to retrieve this                              // information on a correct way.      StringBuffer buf = new StringBuffer();      buf.append("-- ---------------------------------------------------------------------------- \n");      buf.append("-- This is the function which will be registered to the triggers.               \n");      buf.append("-- It must not take any parameter.                                              \n");      buf.append("-- This is the only method which is business data specific. It is depending on  \n");      buf.append("-- the table to be replicated. This should be generated by a tool.              \n");      buf.append("--                                                                              \n");      buf.append("-- For each table you should just write out in a sequence the complete content  \n");      buf.append("-- of the row to replicate. You could make more fancy stuff here, for example   \n");      buf.append("-- you could just send the minimal stuff, i.e. only the stuff which has changed \n");      buf.append("-- (for the new stuff) and for the old one you could always send an empty one.  \n");      buf.append("-- ---------------------------------------------------------------------------- \n");      buf.append("-- AND THE TRIGGER FOR THE replTest TABLE                                       \n");      buf.append("-- ---------------------------------------------------------------------------- \n");      buf.append("\n");      buf.append("CREATE OR REPLACE TRIGGER ").append(triggerName).append("\n");      boolean first = true;      buf.append("AFTER");      if (doUpdates) {         buf.append(" UPDATE");         first = false;      }      if (doDeletes) {         if (!first)            buf.append(" OR");         else            first = false;         buf.append(" DELETE");      }      if (doInserts) {         if (!first)            buf.append(" OR");         else            first = false;         buf.append(" INSERT");      }      buf.append(" ON ").append(completeTableName).append("\n");      buf.append("FOR EACH ROW\n");      buf.append("DECLARE\n");      buf.append("   blobCont BLOB; \n");      buf.append("   oldCont CLOB; \n");      buf.append("   newCont CLOB;\n");      buf.append("   tmpCont CLOB;\n");      buf.append("   tmpNum  VARCHAR(255);\n");      buf.append("   oid     VARCHAR(50);\n");      buf.append("   replKey INTEGER;\n");      buf.append("   ret     VARCHAR(10);\n");      buf.append("   transId VARCHAR2(50);\n");      buf.append("   op      VARCHAR(15);\n");      buf.append("   longKey INTEGER;\n");      buf.append("   debug   INTEGER;\n");      buf.append("   fake    INTEGER;\n");      buf.append("BEGIN\n");      buf.append("\n");      if (this.debug) {         buf.append("    debug := ").append(this.replPrefix).append("debug_trigger('").append(schemaName).append("','").append(tableName).append("');\n");         buf.append("    IF debug != 0 THEN\n");         if (this.debugFunction != null)            buf.append("       ").append(this.debugFunction).append("('TRIGGER ON ").append(completeTableName).append(" invoked');\n");         // buf.append("       ").append(this.replPrefix).append("debug('TRIGGER ON ").append(completeTableName).append(" invoked');\n");         buf.append("    END IF;\n");         // buf.append("    ").append(this.replPrefix).append("debug('TRIGGER ON '").append(completeTableName).append("' invoked');\n");         // buf.append("    KG_WAKEUP.PG$DBGMESS('TRIGGER ON '").append(completeTableName).append("' invoked');\n");      }            boolean containsLongs = checkIfContainsLongs(infoDescription);      boolean isInsert = true; // optimizes: does not write NULL when insert      buf.append("    IF INSERTING THEN\n");      buf.append("       op := 'INSERT';\n");      buf.append(createVariableSqlPart(infoDescription, "new", containsLongs, isInsert));      isInsert = false; // now for update and delete      buf.append("    ELSIF DELETING THEN\n");      buf.append("       op := 'DELETE';\n");      buf.append(createVariableSqlPart(infoDescription, "old", containsLongs, isInsert));      buf.append("    ELSE\n");      buf.append("       op := 'UPDATE';\n");      buf.append(createVariableSqlPart(infoDescription, "old", containsLongs, isInsert));      buf.append(createVariableSqlPart(infoDescription, "new", containsLongs, isInsert));      buf.append("    END IF;\n");      String dbNameTmp = null;      String tableNameTmp = "'" + tableName + "'";      String schemaNameTmp = null;      if (dbName == null)         dbNameTmp = "NULL";      else dbNameTmp = "'" + dbName + "'";      if (schemaName == null)         schemaNameTmp = "NULL";      else schemaNameTmp = "'" + schemaName + "'";      buf.append("    SELECT " + this.replPrefix + "seq.nextval INTO replKey FROM DUAL;\n");            buf.append("    INSERT INTO " + this.replPrefix + "items (repl_key, trans_key, dbId, tablename, guid,\n");      buf.append("                           db_action, db_catalog, db_schema, \n");      buf.append("                           content, oldContent, version) values \n");      buf.append("                           (replKey, 'UNKNOWN',").append(dbNameTmp).append(",\n");      buf.append("            ").append(tableNameTmp).append(", oid, op, NULL, ").append(schemaNameTmp).append(            ", newCont, \n");

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -