📄 replmanagerplugin.java
字号:
} return true; } /** * @see org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor#handleNextMessages(org.xmlBlaster.util.dispatch.DispatchManager, java.util.ArrayList) */ public ArrayList handleNextMessages(DispatchManager dispatchManager, ArrayList pushEntries) throws XmlBlasterException { if (!this.initialized) { synchronized(this) { if (!this.initialized) { log.warning("too early to get messages since not initialized yet"); try { Thread.sleep(500L); } catch (Throwable ex) { ex.printStackTrace(); } } } } if (pushEntries != null) { log.warning("got " + pushEntries.size() + " entries in Dispatcher Sync mode (happens on communication Exceptions. Will ignore this"); // return pushEntries; return null; } I_ReplSlave slave = null; String relativeName = dispatchManager.getSessionName().getRelativeName(); int maxEntriesToRetrieve = this.maxNumOfEntries; synchronized (this.replSlaveMap) { slave = (I_ReplSlave)this.replSlaveMap.get(relativeName); if (slave.getStatusAsInt() != I_ReplSlave.STATUS_NORMAL) { log.info("Setting the number of entries to retreive to '1' since status is '" + slave.getStatus() + "' (otherwise it would be '" + this.maxNumOfEntries + "'"); maxEntriesToRetrieve = 1; } } // take messages from queue (none blocking) ... I_Queue cbQueue = dispatchManager.getQueue(); // ArrayList entryList = cbQueue.peekSamePriority(-1, this.maxSize); ArrayList entryList = cbQueue.peekSamePriority(maxEntriesToRetrieve, this.maxSize); log.info("handleNextMessages invoked with '" + entryList.size() + "' entries (max was '" + maxEntriesToRetrieve + "'"); // filter expired entries etc. ... // you should always call this method after taking messages from queue entryList = dispatchManager.prepareMsgsFromQueue(entryList); log.info("handleNextMessages after cleaning up with '" + entryList.size() + "' entries"); if (slave == null) { log.warning("could not find a slave for replication client '" + relativeName + "'"); return entryList; } try { return slave.check(entryList, cbQueue); } catch (Exception ex) { if (slave != null) slave.handleException(ex); if (ex instanceof XmlBlasterException) throw (XmlBlasterException)ex; throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "exception occured when filtering replication messages", "", ex); } catch (Throwable ex) { if (slave != null) slave.handleException(ex); throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "throwable occured when filtering replication messages. " + Global.getStackTraceAsString(ex), "", ex); } } /** * @see org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor#initialize(org.xmlBlaster.util.Global, java.lang.String) */ public void initialize(Global glob, String typeVersion) throws XmlBlasterException { } /** * @see org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor#isShutdown() */ public synchronized boolean isShutdown() { return this.shutdown; } /** * @see org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor#shutdown(org.xmlBlaster.util.dispatch.DispatchManager) */ public synchronized void shutdown(DispatchManager dispatchManager) throws XmlBlasterException { I_ReplSlave slave = null; String name = dispatchManager.getSessionName().getRelativeName(); synchronized (this.replSlaveMap) { slave = (I_ReplSlave)this.replSlaveMap.remove(name); } if (slave != null) { try { // slave.shutdown(); } catch (Exception ex) { ex.printStackTrace(); log.severe("Exception occured when shutting down slave '" + name + "'"); } } } /** * @see org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor#toXml(java.lang.String) */ public String toXml(String extraOffset) { return ""; } /** * @see org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor#usage() */ public String usage() { return ""; } /** * @see org.xmlBlaster.util.dispatch.I_ConnectionStatusListener#toAlive(org.xmlBlaster.util.dispatch.DispatchManager, org.xmlBlaster.util.dispatch.ConnectionStateEnum) */ public void toAlive(DispatchManager dispatchManager, ConnectionStateEnum oldState) { } /** * @see org.xmlBlaster.util.dispatch.I_ConnectionStatusListener#toDead(org.xmlBlaster.util.dispatch.DispatchManager, org.xmlBlaster.util.dispatch.ConnectionStateEnum, java.lang.String) */ public void toDead(DispatchManager dispatchManager, ConnectionStateEnum oldState, String errorText) { } /** * @see org.xmlBlaster.util.dispatch.I_ConnectionStatusListener#toPolling(org.xmlBlaster.util.dispatch.DispatchManager, org.xmlBlaster.util.dispatch.ConnectionStateEnum) */ public void toPolling(DispatchManager dispatchManager, ConnectionStateEnum oldState) { } private synchronized void registerSqlStatement(String replPrefix, String reqId, String statement) throws Exception { log.info("registering statement '" + replPrefix + "-" + reqId + "' for statement '" + statement + "'"); ArrayList slaves = new ArrayList(); Iterator iter = this.replSlaveMap.keySet().iterator(); synchronized (this.replSlaveMap) { while (iter.hasNext()) { Object key = iter.next(); ReplSlave replSlave = (ReplSlave)this.replSlaveMap.get(key); String tmpPrefix = replSlave.getReplPrefix(); if (replPrefix.equals(tmpPrefix)) { slaves.add(key); } } } SqlStatement sqlStatement = new SqlStatement(replPrefix, reqId, statement, slaves); this.sqlStatementMap.put(reqId, sqlStatement); String instanceName = getInstanceName() + ContextNode.SEP + replPrefix + ContextNode.SEP + reqId; ContextNode contextNode = new ContextNode(ContextNode.CONTRIB_MARKER_TAG, instanceName, this.global.getContextNode()); sqlStatement.setHandle(this.global.registerMBean(contextNode, sqlStatement)); } private synchronized void unregisterSqlStatement(String reqId) { log.info("unregistering statement '" + reqId + "'"); SqlStatement sqlStatement = (SqlStatement)this.sqlStatementMap.remove(reqId); if (sqlStatement == null) log.warning("The sql statement with request id '" + reqId + "' was not found in the map, can not unregister it"); else log.info("The sql statement with request id '" + reqId + "' will be unregistered now"); this.global.unregisterMBean(sqlStatement.getHandle()); } public void removeSqlStatement(String statementId) { unregisterSqlStatement(statementId); } private void sendBroadcastRequest(String replicationPrefix, String sql, boolean isHighPrio, String requestId) throws Exception { if (replicationPrefix == null) throw new Exception("executeSql: the replication id is null. Can not perform it."); if (sql == null) throw new Exception("executeSql: the sql statement to perform on '" + replicationPrefix + "' is null. Can not perform it."); I_Info individualInfo = (I_Info)this.replications.get(replicationPrefix); if (individualInfo == null) throw new Exception("executeSql: the replication with Id='" + replicationPrefix + "' was not found (has not been registered). Allowed ones are : " + getReplications()); log.info("Sending Broadcast request for repl='" + replicationPrefix + "' and statement='" + sql + "' and requestId='" + requestId + "'"); String dbWatcherSessionId = individualInfo.get(SENDER_SESSION, null); registerSqlStatement(replicationPrefix, requestId, sql); log.info("Broadcasting sql statement '" + sql + "' for master '" + replicationPrefix + "'"); I_XmlBlasterAccess conn = this.global.getXmlBlasterAccess(); // no oid for this ptp message PublishKey pubKey = new PublishKey(this.global, REQUEST_BROADCAST_SQL_TOPIC); Destination destination = new Destination(new SessionName(this.global, dbWatcherSessionId)); destination.forceQueuing(true); PublishQos pubQos = new PublishQos(this.global, destination); pubQos.setPersistent(true); if (isHighPrio) pubQos.setPriority(PriorityEnum.HIGH8_PRIORITY); // pubQos.addClientProperty(ACTION_ATTR, STATEMENT_ACTION); pubQos.addClientProperty(STATEMENT_ATTR, sql); pubQos.addClientProperty(STATEMENT_PRIO_ATTR, isHighPrio); pubQos.addClientProperty(STATEMENT_ID_ATTR, requestId); pubQos.addClientProperty(SQL_TOPIC_ATTR, this.sqlTopic); if (this.maxResponseEntries > -1L) { pubQos.addClientProperty(MAX_ENTRIES_ATTR, this.maxResponseEntries); log.info("Be aware that the number of entries in the result set will be limited to '" + this.maxResponseEntries + "'. To change this use 'replication.sqlMaxEntries'"); } MsgUnit msg = new MsgUnit(pubKey, STATEMENT_ACTION.getBytes(), pubQos); conn.publish(msg); } /** * @see org.xmlBlaster.contrib.replication.impl.ReplManagerPluginMBean#broadcastSql(java.lang.String, java.lang.String) */ public void broadcastSql(String repl, String sql) throws Exception { final boolean highPrio = true; String requestId = "" + new Timestamp().getTimestamp(); String replicationPrefix = VersionTransformerCache.stripReplicationPrefix(repl); sendBroadcastRequest(replicationPrefix, sql, highPrio, requestId); } /** * Convenience method to determine if a connect Qos is for us, i.e. if they have * defined us as the DispatchPlugin in their connect qos. * * @param connQos * @return */ private final boolean hasUsAsDispatchPlugin(ConnectQosServer connQos) { if (connQos == null) return false; CallbackAddress cbAddr = connQos.getData().getCurrentCallbackAddress(); if (cbAddr == null) { log.info("entry '" + connQos.toXml() + "' has no callback address defined"); return false; } String dispatchPluginName = cbAddr.getDispatchPlugin(); if (dispatchPluginName == null) return false; String ownName = getType() + "," + getVersion(); if (ownName.equals(dispatchPluginName)) return true; return false; } // For I_ClientListener ... /** * The part of this code inherent to the slave could be moved to the addDispatchManager since that method would * always invoked too. This method is only invoked on the first connect, which is when the client connects the * very first time, or when recovering sessions from persistence. * @see org.xmlBlaster.authentication.I_ClientListener#sessionAdded(org.xmlBlaster.authentication.ClientEvent) */ public synchronized void sessionAdded(ClientEvent e) throws XmlBlasterException { if (e == null) throw new XmlBlasterException(this.global, ErrorCode.INTERNAL_UNKNOWN, "ReplManagerPlugin.sessionAdded with null event object"); ConnectQosServer connQos = e.getConnectQos(); // code for the DbWatchers here String replId = connQos.getData().getClientProperty(REPL_PREFIX_KEY, (String)null); if (replId == null || replId.length() < 1) log.fine("the client property '" + REPL_PREFIX_KEY + "' must be defined but is empty"); else { // then it is a DbWatcher which is used for replication I_Info info = new ClientPropertiesInfo(connQos.getData().getClientProperties()); String relativeName = e.getSessionInfo().getSessionName().getRelativeName(); register(relativeName, replId, info); } // code for DbWatchers ends here if (!hasUsAsDispatchPlugin(connQos)) return; log.fine("Connecting with qos : " + connQos.toXml()); String sessionName = e.getSessionInfo().getSessionName().getRelativeName(); log.info("addition of session for '" + sessionName +"' occured"); synchronized (this.replSlaveMap) { if (!this.replSlaveMap.containsKey(sessionName)) { I_ReplSlave slave = new ReplSlave(this.global, this, sessionName); try { slave.setDispatcher(false, false); // stop dispatcher without persisting the information } catch (Exception ex) { log.warning("Could not set the dispatcher for '" + sessionName + "' to false when adding the session"); ex.printStackTrace(); } this.replSlaveMap.put(sessionName, slave); } } } /** * @see org.xmlBlaster.authentication.I_ClientListener#sessionPreRemoved(org.xmlBlaster.authentication.ClientEvent)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -