⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 replicationconverter.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
         throw new Exception("ReplicationConverter.addInfo: wrong number of columns: should be 11 but was " + numberOfColumns);            /*       * Note that this test implies that the replKeys come in a growing sequence. If this is not the case, this test is useless.       */      this.newReplKey = rs.getLong(1);      boolean markProcessed = false;            if (this.newReplKey <= this.oldReplKey) {         log.warning("the replication key '" + this.newReplKey + "' has already been processed since the former key was '" + this.oldReplKey + "'. It will be marked ");         markProcessed = true;      }      else {         this.oldReplKey = this.newReplKey;      }      // puts this in the metadata attributes of the message to be sent over the mom      this.event.getAttributeMap().put(REPL_KEY_ATTR, "" + this.newReplKey);      String transKey = rs.getString(2);      String dbId = rs.getString(3);      String tableName = rs.getString(4);      String guid = rs.getString(5);      String action = rs.getString(6);      String catalog = rs.getString(7);      String schema = rs.getString(8);      if (transKey == null)         log.severe("The transaction key was not set. Must be set in order to be processed correctly");      else {         if (transKey.equals("UNKNOWN")) {            String txt = "The transaction key has not been set at the time of entry collection. 
This is a bug which shall be fixed";            log.severe(txt);            throw new Exception(txt);         }      }      log.fine("sequence number '" + this.newReplKey + "' processing now for table '" + tableName + "' and transId='" + transKey + "'");      if (this.transactionId == null) {         this.transactionId = transKey;         this.allTransactions.add(transKey);      }      else {         if (!this.transactionId.equals(transKey)) {            // log.severe("the entry with replKey='" + this.newReplKey + "' tableName='" + tableName + "' with action='" + action + "' had transaction '" + transKey + "' but was expected '" + this.transactionId + "'");            log.fine("the entry with replKey='" + this.newReplKey + "' tableName='" + tableName + "' with action='" + action + "' had transaction '" + transKey + "' old transaction was '" + this.transactionId + "' (multiple transactions in one message)");            this.transactionId = transKey;            this.allTransactions.add(transKey);         }      }            // String newContent = rs.getString(9); // could be null      String newContent = null;      Clob clob = rs.getClob(9);      if (clob != null) {         long length = clob.length();         if (length > Integer.MAX_VALUE)            throw new Exception("ReplicationConverter.addInfo: the content is too big ('" + length + "' bytes) to fit in one msg, can not process");         byte[] buf = new byte[(int)length];         clob.getAsciiStream().read(buf);         newContent = new String(buf);      }      // String oldContent = rs.getString(10);      String oldContent = null;      clob = rs.getClob(10);      if (clob != null) {         long length = clob.length();         if (length > Integer.MAX_VALUE)            throw new Exception("ReplicationConverter.addInfo: the old content is too big ('" + length + "' bytes) to fit in one msg, can not process");         byte[] buf = new byte[(int)length];         clob.getAsciiStream().read(buf);         
oldContent = new String(buf);      }      // check if it needs to read the new content explicitly, this is used for cases      // where it was not possible to fill with meat in the synchronous PL/SQL part.      if (newContent == null && (INSERT_ACTION.equals(action) || (UPDATE_ACTION.equals(action)))) {         if (guid == null)            log.severe("could not operate since no guid and no newContent on UPDATE or INSERT");         else            newContent = this.dbSpecific.getContentFromGuid(guid, catalog, schema, tableName, this.transformer);      }            String version = rs.getString(11);      if (this.sqlInfo.getRowCount() == 0L) {         if (this.transformer != null) {            Map attr = this.transformer.transform(rs, -1);            if (attr != null) {               this.sqlInfo.getDescription().addAttributes(attr);            }         }      }      if (what == ALL || what == ROW_ONLY) {         Map completeAttrs = new HashMap();         completeAttrs.put(TABLE_NAME_ATTR, tableName);         completeAttrs.put(REPL_KEY_ATTR, "" + this.newReplKey);         completeAttrs.put(TRANSACTION_ATTR, transKey);         completeAttrs.put(DB_ID_ATTR, dbId);         completeAttrs.put(GUID_ATTR, guid);         completeAttrs.put(CATALOG_ATTR, catalog);         completeAttrs.put(SCHEMA_ATTR, schema);         completeAttrs.put(VERSION_ATTR, version);         completeAttrs.put(ACTION_ATTR, action);         if (markProcessed)            completeAttrs.put(ALREADY_PROCESSED_ATTR, "true");         if (action.equalsIgnoreCase(CREATE_ACTION)) {            try {               log.info("addInfo: going to create a new table '" + tableName + "'");               boolean forceSend = false;               if (newContent != null) {                  String destination = ReplaceVariable.extractWithMatchingAttrs(newContent, "attr", " id='_destination'");                  if (destination == null || destination.length() < 1)                     log.severe("The destination could not be 
extracted from '" + newContent + "'");                  else                     completeAttrs.put("_destination", destination);                  String forceSendTxt = ReplaceVariable.extractWithMatchingAttrs(newContent, "attr", " id='_forceSend'");                  if (forceSendTxt != null && "true".equalsIgnoreCase(forceSendTxt.trim())) {                     forceSend = true;                     log.info("Force Send was set to 'true'");                  }               }               this.dbSpecific.readNewTable(catalog, schema, tableName, completeAttrs, this.sendInitialTableContent || forceSend);            }            catch (Exception ex) {               ex.printStackTrace();               log.severe("Could not correctly add trigger on table '" + tableName + "' : " + ex.getMessage());            }         }         else if (action.equalsIgnoreCase(DROP_ACTION)) {            SqlDescription description = this.sqlInfo.getDescription();             description.setCommand(action);            description.addAttributes(completeAttrs);         }         else if (action.equalsIgnoreCase(ALTER_ACTION)) {            SqlDescription description = this.sqlInfo.getDescription();             description.setCommand(action);            description.addAttributes(completeAttrs);            dbSpecific.addTrigger(conn, catalog, schema, tableName);         }         else if (action.equalsIgnoreCase(INSERT_ACTION)) {            SqlRow row = this.sqlInfo.fillOneRow(rs, newContent, this.transformer);            row.addAttributes(completeAttrs);         }         else if (action.equalsIgnoreCase(UPDATE_ACTION)) {            boolean doSend = true;            if (!this.sendUnchangedUpdates && oldContent.equals(newContent))               doSend = false;            if (doSend) {               completeAttrs.put(OLD_CONTENT_ATTR, oldContent);               SqlRow row = this.sqlInfo.fillOneRow(rs, newContent, this.transformer);               row.addAttributes(completeAttrs);            }      
      else               log.fine("an update with unchanged content was detected on table '" + tableName + "' and transId='" + this.transactionId + "': will not send it");         }         else if (action.equalsIgnoreCase(DELETE_ACTION)) {            SqlRow row = this.sqlInfo.fillOneRow(rs, oldContent, this.transformer);            row.addAttributes(completeAttrs);         }         else if (action.equalsIgnoreCase(STATEMENT_ACTION)) {            String sql = ReplaceVariable.extractWithMatchingAttrs(newContent, "attr", " id='" + STATEMENT_ATTR + "'");            String statementPrio = ReplaceVariable.extractWithMatchingAttrs(newContent, "attr", " id='" + STATEMENT_PRIO_ATTR + "'");            String maxEntries = ReplaceVariable.extractWithMatchingAttrs(newContent, "attr", " id='" + MAX_ENTRIES_ATTR + "'");            String id = ReplaceVariable.extractWithMatchingAttrs(newContent, "attr", " id='" + STATEMENT_ID_ATTR + "'");            String sqlTopic = ReplaceVariable.extractWithMatchingAttrs(newContent, "attr", " id='" + SQL_TOPIC_ATTR + "'");            this.sqlInfo.getDescription().setCommand(STATEMENT_ACTION);            this.sqlInfo.getDescription().setAttribute(ACTION_ATTR, STATEMENT_ACTION);            this.sqlInfo.getDescription().setAttribute(STATEMENT_ID_ATTR, id);            this.sqlInfo.getDescription().setAttribute(STATEMENT_ATTR, sql);            this.sqlInfo.getDescription().setAttribute(STATEMENT_PRIO_ATTR, statementPrio);            this.sqlInfo.getDescription().setAttribute(MAX_ENTRIES_ATTR, maxEntries);            this.sqlInfo.getDescription().setAttribute(SQL_TOPIC_ATTR, sqlTopic);         }      }      else         log.warning("The entry has not been sent since what='" + what + "'");   }   public void addInfo(Map attributeMap) throws Exception {      // nothing to be done here   }   /**    * This method is invoked before sending the message over the mom.    
*/
   public int done() throws Exception {
      final int rowCount = this.sqlInfo.getRowCount();

      // A message without rows is only delivered when its description carries a
      // command other than the plain replication command (for example a statement).
      boolean suppress = false;
      if (rowCount < 1) {
         String cmd = this.sqlInfo.getDescription().getCommand();
         suppress = (cmd == null) || REPLICATION_CMD.equals(cmd);
      }

      if (suppress) {
         // flag the event (metadata of the message, not its content) so that it is not sent
         log.fine("setting property '" + IGNORE_MESSAGE + "' to inhibit sending of message for trans='" + this.transactionId + "'");
         this.event.getAttributeMap().put(IGNORE_MESSAGE, String.valueOf(true));
      }
      else {
         // counters travel in the attribute map of the event, not in the message itself
         int numOfTransactions = this.allTransactions.size();
         this.transSeq += numOfTransactions;
         this.messageSeq++;
         this.persistentInfo.put(this.transSeqPropertyName, String.valueOf(this.transSeq));
         this.event.getAttributeMap().put(NUM_OF_TRANSACTIONS, String.valueOf(numOfTransactions));
         this.event.getAttributeMap().put(MESSAGE_SEQ, String.valueOf(this.messageSeq));

         this.persistentInfo.put(this.messageSeqPropertyName, String.valueOf(this.messageSeq));
         this.persistentInfo.put(this.oldReplKeyPropertyName, String.valueOf(this.oldReplKey));
      }

      // the XML is written in both cases; only the event attribute differs
      this.out.write(this.sqlInfo.toXml("").getBytes());
      this.out.flush();
      this.sqlInfo = null;
      return rowCount;
   }

   /**
    * Prepares this converter for a new message: stores the output stream and the
    * event, forgets the transactions collected so far and creates a fresh
    * SqlInfo whose description has the command REPLICATION_CMD.
    *
    * @param out the stream to which done() writes the XML
    * @param command not used by this implementation
    * @param ident if not null it is set as the identity of the description
    * @param event the event whose attribute map receives the metadata of the message
    */
   public void setOutputStream(OutputStream out, String command, String ident, ChangeEvent event) throws Exception {
      this.out = out;
      this.transactionId = null;
      this.allTransactions.clear();
      this.sqlInfo = new SqlInfo(this.info);

      SqlDescription desc = new SqlDescription(this.info);
      desc.setCommand(REPLICATION_CMD);
      if (ident != null) {
         desc.setIdentity(ident);
      }
      this.sqlInfo.setDescription(desc);

      this.event = event;
   }

   /**
    * Builds the DELETE statement for the ITEMS table (its name is prefixed with
    * replPrefix) which covers the transaction keys collected for the current message.
    *
    * @return the statement, or null if no transaction id is known or if no
    *         transaction has been collected
    */
   public String getPostStatement() {
      if (this.transactionId == null) {
         if (this.sqlInfo != null) {
            log.severe("No transaction id has been found for " + this.sqlInfo.toXml(""));
         }
         return null;
      }

      final int count = this.allTransactions.size();
      if (count < 1) {
         log.severe("The transaction for this message was not defined");
         return null;
      }
      if (count == 1) {
         return "DELETE FROM " + this.replPrefix + "ITEMS WHERE TRANS_KEY='" + this.transactionId + "'";
      }

      // several transactions in one message: delete them all with a single IN clause
      StringBuffer sql = new StringBuffer(1024);
      sql.append("DELETE FROM ").append(this.replPrefix).append("ITEMS WHERE TRANS_KEY IN (");
      String separator = "";
      for (int idx = 0; idx < count; idx++) {
         sql.append(separator);
         sql.append("'").append(this.allTransactions.get(idx)).append("'");
         separator = ",";
      }
      sql.append(")");
      return sql.toString();
   }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -