📄 xmlblasterpublisher.java
字号:
/*------------------------------------------------------------------------------Name: XmlBlasterPublisher.javaProject: org.xmlBlasterProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.dbwatcher.mom;import java.io.ByteArrayInputStream;import java.io.InputStream;import java.util.ArrayList;import java.util.HashSet;import java.util.List;import java.util.Map;import java.util.Set;import java.util.StringTokenizer;import java.util.TreeMap;import java.util.Iterator;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.client.qos.ConnectQos;import org.xmlBlaster.client.I_ConnectionStateListener;import org.xmlBlaster.client.I_XmlBlasterAccess;import org.xmlBlaster.client.I_Callback;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.client.key.UpdateKey;import org.xmlBlaster.client.qos.UpdateQos;import org.xmlBlaster.client.key.PublishKey;import org.xmlBlaster.client.qos.PublishReturnQos;import org.xmlBlaster.client.key.SubscribeKey;import org.xmlBlaster.client.qos.PublishQos;import org.xmlBlaster.client.qos.SubscribeQos;import org.xmlBlaster.client.qos.SubscribeReturnQos;import org.xmlBlaster.client.key.UnSubscribeKey;import org.xmlBlaster.client.qos.UnSubscribeQos;import org.xmlBlaster.client.key.EraseKey;import org.xmlBlaster.client.qos.EraseQos;import org.xmlBlaster.contrib.ClientPropertiesInfo;import org.xmlBlaster.contrib.ContribConstants;import org.xmlBlaster.contrib.I_ChangePublisher;import org.xmlBlaster.contrib.I_Info;import org.xmlBlaster.contrib.I_Update;import org.xmlBlaster.contrib.MomEventEngine;import org.xmlBlaster.contrib.dbwatcher.DbWatcher;import org.xmlBlaster.contrib.dbwatcher.DbWatcherConstants;import org.xmlBlaster.contrib.dbwatcher.detector.I_AlertProducer;import org.xmlBlaster.contrib.dbwatcher.detector.I_ChangeDetector;import org.xmlBlaster.jms.XBSession;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.dispatch.ConnectionStateEnum;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.qos.MsgQosData;import org.xmlBlaster.util.qos.address.CallbackAddress;import org.xmlBlaster.util.qos.address.Destination;import org.xmlBlaster.util.MsgUnit;/** * Implementation to send change events to xmlBlaster. * <p> * This plugin plays two roles, first it is the gateway to xmlBlaster and second * if can be configured to listen on a alert topic and use incoming messages as * alerts to check the database again. * </p> * <p> * Supported configuration: * </p> * <ul> * <li><tt>mom.topicName</tt> for example <i>db.change.event.${colGroupValue}</i> * where ${} variables will be replaced by the current * <tt>colGroupValue</tt>. * </li> * <li><tt>mom.loginName</tt> the login name * </li> * <li><tt>mom.password</tt> the password * </li> * <li><tt>mom.publishKey</tt> for example <tt><key oid='db.change.event.${colGroupValue}'/></tt></li> * <li><tt>mom.publishQos</tt> for example <tt><qos/></tt></li> * <li><tt>mom.alertSubscribeKey</tt> for example <tt><key oid='db.change.alert'/></tt><br /> * To use XmlBlasterPublisher as an alert notifier register with <tt>alertProducer.class=org.xmlBlaster.contrib.dbwatcher.mom.XmlBlasterPublisher</tt></li> * <li><tt>mom.alertSubscribeQos</tt> for example <tt><qos/></tt></li> * <li><tt>mom.connectQos</tt> if given it is stronger than the <tt>mom.loginName</tt> * and <tt>mom.password</tt> settings</li> * </ul> * * @author Marcel Ruff */public class XmlBlasterPublisher implements I_ChangePublisher, I_AlertProducer, I_Callback, I_ConnectionStateListener, DbWatcherConstants, XmlBlasterPublisherMBean { private static Logger log = Logger.getLogger(XmlBlasterPublisher.class.getName()); protected I_ChangeDetector changeDetector; protected Global glob; protected I_XmlBlasterAccess con; protected String topicNameTemplate; protected String loginName; protected String password; protected String publishKey; protected String publishQos; protected String alertSubscribeKey; protected String alertSubscribeQos; protected String alertSubscriptionId; protected ConnectQos connectQos; protected boolean eraseOnDrop; protected boolean eraseOnDelete; private int initCount = 0; private I_Update defaultUpdate; private String adminKey = "<key oid='mom.publisher.adminMsg'/>"; private int compressSize; private boolean throwAwayMessages; private long lastPublishTime; /** * Can be null, taken out of the info object if the owner of this object has set the * parameter _connectionStateListener. */ private I_ConnectionStateListener connectionStateListener; /** * Default constructor. * You need to call {@link #init(I_Info)} thereafter. */ public XmlBlasterPublisher() { // void } /** * If called we shall subcribe to xmlBlaster for alert messages * which notifies us that there may be new changes available, we call * {@link I_ChangeDetector#checkAgain} in such a case. * @see org.xmlBlaster.contrib.dbwatcher.detector.I_AlertProducer#init(I_Info,I_ChangeDetector) */ public void init(I_Info info, I_ChangeDetector changeDetector) throws Exception { this.changeDetector = changeDetector; } /** * Subscribes on the alert topic as configured with <tt>mom.alertSubscribeKey</tt>. * @see org.xmlBlaster.contrib.dbwatcher.detector.I_AlertProducer#startProducing */ public void startProducing() throws Exception { registerAlertListener(new I_Update() { public void update(String topic, java.io.InputStream is, Map attrMap) { try { if (log.isLoggable(Level.FINE)) log.fine("Alert notification arrived '" + topic + "' with " + ((attrMap==null)?0:attrMap.size()) + " attributes"); changeDetector.checkAgain(attrMap); } catch (Exception e) { log.warning("Ignoring alert notification message '" + topic + "': " + e.toString()); } } }, null); } /** * Unsubscribes from the alert topic. * @see org.xmlBlaster.contrib.dbwatcher.detector.I_AlertProducer#stopProducing */ public void stopProducing() throws Exception { if (this.alertSubscriptionId != null) { UnSubscribeKey sk = new UnSubscribeKey(glob, this.alertSubscriptionId); UnSubscribeQos sq = new UnSubscribeQos(glob); this.con.unSubscribe(sk, sq); this.alertSubscriptionId = null; } } /** * @see org.xmlBlaster.contrib.I_ContribPlugin#getUsedPropertyKeys() */ public Set getUsedPropertyKeys() { Set set = new HashSet(); set.add(MOM_TOPIC_NAME); set.add(MOM_LOGIN_NAME); set.add(MOM_PASSWORD); set.add(MOM_ERASE_ON_DROP); set.add(MOM_ERASE_ON_DELETE); set.add(MOM_PUBLISH_KEY); set.add(MOM_PUBLISH_QOS); set.add(MOM_ALERT_SUBSCRIBE_KEY); set.add(MOM_ALERT_SUBSCRIBE_QOS); set.add(MOM_CONNECT_QOS); set.add(MOM_PROPS_TO_ADD_TO_CONNECT); set.add(MOM_MAX_SESSIONS); return set; } public synchronized void initWithExistingGlob(Global global, String pubKey, String pubQos, int compressionSize) { this.glob = global; this.con = this.glob.getXmlBlasterAccess(); this.publishQos = pubQos; this.publishKey = pubKey; this.compressSize = compressionSize; } /** * If a global is passed with <tt>info.getObject("org.xmlBlaster.engine.Global")</tt> * we take a clone and reuse it. * @see org.xmlBlaster.contrib.dbwatcher.mom.I_ChangePublisher#init(I_Info) */ public synchronized void init(I_Info info) throws Exception { // here because if somebody makes it as a second object it still works if (this.connectionStateListener == null) { log.info("The connection status listener will be added"); this.connectionStateListener = (I_ConnectionStateListener)info.getObject("_connectionStateListener"); } else log.warning("The connection status listener for this info has already been defined, ignoring this new request"); if (this.initCount > 0) { this.initCount++; return; } Global globOrig = (Global)info.getObject("org.xmlBlaster.engine.Global"); if (globOrig == null) { this.glob = new Global(); } else { if (globOrig instanceof org.xmlBlaster.engine.ServerScope) { this.glob = globOrig.getClone(globOrig.getNativeConnectArgs()); this.glob.addObjectEntry(Constants.OBJECT_ENTRY_ServerScope, globOrig.getObjectEntry(Constants.OBJECT_ENTRY_ServerScope)); //"ServerNodeScope" } else { this.glob = globOrig; } } this.topicNameTemplate = info.get(MOM_TOPIC_NAME, "db.change.event.${groupColValue}"); this.loginName = info.get(MOM_LOGIN_NAME, "dbWatcher/1"); this.password = info.get(MOM_PASSWORD, "secret"); this.eraseOnDrop = info.getBoolean(MOM_ERASE_ON_DROP, false); this.eraseOnDelete = info.getBoolean(MOM_ERASE_ON_DELETE, false); this.publishKey = info.get(MOM_PUBLISH_KEY, (String)null); if (this.publishKey != null && this.topicNameTemplate != null) { log.warning("constructor: since 'mom.publishKey' is defined, 'mom.topicName' will be ignored"); } if (this.publishKey == null && this.topicNameTemplate == null) { //throw new XmlBlasterException(this.global, ErrorCode.USER_CONFIGURATION, ME, "at least one of the properties 'mom.topicName' or 'mom.publishKey' must be defined"); throw new IllegalArgumentException("At least one of the properties 'mom.topicName' or 'mom.publishKey' must be defined"); } if (this.publishKey == null) { this.publishKey = (new PublishKey(this.glob, this.topicNameTemplate)).toXml(); } this.publishQos = info.get(MOM_PUBLISH_QOS, "<qos/>"); this.alertSubscribeKey = info.get(MOM_ALERT_SUBSCRIBE_KEY, (String)null);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -