⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 replmanagerplugin.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
         this.replications.clear();         this.replSlaveMap.clear();         this.topicToPrefixMap.clear();         this.counterMap.clear();                  synchronized(this.sqlStatementMap) {            String[] keys = (String[])this.sqlStatementMap.keySet().toArray(new String[this.sqlStatementMap.size()]);            for (int i=0; i < keys.length; i++)               unregisterSqlStatement(keys[i]);         }                  getEngineGlobal(this.global).getPluginRegistry().unRegister(getType() + "," + getVersion());         this.pool.shutdown();      }      catch (Throwable e) {         log.warning("Ignoring shutdown problem: " + e.toString());      }      this.shutdown = true;      log.info("Stopped DbWatcher plugin '" + getType() + "'");   }      /**    * Gets the properties associated to this replication. Note that the info is this of the last    * registration. This method can return null if no object is found or if the replicationPrefix    * was null.    *     * @param replicationPrefix    * @return    */   public I_Info getReplicationInfo(String replicationPrefix) {      if (replicationPrefix == null)         return null;      synchronized (this.replications) {         return (I_Info)this.replications.get(replicationPrefix);      }   }      /**    * Used to register a dbWatcher. This is a request coming directly from the    * DbWatcher which registeres himself to this plugin.    * Note that if you are using the same id for the replication on several DbWatcher    * (several writers) only the first dbWatcher will pass the configuration. You are    * responsible of ensuring that the relevant configuration parameters are the same    * for all such DbWatcher instances.    *     * @param senderSession The session requesting this registration. This is needed    * to reply to the right requestor.    *     * @param replId    * @param info These are the Configuration of the DbWatcher, for example Table Names and so forth.    */   public synchronized void register(String senderSession, String replicationPrefix, I_Info info) {      I_Info oldInfo = (I_Info)this.replications.get(replicationPrefix);      info.put(SENDER_SESSION, senderSession);      String topicName = info.get("mom.topicName", null);      if (topicName == null)         log.severe("Topic name not found for '" + replicationPrefix + "' can not map the topic to the replication prefix");      else {         this.topicToPrefixMap.put(topicName, replicationPrefix);         String name = "replication." + replicationPrefix + ".replData";         long[] replData = readOldReplData(this.persistentInfo, name);         this.counterMap.put(replicationPrefix, new Counter(replData));      }            if (oldInfo != null) {         log.info("register '" + replicationPrefix + "' by senderSession='" + senderSession + "'");         String oldSenderSession = oldInfo.get(SENDER_SESSION, senderSession);         if (oldSenderSession.equals(senderSession)) {            log.info("register '" + replicationPrefix + "' by senderSession='" + senderSession + "' will overwrite old registration done previously");            this.replications.put(replicationPrefix, info);         }         else {            log.info("register '" + replicationPrefix + "' by senderSession='" + senderSession + "' was not done since there is a registration done by '" + oldSenderSession + "'. Will ignore the new one.");         }      }      else         this.replications.put(replicationPrefix, info);            String initialDataTopic = info.get("replication.initialDataTopic", "replication.initialData");      if (initialDataTopic != null)         this.initialDataTopicSet.add(initialDataTopic);      else         log.severe("The initialDataTopic for replication '" + replicationPrefix + "' was null"); // should never happen      this.cachedListOfReplications = null; // clear the cache   }      public synchronized void unregister(String senderSession, String replicationPrefix) {      I_Info oldInfo = (I_Info)this.replications.get(replicationPrefix);      if (oldInfo == null)         log.info("unregister '" + replicationPrefix + "' by senderSession='" + senderSession + "' is ignored since there is no such registration done");      else {         log.info("unregister '" + replicationPrefix + "' by senderSession='" + senderSession + "'");         /*         if (log.isLoggable(Level.FINE)) {            log.fine("unregister '" + replId + "' by senderSession='" + senderSession + "' the stack trace is:");            Thread.dumpStack();         }         */         String oldSenderSession = oldInfo.get(SENDER_SESSION, senderSession);         if (oldSenderSession.equals(senderSession)) {            this.replications.remove(replicationPrefix);         }         else {            log.warning("unregister '" + replicationPrefix + "' by senderSession='" + senderSession + "' was not done since there is a registration done by '" + oldSenderSession + "'. Please do it with the correct Session");         }         String topicName = oldInfo.get("mom.topicName", null);         if (topicName != null) {            this.topicToPrefixMap.remove(topicName);            this.counterMap.remove(replicationPrefix);         }      }      String initialDataTopic = oldInfo.get("replication.initialDataTopic", "replication.initialData");      if (initialDataTopic != null)         this.initialDataTopicSet.remove(initialDataTopic);      else         log.severe("The initialDataTopic for replication '" + replicationPrefix + "' was null"); // should never happen      this.cachedListOfReplications = null; // clear the cache   }      public static byte[] getContent(InputStream is) throws IOException, ClassNotFoundException {      int ret = 0;      byte[] buf = new byte[1024];      ByteArrayOutputStream baos = new ByteArrayOutputStream();      try {         while ( (ret=is.read(buf)) > -1) {            baos.write(buf, 0, ret);         }      }      catch (IOException ex) {         ex.printStackTrace();         return new byte[0];      }      return baos.toByteArray();   }      /**    * It receives events from all ReplicationConverter instances which want to register themselves for    * administration of initial updates.    *     * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)    */   public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {      try {         InputStream is = MomEventEngine.decompress(new ByteArrayInputStream(content), updateQos.getClientProperties());         content = getContent(is);         SessionName senderSession = updateQos.getSender();         String request = updateQos.getClientProperty("_command", "");         log.info("The master Replicator with session '" + senderSession.getRelativeName() + "' is sending '" + request + "'");         if ("broadcastSql".equalsIgnoreCase(request)) {            try {               final boolean highPrio = true;               String requestId = updateQos.getClientProperty("requestId", (String)null);               if (requestId == null)                  throw new Exception("The requestId has not been defined");               String repl =  updateQos.getClientProperty(REPL_PREFIX_KEY, REPL_PREFIX_DEFAULT);               String sql =  new String(content);               sendBroadcastRequest(repl, sql, highPrio, requestId);               return "OK";            }            catch (Throwable ex) {               ex.printStackTrace();               log.severe("An exception occured during an sql broadcast message:" + ex.getMessage() + "' will continue anyway to avoid stopping dispatcher");               return "OK"; // we don't want to stop the dispatcher            }         }         else if ("removeBroadcast".equalsIgnoreCase(request)) {            try {               removeSqlStatement(new String(content));               return "OK";            }            catch (Throwable ex) {               ex.printStackTrace();               log.severe("An exception occured when removing an sql broadcast:" + ex.getMessage() + "' will continue anyway to avoid stopping dispatcher");               return "OK"; // we don't want to stop the dispatcher            }         }         // 1. This is a response from an sql statement which has been previously sent to the slaves.         else if (this.sqlTopic != null && updateKey.getOid().equals(this.sqlTopic)) {            ClientProperty prop = (ClientProperty)updateQos.getClientProperties().get(STATEMENT_ID_ATTR);            if (prop == null) {               log.severe("The statement id is not specified, can not process it");               return "OK"; // we don't want to stop the dispatcher            }            String reqId = prop.getStringValue();            SqlStatement sqlStatement = (SqlStatement)this.sqlStatementMap.get(reqId);            if (sqlStatement == null) {               log.severe("The statement with id '" + reqId + "' has not been found");               return "OK"; // we don't want to stop the dispatcher            }            prop = (ClientProperty)updateQos.getClientProperties().get(EXCEPTION_ATTR);            String response = null;            boolean isException = false;            if (prop != null) {               response = prop.getStringValue();               isException = true;            }            prop = (ClientProperty)updateQos.getClientProperties().get(MASTER_ATTR);            if (prop != null) { // then it is the response from the master               String replPrefix = prop.getStringValue();               if (response == null)                  response = new String(content);               sqlStatement.setResponse(replPrefix, response, isException);            }            else {               if (response == null)                  response = new String(content);               sqlStatement.setResponse(senderSession.getRelativeName(), response, isException);            }         }         // 2. This is the response coming from a DbWatcher on a request for initial update which one of the ReplSlaves has previously requested.         else if ("INITIAL_DATA_RESPONSE".equals(request)) {            long minReplKey = updateQos.getClientProperty("_minReplKey", 0L);            long maxReplKey = updateQos.getClientProperty("_maxReplKey", 0L);            try {               String completeSlaveName = updateQos.getClientProperty("_slaveName", (String)null);               if (completeSlaveName == null)                  log.severe("on initial data response the slave name was not specified. Can not perform operation");               else {                  String[] slaveNames = StringPairTokenizer.parseLine(completeSlaveName, ',');                  for (int i=0; i < slaveNames.length; i++) {                     I_ReplSlave slave = null;                     synchronized (this.replSlaveMap) {                        slave = (I_ReplSlave)this.replSlaveMap.get(slaveNames[i]);                     }                     if (slave == null)                        log.severe("on initial data response the slave name '" + slaveNames[i] + "' was not registered (could have already logged out)");                     else                        slave.reactivateDestination(minReplKey, maxReplKey);                  }               }            }            catch (Exception ex) {               log.warning("reactivateDestination encountered an exception '" + ex.getMessage());            }         }         return "OK";      }      catch (Throwable ex) {         ex.printStackTrace();         log.severe("Throwable occured in the update method of ReplManagerPlugin");         // throw new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_HOLDBACK, "XmlBlasterPublisher.update", "user exception", ex);         return "OK"; // we don't want to stop the dispatcher      }   }         // enforced by I_MsgDispatchInterceptor       /**    * This method is invoked always so see sessionAdded javadoc.    * @see org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor#addDispatchManager(org.xmlBlaster.util.dispatch.DispatchManager)    */   public void addDispatchManager(DispatchManager dispatchManager) {      /*      try {         SessionName sessionName = dispatchManager.getSessionName();         if (sessionName == null) {            log.severe("The sessionName is null: "   + dispatchManager.toXml(""));            Thread.dumpStack();         }         else {            log.info("Adding dispatch Manager for '" + sessionName + "'");            String relativeSessionName = sessionName.getRelativeName();            I_ReplSlave slave = new ReplSlave(this.global, this.pool, this, relativeSessionName);            synchronized (this.replSlaveMap) {               this.replSlaveMap.put(relativeSessionName, slave);            }         }      }      catch (XmlBlasterException ex) {         ex.printStackTrace();      }      */   }   public String getInstanceName() {      return this.instanceName;   }      /**    * @see org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor#doActivate(org.xmlBlaster.util.dispatch.DispatchManager)    */   public boolean doActivate(DispatchManager dispatchManager) {      if (dispatchManager.getDispatchConnectionsHandler().isPolling()) {         log.fine("Can't send message as connection is lost and we are polling");         return false;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -