📄 topicaccessor.java
字号:
/*------------------------------------------------------------------------------ Name: TopicAccessor.java Project: xmlBlaster.org Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file ------------------------------------------------------------------------------*/package org.xmlBlaster.engine;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import java.util.Set;import java.util.TreeSet;import java.util.logging.Level;import java.util.logging.Logger;import org.xmlBlaster.authentication.SessionInfo;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.Constants;// http://dcl.mathcs.emory.edu/util/backport-util-concurrent/doc/api/import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;/** * Singleton in ServerScope to access a TopicHandler instance. * <p> * Used to guarantee single threaded access to a TopicHandler instance. * Only well defined methods allow dirty reads from other threads simultaneously, * further we have a pattern to dispatch the topicHandler access to a worker thread. * @see <a * href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/engine.message.lifecycle.html">The * engine.message.lifecylce requirement</a> * @see org.xmlBlaster.test.topic.TestTopicLifeCycle * @author <a href="mailto:xmlBlaster@marcelruff.info">Marcel Ruff</a> */public final class TopicAccessor { private final ServerScope serverScope; private static Logger log = Logger.getLogger(TopicAccessor.class.getName()); /** * Map for TopicContainer. * <p> * key = oid value from <key oid="..."> (== topicHandler.getUniqueKey()) * value = TopicContainer instance */ private final Map topicHandlerMap = new HashMap(); /** * For listeners who want to be informed about topic creation / deletion * events. */ private final Set topicListenerSet = new TreeSet(); private BlockingQueue blockingQueue; /* * For listeners who want to be informed about topic creation / deletion * events. */ public TopicAccessor(ServerScope serverScope) { this.serverScope = serverScope; // Support async execution of tasks // this.blockingQueue = new SynchronousQueue(); // has no capacity but // blocks until other thread takes it out final int capacity = 10000; this.blockingQueue = new LinkedBlockingQueue(capacity); Consumer c = new Consumer(this.blockingQueue); Thread t = new Thread(c); t.setName("XmlBlaster.Consumer"); t.setDaemon(true); t.start(); } /** * Access a topicHandler by its unique oid. * <p /> * You need to call release(topicHandler) after usage! * * @param oid * topicHandler.getUniqueKey() * @return The topicHandler instance or null, in case of null you don't need * to release anything */ public TopicHandler access(String oid) { TopicContainer tc = accessTopicContainer(oid); if (tc == null) return null; return tc.lock(); // Here the calling threads block until its their turn } /** * The topicHandler is not locked, use for read only access only and when you know what you are doing. * @param oid * topicHandler.getUniqueKey() * @return The topicHandler instance or null, you don't need to release anything */ public TopicHandler accessDirtyRead(String oid) { TopicContainer tc = accessTopicContainer(oid); if (tc == null) return null; return tc.getTopicHandler(); } // Is NOT locked! private TopicContainer accessTopicContainer(String oid) { Object obj; synchronized (this.topicHandlerMap) { obj = this.topicHandlerMap.get(oid); } if (obj == null) { // Normal case if topic is new created (by subscribe) if (log.isLoggable(Level.FINE)) log.fine("key oid " + oid + " is unknown, topicHandler == null"); if (log.isLoggable(Level.FINEST)) Thread.dumpStack(); return null; } return (TopicContainer) obj; } /** * Return the topicHandler which you previously got with access(String oid). * * @param topicHandler * Currently logs severe if null */ public void release(TopicHandler topicHandler) { if (topicHandler == null) { log.severe("Unexpected topicHandler == null"); Thread.dumpStack(); return; } Object obj; synchronized (this.topicHandlerMap) { obj = this.topicHandlerMap.get(topicHandler.getUniqueKey()); } if (obj == null) { // Happens for example in RequestBroler.erase() which // triggers a toDead() which cleans up the // TopicContainer already if (log.isLoggable(Level.FINE)) log.fine("key oid " + topicHandler.getUniqueKey() + " is unknown, topicHandler == null"); if (log.isLoggable(Level.FINEST)) Thread.dumpStack(); return; } TopicContainer tc = (TopicContainer) obj; tc.unlock(); } /** * Access a topicHandler by its unique oid or create it if not known. * <p /> * You need to call release(topicHandler) after usage. * * @param sessionInfo * Can be null if called by a subscription * @param oid * topicHandler.getUniqueKey() * @return The topicHandler instance but never null */ public TopicHandler findOrCreate(SessionInfo sessionInfo, String oid) throws XmlBlasterException { TopicContainer tc = null; Object oldOne; synchronized (this.topicHandlerMap) { oldOne = this.topicHandlerMap.get(oid); if (oldOne == null) { TopicHandler topicHandler = new TopicHandler(this.serverScope .getRequestBroker(), sessionInfo, oid); tc = new TopicContainer(topicHandler); this.topicHandlerMap.put(topicHandler.getUniqueKey(), tc); } else { tc = (TopicContainer) oldOne; } } TopicHandler topicHandler = tc.lock(); if (topicHandler == null) { // try again recursive log.warning("Trying again to get a TopicHandler '"+oid+"': " + sessionInfo.toXml()); return findOrCreate(sessionInfo, oid); } // old, pre 1.3 behaviour: // if (obj == null && sessionInfo != null) { // is new created for a // publish(), but not when created for a subscribe if (oldOne == null) { // is new created fireTopicEvent(topicHandler); // is locked! } return topicHandler; } /** * Remove the given topic * * @param topicHandler */ public void erase(String oid) throws XmlBlasterException { TopicContainer tc = accessTopicContainer(oid); if (tc == null) return; Object obj; TopicHandler topicHandler = tc.lock(); try { fireTopicEvent(topicHandler); // is locked! synchronized (this.topicHandlerMap) { obj = this.topicHandlerMap.remove(oid); } if (obj == null) { log.severe("topicHandler '" + oid + "' was not found in map"); Thread.dumpStack(); } } finally { tc.erase(); // unlocks all locks } } /** * Treat as read only! For class internal use only. * * @return A current snapshot of all topics (never null) */ private TopicHandler[] getTopicHandlerArr() { synchronized (this.topicHandlerMap) { int len = this.topicHandlerMap.size(); TopicHandler[] handlers = new TopicHandler[len]; Iterator it = this.topicHandlerMap.values().iterator(); int i = 0; while (it.hasNext()) { handlers[i] = ((TopicContainer) it.next()).getTopicHandler(); i++; } return handlers; } } /** * Access oid array * * @return A string array of all topicHandler.getUniqueKey() */ public String[] getTopics() { synchronized (this.topicHandlerMap) { int len = this.topicHandlerMap.size(); String[] handlers = new String[len]; Iterator it = this.topicHandlerMap.values().iterator(); int i = 0; while (it.hasNext()) { handlers[i] = ((TopicContainer) it.next()).getTopicHandler() .getUniqueKey(); i++; } return handlers; } } /** * Access the number of known topics. * * @return Number of registered topics */ public int getNumTopics() { synchronized (this.topicHandlerMap) { return this.topicHandlerMap.size(); } } /** * Called from SessionPersistencePlugin after all sessions / subscriptions are * alive after a server startup. * <p/>
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -