hempbroker.java

来自「RESIN 3.2 最新源码」· Java 代码 · 共 776 行 · 第 1/2 页

JAVA
776
字号
    BamStream stream = findAgent(to);

    if (stream != null)
      stream.presenceUnsubscribe(to, from, data);
    else {
      if (log.isLoggable(Level.FINER)) {
        log.finer(this + " sendPresenceUnsubscribe (no resource) to=" + to
                  + " from=" + from + " value=" + data);
      }
    }
  }

  /**
   * Forwards a presence-unsubscribed packet to the agent stream found for
   * {@code to}. When no stream is found the packet is dropped and, if FINER
   * logging is enabled, a log record is written.
   *
   * @param to   jid used to look up the target stream
   * @param from jid passed through to the target stream as the sender
   * @param data payload passed through unchanged
   */
  public void presenceUnsubscribed(String to,
                                   String from,
                                   Serializable data)
  {
    BamStream stream = findAgent(to);

    if (stream != null)
      stream.presenceUnsubscribed(to, from, data);
    else {
      if (log.isLoggable(Level.FINER)) {
        log.finer(this + " sendPresenceUnsubscribed (no resource) to=" + to
                  + " from=" + from + " value=" + data);
      }
    }
  }

  /**
   * Forwards a presence error to the agent stream found for {@code to}.
   * When no stream is found the packet is dropped and, if FINER logging is
   * enabled, a log record is written.
   *
   * @param to    jid used to look up the target stream
   * @param from  jid passed through to the target stream as the sender
   * @param data  payload passed through unchanged
   * @param error error passed through unchanged
   */
  public void presenceError(String to,
                            String from,
                            Serializable data,
                            BamError error)
  {
    BamStream stream = findAgent(to);

    if (stream != null)
      stream.presenceError(to, from, data, error);
    else {
      if (log.isLoggable(Level.FINER)) {
        log.finer(this + " sendPresenceError (no resource) to=" + to
                  + " from=" + from + " value=" + data);
      }
    }
  }

  /**
   * Forwards a message to the agent stream found for {@code to}. When no
   * stream is found the message is dropped and logged at FINE.
   *
   * @param to    jid used to look up the target stream
   * @param from  jid passed through to the target stream as the sender
   * @param value payload passed through unchanged
   */
  public void message(String to, String from, Serializable value)
  {
    BamStream stream = findAgent(to);

    if (stream != null)
      stream.message(to, from, value);
    else if (log.isLoggable(Level.FINE)) {
      // guarded like the presence methods so the string is only built
      // when it will actually be logged
      log.fine(this + " sendMessage to=" + to + " from=" + from
               + " is an unknown stream");
    }
  }

  /**
   * Forwards a message error to the agent stream found for {@code to}.
   * When no stream is found the error is dropped and logged at FINE.
   *
   * @param to    jid used to look up the target stream
   * @param from  jid passed through to the target stream as the sender
   * @param value payload passed through unchanged
   * @param error error passed through unchanged
   */
  public void messageError(String to,
                           String from,
                           Serializable value,
                           BamError error)
  {
    BamStream stream = findAgent(to);

    if (stream != null)
      stream.messageError(to, from, value, error);
    else if (log.isLoggable(Level.FINE)) {
      log.fine(this + " sendMessageError to=" + to + " from=" + from
               + " error=" + error + " is an unknown stream");
    }
  }

  /**
   * Routes a queryGet to the agent stream found for {@code to}.
   *
   * <p>If the stream reports that it did not handle the query, a
   * {@code FEATURE_NOT_IMPLEMENTED} error is sent back through
   * {@link #queryError}; if no stream is found, a
   * {@code SERVICE_UNAVAILABLE} error is sent back instead. In both cases
   * the error is addressed to {@code from}, with {@code to} as its sender.
   * {@link #queryError} throws a {@link RuntimeException} when it cannot
   * find a stream for the jid it is sending to.
   *
   * @param id    query id, echoed in any error reply
   * @param to    jid used to look up the target stream
   * @param from  jid of the requester
   * @param query query payload
   * @return always {@code true}
   */
  public boolean queryGet(long id, String to, String from,
                          Serializable query)
  {
    BamStream stream = findAgent(to);

    if (stream != null) {
      if (! stream.queryGet(id, to, from, query)) {
        if (log.isLoggable(Level.FINE)) {
          log.fine(this + " queryGet to unknown feature to='" + to
                   + "' from=" + from + " query='" + query + "'"
                   + " stream=" + stream);
        }

        String msg = L.l("'{0}' is an unknown feature for to='{1}'",
                         query, to);

        BamError error = new BamError(BamError.TYPE_CANCEL,
                                      BamError.FEATURE_NOT_IMPLEMENTED,
                                      msg);

        // to/from are swapped: the error travels back to the requester
        queryError(id, from, to, query, error);
      }

      return true;
    }

    if (log.isLoggable(Level.FINE)) {
      log.fine(this + " queryGet to unknown stream to='" + to
               + "' from=" + from);
    }

    String msg = L.l("'{0}' is an unknown service for queryGet", to);

    BamError error = new BamError(BamError.TYPE_CANCEL,
                                  BamError.SERVICE_UNAVAILABLE,
                                  msg);

    // to/from are swapped: the error travels back to the requester
    queryError(id, from, to, query, error);

    return true;
  }

  /**
   * Routes a querySet to the agent stream found for {@code to}.
   *
   * <p>If no stream is found, a {@code SERVICE_UNAVAILABLE} error is sent
   * back through {@link #queryError}; if the stream reports that it did not
   * handle the query, a {@code FEATURE_NOT_IMPLEMENTED} error is sent back
   * instead. In both cases the error is addressed to {@code from}, with
   * {@code to} as its sender. {@link #queryError} throws a
   * {@link RuntimeException} when it cannot find a stream for the jid it is
   * sending to.
   *
   * @param id    query id, echoed in any error reply
   * @param to    jid used to look up the target stream
   * @param from  jid of the requester
   * @param query query payload
   * @return always {@code true}
   */
  public boolean querySet(long id, String to, String from, Serializable query)
  {
    BamStream stream = findAgent(to);

    if (stream == null) {
      if (log.isLoggable(Level.FINE)) {
        log.fine(this + " querySet to unknown stream '" + to
                 + "' from=" + from);
      }

      String msg = L.l("'{0}' is an unknown service for querySet", to);

      BamError error = new BamError(BamError.TYPE_CANCEL,
                                    BamError.SERVICE_UNAVAILABLE,
                                    msg);

      // to/from are swapped: the error travels back to the requester
      queryError(id, from, to, query, error);

      return true;
    }

    if (stream.querySet(id, to, from, query))
      return true;

    if (log.isLoggable(Level.FINE)) {
      log.fine(this + " querySet with unknown feature to=" + to
               + " from=" + from + " resource=" + stream
               + " query=" + query);
    }

    String msg = L.l("'{0}' is an unknown feature for querySet",
                     query);

    BamError error = new BamError(BamError.TYPE_CANCEL,
                                  BamError.FEATURE_NOT_IMPLEMENTED,
                                  msg);

    // to/from are swapped: the error travels back to the requester
    queryError(id, from, to, query, error);

    return true;
  }

  /**
   * Delivers a query result to the agent stream found for {@code to}.
   *
   * @param id    query id
   * @param to    jid used to look up the target stream
   * @param from  jid passed through to the target stream as the sender
   * @param value result payload passed through unchanged
   * @throws RuntimeException if no stream is found for {@code to}
   */
  public void queryResult(long id, String to, String from, Serializable value)
  {
    BamStream stream = findAgent(to);

    if (stream != null)
      stream.queryResult(id, to, from, value);
    else
      throw new RuntimeException(L.l("{0} is an unknown entity", to));
  }

  /**
   * Delivers a query error to the agent stream found for {@code to}.
   *
   * @param id    query id
   * @param to    jid used to look up the target stream
   * @param from  jid passed through to the target stream as the sender
   * @param query the query payload the error refers to
   * @param error error passed through unchanged
   * @throws RuntimeException if no stream is found for {@code to}
   */
  public void queryError(long id,
                         String to,
                         String from,
                         Serializable query,
                         BamError error)
  {
    BamStream stream = findAgent(to);

    if (stream != null)
      stream.queryError(id, to, from, query, error);
    else
      throw new RuntimeException(L.l("{0} is an unknown entity", to));
  }

  /**
   * Looks up the agent stream for a jid.
   *
   * <p>The agent map is consulted first. On a miss the owning service is
   * resolved with {@link #findService}: when the jid is the service's own
   * jid, the service's agent stream is registered in the agent map and
   * returned; otherwise the service is asked to start an agent for the jid
   * and the agent map is consulted again.
   *
   * <p>A map entry whose weak reference has been cleared is treated as a
   * miss, so a collected stream does not hide a stream that can still be
   * resolved.
   *
   * @param jid the jid to resolve; a jid ending in {@code '@'} has the
   *            broker's domain appended before the service lookup
   * @return the stream, or {@code null} if {@code jid} is {@code null} or
   *         nothing could be resolved
   */
  protected BamStream findAgent(String jid)
  {
    // same contract as findService: a null jid resolves to nothing
    if (jid == null)
      return null;

    synchronized (_agentMap) {
      WeakReference<BamStream> ref = _agentMap.get(jid);

      if (ref != null) {
        BamStream cached = ref.get();

        // a cleared reference falls through to a fresh lookup
        if (cached != null)
          return cached;
      }
    }

    if (jid.endsWith("@")) {
      // jms/3d00
      jid = jid + getDomain();
    }

    BamService service = findService(jid);

    if (service == null) {
      return null;
    }
    else if (jid.equals(service.getJid())) {
      BamStream agentStream = service.getAgentStream();

      if (agentStream != null) {
        synchronized (_agentMap) {
          WeakReference<BamStream> ref = _agentMap.get(jid);

          if (ref != null) {
            BamStream cached = ref.get();

            // keep a live entry that appeared in the meantime
            if (cached != null)
              return cached;
          }

          // no entry, or a cleared one: (re)register the service's stream
          _agentMap.put(jid, new WeakReference<BamStream>(agentStream));

          return agentStream;
        }
      }
    }
    else {
      if (! service.startAgent(jid))
        return null;

      // re-check the agent map after the service reported a started agent
      synchronized (_agentMap) {
        WeakReference<BamStream> ref = _agentMap.get(jid);

        if (ref != null)
          return ref.get();
      }
    }

    return null;
  }

  /**
   * Looks up the service responsible for a jid.
   *
   * <p>Resolution order: the service cache; the registered service managers
   * (followed by a second cache check); for a bare domain (no {@code '/'}
   * and no {@code '@'}), the domain service from {@link #findDomain}, which
   * is then cached; finally the lookup recurses on the part before
   * {@code '/'}, or failing that on the part after {@code '@'}.
   *
   * <p>A cache entry whose weak reference has been cleared is treated as a
   * miss, so a collected service does not block the later resolution steps.
   *
   * @param jid the jid to resolve
   * @return the service, or {@code null} if {@code jid} is {@code null} or
   *         nothing could be resolved
   */
  protected BamService findService(String jid)
  {
    if (jid == null)
      return null;

    synchronized (_serviceCache) {
      WeakReference<BamService> ref = _serviceCache.get(jid);

      if (ref != null) {
        BamService cached = ref.get();

        // a cleared reference falls through to a fresh lookup
        if (cached != null)
          return cached;
      }
    }

    if (startServiceFromManager(jid)) {
      synchronized (_serviceCache) {
        WeakReference<BamService> ref = _serviceCache.get(jid);

        if (ref != null) {
          BamService started = ref.get();

          if (started != null)
            return started;
        }
      }
    }

    if (jid.indexOf('/') < 0 && jid.indexOf('@') < 0) {
      BamService service = findDomain(jid);

      if (service != null) {
        synchronized (_serviceCache) {
          WeakReference<BamService> ref = _serviceCache.get(jid);

          if (ref != null) {
            BamService cached = ref.get();

            // keep a live entry that appeared in the meantime
            if (cached != null)
              return cached;
          }

          // no entry, or a cleared one: (re)register the domain service
          _serviceCache.put(jid, new WeakReference<BamService>(service));

          return service;
        }
      }
    }

    int p;

    if ((p = jid.indexOf('/')) > 0) {
      // strip the part after '/' and retry with the prefix
      String uid = jid.substring(0, p);

      return findService(uid);
    }
    else if ((p = jid.indexOf('@')) > 0) {
      // strip the part up to '@' and retry with the remainder
      String domainName = jid.substring(p + 1);

      return findService(domainName);
    }
    else
      return null;
  }

  /**
   * Looks up the domain service of the broker the manager has for a domain.
   *
   * @param domain the domain name
   * @return the domain service when the manager's broker for
   *         {@code domain} is a {@code HempBroker}; otherwise {@code null}
   */
  protected BamService findDomain(String domain)
  {
    if (domain == null)
      return null;

    BamBroker broker = _manager.findBroker(domain);

    if (broker instanceof HempBroker) {
      HempBroker hempBroker = (HempBroker) broker;

      return hempBroker.getDomainService();
    }

    if (broker == null || broker == this)
      return null;

    // construct foreign
    return null;
  }

  /**
   * Asks each registered service manager, in list order, to start a service
   * for the jid, stopping at the first one that reports success.
   *
   * @param jid the jid to start a service for
   * @return {@code true} if some manager's {@code startService} returned
   *         {@code true}
   */
  protected boolean startServiceFromManager(String jid)
  {
    for (BamServiceManager manager : _serviceManagerList) {
      if (manager.startService(jid))
        return true;
    }

    return false;
  }

  /**
   * Closes a connection: notifies the owning service, if the jid has a
   * {@code '/'} and a service is found for the part before it, then removes
   * the jid from the service cache and the agent map.
   *
   * @param jid the jid of the agent being closed
   */
  void closeAgent(String jid)
  {
    int p = jid.indexOf('/');

    if (p > 0) {
      String owner = jid.substring(0, p);

      BamService service = findService(owner);

      if (service != null) {
        try {
          service.onAgentStop(jid);
        } catch (Exception e) {
          // a failing callback must not prevent the cleanup below
          log.log(Level.FINE, e.toString(), e);
        }
      }
    }

    synchronized (_serviceCache) {
      _serviceCache.remove(jid);
    }

    synchronized (_agentMap) {
      _agentMap.remove(jid);
    }
  }

  /**
   * Unregisters this broker's domain and aliases from the manager and
   * clears the service map, the service cache and the agent map.
   */
  public void close()
  {
    _manager.removeBroker(_domain);

    for (String alias : _aliasList)
      _manager.removeBroker(alias);

    _serviceMap.clear();

    // take the same monitors the lookup and closeAgent paths hold
    synchronized (_serviceCache) {
      _serviceCache.clear();
    }

    synchronized (_agentMap) {
      _agentMap.clear();
    }
  }

  /**
   * Returns the simple class name followed by the domain in brackets.
   */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + "[" + _domain + "]";
  }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?