📄 xmlblasterpublisher.java
字号:
return true;
         }
         else {
            // could be for ptp or in future for additional subscriptions
            Object ptp = attrs.get("ptp");
            if (ptp != null) {
               synchronized (this) {
                  this.defaultUpdate = momCb;
                  return true;
               }
            }
            else {
               Thread.dumpStack();
               throw new XmlBlasterException(this.glob, ErrorCode.USER_CONFIGURATION,
                     "XmlBlasterPublisher.registerAlertListener",
                     "non-ptp are not implemented. Please assign to the attrs a 'ptp' attribute (no matter which value)");
            }
         }
      }
      catch (XmlBlasterException e) {
         log.severe("Can't subscribe from xmlBlaster: " + e.getMessage());
         throw e;
      }
   }

   /**
    * Not available via interface, used by test suite only.
    * <p>
    * Subscribes on the given topic and forwards every received message
    * (oid, content stream and client properties as a string map) to momCb.
    * @param topic If the topic starts with "XPATH:" the prefix will be stripped
    *        and an Xpath subscription is done. Must not be null.
    * @param momCb Incoming messages are forwarded to this interface. Must not be null.
    * @return A unique identifier of the subscription
    * @throws IllegalArgumentException if topic or momCb is null
    * @throws Exception Typically a XmlBlasterException
    */
   public String subscribe(String topic, final I_Update momCb) throws Exception {
      if (momCb == null) throw new IllegalArgumentException("I_Update==null");
      if (topic == null) throw new IllegalArgumentException("topic==null");
      try {
         // "XPATH:" is 6 characters long, substring(6) strips exactly that prefix
         SubscribeKey sk = topic.startsWith("XPATH:") ?
               new SubscribeKey(glob, topic.substring(6), "XPATH") :
               new SubscribeKey(glob, topic);
         SubscribeQos sq = new SubscribeQos(glob);
         SubscribeReturnQos subRet = this.con.subscribe(sk, sq, new I_Callback() {
            public String update(String s, UpdateKey k, byte[] c, UpdateQos q) throws XmlBlasterException {
               if (log.isLoggable(Level.FINE)) log.fine("Receiving xmlBlaster message " + k.getOid());
               Map attrMap = clientPropertiesToMap(q.getClientProperties());
               try {
                  momCb.update(k.getOid(), new ByteArrayInputStream(c), attrMap);
               }
               catch (Exception e) {
                  // This is a delivery failure of a received message, not a subscribe failure
                  log.severe("Can't forward update from xmlBlaster: " + e.getMessage());
                  throw new XmlBlasterException(glob, ErrorCode.USER_UPDATE_ERROR, "alertListener", "", e);
               }
               return "";
            }
         });
         log.info("Subscribed on " + topic);
         return subRet.getSubscriptionId();
      }
      catch (XmlBlasterException e) {
         log.severe("Can't subscribe from xmlBlaster: " + e.getMessage());
         throw e;
      }
   }

   /**
    * Default callback: forwards the received message to the registered default
    * listener. If no default listener is registered the message is ignored
    * (a warning is logged) and RET_OK is returned.
    * <p>
    * The listener is checked and invoked while holding the monitor of this
    * instance, the same monitor which is used when the listener is assigned.
    * @param s The sessionId, it is passed on as first argument to the listener
    * @param k The xml key
    * @param content The message content, it is decompressed according to the
    *        client properties before it is handed to the listener
    * @param q The message QoS
    * @return The UpdateReturnQos
    * @throws XmlBlasterException with USER_UPDATE_HOLDBACK if the listener throws anything
    * @see I_Callback#update
    */
   public String update(String s, UpdateKey k, byte[] content, UpdateQos q) throws XmlBlasterException {
      synchronized(this) {
         if (this.defaultUpdate == null) {
            log.warning("No update message expected, ignoring received " + k.toXml());
            return Constants.RET_OK;
         }
         // Only decompress when there is somebody to deliver to
         InputStream is = MomEventEngine.decompress(new ByteArrayInputStream(content), q.getClientProperties());
         try {
            // TODO Add here the specific qos attributes to the map.
            q.getData().addClientProperty("_sender", q.getSender().getRelativeName());
            this.defaultUpdate.update(s, is, q.getClientProperties());
            return Constants.RET_OK;
         }
         catch (Exception ex) {
            log.log(Level.SEVERE, "Exception occured in the update method for key='" + s + "'", ex);
            throw new XmlBlasterException(this.glob, ErrorCode.USER_UPDATE_HOLDBACK, "XmlBlasterPublisher.update", "user exception", ex);
         }
         catch (Throwable ex) {
            log.log(Level.SEVERE, "Throwable occured in the update method for key='" + s + "'", ex);
            throw new XmlBlasterException(this.glob, ErrorCode.USER_UPDATE_HOLDBACK, "XmlBlasterPublisher.update", "user exception", ex);
         }
      }
   }

   /**
    * Copy a map of String keys to ClientProperty values into a map of
    * String keys to String values.
    * @param clp The xmlBlaster ClientProperties, null is treated as empty
    * @return The simple string map (a new TreeMap, never null); a null
    *         property is copied as a null value
    */
   protected Map clientPropertiesToMap(Map clp) {
      Map attrMap = new TreeMap();
      if (clp == null) return attrMap;
      Iterator it = clp.entrySet().iterator();
      while (it.hasNext()) {
         Map.Entry entry = (Map.Entry)it.next();
         String key = (String)entry.getKey();
         ClientProperty prop = (ClientProperty)entry.getValue();
         attrMap.put(key, (prop == null) ? null : prop.getStringValue());
      }
      return attrMap;
   }

   /**
    * Creates a new auto-acknowledge XBSession on each call.
    * @see org.xmlBlaster.contrib.I_ChangePublisher#getJmsSession()
    */
   public XBSession getJmsSession() {
      return new XBSession(this.glob, XBSession.AUTO_ACKNOWLEDGE, false);
   }

   /**
    * Delegates to the registered connection state listener, if any.
    * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedAlive(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
    */
   public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
      if (this.connectionStateListener != null) {
         log.info("reached alive for user '" + this.con.getId() + "'");
         this.connectionStateListener.reachedAlive(oldState, connection);
      }
   }

   /**
    * Delegates to the registered connection state listener, if any.
    * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedDead(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
    */
   public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
      if (this.connectionStateListener != null) {
         log.info("reached dead for user '" + this.con.getId() + "'");
         this.connectionStateListener.reachedDead(oldState, connection);
      }
   }

   /**
    * Delegates to the registered connection state listener, if any.
    * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedPolling(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
    */
   public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
      if (this.connectionStateListener != null) {
         log.info("reached polling for user '" + this.con.getId() + "'");
         this.connectionStateListener.reachedPolling(oldState, connection);
      }
   }

   // for jmx
   // The accessors below only read or assign the field of the same name.

   public String getAdminKey() {
      return adminKey;
   }

   public void setAdminKey(String adminKey) {
      this.adminKey = adminKey;
   }

   public String getAlertSubscribeKey() {
      return alertSubscribeKey;
   }

   public void setAlertSubscribeKey(String alertSubscribeKey) {
      this.alertSubscribeKey = alertSubscribeKey;
   }

   public String getAlertSubscribeQos() {
      return alertSubscribeQos;
   }

   public void setAlertSubscribeQos(String alertSubscribeQos) {
      this.alertSubscribeQos = alertSubscribeQos;
   }

   public String getAlertSubscriptionId() {
      return alertSubscriptionId;
   }

   public void setAlertSubscriptionId(String alertSubscriptionId) {
      this.alertSubscriptionId = alertSubscriptionId;
   }

   public int getCompressSize() {
      return compressSize;
   }

   public void setCompressSize(int compressSize) {
      this.compressSize = compressSize;
   }

   /**
    * @return The XML representation of the connect QoS
    */
   public String getConnectQos() {
      return connectQos.toXml();
   }

   public boolean isEraseOnDelete() {
      return eraseOnDelete;
   }

   public void setEraseOnDelete(boolean eraseOnDelete) {
      this.eraseOnDelete = eraseOnDelete;
   }

   public boolean isEraseOnDrop() {
      return eraseOnDrop;
   }

   public void setEraseOnDrop(boolean eraseOnDrop) {
      this.eraseOnDrop = eraseOnDrop;
   }

   public String getPublishKey() {
      return publishKey;
   }

   public void setPublishKey(String publishKey) {
      this.publishKey = publishKey;
   }

   public String getPublishQos() {
      return publishQos;
   }

   public void setPublishQos(String publishQos) {
      this.publishQos = publishQos;
   }

   public boolean isThrowAwayMessages() {
      return throwAwayMessages;
   }

   public void setThrowAwayMessages(boolean throwAwayMessages) {
      this.throwAwayMessages = throwAwayMessages;
   }

   public String getTopicNameTemplate() {
      return topicNameTemplate;
   }

   public void setTopicNameTemplate(String topicNameTemplate) {
      this.topicNameTemplate = topicNameTemplate;
   }

   public String getLoginName() {
      return loginName;
   }

   /**
    * Returns the time in ms it took for the last real publish. A real publish
    * means the last publish of a message which is neither a drop nor a delete.
    */
   public long getLastPublishTime() {
      return this.lastPublishTime;
   }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -