⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 replicationwriter.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
               }               catch (SQLException ex) {                  log.severe("store: an sql exception occured when trying to commit: " + ex.getMessage());                  String txt = "Error code:'" + ex.getErrorCode() + "' state='" + ex.getSQLState() + "' localizedMsg='" + ex.getLocalizedMessage() + "'";                  log.severe(txt);                  log.severe(Global.getStackTraceAsString(ex));                  throw (Exception)ex;               }               catch (Throwable ex) {                  log.severe("store: an exception occured when trying to commit: " + ex.getMessage());                  log.severe(Global.getStackTraceAsString(ex));                  if (ex instanceof Exception)                     throw (Exception)ex;                  else                     throw new Exception(ex);               }            }            else               log.warning("An exception occured in this transaction previously, not doing anything here (no storage will be done)");         }         catch (SQLException ex) {            this.exceptionInTransaction = true;            log.severe("An exception occured when trying storing the entry." + ex.getMessage());            String txt = "Error code:'" + ex.getErrorCode() + "' state='" + ex.getSQLState() + "' localizedMsg='" + ex.getLocalizedMessage() + "'";            log.severe(txt);            log.severe(Global.getStackTraceAsString(ex));            // since if the structure has changed we may want to re-read it             removeTableDescriptionFromCache(catalog, schema, table);            throw ex;         }         catch (Exception ex) {            this.exceptionInTransaction = true;            log.severe("An exception occured when trying storing the entry." + ex.getMessage());            log.severe(Global.getStackTraceAsString(ex));            removeTableDescriptionFromCache(catalog, schema, table);            throw ex;         }         catch (Throwable ex) {            this.exceptionInTransaction = true;            log.severe("A Throwable exception occured when trying storing the entry." + ex.getMessage());            log.severe(Global.getStackTraceAsString(ex));            removeTableDescriptionFromCache(catalog, schema, table);            throw new Exception(ex);         }         finally {            if (conn != null) {               if (needRollback && !keepTransactionOpen) {                  try {                     conn.rollback();                     log.fine("Transaction rolled back");                  }                  catch (Throwable ex) {                     log.severe("store: an exception occured when trying to rollback: " + ex.getMessage());                     log.severe(Global.getStackTraceAsString(ex));                  }                  finally {                     conn = SpecificDefault.removeFromPool(conn, SpecificDefault.ROLLBACK_NO, this.pool);                  }               }               if (oldAutoCommitKnown && conn != null) {                  try {                     if (oldAutoCommit)                        conn.setAutoCommit(oldAutoCommit);                  }                  catch (Throwable ex) {                     log.severe("store: an exception occured when reverting to original autocommit settings: " + ex.getMessage());                     log.severe(Global.getStackTraceAsString(ex));                  }               }               if (keepTransactionOpen) // we need to keep the connection                  this.keptConnection = conn;               else {                  conn = SpecificDefault.releaseIntoPool(conn, SpecificDefault.COMMIT_NO, this.pool);                  this.exceptionInTransaction = false; // we make sure to clear this flag               }            }         }      }      else {         log.severe("store with not command. The entry will be ignored. " + dbInfo.toString());      }   }   private final String getKey(String catalog, String schema, String tableName) {      StringBuffer buf = new StringBuffer(256);      catalog = (catalog == null) ? "" : catalog.trim();      schema = (schema == null) ? "" : schema.trim();      tableName = (tableName == null) ? "" : tableName.trim();      buf.append(catalog).append(".").append(schema).append(".").append(tableName);      return buf.toString();   }   private final synchronized SqlInfo getTableDescriptionFromCache(String catalog, String schema, String tableName) {      return (SqlInfo)this.sqlInfoCache.get(getKey(catalog, schema, tableName));   }      private final synchronized void removeTableDescriptionFromCache(String catalog, String schema, String tableName) {      sqlInfoCache.remove(getKey(catalog, schema, tableName));   }      private final synchronized void addToSqlInfoCache(SqlInfo sqlInfo) {      if (this.sqlInfoCache.size() > this.sqlInfoCacheMaxSize) {         log.warning("The maximum cache size for the sqlInfo objects of " + this.sqlInfoCacheMaxSize + " entries has already been reached");         return;      }      SqlDescription desc = sqlInfo.getDescription();      String catalog = desc.getCatalog();      String schema = desc.getSchema();      String table = desc.getIdentity();      String key = getKey(catalog, schema, table);      this.sqlInfoCache.put(key, sqlInfo);   }      private final synchronized void clearSqlInfoCache() {      this.sqlInfoCache.clear();   }      /**    * Returns the structure information of the table in question. This could be cached to get    * better performance.     *     * @param schema    * @param tableName    * @param conn    * @return    * @throws Exception    */   private synchronized SqlDescription getTableDescription(String catalog, String schema, String tableName, Connection conn) throws Exception {      SqlInfo sqlInfo = getTableDescriptionFromCache(catalog, schema, tableName);      if (sqlInfo == null) {         sqlInfo = new SqlInfo(this.info);         if (sqlInfo.fillMetadata(conn, catalog, schema, tableName, null, null))            addToSqlInfoCache(sqlInfo);      }      return sqlInfo.getDescription();   }   private final String getCompleteFileName(String filename) {      return this.importLocation + File.separator + filename;   }      private void deleteFiles(String filename) {      String completeFilename = getCompleteFileName(filename);      if (!this.keepDumpFiles) {         File fileToDelete = new File(completeFilename);         boolean del = fileToDelete.delete();         if (!del)            log.warning("could not delete the file '" + completeFilename + "' please delete it manually");      }   }      private void deleteFiles(Map attrMap) {      ClientProperty filenameProp =  (ClientProperty)attrMap.get(ReplicationConstants.FILENAME_ATTR);      String filename = null;      if (filenameProp != null)         filename = filenameProp.getStringValue();      if (filename != null && filename.length() > 0) {         deleteFiles(filename);      }      else         log.warning("Could not cleanup since the '" + ReplicationConstants.FILENAME_ATTR + "' attribute was not set");   }      /**    * This is invoked for dump files    */   private void updateDump(String topic, InputStream is, Map attrMap) throws Exception {      clearSqlInfoCache();      ClientProperty prop = (ClientProperty)attrMap.get(FILENAME_ATTR);      String filename = null;      if (prop == null) {         log.warning("The property '" + FILENAME_ATTR + "' has not been found. Will choose an own temporary one");         filename = "tmpFilename.dmp";      }      else          filename = prop.getStringValue();      log.info("'" + topic + "' dumping file '" + filename + "' on '" + this.importLocation + "'");      // will now write to the file system      this.callback.update(topic, is, attrMap);      // and now perform an import of the DB      boolean isEof = true;      boolean isException = false;      int seqNumber = -1;      String exTxt = "";      prop = XBMessage.get(XBConnectionMetaData.JMSX_GROUP_SEQ, attrMap);      if (prop != null) {         seqNumber = prop.getIntValue();         prop = XBMessage.get(XBConnectionMetaData.JMSX_GROUP_EOF, attrMap);         if (prop == null) {            isEof = false;         }         else {            prop = XBMessage.get(XBConnectionMetaData.JMSX_GROUP_EX, attrMap);            if (prop != null) {               exTxt = prop.getStringValue();               isException = true;            }         }      }      log.info("'" + topic + "' dumped file '" + filename + "' on '" + this.importLocation + "' seq nr. '" + seqNumber + "' ex='" + exTxt + "'");            if (isEof && !isException) {         this.dbSpecific.initialCommandPre();         if (this.hasInitialCmd) {            String completeFilename = getCompleteFileName(filename);                        if (this.schemaToWipeout != null) {               log.info("Going to clean up the schema '" + this.schemaToWipeout);               final String catalog = null;               try {                  this.dbSpecific.wipeoutSchema(catalog, this.schemaToWipeout, null);               }               catch (Exception ex) {                  log.severe("Could not clean up completely the schema");                  log.severe(Global.getStackTraceAsString(ex));               }            }            String version = null;            this.dbSpecific.initialCommand(null, completeFilename, version);         }         else            log.info("since no 'replication.initialCmd' property defined, the initial command will not be executed (not either the wipeout of the schema)");      }   }   private void updateManualTransfer(String topic, InputStream is, Map attrMap) throws Exception {      ClientProperty subDirProp = (ClientProperty)attrMap.get(ReplicationConstants.INITIAL_DATA_ID);      if (subDirProp == null)         throw new Exception("updateManualTransfer: the mandatory property '" + ReplicationConstants.INITIAL_DATA_ID + "' was not found in the message");      String subDirName = subDirProp.getStringValue();      if (subDirName == null || subDirName.trim().length() < 1)         throw new Exception("updateManualTransfer: the mandatory property '" + ReplicationConstants.INITIAL_DATA_ID + "' was empty");            String initialFilesLocation = this.info.get("replication.initialFilesLocation", "${user.home}/tmp");      XmlScriptParser xmlScriptParser = new XmlScriptParser();      xmlScriptParser.init(new Global(), null, null);      DbWriter dbWriter = (DbWriter)this.info.getObject("org.xmlBlaster.contrib.dbwriter.DbWriter");      if (dbWriter == null)         throw new Exception("The DbWriter is not set in the info");            File dirWhereToStore = ReplManagerPlugin.checkExistance(initialFilesLocation);      File subDir = new File(dirWhereToStore, subDirName);      if (!subDir.exists()) {         String txt = "directory '" + subDir.getAbsolutePath() + "' does not exist";         log.severe(txt);         throw new Exception(txt);      }      if (!subDir.isDirectory()) {         String txt = "file '" + subDir.getAbsolutePath() + "' is not a directory";         log.severe(txt);         throw new Exception(txt);      }            String[] files = subDir.list();      log.info("retreiving '" + files.length + "' manual transferred data files from directory '" + subDir.getAbsolutePath() + "'");      // alphabetical order guarantees correct sequence.      TreeSet set = new TreeSet();      for (int i=0; i < files.length; i++)         set.add(files[i]);      files = (String[])set.toArray(new String[set.size()]);            for (int i=0; i < files.length; i++) {         File file = new File(subDir, files[i]);         if (!file.exists())            throw new Exception("The entry nr. '" + i + "': file '" + file.getAbsolutePath() + "' does not exist");         if (!file.isFile())            throw new Exception("The entry nr. '" + i + "': file '" + file.getAbsolutePath() + "' is not a file");         FileInputStream fis = new FileInputStream(file);         MsgInfo[] msgInfos = xmlScriptParser.parse(fis);         log.info("Processing entry " + i + " of " + files.length + ": '" + file.getAbsoluteFile() + "' which has " + msgInfos.length + "' msg");         for (int j=0; j < msgInfos.length; j++) {            MsgUnitRaw[] messages = msgInfos[j].getMessageArr();            for (int k=0; k < messages.length; k++) {               MsgUnit msgUnit = new MsgUnit(new Global(), messages[k].getKey(), messages[k].getContent(), messages[k].getQos());               // MsgUnit msgUnit = (MsgUnit)messages[k].getMsgUnit();               if (msgUnit == null)                  throw new Exception("The message unit for '" + file.getAbsoluteFile() + "' is null");               if (msgUnit.getQosData() == null)                  throw new Exception("The qos for message unit of '" + file.getAbsoluteFile() + "' is null");               Map subMap = msgUnit.getQosData().getClientProperties();               byte[] subContent = msgUnit.getContent();               dbWriter.update(topic, new ByteArrayInputStream(subContent), subMap);            }         }      }   }      public void update(String topic, InputStream is, Map attrMap) throws Exception {      if (this.nirvanaClient) {         log.warning("The content of the data for this writer ' is sent to nirvana since 'replication.nirvanaClient' is set to 'true'");         return;      }            ClientProperty dumpProp = (ClientProperty)attrMap.get(ReplicationConstants.DUMP_ACTION);      ClientProperty endToRemoteProp = (ClientProperty)attrMap.get(ReplicationConstants.INITIAL_DATA_END_TO_REMOTE);      ClientProperty endOfTransition = (ClientProperty)attrMap.get(ReplicationConstants.END_OF_TRANSITION);      if (endOfTransition != null && endOfTransition.getBooleanValue()) {         deleteFiles(attrMap);      }      else if (dumpProp != null)         updateDump(topic, is, attrMap);      else if (endToRemoteProp != null)         updateManualTransfer(topic, is, attrMap);      else         log.severe("Unknown operation");         }   }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -