⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 publisher.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*------------------------------------------------------------------------------Name:      Publisher.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE filep------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.filewatcher;import java.io.InputStream;import java.util.HashMap;import java.util.Map;import java.util.Set;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.client.I_XmlBlasterAccess;import org.xmlBlaster.client.key.PublishKey;import org.xmlBlaster.client.qos.ConnectQos;import org.xmlBlaster.client.qos.DisconnectQos;import org.xmlBlaster.contrib.ContribConstants;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.dbwatcher.mom.XmlBlasterPublisher;import org.xmlBlaster.contrib.replication.I_ReplSource;import org.xmlBlaster.contrib.replication.ReplSourceEngine;import org.xmlBlaster.contrib.replication.ReplicationConstants;import org.xmlBlaster.jms.XBConnectionMetaData;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.I_Timeout;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.Timeout;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.key.MsgKeyData;import org.xmlBlaster.util.qos.ConnectQosData;import org.xmlBlaster.util.qos.MsgQosData;/** * Publisher * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a> */public class Publisher implements I_Timeout {   private String ME = "Publisher";   private Global global;   private static Logger log = Logger.getLogger(Publisher.class.getName());   private DirectoryManager directoryManager;   private I_XmlBlasterAccess access;   private String publishKey;   private String publishQos;   private ConnectQos connectQos;   private long pollInterval;   private long maximumFileSize;   private String fileFilter;   private String filterType;   private String directoryName;   private boolean copyOnMove;   private String sent;   private String discarded;   private String lockExtention;   private long delaySinceLastFileChange;      public static final String USE_REGEX = "regex";   private Timestamp timeoutHandle;   private static Timeout timeout = new Timeout("FileSystem-Watcher");      /** used to identify if it has shut down (to get a new global) */   private boolean isShutdown;   /** used to break the loop in doPublish when shutting down */   private boolean forceShutdown;      /** only used as a default login name and logging */   private String name;      private boolean isActive;   private int maximumChunkSize = Integer.MAX_VALUE;      // Used for monitoring and to use this as a source for replication   private String replPrefix;   // private String replPrefixGroup;   // private String replVersion;   private I_Info info_;   private ReplSource replSource;   private ReplSourceEngine replSourceEngine;   private XmlBlasterPublisher publisher;      public class ReplSourceData {            private final String replTopic;      private final String replManagerAddress;      private final String requestedVersion;      private final String initialFilesLocation;            public ReplSourceData(String topic, String address, String version, String location) {         this.replTopic = topic;         this.replManagerAddress = address;         this.requestedVersion = version;         this.initialFilesLocation = location;      }      public String getInitialFilesLocation() {         return initialFilesLocation;      }      public String getReplManagerAddress() {         return replManagerAddress;      }      public String getReplTopic() {         return replTopic;      }      public String getRequestedVersion() {         return requestedVersion;      }   }      public class ReplSource implements I_ReplSource {            private Map preparedUpdates = new HashMap();      private boolean collectInitialUpdates;      private ReplSourceEngine engine;      private I_Info info;      private String oid;            public ReplSource(I_Info info, String oid) {         this.info = info;         this.oid = oid;      }            public void setEngine(ReplSourceEngine engine) {         this.engine = engine;      }      public void cancelUpdate(String slaveName) {         synchronized(preparedUpdates) {            preparedUpdates.remove(slaveName);         }      }      public void collectInitialUpdate() throws Exception {         synchronized(preparedUpdates) {            collectInitialUpdates = true;            preparedUpdates.clear();         }      }      public byte[] executeStatement(String sql, long maxResponseEntries, boolean isHighPrio, boolean isMaster, String sqlTopic, String statementId) throws Exception {         return "".getBytes();      }      public void initialUpdate(String topic, String address, String slaveName, String version, String location, boolean onlyRegister) throws Exception {         synchronized(preparedUpdates) {            if (collectInitialUpdates || onlyRegister) {               ReplSourceData data = new ReplSourceData(topic, address, version, location);               preparedUpdates.put(slaveName, data);            }            else {               String[] slaveSessionNames = new String[] { slaveName };               long minKey = 0;               long maxKey = 1;               this.engine.sendInitialDataResponse(slaveSessionNames, address, minKey, maxKey);               this.engine.sendEndOfTransitionMessage(info, topic, slaveSessionNames);            }         }      }      public void recreateTriggers() throws Exception {      }      public void startInitialUpdateBatch() throws Exception {         synchronized(preparedUpdates) {            if (preparedUpdates.size() > 0) {               String[] slaveSessionNames = (String[])preparedUpdates.keySet().toArray(new String[preparedUpdates.size()]);               ReplSourceData data = (ReplSourceData)preparedUpdates.get(slaveSessionNames[0]);               String replManagerAddress = data.getReplManagerAddress();               String initialDataTopic = data.getReplTopic();               long minKey = 0;               long maxKey = 1;               this.engine.sendInitialDataResponse(slaveSessionNames, replManagerAddress, minKey, maxKey);               this.engine.sendEndOfTransitionMessage(info, initialDataTopic, slaveSessionNames);               preparedUpdates.clear();               collectInitialUpdates = false;            }         }      }      public String getTopic() {         return oid;      }   }         public Publisher(Global globOrig, String name, I_Info info) throws XmlBlasterException {      ME += "-" + name;      this.name = name;      this.isShutdown = false;      this.global = globOrig.getClone(globOrig.getNativeConnectArgs()); // sets session.timeout to 0 etc.      // this.global = globOrig;      // this.pluginConfig = pluginConfig;      this.info_ = info;      if (log.isLoggable(Level.FINER))          log.finer(ME+": constructor");      // retrieve all necessary properties:      String tmp = null;      tmp = info_.get("mom.publishKey", null);      String topicName =  info_.get("mom.topicName", null);      replPrefix = info_.get(ReplicationConstants.REPL_PREFIX_KEY, null);      if (tmp != null) {         this.publishKey = tmp;         if (topicName != null)            log.warning(ME+": constructor: since 'mom.publishKey' is defined, 'topicName' will be ignored");      }      else {         if (topicName == null) {            if (replPrefix == null)               throw new XmlBlasterException(this.global, ErrorCode.USER_CONFIGURATION, ME, "at least one of the properties 'topicName', 'publishKey' or 'replication.prefix' must be defined");            else               topicName = "topic." + replPrefix.trim();         }         this.publishKey = (new PublishKey(this.global, topicName)).toXml();       }            this.publishQos = info_.get("mom.publishQos", "<qos/>");      tmp  = info_.get("mom.connectQos", null);      if (tmp != null) {         ConnectQosData data = this.global.getConnectQosFactory().readObject(tmp);         this.connectQos = new ConnectQos(this.global, data);      }      else {         String userId = info_.get("mom.loginName", "_" + name);         String password = info_.get("mom.password", null);         this.connectQos = new ConnectQos(this.global, userId, password);         global.addObjectEntry(Constants.OBJECT_ENTRY_ServerScope, globOrig.getObjectEntry(Constants.OBJECT_ENTRY_ServerScope));      }      this.fileFilter =  info_.get("filewatcher.fileFilter", null);      this.directoryName = info_.get("filewatcher.directoryName", null);      if (directoryName == null)         throw new XmlBlasterException(this.global, ErrorCode.USER_CONFIGURATION, ME, "constructor: 'filewatcher.directoryName' is mandatory");            this.maximumFileSize = info_.getLong("filewatcher.maximumFileSize", 10000000L);      this.maximumChunkSize = info_.getInt("filewatcher.maximumChunkSize", Integer.MAX_VALUE);      delaySinceLastFileChange = info_.getLong("filewatcher.delaySinceLastFileChange", 10000L);      pollInterval = info_.getLong("filewatcher.pollInterval", 2000L);      sent =  info_.get("filewatcher.sent", null);      discarded =  info_.get("filewatcher.discarded", null);      lockExtention =  info_.get("filewatcher.lockExtention", null);           // this would throw an exception and act as a validation if something is not OK in configuration      new MsgUnit(this.publishKey, (byte[])null, this.publishQos);      this.filterType = info_.get("filewatcher.filterType", "simple");      this.copyOnMove = info_.getBoolean("filewatcher.copyOnMove", true);      // replPrefixGroup = info_.get(ReplicationConstants.REPL_PREFIX_GROUP_KEY, null);      // replVersion = info_.get(ReplicationConstants.REPLICATION_VERSION, null);      if (replPrefix != null) {         String replVersion = info_.get(ReplicationConstants.REPLICATION_VERSION, null);         if (replVersion == null)            info_.put(ReplicationConstants.REPLICATION_VERSION, "1.0");         String tmpKey = "replication.countSingleMsg";         String tmpVal = info_.get(tmpKey, null);         if (tmpVal == null)            info_.put(tmpKey, "true");         MsgKeyData key = global.getMsgKeyFactory().readObject(publishKey);         String oid = key.getOid();         info.put("mom.topicName", oid); // this must ALWAYS be set if using replication         prepareReplSource(replPrefix != null);      }      createDirectoryManager();   }      private void prepareReplSource(boolean doFill) throws XmlBlasterException {      if (!doFill)         return;      Set set = info_.getKeys();      String[] keys = (String[])set.toArray(new String[set.size()]);      for (int i=0; i < keys.length; i++)         connectQos.addClientProperty(keys[i], info_.get(keys[i], null));      publisher = new XmlBlasterPublisher();      String oid = null;      if (publishKey != null) {         MsgKeyData key = global.getMsgKeyFactory().readObject(publishKey);         oid = key.getOid();      }      publisher.initWithExistingGlob(global, publishKey, publishQos, 0);      replSource = new ReplSource(info_, oid);      replSourceEngine = new ReplSourceEngine(replPrefix, publisher, replSource);      replSource.setEngine(replSourceEngine);      Map attrs = new HashMap();      attrs.put("ptp", "true");      try {         publisher.registerAlertListener(replSourceEngine, attrs);      }      catch (Exception ex) {         if (ex instanceof XmlBlasterException)            throw (XmlBlasterException)ex;         throw new XmlBlasterException(this.global, ErrorCode.INTERNAL_UNKNOWN, "Publisher.prepareReplSource", "occured when registering alert listener", ex);      }   }      /**    * Create the file checker instance with the current configuration.     * @throws XmlBlasterException    */   private void createDirectoryManager() throws XmlBlasterException {      boolean isTrueRegex = USE_REGEX.equalsIgnoreCase(filterType);      this.directoryManager = new DirectoryManager(this.global,            this.name, this.directoryName, this.delaySinceLastFileChange,             this.fileFilter, this.sent, this.discarded, this.lockExtention, isTrueRegex,            this.copyOnMove);   }   /**    * Useful for JMX invocations    */   private void reCreateDirectoryManager() {      try {         createDirectoryManager();      } catch (XmlBlasterException e) {         throw new IllegalArgumentException(e.getMessage());      }   }   public String toString() {      return "FileWatcher " + this.filterType + " directoryName=" + this.directoryName + " fileFilter='" + this.fileFilter + "'";   }      /**    * Connects to the xmlBlaster.    *     * @throws XmlBlasterException    */   public synchronized void init() throws XmlBlasterException {      if (log.isLoggable(Level.FINER))          log.finer(ME+": init");      if (this.isShutdown) { // on a second init         this.global = this.global.getClone(null);      }      this.isShutdown = false;      this.forceShutdown = false;      this.access = this.global.getXmlBlasterAccess();            this.access.connect(this.connectQos, publisher);      this.isActive = true;      if (this.pollInterval >= 0)         this.timeoutHandle = timeout.addTimeoutListener(this, this.pollInterval, null);   }   

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -