📄 sqldescription.java
字号:
else buf.append(", "); buf.append(colNames[i]).append("=? "); } } return buf.toString(); } private final byte[] cutEndIfTooLong(SqlColumn col, String colName, byte[] val) { // if too log cut the end int colSize = col.getColSize(); if (colSize < 1 || val.length <= colSize) return val; int sqlType = col.getSqlType(); if (colSize < 1 || sqlType == Types.BLOB || sqlType == Types.CLOB || sqlType == Types.LONGVARBINARY || sqlType == Types.LONGVARCHAR) { log.fine("Not cutting entry '" + col.getColName() + "' since colSize=" + colSize + " and sqlType=" + sqlType); return val; } log.warning("The entry on column='" + colName + "' is too long: " + val.length + " but should be max " + colSize + ". Will cut the end"); byte[] tmpBuf = val; val = new byte[colSize]; for (int i=0; i < colSize; i++) val[i] = tmpBuf[i]; return val; } private final double getDouble(ClientProperty prop) { double val = prop.getDoubleValue(); return val; } private final long getLong(ClientProperty prop) { try { long val = prop.getLongValue(); return val; } catch (NumberFormatException ex) { double val = getDouble(prop); return (long)val; } } private final void insertIntoStatement(PreparedStatement st, int pos, ClientProperty prop) throws SQLException, IOException, ParseException { String colName = prop.getName(); SqlColumn col = getColumn(colName); int sqlType = col.getSqlType(); boolean isNull = prop != null && Constants.TYPE_NULL.equals(prop.getType()); if (prop == null) { st.setObject(pos, null); return; } String tmp = prop.getStringValue(); if (sqlType == Types.INTEGER) { if (isNull || tmp == null || tmp.trim().length() < 1) { st.setNull(pos, Types.INTEGER); return; } long val = getLong(prop); log.fine("Handling insert column=" + colName + " as INTEGER (type=" + sqlType + ", count=" + pos + ") '" + val + "'"); st.setLong(pos, val); } if (sqlType == Types.DECIMAL) { if (isNull || tmp == null || tmp.trim().length() < 1) { st.setNull(pos, Types.DECIMAL); return; } double val = getDouble(prop); log.fine("Handling insert column=" + colName + " as DECIMAL (type=" + sqlType + ", count=" + pos + ") '" + val + "'"); st.setDouble(pos, val); } else if (sqlType == Types.SMALLINT) { if (isNull || tmp == null || tmp.trim().length() < 1) { st.setNull(pos, Types.SMALLINT); return; } // int val = prop.getIntValue(); int val = (int)getLong(prop); log.fine("Handling insert column=" + colName + " as SMALLINT (type=" + sqlType + ", count=" + pos + ") '" + val + "'"); st.setInt(pos, val); } else if (sqlType == Types.DOUBLE) { if (isNull || tmp == null || tmp.trim().length() < 1) { st.setNull(pos, Types.DOUBLE); return; } // double val = prop.getDoubleValue(); double val = getDouble(prop); log.fine("Handling insert column=" + colName + " as DOUBLE (type=" + sqlType + ", count=" + pos + ") '" + val + "'"); st.setDouble(pos, val); } else if (sqlType == Types.FLOAT) { if (isNull || tmp == null || tmp.trim().length() < 1) { st.setNull(pos, Types.FLOAT); return; } // float val = prop.getFloatValue(); float val = (float)getDouble(prop); log.fine("Handling insert column=" + colName + " as FLOAT (type=" + sqlType + ", count=" + pos + ") '" + val + "'"); st.setFloat(pos, val); } else if (sqlType == Types.VARBINARY) { if (isNull) { st.setNull(pos, Types.VARBINARY); return; } byte[] val = prop.getBlobValue(); val = cutEndIfTooLong(col, colName, val); log.fine("Handling insert column=" + colName + " as VARBINARY (type=" + sqlType + ", count=" + pos + ")"); st.setBytes(pos, val); } else if (sqlType == Types.VARCHAR) { if (isNull) { st.setNull(pos, Types.VARCHAR); return; } String val = prop.getStringValue(); log.fine("Handling insert column=" + colName + " as VARCHAR (type=" + sqlType + ", count=" + pos + ") '" + val + "'"); // if too log cut the end if (col.getCharLength() > 0 && col.getCharLength() < val.length()) { log.warning("The entry on column='" + colName + "' is too long: " + val.length() + " but should be max " + col.getCharLength() + ". Will cut the end"); val = val.substring(0, col.getCharLength()); } st.setString(pos, val); } else if (sqlType == Types.CHAR) { if (isNull) { st.setNull(pos, Types.CHAR); return; } String val = prop.getStringValue(); log.fine("Handling insert column=" + colName + " as CHAR (type=" + sqlType + ", count=" + pos + ") '" + val + "'"); // if too log cut the end if (col.getCharLength() > 0 && col.getCharLength() < val.length()) { log.warning("The entry on column='" + colName + "' is too long: " + val.length() + " but should be max " + col.getCharLength() + ". Will cut the end"); val = val.substring(0, col.getCharLength()); } st.setString(pos, val); } else if (sqlType == Types.BLOB) { if (isNull) { st.setNull(pos, Types.BLOB); return; } byte[] val = prop.getBlobValue(); log.fine("Handling insert column=" + colName + " as BLOB (type=" + sqlType + ", count=" + pos + ")"); val = cutEndIfTooLong(col, colName, val); ByteArrayInputStream bais = new ByteArrayInputStream(val); st.setBinaryStream(pos, bais, val.length); } else if (sqlType == Types.CLOB) { if (isNull) { st.setNull(pos, Types.CLOB); return; } log.fine("Handling insert column=" + colName + " as CLOB (type=" + sqlType + ", count=" + pos + ")"); byte[] val = prop.getBlobValue(); val = cutEndIfTooLong(col, colName, val); ByteArrayInputStream bais = new ByteArrayInputStream(val); st.setAsciiStream(pos, bais, val.length); } else if (isBinaryType(sqlType)) { if (isNull) { st.setNull(pos, sqlType); return; } log.fine("Handling insert column=" + colName + " as binary (type=" + sqlType + ", count=" + pos + ")"); byte[] val = prop.getBlobValue(); val = cutEndIfTooLong(col, colName, val); ByteArrayInputStream blob_stream = new ByteArrayInputStream(val); st.setBinaryStream(pos, blob_stream, val.length); //(int)sizeInBytes); } else if (sqlType == Types.DATE || sqlType == Types.TIMESTAMP) { if (isNull || tmp == null || tmp.length() < 1) { st.setNull(pos, sqlType); return; } log.fine("Handling insert column=" + colName + " as Date (type=" + sqlType + ", count=" + pos + ")"); String dateTxt = prop.getStringValue(); Timestamp ts = null; try { ts = Timestamp.valueOf(dateTxt); } catch (NumberFormatException ex) { // this because for some reason timestamps and dates // can come with a comma separator instead of a dot. // This was probably due to: // TO_CHAR(LR_SERVER_DATE,'YYYY-MM-DD HH24:MI:SSXFF') in the triggers. It has now // been replaced by TO_CHAR(LR_SERVER_DATE,'YYYY-MM-DD HH24:MI:SS.FF') // 'X' meaning the locale punctuator char (of the env and the writing session) String dateTxt1 = dateTxt.replace(',', '.'); log.warning("Conversion of '" + dateTxt + "' not possible, trying it with '" + dateTxt1 + "'"); ts = Timestamp.valueOf(dateTxt1); } // this works even for older oracles where the content is a Date and not a Timestamp st.setTimestamp(pos, ts); } else { if (isNull || tmp == null) { st.setNull(pos, sqlType); return; } log.fine("Handling insert column=" + colName + " (type=" + sqlType + ", count=" + pos + ")"); st.setObject(pos, prop.getObjectValue(), sqlType); } } private boolean isBinaryType(int type) { return ( type == Types.BINARY || type == Types.BLOB || type == Types.CLOB || type == Types.DATALINK || type == Types.JAVA_OBJECT || type == Types.LONGVARBINARY || type == Types.OTHER || type == Types.STRUCT || type == Types.VARBINARY); } private static String getVal(SqlDescription description, String key) { ClientProperty prop = description.getAttribute(key); if (prop == null) return null; return prop.getStringValue(); } private static void setVal(SqlDescription description, String key, String val) { ClientProperty prop = description.getAttribute(key); if (prop == null) return; prop.setValue(val); } private static void processMapping(SqlInfo sqlInfo, I_Mapper mapper) throws Exception { SqlDescription description = sqlInfo.getDescription(); // String originalTable = sqlInfo.getDescription().getIdentity(); String originalCatalog = getVal(description, ReplicationConstants.CATALOG_ATTR); String originalSchema = getVal(description, ReplicationConstants.SCHEMA_ATTR); String originalTable = getVal(description, ReplicationConstants.TABLE_NAME_ATTR); String catalog = mapper.getMappedCatalog(originalCatalog, originalSchema, originalTable, null, originalCatalog); String schema = mapper.getMappedSchema(originalCatalog, originalSchema, originalTable, null, originalSchema); String table = mapper.getMappedTable(originalCatalog, originalSchema, originalTable, null, null); if (catalog != null) { setVal(description, ReplicationConstants.CATALOG_ATTR, catalog); } if (schema != null) { setVal(description, ReplicationConstants.SCHEMA_ATTR, schema); } if (table != null) { setVal(description, ReplicationConstants.TABLE_NAME_ATTR, table); description.setIdentity(table); } List rows = sqlInfo.getRows(); if (rows != null) { for (int i=0; i < rows.size(); i++) { SqlRow row = (SqlRow)rows.get(i); String[] colNames = row.getColumnNames(); for (int j=0; j < colNames.length; j++) { String colName = colNames[i]; String newName = mapper.getMappedTable(originalCatalog, originalSchema, originalTable, colName, colName); if (!colName.equals(newName)) row.renameColumn(colName, newName); } } } } /** * Returns the number of entries updated * @param conn * @param row * @return * @throws Exception */ public int update(Connection conn, SqlRow newRow, I_Parser parserForOld) throws Exception { PreparedStatement st = null; String sql = ""; try { ArrayList entries = new ArrayList(); String setSt = createSetStatement(newRow, entries); int setSize = entries.size(); if (setSize < 1) throw new Exception("SqlDescription.update: could not update since the row did generate an empty set of columns to update. Row: " + newRow.toXml("") + " cols: " + toXml("")); if (parserForOld == null) throw new Exception("SqlDescription.update: the parser is null. It is needed to parse the old value"); ClientProperty prop = newRow.getAttribute(ReplicationConstants.OLD_CONTENT_ATTR); if (prop == null || prop.getValueRaw() == null) throw new Exception("The attribute '" + ReplicationConstants.OLD_CONTENT_ATTR + "' was not defined for '" + newRow.toXml("") + "'"); String xmlLiteral = OLD_PREFIX + prop.getStringValue() + OLD_POSTFIX; ByteArrayInputStream bais = new ByteArrayInputStream(xmlLiteral.getBytes()); SqlInfo sqlInfo = parserForOld.parse(bais, this.charSet); // CONVERT if (sqlInfo.getRowCount() < 1) throw new Exception("The string '" + xmlLiteral + "' did not contain any row for '" + newRow.toXml("") + "'"); SqlRow oldRow = (SqlRow)sqlInfo.getRows().get(0); String whereSt = createWhereStatement(oldRow, entries); /** * If it does not have a Primary key it needs to check wether the entry exists * and is really unique. If it is not unique it will warn. If nothing is found * it will warn too. */ if (!hasPk()) { sql = "SELECT count(*) FROM " + getCompleteTableName() + whereSt; ResultSet rs = null; try { st = conn.prepareStatement(sql); for (int i=setSize; i < entries.size(); i++) { insertIntoStatement(st, i-setSize+1, (ClientProperty)entries.get(i)); } rs = st.executeQuery(); if (!rs.next()) throw new Exception("When updating '" + newRow.toXml("") + "' for '" + toXml("") + "' the statement '" + sql + "' returned an emtpy result set. Can not determine what to do"); long entriesFound = rs.getLong(1); if (entriesFound == 0) { log.warning("no entries found for the statement '" + sql + "' I will not do anything. The msg was '" + newRow.toXml("") + "' and the old msg was '" + oldRow.toXml("") + "'"); return 0; } if (entriesFound > 1) log.warning("" + entriesFound + " entries found matching the statement '" + sql + "'. I will continue by updating these entries. You probably will get " + entriesFound + " warnings of entries not found in this same transaction. You can then ignore these"); } finally { if (rs != null) { try { rs.close(); rs = null; } catch (Exception ex) { } } if (st != null) { try { st.close(); st = null; } catch (Exception ex) { } } } } sql = "UPDATE " + getCompleteTableName() + setSt + whereSt; st = conn.prepareStatement(sql); for (int i=0; i < entries.size(); i++) insertIntoStatement(st, i+1, (ClientProperty)entries.get(i)); return st.executeUpdate(); } catch (Throwable ex) { log.severe(" Entry '" + newRow.toXml("", true, false, true) + "' caused a (throwable) exception. Statement was '" + sql + "': " + ex.getMessage()); if (ex instanceof Exception) throw (Exception)ex; else throw new Exception(ex); } finally { if (st != null) st.close(); } } /** * Returns the number of entries deleted * @param conn * @param row * @return * @throws Exception */ public int delete(Connection conn, SqlRow row) throws Exception { PreparedStatement st = null; String sql = ""; try { ArrayList entries = new ArrayList(); String whereSt = createWhereStatement(row, entries); /** * If it does not have a Primary key it needs to check wether the entry exists * and is really unique. If it is not unique it will warn. If nothing is found * it will warn too. */ if (!hasPk()) { sql = "SELECT count(*) FROM " + getCompleteTableName() + whereSt;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -