hempbroker.java
来自「RESIN 3.2 最新源码」· Java 代码 · 共 776 行 · 第 1/2 页
JAVA
776 行
BamStream stream = findAgent(to); if (stream != null) stream.presenceUnsubscribe(to, from, data); else { if (log.isLoggable(Level.FINER)) { log.finer(this + " sendPresenceUnsubscribe (no resource) to=" + to + " from=" + from + " value=" + data); } } } /** * Presence unsubscribed */ public void presenceUnsubscribed(String to, String from, Serializable data) { BamStream stream = findAgent(to); if (stream != null) stream.presenceUnsubscribed(to, from, data); else { if (log.isLoggable(Level.FINER)) { log.finer(this + " sendPresenceUnsubscribed (no resource) to=" + to + " from=" + from + " value=" + data); } } } /** * Presence error */ public void presenceError(String to, String from, Serializable data, BamError error) { BamStream stream = findAgent(to); if (stream != null) stream.presenceError(to, from, data, error); else { if (log.isLoggable(Level.FINER)) { log.finer(this + " sendPresenceError (no resource) to=" + to + " from=" + from + " value=" + data); } } } /** * Sends a message */ public void message(String to, String from, Serializable value) { BamStream stream = findAgent(to); if (stream != null) stream.message(to, from, value); else { log.fine(this + " sendMessage to=" + to + " from=" + from + " is an unknown stream"); } } /** * Sends a message */ public void messageError(String to, String from, Serializable value, BamError error) { BamStream stream = findAgent(to); if (stream != null) stream.messageError(to, from, value, error); else { log.fine(this + " sendMessageError to=" + to + " from=" + from + " error=" + error + " is an unknown stream"); } } /** * Query an entity */ public boolean queryGet(long id, String to, String from, Serializable query) { BamStream stream = findAgent(to); if (stream != null) { if (! stream.queryGet(id, to, from, query)) { if (log.isLoggable(Level.FINE)) { log.fine(this + " queryGet to unknown feature to='" + to + "' from=" + from + " query='" + query + "'" + " stream=" + stream); } String msg = L.l("'{0}' is an unknown feature for to='{1}'", query, to); BamError error = new BamError(BamError.TYPE_CANCEL, BamError.FEATURE_NOT_IMPLEMENTED, msg); queryError(id, from, to, query, error); } return true; } if (log.isLoggable(Level.FINE)) { log.fine(this + " queryGet to unknown stream to='" + to + "' from=" + from); } String msg = L.l("'{0}' is an unknown service for queryGet", to); BamError error = new BamError(BamError.TYPE_CANCEL, BamError.SERVICE_UNAVAILABLE, msg); queryError(id, from, to, query, error); return true; } /** * Query an entity */ public boolean querySet(long id, String to, String from, Serializable query) { BamStream stream = findAgent(to); if (stream == null) { if (log.isLoggable(Level.FINE)) { log.fine(this + " querySet to unknown stream '" + to + "' from=" + from); } String msg = L.l("'{0}' is an unknown service for querySet", to); BamError error = new BamError(BamError.TYPE_CANCEL, BamError.SERVICE_UNAVAILABLE, msg); queryError(id, from, to, query, error); return true; } if (stream.querySet(id, to, from, query)) return true; if (log.isLoggable(Level.FINE)) { log.fine(this + " querySet with unknown feature to=" + to + " from=" + from + " resource=" + stream + " query=" + query); } String msg = L.l("'{0}' is an unknown feature for querySet", query); BamError error = new BamError(BamError.TYPE_CANCEL, BamError.FEATURE_NOT_IMPLEMENTED, msg); queryError(id, from, to, query, error); return true; } /** * Query an entity */ public void queryResult(long id, String to, String from, Serializable value) { BamStream stream = findAgent(to); if (stream != null) stream.queryResult(id, to, from, value); else throw new RuntimeException(L.l("{0} is an unknown entity", to)); } /** * Query an entity */ public void queryError(long id, String to, String from, Serializable query, BamError error) { BamStream stream = findAgent(to); if (stream != null) stream.queryError(id, to, from, query, error); else throw new RuntimeException(L.l("{0} is an unknown entity", to)); } protected BamStream findAgent(String jid) { synchronized (_agentMap) { WeakReference<BamStream> ref = _agentMap.get(jid); if (ref != null) return ref.get(); } if (jid.endsWith("@")) { // jms/3d00 jid = jid + getDomain(); } BamStream agentStream; BamService service = findService(jid); if (service == null) { return null; } else if (jid.equals(service.getJid())) { agentStream = service.getAgentStream(); if (agentStream != null) { synchronized (_agentMap) { WeakReference<BamStream> ref = _agentMap.get(jid); if (ref != null) return ref.get(); _agentMap.put(jid, new WeakReference<BamStream>(agentStream)); return agentStream; } } } else { if (! service.startAgent(jid)) return null; synchronized (_agentMap) { WeakReference<BamStream> ref = _agentMap.get(jid); if (ref != null) return ref.get(); } } return null; } protected BamService findService(String jid) { if (jid == null) return null; synchronized (_serviceCache) { WeakReference<BamService> ref = _serviceCache.get(jid); if (ref != null) return ref.get(); } if (startServiceFromManager(jid)) { synchronized (_serviceCache) { WeakReference<BamService> ref = _serviceCache.get(jid); if (ref != null) return ref.get(); } } if (jid.indexOf('/') < 0 && jid.indexOf('@') < 0) { BamService service = findDomain(jid); if (service != null) { synchronized (_serviceCache) { WeakReference<BamService> ref = _serviceCache.get(jid); if (ref != null) return ref.get(); _serviceCache.put(jid, new WeakReference<BamService>(service)); return service; } } } int p; if ((p = jid.indexOf('/')) > 0) { String uid = jid.substring(0, p); return findService(uid); } else if ((p = jid.indexOf('@')) > 0) { String domainName = jid.substring(p + 1); return findService(domainName); } else return null; } protected BamService findDomain(String domain) { if (domain == null) return null; BamBroker broker = _manager.findBroker(domain); if (broker instanceof HempBroker) { HempBroker hempBroker = (HempBroker) broker; return hempBroker.getDomainService(); } if (broker == null || broker == this) return null; // construct foreign return null; } protected boolean startServiceFromManager(String jid) { for (BamServiceManager manager : _serviceManagerList) { if (manager.startService(jid)) return true; } return false; } /** * Closes a connection */ void closeAgent(String jid) { int p = jid.indexOf('/'); if (p > 0) { String owner = jid.substring(0, p); BamService service = findService(owner); if (service != null) { try { service.onAgentStop(jid); } catch (Exception e) { log.log(Level.FINE, e.toString(), e); } } } synchronized (_serviceCache) { _serviceCache.remove(jid); } synchronized (_agentMap) { _agentMap.remove(jid); } } public void close() { _manager.removeBroker(_domain); for (String alias : _aliasList) _manager.removeBroker(alias); _serviceMap.clear(); _serviceCache.clear(); _agentMap.clear(); } @Override public String toString() { return getClass().getSimpleName() + "[" + _domain + "]"; }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?