⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 specificdefault.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
            hasPkAlready = true;         }      }      if (hasPkAlready)         buf.append(", PRIMARY KEY (").append(pkBuf).append(")");      buf.append(")");      return buf.toString();   }   /**    * If force is true, it deletes first all entries from the Tables table (kind of reset).    */   public void addTriggersIfNeeded(boolean force, String[] destinations, boolean forceSend) throws Exception {      if (force) {         try {            this.dbPool.update("DELETE FROM " + this.dbMetaHelper.getIdentifier(this.replPrefix + "TABLES"));         }         catch (Exception ex) {            log.warning("Could not delete tables configuration before adding triggers with 'force' true");            ex.printStackTrace();         }      }      final boolean doFix = true;      checkTriggerConsistency(doFix);      Connection conn = this.dbPool.reserve();      try {         TableToWatchInfo[] tablesToWatch = TableToWatchInfo.getTablesToWatch(conn, this.info);         log.info("there are '" + tablesToWatch.length + "' tables to watch (invoked with forceSend='" + forceSend + "'");         for (int i=0; i < tablesToWatch.length; i++)            addTableToWatch(tablesToWatch[i], force, destinations, forceSend);      }      finally {         if (conn != null)            this.dbPool.release(conn);      }   }   /**    *     * @see org.xmlBlaster.contrib.replication.I_DbSpecific#initiateUpdate(java.lang.String)    */   public void initiateUpdate(String topic, String replManagerAddress, String[] slaveNames, String requestedVersion, String initialFilesLocation) throws Exception {            log.info("initial replication for destinations='" + replManagerAddress + "' and slaves='" + toString(slaveNames) + "' and location '" + initialFilesLocation + "'");      Connection conn = null;      // int oldTransactionIsolation = Connection.TRANSACTION_SERIALIZABLE;      // int oldTransactionIsolation = Connection.TRANSACTION_REPEATABLE_READ;      int oldTransactionIsolation = Connection.TRANSACTION_READ_COMMITTED;      try {         if (this.dbPool == null)            throw new Exception("intitiate update: The Database pool has not been instantiated (yet)");         conn = this.dbPool.reserve();         conn.setAutoCommit(false);         oldTransactionIsolation = conn.getTransactionIsolation();         conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);         // the result must be sent as a high prio message to the real destination         boolean forceFlag = false;         boolean isRequestingCurrentVersion = false;         log.info("current replication version is '" + this.replVersion + "' and requested version is '" + requestedVersion + "'");         if (this.replVersion.equalsIgnoreCase(requestedVersion))            isRequestingCurrentVersion = true;         boolean forceSend = !isRequestingCurrentVersion;         addTriggersIfNeeded(forceFlag, slaveNames, forceSend);         InitialUpdater.ConnectionInfo connInfo = this.initialUpdater.getConnectionInfo(conn);         long minKey = this.incrementReplKey(conn);         String filename = null;         String completeFilename = null;         if (isRequestingCurrentVersion)            filename = this.initialUpdater.initialCommand(slaveNames, completeFilename, connInfo, requestedVersion);         else            filename = VersionTransformerCache.buildFilename(this.replPrefix, requestedVersion);                  long maxKey = this.incrementReplKey(conn);          // if (!connInfo.isCommitted())         conn.commit();         List slavesList = new ArrayList();         for (int i=0; i < slaveNames.length; i++) {            if (!isCancelled(slaveNames[i]))               slavesList.add(slaveNames[i]);         }         slaveNames = (String[])slavesList.toArray(new String[slavesList.size()]);         this.initialUpdater.sendInitialDataResponse(slaveNames, filename, replManagerAddress, minKey, maxKey, requestedVersion, this.replVersion, initialFilesLocation);      }      catch (Exception ex) {         conn = removeFromPool(conn, ROLLBACK_YES);         ex.printStackTrace();      }      finally {         if (conn != null) {            if (oldTransactionIsolation != Connection.TRANSACTION_READ_COMMITTED) {               try {                  conn.setTransactionIsolation(oldTransactionIsolation);               }               catch (SQLException e) {                   e.printStackTrace();                }            }            // we always throw away the connection on initial update (to be on the safe side)            // if rollback was done before this will not execute anything since conn=null             conn = removeFromPool(conn, ROLLBACK_NO);         }      }   }      /**    * @see org.xmlBlaster.contrib.replication.I_DbSpecific#initialCommand(java.lang.String, java.lang.String)    */   public void initialCommand(String[] slaveNames, String completeFilename, String version) throws Exception {      this.initialUpdater.initialCommand(slaveNames, completeFilename, null, version);   }   /**    * @see org.xmlBlaster.contrib.replication.I_DbSpecific#initialCommandPre()    */   public void initialCommandPre() throws Exception {      this.initialUpdater.initialCommandPre();   }   /**    * @see org.xmlBlaster.contrib.replication.I_DbSpecific#broadcastStatement(java.lang.String, long, long, boolean, boolean, String, String)    */   public byte[] broadcastStatement(String sql, long maxResponseEntries, boolean isHighPrio, boolean isMaster, String sqlTopic, String statementId) throws Exception {      Connection conn = this.dbPool.reserve();      byte[] response = null;      try {         conn.setAutoCommit(false);         if (this.isInMaster) {            CallableStatement st = null;            try {               StringBuffer buf = new StringBuffer();               // buf.append("<desc>\n");               buf.append("<attr id='").append(ReplicationConstants.STATEMENT_ATTR).append("'>").append(sql).append("</attr>\n");               buf.append("<attr id='").append(ReplicationConstants.STATEMENT_PRIO_ATTR).append("'>").append(isHighPrio).append("</attr>\n");               buf.append("<attr id='").append(ReplicationConstants.MAX_ENTRIES_ATTR).append("'>").append(maxResponseEntries).append("</attr>\n");               buf.append("<attr id='").append(ReplicationConstants.STATEMENT_ID_ATTR).append("'>").append(statementId).append("</attr>\n");               buf.append("<attr id='").append(ReplicationConstants.SQL_TOPIC_ATTR).append("'>").append(sqlTopic).append("</attr>\n");               // buf.append("</desc>\n");                              String sqlTxt = "{? = call " + this.replPrefix + "prepare_broadcast(?)}";               st = conn.prepareCall(sqlTxt);               String value = buf.toString();               st.setString(2, value);               st.registerOutParameter(1, Types.VARCHAR);               st.executeQuery();            }            finally {               st.close();            }         }                  Statement st2 = conn.createStatement();         try {            if (st2.execute(sql)) {               ResultSet rs = st2.getResultSet();               response = ResultSetToXmlConverter.getResultSetAsXmlLiteral(conn, rs, "statement", "query", maxResponseEntries);            }            else {               int updateCount = st2.getUpdateCount();               StringBuffer buf1 = new StringBuffer();               buf1.append("<sql>\n");               buf1.append("  <desc>\n");               buf1.append("    <command>").append("statement").append("</command>");               buf1.append("    <ident>").append("update").append("</ident>");               buf1.append("    <attr id='").append("updateCount").append("'>").append(updateCount).append("</attr>");               buf1.append("  </desc>\n");               buf1.append("</sql>\n");               response = buf1.toString().getBytes();            }            // TODO make this a fine            log.info("statement to broadcast shall give this response: " + new String(response));         }         finally {            if (st2 != null)               st2.close();         }         return response;      }      catch (Exception ex) {         conn = removeFromPool(conn, ROLLBACK_YES);         throw ex;      }      finally {         conn = releaseIntoPool(conn, COMMIT_YES);      }   }   /**    * Always returns null (to nullify the connection).    * @param conn The connection. Can be null, in which case nothing is done.    * @param doRollback if true, a rollback is done, on false no rollback is done.    * @return always null.    */   protected Connection removeFromPool(Connection conn, boolean doRollback) {      return removeFromPool(conn, doRollback, this.dbPool);   }      /**    * Always returns null (to nullify the connection).    * @param conn The connection. Can be null, in which case nothing is done.    * @param doRollback if true, a rollback is done, on false no rollback is done.    * @param pool the pool to which the connection belongs.    * @return always null.    */   public static Connection removeFromPool(Connection conn, boolean doRollback, I_DbPool pool) {      log.fine("Removing from Database pool of connection (rollback='" + doRollback + "')");      if (conn == null)         return null;      if (doRollback) {         try {            conn.rollback();         }         catch (Throwable ex) {            log.severe("An exception occured when trying to rollback the jdbc connection. " + ex.getMessage());            ex.printStackTrace();         }      }      try {         pool.erase(conn);      }      catch (Throwable ex) {         log.severe("An exception occured when trying to erase the connection from the pool. " + ex.getMessage());         ex.printStackTrace();      }      return null;   }      /**    * Always returns null (to nullify the connection).    * @param conn The connection. Can be null, in which case nothing is done.    * @param doCommit if true, a commit is done, on false no commit is done.    * @return always null.    */   protected Connection releaseIntoPool(Connection conn, boolean doCommit) {      return releaseIntoPool(conn, doCommit, this.dbPool);   }      /**    * Always returns null (to nullify the connection).    * @param conn The connection. Can be null, in which case nothing is done.    * @param doCommit if true, a commit is done, on false no commit is done.    * @param pool the pool to which the connection belongs.    * @return always null.    */   public static Connection releaseIntoPool(Connection conn, boolean doCommit, I_DbPool pool) {      if (conn == null)         return null;      if (doCommit) {         try {            conn.commit();         }         catch (Throwable ex) {            ex.printStackTrace();         }      }      try {         pool.release(conn);      }      catch (Throwable ex) {         log.severe("An exception occured when trying to release the connection into the pool. " + ex.getMessage());         ex.printStackTrace();      }      return null;   }      /**    * @see org.xmlBlaster.contrib.replication.I_DbSpecific#cancelUpdate(java.lang.String)    */   public void cancelUpdate(String replSlave) {      synchronized(this.cancelledUpdates) {         this.cancelledUpdates.add(replSlave);      }   }   /**    * @see org.xmlBlaster.contrib.replication.I_DbSpecific#clearCancelUpdate(java.lang.String)    */   public void clearCancelUpdate(String replSlave) {      synchronized(this.cancelledUpdates) {         this.cancelledUpdates.remove(replSlave);      }   }      private boolean isCancelled(String replSlave) {      synchronized(this.cancelledUpdates) {         return this.cancelledUpdates.contains(replSlave);      }   }      public static String getReplPrefix(I_Info info) {      String pureVal = info.get(ReplicationConstants.REPL_PREFIX_KEY, ReplicationConstants.REPL_PREFIX_DEFAULT);      String corrected = GlobalInfo.getStrippedString(pureVal);      if (!corrected.equals(pureVal))         log.warning("The " + ReplicationConstants.REPL_PREFIX_KEY + " property has been changed from '" + pureVal + "' to '" + corrected + "' to be able to use it inside a DB");      return corrected;   }      /**    * Example code.    * <p />    * <tt>java -Djava.util.logging.config.file=testlog.properties org.xmlBlaster.contrib.replication.ReplicationManager -db.password secret</tt>    *     * @param args    *           Command line    */   public static void main(String[] args) {      I_DbPool pool = null;      Connection conn = null;      try {                  System.setProperty("java.util.logging.config.file",               "testlog.properties");         // LogManager.getLogManager().readConfiguration();         // Preferences prefs = Preferences.userRoot();         // prefs.node(ReplicationConstants.CONTRIB_PERSISTENT_MAP).clear();         // prefs.clear();         // ---- Database settings -----         if (System.getProperty("jdbc.drivers", null) == null) {            System.setProperty(                        "jdbc.drivers",                        "org.hsqldb.jdbcDriver:oracle.jdbc.driver.OracleDriver:com.microsoft.jdbc.sqlserver.SQLServerDriver:org.postgresql.Driver");         }         if (System.getProperty("db.url", null) == null) {            System.setProperty("db.url", "jdbc:postgresql:test//localhost/test");         }         if (System.getProperty("db.user", null) == null) {            System.setProperty("db.user", "postgres");         }         if (System.getProperty("db.password", null) == null) {            System.setProperty("db.password", "");         }         I_Info info = new PropertiesInfo(System.getProperties());         boolean forceCreationAndInit = true;         I_DbSpecific specific = ReplicationConverter.getDbSpecific(info, forceCreationAndInit);         pool = (I_DbPool) info.getObject("db.pool");         conn = pool.reserve();         conn.setAutoCommit(true);         String schema = info.get("wipeout.schema", null);         String version = info.get("replication.version", "0.0");         if (schema == null) {            String initialUpdateFile = info.get("initialUpdate.file", null);            if (initialUpdateFile != null) {               specific.initialCommand(null, initialUpdateFile, version);            }            else               specific.cleanup(conn, true);         }         else {            specific.wipeoutSchema(null, schema, WIPEOUT_ALL);         }      }       catch (Throwable e) {         System.err.println("SEVERE: " + e.toString());         e.printStackTrace();         conn = SpecificDefault.removeFromPool(conn, ROLLBACK_NO, pool);      }      finally {         if (pool != null) {            conn = releaseIntoPool(conn, COMMIT_NO, pool);         }      }   }   public void setAttributeTransformer(I_AttributeTransformer transformer) {      this.transformer = transformer;   }      public boolean isDatasourceReadonly() {      return false;   }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -