⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 specificpostgres.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
字号:
/*------------------------------------------------------------------------------ Name:      SpecificPostgres.java Project:   xmlBlaster.org Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file ------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.replication.impl;import java.sql.Connection;import java.sql.Types;import org.xmlBlaster.contrib.dbwatcher.convert.I_AttributeTransformer;import org.xmlBlaster.contrib.dbwriter.info.SqlColumn;import org.xmlBlaster.contrib.dbwriter.info.SqlDescription;import org.xmlBlaster.contrib.replication.TableToWatchInfo;public class SpecificPostgres extends SpecificDefault {   /**    * Not doing anything.    */   public SpecificPostgres() {      super();   }   /**    * Helper method used to construct the CREATE TABLE statement part belonging    * to a single COLUMN.    *     * @param colInfoDescription    * @return    */   public StringBuffer getColumnStatement(SqlColumn colInfoDescription) {      String type = colInfoDescription.getType();      /*       * if (charLength > 0) { type = type + "[" + charLength + "]"; }       */      StringBuffer buf = new StringBuffer(colInfoDescription.getColName())            .append(" ").append(type);      return buf;   }   /**    *     * @param col    * @param prefix    *           can be 'old' or 'new'    * @return    */   protected String createVariableSqlPart(SqlDescription description,         String prefix) {      SqlColumn[] cols = description            .getColumns();      StringBuffer buf = new StringBuffer("       ").append(prefix).append(            "Cont = '';\n");      for (int i = 0; i < cols.length; i++) {         String colName = cols[i].getColName();         int type = cols[i].getSqlType();         if (type == Types.BINARY || type == Types.BLOB               || type == Types.JAVA_OBJECT || type == Types.VARBINARY               || type == Types.STRUCT) {            buf.append("       blobCont = " + prefix + "." + colName + ";\n");            buf.append("       tmp = " + prefix                  + "Cont || " + this.replPrefix + "col2xml_base64('" + colName                  + "', blobCont);\n");            buf.append("       " + prefix + "Cont = tmp;\n");         } else {            buf.append("       tmp = " + prefix + "Cont || " + this.replPrefix + "col2xml('"                  + colName + "'," + prefix + "." + colName + ");\n");            buf.append("       " + prefix + "Cont = tmp;\n");         }      }      buf.append("       oid = ").append(prefix).append(".oid;\n");      return buf.toString();   }   /**    * This method creates a function to be associated to a trigger to detect    * INSERT DELETE and UPDATE operations on a particular table.    *     * @param infoDescription    *           the info object containing the necessary information for the    *           table.    * @return a String containing the sql update. It can be executed.    */   public String createTableFunction(SqlDescription infoDescription, String functionName) {      StringBuffer buf = new StringBuffer();      buf.append("-- ---------------------------------------------------------------------------- \n");      buf.append("-- This is the function which will be registered to the triggers.               \n");      buf.append("-- It must not take any parameter.                                              \n");      buf.append("-- This is the only method which is business data specific. It is depending on  \n");      buf.append("-- the table to be replicated. This should be generated by a tool.              \n");      buf.append("--                                                                              \n");      buf.append("-- For each table you should just write out in a sequence the complete content  \n");      buf.append("-- of the row to replicate. You could make more fancy stuff here, for example   \n");      buf.append("-- you could just send the minimal stuff, i.e. only the stuff which has changed \n");      buf.append("-- (for the new stuff) and for the old one you could always send an empty one.  \n");      buf.append("-- ---------------------------------------------------------------------------- \n");      buf.append("\n");      buf.append("CREATE OR REPLACE FUNCTION ").append(functionName).append("() RETURNS trigger AS $").append(functionName).append("$\n");      buf.append("DECLARE blobCont BYTEA; \n");      buf.append("        oldCont TEXT; \n");      buf.append("   newCont TEXT;\n");      buf.append("   comment TEXT;\n");      buf.append("   oid     TEXT;\n");      buf.append("   tmp     TEXT;\n");      buf.append("BEGIN\n");      buf.append("    oldCont = NULL;\n");      buf.append("    newCont = NULL;\n");      buf.append("    tmp = " + this.replPrefix + "check_structure();\n");      buf.append("\n");      buf.append("    IF (TG_OP != 'INSERT') THEN\n");      buf.append(createVariableSqlPart(infoDescription, "old"));      buf.append("    END IF;\n");      buf.append("\n");      buf.append("    IF (TG_OP != 'DELETE') THEN\n");      buf.append(createVariableSqlPart(infoDescription, "new"));      buf.append("\n");      buf.append("    END IF;\n");      buf.append("    INSERT INTO " + this.replPrefix + "items (trans_key, dbId, tablename, guid,\n");      buf.append("                           db_action, db_catalog, db_schema, \n");      buf.append("                           content, oldContent, version) values \n");      buf.append("                           (CURRENT_TIMESTAMP,current_database(),\n");      buf.append("            TG_RELNAME, oid, TG_OP, NULL, current_schema(), newCont, \n");      buf.append("            oldCont, '").append(this.replVersion).append("');\n");      buf.append("    tmp = inet_client_addr();\n");      buf.append("\n");      buf.append("    IF (TG_OP = 'DELETE') THEN RETURN OLD;\n");      buf.append("    END IF;\n");      buf.append("    RETURN NEW;\n");      buf.append("END;\n");      buf.append("$").append(functionName).append("$ LANGUAGE 'plpgsql';\n");      buf.append("\n");      return buf.toString();   }   /**    * This method creates a trigger to detect INSERT DELETE and UPDATE    * operations on a particular table.    *     * @param infoDescription    *           the info object containing the necessary information for the    *           table.    * @return a String containing the sql update. It can be executed.    */   public String createTableTrigger(SqlDescription infoDescription, TableToWatchInfo tableToWatch) {      String triggerName = tableToWatch.getTrigger();      String replFlags = tableToWatch.getActions();      return createTableTrigger(infoDescription, triggerName, replFlags);   }      private String createTableTrigger(SqlDescription infoDescription, String triggerName, String replFlags) {      String tableName = infoDescription.getIdentity(); // should be the table                                                         // name      String functionName = tableName + "_f";      StringBuffer buf = new StringBuffer();      buf.append(createTableFunction(infoDescription, functionName));            // and now append the associated trigger ....      buf.append("-- ---------------------------------------------------------------------------- \n");      buf.append("-- THE TRIGGER FOR THE replTest TABLE                                           \n");      buf.append("-- ---------------------------------------------------------------------------- \n");      buf.append("\n");      buf.append("-- DROP TRIGGER ").append(triggerName).append(" ON ").append(tableName).append(" CASCADE;\n");      buf.append("CREATE TRIGGER ").append(triggerName).append("\n");      buf.append("AFTER UPDATE OR DELETE OR INSERT\n");      buf.append("ON ").append(tableName).append("\n");      buf.append("FOR EACH ROW\n");      buf.append("EXECUTE PROCEDURE ").append(functionName).append("();\n");      buf.append("\n");      return buf.toString();   }   /**    * Adds a schema to be watched. By Oracle it would add triggers to the schema.     * @param catalog    * @param schema    * @throws Exception    */   public void addSchemaToWatch(Connection conn, String catalog, String schema) throws Exception {      // do nothing as a default   }   public boolean removeTrigger(String triggerName, String tableName, boolean isSchemaTrigger) {      if (isSchemaTrigger || triggerName == null) // we don't have schema triggers          return false;      try {         this.dbPool.update("DROP TRIGGER " + triggerName + " ON " + tableName + " CASCADE");         return true;      }      catch (Exception ex) {         return false;      }   }   public int wipeoutSchema(String catalog, String schema, boolean[] objectsToWipeout) throws Exception {      throw new Exception("cleanupSchema for '" + schema + "' can not be executed since not implemented");   }   /**    * @see org.xmlBlaster.contrib.replication.I_DbSpecific#getContentFromGuid(java.lang.String, java.lang.String, java.lang.String, java.lang.String)    */   public String getContentFromGuid(String guid, String catalog, String schema, String table, I_AttributeTransformer transformer) throws Exception {      throw new Exception("SpecificPostgres.getContentFromGuid is not implemented yet for table='" + table + "' and guid='" + guid + "'");   }      /**    * returns true if the sequence exists already.    */   protected boolean sequenceExists(Connection conn, String sequenceName) throws Exception {      throw new Exception("The method sequenceExist has not been implemented yet. Please implement it");   }   /**    * returns true if the trigger exists already.    */   protected boolean triggerExists(Connection conn, String sequenceName) throws Exception {      throw new Exception("The method triggerExist has not been implemented yet. Please implement it");   }   /**    * @see org.xmlBlaster.contrib.replication.I_DbSpecific#triggerExists(org.xmlBlaster.contrib.replication.TableToWatchInfo)    */   public boolean triggerExists(Connection conn, TableToWatchInfo tableToWatch) throws Exception {      throw new Exception("The method triggerExists is not implemented yet for the postgres implementation");   }   }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -