📄 replslave.java
字号:
/*------------------------------------------------------------------------------Name: ReplSlave.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.replication;import java.io.ByteArrayInputStream;import java.io.File;import java.io.FileOutputStream;import java.io.IOException;import java.io.InputStream;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.Map;import java.util.Set;import java.util.logging.Level;import java.util.logging.Logger;import org.xmlBlaster.client.I_XmlBlasterAccess;import org.xmlBlaster.client.key.PublishKey;import org.xmlBlaster.client.qos.PublishQos;import org.xmlBlaster.client.qos.SubscribeQos;import org.xmlBlaster.contrib.ClientPropertiesInfo;import org.xmlBlaster.contrib.GlobalInfo;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.MomEventEngine;import org.xmlBlaster.contrib.dbwatcher.DbWatcherConstants;import org.xmlBlaster.contrib.replication.impl.ReplManagerPlugin;import org.xmlBlaster.util.Global;import org.xmlBlaster.engine.admin.I_AdminSession;import org.xmlBlaster.engine.queuemsg.ReferenceEntry;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.MsgUnitRaw;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.def.MethodName;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.dispatch.ConnectionStateEnum;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.qos.MsgQosData;import org.xmlBlaster.util.qos.address.Destination;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.xbformat.MsgInfo;import org.xmlBlaster.util.xbformat.XmlScriptParser;/** * ReplSlave * * Used Topics: * <ul> * <li><b>com.avitech-ag.repl.${replName}.data</b><br/> * This is the topic used to send the replication data to the slaves. * </li> * <li><b>com.avitech-ag.repl.${replName}.status</b><br/> * This is the topic used to send the replication data to the slaves. * </li> * <li></li> * </ul> * * * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a> */public class ReplSlave implements I_ReplSlave, ReplSlaveMBean, ReplicationConstants { private static Logger log = Logger.getLogger(ReplSlave.class.getName()); private final static String CONN_STALLED = "stalled"; private final static String CONN_CONNECTED = "connected"; private final static String CONN_DISCONNECTED = "disconnected"; private String slaveSessionId; private String name; private String statusTopic; private String dataTopic; private Global global; boolean initialized; private long minReplKey; private long maxReplKey; private int status; private Object mbeanHandle; private String sqlResponse; private boolean forceSending; // temporary Hack to be removed TODO private I_Info persistentInfo; private String oldReplKeyPropertyName; private String dbWatcherSessionName; private ReplManagerPlugin manager; private String replPrefix; private String replPrefixGroup; private String cascadedReplSlave; private String cascadedReplPrefix; private long forcedCounter; // counter used when forceSending is set to 'true' private String ownVersion; private String srcVersion; private boolean doTransform; private String initialFilesLocation; private String lastMessage; private String lastDispatcherException = ""; private boolean dispatcherActive; private long queueEntries; private boolean connected; private String sessionName = ""; private long[] transactionSeq; private long messageSeq; private long transactionSeqVisible; /** These properties are used to transport the information from the check to the postCheck method. */ private int tmpStatus; private String lastMessageKey; private long maxChunkSize = 1024L*1024; // TODO make this configurable private String masterConn = CONN_DISCONNECTED; /** we don't want to sync the check method because the jmx will synchronize on the object too */ private Object initSync = new Object(); /** The queue associated to this slave. It is associated on first invocation of check */ private I_Queue queue; private boolean stalled; /** used for monitoring: to know how many entries are ptp (normally initial updates) */ private long ptpQueueEntries; private String initialDataTopic; /** The real amount of entries in the cb queue (not calculated) */ private long cbQueueEntries; private boolean countSingleMessages; public ReplSlave(Global global, ReplManagerPlugin manager, String slaveSessionId) throws XmlBlasterException { this.forcedCounter = 0L; this.global = global; this.manager = manager; this.slaveSessionId = slaveSessionId; // this.status = STATUS_UNUSED; // setStatus(STATUS_NORMAL); this.status = STATUS_UNCONFIGURED; this.lastMessage = ""; //final boolean doPersist = false; //final boolean dispatcherActive = false; this.lastMessageKey = this.slaveSessionId + ".lastMessage"; try { //setDispatcher(dispatcherActive, doPersist); this.persistentInfo = this.manager.getPersistentInfo(); this.lastMessage = this.persistentInfo.get(this.lastMessageKey, ""); } catch (Exception ex) { throw new XmlBlasterException(this.global, ErrorCode.RESOURCE, "ReplSlave constructor", "could not instantiate correctly", ex); } } public String getTopic() { return this.dataTopic; } public long getMinReplKey() { if (this.forceSending) return this.forcedCounter; return this.minReplKey; } public long getMaxReplKey() { if (this.forceSending) return this.forcedCounter; return this.maxReplKey; } public int getStatusAsInt() { return this.status; } public String getStatus() { switch (this.status) { case STATUS_INITIAL : return "INITIAL"; case STATUS_TRANSITION : return "TRANSITION"; case STATUS_INCONSISTENT : return "INCONSISTENT"; case STATUS_UNCONFIGURED : return "UNCONFIGURED"; default : return "NORMAL"; } } /** * The info comes as the client properties of the subscription Qos. Avoids double configuration. */ public void init(I_Info info) throws Exception { synchronized(this.initSync) { // we currently allow re-init since we can serve severeal dbWatchers for one DbWriter this.replPrefix = info.get("_replName", null); if (this.replPrefix == null) throw new Exception("The replication name '_replName' has not been defined"); this.replPrefixGroup = info.get(REPL_PREFIX_GROUP_KEY, this.replPrefix); this.name = "replSlave" + this.replPrefix + slaveSessionId; this.dataTopic = info.get(DbWatcherConstants.MOM_TOPIC_NAME, "replication." + this.replPrefix); // only send status messages if it has been configured that way this.statusTopic = info.get(DbWatcherConstants.MOM_STATUS_TOPIC_NAME, null); // TODO Remove this when a better solution is found : several ReplSlaves for same Writer if data comes from several DbWatchers. boolean forceSending = info.getBoolean(REPLICATION_FORCE_SENDING, false); if (forceSending) this.forceSending = true; String instanceName = this.manager.getInstanceName() + ContextNode.SEP + this.slaveSessionId; ContextNode contextNode = new ContextNode(ContextNode.CONTRIB_MARKER_TAG, instanceName, this.global.getContextNode()); this.mbeanHandle = this.global.registerMBean(contextNode, this); this.dbWatcherSessionName = info.get(this.slaveSessionId + DBWATCHER_SESSION_NAME, null); this.cascadedReplPrefix = this.persistentInfo.get(this.slaveSessionId + CASCADED_REPL_PREFIX, null); this.cascadedReplSlave = this.persistentInfo.get(this.slaveSessionId + CASCADED_REPL_SLAVE, null); log.info(this.name + ": associated DbWatcher='" + this.dbWatcherSessionName + "' cascaded replication prefix='" + this.cascadedReplPrefix + "' and cascaded repl. slave='" + this.cascadedReplSlave + "'"); int tmpStatus = this.persistentInfo.getInt(this.slaveSessionId + ".status", -1); if (tmpStatus > -1) setStatus(tmpStatus); final boolean doPersist = false; setDispatcher(this.persistentInfo.getBoolean(this.slaveSessionId + ".dispatcher", false), doPersist); this.oldReplKeyPropertyName = this.slaveSessionId + ".oldReplData"; initTransactionSequenceIfNeeded(null); this.srcVersion = info.get(REPLICATION_VERSION, "0.0"); this.ownVersion = info.get(REPL_VERSION, null); if (this.ownVersion != null) { this.persistentInfo.put(this.slaveSessionId + "." + ReplicationConstants.REPL_VERSION, this.ownVersion); } else { this.ownVersion = this.persistentInfo.get(this.slaveSessionId + "." + ReplicationConstants.REPL_VERSION, this.srcVersion); } if (this.srcVersion != null && this.ownVersion != null && !this.srcVersion.equalsIgnoreCase(this.ownVersion)) this.doTransform = true; initialFilesLocation = info.get(ReplicationConstants.INITIAL_FILES_LOCATION, null); initialDataTopic = info.get("replication.initialDataTopic", "replication.initialData"); countSingleMessages = info.getBoolean("replication.countSingleMsg", false); this.initialized = true; } } /** * This method is needed since in some cases writing operations on the counters can occur before the init * method has been invoked. * @param warnText if null no warning will be written, otherwise the specified text will be output as a warning. * */ private void initTransactionSequenceIfNeeded(String warnText) { if (this.transactionSeq != null) return; if (warnText != null) { log.warning(warnText); if (log.isLoggable(Level.FINE)) log.fine(Global.getStackTraceAsString(null)); } synchronized(this.initSync) { this.transactionSeq = new long[PriorityEnum.MAX_PRIORITY.getInt()+1]; // 10 priorities [0..9] long[] replData = ReplManagerPlugin.readOldReplData(this.persistentInfo, this.oldReplKeyPropertyName); if (replData.length < 5) { // Old Style: REMOVE THIS LATER !!!! this.maxReplKey = replData[0]; this.minReplKey = replData[3]; for (int i=0; i < this.transactionSeq.length; i++) this.transactionSeq[i] = replData[1]; this.transactionSeqVisible = this.transactionSeq[5]; this.messageSeq = replData[2]; this.ptpQueueEntries = 0L; } else { // NEW STYLE this.maxReplKey = replData[0]; this.minReplKey = replData[1]; this.messageSeq = replData[2]; this.ptpQueueEntries = replData[3]; for (int i=0; i < this.transactionSeq.length; i++) this.transactionSeq[i] = replData[i+4]; this.transactionSeqVisible = this.transactionSeq[5]; } } } private final void setStatus(int status) { boolean doStore = status != this.status; this.status = status; if (this.persistentInfo != null && doStore) { // can also be called before init is called. if (this.status != STATUS_UNCONFIGURED) this.persistentInfo.put(this.slaveSessionId + ".status", "" + status); } // this is a temporary solution for the monitoring String client = "client/"; int pos = this.slaveSessionId.indexOf(client); if (pos < 0) log.warning("session name '" + this.slaveSessionId + "' does not start with '" + client + "'"); else { String key = "__" + this.slaveSessionId.substring(pos + client.length()); org.xmlBlaster.engine.ServerScope engineGlob = this.getEngineGlobal(this.global); if (engineGlob == null) log.warning("Can not write status since no engine global found"); else { log.info("setting property '" + key + "' to '" + getStatus()); engineGlob.getProperty().getProperties().setProperty(key, getStatus()); } } } /** * Note that the transKey shall not be the transactionSeq instance otherwise it will never detect a change * @param replKey * @param transKey * @param msgKey * @param minReplKey */ private final void setMaxReplKey(long replKey, long[] transKey, long msgKey, long minReplKey, long ptpQueueEntries) { if (replKey > this.maxReplKey) this.maxReplKey = replKey; if (minReplKey > this.minReplKey) this.minReplKey = minReplKey; if (msgKey > this.messageSeq) this.messageSeq = msgKey; this.ptpQueueEntries = ptpQueueEntries; long[] data = new long[this.transactionSeq.length+4]; data[0] = replKey; data[1] = minReplKey; data[2] = msgKey; data[3] = ptpQueueEntries; for (int i=0; i < transKey.length; i++) data[i+4] = transKey[i]; ReplManagerPlugin.storeReplData(this.persistentInfo, this.oldReplKeyPropertyName, data); String client = "client/"; if (this.slaveSessionId == null) return; int pos = this.slaveSessionId.indexOf(client); if (pos < 0) log.warning("session name '" + this.slaveSessionId + "' does not start with '" + client + "'"); else { String key = "__" + this.slaveSessionId.substring(pos + client.length()) + "_MaxReplKey"; org.xmlBlaster.engine.ServerScope engineGlob = this.getEngineGlobal(this.global); if (engineGlob == null) log.warning("Can not write status since no engine global found"); else { log.finest("setting property '" + key + "' to '" + getMaxReplKey()); engineGlob.getProperty().getProperties().setProperty(key, String.valueOf(getMaxReplKey()));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -