📄 replmanagerplugin.java
字号:
this.replications.clear(); this.replSlaveMap.clear(); this.topicToPrefixMap.clear(); this.counterMap.clear(); synchronized(this.sqlStatementMap) { String[] keys = (String[])this.sqlStatementMap.keySet().toArray(new String[this.sqlStatementMap.size()]); for (int i=0; i < keys.length; i++) unregisterSqlStatement(keys[i]); } getEngineGlobal(this.global).getPluginRegistry().unRegister(getType() + "," + getVersion()); this.pool.shutdown(); } catch (Throwable e) { log.warning("Ignoring shutdown problem: " + e.toString()); } this.shutdown = true; log.info("Stopped DbWatcher plugin '" + getType() + "'"); } /** * Gets the properties associated to this replication. Note that the info is this of the last * registration. This method can return null if no object is found or if the replicationPrefix * was null. * * @param replicationPrefix * @return */ public I_Info getReplicationInfo(String replicationPrefix) { if (replicationPrefix == null) return null; synchronized (this.replications) { return (I_Info)this.replications.get(replicationPrefix); } } /** * Used to register a dbWatcher. This is a request coming directly from the * DbWatcher which registeres himself to this plugin. * Note that if you are using the same id for the replication on several DbWatcher * (several writers) only the first dbWatcher will pass the configuration. You are * responsible of ensuring that the relevant configuration parameters are the same * for all such DbWatcher instances. * * @param senderSession The session requesting this registration. This is needed * to reply to the right requestor. * * @param replId * @param info These are the Configuration of the DbWatcher, for example Table Names and so forth. */ public synchronized void register(String senderSession, String replicationPrefix, I_Info info) { I_Info oldInfo = (I_Info)this.replications.get(replicationPrefix); info.put(SENDER_SESSION, senderSession); String topicName = info.get("mom.topicName", null); if (topicName == null) log.severe("Topic name not found for '" + replicationPrefix + "' can not map the topic to the replication prefix"); else { this.topicToPrefixMap.put(topicName, replicationPrefix); String name = "replication." + replicationPrefix + ".replData"; long[] replData = readOldReplData(this.persistentInfo, name); this.counterMap.put(replicationPrefix, new Counter(replData)); } if (oldInfo != null) { log.info("register '" + replicationPrefix + "' by senderSession='" + senderSession + "'"); String oldSenderSession = oldInfo.get(SENDER_SESSION, senderSession); if (oldSenderSession.equals(senderSession)) { log.info("register '" + replicationPrefix + "' by senderSession='" + senderSession + "' will overwrite old registration done previously"); this.replications.put(replicationPrefix, info); } else { log.info("register '" + replicationPrefix + "' by senderSession='" + senderSession + "' was not done since there is a registration done by '" + oldSenderSession + "'. Will ignore the new one."); } } else this.replications.put(replicationPrefix, info); String initialDataTopic = info.get("replication.initialDataTopic", "replication.initialData"); if (initialDataTopic != null) this.initialDataTopicSet.add(initialDataTopic); else log.severe("The initialDataTopic for replication '" + replicationPrefix + "' was null"); // should never happen this.cachedListOfReplications = null; // clear the cache } public synchronized void unregister(String senderSession, String replicationPrefix) { I_Info oldInfo = (I_Info)this.replications.get(replicationPrefix); if (oldInfo == null) log.info("unregister '" + replicationPrefix + "' by senderSession='" + senderSession + "' is ignored since there is no such registration done"); else { log.info("unregister '" + replicationPrefix + "' by senderSession='" + senderSession + "'"); /* if (log.isLoggable(Level.FINE)) { log.fine("unregister '" + replId + "' by senderSession='" + senderSession + "' the stack trace is:"); Thread.dumpStack(); } */ String oldSenderSession = oldInfo.get(SENDER_SESSION, senderSession); if (oldSenderSession.equals(senderSession)) { this.replications.remove(replicationPrefix); } else { log.warning("unregister '" + replicationPrefix + "' by senderSession='" + senderSession + "' was not done since there is a registration done by '" + oldSenderSession + "'. Please do it with the correct Session"); } String topicName = oldInfo.get("mom.topicName", null); if (topicName != null) { this.topicToPrefixMap.remove(topicName); this.counterMap.remove(replicationPrefix); } } String initialDataTopic = oldInfo.get("replication.initialDataTopic", "replication.initialData"); if (initialDataTopic != null) this.initialDataTopicSet.remove(initialDataTopic); else log.severe("The initialDataTopic for replication '" + replicationPrefix + "' was null"); // should never happen this.cachedListOfReplications = null; // clear the cache } public static byte[] getContent(InputStream is) throws IOException, ClassNotFoundException { int ret = 0; byte[] buf = new byte[1024]; ByteArrayOutputStream baos = new ByteArrayOutputStream(); try { while ( (ret=is.read(buf)) > -1) { baos.write(buf, 0, ret); } } catch (IOException ex) { ex.printStackTrace(); return new byte[0]; } return baos.toByteArray(); } /** * It receives events from all ReplicationConverter instances which want to register themselves for * administration of initial updates. * * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos) */ public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException { try { InputStream is = MomEventEngine.decompress(new ByteArrayInputStream(content), updateQos.getClientProperties()); content = getContent(is); SessionName senderSession = updateQos.getSender(); String request = updateQos.getClientProperty("_command", ""); log.info("The master Replicator with session '" + senderSession.getRelativeName() + "' is sending '" + request + "'"); if ("broadcastSql".equalsIgnoreCase(request)) { try { final boolean highPrio = true; String requestId = updateQos.getClientProperty("requestId", (String)null); if (requestId == null) throw new Exception("The requestId has not been defined"); String repl = updateQos.getClientProperty(REPL_PREFIX_KEY, REPL_PREFIX_DEFAULT); String sql = new String(content); sendBroadcastRequest(repl, sql, highPrio, requestId); return "OK"; } catch (Throwable ex) { ex.printStackTrace(); log.severe("An exception occured during an sql broadcast message:" + ex.getMessage() + "' will continue anyway to avoid stopping dispatcher"); return "OK"; // we don't want to stop the dispatcher } } else if ("removeBroadcast".equalsIgnoreCase(request)) { try { removeSqlStatement(new String(content)); return "OK"; } catch (Throwable ex) { ex.printStackTrace(); log.severe("An exception occured when removing an sql broadcast:" + ex.getMessage() + "' will continue anyway to avoid stopping dispatcher"); return "OK"; // we don't want to stop the dispatcher } } // 1. This is a response from an sql statement which has been previously sent to the slaves. else if (this.sqlTopic != null && updateKey.getOid().equals(this.sqlTopic)) { ClientProperty prop = (ClientProperty)updateQos.getClientProperties().get(STATEMENT_ID_ATTR); if (prop == null) { log.severe("The statement id is not specified, can not process it"); return "OK"; // we don't want to stop the dispatcher } String reqId = prop.getStringValue(); SqlStatement sqlStatement = (SqlStatement)this.sqlStatementMap.get(reqId); if (sqlStatement == null) { log.severe("The statement with id '" + reqId + "' has not been found"); return "OK"; // we don't want to stop the dispatcher } prop = (ClientProperty)updateQos.getClientProperties().get(EXCEPTION_ATTR); String response = null; boolean isException = false; if (prop != null) { response = prop.getStringValue(); isException = true; } prop = (ClientProperty)updateQos.getClientProperties().get(MASTER_ATTR); if (prop != null) { // then it is the response from the master String replPrefix = prop.getStringValue(); if (response == null) response = new String(content); sqlStatement.setResponse(replPrefix, response, isException); } else { if (response == null) response = new String(content); sqlStatement.setResponse(senderSession.getRelativeName(), response, isException); } } // 2. This is the response coming from a DbWatcher on a request for initial update which one of the ReplSlaves has previously requested. else if ("INITIAL_DATA_RESPONSE".equals(request)) { long minReplKey = updateQos.getClientProperty("_minReplKey", 0L); long maxReplKey = updateQos.getClientProperty("_maxReplKey", 0L); try { String completeSlaveName = updateQos.getClientProperty("_slaveName", (String)null); if (completeSlaveName == null) log.severe("on initial data response the slave name was not specified. Can not perform operation"); else { String[] slaveNames = StringPairTokenizer.parseLine(completeSlaveName, ','); for (int i=0; i < slaveNames.length; i++) { I_ReplSlave slave = null; synchronized (this.replSlaveMap) { slave = (I_ReplSlave)this.replSlaveMap.get(slaveNames[i]); } if (slave == null) log.severe("on initial data response the slave name '" + slaveNames[i] + "' was not registered (could have already logged out)"); else slave.reactivateDestination(minReplKey, maxReplKey); } } } catch (Exception ex) { log.warning("reactivateDestination encountered an exception '" + ex.getMessage()); } } return "OK"; } catch (Throwable ex) { ex.printStackTrace(); log.severe("Throwable occured in the update method of ReplManagerPlugin"); // throw new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_HOLDBACK, "XmlBlasterPublisher.update", "user exception", ex); return "OK"; // we don't want to stop the dispatcher } } // enforced by I_MsgDispatchInterceptor /** * This method is invoked always so see sessionAdded javadoc. * @see org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor#addDispatchManager(org.xmlBlaster.util.dispatch.DispatchManager) */ public void addDispatchManager(DispatchManager dispatchManager) { /* try { SessionName sessionName = dispatchManager.getSessionName(); if (sessionName == null) { log.severe("The sessionName is null: " + dispatchManager.toXml("")); Thread.dumpStack(); } else { log.info("Adding dispatch Manager for '" + sessionName + "'"); String relativeSessionName = sessionName.getRelativeName(); I_ReplSlave slave = new ReplSlave(this.global, this.pool, this, relativeSessionName); synchronized (this.replSlaveMap) { this.replSlaveMap.put(relativeSessionName, slave); } } } catch (XmlBlasterException ex) { ex.printStackTrace(); } */ } public String getInstanceName() { return this.instanceName; } /** * @see org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor#doActivate(org.xmlBlaster.util.dispatch.DispatchManager) */ public boolean doActivate(DispatchManager dispatchManager) { if (dispatchManager.getDispatchConnectionsHandler().isPolling()) { log.fine("Can't send message as connection is lost and we are polling"); return false;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -