⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 replmanagerplugin.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
      }      return true;   }   /**    * @see org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor#handleNextMessages(org.xmlBlaster.util.dispatch.DispatchManager, java.util.ArrayList)    */   public ArrayList handleNextMessages(DispatchManager dispatchManager, ArrayList pushEntries) throws XmlBlasterException {      if (!this.initialized) {         synchronized(this) {            if (!this.initialized) {               log.warning("too early to get messages since not initialized yet");               try {                  Thread.sleep(500L);               }               catch (Throwable ex) {                  ex.printStackTrace();               }            }         }      }            if (pushEntries != null) {         log.warning("got " + pushEntries.size() + " entries in Dispatcher Sync mode (happens on communication Exceptions. Will ignore this");         // return pushEntries;         return null;      }      I_ReplSlave slave = null;      String relativeName = dispatchManager.getSessionName().getRelativeName();      int maxEntriesToRetrieve = this.maxNumOfEntries;      synchronized (this.replSlaveMap) {         slave = (I_ReplSlave)this.replSlaveMap.get(relativeName);         if (slave.getStatusAsInt() != I_ReplSlave.STATUS_NORMAL) {            log.info("Setting the number of entries to retreive to '1' since status is '" + slave.getStatus() + "' (otherwise it would be '" + this.maxNumOfEntries + "'");            maxEntriesToRetrieve = 1;         }      }      // take messages from queue (none blocking) ...      I_Queue cbQueue = dispatchManager.getQueue();      // ArrayList entryList = cbQueue.peekSamePriority(-1, this.maxSize);      ArrayList entryList = cbQueue.peekSamePriority(maxEntriesToRetrieve, this.maxSize);      log.info("handleNextMessages invoked with '" + entryList.size() + "' entries (max was '" + maxEntriesToRetrieve + "'");      // filter expired entries etc. ...      // you should always call this method after taking messages from queue      entryList = dispatchManager.prepareMsgsFromQueue(entryList);      log.info("handleNextMessages after cleaning up with '" + entryList.size() + "' entries");      if (slave == null) {         log.warning("could not find a slave for replication client '" + relativeName + "'");         return entryList;      }      try {         return slave.check(entryList, cbQueue);      }      catch (Exception ex) {         if (slave != null)            slave.handleException(ex);         if (ex instanceof XmlBlasterException)            throw (XmlBlasterException)ex;         throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "exception occured when filtering replication messages", "", ex);      }      catch (Throwable ex) {         if (slave != null)            slave.handleException(ex);         throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "throwable occured when filtering replication messages. " + Global.getStackTraceAsString(ex), "", ex);      }   }   /**    * @see org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor#initialize(org.xmlBlaster.util.Global, java.lang.String)    */   public void initialize(Global glob, String typeVersion) throws XmlBlasterException {   }   /**    * @see org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor#isShutdown()    */   public synchronized boolean isShutdown() {      return this.shutdown;   }   /**    * @see org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor#shutdown(org.xmlBlaster.util.dispatch.DispatchManager)    */   public synchronized void shutdown(DispatchManager dispatchManager) throws XmlBlasterException {      I_ReplSlave slave = null;      String name = dispatchManager.getSessionName().getRelativeName();      synchronized (this.replSlaveMap) {         slave = (I_ReplSlave)this.replSlaveMap.remove(name);      }      if (slave != null) {         try {            // slave.shutdown();         }         catch (Exception ex) {            ex.printStackTrace();            log.severe("Exception occured when shutting down slave '" + name + "'");         }      }   }   /**    * @see org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor#toXml(java.lang.String)    */   public String toXml(String extraOffset) {      return "";   }   /**    * @see org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor#usage()    */   public String usage() {      return "";   }   /**    * @see org.xmlBlaster.util.dispatch.I_ConnectionStatusListener#toAlive(org.xmlBlaster.util.dispatch.DispatchManager, org.xmlBlaster.util.dispatch.ConnectionStateEnum)    */   public void toAlive(DispatchManager dispatchManager, ConnectionStateEnum oldState) {   }   /**    * @see org.xmlBlaster.util.dispatch.I_ConnectionStatusListener#toDead(org.xmlBlaster.util.dispatch.DispatchManager, org.xmlBlaster.util.dispatch.ConnectionStateEnum, java.lang.String)    */   public void toDead(DispatchManager dispatchManager, ConnectionStateEnum oldState, String errorText) {   }   /**    * @see org.xmlBlaster.util.dispatch.I_ConnectionStatusListener#toPolling(org.xmlBlaster.util.dispatch.DispatchManager, org.xmlBlaster.util.dispatch.ConnectionStateEnum)    */   public void toPolling(DispatchManager dispatchManager, ConnectionStateEnum oldState) {   }   private synchronized void registerSqlStatement(String replPrefix, String reqId, String statement) throws Exception {      log.info("registering statement '" + replPrefix + "-" + reqId + "' for statement '" + statement + "'");      ArrayList slaves = new ArrayList();      Iterator iter = this.replSlaveMap.keySet().iterator();      synchronized (this.replSlaveMap) {         while (iter.hasNext()) {            Object key = iter.next();            ReplSlave replSlave = (ReplSlave)this.replSlaveMap.get(key);            String tmpPrefix = replSlave.getReplPrefix();            if (replPrefix.equals(tmpPrefix)) {               slaves.add(key);            }         }      }      SqlStatement sqlStatement = new SqlStatement(replPrefix, reqId, statement, slaves);      this.sqlStatementMap.put(reqId, sqlStatement);      String instanceName = getInstanceName() + ContextNode.SEP + replPrefix + ContextNode.SEP + reqId;      ContextNode contextNode = new ContextNode(ContextNode.CONTRIB_MARKER_TAG, instanceName, this.global.getContextNode());      sqlStatement.setHandle(this.global.registerMBean(contextNode, sqlStatement));   }   private synchronized void unregisterSqlStatement(String reqId) {      log.info("unregistering statement '" + reqId + "'");      SqlStatement sqlStatement = (SqlStatement)this.sqlStatementMap.remove(reqId);      if (sqlStatement == null)         log.warning("The sql statement with request id '" + reqId + "' was not found in the map, can not unregister it");      else         log.info("The sql statement with request id '" + reqId + "' will be unregistered now");      this.global.unregisterMBean(sqlStatement.getHandle());   }      public void removeSqlStatement(String statementId) {      unregisterSqlStatement(statementId);   }      private void sendBroadcastRequest(String replicationPrefix, String sql, boolean isHighPrio, String requestId) throws Exception {      if (replicationPrefix == null)         throw new Exception("executeSql: the replication id is null. Can not perform it.");      if (sql == null)         throw new Exception("executeSql: the sql statement to perform on  '" + replicationPrefix + "' is null. Can not perform it.");      I_Info individualInfo = (I_Info)this.replications.get(replicationPrefix);      if (individualInfo == null)         throw new Exception("executeSql: the replication with Id='" + replicationPrefix + "' was not found (has not been registered). Allowed ones are : " + getReplications());            log.info("Sending Broadcast request for repl='" + replicationPrefix + "' and statement='" + sql + "' and requestId='" + requestId + "'");      String dbWatcherSessionId = individualInfo.get(SENDER_SESSION, null);      registerSqlStatement(replicationPrefix, requestId, sql);            log.info("Broadcasting sql statement '" + sql + "' for master '" + replicationPrefix + "'");      I_XmlBlasterAccess conn = this.global.getXmlBlasterAccess();      // no oid for this ptp message       PublishKey pubKey = new PublishKey(this.global, REQUEST_BROADCAST_SQL_TOPIC);      Destination destination = new Destination(new SessionName(this.global, dbWatcherSessionId));      destination.forceQueuing(true);      PublishQos pubQos = new PublishQos(this.global, destination);      pubQos.setPersistent(true);      if (isHighPrio)         pubQos.setPriority(PriorityEnum.HIGH8_PRIORITY);      // pubQos.addClientProperty(ACTION_ATTR, STATEMENT_ACTION);      pubQos.addClientProperty(STATEMENT_ATTR, sql);      pubQos.addClientProperty(STATEMENT_PRIO_ATTR, isHighPrio);      pubQos.addClientProperty(STATEMENT_ID_ATTR, requestId);      pubQos.addClientProperty(SQL_TOPIC_ATTR, this.sqlTopic);      if (this.maxResponseEntries > -1L) {         pubQos.addClientProperty(MAX_ENTRIES_ATTR, this.maxResponseEntries);         log.info("Be aware that the number of entries in the result set will be limited to '" + this.maxResponseEntries + "'. To change this use 'replication.sqlMaxEntries'");      }      MsgUnit msg = new MsgUnit(pubKey, STATEMENT_ACTION.getBytes(), pubQos);      conn.publish(msg);         }      /**    * @see org.xmlBlaster.contrib.replication.impl.ReplManagerPluginMBean#broadcastSql(java.lang.String, java.lang.String)    */   public void broadcastSql(String repl, String sql) throws Exception {      final boolean highPrio = true;      String requestId = "" + new Timestamp().getTimestamp();      String replicationPrefix = VersionTransformerCache.stripReplicationPrefix(repl);      sendBroadcastRequest(replicationPrefix, sql, highPrio, requestId);   }   /**    * Convenience method to determine if a connect Qos is for us, i.e. if they have    * defined us as the DispatchPlugin in their connect qos.    *     * @param connQos    * @return    */   private final boolean hasUsAsDispatchPlugin(ConnectQosServer connQos) {      if (connQos == null)         return false;      CallbackAddress cbAddr = connQos.getData().getCurrentCallbackAddress();      if (cbAddr == null) {         log.info("entry '" + connQos.toXml() + "' has no callback address defined");         return false;      }      String dispatchPluginName = cbAddr.getDispatchPlugin();      if (dispatchPluginName == null)         return false;      String ownName = getType() + "," + getVersion();      if (ownName.equals(dispatchPluginName))         return true;      return false;   }      // For I_ClientListener ...   /**    * The part of this code inherent to the slave could be moved to the addDispatchManager since that method would     * always invoked too. This method is only invoked on the first connect, which is when the client connects the    * very first time, or when recovering sessions from persistence.    * @see org.xmlBlaster.authentication.I_ClientListener#sessionAdded(org.xmlBlaster.authentication.ClientEvent)    */   public synchronized void sessionAdded(ClientEvent e) throws XmlBlasterException {      if (e == null)         throw new XmlBlasterException(this.global, ErrorCode.INTERNAL_UNKNOWN, "ReplManagerPlugin.sessionAdded with null event object");      ConnectQosServer connQos = e.getConnectQos();            // code for the DbWatchers here      String replId = connQos.getData().getClientProperty(REPL_PREFIX_KEY, (String)null);      if (replId == null || replId.length() < 1)         log.fine("the client property '" + REPL_PREFIX_KEY + "' must be defined but is empty");      else { // then it is a DbWatcher which is used for replication         I_Info info = new ClientPropertiesInfo(connQos.getData().getClientProperties());         String relativeName = e.getSessionInfo().getSessionName().getRelativeName();         register(relativeName, replId, info);      }      // code for DbWatchers ends here            if (!hasUsAsDispatchPlugin(connQos))         return;      log.fine("Connecting with qos : " + connQos.toXml());      String sessionName = e.getSessionInfo().getSessionName().getRelativeName();      log.info("addition of session for '" + sessionName +"' occured");      synchronized (this.replSlaveMap) {         if (!this.replSlaveMap.containsKey(sessionName)) {            I_ReplSlave slave = new ReplSlave(this.global, this, sessionName);            try {               slave.setDispatcher(false, false); // stop dispatcher without persisting the information            }            catch (Exception ex) {               log.warning("Could not set the dispatcher for '" + sessionName + "' to false when adding the session");               ex.printStackTrace();            }            this.replSlaveMap.put(sessionName, slave);         }      }   }   /**    * @see org.xmlBlaster.authentication.I_ClientListener#sessionPreRemoved(org.xmlBlaster.authentication.ClientEvent)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -