hempbroker.java
来自「RESIN 3.2 最新源码」· Java 代码 · 共 776 行 · 第 1/2 页
JAVA
776 行
/* * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved * * This file is part of Resin(R) Open Source * * Each copy or derived work must preserve the copyright notice and this * notice unmodified. * * Resin Open Source is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * Resin Open Source is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty * of NON-INFRINGEMENT. See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License * along with Resin Open Source; if not, write to the * * Free Software Foundation, Inc. * 59 Temple Place, Suite 330 * Boston, MA 02111-1307 USA * * @author Scott Ferguson */package com.caucho.hemp.broker;import com.caucho.bam.BamServiceManager;import com.caucho.bam.BamBroker;import com.caucho.bam.BamConnection;import com.caucho.bam.BamError;import com.caucho.hemp.*;import com.caucho.bam.BamService;import com.caucho.bam.BamStream;import com.caucho.server.resin.*;import com.caucho.util.*;import java.util.*;import java.util.logging.*;import java.lang.ref.*;import java.io.Serializable;/** * Broker */public class HempBroker implements BamBroker, BamStream{ private static final Logger log = Logger.getLogger(HempBroker.class.getName()); private static final L10N L = new L10N(HempBroker.class); private HempBrokerManager _manager; // agents private final HashMap<String,WeakReference<BamStream>> _agentMap = new HashMap<String,WeakReference<BamStream>>(); private final HashMap<String,BamService> _serviceMap = new HashMap<String,BamService>(); private final HashMap<String,WeakReference<BamService>> _serviceCache = new HashMap<String,WeakReference<BamService>>(); private String _serverId = Resin.getCurrent().getServerId(); private String _domain = "localhost"; private String _managerJid = "localhost"; private HempDomainService _domainService; private ArrayList<String> _aliasList = new ArrayList<String>(); private BamServiceManager []_serviceManagerList = new BamServiceManager[0]; public HempBroker() { _manager = HempBrokerManager.getCurrent(); _domainService = new HempDomainService(this, ""); } public HempBroker(String domain) { _manager = HempBrokerManager.getCurrent(); _domain = domain; _managerJid = domain; _domainService = new HempDomainService(this, domain); } /** * Adds a domain alias */ public void addAlias(String domain) { _aliasList.add(domain); } /** * Returns the stream to the broker */ public BamStream getBrokerStream() { return this; } /** * Returns the domain service */ public BamService getDomainService() { return _domainService; } // // configuration // /** * Adds a broker implementation, e.g. the IM broker. */ public void addServiceManager(BamServiceManager serviceManager) { BamServiceManager []serviceManagerList = new BamServiceManager[_serviceManagerList.length + 1]; System.arraycopy(_serviceManagerList, 0, serviceManagerList, 0, _serviceManagerList.length); serviceManagerList[serviceManagerList.length - 1] = serviceManager; _serviceManagerList = serviceManagerList; } // // API // /** * Creates a session */ public BamConnection getConnection(String uid, String password) { return getConnection(uid, password, null); } /** * Creates a session */ public BamConnection getConnection(String uid, String password, String resourceId) { String jid = generateJid(uid, resourceId); HempConnectionImpl conn = new HempConnectionImpl(this, jid); BamStream agentStream = conn.getAgentStreamHandler(); synchronized (_agentMap) { _agentMap.put(jid, new WeakReference<BamStream>(agentStream)); } if (log.isLoggable(Level.FINE)) log.fine(conn + " created"); int p = jid.indexOf('/'); if (p > 0) { String owner = jid.substring(0, p); BamService resource = findService(owner); if (resource != null) resource.onAgentStart(jid); } return conn; } protected String generateJid(String uid, String resource) { StringBuilder sb = new StringBuilder(); if (uid.indexOf('@') > 0) sb.append(uid); else sb.append(uid).append('@').append(getDomain()); sb.append("/"); if (resource != null) sb.append(resource); else { Base64.encode(sb, RandomUtil.getRandomLong()); } return sb.toString(); } /** * Registers a service */ public void addService(BamService service) { String jid = service.getJid(); synchronized (_serviceMap) { BamService oldService = _serviceMap.get(jid); if (oldService != null) throw new IllegalStateException(L.l("duplicated jid='{0}' is not allowed", jid)); _serviceMap.put(jid, service); _serviceCache.put(jid, new WeakReference<BamService>(service)); } synchronized (_agentMap) { WeakReference<BamStream> oldRef = _agentMap.get(jid); if (oldRef != null && oldRef.get() != null) throw new IllegalStateException(L.l("duplicated jid='{0}' is not allowed", jid)); BamStream agentStream = service.getAgentStream(); _agentMap.put(jid, new WeakReference<BamStream>(agentStream)); } if (log.isLoggable(Level.FINE)) log.fine(this + " addService jid=" + jid + " " + service); } /** * Removes a service */ public void removeService(BamService service) { String jid = service.getJid(); synchronized (_serviceMap) { _serviceMap.remove(jid); } synchronized (_serviceCache) { _serviceCache.remove(jid); } synchronized (_agentMap) { _agentMap.remove(jid); } if (log.isLoggable(Level.FINE)) log.fine(this + " removeService jid=" + jid + " " + service); } /** * Returns the manager's own id. */ protected String getManagerJid() { return _managerJid; } /** * Returns the domain */ protected String getDomain() { return _domain; } /** * getJid() returns null for the broker */ public String getJid() { return _domain; } /** * Presence */ public void presence(String to, String from, Serializable value) { /* if (to == null) { BamServiceManager []resourceManagers = _serviceManagerList; for (BamServiceManager manager : resourceManagers) { manager.presence(to, from, data); } } else { */ BamStream stream = findAgent(to); if (stream != null) stream.presence(to, from, value); else { if (log.isLoggable(Level.FINER)) { log.finer(this + " sendPresence (no resource) to=" + to + " from=" + from + " value=" + value); } } } /** * Presence unavailable */ public void presenceUnavailable(String to, String from, Serializable data) { BamStream stream = findAgent(to); if (stream != null) stream.presenceUnavailable(to, from, data); else { if (log.isLoggable(Level.FINER)) { log.finer(this + " sendPresenceUnavailable (no resource) to=" + to + " from=" + from + " value=" + data); } } } /** * Presence probe */ public void presenceProbe(String to, String from, Serializable data) { BamStream stream = findAgent(to); if (stream != null) stream.presenceProbe(to, from, data); else { if (log.isLoggable(Level.FINER)) { log.finer(this + " sendPresenceProbe (no resource) to=" + to + " from=" + from + " value=" + data); } } } /** * Presence subscribe */ public void presenceSubscribe(String to, String from, Serializable data) { BamStream stream = findAgent(to); if (stream != null) stream.presenceSubscribe(to, from, data); else { if (log.isLoggable(Level.FINER)) { log.finer(this + " sendPresenceSubscribe (no resource) to=" + to + " from=" + from + " value=" + data); } } } /** * Presence subscribed */ public void presenceSubscribed(String to, String from, Serializable data) { BamStream stream = findAgent(to); if (stream != null) stream.presenceSubscribed(to, from, data); else { if (log.isLoggable(Level.FINER)) { log.finer(this + " sendPresenceSubscribed (no resource) to=" + to + " from=" + from + " value=" + data); } } } /** * Presence unsubscribe */ public void presenceUnsubscribe(String to, String from, Serializable data) {
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?