📄 replmanagerplugin.java
字号:
*/ public void sessionPreRemoved(ClientEvent e) throws XmlBlasterException { if (e == null) throw new XmlBlasterException(this.global, ErrorCode.INTERNAL_UNKNOWN, "ReplManagerPlugin.sessionAdded with null event object"); ConnectQosServer connQos = e.getConnectQos(); // code for the DbWatcher String replId = connQos.getData().getClientProperty(REPL_PREFIX_KEY, (String)null); if (replId == null || replId.length() < 1) log.fine("the client property '" + REPL_PREFIX_KEY + "' must be defined but is empty"); else { // then it is a DbWatcher used for replication String relativeName = e.getSessionInfo().getSessionName().getRelativeName(); unregister(relativeName, replId); } // end of code for the DbWatcher if (!hasUsAsDispatchPlugin(connQos)) return; String sessionName = e.getSessionInfo().getSessionName().getRelativeName(); log.info("removal of session for '" + sessionName +"' occured"); synchronized (this.replSlaveMap) { if (!this.replSlaveMap.containsKey(sessionName)) log.warning("The slave '" + sessionName + "' is not registered."); else { I_ReplSlave slave = (I_ReplSlave)this.replSlaveMap.remove(sessionName); if (slave != null) { try { slave.shutdown(); } catch (Exception ex) { log.severe("Could not shut down the slave '" + sessionName + "' properly"); ex.printStackTrace(); } } } } } /** * @see org.xmlBlaster.authentication.I_ClientListener#sessionRemoved(org.xmlBlaster.authentication.ClientEvent) */ public void sessionRemoved(ClientEvent e) throws XmlBlasterException { } /** * @see org.xmlBlaster.authentication.I_ClientListener#subjectAdded(org.xmlBlaster.authentication.ClientEvent) */ public void subjectAdded(ClientEvent e) throws XmlBlasterException { } /** * @see org.xmlBlaster.authentication.I_ClientListener#subjectRemoved(org.xmlBlaster.authentication.ClientEvent) */ public void subjectRemoved(ClientEvent e) throws XmlBlasterException { } /** * @see org.xmlBlaster.engine.I_SubscriptionListener#getPriority() */ public Integer getPriority() { // TODO Check if the priority is correct return new Integer(100); } /** * To make it simpler one could think to put this method together with sessionAdded. * This is however not possible since at the time the initiateReplication is invoked, * the subcription is done for the first time. However if sessionAdded was not invoked * previously, there would no be any chance to know that this is wanting to subscribe. * * It checks if the event is for one of our guys and dispatches the call to them * @see org.xmlBlaster.engine.I_SubscriptionListener#subscriptionAdd(org.xmlBlaster.engine.SubscriptionEvent) */ public synchronized void subscriptionAdd(SubscriptionEvent e) throws XmlBlasterException { ConnectQosServer connQos = e.getSubscriptionInfo().getSessionInfo().getConnectQos(); if (!hasUsAsDispatchPlugin(connQos)) return; String relativeSessionName = e.getSubscriptionInfo().getSessionInfo().getSessionName().getRelativeName(); log.info("addition of subscription for '" + relativeSessionName +"' occured"); I_ReplSlave slave = null; synchronized (this.replSlaveMap) { slave = (I_ReplSlave)this.replSlaveMap.get(relativeSessionName); } if (slave != null) { Map clientProperties = e.getSubscriptionInfo().getSubscribeQosServer().getData().getClientProperties(); try { slave.init(new ClientPropertiesInfo(clientProperties)); } catch (Exception ex) { throw new XmlBlasterException(this.global, ErrorCode.INTERNAL_UNKNOWN, "ReplManagerPlugin.subscriptionAdd", "", ex); } } else log.severe("Could not find nor create slave '" + relativeSessionName + "'"); } /** * @see org.xmlBlaster.engine.I_SubscriptionListener#subscriptionRemove(org.xmlBlaster.engine.SubscriptionEvent) */ public void subscriptionRemove(SubscriptionEvent e) throws XmlBlasterException { if (e == null) throw new XmlBlasterException(this.global, ErrorCode.INTERNAL_UNKNOWN, "ReplManagerPlugin.sessionAdded with null event object"); ConnectQosServer connQos = e.getSubscriptionInfo().getSessionInfo().getConnectQos(); if (!hasUsAsDispatchPlugin(connQos)) return; String sessionName = e.getSubscriptionInfo().getSessionInfo().getSessionName().getRelativeName(); log.info("removal of one subscription for '" + sessionName +"' occured"); /* synchronized (this.replSlaveMap) { if (!this.replSlaveMap.containsKey(sessionName)) log.warning("The slave '" + sessionName + "' is not registered."); else { this.replSlaveMap.remove(sessionName); } } */ } void setEngineGlobalProperty(String key, String val) { org.xmlBlaster.engine.ServerScope engineGlobal = (org.xmlBlaster.engine.ServerScope)this.global.getObjectEntry(ORIGINAL_ENGINE_GLOBAL); if (engineGlobal != null) engineGlobal.getProperty().getProperties().setProperty(key, val); } public final String recreateTriggers(String replPrefix) throws Exception { // sending the cancel op to the DbWatcher log.info("'will recreate triggers for source '" + replPrefix + "'"); I_Info individualInfo = (I_Info)this.replications.get(replPrefix); if (individualInfo != null) { String dbWatcherSessionId = individualInfo.get(SENDER_SESSION, null); if (dbWatcherSessionId == null) throw new Exception("The replication source with replication.prefix='" + replPrefix + "' had no '_senderSession' attribute set in its configuration"); I_XmlBlasterAccess conn = this.global.getXmlBlasterAccess(); PublishKey pubKey = new PublishKey(this.global, REQUEST_RECREATE_TRIGGERS); Destination destination = new Destination(new SessionName(this.global, dbWatcherSessionId)); destination.forceQueuing(true); PublishQos pubQos = new PublishQos(this.global, destination); pubQos.setPersistent(false); MsgUnit msg = new MsgUnit(pubKey, REPL_REQUEST_RECREATE_TRIGGERS.getBytes(), pubQos); conn.publish(msg); return "Recreate Triggers for '" + replPrefix + "' is ongoing now"; } else throw new Exception("Could not find a replication source with replication.prefix='" + replPrefix + "'"); } public String getInitialFilesLocation() { return this.initialFilesLocation; } public static File checkExistance(String pathName) throws Exception { File dirWhereToStore = new File(pathName); if (!dirWhereToStore.exists()) throw new Exception("The path '" + pathName + "' does not exist"); if (!dirWhereToStore.isDirectory()) throw new Exception("The path '" + pathName + "' is not a directory"); return dirWhereToStore; } private static void mainUsage() { System.err.println("You must invoke at least java org.xmlBlaster.contrib.replication.impl.ReplManagerPlugin -cmd insert|delete -requestId someId -replication.prefix somePrefix < filename"); System.exit(-1); } public static void main(String[] args) { try { Global global = new Global(args); I_XmlBlasterAccess conn = global.getXmlBlasterAccess(); ConnectQos connectQos = new ConnectQos(global); conn.connect(connectQos, new ReplManagerPlugin()); // just a fake String cmd = global.getProperty().get("cmd", (String)null); if (cmd == null) mainUsage(); String requestId = global.getProperty().get("requestId", (String)null); if (requestId == null) mainUsage(); int count = Integer.parseInt(requestId.trim()); String repl = global.getProperty().get(REPL_PREFIX_KEY, REPL_PREFIX_DEFAULT); if (repl == null) mainUsage(); PublishKey pubKey = new PublishKey(global, "broadcastChecker"); BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); String line = null; while ( (line=br.readLine()) != null) { PublishQos pubQos = new PublishQos(global,new Destination(new SessionName(global, SESSION_ID))); requestId = "" + count++; MsgUnit msg = null; if (cmd.equals("insert")) { pubQos.addClientProperty("_command", "broadcastSql"); pubQos.addClientProperty("requestId", requestId); pubQos.addClientProperty("replication.prefix", repl); msg = new MsgUnit(pubKey, line.trim().getBytes(), pubQos); } else { pubQos.addClientProperty("_command", "removeBroadcast"); msg = new MsgUnit(pubKey, requestId.getBytes(), pubQos); } if (line != null && line.trim().length() > 0) { conn.publish(msg); } } conn.erase(new EraseKey(global, "broadcastChecker"), new EraseQos(global)); conn.disconnect(new DisconnectQos(global)); br.close(); } catch (Throwable ex) { ex.printStackTrace(); } } public void timeout(Object userData) { long start = System.currentTimeMillis(); try { I_ReplSlave[] slaves = null; synchronized(this.replSlaveMap) { slaves = (I_ReplSlave[])this.replSlaveMap.values().toArray(new I_ReplSlave[this.replSlaveMap.size()]); } if (slaves != null) { for (int i=0; i < slaves.length; i++) { slaves[i].checkStatus(); } } } catch (Throwable ex) { log.severe("An exception occurred when retrieving the status for all replication writers: " + ex.getMessage()); ex.printStackTrace(); } finally { this.numRefresh++; if (this.numRefresh > Integer.MAX_VALUE) this.numRefresh = 0; this.statusProcessingTime = System.currentTimeMillis() - start; if (this.statusPollerInterval >= 0) this.timeoutHandle = timeout.addTimeoutListener(this, this.statusPollerInterval, null); } } public long getStatusPollerInterval() { return this.statusPollerInterval; } public long getNumOfRefreshes() { return this.numRefresh; } public void setStatusPollerInterval(long statusPollerInterval) { this.statusPollerInterval = statusPollerInterval; if (this.timeoutHandle != null) { this.timeout.removeTimeoutListener(this.timeoutHandle); this.timeoutHandle = null; } if (this.statusPollerInterval >= 0) this.timeoutHandle = timeout.addTimeoutListener(this, this.statusPollerInterval, null); } public long getStatusProcessingTime() { return this.statusProcessingTime; } /** * Does cleanup, particularly it sets the status and counters. */ public void postHandleNextMessages(DispatchManager dispatchManager, MsgUnit[] processedEntries) throws XmlBlasterException { if (!this.initialized) { synchronized(this) { if (!this.initialized) { log.warning("too early to get messages since not initialized yet"); try { Thread.sleep(500L); } catch (Throwable ex) { ex.printStackTrace(); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -