📄 replslave.java
字号:
if (this.replPrefix == null) return; log.finest("invoked for '" + this.sessionName + "'"); I_AdminSession session = null; try { session = getSession(); } catch (Exception ex) { log.severe("an exception occured when retieving the session for '" + this.sessionName + "':" + ex.getMessage()); ex.printStackTrace(); return; } try { this.cbQueueEntries = session.getCbQueueNumMsgs(); // this.messageSeq, long[] transactionCountBeforeQueue = this.manager.getCurrentTransactionCount(this.replPrefix); // check if the numbers in the queue are correct and fix it long pubSubQueueEntries = 0L; long maxTransSeq = transactionCountBeforeQueue[0]; for (int i=0; i < this.transactionSeq.length; i++) { pubSubQueueEntries += (transactionCountBeforeQueue[i] - this.transactionSeq[i]); if (maxTransSeq < transactionCountBeforeQueue[i]) maxTransSeq = transactionCountBeforeQueue[i]; } this.queueEntries = pubSubQueueEntries + this.ptpQueueEntries; this.transactionSeqVisible = maxTransSeq - pubSubQueueEntries; if (this.queueEntries != 0 && session != null && session.getCbQueueNumMsgs() == 0) { log.warning("Detected wrong number of queue entries: correcting: ptp entries='" + this.ptpQueueEntries + "' total='" + this.queueEntries + "'"); this.ptpQueueEntries = 0L; this.transactionSeq = (long[])transactionCountBeforeQueue.clone(); } } catch (Exception ex) { log.severe("an exception occured when retieving the number of queue entries for '" + this.sessionName + "':" + ex.getMessage()); ex.printStackTrace(); this.queueEntries = -1L; } // isActive try { this.dispatcherActive = session.getDispatcherActive(); } catch (Exception ex) { log.severe("an exception occured when retieving the status of the dispatcher for '" + this.sessionName + "':" + ex.getMessage()); ex.printStackTrace(); this.dispatcherActive = false; } try { I_AdminSession masterSession = this.manager.getMasterSession(this.replPrefix); if (masterSession != null) { if (masterSession.isStalled()) this.masterConn = CONN_STALLED; else if (masterSession.getConnectionState().equals(ConnectionStateEnum.ALIVE.toString())) this.masterConn = CONN_CONNECTED; else this.masterConn = CONN_DISCONNECTED; } else { this.masterConn = CONN_DISCONNECTED; } } catch (Exception ex) { this.masterConn = CONN_DISCONNECTED; ex.printStackTrace(); } // isConnected try { this.connected = session.getConnectionState().equals(ConnectionStateEnum.ALIVE.toString()); } catch (Exception ex) { log.severe("an exception occured when checking if connected for '" + this.sessionName + "':" + ex.getMessage()); ex.printStackTrace(); this.connected = false; } // isStalled try { this.stalled = session.isStalled(); } catch (Exception ex) { log.severe("an exception occured when checking if stalled for '" + this.sessionName + "':" + ex.getMessage()); ex.printStackTrace(); this.stalled = false; } // sessionName try { this.sessionName = session.getLoginName() + "/" + session.getPublicSessionId(); } catch (Exception ex) { log.severe("an exception occured when getting the session name:" + ex.getMessage()); ex.printStackTrace(); } // lastMessage try { String tmp = session.getLastCallbackException(); if (!this.lastDispatcherException.equals(tmp)) { this.lastDispatcherException = tmp; final boolean add = true; changeLastMessage(tmp, add); } } catch (Exception ex) { log.severe("an exception occured when getting the last dispatcher exception for '" + this.sessionName + "':" + ex.getMessage()); ex.printStackTrace(); } } public void postCheck(MsgUnit[] processedMsgUnits) throws Exception { initTransactionSequenceIfNeeded("postCheck has been invoked before init"); if (processedMsgUnits == null) { log.severe("The processed Message Units are null"); return; } synchronized(this) { long msgSeq = 0L; long tmpReplKey = -1L; if (processedMsgUnits.length > 0) { for (int i=0; i < processedMsgUnits.length; i++) { MsgUnit msgUnit = processedMsgUnits[i]; long numOfTransactions = msgUnit.getQosData().getClientProperty(ReplicationConstants.NUM_OF_TRANSACTIONS, 1L); if (numOfTransactions > 0L) { long tmpTransactionSeq = msgUnit.getQosData().getClientProperty(ReplicationConstants.TRANSACTION_SEQ, -1L); int prio = ((MsgQosData)msgUnit.getQosData()).getPriority().getInt(); boolean absoluteCount = msgUnit.getQosData().getClientProperty(ReplicationConstants.ABSOLUTE_COUNT, false); if (tmpTransactionSeq != -1L && absoluteCount) { // in case the ReplManagerPlugin is not configured as a MimePlugin this.transactionSeq[prio] = tmpTransactionSeq; } else { if (tmpTransactionSeq > this.transactionSeq[5]) // Hack to be removed later (needs always MIME Plugin) TODO this.transactionSeq[prio] += numOfTransactions; } msgSeq = msgUnit.getQosData().getClientProperty(ReplicationConstants.MESSAGE_SEQ, 0L); tmpReplKey = msgUnit.getQosData().getClientProperty(ReplicationConstants.REPL_KEY_ATTR, -1L); } else { // check if an initial data if (numOfTransactions < 0L) { String topicName = msgUnit.getKeyData().getOid(); if (this.initialDataTopic != null && this.initialDataTopic.equalsIgnoreCase(topicName)) { this.ptpQueueEntries += numOfTransactions; // negative number so it will decrement } } } } } setMaxReplKey(tmpReplKey, this.transactionSeq, msgSeq, this.minReplKey, this.ptpQueueEntries); if (this.tmpStatus > -1) setStatus(this.tmpStatus); } } public long getTransactionSeq() { if (countSingleMessages) return this.maxReplKey; else return this.transactionSeqVisible; } public static byte[] decompressQueueEntryContent(ReferenceEntry entry) { try { MsgUnit msgUnit = entry.getMsgUnit(); if (msgUnit.getContent() == null) return new byte[0]; byte[] content = (byte[])msgUnit.getContent().clone(); Map cp = new HashMap(msgUnit.getQosData().getClientProperties()); return ReplManagerPlugin.getContent(MomEventEngine.decompress(new ByteArrayInputStream(content), cp)); } catch (Exception ex) { ex.printStackTrace(); return new byte[0]; } } public String dumpEntries(int maxNum, long maxSize, String fileName) { if (this.queue == null) return "The queue is null, the replication must first try to deliver one entry before you can invoke this method"; if (this.queue.getNumOfEntries() == 0) return "The queue for the slave '" + this.name + "' is empty: not dumping anything"; try { ArrayList list = this.queue.peek(maxNum, maxSize); FileOutputStream out = new FileOutputStream(fileName); for (int i=0; i < list.size(); i++) { ReferenceEntry entry = (ReferenceEntry)list.get(i); byte[] ret = decompressQueueEntryContent(entry); out.write(ret); } out.close(); String txt = "successfully dumped " + list.size() + " entries on file '" + fileName + "'"; log.info(txt); return txt; } catch (IOException ex) { String txt = "Could not dump entries because of exception: " + ex.getMessage(); log.severe(txt); ex.printStackTrace(); return txt; } catch (Exception ex) { String txt = "Could not dump entries because of exception: " + ex.getMessage(); log.severe(txt); ex.printStackTrace(); return txt; } } public String dumpFirstEntry() { String prefix = this.initialFilesLocation; if (prefix == null) prefix = System.getProperty("user.home"); String name = this.name.replace('/', '-'); String filename = prefix + "/" + name + ".qdmp"; return dumpEntries(1, -1L, filename); } // The following methods are used for JMX to represent the associated / cascaded MBean /** * Returns null if the manager is null or if the cascaded object does not exist. */ private ReplSlave getCascaded() { if (this.manager == null) return null; return (ReplSlave)this.manager.getSlave(this.cascadedReplSlave); } public boolean isCascading() { return getCascaded() != null; } /** * */ public String getCascadedSessionName() { ReplSlave cascaded = getCascaded(); if (cascaded != null) return cascaded.getSessionName(); return ""; } public long getCascadedQueueEntries() { ReplSlave cascaded = getCascaded(); if (cascaded != null) return cascaded.getQueueEntries(); return 0L; } public long getCascadedTransactionSeq() { ReplSlave cascaded = getCascaded(); if (cascaded != null) return cascaded.getTransactionSeq(); return -1L; } public String getCascadedStatus() { ReplSlave cascaded = getCascaded(); if (cascaded != null) return cascaded.getStatus(); return "empty"; } public boolean isCascadedActive() { ReplSlave cascaded = getCascaded(); if (cascaded != null) return cascaded.isActive(); return false; } public boolean isCascadedConnected() { ReplSlave cascaded = getCascaded(); if (cascaded != null) return cascaded.isConnected(); return false; } public String getCascadedVersion() { ReplSlave cascaded = getCascaded(); if (cascaded != null) return cascaded.getVersion(); return ""; } public String toString() { return this.sessionName; } /** * Returns a string telling in which state the connection is. It can be stalled, connected or disconnected. * @return */ public String getConnection() { if (isStalled()) return CONN_STALLED; if (isConnected()) return CONN_CONNECTED; return CONN_DISCONNECTED; } public String getMasterConnection() { return this.masterConn; } public String getCascadedConnection() { ReplSlave cascadedSlave = getCascaded(); if (cascadedSlave == null) return CONN_DISCONNECTED; return cascadedSlave.getConnection(); } public String getCascadedMasterConnection() { ReplSlave cascadedSlave = getCascaded(); if (cascadedSlave == null) return CONN_DISCONNECTED; return cascadedSlave.getMasterConnection(); } public void incrementPtPEntries(long numOfTransactions) { initTransactionSequenceIfNeeded("incrementPtPEntries has been invoked before init with numOfTransactions='" + numOfTransactions + "'"); synchronized(this) { this.ptpQueueEntries += numOfTransactions; // we want to store it setMaxReplKey(this.maxReplKey, this.transactionSeq, this.messageSeq, this.minReplKey, this.ptpQueueEntries); } } public void setCountSingleMsg(boolean countSingleMsg) { this.countSingleMessages = countSingleMsg; } public boolean isCountSingleMsg() { return countSingleMessages; } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -