⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 replslave.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
/*------------------------------------------------------------------------------Name:      ReplSlave.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.replication;import java.io.ByteArrayInputStream;import java.io.File;import java.io.FileOutputStream;import java.io.IOException;import java.io.InputStream;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.Map;import java.util.Set;import java.util.logging.Level;import java.util.logging.Logger;import org.xmlBlaster.client.I_XmlBlasterAccess;import org.xmlBlaster.client.key.PublishKey;import org.xmlBlaster.client.qos.PublishQos;import org.xmlBlaster.client.qos.SubscribeQos;import org.xmlBlaster.contrib.ClientPropertiesInfo;import org.xmlBlaster.contrib.GlobalInfo;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.MomEventEngine;import org.xmlBlaster.contrib.dbwatcher.DbWatcherConstants;import org.xmlBlaster.contrib.replication.impl.ReplManagerPlugin;import org.xmlBlaster.util.Global;import org.xmlBlaster.engine.admin.I_AdminSession;import org.xmlBlaster.engine.queuemsg.ReferenceEntry;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.MsgUnitRaw;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.def.MethodName;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.dispatch.ConnectionStateEnum;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.qos.MsgQosData;import org.xmlBlaster.util.qos.address.Destination;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.xbformat.MsgInfo;import org.xmlBlaster.util.xbformat.XmlScriptParser;/** * ReplSlave *  * Used Topics: * <ul> *    <li><b>com.avitech-ag.repl.${replName}.data</b><br/> *        This is the topic used to send the replication data to the slaves. *    </li> *    <li><b>com.avitech-ag.repl.${replName}.status</b><br/> *        This is the topic used to send the replication data to the slaves. *    </li> *    <li></li> * </ul>    *   *  * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a> */public class ReplSlave implements I_ReplSlave, ReplSlaveMBean, ReplicationConstants {   private static Logger log = Logger.getLogger(ReplSlave.class.getName());   private final static String CONN_STALLED = "stalled";   private final static String CONN_CONNECTED = "connected";   private final static String CONN_DISCONNECTED = "disconnected";      private String slaveSessionId;   private String name;   private String statusTopic;   private String dataTopic;   private Global global;   boolean initialized;    private long minReplKey;   private long maxReplKey;   private int status;   private Object mbeanHandle;   private String sqlResponse;   private boolean forceSending; // temporary Hack to be removed TODO   private I_Info persistentInfo;   private String oldReplKeyPropertyName;   private String dbWatcherSessionName;   private ReplManagerPlugin manager;   private String replPrefix;   private String replPrefixGroup;   private String cascadedReplSlave;   private String cascadedReplPrefix;   private long forcedCounter;  // counter used when forceSending is set to 'true'   private String ownVersion;   private String srcVersion;   private boolean doTransform;   private String initialFilesLocation;   private String lastMessage;   private String lastDispatcherException = "";   private boolean dispatcherActive;   private long queueEntries;   private boolean connected;   private String sessionName = "";   private long[] transactionSeq;   private long messageSeq;   private long transactionSeqVisible;   /** These properties are used to transport the information from the check to the postCheck method. */   private int tmpStatus;   private String lastMessageKey;   private long maxChunkSize = 1024L*1024; // TODO make this configurable   private String masterConn = CONN_DISCONNECTED;      /** we don't want to sync the check method because the jmx will synchronize on the object too */   private Object initSync = new Object();      /** The queue associated to this slave. It is associated on first invocation of check */   private I_Queue queue;   private boolean stalled;   /** used for monitoring: to know how many entries are ptp (normally initial updates) */   private long ptpQueueEntries;   private String initialDataTopic;   /** The real amount of entries in the cb queue (not calculated) */   private long cbQueueEntries;   private boolean countSingleMessages;   public ReplSlave(Global global, ReplManagerPlugin manager, String slaveSessionId) throws XmlBlasterException {      this.forcedCounter = 0L;      this.global = global;      this.manager = manager;      this.slaveSessionId = slaveSessionId;      // this.status = STATUS_UNUSED;      // setStatus(STATUS_NORMAL);      this.status = STATUS_UNCONFIGURED;      this.lastMessage = "";      //final boolean doPersist = false;      //final boolean dispatcherActive = false;      this.lastMessageKey = this.slaveSessionId + ".lastMessage";      try {         //setDispatcher(dispatcherActive, doPersist);         this.persistentInfo = this.manager.getPersistentInfo();         this.lastMessage = this.persistentInfo.get(this.lastMessageKey, "");      }      catch (Exception ex) {         throw new XmlBlasterException(this.global, ErrorCode.RESOURCE, "ReplSlave constructor", "could not instantiate correctly", ex);      }   }   public String getTopic() {      return this.dataTopic;   }      public long getMinReplKey() {      if (this.forceSending)         return this.forcedCounter;      return this.minReplKey;   }      public long getMaxReplKey() {      if (this.forceSending)         return this.forcedCounter;      return this.maxReplKey;   }   public int getStatusAsInt() {      return this.status;   }      public String getStatus() {      switch (this.status) {         case STATUS_INITIAL : return "INITIAL";         case STATUS_TRANSITION : return "TRANSITION";         case STATUS_INCONSISTENT : return "INCONSISTENT";         case STATUS_UNCONFIGURED : return "UNCONFIGURED";         default : return "NORMAL";      }   }   /**    * The info comes as the client properties of the subscription Qos. Avoids double configuration.    */   public void init(I_Info info) throws Exception {      synchronized(this.initSync) {         // we currently allow re-init since we can serve severeal dbWatchers for one DbWriter          this.replPrefix = info.get("_replName", null);         if (this.replPrefix == null)             throw new Exception("The replication name '_replName' has not been defined");         this.replPrefixGroup = info.get(REPL_PREFIX_GROUP_KEY, this.replPrefix);         this.name = "replSlave" + this.replPrefix + slaveSessionId;         this.dataTopic = info.get(DbWatcherConstants.MOM_TOPIC_NAME, "replication." + this.replPrefix);         // only send status messages if it has been configured that way         this.statusTopic = info.get(DbWatcherConstants.MOM_STATUS_TOPIC_NAME, null);                  // TODO Remove this when a better solution is found : several ReplSlaves for same Writer if data comes from several DbWatchers.         boolean forceSending = info.getBoolean(REPLICATION_FORCE_SENDING, false);         if (forceSending)            this.forceSending = true;          String instanceName = this.manager.getInstanceName() + ContextNode.SEP + this.slaveSessionId;         ContextNode contextNode = new ContextNode(ContextNode.CONTRIB_MARKER_TAG, instanceName,               this.global.getContextNode());         this.mbeanHandle = this.global.registerMBean(contextNode, this);                  this.dbWatcherSessionName = info.get(this.slaveSessionId + DBWATCHER_SESSION_NAME, null);         this.cascadedReplPrefix = this.persistentInfo.get(this.slaveSessionId + CASCADED_REPL_PREFIX, null);         this.cascadedReplSlave = this.persistentInfo.get(this.slaveSessionId + CASCADED_REPL_SLAVE, null);         log.info(this.name + ": associated DbWatcher='" + this.dbWatcherSessionName + "' cascaded replication prefix='" + this.cascadedReplPrefix + "' and cascaded repl. slave='" + this.cascadedReplSlave + "'");         int tmpStatus = this.persistentInfo.getInt(this.slaveSessionId + ".status", -1);         if (tmpStatus > -1)            setStatus(tmpStatus);                  final boolean doPersist = false;         setDispatcher(this.persistentInfo.getBoolean(this.slaveSessionId + ".dispatcher", false), doPersist);         this.oldReplKeyPropertyName = this.slaveSessionId + ".oldReplData";         initTransactionSequenceIfNeeded(null);                  this.srcVersion = info.get(REPLICATION_VERSION, "0.0");         this.ownVersion = info.get(REPL_VERSION, null);                  if (this.ownVersion != null) {            this.persistentInfo.put(this.slaveSessionId + "." + ReplicationConstants.REPL_VERSION, this.ownVersion);         }         else {            this.ownVersion = this.persistentInfo.get(this.slaveSessionId + "." + ReplicationConstants.REPL_VERSION, this.srcVersion);         }                  if (this.srcVersion != null && this.ownVersion != null && !this.srcVersion.equalsIgnoreCase(this.ownVersion))            this.doTransform = true;         initialFilesLocation = info.get(ReplicationConstants.INITIAL_FILES_LOCATION, null);         initialDataTopic = info.get("replication.initialDataTopic", "replication.initialData");         countSingleMessages = info.getBoolean("replication.countSingleMsg", false);         this.initialized = true;      }   }   /**    * This method is needed since in some cases writing operations on the counters can occur before the init    * method has been invoked.    * @param warnText if null no warning will be written, otherwise the specified text will be output as a warning.    *    */   private void initTransactionSequenceIfNeeded(String warnText) {      if (this.transactionSeq != null)         return;      if (warnText != null) {         log.warning(warnText);         if (log.isLoggable(Level.FINE))            log.fine(Global.getStackTraceAsString(null));      }      synchronized(this.initSync) {         this.transactionSeq = new long[PriorityEnum.MAX_PRIORITY.getInt()+1]; // 10 priorities [0..9]         long[] replData = ReplManagerPlugin.readOldReplData(this.persistentInfo, this.oldReplKeyPropertyName);         if (replData.length < 5) { // Old Style: REMOVE THIS LATER !!!!            this.maxReplKey = replData[0];            this.minReplKey = replData[3];            for (int i=0; i < this.transactionSeq.length; i++)               this.transactionSeq[i] = replData[1];            this.transactionSeqVisible = this.transactionSeq[5];            this.messageSeq = replData[2];            this.ptpQueueEntries = 0L;         }         else { // NEW STYLE            this.maxReplKey = replData[0];            this.minReplKey = replData[1];            this.messageSeq = replData[2];            this.ptpQueueEntries = replData[3];            for (int i=0; i < this.transactionSeq.length; i++)               this.transactionSeq[i] = replData[i+4];            this.transactionSeqVisible = this.transactionSeq[5];         }      }   }      private final void setStatus(int status) {      boolean doStore = status != this.status;      this.status = status;      if (this.persistentInfo != null && doStore) { // can also be called before init is called.         if (this.status != STATUS_UNCONFIGURED)            this.persistentInfo.put(this.slaveSessionId + ".status", "" + status);      }      // this is a temporary solution for the monitoring      String client = "client/";      int pos = this.slaveSessionId.indexOf(client);      if (pos < 0)         log.warning("session name '" + this.slaveSessionId + "' does not start with '" + client + "'");      else {         String key = "__" + this.slaveSessionId.substring(pos + client.length());         org.xmlBlaster.engine.ServerScope engineGlob = this.getEngineGlobal(this.global);         if (engineGlob == null)            log.warning("Can not write status since no engine global found");         else {            log.info("setting property '" + key + "' to '" + getStatus());            engineGlob.getProperty().getProperties().setProperty(key, getStatus());         }      }   }      /**    * Note that the transKey shall not be the transactionSeq instance otherwise it will never detect a change    * @param replKey    * @param transKey    * @param msgKey    * @param minReplKey    */   private final void setMaxReplKey(long replKey, long[] transKey, long msgKey, long minReplKey, long ptpQueueEntries) {      if (replKey > this.maxReplKey)         this.maxReplKey = replKey;      if (minReplKey > this.minReplKey)         this.minReplKey = minReplKey;      if (msgKey > this.messageSeq)         this.messageSeq = msgKey;      this.ptpQueueEntries = ptpQueueEntries;      long[] data = new long[this.transactionSeq.length+4];      data[0] = replKey;      data[1] = minReplKey;      data[2] = msgKey;      data[3] = ptpQueueEntries;      for (int i=0; i < transKey.length; i++)         data[i+4] = transKey[i];      ReplManagerPlugin.storeReplData(this.persistentInfo, this.oldReplKeyPropertyName, data);      String client = "client/";      if (this.slaveSessionId == null)         return;      int pos = this.slaveSessionId.indexOf(client);      if (pos < 0)         log.warning("session name '" + this.slaveSessionId + "' does not start with '" + client + "'");      else {         String key = "__" + this.slaveSessionId.substring(pos + client.length()) + "_MaxReplKey";         org.xmlBlaster.engine.ServerScope engineGlob = this.getEngineGlobal(this.global);         if (engineGlob == null)            log.warning("Can not write status since no engine global found");         else {            log.finest("setting property '" + key + "' to '" + getMaxReplKey());            engineGlob.getProperty().getProperties().setProperty(key, String.valueOf(getMaxReplKey()));

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -