⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 replsourceengine.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
字号:
package org.xmlBlaster.contrib.replication;import java.io.InputStream;import java.util.HashMap;import java.util.Map;import java.util.logging.Logger;import javax.jms.DeliveryMode;import javax.jms.JMSException;import javax.jms.TextMessage;import org.xmlBlaster.contrib.I_ChangePublisher;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.I_Update;import org.xmlBlaster.contrib.dbwriter.info.SqlDescription;import org.xmlBlaster.contrib.dbwriter.info.SqlInfo;import org.xmlBlaster.contrib.replication.impl.ReplManagerPlugin;import org.xmlBlaster.contrib.replication.impl.SpecificDefault;import org.xmlBlaster.jms.XBDestination;import org.xmlBlaster.jms.XBMessageProducer;import org.xmlBlaster.jms.XBSession;import org.xmlBlaster.util.I_ReplaceContent;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.qos.MsgQosData;public class ReplSourceEngine implements I_Update, ReplicationConstants, I_ReplaceContent {      private final static Logger log = Logger.getLogger(ReplSourceEngine.class.getName());      private String replPrefix;   private I_ChangePublisher publisher;   private I_ReplSource source;   private long messageSeq;      public ReplSourceEngine(String replPrefix, I_ChangePublisher publisher, I_ReplSource source) {      this.replPrefix = replPrefix;      this.publisher = publisher;      this.source = source;   }      /**    * @see org.xmlBlaster.contrib.I_Update#update(java.lang.String, byte[], java.util.Map)    */   public final void update(String topic, InputStream is, Map attrMap) {      String msg = new String();      try {         if (is != null)            msg = new String(ReplManagerPlugin.getContent(is));         // this comes from the requesting ReplSlave         log.info("update for '" + topic + "' and msg='" + msg + "'");         if (REPL_REQUEST_UPDATE.equals(msg)) {            ClientProperty prop = (ClientProperty)attrMap.get("_sender");            if (prop == null)               throw new Exception("update for '" + msg + "' failed since no '_sender' specified");            String replManagerAddress = prop.getStringValue();            String replTopic = source.getTopic();            if (replTopic == null)               throw new Exception("update for '" + msg + "' failed since the property 'mom.topicName' has not been defined. Check your DbWatcher Configuration file");            prop = (ClientProperty)attrMap.get(SLAVE_NAME);            if (prop == null)               throw new Exception("update for '" + msg + "' failed since no '_slaveName' specified");            String slaveName = prop.getStringValue();                        prop = (ClientProperty)attrMap.get(REPL_VERSION);            String requestedVersion = null;            if (prop != null)               requestedVersion = prop.getStringValue();            // this.dbSpecific.initiateUpdate(replTopic, destination, slaveName);            prop = (ClientProperty)attrMap.get(INITIAL_FILES_LOCATION);            String initialFilesLocation = null;            if (prop != null)               initialFilesLocation = prop.getStringValue();                        prop = (ClientProperty)attrMap.get(INITIAL_UPDATE_ONLY_REGISTER);            boolean onlyRegister = false;            if (prop != null)               onlyRegister = prop.getBooleanValue();                        // String slaveName, String replVersion, String initialFilesLocation, boolean onlyRegister            source.initialUpdate(replTopic, replManagerAddress, slaveName, requestedVersion, initialFilesLocation, onlyRegister);                     }         else if (REPL_REQUEST_CANCEL_UPDATE.equals(msg)) {            // do cancel            ClientProperty prop = (ClientProperty)attrMap.get(SLAVE_NAME);            if (prop == null)               throw new Exception("update for '" + msg + "' failed since no '_slaveName' specified");            String slaveName = prop.getStringValue();            source.cancelUpdate(slaveName);         }         else if (REPL_REQUEST_RECREATE_TRIGGERS.equals(msg)) {            source.recreateTriggers();         }         else if (STATEMENT_ACTION.equals(msg)) {            String sql = ((ClientProperty)attrMap.get(STATEMENT_ATTR)).getStringValue();            boolean isHighPrio = ((ClientProperty)attrMap.get(STATEMENT_PRIO_ATTR)).getBooleanValue();            long maxResponseEntries = ((ClientProperty)attrMap.get(MAX_ENTRIES_ATTR)).getLongValue();            String statementId = ((ClientProperty)attrMap.get(STATEMENT_ID_ATTR)).getStringValue();            String sqlTopic =  ((ClientProperty)attrMap.get(SQL_TOPIC_ATTR)).getStringValue();            log.info("Be aware that the number of entries in the result set will be limited to '" + maxResponseEntries + "'. To change this use 'replication.sqlMaxEntries'");            final boolean isMaster = true;            byte[] response  = null;            Exception ex = null;            try {               response = source.executeStatement(sql, maxResponseEntries, isHighPrio, isMaster, sqlTopic, statementId);            }            catch (Exception e) {               response = "".getBytes();               ex = e;            }                        if (this.publisher != null) {               Map map = new HashMap();               map.put(MASTER_ATTR, this.replPrefix);               map.put(STATEMENT_ID_ATTR, statementId);               map.put("_command", STATEMENT_ACTION);               if (ex != null)                  map.put(EXCEPTION_ATTR, ex.getMessage());               this.publisher.publish(sqlTopic, response, map);            }            if (ex != null)               throw ex;         }         else if (INITIAL_UPDATE_START_BATCH.equals(msg)) {            source.startInitialUpdateBatch();         }         else if (INITIAL_UPDATE_COLLECT.equals(msg)) {            source.collectInitialUpdate();         }         else {            log.warning("update from '" + topic + "' with request '" + msg + "'");         }      }      catch (Throwable ex) {         log.severe("An exception occured when processing the received update '" + msg + "': " + ex.getMessage());         ex.printStackTrace();      }   }         /**    * Sending this message will reactivate the Dispatcher of the associated slave    * @param topic    * @param filename    * @param replManagerAddress    * @param slaveName    * @param minKey    * @param maxKey    * @throws Exception    */   public final void sendInitialDataResponse(String[] slaveSessionNames, String replManagerAddress, long minKey, long maxKey) throws Exception {      HashMap attrs = new HashMap();      attrs.put("_destination", replManagerAddress);      attrs.put("_command", "INITIAL_DATA_RESPONSE");      attrs.put("_minReplKey", "" + minKey);      attrs.put("_maxReplKey", "" + maxKey);      attrs.put(SLAVE_NAME, SpecificDefault.toString(slaveSessionNames));      if (this.publisher != null)         this.publisher.publish("", "INITIAL_DATA_RESPONSE".getBytes(), attrs);      else         log.warning("request for sending initial response can not be done since no publisher configured");   }   public void sendEndOfTransitionMessage(I_Info info, String initialDataTopic, String[] slaveSessionNames) throws JMSException {      XBSession session = this.publisher.getJmsSession();      XBDestination dest = new XBDestination(initialDataTopic, SpecificDefault.toString(slaveSessionNames));      XBMessageProducer producer = new XBMessageProducer(session, dest);      producer.setPriority(PriorityEnum.HIGH_PRIORITY.getInt());      producer.setDeliveryMode(DeliveryMode.PERSISTENT);      String dumpId = "" + new Timestamp().getTimestamp();      sendEndOfTransitionMessage(info, session, null, null, dumpId, producer);   }      public void sendEndOfTransitionMessage(I_Info info, XBSession session, String initialFilesLocation, String shortFilename, String dumpId, XBMessageProducer producer) throws JMSException {      TextMessage  endMsg = session.createTextMessage();      SqlInfo sqlInfo = new SqlInfo(info);      SqlDescription description = new SqlDescription(info);      description.setAttribute(END_OF_TRANSITION , "" + true);      endMsg.setBooleanProperty(END_OF_TRANSITION , true);      description.setAttribute(FILENAME_ATTR, shortFilename);      endMsg.setStringProperty(FILENAME_ATTR, shortFilename);      if (initialFilesLocation != null) {         description.setAttribute(INITIAL_FILES_LOCATION, initialFilesLocation);         endMsg.setStringProperty(INITIAL_FILES_LOCATION, initialFilesLocation);         description.setAttribute(INITIAL_DATA_ID, dumpId);         endMsg.setStringProperty(INITIAL_DATA_ID, dumpId);      }      sqlInfo.setDescription(description);      endMsg.setText(sqlInfo.toXml(""));      producer.send(endMsg);   }      public MsgQosData preparePubQos(MsgQosData qosData) {      if (qosData == null)         return qosData;      if (qosData.getDestinations() != null && qosData.getDestinations().size() > 0) {         qosData.addClientProperty(NUM_OF_TRANSACTIONS, -1L);         return qosData;      }      prepareQosMap(qosData.getClientProperties());      return qosData;   }      private void prepareQosMap(Map props) {      messageSeq++;      ClientProperty prop = new ClientProperty(MESSAGE_SEQ, Constants.TYPE_LONG, null, "" + messageSeq);      props.put(MESSAGE_SEQ, prop);      prop = new ClientProperty(REPL_KEY_ATTR, Constants.TYPE_LONG, null, "" + messageSeq);      props.put(REPL_KEY_ATTR, prop);      prop = new ClientProperty(TRANSACTION_SEQ, Constants.TYPE_LONG, null, "" + messageSeq);      props.put(TRANSACTION_SEQ, prop);      prop = new ClientProperty(ABSOLUTE_COUNT, Constants.TYPE_BOOLEAN, null, "true");      props.put(ABSOLUTE_COUNT, prop);   }      public byte[] replace(byte[] oldContent, Map clientProperties) {      prepareQosMap(clientProperties);      return oldContent;   }   }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -