📄 replicationconverter.java
字号:
throw new Exception("ReplicationConverter.addInfo: wrong number of columns: should be 11 but was " + numberOfColumns); /* * Note that this test implies that the replKeys come in a growing sequence. If this is not the case, this test is useless. */ this.newReplKey = rs.getLong(1); boolean markProcessed = false; if (this.newReplKey <= this.oldReplKey) { log.warning("the replication key '" + this.newReplKey + "' has already been processed since the former key was '" + this.oldReplKey + "'. It will be marked "); markProcessed = true; } else { this.oldReplKey = this.newReplKey; } // puts this in the metadata attributes of the message to be sent over the mom this.event.getAttributeMap().put(REPL_KEY_ATTR, "" + this.newReplKey); String transKey = rs.getString(2); String dbId = rs.getString(3); String tableName = rs.getString(4); String guid = rs.getString(5); String action = rs.getString(6); String catalog = rs.getString(7); String schema = rs.getString(8); if (transKey == null) log.severe("The transaction key was not set. Must be set in order to be processed correctly"); else { if (transKey.equals("UNKNOWN")) { String txt = "The transaction key has not been set at the time of entry collection. This is a bug which shall be fixed"; log.severe(txt); throw new Exception(txt); } } log.fine("sequence number '" + this.newReplKey + "' processing now for table '" + tableName + "' and transId='" + transKey + "'"); if (this.transactionId == null) { this.transactionId = transKey; this.allTransactions.add(transKey); } else { if (!this.transactionId.equals(transKey)) { // log.severe("the entry with replKey='" + this.newReplKey + "' tableName='" + tableName + "' with action='" + action + "' had transaction '" + transKey + "' but was expected '" + this.transactionId + "'"); log.fine("the entry with replKey='" + this.newReplKey + "' tableName='" + tableName + "' with action='" + action + "' had transaction '" + transKey + "' old transaction was '" + this.transactionId + "' (multiple transactions in one message)"); this.transactionId = transKey; this.allTransactions.add(transKey); } } // String newContent = rs.getString(9); // could be null String newContent = null; Clob clob = rs.getClob(9); if (clob != null) { long length = clob.length(); if (length > Integer.MAX_VALUE) throw new Exception("ReplicationConverter.addInfo: the content is too big ('" + length + "' bytes) to fit in one msg, can not process"); byte[] buf = new byte[(int)length]; clob.getAsciiStream().read(buf); newContent = new String(buf); } // String oldContent = rs.getString(10); String oldContent = null; clob = rs.getClob(10); if (clob != null) { long length = clob.length(); if (length > Integer.MAX_VALUE) throw new Exception("ReplicationConverter.addInfo: the old content is too big ('" + length + "' bytes) to fit in one msg, can not process"); byte[] buf = new byte[(int)length]; clob.getAsciiStream().read(buf); oldContent = new String(buf); } // check if it needs to read the new content explicitly, this is used for cases // where it was not possible to fill with meat in the synchronous PL/SQL part. if (newContent == null && (INSERT_ACTION.equals(action) || (UPDATE_ACTION.equals(action)))) { if (guid == null) log.severe("could not operate since no guid and no newContent on UPDATE or INSERT"); else newContent = this.dbSpecific.getContentFromGuid(guid, catalog, schema, tableName, this.transformer); } String version = rs.getString(11); if (this.sqlInfo.getRowCount() == 0L) { if (this.transformer != null) { Map attr = this.transformer.transform(rs, -1); if (attr != null) { this.sqlInfo.getDescription().addAttributes(attr); } } } if (what == ALL || what == ROW_ONLY) { Map completeAttrs = new HashMap(); completeAttrs.put(TABLE_NAME_ATTR, tableName); completeAttrs.put(REPL_KEY_ATTR, "" + this.newReplKey); completeAttrs.put(TRANSACTION_ATTR, transKey); completeAttrs.put(DB_ID_ATTR, dbId); completeAttrs.put(GUID_ATTR, guid); completeAttrs.put(CATALOG_ATTR, catalog); completeAttrs.put(SCHEMA_ATTR, schema); completeAttrs.put(VERSION_ATTR, version); completeAttrs.put(ACTION_ATTR, action); if (markProcessed) completeAttrs.put(ALREADY_PROCESSED_ATTR, "true"); if (action.equalsIgnoreCase(CREATE_ACTION)) { try { log.info("addInfo: going to create a new table '" + tableName + "'"); boolean forceSend = false; if (newContent != null) { String destination = ReplaceVariable.extractWithMatchingAttrs(newContent, "attr", " id='_destination'"); if (destination == null || destination.length() < 1) log.severe("The destination could not be extracted from '" + newContent + "'"); else completeAttrs.put("_destination", destination); String forceSendTxt = ReplaceVariable.extractWithMatchingAttrs(newContent, "attr", " id='_forceSend'"); if (forceSendTxt != null && "true".equalsIgnoreCase(forceSendTxt.trim())) { forceSend = true; log.info("Force Send was set to 'true'"); } } this.dbSpecific.readNewTable(catalog, schema, tableName, completeAttrs, this.sendInitialTableContent || forceSend); } catch (Exception ex) { ex.printStackTrace(); log.severe("Could not correctly add trigger on table '" + tableName + "' : " + ex.getMessage()); } } else if (action.equalsIgnoreCase(DROP_ACTION)) { SqlDescription description = this.sqlInfo.getDescription(); description.setCommand(action); description.addAttributes(completeAttrs); } else if (action.equalsIgnoreCase(ALTER_ACTION)) { SqlDescription description = this.sqlInfo.getDescription(); description.setCommand(action); description.addAttributes(completeAttrs); dbSpecific.addTrigger(conn, catalog, schema, tableName); } else if (action.equalsIgnoreCase(INSERT_ACTION)) { SqlRow row = this.sqlInfo.fillOneRow(rs, newContent, this.transformer); row.addAttributes(completeAttrs); } else if (action.equalsIgnoreCase(UPDATE_ACTION)) { boolean doSend = true; if (!this.sendUnchangedUpdates && oldContent.equals(newContent)) doSend = false; if (doSend) { completeAttrs.put(OLD_CONTENT_ATTR, oldContent); SqlRow row = this.sqlInfo.fillOneRow(rs, newContent, this.transformer); row.addAttributes(completeAttrs); } else log.fine("an update with unchanged content was detected on table '" + tableName + "' and transId='" + this.transactionId + "': will not send it"); } else if (action.equalsIgnoreCase(DELETE_ACTION)) { SqlRow row = this.sqlInfo.fillOneRow(rs, oldContent, this.transformer); row.addAttributes(completeAttrs); } else if (action.equalsIgnoreCase(STATEMENT_ACTION)) { String sql = ReplaceVariable.extractWithMatchingAttrs(newContent, "attr", " id='" + STATEMENT_ATTR + "'"); String statementPrio = ReplaceVariable.extractWithMatchingAttrs(newContent, "attr", " id='" + STATEMENT_PRIO_ATTR + "'"); String maxEntries = ReplaceVariable.extractWithMatchingAttrs(newContent, "attr", " id='" + MAX_ENTRIES_ATTR + "'"); String id = ReplaceVariable.extractWithMatchingAttrs(newContent, "attr", " id='" + STATEMENT_ID_ATTR + "'"); String sqlTopic = ReplaceVariable.extractWithMatchingAttrs(newContent, "attr", " id='" + SQL_TOPIC_ATTR + "'"); this.sqlInfo.getDescription().setCommand(STATEMENT_ACTION); this.sqlInfo.getDescription().setAttribute(ACTION_ATTR, STATEMENT_ACTION); this.sqlInfo.getDescription().setAttribute(STATEMENT_ID_ATTR, id); this.sqlInfo.getDescription().setAttribute(STATEMENT_ATTR, sql); this.sqlInfo.getDescription().setAttribute(STATEMENT_PRIO_ATTR, statementPrio); this.sqlInfo.getDescription().setAttribute(MAX_ENTRIES_ATTR, maxEntries); this.sqlInfo.getDescription().setAttribute(SQL_TOPIC_ATTR, sqlTopic); } } else log.warning("The entry has not been sent since what='" + what + "'"); } public void addInfo(Map attributeMap) throws Exception { // nothing to be done here } /** * This method is invoked before sending the message over the mom. */ public int done() throws Exception { int ret = this.sqlInfo.getRowCount(); boolean doSend = true; if (ret < 1) { // mark it to block delivery if it has no data and is not a STATEMENT String command = this.sqlInfo.getDescription().getCommand(); if (command == null || REPLICATION_CMD.equals(command)) { // puts this in the metadata attributes of the message to be sent over the mom log.fine("setting property '" + IGNORE_MESSAGE + "' to inhibit sending of message for trans='" + this.transactionId + "'"); this.event.getAttributeMap().put(IGNORE_MESSAGE, "" + true); doSend = false; } } if (doSend) { // we put it in the attribute map not in the message itself int numOfTransactions = this.allTransactions.size(); this.transSeq += numOfTransactions; this.messageSeq++; this.persistentInfo.put(this.transSeqPropertyName, "" + this.transSeq); this.event.getAttributeMap().put(NUM_OF_TRANSACTIONS, "" + numOfTransactions); this.event.getAttributeMap().put(MESSAGE_SEQ, "" + this.messageSeq); this.persistentInfo.put(this.messageSeqPropertyName, "" + this.messageSeq); this.persistentInfo.put(this.oldReplKeyPropertyName, "" + this.oldReplKey); } String tmp = this.sqlInfo.toXml(""); this.out.write(tmp.getBytes()); this.out.flush(); this.sqlInfo = null; return ret; } public void setOutputStream(OutputStream out, String command, String ident, ChangeEvent event) throws Exception { this.out = out; this.transactionId = null; this.allTransactions.clear(); this.sqlInfo = new SqlInfo(this.info); SqlDescription description = new SqlDescription(this.info); description.setCommand(REPLICATION_CMD); if (ident != null) description.setIdentity(ident); this.sqlInfo.setDescription(description); this.event = event; } public String getPostStatement() { if (this.transactionId == null) { if (this.sqlInfo != null) log.severe("No transaction id has been found for " + this.sqlInfo.toXml("")); return null; } String statement = null; if (this.allTransactions.size() == 1) { statement = "DELETE FROM " + this.replPrefix + "ITEMS WHERE TRANS_KEY='" + this.transactionId + "'"; } else if (this.allTransactions.size() > 1) { StringBuffer buf = new StringBuffer(1024); buf.append("DELETE FROM ").append(this.replPrefix).append("ITEMS WHERE TRANS_KEY IN ("); for (int i=0; i < this.allTransactions.size(); i++) { if (i > 0) buf.append(","); buf.append("'").append(this.allTransactions.get(i)).append("'"); } buf.append(")"); statement = buf.toString(); } else { log.severe("The transaction for this message was not defined"); } return statement; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -