📄 replslave.java
字号:
} } } public boolean reInitiate(I_Info info) throws Exception { final boolean onlyRegister = true; return run(info, this.dbWatcherSessionName, this.cascadedReplPrefix, this.cascadedReplSlave, onlyRegister); } /** * * @param info * @param dbWatcherSessionId * @param cascadeReplPrefix * @param cascadeSlaveSessionName * @param onlyRegister if true it only registers for initial update but does not execute it yet. * It will wait for a further (common) start message. * @return * @throws Exception */ public boolean run(I_Info info, String dbWatcherSessionId, String cascadeReplPrefix, String cascadeSlaveSessionName, boolean onlyRegister) throws Exception { if (this.status != STATUS_NORMAL && this.status != STATUS_INCONSISTENT && this.status != STATUS_UNCONFIGURED) { log.warning("will not start initial update request since one already ongoing for '" + this.name + "'"); return false; } this.persistentInfo.put(this.slaveSessionId + CASCADED_REPL_PREFIX, cascadeReplPrefix); this.persistentInfo.put(this.slaveSessionId + CASCADED_REPL_SLAVE, cascadeSlaveSessionName); info.put(this.slaveSessionId + DBWATCHER_SESSION_NAME, dbWatcherSessionId); init(info); prepareForRequest(info); requestInitialData(dbWatcherSessionId, onlyRegister); return true; } /** * This is the first step in the process of requesting the initial Data. * <ul> * <li>It clears the callback queue of the real slave</li> * <li>It sends a message to the real slave to inform him that * a new initial update has been initiated. This is a PtP * message with a well defined topic, so administrators can * subscribe to it too. * </li> * <li>It then deactivates the callback dispatcher of the real slave</li> * <li>makes a persistent subscription on behalf of the real slave * by passing as a mime access filter an identifier for himself. * </li> * </ul> * @see org.xmlBlaster.contrib.replication.I_ReplSlave#prepareForRequest(I_Info) */ public void prepareForRequest(I_Info individualInfo) throws Exception { if (!this.initialized) throw new Exception("prepareForRequest: '" + this.name + "' has not been initialized properly or is already shutdown, check your logs"); log.info("prepareForRequest"); long clearedMsg = clearQueueSync(); log.info("clearing of callback queue before initiating: '" + clearedMsg + "' where removed since obsolete"); if (this.statusTopic != null) sendStatusInformation("dbInitStart"); final boolean doPersist = true; doPause(doPersist); // stop the dispatcher I_AdminSession session = getSession(); // first unsubscribe (in case it did already an initial update previously, this is needed to remove the subscription // (and thereby its outdate subscription qos from persistence). On a back replication, i.e. where you have more than // one sources you don't want to do this. if (individualInfo.getBoolean("replication.forceNewSubscription", true)) { try { session.unSubscribe(this.dataTopic, ""); } catch (Throwable ex) { } } SubscribeQos subQos = new SubscribeQos(this.global); subQos.setMultiSubscribe(false); subQos.setWantInitialUpdate(false); subQos.setPersistent(true); // this fills the client properties with the contents of the individualInfo object. new ClientPropertiesInfo(subQos.getData().getClientProperties(), individualInfo); session.subscribe(this.dataTopic, subQos.toXml()); synchronized(this.initSync) { setStatus(STATUS_INITIAL); } } private void sendStatusInformation(String status) throws Exception { log.info("send status information '" + status + "'"); I_XmlBlasterAccess conn = this.global.getXmlBlasterAccess(); PublishKey pubKey = new PublishKey(this.global, this.statusTopic); Destination destination = new Destination(new SessionName(this.global, this.slaveSessionId)); destination.forceQueuing(true); PublishQos pubQos = new PublishQos(this.global, destination); pubQos.setPersistent(true); MsgUnit msg = new MsgUnit(pubKey, status.getBytes(), pubQos); conn.publish(msg); } /** * Sends a PtP message to the responsible for the initial update (which is the * DbWatcher or an object running in the DbWatcher jvm) telling a new initial * update has to be initiating. * * @see org.xmlBlaster.contrib.replication.I_ReplSlave#requestInitialData() */ public void requestInitialData(String dbWatcherSessionId, boolean onlyRegister) throws Exception { log.info(this.name + " sends now an initial update request to the Master '" + dbWatcherSessionId + "'"); I_XmlBlasterAccess conn = this.global.getXmlBlasterAccess(); // no oid for this ptp message PublishKey pubKey = new PublishKey(this.global, REQUEST_INITIAL_DATA_TOPIC); Destination destination = new Destination(new SessionName(this.global, dbWatcherSessionId)); destination.forceQueuing(true); PublishQos pubQos = new PublishQos(this.global, destination); pubQos.addClientProperty(ReplicationConstants.SLAVE_NAME, this.slaveSessionId); pubQos.addClientProperty(ReplicationConstants.REPL_VERSION, this.ownVersion); if (this.initialFilesLocation != null) pubQos.addClientProperty(ReplicationConstants.INITIAL_FILES_LOCATION, this.initialFilesLocation); pubQos.setPersistent(true); if (onlyRegister) pubQos.addClientProperty(ReplicationConstants.INITIAL_UPDATE_ONLY_REGISTER, onlyRegister); MsgUnit msg = new MsgUnit(pubKey, ReplicationConstants.REPL_REQUEST_UPDATE.getBytes(), pubQos); conn.publish(msg); } private org.xmlBlaster.engine.ServerScope getEngineGlobal(Global glob) { return (org.xmlBlaster.engine.ServerScope)glob.getObjectEntry(GlobalInfo.ORIGINAL_ENGINE_GLOBAL); } private I_AdminSession getSession() throws Exception { return this.manager.getSession(this.slaveSessionId); } /** * @see org.xmlBlaster.contrib.replication.I_ReplSlave#reactivateDestination() */ public void reactivateDestination(long minReplKey, long maxReplKey) throws Exception { synchronized(this.initSync) { log.info("Initial Operation completed with replication key interval [" + minReplKey + "," + maxReplKey + "]"); if (!this.initialized) throw new Exception("prepareForRequest: '" + this.name + "' has not been initialized properly or is already shutdown, check your logs"); if (STATUS_INCONSISTENT == this.status) { log.warning("Will not change the status to transition since the initialUpdate has been cancelled"); return; } this.minReplKey = minReplKey; this.maxReplKey = maxReplKey; setStatus(STATUS_TRANSITION); final boolean doPersist = true; doContinue(doPersist); } } /** * @see org.xmlBlaster.contrib.dbwriter.I_ContribPlugin#shutdown() */ public void shutdown() { synchronized (this.initSync) { if (!this.initialized) return; this.global.unregisterMBean(this.mbeanHandle); this.initialized = false; } } private final void doTransform(MsgUnit msgUnit) throws Exception { if (this.doTransform) { // ClientProperty prop = msgUnit.getQosData().getClientProperty(ReplicationConstants.DUMP_ACTION); // if (prop == null) { if (msgUnit.getContentMime() != null && msgUnit.getContentMime().equals("text/xml")) { byte[] content = msgUnit.getContent(); InputStream is = MomEventEngine.decompress(new ByteArrayInputStream(content), msgUnit.getQosData().getClientProperties()); content = ReplManagerPlugin.getContent(is); content = this.manager.transformVersion(this.replPrefix, this.ownVersion, this.slaveSessionId, content); msgUnit.setContent(content); } } } /** * Returns the name of the directory where the entries have been stored. * @param entry The entry to add as a chunk. * @param location The location where to add it. * @param subDirProp * @return * @throws Exception */ private String storeChunkLocally(ReferenceEntry entry, ClientProperty location, ClientProperty subDirProp) throws Exception { if (entry == null) throw new Exception("The entry to store is null, can not store"); MsgUnit msgUnit = entry.getMsgUnit(); if (msgUnit == null) throw new Exception("The msgUnit to store is null, can not store"); if (location == null || location.getStringValue() == null || location.getStringValue().trim().length() < 1) throw new Exception("The location is empty, can not store the message unit '" + msgUnit.getLogId() + "'"); // String fileId = "" + new Timestamp().getTimestamp(); // this way they are automatically sorted and in case of a repeated write it simply would be overwritten. String fileId = entry.getPriority() + "-" + entry.getUniqueId(); String pathName = location.getStringValue().trim(); File dirWhereToStore = ReplManagerPlugin.checkExistance(pathName); if (subDirProp == null) throw new Exception("The property to define the file name (dataId) is not set, can not continue"); String subDirName = subDirProp.getStringValue(); if (subDirName == null || subDirName.trim().length() < 1) throw new Exception("The subdirectory to be used to store the initial data is empty"); File subDir = new File(dirWhereToStore, subDirName); String completeSubdirName = subDir.getAbsolutePath(); if (!subDir.exists()) { if (!subDir.mkdir()) { String txt = "could not make '" + completeSubdirName + "' to be a directory. Check your rights"; log.severe(txt); throw new Exception(txt); } } File file = new File(subDir, fileId); if (file.exists()) log.warning("File '" + file.getAbsolutePath() + "' exists already. Will overwrite it"); FileOutputStream fos = new FileOutputStream(file); MsgUnitRaw msgUnitRaw = new MsgUnitRaw(msgUnit.getKey(), msgUnit.getContent(), msgUnit.getQos()); MsgInfo msgInfo = new MsgInfo(this.global, MsgInfo.INVOKE_BYTE, MethodName.UPDATE_ONEWAY, this.slaveSessionId); msgInfo.addMessage(msgUnitRaw); XmlScriptParser parser = new XmlScriptParser(); parser.init(new Global(), null, null); fos.write(parser.toLiteral(msgInfo).getBytes()); fos.close(); log.info("MsgUnit '" + msgUnit.getQosData().getRcvTimestamp().getTimestamp() + "' has been written to file '" + file.getAbsolutePath() + "'"); return completeSubdirName; } /** * * @param newMsg If newMsg is null, it cleans the message otherwise the behaviour depens on doAdd * @param doAdd if true, the message is added to the current message, if false it is replaced. */ private void changeLastMessage(String newMsg, boolean doAdd) { log.fine("'" + newMsg + "' invoked with add='" + doAdd + "'"); if (newMsg == null) { if (this.lastMessage != null && this.lastMessage.length() > 0) { this.lastMessage = ""; this.persistentInfo.put(this.lastMessageKey, this.lastMessage); } } else { if (doAdd) this.lastMessage += "\n" + newMsg.trim(); else this.lastMessage = newMsg.trim(); this.persistentInfo.put(this.lastMessageKey, this.lastMessage); } } /* private void calculateCounters(MsgQueueEntry[] entries) throws XmlBlasterException { if (entries.length > 0) { for (int i=entries.length-1; i > -1; i--) { ReferenceEntry entry = (ReferenceEntry)entries[i]; if (log.isLoggable(Level.FINEST)) { String txt = new String(decompressQueueEntryContent(entry)); log.finest("Processing entry '" + txt + "' for client '" + this.name + "'"); } MsgUnit msgUnit = entry.getMsgUnit(); long tmpCounter = this.tmpTransSeq + msgUnit.getQosData().getClientProperty(ReplicationConstants.NUM_OF_TRANSACTIONS, 1L); //long tmpCounter = msgUnit.getQosData().getClientProperty(ReplicationConstants.TRANSACTION_SEQ, 0L); if (tmpCounter != 0L) this.tmpTransSeq = tmpCounter; this.tmpReplKey = msgUnit.getQosData().getClientProperty(ReplicationConstants.REPL_KEY_ATTR, -1L); tmpCounter = msgUnit.getQosData().getClientProperty(ReplicationConstants.MESSAGE_SEQ, 0L); if (tmpCounter != 0L) this.tmpMsgSeq = tmpCounter; if (this.tmpReplKey > -1L) { break; // the other messages will have lower numbers (if any) so we break for performance. } } } } */ /** * */ public ArrayList check(ArrayList entries, I_Queue queue) throws Exception { this.queue = queue; synchronized (this.initSync) { this.tmpStatus = -1; this.forcedCounter++; log.info("check invoked with status '" + getStatus() + "' for client '" + this.slaveSessionId + "' (invocation since start is '" + this.forcedCounter + "')"); if (!this.initialized) { log.warning("check invoked without having been initialized. Will repeat operation until the real client connects"); Thread.sleep(250L); // to avoid too fast looping return new ArrayList(); } if (this.status == STATUS_INITIAL && !this.forceSending) { // should not happen since Dispatcher is set to false log.warning("check invoked in INITIAL STATUS. Will stop the dispatcher"); final boolean doPersist = true; doPause(doPersist); return new ArrayList(); } changeLastMessage(null, false); // clean last message // if (entries != null && entries.size() > 1) // log.severe("the entries are '" + entries.size() + "' but we currently only can process one single entry at a time"); // check if already processed ... and at the same time do the versioning transformation (if needed) for (int i=entries.size()-1; i > -1; i--) { ReferenceEntry entry = (ReferenceEntry)entries.get(i); MsgUnit msgUnit = entry.getMsgUnit(); ClientProperty alreadyProcessed = msgUnit.getQosData().getClientProperty(ReplicationConstants.ALREADY_PROCESSED_ATTR); if (alreadyProcessed != null) { log.warning("Received entry for client '" + this.slaveSessionId + "' which was already processed. Will remove it"); queue.removeRandom(entry); entries.remove(i); } else doTransform(msgUnit); } // check if one of the messages is the transition end tag, also check if the total size is exceeded ArrayList remoteEntries = new ArrayList(); long totalSize = 0L; for (int i=0; i < entries.size(); i++) { ReferenceEntry entry = (ReferenceEntry)entries.get(i); MsgUnit msgUnit = entry.getMsgUnit(); ClientProperty endMsg = msgUnit.getQosData().getClientProperty(ReplicationConstants.END_OF_TRANSITION); // check if the message is the end of the data (only sent in case the initial data has to be stored on // file in which case the dispatcher shall return in its waiting state. ClientProperty endOfData = msgUnit.getQosData().getClientProperty(ReplicationConstants.INITIAL_DATA_END); ClientProperty initialFilesLocation = msgUnit.getQosData().getClientProperty(ReplicationConstants.INITIAL_FILES_LOCATION); ClientProperty subDirName = msgUnit.getQosData().getClientProperty(ReplicationConstants.INITIAL_DATA_ID); if (endOfData != null) { final boolean doPersist = true; doPause(doPersist); queue.removeRandom(entry); // entries.remove(i); // endOfData will be kept locally, not sent to slave String dirName = "unknown";
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -