📄 replmanagerplugin.java
字号:
StringBuffer buf = new StringBuffer(); while (iter.hasNext()) { I_Info tmpInfo = (I_Info)iter.next(); String tmp = tmpInfo.get(SUPPORTED_VERSIONS, null); log.info("replications : '" + tmp + "'"); if (tmp != null) { if (!isFirst) buf.append(","); isFirst = false; buf.append(tmp); } else { String replPrefix = SpecificDefault.getReplPrefix(tmpInfo); log.warning("Property '" + SUPPORTED_VERSIONS + "' not found for '" + replPrefix + "'"); if (!isFirst) buf.append(","); isFirst = false; buf.append(replPrefix); } } // return InfoHelper.getIteratorAsString(this.replications.keySet().iterator()); this.cachedListOfReplications = buf.toString(); return this.cachedListOfReplications; } public static String getPluginName() { return "ReplManager,1.0"; } public String getType() { return "ReplManager"; } public String getVersion() { return "1.0"; } /** * Creates a I_DbPool object out of the JDBC,1.0 Queue Properties and initializes the pool. * @return * @throws Exception */ private I_DbPool initializePersistentInfo() throws Exception { QueuePluginManager pluginManager = new QueuePluginManager(this.global); PluginInfo queuePluginInfo = new PluginInfo(this.global, pluginManager, "JDBC", "1.0"); Properties prop = (Properties)queuePluginInfo.getParameters(); String dbUrl = prop.getProperty("url", null); String dbUser = prop.getProperty("user", null); String dbPassword = prop.getProperty("password", null); log.info("db.url='" + dbUrl + "' db.user='" + dbUser + "'"); I_Info tmpInfo = new PropertiesInfo(new Properties()); if (dbUrl != null) tmpInfo.put("db.url", dbUrl); else log.warning("the property 'url' was not set"); if (dbUser != null) tmpInfo.put("db.user", dbUser); else log.warning("the property 'user' was not set"); if (dbPassword != null) tmpInfo.put("db.password", dbPassword); else log.warning("the property 'password' was not set"); I_DbPool pool = new DbPool(); pool.init(tmpInfo); this.persistentInfo = new DbInfo(pool, "replication", tmpInfo); return pool; } /** * Intiates the replication for the given slave. * TODO Specify that the replicationKey (dbmasterid) must be short and DB conform. * Usually called by Humans via JMX Console. * * The cascaded replication is the replication which will be automatically started once the initial update of the first replication is finished. This is * used to concatenate replications. A typical usecase is in two way replication, then the initial update of the back replication can be automatically triggered * once the initial update of the main replication is finished. * * @param slaveSessionName * @param replicationKey This is the dbWatcher replication.prefix attribute. * @param cascadeSlaveSessionName The Name of the session of the dbWriter to be used for the cascaded replication. Can be null. * @param cascadedReplicationPrefix the prefix identifing the DbWatcher for the cascaded replication. Can be null. * @param realInitialFilesLocation the file location where the initial dump is stored. If null or an empty String, then it * is assumed to be transfered the "normal" way, that is over the choosen communication protocol. */ public String initiateReplication(String slaveSessionName, String prefixWithVersion, String cascadeSlaveSessionName, String cascadeReplicationPrefix, String realInitialFilesLocation) { try { return initiateReplicationNonMBean(slaveSessionName, prefixWithVersion, cascadeSlaveSessionName, cascadeReplicationPrefix, realInitialFilesLocation); } catch (Exception ex) { return "error: " + ex.getMessage(); } } /** * Intiates the replication for the given slave. * TODO Specify that the replicationKey (dbmasterid) must be short and DB conform. * The cascaded replication is the replication which will be automatically started once the initial update of the first replication is finished. This is * used to concatenate replications. A typical usecase is in two way replication, then the initial update of the back replication can be automatically triggered * once the initial update of the main replication is finished. * * @param slaveSessionName * @param replicationKey This is the dbWatcher replication.prefix attribute. * @param cascadeSlaveSessionName The Name of the session of the dbWriter to be used for the cascaded replication. Can be null. * @param cascadedReplicationPrefix the prefix identifing the DbWatcher for the cascaded replication. Can be null. * @param realInitialFilesLocation the file location where the initial dump is stored. If null or an empty String, then it * is assumed to be transfered the "normal" way, that is over the choosen communication protocol. * @throws Exception */ public String initiateReplicationNonMBean(String slaveSessionName, String prefixWithVersion, String cascadeSlaveSessionName, String cascadeReplicationPrefix, String realInitialFilesLocation) throws Exception { try { if (slaveSessionName == null || slaveSessionName.trim().length() < 1) throw new Exception("ReplManagerPlugin.initiateReplication: The slave session name is null, please provide one"); if (prefixWithVersion == null || prefixWithVersion.length() < 1) throw new Exception("ReplManagerPlugin.initiateReplication: The replication.prefix is null, please provide one"); slaveSessionName = slaveSessionName.trim(); String ret = "initiateReplication invoked for slave '" + slaveSessionName + "' and on replication '" + prefixWithVersion + "' store location : '" + realInitialFilesLocation + "'"; log.info(ret); String replicationPrefix = VersionTransformerCache.stripReplicationPrefix(prefixWithVersion); String requestedVersion = VersionTransformerCache.stripReplicationVersion(prefixWithVersion); I_Info individualInfo = (I_Info)this.replications.get(replicationPrefix); if (individualInfo != null) { if (realInitialFilesLocation != null && realInitialFilesLocation.trim().length() > 0) { checkExistance(realInitialFilesLocation.trim()); this.initialFilesLocation = realInitialFilesLocation.trim(); individualInfo.put(INITIAL_FILES_LOCATION, this.initialFilesLocation); } else { // individualInfo.putObject(INITIAL_FILES_LOCATION, null); individualInfo.put(INITIAL_FILES_LOCATION, null); } individualInfo.put(REPL_VERSION, requestedVersion); individualInfo.putObject("org.xmlBlaster.engine.Global", this.global); I_ReplSlave slave = null; synchronized (this.replSlaveMap) { slave = (I_ReplSlave)this.replSlaveMap.get(slaveSessionName); } if (slave != null) { individualInfo.put("_replName", replicationPrefix); String dbWatcherSessionId = individualInfo.get(SENDER_SESSION, null); if (dbWatcherSessionId == null) throw new Exception("ReplSlave '" + slave + "' constructor: the master Session Id (which is passed in the properties as '" + SENDER_SESSION + "' are not found. Can not continue with initial update"); if (cascadeSlaveSessionName != null) { // check to avoid loops cascadeSlaveSessionName = cascadeSlaveSessionName.trim(); if (slaveSessionName.equals(cascadeSlaveSessionName)) throw new Exception(ret + " did fail since having the same slave '" + slaveSessionName + "' for both replications would result in a loop"); // return "error: " + ret + " did fail since having the same slave '" + slaveSessionName + "' for both replications would result in a loop"; } boolean isOkToStart = slave.run(individualInfo, dbWatcherSessionId, cascadeReplicationPrefix, cascadeSlaveSessionName, false); if (isOkToStart == false) { ret += " did fail since your status is '" + slave.getStatus() + "'. Please invoke first 'Cancel Update'"; throw new Exception(ret); // return "error: " + ret; // don't throw an exception here since MX4J seems to loose exception msg. } } else throw new Exception("the replication slave '" + slaveSessionName + "' was not found among the list of slaves which is '" + getSlaves() + "'"); } else throw new Exception("initiateReplication failed for '" + slaveSessionName + "' with replication key '" + replicationPrefix + "' since not known. Known are '" + getReplications() + "'"); return ret; } catch (Exception ex) { ex.printStackTrace(); throw ex; } } /** * @see org.xmlBlaster.util.plugin.I_Plugin#init(org.xmlBlaster.util.Global, org.xmlBlaster.util.plugin.PluginInfo) */ protected synchronized void doInit(Global global, PluginInfo pluginInfo) throws XmlBlasterException { // if (this.initialized) // return; try { // String momClass = get("mom.class", "org.xmlBlaster.contrib.MomEventEngine").trim(); // String registryName = "mom.publisher"; synchronized (ReplManagerPlugin.class) { this.instanceName = "replication"; } ContextNode contextNode = new ContextNode(ContextNode.CONTRIB_MARKER_TAG, instanceName, this.global.getContextNode()); if (!this.global.isRegisteredMBean(contextNode)) this.mbeanHandle = this.global.registerMBean(contextNode, this); if (this.pool == null) this.pool = initializePersistentInfo(); I_XmlBlasterAccess conn = this.global.getXmlBlasterAccess(); ConnectQos connectQos = new ConnectQos(this.global, this.user, this.password); boolean persistentConnection = true; boolean persistentSubscription = true; connectQos.setPersistent(persistentConnection); connectQos.setMaxSessions(1); connectQos.setPtpAllowed(true); connectQos.setSessionTimeout(0L); String sessionName = REPL_MANAGER_SESSION; connectQos.setSessionName(new SessionName(this.global, sessionName)); conn.connect(connectQos, this); // this is the instance passed from the outside, not a clone, otherwise // it will not find the plugin registry for the MIME plugin putObject("org.xmlBlaster.engine.Global", global); getEngineGlobal(this.global).getPluginRegistry().register(getType() + "," + getVersion(), this); this.sqlTopic = this.get("replication.sqlTopic", "sqlTopic"); if (this.sqlTopic != null) { SubscribeKey subKey = new SubscribeKey(this.global, this.sqlTopic); SubscribeQos subQos = new SubscribeQos(this.global); subQos.setPersistent(persistentSubscription); subQos.setMultiSubscribe(false); conn.subscribe(subKey, subQos); } this.maxResponseEntries = this.getLong("replication.sqlMaxEntries", 10L); RequestBroker rb = getEngineGlobal(this.global).getRequestBroker(); rb.getAuthenticate(null).addClientListener(this); SessionInfo[] sessionInfos = rb.getAuthenticate(null).getSessionInfoArr(); for (int i=0; i < sessionInfos.length; i++) { SessionInfo sessionInfo = sessionInfos[i]; ClientEvent event = new ClientEvent(sessionInfo); sessionAdded(event); I_SubscriptionListener oldListener = rb.getSubscriptionListener(getPriority()); if (oldListener == null) rb.addSubscriptionListener(this); SubscriptionInfo[] subInfos = rb.getClientSubscriptions().getSubscriptions(sessionInfo); for (int j=0; j < subInfos.length; j++) { SubscriptionEvent subEvent = new SubscriptionEvent(subInfos[j]); subscriptionAdd(subEvent); } } this.initialFilesLocation = this.get("replication.initialFilesLocation", "${user.home}/tmp"); this.statusPollerInterval = this.getLong("replication.monitor.statusPollerInterval", STATUS_POLLER_INTERVAL_DEFAULT); if (this.statusPollerInterval > 0) this.timeoutHandle = timeout.addTimeoutListener(this, this.statusPollerInterval, null); else log.warning("The 'replication.monitor.statusPollerInterval' is set to '" + this.statusPollerInterval + "' which is lower than 1 ms, I will not activate it"); this.maxNumOfEntries = this.getInt(REPLICATION_MAX_ENTRIES_KEY, REPLICATION_MAX_ENTRIES_DEFAULT); log.info("Will send a maximum of '" + this.maxNumOfEntries + "' on each sweep"); this.initialized = true; } catch (Throwable e) { throw new XmlBlasterException(this.global, ErrorCode.RESOURCE_CONFIGURATION, "ReplManagerPlugin", "init failed", e); } log.info("Loaded ReplManagerPlugin '" + getType() + "'"); } private org.xmlBlaster.engine.ServerScope getEngineGlobal(Global glob) { return (org.xmlBlaster.engine.ServerScope)glob.getObjectEntry(ORIGINAL_ENGINE_GLOBAL); } /** * @see org.xmlBlaster.util.plugin.I_Plugin#shutdown() */ public synchronized void shutdown() { if (this.shutdown) return; try { super.shutdown(); if (this.timeoutHandle != null) { this.timeout.removeTimeoutListener(this.timeoutHandle); this.timeoutHandle = null; } this.global.unregisterMBean(this.mbeanHandle); getEngineGlobal(this.global).getRequestBroker().getAuthenticate(null).removeClientListener(this); getEngineGlobal(this.global).getRequestBroker().removeSubscriptionListener(this); I_XmlBlasterAccess conn = this.global.getXmlBlasterAccess(); if (this.sqlTopic != null) { UnSubscribeKey key = new UnSubscribeKey(this.global, this.sqlTopic); conn.unSubscribe(key, new UnSubscribeQos(this.global)); } conn.disconnect(new DisconnectQos(this.global));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -