⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 replslave.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
         }
      }
   }

   /**
    * Registers this slave for an initial update without executing it yet.
    * Delegates to <code>run</code> with the stored dbWatcher session name and
    * cascaded replication settings, passing <code>true</code> for
    * <code>onlyRegister</code>.
    *
    * @param info the configuration handed over to <code>run</code>.
    * @return the value returned by <code>run</code>.
    * @throws Exception if <code>run</code> throws.
    */
   public boolean reInitiate(I_Info info) throws Exception {
      return run(info, this.dbWatcherSessionName, this.cascadedReplPrefix, this.cascadedReplSlave, true);
   }

   /**
    * Starts an initial update request for this slave.
    * The request is only started when the current status is one of
    * STATUS_NORMAL, STATUS_INCONSISTENT or STATUS_UNCONFIGURED; in any other
    * status a warning is logged and nothing else happens.
    *
    * @param info the configuration; the dbWatcher session id is written into it
    *        before it is passed on to <code>init</code> and <code>prepareForRequest</code>.
    * @param dbWatcherSessionId the session name the initial update request is sent to.
    * @param cascadeReplPrefix stored in the persistent info under the key
    *        slaveSessionId + CASCADED_REPL_PREFIX.
    * @param cascadeSlaveSessionName stored in the persistent info under the key
    *        slaveSessionId + CASCADED_REPL_SLAVE.
    * @param onlyRegister if true it only registers for initial update but does not execute it yet.
    *        It will wait for a further (common) start message.
    * @return true if the request has been sent, false if it was refused because
    *         of the current status.
    * @throws Exception if one of the invoked steps fails.
    */
   public boolean run(I_Info info, String dbWatcherSessionId, String cascadeReplPrefix, String cascadeSlaveSessionName, boolean onlyRegister) throws Exception {
      final boolean mayStart = this.status == STATUS_NORMAL
            || this.status == STATUS_INCONSISTENT
            || this.status == STATUS_UNCONFIGURED;
      if (mayStart) {
         // remember the cascading configuration
         this.persistentInfo.put(this.slaveSessionId + CASCADED_REPL_PREFIX, cascadeReplPrefix);
         this.persistentInfo.put(this.slaveSessionId + CASCADED_REPL_SLAVE, cascadeSlaveSessionName);

         info.put(this.slaveSessionId + DBWATCHER_SESSION_NAME, dbWatcherSessionId);
         init(info);
         prepareForRequest(info);
         requestInitialData(dbWatcherSessionId, onlyRegister);
         return true;
      }
      log.warning("will not start initial update request since one already ongoing for '" + this.name + "'");
      return false;
   }

   /**
    * This is the first step in the process of requesting the initial Data.
    * <ul>
    *   <li>It clears the callback queue of the real slave</li>
    *   <li>It sends a message to the real slave to inform him that
    *       a new initial update has been initiated. This is a PtP
    *       message with a well defined topic, so administrators can
    *       subscribe to it too.
*   </li>    *   <li>It then deactivates the callback dispatcher of the real slave</li>    *   <li>makes a persistent subscription on behalf of the real slave    *       by passing as a mime access filter an identifier for himself.    *   </li>    * </ul>    * @see org.xmlBlaster.contrib.replication.I_ReplSlave#prepareForRequest(I_Info)    */   public void prepareForRequest(I_Info individualInfo) throws Exception {      if (!this.initialized)         throw new Exception("prepareForRequest: '" + this.name + "' has not been initialized properly or is already shutdown, check your logs");      log.info("prepareForRequest");      long clearedMsg = clearQueueSync();      log.info("clearing of callback queue before initiating: '" + clearedMsg + "' where removed since obsolete");      if (this.statusTopic != null)         sendStatusInformation("dbInitStart");      final boolean doPersist = true;      doPause(doPersist); // stop the dispatcher            I_AdminSession session = getSession();      // first unsubscribe (in case it did already an initial update previously, this is needed to remove the subscription      // (and thereby its outdate subscription qos from persistence). On a back replication, i.e. where you have more than      // one sources you don't want to do this.      if (individualInfo.getBoolean("replication.forceNewSubscription", true)) {         try {            session.unSubscribe(this.dataTopic, "");         }         catch (Throwable ex) {         }      }            SubscribeQos subQos = new SubscribeQos(this.global);      subQos.setMultiSubscribe(false);      subQos.setWantInitialUpdate(false);      subQos.setPersistent(true);      // this fills the client properties with the contents of the individualInfo object.      
new ClientPropertiesInfo(subQos.getData().getClientProperties(), individualInfo);      session.subscribe(this.dataTopic, subQos.toXml());      synchronized(this.initSync) {         setStatus(STATUS_INITIAL);      }   }      private void sendStatusInformation(String status) throws Exception {      log.info("send status information '" + status + "'");      I_XmlBlasterAccess conn = this.global.getXmlBlasterAccess();      PublishKey pubKey = new PublishKey(this.global, this.statusTopic);      Destination destination = new Destination(new SessionName(this.global, this.slaveSessionId));      destination.forceQueuing(true);      PublishQos pubQos = new PublishQos(this.global, destination);      pubQos.setPersistent(true);      MsgUnit msg = new MsgUnit(pubKey, status.getBytes(), pubQos);      conn.publish(msg);   }      /**    * Sends a PtP message to the responsible for the initial update (which is the    * DbWatcher or an object running in the DbWatcher jvm) telling a new initial    * update has to be initiating.     
*     * @see org.xmlBlaster.contrib.replication.I_ReplSlave#requestInitialData()    */   public void requestInitialData(String dbWatcherSessionId, boolean onlyRegister) throws Exception {      log.info(this.name + " sends now an initial update request to the Master '" + dbWatcherSessionId + "'");      I_XmlBlasterAccess conn = this.global.getXmlBlasterAccess();      // no oid for this ptp message       PublishKey pubKey = new PublishKey(this.global, REQUEST_INITIAL_DATA_TOPIC);      Destination destination = new Destination(new SessionName(this.global, dbWatcherSessionId));      destination.forceQueuing(true);      PublishQos pubQos = new PublishQos(this.global, destination);      pubQos.addClientProperty(ReplicationConstants.SLAVE_NAME, this.slaveSessionId);      pubQos.addClientProperty(ReplicationConstants.REPL_VERSION, this.ownVersion);      if (this.initialFilesLocation != null)         pubQos.addClientProperty(ReplicationConstants.INITIAL_FILES_LOCATION, this.initialFilesLocation);      pubQos.setPersistent(true);      if (onlyRegister)         pubQos.addClientProperty(ReplicationConstants.INITIAL_UPDATE_ONLY_REGISTER, onlyRegister);      MsgUnit msg = new MsgUnit(pubKey, ReplicationConstants.REPL_REQUEST_UPDATE.getBytes(), pubQos);      conn.publish(msg);   }   private org.xmlBlaster.engine.ServerScope getEngineGlobal(Global glob) {      return (org.xmlBlaster.engine.ServerScope)glob.getObjectEntry(GlobalInfo.ORIGINAL_ENGINE_GLOBAL);   }      private I_AdminSession getSession() throws Exception {      return this.manager.getSession(this.slaveSessionId);   }      /**    * @see org.xmlBlaster.contrib.replication.I_ReplSlave#reactivateDestination()    */   public void reactivateDestination(long minReplKey, long maxReplKey) throws Exception {      synchronized(this.initSync) {         log.info("Initial Operation completed with replication key interval [" + minReplKey + "," + maxReplKey + "]");         if (!this.initialized)            throw new 
Exception("prepareForRequest: '" + this.name + "' has not been initialized properly or is already shutdown, check your logs");         if (STATUS_INCONSISTENT == this.status) {            log.warning("Will not change the status to transition since the initialUpdate has been cancelled");            return;         }                     this.minReplKey = minReplKey;         this.maxReplKey = maxReplKey;         setStatus(STATUS_TRANSITION);         final boolean doPersist = true;         doContinue(doPersist);      }   }   /**    * @see org.xmlBlaster.contrib.dbwriter.I_ContribPlugin#shutdown()    */   public void shutdown() {      synchronized (this.initSync) {         if (!this.initialized)            return;         this.global.unregisterMBean(this.mbeanHandle);         this.initialized = false;      }   }   private final void doTransform(MsgUnit msgUnit) throws Exception {      if (this.doTransform) {         // ClientProperty prop = msgUnit.getQosData().getClientProperty(ReplicationConstants.DUMP_ACTION);         // if (prop == null) {         if (msgUnit.getContentMime() != null && msgUnit.getContentMime().equals("text/xml")) {            byte[] content = msgUnit.getContent();            InputStream is = MomEventEngine.decompress(new ByteArrayInputStream(content), msgUnit.getQosData().getClientProperties());            content = ReplManagerPlugin.getContent(is);            content = this.manager.transformVersion(this.replPrefix, this.ownVersion, this.slaveSessionId, content);            msgUnit.setContent(content);         }      }   }      /**    * Returns the name of the directory where the entries have been stored.    * @param entry The entry to add as a chunk.    * @param location The location where to add it.    
* @param subDirProp    * @return    * @throws Exception    */   private String storeChunkLocally(ReferenceEntry entry, ClientProperty location, ClientProperty subDirProp) throws Exception {      if (entry == null)         throw new Exception("The entry to store is null, can not store");      MsgUnit msgUnit = entry.getMsgUnit();      if (msgUnit == null)         throw new Exception("The msgUnit to store is null, can not store");      if (location == null || location.getStringValue() == null || location.getStringValue().trim().length() < 1)         throw new Exception("The location is empty, can not store the message unit '" + msgUnit.getLogId() + "'");      // String fileId = "" + new Timestamp().getTimestamp();      // this way they are automatically sorted and in case of a repeated write it simply would be overwritten.      String fileId = entry.getPriority() + "-" + entry.getUniqueId();            String pathName = location.getStringValue().trim();      File dirWhereToStore = ReplManagerPlugin.checkExistance(pathName);            if (subDirProp == null)         throw new Exception("The property to define the file name (dataId) is not set, can not continue");      String subDirName = subDirProp.getStringValue();      if (subDirName == null || subDirName.trim().length() < 1)         throw new Exception("The subdirectory to be used to store the initial data is empty");      File subDir = new File(dirWhereToStore, subDirName);      String completeSubdirName = subDir.getAbsolutePath();      if (!subDir.exists()) {         if (!subDir.mkdir()) {            String txt = "could not make '" + completeSubdirName + "' to be a directory. Check your rights";            log.severe(txt);            throw new Exception(txt);         }      }            File file = new File(subDir, fileId);      if (file.exists())         log.warning("File '" + file.getAbsolutePath() + "' exists already. 
Will overwrite it");      FileOutputStream fos = new FileOutputStream(file);      MsgUnitRaw msgUnitRaw = new MsgUnitRaw(msgUnit.getKey(), msgUnit.getContent(), msgUnit.getQos());      MsgInfo msgInfo = new MsgInfo(this.global, MsgInfo.INVOKE_BYTE, MethodName.UPDATE_ONEWAY, this.slaveSessionId);      msgInfo.addMessage(msgUnitRaw);      XmlScriptParser parser = new XmlScriptParser();      parser.init(new Global(), null, null);      fos.write(parser.toLiteral(msgInfo).getBytes());      fos.close();      log.info("MsgUnit '" + msgUnit.getQosData().getRcvTimestamp().getTimestamp() + "' has been written to file '" + file.getAbsolutePath() + "'");      return completeSubdirName;   }      /**    *     * @param newMsg If newMsg is null, it cleans the message otherwise the behaviour depens on doAdd    * @param doAdd if true, the message is added to the current message, if false it is replaced.    */   private void changeLastMessage(String newMsg, boolean doAdd) {      log.fine("'" + newMsg + "' invoked with add='" + doAdd + "'");      if (newMsg == null) {         if (this.lastMessage != null && this.lastMessage.length() > 0) {            this.lastMessage = "";            this.persistentInfo.put(this.lastMessageKey, this.lastMessage);         }      }      else {         if (doAdd)            this.lastMessage += "\n" + newMsg.trim();         else            this.lastMessage = newMsg.trim();         this.persistentInfo.put(this.lastMessageKey, this.lastMessage);      }   }      /*   private void calculateCounters(MsgQueueEntry[] entries) throws XmlBlasterException {      if (entries.length > 0) {         for (int i=entries.length-1; i > -1; i--) {            ReferenceEntry entry = (ReferenceEntry)entries[i];            if (log.isLoggable(Level.FINEST)) {               String txt = new String(decompressQueueEntryContent(entry));               log.finest("Processing entry '" + txt + "' for client '"  + this.name + "'");            }            MsgUnit msgUnit = 
entry.getMsgUnit();            long tmpCounter = this.tmpTransSeq + msgUnit.getQosData().getClientProperty(ReplicationConstants.NUM_OF_TRANSACTIONS, 1L);            //long tmpCounter = msgUnit.getQosData().getClientProperty(ReplicationConstants.TRANSACTION_SEQ, 0L);            if (tmpCounter != 0L)               this.tmpTransSeq = tmpCounter;            this.tmpReplKey = msgUnit.getQosData().getClientProperty(ReplicationConstants.REPL_KEY_ATTR, -1L);            tmpCounter = msgUnit.getQosData().getClientProperty(ReplicationConstants.MESSAGE_SEQ, 0L);            if (tmpCounter != 0L)               this.tmpMsgSeq =  tmpCounter;            if (this.tmpReplKey > -1L) {               break; // the other messages will have lower numbers (if any) so we break for performance.            }         }            }   }   */      /**    *     */   public ArrayList check(ArrayList entries, I_Queue queue) throws Exception {      this.queue = queue;      synchronized (this.initSync) {         this.tmpStatus = -1;         this.forcedCounter++;         log.info("check invoked with status '" + getStatus() + "' for client '" + this.slaveSessionId + "' (invocation since start is '" + this.forcedCounter + "')");         if (!this.initialized) {            log.warning("check invoked without having been initialized. Will repeat operation until the real client connects");            Thread.sleep(250L); // to avoid too fast looping            return new ArrayList();         }         if (this.status == STATUS_INITIAL && !this.forceSending) { // should not happen since Dispatcher is set to false            log.warning("check invoked in INITIAL STATUS. 
Will stop the dispatcher");            final boolean doPersist = true;            doPause(doPersist);            return new ArrayList();         }         changeLastMessage(null, false); // clean last message         // if (entries != null && entries.size() > 1)         //    log.severe("the entries are '" + entries.size() + "' but we currently only can process one single entry at a time");                  // check if already processed ... and at the same time do the versioning transformation (if needed)         for (int i=entries.size()-1; i > -1; i--) {            ReferenceEntry entry = (ReferenceEntry)entries.get(i);            MsgUnit msgUnit = entry.getMsgUnit();            ClientProperty alreadyProcessed = msgUnit.getQosData().getClientProperty(ReplicationConstants.ALREADY_PROCESSED_ATTR);            if (alreadyProcessed != null) {               log.warning("Received entry for client '" + this.slaveSessionId + "' which was already processed. Will remove it");               queue.removeRandom(entry);               entries.remove(i);            }            else               doTransform(msgUnit);         }                  // check if one of the messages is the transition end tag, also check if the total size is exceeded         ArrayList remoteEntries = new ArrayList();         long totalSize = 0L;         for (int i=0; i < entries.size(); i++) {            ReferenceEntry entry = (ReferenceEntry)entries.get(i);            MsgUnit msgUnit = entry.getMsgUnit();            ClientProperty endMsg = msgUnit.getQosData().getClientProperty(ReplicationConstants.END_OF_TRANSITION);                        // check if the message is the end of the data (only sent in case the initial data has to be stored on             // file in which case the dispatcher shall return in its waiting state.            
ClientProperty endOfData = msgUnit.getQosData().getClientProperty(ReplicationConstants.INITIAL_DATA_END);            ClientProperty initialFilesLocation = msgUnit.getQosData().getClientProperty(ReplicationConstants.INITIAL_FILES_LOCATION);            ClientProperty subDirName = msgUnit.getQosData().getClientProperty(ReplicationConstants.INITIAL_DATA_ID);            if (endOfData != null) {               final boolean doPersist = true;               doPause(doPersist);               queue.removeRandom(entry);               // entries.remove(i); // endOfData will be kept locally, not sent to slave               String dirName = "unknown";

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -