📄 publisher.java
字号:
/*------------------------------------------------------------------------------Name: Publisher.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE filep------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.filewatcher;import java.io.InputStream;import java.util.HashMap;import java.util.Map;import java.util.Set;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.client.I_XmlBlasterAccess;import org.xmlBlaster.client.key.PublishKey;import org.xmlBlaster.client.qos.ConnectQos;import org.xmlBlaster.client.qos.DisconnectQos;import org.xmlBlaster.contrib.ContribConstants;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.dbwatcher.mom.XmlBlasterPublisher;import org.xmlBlaster.contrib.replication.I_ReplSource;import org.xmlBlaster.contrib.replication.ReplSourceEngine;import org.xmlBlaster.contrib.replication.ReplicationConstants;import org.xmlBlaster.jms.XBConnectionMetaData;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.I_Timeout;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.Timeout;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.key.MsgKeyData;import org.xmlBlaster.util.qos.ConnectQosData;import org.xmlBlaster.util.qos.MsgQosData;/** * Publisher * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a> */public class Publisher implements I_Timeout { private String ME = "Publisher"; private Global global; private static Logger log = Logger.getLogger(Publisher.class.getName()); private DirectoryManager directoryManager; private I_XmlBlasterAccess access; private String publishKey; private String publishQos; private ConnectQos connectQos; private long pollInterval; private long maximumFileSize; private String fileFilter; private String filterType; private String directoryName; private boolean copyOnMove; private String sent; private String discarded; private String lockExtention; private long delaySinceLastFileChange; public static final String USE_REGEX = "regex"; private Timestamp timeoutHandle; private static Timeout timeout = new Timeout("FileSystem-Watcher"); /** used to identify if it has shut down (to get a new global) */ private boolean isShutdown; /** used to break the loop in doPublish when shutting down */ private boolean forceShutdown; /** only used as a default login name and logging */ private String name; private boolean isActive; private int maximumChunkSize = Integer.MAX_VALUE; // Used for monitoring and to use this as a source for replication private String replPrefix; // private String replPrefixGroup; // private String replVersion; private I_Info info_; private ReplSource replSource; private ReplSourceEngine replSourceEngine; private XmlBlasterPublisher publisher; public class ReplSourceData { private final String replTopic; private final String replManagerAddress; private final String requestedVersion; private final String initialFilesLocation; public ReplSourceData(String topic, String address, String version, String location) { this.replTopic = topic; this.replManagerAddress = address; this.requestedVersion = version; this.initialFilesLocation = location; } public String getInitialFilesLocation() { return initialFilesLocation; } public String getReplManagerAddress() { return replManagerAddress; } public String getReplTopic() { return replTopic; } public String getRequestedVersion() { return requestedVersion; } } public class ReplSource implements I_ReplSource { private Map preparedUpdates = new HashMap(); private boolean collectInitialUpdates; private ReplSourceEngine engine; private I_Info info; private String oid; public ReplSource(I_Info info, String oid) { this.info = info; this.oid = oid; } public void setEngine(ReplSourceEngine engine) { this.engine = engine; } public void cancelUpdate(String slaveName) { synchronized(preparedUpdates) { preparedUpdates.remove(slaveName); } } public void collectInitialUpdate() throws Exception { synchronized(preparedUpdates) { collectInitialUpdates = true; preparedUpdates.clear(); } } public byte[] executeStatement(String sql, long maxResponseEntries, boolean isHighPrio, boolean isMaster, String sqlTopic, String statementId) throws Exception { return "".getBytes(); } public void initialUpdate(String topic, String address, String slaveName, String version, String location, boolean onlyRegister) throws Exception { synchronized(preparedUpdates) { if (collectInitialUpdates || onlyRegister) { ReplSourceData data = new ReplSourceData(topic, address, version, location); preparedUpdates.put(slaveName, data); } else { String[] slaveSessionNames = new String[] { slaveName }; long minKey = 0; long maxKey = 1; this.engine.sendInitialDataResponse(slaveSessionNames, address, minKey, maxKey); this.engine.sendEndOfTransitionMessage(info, topic, slaveSessionNames); } } } public void recreateTriggers() throws Exception { } public void startInitialUpdateBatch() throws Exception { synchronized(preparedUpdates) { if (preparedUpdates.size() > 0) { String[] slaveSessionNames = (String[])preparedUpdates.keySet().toArray(new String[preparedUpdates.size()]); ReplSourceData data = (ReplSourceData)preparedUpdates.get(slaveSessionNames[0]); String replManagerAddress = data.getReplManagerAddress(); String initialDataTopic = data.getReplTopic(); long minKey = 0; long maxKey = 1; this.engine.sendInitialDataResponse(slaveSessionNames, replManagerAddress, minKey, maxKey); this.engine.sendEndOfTransitionMessage(info, initialDataTopic, slaveSessionNames); preparedUpdates.clear(); collectInitialUpdates = false; } } } public String getTopic() { return oid; } } public Publisher(Global globOrig, String name, I_Info info) throws XmlBlasterException { ME += "-" + name; this.name = name; this.isShutdown = false; this.global = globOrig.getClone(globOrig.getNativeConnectArgs()); // sets session.timeout to 0 etc. // this.global = globOrig; // this.pluginConfig = pluginConfig; this.info_ = info; if (log.isLoggable(Level.FINER)) log.finer(ME+": constructor"); // retrieve all necessary properties: String tmp = null; tmp = info_.get("mom.publishKey", null); String topicName = info_.get("mom.topicName", null); replPrefix = info_.get(ReplicationConstants.REPL_PREFIX_KEY, null); if (tmp != null) { this.publishKey = tmp; if (topicName != null) log.warning(ME+": constructor: since 'mom.publishKey' is defined, 'topicName' will be ignored"); } else { if (topicName == null) { if (replPrefix == null) throw new XmlBlasterException(this.global, ErrorCode.USER_CONFIGURATION, ME, "at least one of the properties 'topicName', 'publishKey' or 'replication.prefix' must be defined"); else topicName = "topic." + replPrefix.trim(); } this.publishKey = (new PublishKey(this.global, topicName)).toXml(); } this.publishQos = info_.get("mom.publishQos", "<qos/>"); tmp = info_.get("mom.connectQos", null); if (tmp != null) { ConnectQosData data = this.global.getConnectQosFactory().readObject(tmp); this.connectQos = new ConnectQos(this.global, data); } else { String userId = info_.get("mom.loginName", "_" + name); String password = info_.get("mom.password", null); this.connectQos = new ConnectQos(this.global, userId, password); global.addObjectEntry(Constants.OBJECT_ENTRY_ServerScope, globOrig.getObjectEntry(Constants.OBJECT_ENTRY_ServerScope)); } this.fileFilter = info_.get("filewatcher.fileFilter", null); this.directoryName = info_.get("filewatcher.directoryName", null); if (directoryName == null) throw new XmlBlasterException(this.global, ErrorCode.USER_CONFIGURATION, ME, "constructor: 'filewatcher.directoryName' is mandatory"); this.maximumFileSize = info_.getLong("filewatcher.maximumFileSize", 10000000L); this.maximumChunkSize = info_.getInt("filewatcher.maximumChunkSize", Integer.MAX_VALUE); delaySinceLastFileChange = info_.getLong("filewatcher.delaySinceLastFileChange", 10000L); pollInterval = info_.getLong("filewatcher.pollInterval", 2000L); sent = info_.get("filewatcher.sent", null); discarded = info_.get("filewatcher.discarded", null); lockExtention = info_.get("filewatcher.lockExtention", null); // this would throw an exception and act as a validation if something is not OK in configuration new MsgUnit(this.publishKey, (byte[])null, this.publishQos); this.filterType = info_.get("filewatcher.filterType", "simple"); this.copyOnMove = info_.getBoolean("filewatcher.copyOnMove", true); // replPrefixGroup = info_.get(ReplicationConstants.REPL_PREFIX_GROUP_KEY, null); // replVersion = info_.get(ReplicationConstants.REPLICATION_VERSION, null); if (replPrefix != null) { String replVersion = info_.get(ReplicationConstants.REPLICATION_VERSION, null); if (replVersion == null) info_.put(ReplicationConstants.REPLICATION_VERSION, "1.0"); String tmpKey = "replication.countSingleMsg"; String tmpVal = info_.get(tmpKey, null); if (tmpVal == null) info_.put(tmpKey, "true"); MsgKeyData key = global.getMsgKeyFactory().readObject(publishKey); String oid = key.getOid(); info.put("mom.topicName", oid); // this must ALWAYS be set if using replication prepareReplSource(replPrefix != null); } createDirectoryManager(); } private void prepareReplSource(boolean doFill) throws XmlBlasterException { if (!doFill) return; Set set = info_.getKeys(); String[] keys = (String[])set.toArray(new String[set.size()]); for (int i=0; i < keys.length; i++) connectQos.addClientProperty(keys[i], info_.get(keys[i], null)); publisher = new XmlBlasterPublisher(); String oid = null; if (publishKey != null) { MsgKeyData key = global.getMsgKeyFactory().readObject(publishKey); oid = key.getOid(); } publisher.initWithExistingGlob(global, publishKey, publishQos, 0); replSource = new ReplSource(info_, oid); replSourceEngine = new ReplSourceEngine(replPrefix, publisher, replSource); replSource.setEngine(replSourceEngine); Map attrs = new HashMap(); attrs.put("ptp", "true"); try { publisher.registerAlertListener(replSourceEngine, attrs); } catch (Exception ex) { if (ex instanceof XmlBlasterException) throw (XmlBlasterException)ex; throw new XmlBlasterException(this.global, ErrorCode.INTERNAL_UNKNOWN, "Publisher.prepareReplSource", "occured when registering alert listener", ex); } } /** * Create the file checker instance with the current configuration. * @throws XmlBlasterException */ private void createDirectoryManager() throws XmlBlasterException { boolean isTrueRegex = USE_REGEX.equalsIgnoreCase(filterType); this.directoryManager = new DirectoryManager(this.global, this.name, this.directoryName, this.delaySinceLastFileChange, this.fileFilter, this.sent, this.discarded, this.lockExtention, isTrueRegex, this.copyOnMove); } /** * Useful for JMX invocations */ private void reCreateDirectoryManager() { try { createDirectoryManager(); } catch (XmlBlasterException e) { throw new IllegalArgumentException(e.getMessage()); } } public String toString() { return "FileWatcher " + this.filterType + " directoryName=" + this.directoryName + " fileFilter='" + this.fileFilter + "'"; } /** * Connects to the xmlBlaster. * * @throws XmlBlasterException */ public synchronized void init() throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer(ME+": init"); if (this.isShutdown) { // on a second init this.global = this.global.getClone(null); } this.isShutdown = false; this.forceShutdown = false; this.access = this.global.getXmlBlasterAccess(); this.access.connect(this.connectQos, publisher); this.isActive = true; if (this.pollInterval >= 0) this.timeoutHandle = timeout.addTimeoutListener(this, this.pollInterval, null); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -