📄 specificoracle.java
字号:
/*------------------------------------------------------------------------------ Name: SpecificDefault.java Project: xmlBlaster.org Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file ------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.replication.impl;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import java.sql.Statement;import java.sql.Types;import java.util.ArrayList;import java.util.List;import java.util.Map;import java.util.logging.Logger;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.PropertiesInfo;import org.xmlBlaster.contrib.db.I_DbPool;import org.xmlBlaster.contrib.dbwatcher.convert.I_AttributeTransformer;import org.xmlBlaster.contrib.dbwriter.info.SqlColumn;import org.xmlBlaster.contrib.dbwriter.info.SqlDescription;import org.xmlBlaster.contrib.dbwriter.info.SqlInfo;import org.xmlBlaster.contrib.dbwriter.info.SqlRow;import org.xmlBlaster.contrib.replication.TableToWatchInfo;public class SpecificOracle extends SpecificDefault { private static Logger log = Logger.getLogger(SpecificOracle.class.getName()); private String ownSchema; private boolean debug; private String debugFunction; private boolean wipeoutTriggers; private boolean wipeoutSequences; private boolean wipeoutFunctions; private boolean wipeoutPackages; private boolean wipeoutProcedures; private boolean wipeoutViews; private boolean wipeoutTables; private boolean wipeoutSynonyms; private boolean wipeoutIndexes; private boolean wipeoutExIfConnected; private boolean createDropAlterDetection; /** * Not doing anything. */ public SpecificOracle() { super(); } public void init(I_Info info) throws Exception { super.init(info); this.ownSchema = this.dbMetaHelper.getIdentifier(this.info.get("db.user", null)); this.debug = this.info.getBoolean("replication.plsql.debug", false); this.debugFunction = this.info.get("replication.plsql.debugFunction", null); this.wipeoutExIfConnected = this.info.getBoolean("replication.oracle.wipeoutExIfConnected", false); this.wipeoutTriggers = this.info.getBoolean("replication.oracle.wipeoutTriggers", true); this.wipeoutSequences = this.info.getBoolean("replication.oracle.wipeoutSequences", true); this.wipeoutFunctions = this.info.getBoolean("replication.oracle.wipeoutFunctions", true); this.wipeoutPackages = this.info.getBoolean("replication.oracle.wipeoutPackages", true); this.wipeoutProcedures = this.info.getBoolean("replication.oracle.wipeoutProcedures", true); this.wipeoutViews = this.info.getBoolean("replication.oracle.wipeoutViews", true); this.wipeoutTables = this.info.getBoolean("replication.oracle.wipeoutTables", true); this.wipeoutSynonyms = this.info.getBoolean("replication.oracle.wipeoutSynonyms", true); this.wipeoutIndexes = this.info.getBoolean("replication.oracle.wipeoutIndexes", true); this.createDropAlterDetection = this.info.getBoolean("replication.createDropAlterDetection", true); } /** * Adds a schema to be watched. By Oracle it would add triggers to the * schema. * * @param catalog * @param schema * @throws Exception. Thrown if an exception occurs on the backend. Note that if an * exception occurs you must cleanup the connection since it might become stale. */ public void addSchemaToWatch(Connection conn, String catalog, String schema) throws Exception { if (schema == null || schema.length() < 1) return; Map map = this.replacer.getAdditionalMapClone(); map.put("schemaName", schema); Replacer tmpReplacer = new Replacer(this.info, map); boolean doWarn = true; boolean force = true; // overwrites existing ones if (this.createDropAlterDetection) updateFromFile(conn, "createDropAlter", "replication.createDropAlterFile", "org/xmlBlaster/contrib/replication/setup/oracle/createDropAlter.sql", doWarn, force, tmpReplacer); } /** * * @param description * @return */ private boolean checkIfContainsLongs(SqlDescription description) { SqlColumn[] cols = description.getColumns(); for (int i = 0; i < cols.length; i++) { int type = cols[i].getSqlType(); if (type == Types.LONGVARCHAR || type == Types.LONGVARBINARY) return true; } return false; } /** * * @param col * @param prefix * can be 'old' or 'new' * @return */ protected String createVariableSqlPart(SqlDescription description, String prefix, boolean containsLongs, boolean isInsert) { String newOldPrefix = ":"; // ":" on ora10 ? SqlColumn[] cols = description.getColumns(); String contName = prefix + "Cont"; // will be newCont or oldCont StringBuffer buf = new StringBuffer(); String tablePrefix = newOldPrefix + prefix; buf.append(" ").append(contName).append(" := NULL;\n"); buf.append(" oid := ROWIDTOCHAR(").append(tablePrefix).append(".rowid);\n"); boolean isNew = "new".equals(prefix); if (isNew) { if (containsLongs) return buf.toString(); } // note when using LONGS the newCont must be NULL (not EMPTY_CLOB) buf.append(" ").append(contName).append(" := EMPTY_CLOB;\n"); buf.append(" dbms_lob.createtemporary(").append(contName).append(", TRUE);\n"); buf.append(" dbms_lob.open(").append(contName).append(", dbms_lob.lob_readwrite);\n"); for (int i = 0; i < cols.length; i++) { String colName = cols[i].getColName(); String typeName = cols[i].getTypeName(); int type = cols[i].getSqlType(); String varName = tablePrefix + "." + colName; // for example :new.colname' if (this.debug) { buf.append(" IF debug != 0 THEN\n"); if (this.debugFunction != null) buf.append(" ").append(this.debugFunction).append("(' col ").append(colName).append(" type ").append(typeName).append(" typeNr ").append(type).append(" prefix ").append(prefix).append("');\n"); buf.append(" END IF;\n"); } boolean doLongWorkaround = type != Types.LONGVARCHAR && type != Types.LONGVARBINARY; // used also at the end of loop if (doLongWorkaround) buf.append(" IF ").append(varName).append(" IS NOT NULL THEN\n"); boolean doProcess = isNew || cols[i].isSearchable(); if (doProcess) { // we don't want it on old entries since these are only used to find the entry to process // this could be a problem if the type of a col is searchable on the source(master) but not on the slave. if (type == Types.LONGVARCHAR || type == Types.LONGVARBINARY) { /* * don't use the LONGS since they can not be used inside a trigger. For INSERT * the RAWID will be used and the entry will be read when processing the data, for * DELETE and UPDATE the LONGS will not be used to search. */ } else if (type == Types.BINARY || type == Types.BLOB || type == Types.JAVA_OBJECT || type == Types.VARBINARY || type == Types.STRUCT) { buf.append(" blobCont := EMPTY_BLOB;\n"); buf.append(" dbms_lob.createtemporary(blobCont, TRUE);\n"); buf.append(" dbms_lob.open(blobCont, dbms_lob.lob_readwrite);\n"); if (type == Types.BLOB) { buf.append(" dbms_lob.append(blobCont,").append(varName).append(");\n"); } else { buf.append(" dbms_lob.writeappend(blobCont,").append("length("); buf.append(varName).append("),").append(varName).append(");\n"); } buf.append(" fake := ").append(this.replPrefix).append("col2xml_base64('").append(colName).append("', blobCont,").append(contName).append(");\n"); buf.append(" dbms_lob.close(blobCont);\n"); buf.append(" dbms_lob.freetemporary(blobCont);\n"); } else if (type == Types.DATE || type == Types.TIMESTAMP || typeName.equals("TIMESTAMP")) { buf.append(" tmpCont := EMPTY_CLOB;\n"); buf.append(" dbms_lob.createtemporary(tmpCont, TRUE);\n"); buf.append(" dbms_lob.open(tmpCont, dbms_lob.lob_readwrite);\n"); // on new oracle data coming from old versions could be sqlType=TIMESTAMP but type='DATE' if (typeName.equals("DATE") || type == Types.DATE) buf.append(" tmpNum := TO_CHAR(").append(varName).append(",'YYYY-MM-DD HH24:MI:SS');\n"); else // then timestamp buf.append(" tmpNum := TO_CHAR(").append(varName).append(",'YYYY-MM-DD HH24:MI:SS.FF');\n"); // do not use 'YYYY-MM-DD HH24:MI:SSXFF' since it would take the local punctuator of the env or // invoking session buf.append(" dbms_lob.writeappend(tmpCont, LENGTH(tmpNum), tmpNum);\n"); buf.append(" fake := ").append(this.replPrefix).append("col2xml('"); buf.append(colName).append("', tmpCont,").append(contName).append(");\n"); buf.append(" dbms_lob.close(tmpCont);\n"); buf.append(" dbms_lob.freetemporary(tmpCont);\n"); } else { if (type == Types.INTEGER || type == Types.NUMERIC || type == Types.DECIMAL || type == Types.FLOAT || type == Types.DOUBLE || type == Types.DATE || type == Types.TIMESTAMP || type == Types.OTHER) { buf.append(" tmpNum := TO_CHAR(").append(varName).append(");\n"); buf.append(" fake := ").append(this.replPrefix); buf.append("fill_blob_char(tmpNum, '").append(colName).append("',").append(contName).append(");\n"); } else { // buf.append(" tmpNum := ").append(varName).append(";\n"); buf.append(" fake := ").append(this.replPrefix); buf.append("fill_blob_char(").append(varName).append(", '").append(colName).append("',"); buf.append(contName).append(");\n"); } } } else buf.append(" fake := 0;\n"); if (doLongWorkaround) { if (!isInsert) { // on inserts we want to avoid writing unnecessary null entries buf.append(" ELSE\n"); buf.append(" fake := ").append(this.replPrefix).append("col2xml_null('"); buf.append(colName).append("', ").append(contName).append(");\n"); } buf.append(" END IF;\n"); } } return buf.toString(); } public String createTableTrigger(SqlDescription infoDescription, TableToWatchInfo tableToWatch) { String triggerName = tableToWatch.getTrigger(); String replFlags = tableToWatch.getActions(); if (replFlags == null) replFlags = ""; boolean doDeletes = replFlags.indexOf('D') > -1; boolean doInserts = replFlags.indexOf('I') > -1; boolean doUpdates = replFlags.indexOf('U') > -1; String tableName = infoDescription.getIdentity(); // should be the table // name String completeTableName = tableName; String schemaName = infoDescription.getSchema(); if (schemaName != null && schemaName.trim().length() > 0) { completeTableName = schemaName + "." + tableName; } String dbName = "NULL"; // still unsure on how to retrieve this // information on a correct way. StringBuffer buf = new StringBuffer(); buf.append("-- ---------------------------------------------------------------------------- \n"); buf.append("-- This is the function which will be registered to the triggers. \n"); buf.append("-- It must not take any parameter. \n"); buf.append("-- This is the only method which is business data specific. It is depending on \n"); buf.append("-- the table to be replicated. This should be generated by a tool. \n"); buf.append("-- \n"); buf.append("-- For each table you should just write out in a sequence the complete content \n"); buf.append("-- of the row to replicate. You could make more fancy stuff here, for example \n"); buf.append("-- you could just send the minimal stuff, i.e. only the stuff which has changed \n"); buf.append("-- (for the new stuff) and for the old one you could always send an empty one. \n"); buf.append("-- ---------------------------------------------------------------------------- \n"); buf.append("-- AND THE TRIGGER FOR THE replTest TABLE \n"); buf.append("-- ---------------------------------------------------------------------------- \n"); buf.append("\n"); buf.append("CREATE OR REPLACE TRIGGER ").append(triggerName).append("\n"); boolean first = true; buf.append("AFTER"); if (doUpdates) { buf.append(" UPDATE"); first = false; } if (doDeletes) { if (!first) buf.append(" OR"); else first = false; buf.append(" DELETE"); } if (doInserts) { if (!first) buf.append(" OR"); else first = false; buf.append(" INSERT"); } buf.append(" ON ").append(completeTableName).append("\n"); buf.append("FOR EACH ROW\n"); buf.append("DECLARE\n"); buf.append(" blobCont BLOB; \n"); buf.append(" oldCont CLOB; \n"); buf.append(" newCont CLOB;\n"); buf.append(" tmpCont CLOB;\n"); buf.append(" tmpNum VARCHAR(255);\n"); buf.append(" oid VARCHAR(50);\n"); buf.append(" replKey INTEGER;\n"); buf.append(" ret VARCHAR(10);\n"); buf.append(" transId VARCHAR2(50);\n"); buf.append(" op VARCHAR(15);\n"); buf.append(" longKey INTEGER;\n"); buf.append(" debug INTEGER;\n"); buf.append(" fake INTEGER;\n"); buf.append("BEGIN\n"); buf.append("\n"); if (this.debug) { buf.append(" debug := ").append(this.replPrefix).append("debug_trigger('").append(schemaName).append("','").append(tableName).append("');\n"); buf.append(" IF debug != 0 THEN\n"); if (this.debugFunction != null) buf.append(" ").append(this.debugFunction).append("('TRIGGER ON ").append(completeTableName).append(" invoked');\n"); // buf.append(" ").append(this.replPrefix).append("debug('TRIGGER ON ").append(completeTableName).append(" invoked');\n"); buf.append(" END IF;\n"); // buf.append(" ").append(this.replPrefix).append("debug('TRIGGER ON '").append(completeTableName).append("' invoked');\n"); // buf.append(" KG_WAKEUP.PG$DBGMESS('TRIGGER ON '").append(completeTableName).append("' invoked');\n"); } boolean containsLongs = checkIfContainsLongs(infoDescription); boolean isInsert = true; // optimizes: does not write NULL when insert buf.append(" IF INSERTING THEN\n"); buf.append(" op := 'INSERT';\n"); buf.append(createVariableSqlPart(infoDescription, "new", containsLongs, isInsert)); isInsert = false; // now for update and delete buf.append(" ELSIF DELETING THEN\n"); buf.append(" op := 'DELETE';\n"); buf.append(createVariableSqlPart(infoDescription, "old", containsLongs, isInsert)); buf.append(" ELSE\n"); buf.append(" op := 'UPDATE';\n"); buf.append(createVariableSqlPart(infoDescription, "old", containsLongs, isInsert)); buf.append(createVariableSqlPart(infoDescription, "new", containsLongs, isInsert)); buf.append(" END IF;\n"); String dbNameTmp = null; String tableNameTmp = "'" + tableName + "'"; String schemaNameTmp = null; if (dbName == null) dbNameTmp = "NULL"; else dbNameTmp = "'" + dbName + "'"; if (schemaName == null) schemaNameTmp = "NULL"; else schemaNameTmp = "'" + schemaName + "'"; buf.append(" SELECT " + this.replPrefix + "seq.nextval INTO replKey FROM DUAL;\n"); buf.append(" INSERT INTO " + this.replPrefix + "items (repl_key, trans_key, dbId, tablename, guid,\n"); buf.append(" db_action, db_catalog, db_schema, \n"); buf.append(" content, oldContent, version) values \n"); buf.append(" (replKey, 'UNKNOWN',").append(dbNameTmp).append(",\n"); buf.append(" ").append(tableNameTmp).append(", oid, op, NULL, ").append(schemaNameTmp).append( ", newCont, \n");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -