📄 subjectinfo.java
字号:
/*------------------------------------------------------------------------------Name: SubjectInfo.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileComment: Handling the Client dataAuthor: xmlBlaster@marcelruff.info------------------------------------------------------------------------------*/package org.xmlBlaster.authentication;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.authentication.plugins.I_Subject;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.util.qos.SessionQos;import org.xmlBlaster.util.qos.ConnectQosData;import org.xmlBlaster.util.qos.storage.CbQueueProperty;import org.xmlBlaster.util.qos.address.AddressBase;import org.xmlBlaster.util.qos.address.CallbackAddress;import org.xmlBlaster.util.cluster.NodeId;import org.xmlBlaster.engine.cluster.ClusterNode;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.dispatch.DispatchStatistic;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import org.xmlBlaster.util.admin.extern.JmxMBeanHandle;import org.xmlBlaster.engine.query.plugins.QueueQueryPlugin;import org.xmlBlaster.engine.queuemsg.MsgQueueUpdateEntry;import org.xmlBlaster.engine.admin.I_AdminSession;import org.xmlBlaster.util.error.I_MsgErrorHandler;import org.xmlBlaster.util.error.MsgErrorInfo;import org.xmlBlaster.engine.MsgErrorHandler;import java.util.Map;import java.util.HashMap;import java.util.Properties;import java.util.Set;import java.util.HashSet;import java.util.Iterator;//import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;import org.xmlBlaster.util.ReentrantLock;import javax.management.NotificationBroadcasterSupport;import javax.management.AttributeChangeNotification;import javax.management.MBeanNotificationInfo;/** * The SubjectInfo stores all known data about a client. * <p> * It also contains a subject queue, where messages are stored * until they are delivered at the next login of this client. * </p> * <p> * There are three states for SubjectInfo namely UNDEF, ALIVE, DEAD. * A transition from UNDEF directly to DEAD is not supported. * Transitions from ALIVE or DEAD to UNDEF are not possible. * </p> * @author <a href="mailto:xmlBlaster@marcelruff.info">Marcel Ruff</a> */public final class SubjectInfo extends NotificationBroadcasterSupport /* implements I_AdminSubject, SubjectInfoMBean -> is delegated to SubjectInfoProtector */{ private String ME = "SubjectInfo"; private final ServerScope glob; private static Logger log = Logger.getLogger(SubjectInfo.class.getName()); private final ContextNode contextNode; private final Authenticate authenticate; /** The cluster wide unique identifier of the subject e.g. "/node/heron/client/joe" */ private SessionName subjectName; /** The partner class from the security framework */ private I_Subject securityCtx = null; /** * All sessions of this subject are stored in this map. * The absoluteSessionName == sessionInfo.getId() is the key, * the SessionInfo object the value */ private Map sessionMap = new HashMap(); private volatile SessionInfo[] sessionArrCache; public CallbackAddress[] callbackAddressCache = null; private MsgErrorHandler msgErrorHandler; private final DispatchStatistic dispatchStatistic; private final SubjectInfoProtector subjectInfoProtector; private NodeId nodeId = null; private boolean determineNodeId = true; // Enforced by I_AdminSubject /** Incarnation time of this object instance in millis */ private long startupTime; private int maxSessions; /** State during and after construction */ public final int UNDEF = -1; /** State after calling toAlive() */ public final int ALIVE = 0; /** State after calling shutdown() */ public final int DEAD = 1; private int state = UNDEF; private ReentrantLock lock = new ReentrantLock(); /** * All MsgUnit which can't be delivered to the client (if he is not logged in) * are queued here and are delivered when the client comes on line. * <p> * Node objects = MsgQueueEntry */ private I_Queue subjectQueue; /** this is used for administrative gets (queries on callback queue) */ private volatile QueueQueryPlugin queueQueryPlugin; /** Statistics */ private static long instanceCounter = 0L; private long instanceId = 0L; /** My JMX registration */ private JmxMBeanHandle mbeanHandle; /** * <p /> * @param subjectName The unique loginName * @param securityCtx The security context of this subject * @param prop The property from the subject queue, usually from connectQos.getSubjectQueueProperty() */ public SubjectInfo(ServerScope glob, Authenticate authenticate, SessionName subjectName) //, I_Subject securityCtx, CbQueueProperty prop) throws XmlBlasterException { synchronized (SubjectInfo.class) { this.instanceId = instanceCounter; instanceCounter++; } this.glob = glob; this.authenticate = authenticate; this.subjectInfoProtector = new SubjectInfoProtector(this); this.subjectName = subjectName; //new SessionName(glob, glob.getNodeId(), loginName); if (this.subjectName.isSession()) { log.severe(ME+": Didn't expect a session name for a subject: " + this.subjectName.toXml()); Thread.dumpStack(); } String instanceName = this.glob.validateJmxValue(this.subjectName.getLoginName()); this.contextNode = new ContextNode(ContextNode.SUBJECT_MARKER_TAG, instanceName, this.glob.getContextNode()); this.ME = this.instanceId + "-" + this.subjectName.getAbsoluteName(); this.dispatchStatistic = new DispatchStatistic(); // JMX register "client/joe" this.mbeanHandle = this.glob.registerMBean(this.contextNode, this.subjectInfoProtector); if (log.isLoggable(Level.FINE)) log.fine(ME+": Created new SubjectInfo"); } /** * The unique name of this subject instance. * @return Never null, for example "/xmlBlaster/node/heron/client/joe" */ public final ContextNode getContextNode() { return this.contextNode; } /** * if state==UNDEF we block until we are ALIVE (or DEAD) * @exception If we are DEAD or on one minute timeout, subjectInfo is never locked in such a case */ public void waitUntilAlive(boolean returnLocked) throws XmlBlasterException { if (this.state == ALIVE) { if (returnLocked) this.lock.lock(); return; } if (this.state == DEAD) throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME+".waitUntilAlive()", "Did not expect state DEAD, please try again."); if (log.isLoggable(Level.FINE)) log.fine(ME+": is going to wait max. one minute"); long msecs = 1000 * 60; while (true) { synchronized (this) { try { this.wait(msecs); break; } catch (InterruptedException e) { log.severe(ME+": Ignoring unexpected exception: " + e.toString()); } } } if (returnLocked) this.lock.lock(); if (this.state != ALIVE) { if (returnLocked) this.lock.release(); throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME+".waitUntilAlive()", "ALIVE not reached, state=" + this.state); } return; } /** * Access the synchronization object of this SubjectInfo instance. */ public ReentrantLock getLock() { return this.lock; } SubjectInfoProtector getSubjectInfoProtector() { return this.subjectInfoProtector; } /** * Initialize SubjectInfo * @param securityCtx Can be null for PtP message with implicit SubjectInfo creation * @param prop The property to configure the PtP message queue */ public void toAlive(I_Subject securityCtx, CbQueueProperty prop) throws XmlBlasterException { if (isAlive()) { return; } this.lock.lock(); try { if (securityCtx != null) { this.securityCtx = securityCtx; } this.startupTime = System.currentTimeMillis(); this.maxSessions = glob.getProperty().get("session.maxSessions", SessionQos.DEFAULT_maxSessions); if (glob.getId() != null) this.maxSessions = glob.getProperty().get("session.maxSessions["+glob.getId()+"]", this.maxSessions); this.subjectQueue = createSubjectQueue(prop); this.state = ALIVE; synchronized (this) { this.notifyAll(); // notify waitUntilAlive() } // done externally to avoid dead locks //this.authenticate.addLoginName(this); // register myself -> enters synchronized(this.loginNameSubjectInfoMap) try { // Keep this code in the sync block, in case a disconnect() arrives immediately glob.getRequestBroker().updateInternalUserList(); } catch (XmlBlasterException e) { log.severe(ME+": Publishing internal user list failed: " + e.getMessage()); } if (log.isLoggable(Level.FINE)) log.fine(ME+": Transition from UNDEF to ALIVE done"); } finally { this.lock.release(); } } private I_Queue createSubjectQueue(CbQueueProperty prop) throws XmlBlasterException { if (prop == null) prop = new CbQueueProperty(glob, Constants.RELATING_SUBJECT, glob.getId()); String type = prop.getType(); String version = prop.getVersion(); I_Queue queue = glob.getQueuePluginManager().getPlugin(type, version, new StorageId(Constants.RELATING_SUBJECT, this.subjectName.getAbsoluteName()), prop); queue.setNotifiedAboutAddOrRemove(true); // Entries are notified to support reference counting return queue; } /** * The shutdown is synchronized and checks if there is no need for this subject anymore. * <p> * clearQueue==false&&forceIfEntries==true: We shutdown and preserve existing PtP messages * </p> * @param clearQueue Shall the message queue of the client be destroyed as well on last session logout? * @param forceIfEntries Shutdown even if there are messages in the queue */ public void shutdown(boolean clearQueue, boolean forceIfEntries) { if (log.isLoggable(Level.FINER)) log.finer(ME+": clearQueue=" + clearQueue + ", forceIfEntries=" + forceIfEntries); this.lock.lock(); try { if (!this.isAlive()) { if (log.isLoggable(Level.FINE)) log.fine(ME+": Ignoring shutdown request as we are in state " + getStateStr()); return; } if (isLoggedIn()) { if (log.isLoggable(Level.FINE)) log.fine(ME+": Ignoring shutdown request as there are still login sessions"); return; } if (!forceIfEntries && !clearQueue && getSubjectQueue().getNumOfEntries() > 0) { if (log.isLoggable(Level.FINE)) log.fine(ME+": Ignoring shutdown request as there are still messages in the subject queue"); return; } this.glob.unregisterMBean(this.mbeanHandle); if (getSubjectQueue().getNumOfEntries() < 1) log.info(ME+": Destroying SubjectInfo. Nobody is logged in and no queue entries available"); else { if (clearQueue) log.warning(ME+": Destroying SubjectInfo. Lost " + getSubjectQueue().getNumOfEntries() + " messages in the subject queue as clearQueue is set to true"); else log.warning(ME+": Destroying SubjectInfo. The subject queue still contains " + getSubjectQueue().getNumOfEntries() + " messages, " + getSubjectQueue().getNumOfPersistentEntries() + " persistent messages remain on disk, the transients are lost"); } this.authenticate.removeLoginName(this); // deregister this.state = DEAD; if (clearQueue) this.subjectQueue.clear(); if (this.subjectQueue != null) { this.subjectQueue.shutdown(); } if (getSessions().length > 0) { log.warning(ME+": shutdown of subject " + getLoginName() + " has still " + getSessions().length + " sessions - memory leak?"); } synchronized (this.sessionMap) { this.sessionArrCache = null; this.sessionMap.clear(); this.callbackAddressCache = null; } if (this.msgErrorHandler != null) this.msgErrorHandler.shutdown(); synchronized (this) { this.notifyAll(); // notify waitUntilAlive() } // Not possible to allow toAlive() //this.securityCtx = null; } finally { this.lock.release(); } } /** * Shutdown my queue */ public void finalize() { if (log.isLoggable(Level.FINE)) log.fine(ME+": finalize - garbage collected"); //boolean force = true; //this.subjectQueue.shutdown(); } /** * Find a session by its pubSessionId or return null if not found */ public SessionInfo getSessionInfo(SessionName sessionName) { SessionInfo[] sessions = getSessions(); for (int ii=0; ii<sessions.length; ii++) { if (sessions[ii].getSessionName().equalsRelative(sessionName)) { return sessions[ii]; } } return null; } /** * @return not null if client is a cluster node, else null */ public final NodeId getNodeId() throws XmlBlasterException { if (determineNodeId) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -