⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 subjectinfo.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/*------------------------------------------------------------------------------Name:      SubjectInfo.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileComment:   Handling the Client dataAuthor:    xmlBlaster@marcelruff.info------------------------------------------------------------------------------*/package org.xmlBlaster.authentication;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.authentication.plugins.I_Subject;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.util.qos.SessionQos;import org.xmlBlaster.util.qos.ConnectQosData;import org.xmlBlaster.util.qos.storage.CbQueueProperty;import org.xmlBlaster.util.qos.address.AddressBase;import org.xmlBlaster.util.qos.address.CallbackAddress;import org.xmlBlaster.util.cluster.NodeId;import org.xmlBlaster.engine.cluster.ClusterNode;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.dispatch.DispatchStatistic;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import org.xmlBlaster.util.admin.extern.JmxMBeanHandle;import org.xmlBlaster.engine.query.plugins.QueueQueryPlugin;import org.xmlBlaster.engine.queuemsg.MsgQueueUpdateEntry;import org.xmlBlaster.engine.admin.I_AdminSession;import org.xmlBlaster.util.error.I_MsgErrorHandler;import org.xmlBlaster.util.error.MsgErrorInfo;import org.xmlBlaster.engine.MsgErrorHandler;import java.util.Map;import java.util.HashMap;import java.util.Properties;import java.util.Set;import java.util.HashSet;import java.util.Iterator;//import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;import org.xmlBlaster.util.ReentrantLock;import javax.management.NotificationBroadcasterSupport;import 
javax.management.AttributeChangeNotification;import javax.management.MBeanNotificationInfo;/** * The SubjectInfo stores all known data about a client. * <p> * It also contains a subject queue, where messages are stored * until they are delivered at the next login of this client. * </p> * <p> * There are three states for SubjectInfo namely UNDEF, ALIVE, DEAD. * A transition from UNDEF directly to DEAD is not supported. * Transitions from ALIVE or DEAD to UNDEF are not possible. * </p> * @author <a href="mailto:xmlBlaster@marcelruff.info">Marcel Ruff</a> */public final class SubjectInfo extends NotificationBroadcasterSupport /* implements I_AdminSubject, SubjectInfoMBean -> is delegated to SubjectInfoProtector */{   private String ME = "SubjectInfo";   private final ServerScope glob;   private static Logger log = Logger.getLogger(SubjectInfo.class.getName());   private final ContextNode contextNode;   private final Authenticate authenticate;   /** The cluster wide unique identifier of the subject e.g. "/node/heron/client/joe" */   private SessionName subjectName;   /** The partner class from the security framework */   private I_Subject securityCtx = null;   /**    * All sessions of this subject are stored in this map.    
* The absoluteSessionName == sessionInfo.getId() is the key,    * the SessionInfo object the value    */   private Map sessionMap = new HashMap();   private volatile SessionInfo[] sessionArrCache;   public CallbackAddress[] callbackAddressCache = null;   private MsgErrorHandler msgErrorHandler;   private final DispatchStatistic dispatchStatistic;   private final SubjectInfoProtector subjectInfoProtector;   private NodeId nodeId = null;   private boolean determineNodeId = true;   // Enforced by I_AdminSubject   /** Incarnation time of this object instance in millis */   private long startupTime;   private int maxSessions;   /** State during and after construction */   public final int UNDEF = -1;   /** State after calling toAlive() */   public final int ALIVE = 0;   /** State after calling shutdown() */   public final int DEAD = 1;   private int state = UNDEF;   private ReentrantLock lock = new ReentrantLock();   /**    * All MsgUnit which can't be delivered to the client (if he is not logged in)    * are queued here and are delivered when the client comes on line.    
* <p>    * Node objects = MsgQueueEntry    */   private I_Queue subjectQueue;   /** this is used for administrative gets (queries on callback queue) */   private volatile QueueQueryPlugin queueQueryPlugin;   /** Statistics */   private static long instanceCounter = 0L;   private long instanceId = 0L;   /** My JMX registration */   private JmxMBeanHandle mbeanHandle;   /**    * <p />    * @param subjectName  The unique loginName    * @param securityCtx  The security context of this subject    * @param prop         The property from the subject queue, usually from connectQos.getSubjectQueueProperty()    */   public SubjectInfo(ServerScope glob, Authenticate authenticate, SessionName subjectName) //, I_Subject securityCtx, CbQueueProperty prop)          throws XmlBlasterException {      synchronized (SubjectInfo.class) {         this.instanceId = instanceCounter;         instanceCounter++;      }      this.glob = glob;      this.authenticate = authenticate;      this.subjectInfoProtector = new SubjectInfoProtector(this);      this.subjectName = subjectName; //new SessionName(glob, glob.getNodeId(), loginName);      if (this.subjectName.isSession()) {         log.severe(ME+": Didn't expect a session name for a subject: " + this.subjectName.toXml());         Thread.dumpStack();      }      String instanceName = this.glob.validateJmxValue(this.subjectName.getLoginName());      this.contextNode = new ContextNode(ContextNode.SUBJECT_MARKER_TAG, instanceName,                                       this.glob.getContextNode());      this.ME = this.instanceId + "-" + this.subjectName.getAbsoluteName();      this.dispatchStatistic = new DispatchStatistic();      // JMX register "client/joe"      this.mbeanHandle = this.glob.registerMBean(this.contextNode, this.subjectInfoProtector);      if (log.isLoggable(Level.FINE)) log.fine(ME+": Created new SubjectInfo");   }   /**    * The unique name of this subject instance.    
* @return Never null, for example "/xmlBlaster/node/heron/client/joe"
    */
   public final ContextNode getContextNode() {
      return this.contextNode;
   }

   /**
    * If state==UNDEF we block until we are ALIVE (or DEAD), at most one minute.
    * <p>
    * The state is re-tested while holding the monitor of this instance before
    * every wait, so a notification sent by toAlive() or shutdown() can not be
    * missed and a spurious wakeup does not end the wait early.
    * The one minute limit is an absolute deadline; an interrupt is logged and
    * ignored and does not prolong the total waiting time.
    * </p>
    * @param returnLocked if true the lock of this instance (see getLock()) is held
    *        by the calling thread when the method returns normally
    * @exception XmlBlasterException If we are DEAD or on one minute timeout,
    *        subjectInfo is never locked in such a case
    */
   public void waitUntilAlive(boolean returnLocked) throws XmlBlasterException {
      if (this.state == ALIVE) {
         if (returnLocked) this.lock.lock();
         return;
      }
      if (this.state == DEAD)
         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME+".waitUntilAlive()", "Did not expect state DEAD, please try again.");

      if (log.isLoggable(Level.FINE)) log.fine(ME+": is going to wait max. one minute");
      final long deadline = System.currentTimeMillis() + 1000L * 60L;
      synchronized (this) {
         // Guarded wait: leave only on a state change or when the deadline has passed
         while (this.state == UNDEF) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0L) {
               break;
            }
            try {
               this.wait(remaining);
            }
            catch (InterruptedException e) {
               // Interrupts are deliberately ignored; the loop recomputes the remaining time
               log.severe(ME+": Ignoring unexpected exception: " + e.toString());
            }
         }
      }

      if (returnLocked) this.lock.lock();
      if (this.state != ALIVE) {
         // Never hand out the lock together with an exception
         if (returnLocked) this.lock.release();
         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME+".waitUntilAlive()", "ALIVE not reached, state=" + this.state);
      }
      return;
   }

   /**
    * Access the synchronization object of this SubjectInfo instance.
*/   public ReentrantLock getLock() {      return this.lock;   }   SubjectInfoProtector getSubjectInfoProtector() {      return this.subjectInfoProtector;   }   /**    * Initialize SubjectInfo    * @param securityCtx Can be null for PtP message with implicit SubjectInfo creation    * @param prop The property to configure the PtP message queue    */   public void toAlive(I_Subject securityCtx, CbQueueProperty prop) throws XmlBlasterException {      if (isAlive()) {         return;      }      this.lock.lock();      try {         if (securityCtx != null) {            this.securityCtx = securityCtx;         }         this.startupTime = System.currentTimeMillis();         this.maxSessions = glob.getProperty().get("session.maxSessions", SessionQos.DEFAULT_maxSessions);         if (glob.getId() != null)            this.maxSessions = glob.getProperty().get("session.maxSessions["+glob.getId()+"]", this.maxSessions);         this.subjectQueue = createSubjectQueue(prop);         this.state = ALIVE;         synchronized (this) {            this.notifyAll();    // notify waitUntilAlive()         }         // done externally to avoid dead locks         //this.authenticate.addLoginName(this); // register myself -> enters synchronized(this.loginNameSubjectInfoMap)         try { // Keep this code in the sync block, in case a disconnect() arrives immediately            glob.getRequestBroker().updateInternalUserList();         }         catch (XmlBlasterException e) {            log.severe(ME+": Publishing internal user list failed: " + e.getMessage());         }         if (log.isLoggable(Level.FINE)) log.fine(ME+": Transition from UNDEF to ALIVE done");      }      finally {         this.lock.release();      }   }   private I_Queue createSubjectQueue(CbQueueProperty prop) throws XmlBlasterException {      if (prop == null) prop = new CbQueueProperty(glob, Constants.RELATING_SUBJECT, glob.getId());      String type = prop.getType();      String version = prop.getVersion();      
I_Queue queue = glob.getQueuePluginManager().getPlugin(type, version,                           new StorageId(Constants.RELATING_SUBJECT, this.subjectName.getAbsoluteName()), prop);      queue.setNotifiedAboutAddOrRemove(true); // Entries are notified to support reference counting      return queue;   }   /**    * The shutdown is synchronized and checks if there is no need for this subject anymore.    * <p>    * clearQueue==false&&forceIfEntries==true: We shutdown and preserve existing PtP messages    * </p>    * @param clearQueue Shall the message queue of the client be destroyed as well on last session logout?    * @param forceIfEntries Shutdown even if there are messages in the queue    */   public void shutdown(boolean clearQueue, boolean forceIfEntries) {      if (log.isLoggable(Level.FINER)) log.finer(ME+": clearQueue=" + clearQueue + ", forceIfEntries=" + forceIfEntries);      this.lock.lock();      try {         if (!this.isAlive()) {            if (log.isLoggable(Level.FINE)) log.fine(ME+": Ignoring shutdown request as we are in state " + getStateStr());            return;         }         if (isLoggedIn()) {            if (log.isLoggable(Level.FINE)) log.fine(ME+": Ignoring shutdown request as there are still login sessions");            return;         }         if (!forceIfEntries && !clearQueue && getSubjectQueue().getNumOfEntries() > 0) {            if (log.isLoggable(Level.FINE)) log.fine(ME+": Ignoring shutdown request as there are still messages in the subject queue");            return;         }         this.glob.unregisterMBean(this.mbeanHandle);         if (getSubjectQueue().getNumOfEntries() < 1)            log.info(ME+": Destroying SubjectInfo. Nobody is logged in and no queue entries available");         else {            if (clearQueue)               log.warning(ME+": Destroying SubjectInfo. 
Lost " + getSubjectQueue().getNumOfEntries() + " messages in the subject queue as clearQueue is set to true");            else               log.warning(ME+": Destroying SubjectInfo. The subject queue still contains " + getSubjectQueue().getNumOfEntries() + " messages, " +                            getSubjectQueue().getNumOfPersistentEntries() + " persistent messages remain on disk, the transients are lost");         }         this.authenticate.removeLoginName(this);  // deregister         this.state = DEAD;         if (clearQueue)            this.subjectQueue.clear();         if (this.subjectQueue != null) {            this.subjectQueue.shutdown();         }         if (getSessions().length > 0) {            log.warning(ME+": shutdown of subject " + getLoginName() + " has still " + getSessions().length + " sessions - memory leak?");         }         synchronized (this.sessionMap) {            this.sessionArrCache = null;            this.sessionMap.clear();            this.callbackAddressCache = null;         }         if (this.msgErrorHandler != null)            this.msgErrorHandler.shutdown();         synchronized (this) {            this.notifyAll();    // notify waitUntilAlive()         }         // Not possible to allow toAlive()         //this.securityCtx = null;      }      finally {         this.lock.release();      }   }   /**    * Shutdown my queue    */   public void finalize() {      if (log.isLoggable(Level.FINE)) log.fine(ME+": finalize - garbage collected");      //boolean force = true;      //this.subjectQueue.shutdown();   }   /**    * Find a session by its pubSessionId or return null if not found    */   public SessionInfo getSessionInfo(SessionName sessionName) {      SessionInfo[] sessions = getSessions();      for (int ii=0; ii<sessions.length; ii++) {         if (sessions[ii].getSessionName().equalsRelative(sessionName)) {            return sessions[ii];         }      }      return null;   }   /**    * @return not null if client is a 
cluster node, else null    */   public final NodeId getNodeId() throws XmlBlasterException {      if (determineNodeId) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -