⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 replicationwriter.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
         ClientProperty filenameProp =  description.getAttribute(ReplicationConstants.FILENAME_ATTR);         String filename = null;         if (filenameProp != null)            filename = filenameProp.getStringValue();         if (filename != null && filename.length() > 0) {            deleteFiles(filename);         }         else            log.warning("Could not cleanup since the '" + ReplicationConstants.FILENAME_ATTR + "' attribute was not set");         return;      }            String command = description.getCommand();      if (command.equals(INITIAL_XML_CMD)) {         // "_filename"         // "_timestamp"         // XBMessage.get                  // "JMSXGroupSeq";         // "JMSXGroupEof";         // "JMSXGroupEx";                  // XBConnectionMetaData.JMSX_GROUP_SEQ         // XBConnectionMetaData.JMSX_GROUP_EOF         // XBConnectionMetaData.JMSX_GROUP_EX         ClientProperty filename = description.getAttribute(FILENAME_ATTR);         ClientProperty timestamp = description.getAttribute(TIMESTAMP_ATTR);         ClientProperty contentProp = description.getAttribute(DUMP_CONTENT_ATTR);         ClientProperty groupSeq = description.getAttribute(XBConnectionMetaData.JMSX_GROUP_SEQ);         ClientProperty groupEof = description.getAttribute(XBConnectionMetaData.JMSX_GROUP_EOF);         ClientProperty groupEx = description.getAttribute(XBConnectionMetaData.JMSX_GROUP_EX);         byte[] content = contentProp.getBlobValue();         Map map = new HashMap();         if (filename != null)            map.put(FILENAME_ATTR, filename);         if (timestamp != null)            map.put(TIMESTAMP_ATTR, timestamp);         if (groupSeq != null)            map.put(XBConnectionMetaData.JMSX_GROUP_SEQ, groupSeq);         if (groupEof != null)            map.put(XBConnectionMetaData.JMSX_GROUP_EOF, groupEof);         if (groupEx != null)            map.put(XBConnectionMetaData.JMSX_GROUP_EX, groupEx);         String topic = "xmlDump";         this.updateDump(topic, new ByteArrayInputStream(content), map);         return;      }            String action = getStringAttribute(ACTION_ATTR, null, description);      String originalCatalog = getStringAttribute(CATALOG_ATTR, null, description);       String originalSchema = getStringAttribute(SCHEMA_ATTR, null, description);      String originalTable = getStringAttribute(TABLE_NAME_ATTR, null, description);      // these are still without consideration of the column      String catalog = this.mapper.getMappedCatalog(originalCatalog, originalSchema, originalTable, null, originalCatalog);      String schema = this.mapper.getMappedSchema(originalCatalog, originalSchema, originalTable, null, originalSchema);      String table = this.mapper.getMappedTable(originalCatalog, originalSchema, originalTable, null, originalTable);      String completeTableName = table;      if (schema != null && schema.length() > 1)         completeTableName = schema + "." + table;            if (log.isLoggable(Level.FINE))          log.fine("store invoked for \n" + dbInfo.toString());            if (this.nirvanaClient)         return;            if (isAllowedCommand(command)) {         Connection conn = this.keptConnection; // in case in the middle of a transaction !!         if (conn == null)            conn = this.pool.reserve();         else            log.info("Reusing stored connection since in an open transaction");         boolean oldAutoCommitKnown = false;         boolean oldAutoCommit = false;         boolean needRollback = true;         try {            if (!this.exceptionInTransaction) {               List rows = dbInfo.getRows();               oldAutoCommit = conn.getAutoCommit();               oldAutoCommitKnown = true;               conn.setAutoCommit(false); // everything will be handled within the same transaction                if (command.equalsIgnoreCase(REPLICATION_CMD)) {                  for (int i=0; i < rows.size(); i++) {                     SqlRow row = ((SqlRow)rows.get(i)).cloneRow();                     // TODO consistency check                     action = getStringAttribute(ACTION_ATTR, row, description);                     originalCatalog = getStringAttribute(CATALOG_ATTR, row, description);                     originalSchema = getStringAttribute(SCHEMA_ATTR, row, description);                     originalTable = getStringAttribute(TABLE_NAME_ATTR, row, description);                     // row specific but still without considering colums                     catalog = this.mapper.getMappedCatalog(originalCatalog, originalSchema, originalTable, null, originalCatalog);                     schema = this.mapper.getMappedSchema(originalCatalog, originalSchema, originalTable, null, originalSchema);                     table = this.mapper.getMappedTable(originalCatalog, originalSchema, originalTable, null, originalTable);                     if (action == null)                        throw new Exception(ME + ".store: row with no action invoked '" + row.toXml("", true, false, true));                     int count = modifyColumnsIfNecessary(originalCatalog, originalSchema, originalTable, row);                     if (count != 0) {                        if (log.isLoggable(Level.FINE)) log.fine("modified '" + count  + "' entries");                     }                     if (log.isLoggable(Level.FINE)) log.fine("store: " + row.toXml("", true, true, true));                     SqlDescription desc = getTableDescription(catalog, schema, table, conn);                     boolean process = true;                     if (this.prePostStatement != null) // row is the modified column name                        process = this.prePostStatement.preStatement(action, conn, dbInfo, desc, row);                     if (process) {                        if (action.equalsIgnoreCase(INSERT_ACTION)) {                           desc.insert(conn, row);                        }                        else if (action.equalsIgnoreCase(UPDATE_ACTION)) {                           desc.update(conn, row, this.parserForOldInUpdates);                        }                        else if (action.equalsIgnoreCase(DELETE_ACTION)) {                           desc.delete(conn, row);                        }                        else { // TODO implement this possibility too                            if (action.equalsIgnoreCase(CREATE_ACTION) ||                                action.equalsIgnoreCase(ALTER_ACTION) ||                                action.equalsIgnoreCase(DROP_ACTION)) {                              throw new Exception("The execution of action='" + action + "' inside a multi-operation transaction is not implemented");                           }                           else // we don't throw an exception here to be backwards compatible. In future we can throw one                              log.severe("The action='" + action + "' is not recognized as an SQL operation and will therefore not be executed");                        }                        if (this.prePostStatement != null)                           this.prePostStatement.postStatement(action, conn, dbInfo, desc, row);                     }                  }               }               else { // then it is a CREATE / DROP / ALTER or DUMP command (does not have any rows associated)                  if (action.equalsIgnoreCase(CREATE_ACTION)) {                     if (this.doCreate) {                        // check if the table already exists ...                        ResultSet rs = conn.getMetaData().getTables(catalog, schema, table, null);                        boolean tableExistsAlready = rs.next();                        rs.close();                        boolean invokeCreate = true;                        completeTableName = table;                        if (schema != null && schema.length() > 1)                           completeTableName = schema + "." + table;                        boolean process = true;                        SqlDescription desc = getTableDescription(catalog, schema, table, conn);                        if (this.prePostStatement != null) {                           final SqlRow currentRow = null;                           process = this.prePostStatement.preStatement(action, conn, dbInfo, desc, currentRow);                        }                                                if (process) {                           if (tableExistsAlready) {                              if (!this.overwriteTables) {                                 throw new Exception("ReplicationStorer.store: the table '" + completeTableName + "' exists already and 'replication.overwriteTables' is set to 'false'");                              }                              else {                                 if (this.recreateTables) {                                    log.warning("store: the table '" + completeTableName + "' exists already. 'replication.overwriteTables' is set to 'true': will drop the table and recreate it");                                    Statement st = conn.createStatement();                                    st.executeUpdate("DROP TABLE " + completeTableName);                                    st.close();                                 }                                 else {                                    log.warning("store: the table '" + completeTableName + "' exists already. 'replication.overwriteTables' is set to 'true' and 'replication.recreateTables' is set to false. Will only delete contents of table but keep the old structure");                                    invokeCreate = false;                                 }                              }                           }                           String sql = null;                           if (invokeCreate) {                              sql = this.dbSpecific.getCreateTableStatement(description, this.mapper);                              log.info("CREATE STATEMENT: '" + sql + "'");                           }                           else {                              sql = "DELETE FROM " + completeTableName;                              log.info("CLEANING UP TABLE '" + completeTableName + "'");                           }                           Statement st = conn.createStatement();                           try {                              st.executeUpdate(sql);                           }                           finally {                              st.close();                           }                           if (this.prePostStatement != null) {                              final SqlRow currentRow = null;                              this.prePostStatement.postStatement(action, conn, dbInfo, desc, currentRow);                           }                        }                     }                     else                        log.fine("CREATE is disabled for this writer");                  }                  else if (action.equalsIgnoreCase(DROP_ACTION)) {                     if (this.doDrop) {                        boolean process = true;                        SqlDescription desc = getTableDescription(catalog, schema, table, conn);                        if (this.prePostStatement != null) {                           final SqlRow currentRow = null;                           process = this.prePostStatement.preStatement(action, conn, dbInfo, desc, currentRow);                        }                        if (process) {                           completeTableName = table;                           if (schema != null && schema.length() > 1)                              completeTableName = schema + "." + table;                           String sql = "DROP TABLE " + completeTableName;                           Statement st = conn.createStatement();                           try {                              st.executeUpdate(sql);                           }                           catch (SQLException e) {                              // this is currently only working on oracle: TODO make it work for other DB too.                              if (e.getMessage().indexOf("does not exist") > -1)                                 log.warning("table '" + completeTableName + "' was not found and could therefore not be dropped. Continuing anyway");                              else                                 throw e;                           }                           finally {                              st.close();                           }                           if (this.prePostStatement != null) {                              final SqlRow currentRow = null;                              this.prePostStatement.postStatement(action, conn, dbInfo, desc, currentRow);                           }                        }                     }                     else                        log.fine("DROP is disabled for this writer");                  }                  else if (action.equalsIgnoreCase(ALTER_ACTION)) {                     if (this.doAlter) {                        boolean process = true;                        SqlDescription desc = getTableDescription(catalog, schema, table, conn);                        if (this.prePostStatement != null) {                           final SqlRow currentRow = null;                           process = this.prePostStatement.preStatement(action, conn, dbInfo, desc, currentRow);                        }                        if (process) {                           log.severe("store: operation '" + action + "' invoked but not implemented yet '" + description.toXml(""));                           if (this.prePostStatement != null) {                              final SqlRow currentRow = null;                              this.prePostStatement.postStatement(action, conn, dbInfo, desc, currentRow);                           }                        }                     }                     else                        log.fine("ALTER is disabled for this writer");                  }                  else if (action.equalsIgnoreCase(DUMP_ACTION)) {                     log.severe("store: operation '" + action + "' invoked but not implemented yet '" + description.toXml(""));                     // store entry on the file system at a location specified by the 'replication.importLocation' property                     // the name of the file is stored in DUMP_FILENAME (it will be used to store the file)                  }                  else if (action.equalsIgnoreCase(STATEMENT_ACTION)) {                     // since it could alter some table                     clearSqlInfoCache();                     if (this.doStatement) {                        String sql = getStringAttribute(STATEMENT_ATTR, null, description);                        String tmp = getStringAttribute(MAX_ENTRIES_ATTR, null, description);                                                long maxResponseEntries = 0L;                        if (tmp != null)                           maxResponseEntries = Long.parseLong(tmp.trim());                        // tmp = getStringAttribute(STATEMENT_PRIO, null, description);                        final boolean isHighPrio = false; // does not really matter here                            final boolean isMaster = false;                        final String noId = null; // no statement id necessary here                        final String noTopic = null; // no need for a topic in the slave                        byte[] response = null;                        Exception ex = null;                        try {                           response = this.dbSpecific.broadcastStatement(sql, maxResponseEntries, isHighPrio, isMaster, noTopic, noId);                        }                        catch (Exception e) {                           ex = e;                           response = "".getBytes();                        }                        if (true) { // TODO add here the possibility to block sending of messages                           I_ChangePublisher momEngine = (I_ChangePublisher)this.info.getObject("org.xmlBlaster.contrib.dbwriter.mom.MomEventEngine");                           if (momEngine == null)                              throw new Exception("ReplicationWriter: the momEngine used can not handle publishes");                           String statementId = getStringAttribute(STATEMENT_ID_ATTR, null, description);                           String sqlTopic = getStringAttribute(SQL_TOPIC_ATTR, null, description);                           HashMap map = new HashMap();                           map.put(STATEMENT_ID_ATTR, statementId);                           if (ex != null)                              map.put(ReplicationConstants.EXCEPTION_ATTR, ex.getMessage());                           momEngine.publish(sqlTopic, response, map);                        }                        else                           log.info("statement '" + sql + "' resulted in response '" + new String(response));                        if (ex != null) // now that we notified the server we can throw the exception                           throw ex;                     }                     else                        log.fine("STATEMENT is disabled for this writer");                  }                  else {                     log.severe("store: description with unknown action '" + action + "' invoked '" + description.toXml(""));                  }               }               try {                  if (!keepTransactionOpen) {                     conn.commit();                     log.fine("Transaction has been committed");                  }                  else                     log.fine("The transaction will still be kept open");                  needRollback = false;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -