⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 replslave.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
               if (subDirName != null) {                  if (initialFilesLocation != null) {                     File base = new File(initialFilesLocation.getStringValue().trim());                     File complete = new File(base, subDirName.getStringValue().trim());                     dirName = complete.getAbsolutePath();                  }               }               changeLastMessage("Manual Data transfer: WAITING (stored on '" + dirName + "')", false);               break; // we need to interrupt here: all subsequent entries will be processed later.            }            // check if the message has to be stored locally            ClientProperty endToRemote = msgUnit.getQosData().getClientProperty(ReplicationConstants.INITIAL_DATA_END_TO_REMOTE);            if (initialFilesLocation != null && (endToRemote == null || !endToRemote.getBooleanValue()) && (endMsg == null || !endMsg.getBooleanValue())) {               storeChunkLocally(entry, initialFilesLocation, subDirName);               queue.removeRandom(entry);               // entries.remove(i);               continue;            }                        if (endMsg != null) {               log.info("Received msg marking the end of the initial for client '" + this.slaveSessionId + "' update: '" + this.name + "' going into NORMAL operations");               startCascadedAndChangeStatus();            }            byte[] content = msgUnit.getContent();            if (content != null)               totalSize += content.length;            if (totalSize <= this.maxChunkSize || i == 0)               remoteEntries.add(entry);            else               break;         }         entries = null; // we can free it here since not needed anymore                  if (this.status == STATUS_NORMAL || this.status == STATUS_INCONSISTENT || this.status == STATUS_UNCONFIGURED)            return remoteEntries;                  ArrayList ret = new ArrayList();         for (int i=0; i < remoteEntries.size(); i++) {            ReferenceEntry entry = (ReferenceEntry)remoteEntries.get(i);            MsgUnit msgUnit = entry.getMsgUnit();            long replKey = msgUnit.getQosData().getClientProperty(ReplicationConstants.REPL_KEY_ATTR, -1L);            /* this is done when acknowledge comes            if (replKey > -1L) {               setMaxReplKey(replKey, this.tmpTransSeq, this.tmpMsgSeq);            }            */            log.info("check: processing '" + replKey + "' for client '" + this.slaveSessionId + "' ");            if (replKey < 0L) { // this does not come from the normal replication, so these are other messages which we just deliver               ClientProperty endMsg = msgUnit.getQosData().getClientProperty(ReplicationConstants.END_OF_TRANSITION);               if (endMsg == null) {                  log.warning("the message unit with qos='" + msgUnit.getQosData().toXml() + "' and key '" + msgUnit.getKey() + "'  for client '" + this.slaveSessionId + "' has no 'replKey' Attribute defined.");                  ret.add(entry);                  continue;               }            }            log.info("repl entry '" + replKey + "' for range [" + this.minReplKey + "," + this.maxReplKey + "] for client '" + this.slaveSessionId + "' ");            if (replKey >= this.minReplKey || this.forceSending) {               log.info("repl adding the entry for client '" + this.slaveSessionId + "' ");               doTransform(msgUnit);               ret.add(entry);               /* TODO TEMPORARLY REMOVED FOR TESTING: also test no initial dump and manual transfer               if (replKey > this.maxReplKey || this.forceSending) {                  log.info("entry with replKey='" + replKey + "' is higher than maxReplKey)='" + this.maxReplKey + "' switching to normal operation again for client '" + this.slaveSessionId + "' ");                  startCascadedAndChangeStatus();               }               */            }            else { // such messages have been already from the initial update. (obsolete messages are removed)               log.info("removing entry with replKey='" + replKey + "' since older than minEntry='" + this.minReplKey + "' for client '" + this.slaveSessionId + "' ");               queue.removeRandom(entry);            }         }                  // check if there are more than one entry the keep-transaction-flag has to be set:         if (ret.size() > 1) {            for (int i=0; i < ret.size()-1; i++) {               ReferenceEntry entry = (ReferenceEntry)entries.get(i);               MsgUnit msgUnit = entry.getMsgUnit();               msgUnit.getQosData().addClientProperty(KEEP_TRANSACTION_OPEN, true);            }            log.info("Sending '" + ret.size() + "' entries in one single message");         }         return ret;      }   }   private final void startCascadedAndChangeStatus() throws Exception {      if (this.cascadedReplPrefix != null && this.cascadedReplSlave != null && this.cascadedReplPrefix.trim().length() > 0 && this.cascadedReplSlave.trim().length() > 0) {         log.info("initiating the cascaded replication with replication.prefix='" + this.cascadedReplPrefix + "' for slave='" + this.cascadedReplSlave + "'");         this.manager.initiateReplicationNonMBean(this.cascadedReplSlave, this.cascadedReplPrefix, null, null, null);      }      else {         log.info("will not cascade initiation of any further replication for '" + this.name + "' since no cascading defined");      }      setStatus(STATUS_NORMAL);   }         /**    * @return Returns the sqlResponse.    */   public String getSqlResponse() {      return this.sqlResponse;   }      /**    * @param sqlResponse The sqlResponse to set.    */   public void setSqlResponse(String sqlResponse) {      this.sqlResponse = sqlResponse;   }   /**    * @see org.xmlBlaster.contrib.I_ContribPlugin#getUsedPropertyKeys()    */   public Set getUsedPropertyKeys() {      return new HashSet();   }   public boolean setDispatcher(boolean status) {      try {         setDispatcher(status, true);         return true;      }      catch (Exception ex) {         log.severe("Exception occured when trying to set the dispatcher to '" + status + "': " + ex.getMessage());         ex.printStackTrace();         return false;      }   }      public final boolean setDispatcher(boolean status, boolean doPersist) throws Exception {      I_AdminSession session = getSession();       session.setDispatcherActive(status);      if (doPersist)         this.persistentInfo.put(this.slaveSessionId + ".dispatcher", "" + status);      // to speed up refresh on monitor      this.dispatcherActive = session.getDispatcherActive();      return this.dispatcherActive;   }      /**    * @see org.xmlBlaster.contrib.replication.ReplSlaveMBean#doContinue()    */   public void doContinue(boolean doPersist) throws Exception {      setDispatcher(true, doPersist);   }   /**    * @see org.xmlBlaster.contrib.replication.ReplSlaveMBean#doPause()    */   public void doPause(boolean doPersist) throws Exception {      setDispatcher(false, doPersist);   }      public void handleException(Throwable ex) {      try {         final boolean add = true;         if (ex instanceof XmlBlasterException) {            XmlBlasterException xmlblEx = ((XmlBlasterException)ex);            log.warning(xmlblEx.toXml());            if (xmlblEx.getEmbeddedException() != null)               changeLastMessage(xmlblEx.getEmbeddedMessage(), add);            else               changeLastMessage(ex.getMessage(), add);         }         else            changeLastMessage(ex.getMessage(), add);         final boolean doPersist = true;         doPause(doPersist);      }      catch (Throwable e) {         log.severe("An exception occured when trying to pause the connection: " + e.getMessage());         ex.printStackTrace();      }   }      /**    * Toggles the dispatcher from active to inactive or vice versa.    * Returns the actual state.    * @see org.xmlBlaster.contrib.replication.ReplSlaveMBean#toggleActive()    * @return the actual state.    */   public boolean toggleActive() throws Exception {      synchronized(this.initSync) {         I_AdminSession session = getSession();         final boolean doPersist = true;         return setDispatcher(!session.getDispatcherActive(), doPersist);      }   }      /**    * TODO fix this since it potentially could delete request from other slaves since the DbWatcher is serving    * several slaves.    * Cancels an ongoing initialUpdate Request.    */   public void cancelInitialUpdate() throws Exception {      if (this.status == STATUS_NORMAL)         return;      if (!this.initialized)         throw new Exception("cancelInitialUpdate: '" + this.name + "' has not been initialized properly or is already shutdown, check your logs");      if (this.dbWatcherSessionName == null)         throw new Exception("The DbWatcher Session Id is null, can not cancel");      (new Thread() {         public void run() {            cancelUpdateAsyncPart();         }      }).start();   }   /**    * The cancelUpdate is invoked asynchronously to avoid log blocking of the monitor     * when the cancel operation is going on.    */   private void cancelUpdateAsyncPart() {      try {         I_AdminSession session = getSession();         long clearedMsg = session.clearCallbackQueue();         log.info("clearing of callback queue: '" + clearedMsg + "' where removed since a cancel request was done");         // sending the cancel op to the DbWatcher         log.info(this.name + " sends now a cancel request to the Master '" + this.dbWatcherSessionName + "'");         I_XmlBlasterAccess conn = this.global.getXmlBlasterAccess();         // no oid for this ptp message          PublishKey pubKey = new PublishKey(this.global, REQUEST_CANCEL_UPDATE_TOPIC);         Destination destination = new Destination(new SessionName(this.global, this.dbWatcherSessionName));         destination.forceQueuing(true);         PublishQos pubQos = new PublishQos(this.global, destination);         pubQos.addClientProperty(ReplicationConstants.SLAVE_NAME, this.slaveSessionId);         pubQos.setPersistent(false);         MsgUnit msg = new MsgUnit(pubKey, ReplicationConstants.REPL_REQUEST_CANCEL_UPDATE.getBytes(), pubQos);         conn.publish(msg);         // TODO Check this since it could mess up the current status if one is exaclty finished now         //setStatus(STATUS_NORMAL);         setStatus(STATUS_INCONSISTENT);      }      catch (Exception ex) {         log.severe("An exception occured when trying to cancel the initial update for '" + this.replPrefix + "'");         ex.printStackTrace();      }   }      private long clearQueueSync() {      long ret = 0L;      initTransactionSequenceIfNeeded("clearQueueSync has been invoked before init");      try {         ret = getSession().clearCallbackQueue();         transactionSeq = (long[])manager.getCurrentTransactionCount(replPrefix).clone();         ptpQueueEntries = 0L;         setMaxReplKey(maxReplKey, transactionSeq, messageSeq, minReplKey, ptpQueueEntries);      }      catch (Exception ex) {         ex.printStackTrace();      }      return ret;   }      public void clearQueue() throws Exception {      setStatus(STATUS_INCONSISTENT);      log.warning("has been invoked");      (new Thread() {         public void run() {            clearQueueSync();         }      }).start();   }   public long removeQueueEntries(long entries) throws Exception {      setStatus(STATUS_INCONSISTENT);      log.warning("has been invoked with entries='" + entries + "'");      return getSession().removeFromCallbackQueue(entries);   }      public void kill() throws Exception {      getSession().killSession();   }   public String reInitiateReplication() throws Exception {      return this.manager.initiateReplication(this.slaveSessionId, this.replPrefix + "_Ver_" + this.ownVersion, this.cascadedReplSlave, this.cascadedReplPrefix, this.initialFilesLocation);   }      public String getReplPrefix() {      return this.replPrefix;   }      public String getReplPrefixGroup() {      return this.replPrefixGroup;   }      public String getVersion() {      return this.ownVersion;   }   /**    * Convenience method enforced by the MBean which returns true if the dispatcher of    * the slave session is active, false otherwise.    */   public boolean isActive() {      return this.dispatcherActive;   }      /**    * Convenience method enforced by the MBean which returns the number of entries in     * the queue.    */   public long getQueueEntries() {      if (countSingleMessages)         return this.cbQueueEntries;      else         return this.queueEntries;   }   /**    * Convenience method enforced by the MBean which returns true if the real slave is    * connected or false otherwise.    */   public boolean isConnected() {      return this.connected;   }   /**    * Convenience method enforced by the MBean which returns true if the connection to the    * real slave is stalled or false otherwise.    */   public boolean isStalled() {      return this.stalled;   }   public String getSessionName() {      return this.sessionName;   }   public String getLastMessage() {      return this.lastMessage;   }   public synchronized void checkStatus() {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -