📄 replmanagerplugin.java
字号:
/*------------------------------------------------------------------------------Name: ReplManagerPlugin.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileSwitch on finer logging in xmlBlaster.properties:trace[org.xmlBlaster.contrib.dbwatcher.convert.ResultSetToXmlConverter]=truetrace[org.xmlBlaster.contrib.db.DbPool]=truetrace[org.xmlBlaster.contrib.dbwatcher.detector.MD5ChangeDetector]=truetrace[org.xmlBlaster.contrib.dbwatcher.detector.AlertScheduler]=truetrace[org.xmlBlaster.contrib.dbwatcher.detector.TimestampChangeDetector]=truetrace[org.xmlBlaster.contrib.dbwatcher.plugin.ReplManagerPlugin]=truetrace[org.xmlBlaster.contrib.dbwatcher.mom.XmlBlasterPublisher]=truetrace[org.xmlBlaster.contrib.dbwatcher.DbWatcher]=true------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.replication.impl;import org.xmlBlaster.authentication.ClientEvent;import org.xmlBlaster.authentication.I_ClientListener;import org.xmlBlaster.authentication.SessionInfo;import org.xmlBlaster.authentication.SubjectInfo;import org.xmlBlaster.client.I_Callback;import org.xmlBlaster.client.I_XmlBlasterAccess;import org.xmlBlaster.client.key.EraseKey;import org.xmlBlaster.client.key.PublishKey;import org.xmlBlaster.client.key.SubscribeKey;import org.xmlBlaster.client.key.UnSubscribeKey;import org.xmlBlaster.client.key.UpdateKey;import org.xmlBlaster.client.qos.ConnectQos;import org.xmlBlaster.client.qos.DisconnectQos;import org.xmlBlaster.client.qos.EraseQos;import org.xmlBlaster.client.qos.PublishQos;import org.xmlBlaster.client.qos.SubscribeQos;import org.xmlBlaster.client.qos.UnSubscribeQos;import org.xmlBlaster.client.qos.UpdateQos;import org.xmlBlaster.contrib.ClientPropertiesInfo;import org.xmlBlaster.contrib.GlobalInfo;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.InfoHelper;import org.xmlBlaster.contrib.MomEventEngine;import org.xmlBlaster.contrib.PropertiesInfo;import 
org.xmlBlaster.contrib.VersionTransformerCache;import org.xmlBlaster.contrib.db.DbInfo;import org.xmlBlaster.contrib.db.DbPool;import org.xmlBlaster.contrib.db.I_DbPool;import org.xmlBlaster.contrib.dbwatcher.DbWatcherConstants;import org.xmlBlaster.contrib.replication.I_ReplSlave;import org.xmlBlaster.contrib.replication.ReplSlave;import org.xmlBlaster.contrib.replication.ReplicationConstants;import org.xmlBlaster.contrib.replication.SqlStatement;import org.xmlBlaster.engine.I_SubscriptionListener;import org.xmlBlaster.engine.RequestBroker;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.engine.SubscriptionEvent;import org.xmlBlaster.engine.SubscriptionInfo;import org.xmlBlaster.engine.admin.I_AdminSession;import org.xmlBlaster.engine.admin.I_AdminSubject;import org.xmlBlaster.engine.mime.I_PublishFilter;import org.xmlBlaster.engine.qos.ConnectQosServer;import org.xmlBlaster.protocol.I_Authenticate;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.dispatch.ConnectionStateEnum;import org.xmlBlaster.util.dispatch.DispatchManager;import org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.I_Timeout;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.StringPairTokenizer;import org.xmlBlaster.util.Timeout;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.plugin.I_Plugin;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.qos.MsgQosData;import org.xmlBlaster.util.qos.QosData;import org.xmlBlaster.util.qos.address.CallbackAddress;import org.xmlBlaster.util.qos.address.Destination;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.QueuePluginManager;import java.io.BufferedReader;import 
java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.File;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.Map;import java.util.Properties;import java.util.Set;import java.util.StringTokenizer;import java.util.TreeMap;import java.util.logging.Logger;/** * ReplManagerPlugin is a plugin wrapper if you want to run DbWatcher inside xmlBlaster. * <p /> * DbWatcher checks a database for changes and publishes these to the MoM * <p /> * This plugin needs to be registered in <tt>xmlBlasterPlugins.xml</tt> * to be available on xmlBlaster server startup. * * <p> * This plugin uses <tt>java.util.logging</tt> and redirects the logging to xmlBlasters default * logging framework. You can switch this off by setting the attribute <tt>xmlBlaster/jdk14loggingCapture</tt> to false. * </p> * * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a> */public class ReplManagerPlugin extends GlobalInfo implements ReplManagerPluginMBean, I_Callback, I_MsgDispatchInterceptor, I_ClientListener, I_SubscriptionListener, I_Timeout, ReplicationConstants, I_Plugin, I_PublishFilter { private class Counter { long msg; long[] trans; /** * @deprecated * @param data */ public Counter(long[] data) { boolean oldStyle = data.length < 5; if (oldStyle) { // TODO remove this once no more old data around this.msg = data[2]; this.trans = new long[10]; for (int i=0; i < this.trans.length; i++) this.trans[i] = data[1]; } else { this.msg = data[2]; this.trans = new long[10]; for (int i=0; i < this.trans.length; i++) this.trans[i] = data[i+4]; } } } public final static String SESSION_ID = "replManager/1"; private final static String SENDER_SESSION = "_senderSession"; private final static String ME = ReplManagerPlugin.class.getName(); private static Logger log = Logger.getLogger(ME); private Object mbeanHandle; private String 
user = "replManager"; private String password = "secret"; private Map replications; private Map replSlaveMap; /** Keys are requestId Strings, and values are SqlStatement objects */ private Map sqlStatementMap; private boolean shutdown; private volatile boolean initialized; private String instanceName; private long maxSize = 999999L; private String sqlTopic; private long maxResponseEntries; private I_DbPool pool; private VersionTransformerCache transformerCache; private String cachedListOfReplications; private String initialFilesLocation; private Timestamp timeoutHandle; private Timeout timeout = new Timeout("ReplManagerPlugin-StatusPoller"); private final static long STATUS_POLLER_INTERVAL_DEFAULT = 5000L; private long statusPollerInterval = STATUS_POLLER_INTERVAL_DEFAULT; private long statusProcessingTime; private long numRefresh; private int maxNumOfEntries = REPLICATION_MAX_ENTRIES_DEFAULT; private I_Info persistentInfo; private Map topicToPrefixMap; private Map counterMap; private Set initialDataTopicSet; /** * Default constructor, you need to call <tt>init()<tt> thereafter. 
*/ public ReplManagerPlugin() { super(new String[] {}); this.replications = new TreeMap(); this.topicToPrefixMap = new HashMap(); this.counterMap = new HashMap(); this.replSlaveMap = new TreeMap(); this.sqlStatementMap = new TreeMap(); this.transformerCache = new VersionTransformerCache(); this.initialDataTopicSet = new HashSet(); } public byte[] transformVersion(String replPrefix, String srcVersion, String destVersion, String destination, byte[] srcData) throws Exception { if (destVersion == null) return srcData; if (destVersion.equalsIgnoreCase(srcVersion)) return srcData; return this.transformerCache.transform(replPrefix, srcVersion, destVersion, destination, srcData, null); } public byte[] transformVersion(String replPrefix, String destVersion, String destination, byte[] content) throws Exception { I_Info tmpInfo = (I_Info)this.replications.get(replPrefix); if (tmpInfo == null) throw new Exception("The replication with replication.prefix='" + replPrefix + "' was not found"); String srcVersion = tmpInfo.get("replication.version", "0.0").trim(); if (srcVersion.length() < 1) throw new Exception("The replication '" + replPrefix + "' has no version defined"); return transformVersion(replPrefix, srcVersion, destVersion, destination, content); } public String transformVersion(String replPrefix, String destVersion, String destination, String is) throws Exception { I_Info tmpInfo = (I_Info)this.replications.get(replPrefix); if (tmpInfo == null) throw new Exception("The replication with replication.prefix='" + replPrefix + "' was not found"); String srcVersion = tmpInfo.get("replication.version", "0.0").trim(); if (srcVersion.length() < 1) throw new Exception("The replication '" + replPrefix + "' has no version defined"); return new String(transformVersion(replPrefix, srcVersion, destVersion, destination, is.getBytes())); } public void clearVersionCache() { this.transformerCache.clearCache(); this.cachedListOfReplications = null; } /** * * @param name the slave 
name to look up; a null or empty name yields null.
    * @return the slave registered under that name, or null if none found.
    */
   public I_ReplSlave getSlave(String name) {
      if (name == null || name.length() < 1)
         return null;
      synchronized(this.replSlaveMap) {
         return (I_ReplSlave)this.replSlaveMap.get(name);
      }
   }

   /**
    * Re-initiates every registered slave whose replication prefix equals <tt>replPrefix</tt>.
    * <p>
    * The slaves are copied out of the slave map while holding its monitor (the lock also taken
    * by <tt>getSlave</tt>) and are invoked after the lock has been released. A failing slave does
    * not stop the processing of the remaining ones; the failure is logged.
    * </p>
    * @param replPrefix the replication prefix selecting the slaves. Slaves reporting a null prefix
    *        are skipped.
    * @return <tt>"Success: n slaves re-initiated"</tt> where n counts the slaves which were
    *         re-initiated, or <tt>"FAILED: ..."</tt> with a comma separated list of the slaves
    *         whose re-initiation threw an exception.
    */
   public String reInitiate(String replPrefix) {
      // NOTE(review): info is null when no replication is registered under replPrefix; it is
      // handed to the slaves unchanged (as before) - confirm that they cope with null.
      I_Info info = (I_Info)this.replications.get(replPrefix);

      I_ReplSlave[] slaves;
      synchronized(this.replSlaveMap) {
         slaves = (I_ReplSlave[])this.replSlaveMap.values().toArray(new I_ReplSlave[this.replSlaveMap.size()]);
      }

      StringBuffer failed = new StringBuffer();
      int reInitiated = 0;
      for (int i=0; i < slaves.length; i++) {
         String thisReplPrefix = slaves[i].getReplPrefix();
         if (thisReplPrefix == null || !thisReplPrefix.equals(replPrefix))
            continue;
         try {
            slaves[i].reInitiate(info);
            reInitiated++;
         }
         catch (Exception ex) {
            if (failed.length() > 0)
               failed.append(", ");
            failed.append(slaves[i].toString());
            log.log(java.util.logging.Level.WARNING, "reInitiate of slave '" + slaves[i] + "' for replication '" + replPrefix + "' failed", ex);
         }
      }
      if (failed.length() > 0)
         return "FAILED: the slaves " + failed.toString() + " did fail";
      // report only the slaves of this replication, not every registered slave
      return "Success: " + reInitiated + " slaves re-initiated";
   }

   /**
    * Never returns null. It returns a list of keys identifying the slaves using the replication
    * manager.
    * <p>
    * The keys are copied while holding the monitor of the slave map and formatted afterwards.
    * </p>
    * @return the keys of the slave map as formatted by <tt>InfoHelper.getIteratorAsString</tt>.
    */
   public String getSlaves() {
      Iterator names;
      synchronized(this.replSlaveMap) {
         names = new ArrayList(this.replSlaveMap.keySet()).iterator();
      }
      return InfoHelper.getIteratorAsString(names);
   }

   /**
    * Never returns null. It returns a list of keys identifying the ongoing replications.
    * @return
    */
   public String getReplications() {
      if (this.cachedListOfReplications != null) return this.cachedListOfReplications;
      // rebuild the cache
      Iterator iter = this.replications.values().iterator();
      boolean isFirst = true;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -