📄 topicaccessor.java
字号:
* The topic destroy timers were inhibited and can now be activated
     */
    public void spanTopicDestroyTimeout() {
        String[] oids = getTopics();
        // Other topics which are created in this sync gap don't need it
        // as they are not from persistence store
        for (int i = 0; i < oids.length; i++) {
            TopicHandler topicHandler = access(oids[i]);
            if (topicHandler == null)
                continue;
            try {
                topicHandler.startDestroyTimer();
            } finally {
                // Hand back the handler obtained by access(), even if starting the timer failed
                release(topicHandler);
            }
        }
    }

    /**
     * Access the message meat without a lock.
     *
     * @param topicId
     *           The topic oid
     * @param msgUnitWrapperUniqueId
     *           The message instance id
     * @return null if the topic or the message is not found
     * @throws XmlBlasterException
     */
    public MsgUnitWrapper lookupDirtyRead(String topicId, long msgUnitWrapperUniqueId)
            throws XmlBlasterException {
        TopicHandler topicHandler = getTopicHandlerDirtyRead(topicId);
        if (topicHandler == null)
            return null;
        return topicHandler.getMsgUnitWrapper(msgUnitWrapperUniqueId);
    }

    /**
     * Forwards the changed message to the TopicHandler of its key oid,
     * the handler is looked up without a lock.
     * <p>
     * Does nothing if no TopicHandler is found for the key oid.
     *
     * @param msgUnitWrapper
     *           The changed message, its key oid selects the topic
     * @throws XmlBlasterException
     */
    public void changeDirtyRead(MsgUnitWrapper msgUnitWrapper) throws XmlBlasterException {
        TopicHandler topicHandler = getTopicHandlerDirtyRead(msgUnitWrapper.getKeyOid());
        if (topicHandler == null)
            return;
        topicHandler.change(msgUnitWrapper);
    }

    /**
     * Lookup the TopicHandler without acquiring the lock of its container.
     *
     * @param topicId
     *           The topic oid
     * @return null if no container is known for the topicId or the
     *         container holds no handler anymore
     */
    private TopicHandler getTopicHandlerDirtyRead(String topicId) {
        TopicContainer tc = accessTopicContainer(topicId);
        if (tc == null)
            return null;
        return tc.getTopicHandler(); // no lock ...
    }

    /**
     * Check without a lock if the topic has a message distributor plugin.
     *
     * @param topicId
     *           The topic oid
     * @return false if the topic is not found or it has no plugin
     */
    public boolean hasMsgDistributorPluginDirtyRead(String topicId) {
        TopicHandler topicHandler = getTopicHandlerDirtyRead(topicId);
        if (topicHandler == null)
            return false;
        return topicHandler.getMsgDistributorPlugin() != null;
    }

    /**
     * @param topicId key oid
     * @return Never null, an empty array if the topic is not found
     */
    public final SubscriptionInfo[] getSubscriptionInfoArrDirtyRead(String topicId) {
        TopicHandler topicHandler = getTopicHandlerDirtyRead(topicId);
        if (topicHandler == null)
            return new SubscriptionInfo[0];
        return topicHandler.getSubscriptionInfoArr();
    }

    /**
     * Dump all TopicHandler to xml.
     * This is implemented as dirty read to gain performance
     *
     * @param extraOffset
     *           Offset prefix, extended by Constants.INDENT for each
     *           TopicHandler; null is treated as empty string
     * @return The markup of all TopicHandlers
     * @throws XmlBlasterException
     */
    public final String toXml(String extraOffset) throws XmlBlasterException {
        // Dirty read: if it makes sync problems we need to lookup each
        // TopicHandler separately over getTopics()
        StringBuffer sb = new StringBuffer(10000);
        if (extraOffset == null)
            extraOffset = "";
        TopicHandler[] topicHandlerArr = getTopicHandlerArr();
        for (int ii = 0; ii < topicHandlerArr.length; ii++) {
            sb.append(topicHandlerArr[ii].toXml(extraOffset + Constants.INDENT));
        }
        return sb.toString();
    }

    /**
     * Helper class to hold the TopicHandler and some additional locking information.
     *
     * @author marcel
     */
    private final class TopicContainer {
        private TopicHandler topicHandler;

        private final boolean fairness = false;

        private final ReentrantLock lock = new ReentrantLock(fairness);

        public TopicContainer(TopicHandler topicHandler) {
            this.topicHandler = topicHandler;
        }

        /**
         * @return The handler without any locking, null after erase()
         */
        public TopicHandler getTopicHandler() {
            return this.topicHandler;
        }

        /**
         * Forget the handler and release every hold the calling thread has
         * on the lock.
         * <p>
         * Note: synchronized (this.lock) enters the object monitor of the
         * ReentrantLock instance, this is independent of lock.lock().
         */
        public void erase() {
            if (this.topicHandler == null)
                return;
            synchronized (this.lock) {
                this.topicHandler = null;
                // getHoldCount() is 0 if the current thread is not the holder
                int c = this.lock.getHoldCount();
                for (int i = 0; i < c; i++)
                    this.lock.unlock();
            }
        }

        /**
         * Acquire the lock and return the handler.
         *
         * @return null if the handler was erased, in this case the lock is
         *         not held on return
         */
        public TopicHandler lock() {
            if (this.topicHandler == null)
                return null;
            this.lock.lock();
            // Check again: the handler may have been erased while we waited for the lock
            TopicHandler th = this.topicHandler;
            if (th == null) {
                this.lock.unlock();
                return null;
            }
            return th;
        }

        /**
         * Release one hold of the lock, does nothing if the calling thread
         * is not the holder.
         */
        public void unlock() {
            synchronized (this.lock) {
                if (this.lock.getHoldCount() > 0) // returns 0 if we are not the holder
                    this.lock.unlock(); // IllegalMonitorStateException if our
                                        // thread is not the holder of the lock,
                                        // never happens because of above if()
            }
        }
    } // class TopicContainer

    /**
     * Queue request for later execution, to be outside of sync-locks
     * <p>
     * If the calling thread is interrupted while waiting for space in the
     * queue the request is dropped, the interrupt status of the thread is
     * restored so that the caller can react on it.
     *
     * @param msgUnitWrapper
     */
    public void entryDestroyed_scheduleForExecution(MsgUnitWrapper msgUnitWrapper) {
        try {
            this.blockingQueue.put(msgUnitWrapper);
        } catch (InterruptedException e) {
            // Don't swallow the interrupt: restore the flag and report the dropped request
            Thread.currentThread().interrupt();
            log.warning("TopicAccessor: Interrupted while scheduling entry destroyed, request is dropped: "
                    + e.toString());
        }
    }

    /**
     * Called by msgUnitWrapper.toDestroyed():
     * this.glob.getTopicAccessor().entryDestroyed_scheduleForExecution(this);
     * Is currently switched off (not used)
     * <p>
     * Takes entries from the queue in an endless loop and passes each to
     * consume(), the loop ends when the thread is interrupted.
     *
     * @author mr@marcelruff.info
     */
    private class Consumer implements Runnable {
        private final BlockingQueue queue;

        Consumer(BlockingQueue q) {
            this.queue = q;
        }

        public void run() {
            try {
                while (true) {
                    consume(queue.take());
                }
            } catch (InterruptedException ex) {
                log.severe("TopicAccessor: Unexpected problem: " + ex.toString());
                // Restore the interrupt status for the owner of this thread
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Notify the TopicHandler of the entry that it is destroyed.
         *
         * @param x
         *           The queued entry, must be a MsgUnitWrapper
         */
        void consume(Object x) {
            MsgUnitWrapper msgUnitWrapper = (MsgUnitWrapper) x;
            TopicHandler topicHandler = access(msgUnitWrapper.getKeyOid());
            if (topicHandler == null)
                return; // Too late
            try {
                log.severe("DEBUG ONLY: Executing now entry destroyed");
                topicHandler.entryDestroyed(msgUnitWrapper);
            } finally {
                release(topicHandler);
            }
        }
    }

    /**
     * Adds the specified Topic listener to receive creation/destruction events
     * of Topics.
     * <p>
     * Note that the fired event holds a locked topicHandler, you shouldn't spend
     * too much time with it to allow other threads to do their work as well
     *
     * @param l
     *           Your listener implementation
     * @throws IllegalArgumentException
     *            if the listener is null
     */
    public void addTopicListener(I_TopicListener l) {
        if (l == null) {
            throw new IllegalArgumentException(
                    "TopicAccessor.addTopicListener: the listener is null");
        }
        synchronized (this.topicListenerSet) {
            this.topicListenerSet.add(l);
        }
    }

    /**
     * Removes the specified listener.
     *
     * @param l
     *           Your listener implementation
     * @throws IllegalArgumentException
     *            if the listener is null
     */
    public void removeTopicListener(I_TopicListener l) {
        if (l == null) {
            throw new IllegalArgumentException(
                    "TopicAccessor.removeTopicListener: the listener is null");
        }
        synchronized (this.topicListenerSet) {
            this.topicListenerSet.remove(l);
        }
    }

    /**
     * Access a current snapshot of all listeners.
     *
     * @return The array of registered listeners
     */
    public synchronized I_TopicListener[] getRemotePropertiesListenerArr() {
        synchronized (this.topicListenerSet) {
            return (I_TopicListener[]) this.topicListenerSet
                    .toArray(new I_TopicListener[this.topicListenerSet.size()]);
        }
    }

    /**
     * Is fired on topic creation or destruction.
     * <p>
     * Does never throw any exception
     *
     * @param topicHandler
     *           The locked! handler
     */
    private void fireTopicEvent(TopicHandler topicHandler) {
        try {
            I_TopicListener[] arr = getRemotePropertiesListenerArr();
            for (int i = 0; i < arr.length; i++) {
                // Each listener is guarded on its own so that a failing
                // listener does not prevent the notification of the others
                try {
                    I_TopicListener l = arr[i];
                    TopicEvent event = new TopicEvent(topicHandler);
                    l.changed(event);
                } catch (Throwable e) {
                    e.printStackTrace();
                }
            }
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -