📄 replicationwriter.java
字号:
} catch (SQLException ex) { log.severe("store: an sql exception occured when trying to commit: " + ex.getMessage()); String txt = "Error code:'" + ex.getErrorCode() + "' state='" + ex.getSQLState() + "' localizedMsg='" + ex.getLocalizedMessage() + "'"; log.severe(txt); log.severe(Global.getStackTraceAsString(ex)); throw (Exception)ex; } catch (Throwable ex) { log.severe("store: an exception occured when trying to commit: " + ex.getMessage()); log.severe(Global.getStackTraceAsString(ex)); if (ex instanceof Exception) throw (Exception)ex; else throw new Exception(ex); } } else log.warning("An exception occured in this transaction previously, not doing anything here (no storage will be done)"); } catch (SQLException ex) { this.exceptionInTransaction = true; log.severe("An exception occured when trying storing the entry." + ex.getMessage()); String txt = "Error code:'" + ex.getErrorCode() + "' state='" + ex.getSQLState() + "' localizedMsg='" + ex.getLocalizedMessage() + "'"; log.severe(txt); log.severe(Global.getStackTraceAsString(ex)); // since if the structure has changed we may want to re-read it removeTableDescriptionFromCache(catalog, schema, table); throw ex; } catch (Exception ex) { this.exceptionInTransaction = true; log.severe("An exception occured when trying storing the entry." + ex.getMessage()); log.severe(Global.getStackTraceAsString(ex)); removeTableDescriptionFromCache(catalog, schema, table); throw ex; } catch (Throwable ex) { this.exceptionInTransaction = true; log.severe("A Throwable exception occured when trying storing the entry." + ex.getMessage()); log.severe(Global.getStackTraceAsString(ex)); removeTableDescriptionFromCache(catalog, schema, table); throw new Exception(ex); } finally { if (conn != null) { if (needRollback && !keepTransactionOpen) { try { conn.rollback(); log.fine("Transaction rolled back"); } catch (Throwable ex) { log.severe("store: an exception occured when trying to rollback: " + ex.getMessage()); log.severe(Global.getStackTraceAsString(ex)); } finally { conn = SpecificDefault.removeFromPool(conn, SpecificDefault.ROLLBACK_NO, this.pool); } } if (oldAutoCommitKnown && conn != null) { try { if (oldAutoCommit) conn.setAutoCommit(oldAutoCommit); } catch (Throwable ex) { log.severe("store: an exception occured when reverting to original autocommit settings: " + ex.getMessage()); log.severe(Global.getStackTraceAsString(ex)); } } if (keepTransactionOpen) // we need to keep the connection this.keptConnection = conn; else { conn = SpecificDefault.releaseIntoPool(conn, SpecificDefault.COMMIT_NO, this.pool); this.exceptionInTransaction = false; // we make sure to clear this flag } } } } else { log.severe("store with not command. The entry will be ignored. " + dbInfo.toString()); } } private final String getKey(String catalog, String schema, String tableName) { StringBuffer buf = new StringBuffer(256); catalog = (catalog == null) ? "" : catalog.trim(); schema = (schema == null) ? "" : schema.trim(); tableName = (tableName == null) ? "" : tableName.trim(); buf.append(catalog).append(".").append(schema).append(".").append(tableName); return buf.toString(); } private final synchronized SqlInfo getTableDescriptionFromCache(String catalog, String schema, String tableName) { return (SqlInfo)this.sqlInfoCache.get(getKey(catalog, schema, tableName)); } private final synchronized void removeTableDescriptionFromCache(String catalog, String schema, String tableName) { sqlInfoCache.remove(getKey(catalog, schema, tableName)); } private final synchronized void addToSqlInfoCache(SqlInfo sqlInfo) { if (this.sqlInfoCache.size() > this.sqlInfoCacheMaxSize) { log.warning("The maximum cache size for the sqlInfo objects of " + this.sqlInfoCacheMaxSize + " entries has already been reached"); return; } SqlDescription desc = sqlInfo.getDescription(); String catalog = desc.getCatalog(); String schema = desc.getSchema(); String table = desc.getIdentity(); String key = getKey(catalog, schema, table); this.sqlInfoCache.put(key, sqlInfo); } private final synchronized void clearSqlInfoCache() { this.sqlInfoCache.clear(); } /** * Returns the structure information of the table in question. This could be cached to get * better performance. * * @param schema * @param tableName * @param conn * @return * @throws Exception */ private synchronized SqlDescription getTableDescription(String catalog, String schema, String tableName, Connection conn) throws Exception { SqlInfo sqlInfo = getTableDescriptionFromCache(catalog, schema, tableName); if (sqlInfo == null) { sqlInfo = new SqlInfo(this.info); if (sqlInfo.fillMetadata(conn, catalog, schema, tableName, null, null)) addToSqlInfoCache(sqlInfo); } return sqlInfo.getDescription(); } private final String getCompleteFileName(String filename) { return this.importLocation + File.separator + filename; } private void deleteFiles(String filename) { String completeFilename = getCompleteFileName(filename); if (!this.keepDumpFiles) { File fileToDelete = new File(completeFilename); boolean del = fileToDelete.delete(); if (!del) log.warning("could not delete the file '" + completeFilename + "' please delete it manually"); } } private void deleteFiles(Map attrMap) { ClientProperty filenameProp = (ClientProperty)attrMap.get(ReplicationConstants.FILENAME_ATTR); String filename = null; if (filenameProp != null) filename = filenameProp.getStringValue(); if (filename != null && filename.length() > 0) { deleteFiles(filename); } else log.warning("Could not cleanup since the '" + ReplicationConstants.FILENAME_ATTR + "' attribute was not set"); } /** * This is invoked for dump files */ private void updateDump(String topic, InputStream is, Map attrMap) throws Exception { clearSqlInfoCache(); ClientProperty prop = (ClientProperty)attrMap.get(FILENAME_ATTR); String filename = null; if (prop == null) { log.warning("The property '" + FILENAME_ATTR + "' has not been found. Will choose an own temporary one"); filename = "tmpFilename.dmp"; } else filename = prop.getStringValue(); log.info("'" + topic + "' dumping file '" + filename + "' on '" + this.importLocation + "'"); // will now write to the file system this.callback.update(topic, is, attrMap); // and now perform an import of the DB boolean isEof = true; boolean isException = false; int seqNumber = -1; String exTxt = ""; prop = XBMessage.get(XBConnectionMetaData.JMSX_GROUP_SEQ, attrMap); if (prop != null) { seqNumber = prop.getIntValue(); prop = XBMessage.get(XBConnectionMetaData.JMSX_GROUP_EOF, attrMap); if (prop == null) { isEof = false; } else { prop = XBMessage.get(XBConnectionMetaData.JMSX_GROUP_EX, attrMap); if (prop != null) { exTxt = prop.getStringValue(); isException = true; } } } log.info("'" + topic + "' dumped file '" + filename + "' on '" + this.importLocation + "' seq nr. '" + seqNumber + "' ex='" + exTxt + "'"); if (isEof && !isException) { this.dbSpecific.initialCommandPre(); if (this.hasInitialCmd) { String completeFilename = getCompleteFileName(filename); if (this.schemaToWipeout != null) { log.info("Going to clean up the schema '" + this.schemaToWipeout); final String catalog = null; try { this.dbSpecific.wipeoutSchema(catalog, this.schemaToWipeout, null); } catch (Exception ex) { log.severe("Could not clean up completely the schema"); log.severe(Global.getStackTraceAsString(ex)); } } String version = null; this.dbSpecific.initialCommand(null, completeFilename, version); } else log.info("since no 'replication.initialCmd' property defined, the initial command will not be executed (not either the wipeout of the schema)"); } } private void updateManualTransfer(String topic, InputStream is, Map attrMap) throws Exception { ClientProperty subDirProp = (ClientProperty)attrMap.get(ReplicationConstants.INITIAL_DATA_ID); if (subDirProp == null) throw new Exception("updateManualTransfer: the mandatory property '" + ReplicationConstants.INITIAL_DATA_ID + "' was not found in the message"); String subDirName = subDirProp.getStringValue(); if (subDirName == null || subDirName.trim().length() < 1) throw new Exception("updateManualTransfer: the mandatory property '" + ReplicationConstants.INITIAL_DATA_ID + "' was empty"); String initialFilesLocation = this.info.get("replication.initialFilesLocation", "${user.home}/tmp"); XmlScriptParser xmlScriptParser = new XmlScriptParser(); xmlScriptParser.init(new Global(), null, null); DbWriter dbWriter = (DbWriter)this.info.getObject("org.xmlBlaster.contrib.dbwriter.DbWriter"); if (dbWriter == null) throw new Exception("The DbWriter is not set in the info"); File dirWhereToStore = ReplManagerPlugin.checkExistance(initialFilesLocation); File subDir = new File(dirWhereToStore, subDirName); if (!subDir.exists()) { String txt = "directory '" + subDir.getAbsolutePath() + "' does not exist"; log.severe(txt); throw new Exception(txt); } if (!subDir.isDirectory()) { String txt = "file '" + subDir.getAbsolutePath() + "' is not a directory"; log.severe(txt); throw new Exception(txt); } String[] files = subDir.list(); log.info("retreiving '" + files.length + "' manual transferred data files from directory '" + subDir.getAbsolutePath() + "'"); // alphabetical order guarantees correct sequence. TreeSet set = new TreeSet(); for (int i=0; i < files.length; i++) set.add(files[i]); files = (String[])set.toArray(new String[set.size()]); for (int i=0; i < files.length; i++) { File file = new File(subDir, files[i]); if (!file.exists()) throw new Exception("The entry nr. '" + i + "': file '" + file.getAbsolutePath() + "' does not exist"); if (!file.isFile()) throw new Exception("The entry nr. '" + i + "': file '" + file.getAbsolutePath() + "' is not a file"); FileInputStream fis = new FileInputStream(file); MsgInfo[] msgInfos = xmlScriptParser.parse(fis); log.info("Processing entry " + i + " of " + files.length + ": '" + file.getAbsoluteFile() + "' which has " + msgInfos.length + "' msg"); for (int j=0; j < msgInfos.length; j++) { MsgUnitRaw[] messages = msgInfos[j].getMessageArr(); for (int k=0; k < messages.length; k++) { MsgUnit msgUnit = new MsgUnit(new Global(), messages[k].getKey(), messages[k].getContent(), messages[k].getQos()); // MsgUnit msgUnit = (MsgUnit)messages[k].getMsgUnit(); if (msgUnit == null) throw new Exception("The message unit for '" + file.getAbsoluteFile() + "' is null"); if (msgUnit.getQosData() == null) throw new Exception("The qos for message unit of '" + file.getAbsoluteFile() + "' is null"); Map subMap = msgUnit.getQosData().getClientProperties(); byte[] subContent = msgUnit.getContent(); dbWriter.update(topic, new ByteArrayInputStream(subContent), subMap); } } } } public void update(String topic, InputStream is, Map attrMap) throws Exception { if (this.nirvanaClient) { log.warning("The content of the data for this writer ' is sent to nirvana since 'replication.nirvanaClient' is set to 'true'"); return; } ClientProperty dumpProp = (ClientProperty)attrMap.get(ReplicationConstants.DUMP_ACTION); ClientProperty endToRemoteProp = (ClientProperty)attrMap.get(ReplicationConstants.INITIAL_DATA_END_TO_REMOTE); ClientProperty endOfTransition = (ClientProperty)attrMap.get(ReplicationConstants.END_OF_TRANSITION); if (endOfTransition != null && endOfTransition.getBooleanValue()) { deleteFiles(attrMap); } else if (dumpProp != null) updateDump(topic, is, attrMap); else if (endToRemoteProp != null) updateManualTransfer(topic, is, attrMap); else log.severe("Unknown operation"); } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -