📄 xmlblasterpublisher.java
字号:
if (this.alertSubscribeKey != null && this.alertSubscribeKey.length() < 1) this.alertSubscribeKey = null; this.alertSubscribeQos = info.get(MOM_ALERT_SUBSCRIBE_QOS, "<qos/>"); String hardConnectQos = info.get(MOM_CONNECT_QOS, (String)null); if (hardConnectQos != null) { this.connectQos = new ConnectQos(this.glob, this.glob.getConnectQosFactory().readObject(hardConnectQos)); } else { this.connectQos = new ConnectQos(this.glob, this.loginName, this.password); int maxSessions = info.getInt(MOM_MAX_SESSIONS, 100); this.connectQos.setMaxSessions(maxSessions); this.connectQos.getAddress().setRetries(-1); this.connectQos.setSessionTimeout(0L); CallbackAddress cb = this.connectQos.getData().getCurrentCallbackAddress(); cb.setRetries(-1); /* if (isRunningNative) { Address address = this.connectQos.getAddress(); address.setType("LOCAL"); address.setPingInterval(0L); address.setCollectTime(0L); this.connectQos.getClientQueueProperty().setType("RAM"); this.connectQos.getClientQueueProperty().setVersion("1.0"); CallbackAddress cb = this.connectQos.getData().getCurrentCallbackAddress(); cb.setPingInterval(0L); cb.setCollectTime(0L); cb.setType("LOCAL"); this.connectQos.getData().getSessionCbQueueProperty().setType("RAM"); this.connectQos.getData().getSessionCbQueueProperty().setVersion("1.0"); this.connectQos.getData().getSubjectQueueProperty().setType("RAM"); this.connectQos.getData().getSubjectQueueProperty().setVersion("1.0"); } */ this.compressSize = info.getInt(MOM_COMPRESS_SIZE, 0); } String propKeysToAdd = info.get(MOM_PROPS_TO_ADD_TO_CONNECT, "").trim(); if (propKeysToAdd.length() > 0) { if ("*".equals(propKeysToAdd)) { // then all properties are added to the connectQos if (hardConnectQos != null) log.warning("The property '" + MOM_PROPS_TO_ADD_TO_CONNECT + "' was is set to '*' and and '" + MOM_CONNECT_QOS + "' was set too (some of the properties could be overwritten"); // fill the client properties of the connectQos with the info object new ClientPropertiesInfo(this.connectQos.getData().getClientProperties(), info); } else { StringTokenizer tokenizer = new StringTokenizer(propKeysToAdd, ","); while (tokenizer.hasMoreTokens()) { String key = tokenizer.nextToken().trim(); String val = info.get(key, null); if (val == null) log.warning("The property '" + key + "' shall be added to the connectQos but was not found among the properties"); else { if (this.connectQos.getClientProperty(key) != null) log.warning("The property '" + key + "' is already set in the client properties of the connect qos. Will be overwritten with the value '" + val + "'"); this.connectQos.addClientProperty(key, val); } } } } this.con = this.glob.getXmlBlasterAccess(); this.con.registerConnectionListener(this); this.con.connect(this.connectQos, this); this.initCount++; // Make myself available info.putObject("org.xmlBlaster.contrib.dbwatcher.mom.XmlBlasterPublisher", this); info.putObject("org.xmlBlaster.contrib.dbwatcher.mom.I_ChangePublisher", this); // Add JMX Registration String jmxName = I_Info.JMX_PREFIX + "xmlBlasterPublisher"; info.putObject(jmxName, this); log.info("Added object '" + jmxName + "' to I_Info to be added as an MBean"); } /** * @see org.xmlBlaster.contrib.dbwatcher.mom.I_ChangePublisher#shutdown */ public synchronized void shutdown() { this.initCount--; if (this.initCount > 0) return; log.fine("Closing xmlBlaster connection"); if (this.con != null) { this.con.disconnect(null); this.con = null; this.glob = null; } } private void addStringPropToQos(Map attrMap, MsgQosData qos) { synchronized (attrMap) { String[] keys = (String[])attrMap.keySet().toArray(new String[attrMap.size()]); for (int i=0; i < keys.length; i++) { Object val = attrMap.get(keys[i]); if (val != null && val instanceof String) qos.addClientProperty(keys[i], val); } } } /** * The send message is configured with <tt>mom.publishKey</tt> and <tt>mom.publishQos</tt>. * A DROP command erases the topic. * @see org.xmlBlaster.contrib.dbwatcher.mom.I_ChangePublisher#publish(String, String, Map) */ public String publish(String changeKey, byte[] out, Map attrMap) throws Exception { // this is only for testing purposes if (this.throwAwayMessages) { log.fine("The message '" + changeKey + "' has been thrown away (not published)"); return (new Timestamp()).toString() + "thrownAway"; } long t0 = System.currentTimeMillis(); if (out == null) out = "".getBytes(); out = MomEventEngine.compress(out, attrMap, this.compressSize, null); String pk = (changeKey.indexOf("${") == -1) ? DbWatcher.replaceVariable(this.publishKey, changeKey) : this.publishKey; String command = null; if (attrMap != null) command = (String)attrMap.get("_command"); else command = ""; String destLiteral = null; Destination destination = null; List additionalDestinations = null; if (attrMap != null) destLiteral = (String)attrMap.get("_destination"); if (destLiteral != null) { if (destLiteral.indexOf(',') < 0) { destination = new Destination(new SessionName(this.glob, destLiteral)); destination.forceQueuing(true); // to ensure it works even if this comes before manager } else { StringTokenizer tokenizer = new StringTokenizer(destLiteral, ","); // comma separated list destination = new Destination(new SessionName(this.glob, tokenizer.nextToken().trim())); destination.forceQueuing(true); additionalDestinations = new ArrayList(); while (tokenizer.hasMoreTokens()) { Destination tmp = new Destination(new SessionName(this.glob, tokenizer.nextToken().trim())); tmp.forceQueuing(true); } } } // this is used to register the owner of this object (typically the DbWatcher) if ("INITIAL_DATA_RESPONSE".equals(command) || "STATEMENT".equals(command)) { PublishQos qos = null; if (destination != null) { qos = new PublishQos(this.glob, destination); if (additionalDestinations != null) { for (int i=0; i < additionalDestinations.size(); i++) qos.addDestination((Destination)additionalDestinations.get(i)); } } else qos = new PublishQos(this.glob); qos.setSubscribable(true); // to force to fill the client properties map !! ClientPropertiesInfo tmpInfo = new ClientPropertiesInfo(attrMap); new ClientPropertiesInfo(qos.getData().getClientProperties(), tmpInfo); addStringPropToQos(attrMap, qos.getData()); PublishKey key = null; if (changeKey != null && changeKey.length() > 0) key = new PublishKey(this.glob, changeKey); else key = new PublishKey(this.glob, "dbWatcherUnspecified"); key.setContentMime("text/xml"); MsgUnit msg = new MsgUnit(key, out, qos); PublishReturnQos prq = this.con.publish(msg); String id = (prq.getRcvTimestamp()!=null)?prq.getRcvTimestamp().toString():"queued"; if (log.isLoggable(Level.FINE)) log.fine("Published '" + prq.getKeyOid() + "' '" + id + "'"); return id; } if (this.eraseOnDrop && "DROP".equals(command)) { String oid = this.glob.getMsgKeyFactory().readObject(pk).getOid(); EraseKey ek = new EraseKey(glob, oid); EraseQos eq = new EraseQos(glob); con.erase(ek, eq); log.info("Topic '" + pk + "' is erased:" + out); return "0"; } if (this.eraseOnDelete && "DELETE".equals(command)) { String oid = this.glob.getMsgKeyFactory().readObject(pk).getOid(); EraseKey ek = new EraseKey(glob, oid); EraseQos eq = new EraseQos(glob); con.erase(ek, eq); log.info("Topic '" + pk + "' is erased:" + out); return "0"; } if (log.isLoggable(Level.FINER)) log.finer("Topic '" + pk + "' is published: " + out); try { String oid = (String)attrMap.remove(ContribConstants.TOPIC_NAME); // consume it since only used to inform this method if (destination != null) { pk = this.adminKey; } if (oid != null) pk = "<key oid='" + oid + "'/>"; MsgUnit msgUnit = new MsgUnit(pk, out, this.publishQos); String tmp = msgUnit.getKeyData().getContentMime(); // FIXME pass this in the map and set only if explicitly set in the map if (tmp == null || tmp.equals("text/plain")) { msgUnit.getKeyData().setContentMime("text/xml"); } if (destination != null) ((MsgQosData)msgUnit.getQosData()).addDestination(destination); // to force to fill the client properties map !! ClientPropertiesInfo tmpInfo = new ClientPropertiesInfo(attrMap); new ClientPropertiesInfo(msgUnit.getQosData().getClientProperties(), tmpInfo); addStringPropToQos(attrMap, (MsgQosData)msgUnit.getQosData()); PublishReturnQos prq = this.con.publish(msgUnit); String id = (prq.getRcvTimestamp()!=null)?prq.getRcvTimestamp().toString():"queued"; if (log.isLoggable(Level.FINE)) log.fine("Published '" + prq.getKeyOid() + "' '" + id + "'"); this.lastPublishTime = System.currentTimeMillis() - t0; return id; } catch (XmlBlasterException e) { log.severe("Can't publish to xmlBlaster: " + e.getMessage()); throw e; } } /** * Subscribes on the alert topic as configured with <tt>mom.alertSubscribeKey</tt>. * @see org.xmlBlaster.contrib.dbwatcher.mom.I_ChangePublisher#registerAlertListener(I_Update) * @param attrs it currently accepts a null (old behaviour) or if it is not null, then * the attribute ptp must be set (does not matter to what). * * @throws Exception Typically a XmlBlasterException */ public boolean registerAlertListener(final I_Update momCb, Map attrs) throws Exception { if (momCb == null) throw new IllegalArgumentException("I_Update==null"); try { if (attrs == null) { // 'old' behaviour if (this.alertSubscribeKey == null) return false; log.info("Registering on '" + this.alertSubscribeKey + "' for alerts"); SubscribeReturnQos subRet = this.con.subscribe(this.alertSubscribeKey, this.alertSubscribeQos, new I_Callback() { public String update(String s, UpdateKey k, byte[] c, UpdateQos q) throws XmlBlasterException { log.fine("Receiving alert message '" + k.getOid() + "'"); Map attrMap = clientPropertiesToMap(q.getClientProperties()); try { momCb.update(k.getOid(), new ByteArrayInputStream(c), attrMap); } catch (Exception e) { log.severe("Can't subscribe from xmlBlaster: " + e.getMessage()); throw new XmlBlasterException(glob, ErrorCode.USER_UPDATE_ERROR, "alertListener", "", e); } return ""; } }); this.alertSubscriptionId = subRet.getSubscriptionId();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -