📄 initialupdater.java
字号:
this.initialCmdPath = this.info.get("replication.path", "${user.home}/tmp"); log.fine("replication.path='" + this.initialCmdPath + "'"); this.initialCmd = this.info.get("replication.initialCmd", null); if (this.initialCmd != null && this.initialCmd.trim().length() < 1) // if emtpy this.initialCmd = null; this.initialCmdPre = info.get("replication.initialCmdPre", null); this.keepDumpFiles = info.getBoolean("replication.keepDumpFiles", false); // this.stringToCheck = info.get("replication.initial.stringToCheck", "rows exported"); this.stringToCheck = info.get("replication.initial.stringToCheck", null); this.initialDataTopic = info.get("replication.initialDataTopic", "replication.initialData"); String currentVersion = this.info.get("replication.version", "0.0"); // this is only needed on the master side this.info.put(SUPPORTED_VERSIONS, getSupportedVersions(currentVersion)); this.initialDumpAsXml = this.info.getBoolean("replication.initialDumpAsXml", false); this.initialDumpMaxSize = this.info.getInt("replication.initialDumpMaxSize", 1048576); if (this.initialDumpAsXml) this.initialDumpMaxSize = (int)(0.666 * this.initialDumpMaxSize); boolean needsPublisher = this.info.getBoolean(I_DbSpecific.NEEDS_PUBLISHER_KEY, true); if (needsPublisher) { this.info.putObject("_connectionStateListener", this); this.publisher = DbWatcher.getChangePublisher(this.info); } // registering this instance to the Replication Manager HashMap subscriptionMap = new HashMap(); subscriptionMap.put("ptp", "true"); if (this.publisher != null) { replSourceEngine = new ReplSourceEngine(replPrefix, publisher, this); this.publisher.registerAlertListener(this, subscriptionMap); } this.initialCmdSleepDelay = this.info.getLong("replication.initialCmd.sleepDelay", 10L); // rewrite the default behaviour of the timestamp detector to detect even UPDATES (deletes are also updates) /* boolean detectUpdates = this.info.getBoolean("detector.detectUpdates", false); if (detectUpdates) throw new Exception("You have configured the DbWatcher to have 'detector.detectUpdates=true'. This is not allowed in replication"); log.info("overwriting the default for 'detector.detectUpdates' from 'true' to 'false' since we are in replication"); this.info.put("detector.detectUpdates", "" + false); */ // used for versioning (shall be passed to the ConnectQos when connecting (make sure this is // invoked before the mom connects } /** * @see I_DbSpecific#shutdown() */ public final void shutdown() throws Exception { try { if (this.publisher != null) { log.info("going to shutdown: cleaning up resources"); this.publisher.shutdown(); this.publisher = null; } } catch (Throwable e) { e.printStackTrace(); log.warning(e.toString()); } } /** * Publishes a 'CREATE TABLE' operation to the XmlBlaster. It is used on the * DbWatcher side. Note that it is also used to publish the INSERT commands * related to a CREATE TABLE operation, i.e. if on a CREATE TABLE operation * it is found that the table is already populated when reading it, then * these INSERT operations are published with this method. * * @param counter * The counter indicating which message number it is. The create * opeation itself will have '0', the subsequent associated INSERT * operations will have an increasing number (it is the number of * the message not the number of the associated INSERT operation). * @param destination in case it is a ptp it is sent only to that destination, otherwise it is sent as a pub/sub * @return a uniqueId identifying this publish operation. * * @throws Exception */ public final String publishCreate(int counter, SqlInfo updateInfo, long newReplKey, String destination) throws Exception { log.info("publishCreate invoked for counter '" + counter + "'"); SqlDescription description = updateInfo.getDescription(); description.setAttribute(new ClientProperty(CREATE_COUNTER_KEY, "int", null, "" + counter)); description.setAttribute(new ClientProperty(EXTRA_REPL_KEY_ATTR, null, null, "" + newReplKey)); if (counter == 0) { description.setCommand(CREATE_ACTION); description.setAttribute(new ClientProperty( ACTION_ATTR, null, null, CREATE_ACTION)); } else { description.setCommand(REPLICATION_CMD); description.setAttribute(new ClientProperty( ACTION_ATTR, null, null, INSERT_ACTION)); } Map map = new HashMap(); map.put("_command", "CREATE"); if (destination != null) map.put("_destination", destination); // and later put the part number inside map.put(ContribConstants.TOPIC_NAME, this.initialDataTopic); if (this.publisher == null) { log.warning("SpecificDefaut.publishCreate publisher is null, can not publish. Check your configuration"); return null; } else return this.publisher.publish("createTableMsg", updateInfo.toXml("").getBytes(), map); } /** * Sending this message will reactivate the Dispatcher of the associated slave * @param topic * @param filename * @param replManagerAddress * @param slaveName * @param minKey * @param maxKey * @throws Exception */ public final void sendInitialDataResponseOnly(String[] slaveSessionNames, String replManagerAddress, long minKey, long maxKey) throws Exception { if (replSourceEngine != null) replSourceEngine.sendInitialDataResponse(slaveSessionNames, replManagerAddress, minKey, maxKey); } public final void sendInitialDataResponse(String[] slaveSessionNames, String shortFilename, String replManagerAddress, long minKey, long maxKey, String requestedVersion, String currentVersion, String initialFilesLocation) throws Exception { // in this case they are just decorators around I_ChangePublisher if (this.publisher == null) { if (shortFilename == null) shortFilename = "no file (since no initial data)"; log.warning("The publisher has not been initialized, can not publish message for '" + shortFilename + "'"); return; } XBSession session = this.publisher.getJmsSession(); // XBMessageProducer producer = new XBMessageProducer(session, new XBDestination(topic, null)); XBDestination dest = new XBDestination(this.initialDataTopic, SpecificDefault.toString(slaveSessionNames)); XBMessageProducer producer = new XBMessageProducer(session, dest); producer.setPriority(PriorityEnum.HIGH_PRIORITY.getInt()); producer.setDeliveryMode(DeliveryMode.PERSISTENT); String dumpId = "" + new Timestamp().getTimestamp(); // now read the file which has been generated String filename = null; if (shortFilename != null) { log.info("sending initial file '" + shortFilename + "' for user '" + SpecificDefault.toString(slaveSessionNames) + "'"); if (this.initialCmdPath != null) filename = this.initialCmdPath + File.separator + shortFilename; else filename = shortFilename; File file = new File(filename); FileInputStream fis = new FileInputStream(file); XBStreamingMessage msg = session.createStreamingMessage(this); msg.setIntProperty(XBConnectionMetaData.JMSX_MAX_CHUNK_SIZE, this.initialDumpMaxSize); msg.setStringProperty(FILENAME_ATTR, shortFilename); msg.setLongProperty(REPL_KEY_ATTR, minKey); msg.setStringProperty(DUMP_ACTION, "true"); if (initialFilesLocation != null) { msg.setStringProperty(INITIAL_FILES_LOCATION, initialFilesLocation); msg.setStringProperty(INITIAL_DATA_ID, dumpId); } msg.setInputStream(fis); producer.send(msg); // make a version copy if none exists yet boolean doDelete = true; if (currentVersion != null) { String backupFileName = this.initialCmdPath + File.separator + VersionTransformerCache.buildFilename(this.replPrefix, currentVersion); File backupFile = new File(backupFileName); if (!backupFile.exists()) { final boolean copy = true; if (copy) { BufferedInputStream bis = new BufferedInputStream(file.toURL().openStream()); FileOutputStream os = new FileOutputStream(backupFileName); long length = file.length(); long remaining = length; byte[] buf = new byte[this.initialDumpMaxSize]; while (remaining > 0) { int tot = bis.read(buf); remaining -= tot; os.write(buf, 0, tot); } bis.close(); os.close(); } else { boolean ret = file.renameTo(backupFile); if (!ret) log.severe("could not move the file '" + filename + "' to '" + backupFileName + "' reason: could it be that the destination is not a local file system ? try the flag 'copyOnMove='true' (see http://www.xmlblaster.org/xmlBlaster/doc/requirements/client.filepoller.html"); else doDelete = false; } } } else log.severe("The version is not set. Can not make a backup copy of the version file"); boolean isRequestingCurrentVersion = currentVersion.equalsIgnoreCase(requestedVersion); if (!this.keepDumpFiles && doDelete && isRequestingCurrentVersion) { if (file.exists()) { boolean ret = file.delete(); if (!ret) log.warning("could not delete the file '" + filename + "'"); } } fis.close(); } else log.info("initial update requested with no real initial data for '" + SpecificDefault.toString(slaveSessionNames) + "' and for replication '" + this.replPrefix + "'"); // send the message for the status change if (initialFilesLocation != null) { // then we save it in a file but we must tell it is finished now TextMessage endMsg = session.createTextMessage(); endMsg.setText("INITIAL UPDATE WILL BE STORED UNDER '" + initialFilesLocation + "'"); endMsg.setBooleanProperty(INITIAL_DATA_END, true); endMsg.setStringProperty(INITIAL_DATA_ID, dumpId); endMsg.setStringProperty(INITIAL_FILES_LOCATION, initialFilesLocation); producer.send(endMsg); endMsg = session.createTextMessage(); endMsg.setText("INITIAL UPDATE WILL BE STORED UNDER '" + initialFilesLocation + "' (going to remote)"); endMsg.setBooleanProperty(INITIAL_DATA_END_TO_REMOTE, true); endMsg.setStringProperty(INITIAL_DATA_ID, dumpId); endMsg.setStringProperty(INITIAL_FILES_LOCATION, initialFilesLocation); producer.send(endMsg); } sendInitialDataResponseOnly(slaveSessionNames, replManagerAddress, minKey, maxKey); if (replSourceEngine != null) replSourceEngine.sendEndOfTransitionMessage(info, session, initialFilesLocation, shortFilename, dumpId, producer); } /** * This method is used where the end of transition message has to be sent separately (for example for read-only applications without triggers) * @param slaveSessionNames * @throws JMSException */ public void sendEndOfTransitionMessage(String[] slaveSessionNames) throws JMSException { if (replSourceEngine != null) replSourceEngine.sendEndOfTransitionMessage(info, initialDataTopic, slaveSessionNames); } /** * Executes an Operating System command. * * @param cmd * @throws Exception */ private void osExecute(String[] slaveNames, String cmd, ConnectionInfo connInfo) throws Exception { try { // if (Execute.isWindows()) cmd = "cmd " + cmd; String[] args = ReplaceVariable.toArray(cmd, " ");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -