📄 specificdefault.java
字号:
hasPkAlready = true; } } if (hasPkAlready) buf.append(", PRIMARY KEY (").append(pkBuf).append(")"); buf.append(")"); return buf.toString(); } /** * If force is true, it deletes first all entries from the Tables table (kind of reset). */ public void addTriggersIfNeeded(boolean force, String[] destinations, boolean forceSend) throws Exception { if (force) { try { this.dbPool.update("DELETE FROM " + this.dbMetaHelper.getIdentifier(this.replPrefix + "TABLES")); } catch (Exception ex) { log.warning("Could not delete tables configuration before adding triggers with 'force' true"); ex.printStackTrace(); } } final boolean doFix = true; checkTriggerConsistency(doFix); Connection conn = this.dbPool.reserve(); try { TableToWatchInfo[] tablesToWatch = TableToWatchInfo.getTablesToWatch(conn, this.info); log.info("there are '" + tablesToWatch.length + "' tables to watch (invoked with forceSend='" + forceSend + "'"); for (int i=0; i < tablesToWatch.length; i++) addTableToWatch(tablesToWatch[i], force, destinations, forceSend); } finally { if (conn != null) this.dbPool.release(conn); } } /** * * @see org.xmlBlaster.contrib.replication.I_DbSpecific#initiateUpdate(java.lang.String) */ public void initiateUpdate(String topic, String replManagerAddress, String[] slaveNames, String requestedVersion, String initialFilesLocation) throws Exception { log.info("initial replication for destinations='" + replManagerAddress + "' and slaves='" + toString(slaveNames) + "' and location '" + initialFilesLocation + "'"); Connection conn = null; // int oldTransactionIsolation = Connection.TRANSACTION_SERIALIZABLE; // int oldTransactionIsolation = Connection.TRANSACTION_REPEATABLE_READ; int oldTransactionIsolation = Connection.TRANSACTION_READ_COMMITTED; try { if (this.dbPool == null) throw new Exception("intitiate update: The Database pool has not been instantiated (yet)"); conn = this.dbPool.reserve(); conn.setAutoCommit(false); oldTransactionIsolation = conn.getTransactionIsolation(); 
conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); // the result must be sent as a high prio message to the real destination boolean forceFlag = false; boolean isRequestingCurrentVersion = false; log.info("current replication version is '" + this.replVersion + "' and requested version is '" + requestedVersion + "'"); if (this.replVersion.equalsIgnoreCase(requestedVersion)) isRequestingCurrentVersion = true; boolean forceSend = !isRequestingCurrentVersion; addTriggersIfNeeded(forceFlag, slaveNames, forceSend); InitialUpdater.ConnectionInfo connInfo = this.initialUpdater.getConnectionInfo(conn); long minKey = this.incrementReplKey(conn); String filename = null; String completeFilename = null; if (isRequestingCurrentVersion) filename = this.initialUpdater.initialCommand(slaveNames, completeFilename, connInfo, requestedVersion); else filename = VersionTransformerCache.buildFilename(this.replPrefix, requestedVersion); long maxKey = this.incrementReplKey(conn); // if (!connInfo.isCommitted()) conn.commit(); List slavesList = new ArrayList(); for (int i=0; i < slaveNames.length; i++) { if (!isCancelled(slaveNames[i])) slavesList.add(slaveNames[i]); } slaveNames = (String[])slavesList.toArray(new String[slavesList.size()]); this.initialUpdater.sendInitialDataResponse(slaveNames, filename, replManagerAddress, minKey, maxKey, requestedVersion, this.replVersion, initialFilesLocation); } catch (Exception ex) { conn = removeFromPool(conn, ROLLBACK_YES); ex.printStackTrace(); } finally { if (conn != null) { if (oldTransactionIsolation != Connection.TRANSACTION_READ_COMMITTED) { try { conn.setTransactionIsolation(oldTransactionIsolation); } catch (SQLException e) { e.printStackTrace(); } } // we always throw away the connection on initial update (to be on the safe side) // if rollback was done before this will not execute anything since conn=null conn = removeFromPool(conn, ROLLBACK_NO); } } } /** * @see 
org.xmlBlaster.contrib.replication.I_DbSpecific#initialCommand(java.lang.String, java.lang.String) */ public void initialCommand(String[] slaveNames, String completeFilename, String version) throws Exception { this.initialUpdater.initialCommand(slaveNames, completeFilename, null, version); } /** * @see org.xmlBlaster.contrib.replication.I_DbSpecific#initialCommandPre() */ public void initialCommandPre() throws Exception { this.initialUpdater.initialCommandPre(); } /** * @see org.xmlBlaster.contrib.replication.I_DbSpecific#broadcastStatement(java.lang.String, long, long, boolean, boolean, String, String) */ public byte[] broadcastStatement(String sql, long maxResponseEntries, boolean isHighPrio, boolean isMaster, String sqlTopic, String statementId) throws Exception { Connection conn = this.dbPool.reserve(); byte[] response = null; try { conn.setAutoCommit(false); if (this.isInMaster) { CallableStatement st = null; try { StringBuffer buf = new StringBuffer(); // buf.append("<desc>\n"); buf.append("<attr id='").append(ReplicationConstants.STATEMENT_ATTR).append("'>").append(sql).append("</attr>\n"); buf.append("<attr id='").append(ReplicationConstants.STATEMENT_PRIO_ATTR).append("'>").append(isHighPrio).append("</attr>\n"); buf.append("<attr id='").append(ReplicationConstants.MAX_ENTRIES_ATTR).append("'>").append(maxResponseEntries).append("</attr>\n"); buf.append("<attr id='").append(ReplicationConstants.STATEMENT_ID_ATTR).append("'>").append(statementId).append("</attr>\n"); buf.append("<attr id='").append(ReplicationConstants.SQL_TOPIC_ATTR).append("'>").append(sqlTopic).append("</attr>\n"); // buf.append("</desc>\n"); String sqlTxt = "{? 
= call " + this.replPrefix + "prepare_broadcast(?)}"; st = conn.prepareCall(sqlTxt); String value = buf.toString(); st.setString(2, value); st.registerOutParameter(1, Types.VARCHAR); st.executeQuery(); } finally { st.close(); } } Statement st2 = conn.createStatement(); try { if (st2.execute(sql)) { ResultSet rs = st2.getResultSet(); response = ResultSetToXmlConverter.getResultSetAsXmlLiteral(conn, rs, "statement", "query", maxResponseEntries); } else { int updateCount = st2.getUpdateCount(); StringBuffer buf1 = new StringBuffer(); buf1.append("<sql>\n"); buf1.append(" <desc>\n"); buf1.append(" <command>").append("statement").append("</command>"); buf1.append(" <ident>").append("update").append("</ident>"); buf1.append(" <attr id='").append("updateCount").append("'>").append(updateCount).append("</attr>"); buf1.append(" </desc>\n"); buf1.append("</sql>\n"); response = buf1.toString().getBytes(); } // TODO make this a fine log.info("statement to broadcast shall give this response: " + new String(response)); } finally { if (st2 != null) st2.close(); } return response; } catch (Exception ex) { conn = removeFromPool(conn, ROLLBACK_YES); throw ex; } finally { conn = releaseIntoPool(conn, COMMIT_YES); } } /** * Always returns null (to nullify the connection). * @param conn The connection. Can be null, in which case nothing is done. * @param doRollback if true, a rollback is done, on false no rollback is done. * @return always null. */ protected Connection removeFromPool(Connection conn, boolean doRollback) { return removeFromPool(conn, doRollback, this.dbPool); } /** * Always returns null (to nullify the connection). * @param conn The connection. Can be null, in which case nothing is done. * @param doRollback if true, a rollback is done, on false no rollback is done. * @param pool the pool to which the connection belongs. * @return always null. 
*/ public static Connection removeFromPool(Connection conn, boolean doRollback, I_DbPool pool) { log.fine("Removing from Database pool of connection (rollback='" + doRollback + "')"); if (conn == null) return null; if (doRollback) { try { conn.rollback(); } catch (Throwable ex) { log.severe("An exception occured when trying to rollback the jdbc connection. " + ex.getMessage()); ex.printStackTrace(); } } try { pool.erase(conn); } catch (Throwable ex) { log.severe("An exception occured when trying to erase the connection from the pool. " + ex.getMessage()); ex.printStackTrace(); } return null; } /** * Always returns null (to nullify the connection). * @param conn The connection. Can be null, in which case nothing is done. * @param doCommit if true, a commit is done, on false no commit is done. * @return always null. */ protected Connection releaseIntoPool(Connection conn, boolean doCommit) { return releaseIntoPool(conn, doCommit, this.dbPool); } /** * Always returns null (to nullify the connection). * @param conn The connection. Can be null, in which case nothing is done. * @param doCommit if true, a commit is done, on false no commit is done. * @param pool the pool to which the connection belongs. * @return always null. */ public static Connection releaseIntoPool(Connection conn, boolean doCommit, I_DbPool pool) { if (conn == null) return null; if (doCommit) { try { conn.commit(); } catch (Throwable ex) { ex.printStackTrace(); } } try { pool.release(conn); } catch (Throwable ex) { log.severe("An exception occured when trying to release the connection into the pool. 
" + ex.getMessage()); ex.printStackTrace(); } return null; } /** * @see org.xmlBlaster.contrib.replication.I_DbSpecific#cancelUpdate(java.lang.String) */ public void cancelUpdate(String replSlave) { synchronized(this.cancelledUpdates) { this.cancelledUpdates.add(replSlave); } } /** * @see org.xmlBlaster.contrib.replication.I_DbSpecific#clearCancelUpdate(java.lang.String) */ public void clearCancelUpdate(String replSlave) { synchronized(this.cancelledUpdates) { this.cancelledUpdates.remove(replSlave); } } private boolean isCancelled(String replSlave) { synchronized(this.cancelledUpdates) { return this.cancelledUpdates.contains(replSlave); } } public static String getReplPrefix(I_Info info) { String pureVal = info.get(ReplicationConstants.REPL_PREFIX_KEY, ReplicationConstants.REPL_PREFIX_DEFAULT); String corrected = GlobalInfo.getStrippedString(pureVal); if (!corrected.equals(pureVal)) log.warning("The " + ReplicationConstants.REPL_PREFIX_KEY + " property has been changed from '" + pureVal + "' to '" + corrected + "' to be able to use it inside a DB"); return corrected; } /** * Example code. 
* <p />
    * <tt>java -Djava.util.logging.config.file=testlog.properties org.xmlBlaster.contrib.replication.ReplicationManager -db.password secret</tt>
    * <p />
    * Missing database system properties get a default, then the db specific instance is
    * created from the system properties. What is done next depends on the configuration:
    * if <tt>wipeout.schema</tt> is set that schema is wiped out; otherwise, if
    * <tt>initialUpdate.file</tt> is set the initial command is executed with that file;
    * otherwise a cleanup is done.
    *
    * @param args
    *           Command line (not evaluated by this method)
    */
   public static void main(String[] args) {
      I_DbPool dbPool = null;
      Connection connection = null;
      try {
         System.setProperty("java.util.logging.config.file", "testlog.properties");
         // LogManager.getLogManager().readConfiguration();

         // Preferences prefs = Preferences.userRoot();
         // prefs.node(ReplicationConstants.CONTRIB_PERSISTENT_MAP).clear();
         // prefs.clear();

         // ---- Database settings -----
         // each default is only applied if the property has not been set from outside
         final String[][] dbDefaults = {
            { "jdbc.drivers", "org.hsqldb.jdbcDriver:oracle.jdbc.driver.OracleDriver:com.microsoft.jdbc.sqlserver.SQLServerDriver:org.postgresql.Driver" },
            { "db.url", "jdbc:postgresql:test//localhost/test" },
            { "db.user", "postgres" },
            { "db.password", "" }
         };
         for (int i = 0; i < dbDefaults.length; i++) {
            final String key = dbDefaults[i][0];
            if (System.getProperty(key, null) == null) {
               System.setProperty(key, dbDefaults[i][1]);
            }
         }

         I_Info props = new PropertiesInfo(System.getProperties());
         boolean forceCreationAndInit = true;
         I_DbSpecific dbSpecific = ReplicationConverter.getDbSpecific(props, forceCreationAndInit);
         dbPool = (I_DbPool) props.getObject("db.pool");
         connection = dbPool.reserve();
         connection.setAutoCommit(true);

         String schemaToWipe = props.get("wipeout.schema", null);
         String versionName = props.get("replication.version", "0.0");
         if (schemaToWipe != null) {
            dbSpecific.wipeoutSchema(null, schemaToWipe, WIPEOUT_ALL);
         }
         else {
            String updateFile = props.get("initialUpdate.file", null);
            if (updateFile == null) {
               dbSpecific.cleanup(connection, true);
            }
            else {
               dbSpecific.initialCommand(null, updateFile, versionName);
            }
         }
      }
      catch (Throwable e) {
         System.err.println("SEVERE: " + e.toString());
         e.printStackTrace();
         // the connection is thrown away (without rollback) when something went wrong
         connection = SpecificDefault.removeFromPool(connection, ROLLBACK_NO, dbPool);
      }
      finally {
         // a no-op after a failure since the connection has been nullified above
         if (dbPool != null) {
            connection = releaseIntoPool(connection, COMMIT_NO, dbPool);
         }
      }
   }

   /**
    * Sets the attribute transformer of this instance.
    * @param transformer the transformer to store.
    */
   public void setAttributeTransformer(I_AttributeTransformer transformer) {
      this.transformer = transformer;
   }

   /**
    * @return always false.
    */
   public boolean isDatasourceReadonly() {
      return false;
   }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -