📄 replicationwriter.java
字号:
ClientProperty filenameProp = description.getAttribute(ReplicationConstants.FILENAME_ATTR); String filename = null; if (filenameProp != null) filename = filenameProp.getStringValue(); if (filename != null && filename.length() > 0) { deleteFiles(filename); } else log.warning("Could not cleanup since the '" + ReplicationConstants.FILENAME_ATTR + "' attribute was not set"); return; } String command = description.getCommand(); if (command.equals(INITIAL_XML_CMD)) { // "_filename" // "_timestamp" // XBMessage.get // "JMSXGroupSeq"; // "JMSXGroupEof"; // "JMSXGroupEx"; // XBConnectionMetaData.JMSX_GROUP_SEQ // XBConnectionMetaData.JMSX_GROUP_EOF // XBConnectionMetaData.JMSX_GROUP_EX ClientProperty filename = description.getAttribute(FILENAME_ATTR); ClientProperty timestamp = description.getAttribute(TIMESTAMP_ATTR); ClientProperty contentProp = description.getAttribute(DUMP_CONTENT_ATTR); ClientProperty groupSeq = description.getAttribute(XBConnectionMetaData.JMSX_GROUP_SEQ); ClientProperty groupEof = description.getAttribute(XBConnectionMetaData.JMSX_GROUP_EOF); ClientProperty groupEx = description.getAttribute(XBConnectionMetaData.JMSX_GROUP_EX); byte[] content = contentProp.getBlobValue(); Map map = new HashMap(); if (filename != null) map.put(FILENAME_ATTR, filename); if (timestamp != null) map.put(TIMESTAMP_ATTR, timestamp); if (groupSeq != null) map.put(XBConnectionMetaData.JMSX_GROUP_SEQ, groupSeq); if (groupEof != null) map.put(XBConnectionMetaData.JMSX_GROUP_EOF, groupEof); if (groupEx != null) map.put(XBConnectionMetaData.JMSX_GROUP_EX, groupEx); String topic = "xmlDump"; this.updateDump(topic, new ByteArrayInputStream(content), map); return; } String action = getStringAttribute(ACTION_ATTR, null, description); String originalCatalog = getStringAttribute(CATALOG_ATTR, null, description); String originalSchema = getStringAttribute(SCHEMA_ATTR, null, description); String originalTable = getStringAttribute(TABLE_NAME_ATTR, null, description); // these are still without consideration of the column String catalog = this.mapper.getMappedCatalog(originalCatalog, originalSchema, originalTable, null, originalCatalog); String schema = this.mapper.getMappedSchema(originalCatalog, originalSchema, originalTable, null, originalSchema); String table = this.mapper.getMappedTable(originalCatalog, originalSchema, originalTable, null, originalTable); String completeTableName = table; if (schema != null && schema.length() > 1) completeTableName = schema + "." + table; if (log.isLoggable(Level.FINE)) log.fine("store invoked for \n" + dbInfo.toString()); if (this.nirvanaClient) return; if (isAllowedCommand(command)) { Connection conn = this.keptConnection; // in case in the middle of a transaction !! if (conn == null) conn = this.pool.reserve(); else log.info("Reusing stored connection since in an open transaction"); boolean oldAutoCommitKnown = false; boolean oldAutoCommit = false; boolean needRollback = true; try { if (!this.exceptionInTransaction) { List rows = dbInfo.getRows(); oldAutoCommit = conn.getAutoCommit(); oldAutoCommitKnown = true; conn.setAutoCommit(false); // everything will be handled within the same transaction if (command.equalsIgnoreCase(REPLICATION_CMD)) { for (int i=0; i < rows.size(); i++) { SqlRow row = ((SqlRow)rows.get(i)).cloneRow(); // TODO consistency check action = getStringAttribute(ACTION_ATTR, row, description); originalCatalog = getStringAttribute(CATALOG_ATTR, row, description); originalSchema = getStringAttribute(SCHEMA_ATTR, row, description); originalTable = getStringAttribute(TABLE_NAME_ATTR, row, description); // row specific but still without considering colums catalog = this.mapper.getMappedCatalog(originalCatalog, originalSchema, originalTable, null, originalCatalog); schema = this.mapper.getMappedSchema(originalCatalog, originalSchema, originalTable, null, originalSchema); table = this.mapper.getMappedTable(originalCatalog, originalSchema, originalTable, null, originalTable); if (action == null) throw new Exception(ME + ".store: row with no action invoked '" + row.toXml("", true, false, true)); int count = modifyColumnsIfNecessary(originalCatalog, originalSchema, originalTable, row); if (count != 0) { if (log.isLoggable(Level.FINE)) log.fine("modified '" + count + "' entries"); } if (log.isLoggable(Level.FINE)) log.fine("store: " + row.toXml("", true, true, true)); SqlDescription desc = getTableDescription(catalog, schema, table, conn); boolean process = true; if (this.prePostStatement != null) // row is the modified column name process = this.prePostStatement.preStatement(action, conn, dbInfo, desc, row); if (process) { if (action.equalsIgnoreCase(INSERT_ACTION)) { desc.insert(conn, row); } else if (action.equalsIgnoreCase(UPDATE_ACTION)) { desc.update(conn, row, this.parserForOldInUpdates); } else if (action.equalsIgnoreCase(DELETE_ACTION)) { desc.delete(conn, row); } else { // TODO implement this possibility too if (action.equalsIgnoreCase(CREATE_ACTION) || action.equalsIgnoreCase(ALTER_ACTION) || action.equalsIgnoreCase(DROP_ACTION)) { throw new Exception("The execution of action='" + action + "' inside a multi-operation transaction is not implemented"); } else // we don't throw an exception here to be backwards compatible. In future we can throw one log.severe("The action='" + action + "' is not recognized as an SQL operation and will therefore not be executed"); } if (this.prePostStatement != null) this.prePostStatement.postStatement(action, conn, dbInfo, desc, row); } } } else { // then it is a CREATE / DROP / ALTER or DUMP command (does not have any rows associated) if (action.equalsIgnoreCase(CREATE_ACTION)) { if (this.doCreate) { // check if the table already exists ... ResultSet rs = conn.getMetaData().getTables(catalog, schema, table, null); boolean tableExistsAlready = rs.next(); rs.close(); boolean invokeCreate = true; completeTableName = table; if (schema != null && schema.length() > 1) completeTableName = schema + "." + table; boolean process = true; SqlDescription desc = getTableDescription(catalog, schema, table, conn); if (this.prePostStatement != null) { final SqlRow currentRow = null; process = this.prePostStatement.preStatement(action, conn, dbInfo, desc, currentRow); } if (process) { if (tableExistsAlready) { if (!this.overwriteTables) { throw new Exception("ReplicationStorer.store: the table '" + completeTableName + "' exists already and 'replication.overwriteTables' is set to 'false'"); } else { if (this.recreateTables) { log.warning("store: the table '" + completeTableName + "' exists already. 'replication.overwriteTables' is set to 'true': will drop the table and recreate it"); Statement st = conn.createStatement(); st.executeUpdate("DROP TABLE " + completeTableName); st.close(); } else { log.warning("store: the table '" + completeTableName + "' exists already. 'replication.overwriteTables' is set to 'true' and 'replication.recreateTables' is set to false. Will only delete contents of table but keep the old structure"); invokeCreate = false; } } } String sql = null; if (invokeCreate) { sql = this.dbSpecific.getCreateTableStatement(description, this.mapper); log.info("CREATE STATEMENT: '" + sql + "'"); } else { sql = "DELETE FROM " + completeTableName; log.info("CLEANING UP TABLE '" + completeTableName + "'"); } Statement st = conn.createStatement(); try { st.executeUpdate(sql); } finally { st.close(); } if (this.prePostStatement != null) { final SqlRow currentRow = null; this.prePostStatement.postStatement(action, conn, dbInfo, desc, currentRow); } } } else log.fine("CREATE is disabled for this writer"); } else if (action.equalsIgnoreCase(DROP_ACTION)) { if (this.doDrop) { boolean process = true; SqlDescription desc = getTableDescription(catalog, schema, table, conn); if (this.prePostStatement != null) { final SqlRow currentRow = null; process = this.prePostStatement.preStatement(action, conn, dbInfo, desc, currentRow); } if (process) { completeTableName = table; if (schema != null && schema.length() > 1) completeTableName = schema + "." + table; String sql = "DROP TABLE " + completeTableName; Statement st = conn.createStatement(); try { st.executeUpdate(sql); } catch (SQLException e) { // this is currently only working on oracle: TODO make it work for other DB too. if (e.getMessage().indexOf("does not exist") > -1) log.warning("table '" + completeTableName + "' was not found and could therefore not be dropped. Continuing anyway"); else throw e; } finally { st.close(); } if (this.prePostStatement != null) { final SqlRow currentRow = null; this.prePostStatement.postStatement(action, conn, dbInfo, desc, currentRow); } } } else log.fine("DROP is disabled for this writer"); } else if (action.equalsIgnoreCase(ALTER_ACTION)) { if (this.doAlter) { boolean process = true; SqlDescription desc = getTableDescription(catalog, schema, table, conn); if (this.prePostStatement != null) { final SqlRow currentRow = null; process = this.prePostStatement.preStatement(action, conn, dbInfo, desc, currentRow); } if (process) { log.severe("store: operation '" + action + "' invoked but not implemented yet '" + description.toXml("")); if (this.prePostStatement != null) { final SqlRow currentRow = null; this.prePostStatement.postStatement(action, conn, dbInfo, desc, currentRow); } } } else log.fine("ALTER is disabled for this writer"); } else if (action.equalsIgnoreCase(DUMP_ACTION)) { log.severe("store: operation '" + action + "' invoked but not implemented yet '" + description.toXml("")); // store entry on the file system at a location specified by the 'replication.importLocation' property // the name of the file is stored in DUMP_FILENAME (it will be used to store the file) } else if (action.equalsIgnoreCase(STATEMENT_ACTION)) { // since it could alter some table clearSqlInfoCache(); if (this.doStatement) { String sql = getStringAttribute(STATEMENT_ATTR, null, description); String tmp = getStringAttribute(MAX_ENTRIES_ATTR, null, description); long maxResponseEntries = 0L; if (tmp != null) maxResponseEntries = Long.parseLong(tmp.trim()); // tmp = getStringAttribute(STATEMENT_PRIO, null, description); final boolean isHighPrio = false; // does not really matter here final boolean isMaster = false; final String noId = null; // no statement id necessary here final String noTopic = null; // no need for a topic in the slave byte[] response = null; Exception ex = null; try { response = this.dbSpecific.broadcastStatement(sql, maxResponseEntries, isHighPrio, isMaster, noTopic, noId); } catch (Exception e) { ex = e; response = "".getBytes(); } if (true) { // TODO add here the possibility to block sending of messages I_ChangePublisher momEngine = (I_ChangePublisher)this.info.getObject("org.xmlBlaster.contrib.dbwriter.mom.MomEventEngine"); if (momEngine == null) throw new Exception("ReplicationWriter: the momEngine used can not handle publishes"); String statementId = getStringAttribute(STATEMENT_ID_ATTR, null, description); String sqlTopic = getStringAttribute(SQL_TOPIC_ATTR, null, description); HashMap map = new HashMap(); map.put(STATEMENT_ID_ATTR, statementId); if (ex != null) map.put(ReplicationConstants.EXCEPTION_ATTR, ex.getMessage()); momEngine.publish(sqlTopic, response, map); } else log.info("statement '" + sql + "' resulted in response '" + new String(response)); if (ex != null) // now that we notified the server we can throw the exception throw ex; } else log.fine("STATEMENT is disabled for this writer"); } else { log.severe("store: description with unknown action '" + action + "' invoked '" + description.toXml("")); } } try { if (!keepTransactionOpen) { conn.commit(); log.fine("Transaction has been committed"); } else log.fine("The transaction will still be kept open"); needRollback = false;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -