⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 initialupdater.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
      this.initialCmdPath = this.info.get("replication.path", "${user.home}/tmp");      log.fine("replication.path='" + this.initialCmdPath + "'");      this.initialCmd = this.info.get("replication.initialCmd", null);      if (this.initialCmd != null && this.initialCmd.trim().length() < 1) // if emtpy         this.initialCmd = null;      this.initialCmdPre = info.get("replication.initialCmdPre", null);      this.keepDumpFiles = info.getBoolean("replication.keepDumpFiles", false);      // this.stringToCheck = info.get("replication.initial.stringToCheck", "rows exported");      this.stringToCheck = info.get("replication.initial.stringToCheck", null);      this.initialDataTopic = info.get("replication.initialDataTopic", "replication.initialData");      String currentVersion = this.info.get("replication.version", "0.0");      // this is only needed on the master side      this.info.put(SUPPORTED_VERSIONS, getSupportedVersions(currentVersion));      this.initialDumpAsXml = this.info.getBoolean("replication.initialDumpAsXml", false);      this.initialDumpMaxSize = this.info.getInt("replication.initialDumpMaxSize", 1048576);             if (this.initialDumpAsXml)         this.initialDumpMaxSize = (int)(0.666 * this.initialDumpMaxSize);               boolean needsPublisher = this.info.getBoolean(I_DbSpecific.NEEDS_PUBLISHER_KEY, true);      if (needsPublisher) {         this.info.putObject("_connectionStateListener", this);         this.publisher = DbWatcher.getChangePublisher(this.info);      }            // registering this instance to the Replication Manager      HashMap subscriptionMap = new HashMap();      subscriptionMap.put("ptp", "true");      if (this.publisher != null) {         replSourceEngine = new ReplSourceEngine(replPrefix, publisher, this);          this.publisher.registerAlertListener(this, subscriptionMap);      }      this.initialCmdSleepDelay = this.info.getLong("replication.initialCmd.sleepDelay", 10L);            // rewrite the default behaviour of the timestamp detector to detect even UPDATES (deletes are also updates)      /*      boolean detectUpdates = this.info.getBoolean("detector.detectUpdates", false);      if (detectUpdates)         throw new Exception("You have configured the DbWatcher to have 'detector.detectUpdates=true'. This is not allowed in replication");      log.info("overwriting the default for 'detector.detectUpdates' from 'true' to 'false' since we are in replication");      this.info.put("detector.detectUpdates", "" + false);      */      // used for versioning (shall be passed to the ConnectQos when connecting (make sure this is      // invoked before the mom connects   }   /**    * @see I_DbSpecific#shutdown()    */   public final void shutdown() throws Exception {      try {         if (this.publisher != null) {            log.info("going to shutdown: cleaning up resources");            this.publisher.shutdown();            this.publisher = null;         }      }       catch (Throwable e) {         e.printStackTrace();         log.warning(e.toString());      }   }   /**    * Publishes a 'CREATE TABLE' operation to the XmlBlaster. It is used on the    * DbWatcher side. Note that it is also used to publish the INSERT commands    * related to a CREATE TABLE operation, i.e. if on a CREATE TABLE operation    * it is found that the table is already populated when reading it, then    * these INSERT operations are published with this method.    *     * @param counter    *           The counter indicating which message number it is. The create    *           opeation itself will have '0', the subsequent associated INSERT    *           operations will have an increasing number (it is the number of    *           the message not the number of the associated INSERT operation).    * @param destination in case it is a ptp it is sent only to that destination, otherwise it is sent as a pub/sub    * @return a uniqueId identifying this publish operation.    *     * @throws Exception    */   public final String publishCreate(int counter, SqlInfo updateInfo, long newReplKey, String destination) throws Exception {      log.info("publishCreate invoked for counter '" + counter + "'");      SqlDescription description = updateInfo.getDescription();            description.setAttribute(new ClientProperty(CREATE_COUNTER_KEY, "int",            null, "" + counter));      description.setAttribute(new ClientProperty(EXTRA_REPL_KEY_ATTR, null, null, "" + newReplKey));      if (counter == 0) {         description.setCommand(CREATE_ACTION);         description.setAttribute(new ClientProperty(               ACTION_ATTR, null, null,               CREATE_ACTION));      } else {         description.setCommand(REPLICATION_CMD);         description.setAttribute(new ClientProperty(               ACTION_ATTR, null, null,               INSERT_ACTION));      }      Map map = new HashMap();      map.put("_command", "CREATE");      if (destination != null)         map.put("_destination", destination);      // and later put the part number inside      map.put(ContribConstants.TOPIC_NAME, this.initialDataTopic);      if (this.publisher == null) {         log.warning("SpecificDefaut.publishCreate publisher is null, can not publish. Check your configuration");         return null;      }      else         return this.publisher.publish("createTableMsg", updateInfo.toXml("").getBytes(), map);   }   /**    * Sending this message will reactivate the Dispatcher of the associated slave    * @param topic    * @param filename    * @param replManagerAddress    * @param slaveName    * @param minKey    * @param maxKey    * @throws Exception    */   public final void sendInitialDataResponseOnly(String[] slaveSessionNames, String replManagerAddress, long minKey, long maxKey) throws Exception {      if (replSourceEngine != null)         replSourceEngine.sendInitialDataResponse(slaveSessionNames, replManagerAddress, minKey, maxKey);   }      public final void sendInitialDataResponse(String[] slaveSessionNames, String shortFilename, String replManagerAddress, long minKey, long maxKey, String requestedVersion, String currentVersion, String initialFilesLocation) throws Exception {      // in this case they are just decorators around I_ChangePublisher      if (this.publisher == null) {         if (shortFilename == null)            shortFilename = "no file (since no initial data)";         log.warning("The publisher has not been initialized, can not publish message for '" + shortFilename + "'");         return;      }      XBSession session = this.publisher.getJmsSession();      // XBMessageProducer producer = new XBMessageProducer(session, new XBDestination(topic, null));            XBDestination dest = new XBDestination(this.initialDataTopic, SpecificDefault.toString(slaveSessionNames));            XBMessageProducer producer = new XBMessageProducer(session, dest);      producer.setPriority(PriorityEnum.HIGH_PRIORITY.getInt());      producer.setDeliveryMode(DeliveryMode.PERSISTENT);            String dumpId = "" + new Timestamp().getTimestamp();      // now read the file which has been generated      String filename = null;      if (shortFilename != null) {         log.info("sending initial file '" + shortFilename + "' for user '" + SpecificDefault.toString(slaveSessionNames)  + "'");         if (this.initialCmdPath != null)            filename = this.initialCmdPath + File.separator + shortFilename;         else            filename = shortFilename;         File file = new File(filename);                  FileInputStream fis = new FileInputStream(file);                  XBStreamingMessage msg = session.createStreamingMessage(this);         msg.setIntProperty(XBConnectionMetaData.JMSX_MAX_CHUNK_SIZE, this.initialDumpMaxSize);         msg.setStringProperty(FILENAME_ATTR, shortFilename);         msg.setLongProperty(REPL_KEY_ATTR, minKey);         msg.setStringProperty(DUMP_ACTION, "true");         if (initialFilesLocation != null) {            msg.setStringProperty(INITIAL_FILES_LOCATION, initialFilesLocation);            msg.setStringProperty(INITIAL_DATA_ID, dumpId);         }         msg.setInputStream(fis);         producer.send(msg);         // make a version copy if none exists yet         boolean doDelete = true;         if (currentVersion != null) {            String backupFileName = this.initialCmdPath + File.separator + VersionTransformerCache.buildFilename(this.replPrefix, currentVersion);            File backupFile = new File(backupFileName);            if (!backupFile.exists()) {               final boolean copy = true;               if (copy) {                  BufferedInputStream bis = new BufferedInputStream(file.toURL().openStream());                  FileOutputStream os = new FileOutputStream(backupFileName);                  long length = file.length();                  long remaining = length;                  byte[] buf = new byte[this.initialDumpMaxSize];                  while (remaining > 0) {                     int tot = bis.read(buf);                     remaining -= tot;                     os.write(buf, 0, tot);                  }                  bis.close();                  os.close();               }               else {                  boolean ret = file.renameTo(backupFile);                  if (!ret)                     log.severe("could not move the file '" + filename + "' to '" + backupFileName + "' reason: could it be that the destination is not a local file system ? try the flag 'copyOnMove='true' (see http://www.xmlblaster.org/xmlBlaster/doc/requirements/client.filepoller.html");                  else                     doDelete = false;               }            }         }         else            log.severe("The version is not set. Can not make a backup copy of the version file");                 boolean isRequestingCurrentVersion = currentVersion.equalsIgnoreCase(requestedVersion);          if (!this.keepDumpFiles && doDelete && isRequestingCurrentVersion) {            if (file.exists()) {                boolean ret = file.delete();               if (!ret)                  log.warning("could not delete the file '" + filename + "'");            }         }         fis.close();      }      else         log.info("initial update requested with no real initial data for '" + SpecificDefault.toString(slaveSessionNames) + "' and for replication '" + this.replPrefix + "'");      // send the message for the status change      if (initialFilesLocation != null) {         // then we save it in a file but we must tell it is finished now         TextMessage  endMsg = session.createTextMessage();         endMsg.setText("INITIAL UPDATE WILL BE STORED UNDER '" + initialFilesLocation + "'");         endMsg.setBooleanProperty(INITIAL_DATA_END, true);         endMsg.setStringProperty(INITIAL_DATA_ID, dumpId);         endMsg.setStringProperty(INITIAL_FILES_LOCATION, initialFilesLocation);         producer.send(endMsg);         endMsg = session.createTextMessage();         endMsg.setText("INITIAL UPDATE WILL BE STORED UNDER '" + initialFilesLocation + "' (going to remote)");         endMsg.setBooleanProperty(INITIAL_DATA_END_TO_REMOTE, true);         endMsg.setStringProperty(INITIAL_DATA_ID, dumpId);         endMsg.setStringProperty(INITIAL_FILES_LOCATION, initialFilesLocation);         producer.send(endMsg);      }      sendInitialDataResponseOnly(slaveSessionNames, replManagerAddress, minKey, maxKey);      if (replSourceEngine != null)         replSourceEngine.sendEndOfTransitionMessage(info, session, initialFilesLocation, shortFilename, dumpId, producer);   }         /**    * This method is used where the end of transition message has to be sent separately (for example for read-only applications without triggers)    * @param slaveSessionNames    * @throws JMSException    */   public void sendEndOfTransitionMessage(String[] slaveSessionNames) throws JMSException {      if (replSourceEngine != null)         replSourceEngine.sendEndOfTransitionMessage(info, initialDataTopic, slaveSessionNames);   }      /**    * Executes an Operating System command.    *     * @param cmd    * @throws Exception    */   private void osExecute(String[] slaveNames, String cmd, ConnectionInfo connInfo) throws Exception {      try {         // if (Execute.isWindows()) cmd = "cmd " + cmd;         String[] args = ReplaceVariable.toArray(cmd, " ");

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -