📄 replslave.java
字号:
if (subDirName != null) { if (initialFilesLocation != null) { File base = new File(initialFilesLocation.getStringValue().trim()); File complete = new File(base, subDirName.getStringValue().trim()); dirName = complete.getAbsolutePath(); } } changeLastMessage("Manual Data transfer: WAITING (stored on '" + dirName + "')", false); break; // we need to interrupt here: all subsequent entries will be processed later. } // check if the message has to be stored locally ClientProperty endToRemote = msgUnit.getQosData().getClientProperty(ReplicationConstants.INITIAL_DATA_END_TO_REMOTE); if (initialFilesLocation != null && (endToRemote == null || !endToRemote.getBooleanValue()) && (endMsg == null || !endMsg.getBooleanValue())) { storeChunkLocally(entry, initialFilesLocation, subDirName); queue.removeRandom(entry); // entries.remove(i); continue; } if (endMsg != null) { log.info("Received msg marking the end of the initial for client '" + this.slaveSessionId + "' update: '" + this.name + "' going into NORMAL operations"); startCascadedAndChangeStatus(); } byte[] content = msgUnit.getContent(); if (content != null) totalSize += content.length; if (totalSize <= this.maxChunkSize || i == 0) remoteEntries.add(entry); else break; } entries = null; // we can free it here since not needed anymore if (this.status == STATUS_NORMAL || this.status == STATUS_INCONSISTENT || this.status == STATUS_UNCONFIGURED) return remoteEntries; ArrayList ret = new ArrayList(); for (int i=0; i < remoteEntries.size(); i++) { ReferenceEntry entry = (ReferenceEntry)remoteEntries.get(i); MsgUnit msgUnit = entry.getMsgUnit(); long replKey = msgUnit.getQosData().getClientProperty(ReplicationConstants.REPL_KEY_ATTR, -1L); /* this is done when acknowledge comes if (replKey > -1L) { setMaxReplKey(replKey, this.tmpTransSeq, this.tmpMsgSeq); } */ log.info("check: processing '" + replKey + "' for client '" + this.slaveSessionId + "' "); if (replKey < 0L) { // this does not come from the normal replication, so these are other messages which we just deliver ClientProperty endMsg = msgUnit.getQosData().getClientProperty(ReplicationConstants.END_OF_TRANSITION); if (endMsg == null) { log.warning("the message unit with qos='" + msgUnit.getQosData().toXml() + "' and key '" + msgUnit.getKey() + "' for client '" + this.slaveSessionId + "' has no 'replKey' Attribute defined."); ret.add(entry); continue; } } log.info("repl entry '" + replKey + "' for range [" + this.minReplKey + "," + this.maxReplKey + "] for client '" + this.slaveSessionId + "' "); if (replKey >= this.minReplKey || this.forceSending) { log.info("repl adding the entry for client '" + this.slaveSessionId + "' "); doTransform(msgUnit); ret.add(entry); /* TODO TEMPORARLY REMOVED FOR TESTING: also test no initial dump and manual transfer if (replKey > this.maxReplKey || this.forceSending) { log.info("entry with replKey='" + replKey + "' is higher than maxReplKey)='" + this.maxReplKey + "' switching to normal operation again for client '" + this.slaveSessionId + "' "); startCascadedAndChangeStatus(); } */ } else { // such messages have been already from the initial update. (obsolete messages are removed) log.info("removing entry with replKey='" + replKey + "' since older than minEntry='" + this.minReplKey + "' for client '" + this.slaveSessionId + "' "); queue.removeRandom(entry); } } // check if there are more than one entry the keep-transaction-flag has to be set: if (ret.size() > 1) { for (int i=0; i < ret.size()-1; i++) { ReferenceEntry entry = (ReferenceEntry)entries.get(i); MsgUnit msgUnit = entry.getMsgUnit(); msgUnit.getQosData().addClientProperty(KEEP_TRANSACTION_OPEN, true); } log.info("Sending '" + ret.size() + "' entries in one single message"); } return ret; } } private final void startCascadedAndChangeStatus() throws Exception { if (this.cascadedReplPrefix != null && this.cascadedReplSlave != null && this.cascadedReplPrefix.trim().length() > 0 && this.cascadedReplSlave.trim().length() > 0) { log.info("initiating the cascaded replication with replication.prefix='" + this.cascadedReplPrefix + "' for slave='" + this.cascadedReplSlave + "'"); this.manager.initiateReplicationNonMBean(this.cascadedReplSlave, this.cascadedReplPrefix, null, null, null); } else { log.info("will not cascade initiation of any further replication for '" + this.name + "' since no cascading defined"); } setStatus(STATUS_NORMAL); } /** * @return Returns the sqlResponse. */ public String getSqlResponse() { return this.sqlResponse; } /** * @param sqlResponse The sqlResponse to set. */ public void setSqlResponse(String sqlResponse) { this.sqlResponse = sqlResponse; } /** * @see org.xmlBlaster.contrib.I_ContribPlugin#getUsedPropertyKeys() */ public Set getUsedPropertyKeys() { return new HashSet(); } public boolean setDispatcher(boolean status) { try { setDispatcher(status, true); return true; } catch (Exception ex) { log.severe("Exception occured when trying to set the dispatcher to '" + status + "': " + ex.getMessage()); ex.printStackTrace(); return false; } } public final boolean setDispatcher(boolean status, boolean doPersist) throws Exception { I_AdminSession session = getSession(); session.setDispatcherActive(status); if (doPersist) this.persistentInfo.put(this.slaveSessionId + ".dispatcher", "" + status); // to speed up refresh on monitor this.dispatcherActive = session.getDispatcherActive(); return this.dispatcherActive; } /** * @see org.xmlBlaster.contrib.replication.ReplSlaveMBean#doContinue() */ public void doContinue(boolean doPersist) throws Exception { setDispatcher(true, doPersist); } /** * @see org.xmlBlaster.contrib.replication.ReplSlaveMBean#doPause() */ public void doPause(boolean doPersist) throws Exception { setDispatcher(false, doPersist); } public void handleException(Throwable ex) { try { final boolean add = true; if (ex instanceof XmlBlasterException) { XmlBlasterException xmlblEx = ((XmlBlasterException)ex); log.warning(xmlblEx.toXml()); if (xmlblEx.getEmbeddedException() != null) changeLastMessage(xmlblEx.getEmbeddedMessage(), add); else changeLastMessage(ex.getMessage(), add); } else changeLastMessage(ex.getMessage(), add); final boolean doPersist = true; doPause(doPersist); } catch (Throwable e) { log.severe("An exception occured when trying to pause the connection: " + e.getMessage()); ex.printStackTrace(); } } /** * Toggles the dispatcher from active to inactive or vice versa. * Returns the actual state. * @see org.xmlBlaster.contrib.replication.ReplSlaveMBean#toggleActive() * @return the actual state. */ public boolean toggleActive() throws Exception { synchronized(this.initSync) { I_AdminSession session = getSession(); final boolean doPersist = true; return setDispatcher(!session.getDispatcherActive(), doPersist); } } /** * TODO fix this since it potentially could delete request from other slaves since the DbWatcher is serving * several slaves. * Cancels an ongoing initialUpdate Request. */ public void cancelInitialUpdate() throws Exception { if (this.status == STATUS_NORMAL) return; if (!this.initialized) throw new Exception("cancelInitialUpdate: '" + this.name + "' has not been initialized properly or is already shutdown, check your logs"); if (this.dbWatcherSessionName == null) throw new Exception("The DbWatcher Session Id is null, can not cancel"); (new Thread() { public void run() { cancelUpdateAsyncPart(); } }).start(); } /** * The cancelUpdate is invoked asynchronously to avoid log blocking of the monitor * when the cancel operation is going on. */ private void cancelUpdateAsyncPart() { try { I_AdminSession session = getSession(); long clearedMsg = session.clearCallbackQueue(); log.info("clearing of callback queue: '" + clearedMsg + "' where removed since a cancel request was done"); // sending the cancel op to the DbWatcher log.info(this.name + " sends now a cancel request to the Master '" + this.dbWatcherSessionName + "'"); I_XmlBlasterAccess conn = this.global.getXmlBlasterAccess(); // no oid for this ptp message PublishKey pubKey = new PublishKey(this.global, REQUEST_CANCEL_UPDATE_TOPIC); Destination destination = new Destination(new SessionName(this.global, this.dbWatcherSessionName)); destination.forceQueuing(true); PublishQos pubQos = new PublishQos(this.global, destination); pubQos.addClientProperty(ReplicationConstants.SLAVE_NAME, this.slaveSessionId); pubQos.setPersistent(false); MsgUnit msg = new MsgUnit(pubKey, ReplicationConstants.REPL_REQUEST_CANCEL_UPDATE.getBytes(), pubQos); conn.publish(msg); // TODO Check this since it could mess up the current status if one is exaclty finished now //setStatus(STATUS_NORMAL); setStatus(STATUS_INCONSISTENT); } catch (Exception ex) { log.severe("An exception occured when trying to cancel the initial update for '" + this.replPrefix + "'"); ex.printStackTrace(); } } private long clearQueueSync() { long ret = 0L; initTransactionSequenceIfNeeded("clearQueueSync has been invoked before init"); try { ret = getSession().clearCallbackQueue(); transactionSeq = (long[])manager.getCurrentTransactionCount(replPrefix).clone(); ptpQueueEntries = 0L; setMaxReplKey(maxReplKey, transactionSeq, messageSeq, minReplKey, ptpQueueEntries); } catch (Exception ex) { ex.printStackTrace(); } return ret; } public void clearQueue() throws Exception { setStatus(STATUS_INCONSISTENT); log.warning("has been invoked"); (new Thread() { public void run() { clearQueueSync(); } }).start(); } public long removeQueueEntries(long entries) throws Exception { setStatus(STATUS_INCONSISTENT); log.warning("has been invoked with entries='" + entries + "'"); return getSession().removeFromCallbackQueue(entries); } public void kill() throws Exception { getSession().killSession(); } public String reInitiateReplication() throws Exception { return this.manager.initiateReplication(this.slaveSessionId, this.replPrefix + "_Ver_" + this.ownVersion, this.cascadedReplSlave, this.cascadedReplPrefix, this.initialFilesLocation); } public String getReplPrefix() { return this.replPrefix; } public String getReplPrefixGroup() { return this.replPrefixGroup; } public String getVersion() { return this.ownVersion; } /** * Convenience method enforced by the MBean which returns true if the dispatcher of * the slave session is active, false otherwise. */ public boolean isActive() { return this.dispatcherActive; } /** * Convenience method enforced by the MBean which returns the number of entries in * the queue. */ public long getQueueEntries() { if (countSingleMessages) return this.cbQueueEntries; else return this.queueEntries; } /** * Convenience method enforced by the MBean which returns true if the real slave is * connected or false otherwise. */ public boolean isConnected() { return this.connected; } /** * Convenience method enforced by the MBean which returns true if the connection to the * real slave is stalled or false otherwise. */ public boolean isStalled() { return this.stalled; } public String getSessionName() { return this.sessionName; } public String getLastMessage() { return this.lastMessage; } public synchronized void checkStatus() {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -