📄 sessioninfo.java
字号:
/*------------------------------------------------------------------------------Name: SessionInfo.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileComment: Handling the Client dataAuthor: xmlBlaster@marcelruff.info------------------------------------------------------------------------------*/package org.xmlBlaster.authentication;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import java.util.Properties;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.checkpoint.I_Checkpoint;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.qos.address.AddressBase;import org.xmlBlaster.util.qos.storage.CbQueueProperty;import org.xmlBlaster.util.qos.address.CallbackAddress;import org.xmlBlaster.engine.SubscriptionInfo;import org.xmlBlaster.authentication.plugins.I_Session;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.Timeout;import org.xmlBlaster.util.I_Timeout;import org.xmlBlaster.engine.qos.AddressServer;import org.xmlBlaster.engine.qos.ConnectQosServer;import org.xmlBlaster.engine.qos.DisconnectQosServer;import org.xmlBlaster.engine.query.plugins.QueueQueryPlugin;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.queue.I_Storage;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_StorageSizeListener;import org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import org.xmlBlaster.util.dispatch.DispatchManager;import org.xmlBlaster.util.dispatch.DispatchStatistic;import org.xmlBlaster.util.dispatch.I_ConnectionStatusListener;import org.xmlBlaster.util.error.I_MsgErrorHandler;import org.xmlBlaster.engine.MsgErrorHandler;import org.xmlBlaster.client.key.UnSubscribeKey;import org.xmlBlaster.client.qos.UnSubscribeQos;import org.xmlBlaster.engine.qos.UnSubscribeQosServer;import org.xmlBlaster.client.qos.UnSubscribeReturnQos;import org.xmlBlaster.client.key.SubscribeKey;import org.xmlBlaster.client.qos.SubscribeQos;import org.xmlBlaster.engine.qos.SubscribeQosServer;import org.xmlBlaster.client.qos.SubscribeReturnQos;import org.xmlBlaster.contrib.ClientPropertiesInfo;import org.xmlBlaster.util.admin.extern.JmxMBeanHandle;//import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;import org.xmlBlaster.util.ReentrantLock;/** * SessionInfo stores all known session data about a client. * <p /> * One client (SubjectInfo) may have multiple login sessions. * Each session has its callback queue to deliver subscribed * messages to the client. * <p /> * We distinguish two different unique ID for each login session: * <ol> * <li>sessionId: This is the unique, secret session Id which is passed * by the client on every method invocation to allow authentication</li> * <li>instanceId: This is a unique counter (with respect to one virtual machine JVM). * It allows 'public' addressing of a session</li> * </ol> * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/engine.qos.login.session.html">The engine.qos.login requirement</a> * @author <a href="mailto:xmlBlaster@marcelruff.info">Marcel Ruff</a> */public final class SessionInfo implements I_Timeout, I_StorageSizeListener{ private String ME = "SessionInfo"; private final ContextNode contextNode; /** The cluster wide unique identifier of the session e.g. "/node/heron/client/joe/2" */ private final SessionName sessionName; private SubjectInfo subjectInfo; // all client informations private I_Session securityCtx; private static long instanceCounter = 0L; private long instanceId = 0L; /** The current connection address from the protocol plugin */ private ConnectQosServer connectQos; private Timeout expiryTimer; private Timestamp timerKey; private final ServerScope glob; private static Logger log = Logger.getLogger(SessionInfo.class.getName()); /** Do error recovery if message can't be delivered and we give it up */ private final MsgErrorHandler msgErrorHandler; /** manager for sending callback messages */ private DispatchManager dispatchManager; /** Statistic about send/received messages, can be null if there is a DispatchManager around */ private volatile DispatchStatistic statistic; private boolean isShutdown = false; /** Protects timerKey refresh */ private final Object EXPIRY_TIMER_MONITOR = new Object(); private final SessionInfoProtector sessionInfoProtector; /** My JMX registration */ private JmxMBeanHandle mbeanHandle; /** To prevent noisy warnings */ private boolean transientWarn; /** Can be optionally used by authorization frameworks */ private Object authorizationCache; private XmlBlasterException transportConnectFail; /** Holding properties send by our remote client via the topic __sys__sessionProperties */ private ClientPropertiesInfo remoteProperties; private boolean acceptWrongSenderAddress; /** * All MsgUnit which shall be delivered to the current session of the client * are queued here to be ready to deliver. * <p /> * Node objects = MsgQueueEntry */ private I_Queue sessionQueue; private long lastNumEntries = -1L; // Enforced by I_AdminSubject /** Incarnation time of this object instance in millis */ private long startupTime; private ReentrantLock lock = new ReentrantLock(); /** this is used for administrative gets (queries on callback queue) */ private volatile QueueQueryPlugin queueQueryPlugin; /** * Create this instance when a client did a login. * <p /> * @param subjectInfo the SubjectInfo with the login informations for this client */ public SessionInfo(SubjectInfo subjectInfo, I_Session securityCtx, ConnectQosServer connectQos, ServerScope glob) throws XmlBlasterException { this.glob = glob; if (securityCtx==null) { String tmp = "SessionInfo(securityCtx==null); A correct security manager must be set."; log.severe(tmp); throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_CONFIGURATION, tmp); } this.sessionInfoProtector = new SessionInfoProtector(this); //String prefix = glob.getLogPrefix(); subjectInfo.checkNumberOfSessions(connectQos.getData()); synchronized (SessionInfo.class) { instanceId = instanceCounter; instanceCounter--; } //this.id = ((prefix.length() < 1) ? "client/" : (prefix+"/client/")) + subjectInfo.getLoginName() + "/" + getPublicSessionId(); if (connectQos.getSessionName().isPubSessionIdUser()) { // client has specified its own publicSessionId (> 0) this.sessionName = connectQos.getSessionName(); } else { this.sessionName = new SessionName(glob, subjectInfo.getSubjectName(), getInstanceId()); } this.contextNode = new ContextNode(ContextNode.SESSION_MARKER_TAG, ""+this.sessionName.getPublicSessionId(), subjectInfo.getContextNode()); this.ME = this.instanceId + "-" + this.sessionName.getRelativeName(); if (log.isLoggable(Level.FINER)) log.finer(ME+": Creating new SessionInfo " + instanceId + ": " + subjectInfo.toString()); this.startupTime = System.currentTimeMillis(); this.subjectInfo = subjectInfo; this.securityCtx = securityCtx; this.connectQos = connectQos; this.msgErrorHandler = new MsgErrorHandler(glob, this); String type = connectQos.getSessionCbQueueProperty().getType(); String version = connectQos.getSessionCbQueueProperty().getVersion(); if (log.isLoggable(Level.FINE)) log.fine(ME+": Creating callback queue type=" + type + " version=" + version); this.sessionQueue = glob.getQueuePluginManager().getPlugin(type, version, new StorageId(Constants.RELATING_CALLBACK, this.sessionName.getAbsoluteName()), connectQos.getSessionCbQueueProperty()); this.sessionQueue.setNotifiedAboutAddOrRemove(true); // Entries are notified to support reference counting this.sessionQueue.addStorageSizeListener(this); CallbackAddress[] cba = this.connectQos.getSessionCbQueueProperty().getCallbackAddresses(); if (cba.length > 0) { if (log.isLoggable(Level.FINE)) log.fine(ME+": Creating dispatch manager as ConnectQos contains callback addresses"); for (int i=0; i<cba.length; i++) { cba[i].setSessionName(this.sessionName); cba[i].addClientProperty(new ClientProperty("__ContextNode", "String", null, this.contextNode.getAbsoluteName())); } this.dispatchManager = new DispatchManager(glob, this.msgErrorHandler, this.securityCtx, this.sessionQueue, (I_ConnectionStatusListener)null, cba, this.sessionName); } else { // No callback configured if (log.isLoggable(Level.FINE)) log.fine(ME+": Don't create dispatch manager as ConnectQos contains no callback addresses"); this.dispatchManager = null; } this.expiryTimer = glob.getSessionTimer(); if (connectQos.getSessionTimeout() > 0L) { if (log.isLoggable(Level.FINE)) log.fine(ME+": Setting expiry timer for " + getLoginName() + " to " + connectQos.getSessionTimeout() + " msec"); this.timerKey = this.expiryTimer.addTimeoutListener(this, connectQos.getSessionTimeout(), null); } else { if (log.isLoggable(Level.FINE)) log.fine(ME+": Session lasts forever, requested expiry timer was 0"); } // "__remoteProperties" if (this.connectQos.getData().getClientProperty(Constants.CLIENTPROPERTY_REMOTEPROPERTIES, false)) { mergeRemoteProperties(this.connectQos.getData().getClientProperties()); } // TODO: Decide by authorizer // see Authenticate.java boolean may = glob.getProperty().get("xmlBlaster/acceptWrongSenderAddress", false); this.acceptWrongSenderAddress = glob.getProperty().get("xmlBlaster/acceptWrongSenderAddress/"+getSessionName().getLoginName(), false); // JMX register "client/joe/1" this.mbeanHandle = this.glob.registerMBean(this.contextNode, this.sessionInfoProtector); } public final boolean isAlive() { return !this.isShutdown; } /** * The unique name of this session instance. * @return Never null, for example "/xmlBlaster/node/heron/client/joe/session/-2" */ public final ContextNode getContextNode() { return this.contextNode; } /** * Configure server with '-xmlBlaster/acceptWrongSenderAddress true' * or "-xmlBlaster/acceptWrongSenderAddress/joe true". * Is available using JMX. * @return true: We accept wrong sender address in PublishQos.getSender() (not myself) */ public boolean isAcceptWrongSenderAddress() { return this.acceptWrongSenderAddress; } /** * @param acceptWrongSenderAddress the acceptWrongSenderAddress to set */ public void setAcceptWrongSenderAddress(boolean acceptWrongSenderAddress) { boolean old = this.acceptWrongSenderAddress; this.acceptWrongSenderAddress = acceptWrongSenderAddress; String tmp = ME + "Changed acceptWrongSenderAddress from " + old + " to " + this.acceptWrongSenderAddress + "."; //if (glob.getAuthenticate().iscceptWrongSenderAddress() if (this.acceptWrongSenderAddress == true) log.warning(tmp + " Caution: This client can now publish messages using anothers login name as sender"); else log.info(tmp + " Faking anothers publisher address is not possible"); } /** * The address information got from the protocol plugin. * @return Can be null */ public AddressServer getAddressServer() { return (this.connectQos == null) ? null : this.connectQos.getAddressServer(); } /** * if state==UNDEF we block until we are ALIVE (or DEAD) public void waitUntilAlive() { //!!! log.error(ME, "Implemenation of waitUntilAlive() is missing"); return; } */ /** * The protector prevents direct access to this sessionInfo instance. */ public final SessionInfoProtector getSessionInfoProtector() { return this.sessionInfoProtector; } /** * This is a unique instance id per JVM (it is the pubSessionId if the client hasn't specified its own). * <p> * It is NOT the secret sessionId and may be published with PtP messages * without security danger * </p> */ public final long getInstanceId() { return this.instanceId; } /** * Access the synchronization object of this SessionInfo instance. */ public ReentrantLock getLock() { return this.lock; } /** * Check if a callback was configured (if client has passed a callback address on connect). */ public final boolean hasCallback() { return this.dispatchManager != null && this.isShutdown == false; } public final I_MsgErrorHandler getMsgErrorHandler() { return this.msgErrorHandler; } /** * This is the publicSessionId which is unique in the subject scope. * <p /> * It is NOT the secret sessionId and may be published with PtP messages * without security danger * <p /> * @return The same as getInstanceId() * @see #getInstanceId */ public final long getPublicSessionId() { return this.sessionName.getPublicSessionId(); } public void finalize() { removeExpiryTimer(); if (log.isLoggable(Level.FINE)) log.fine(ME+": finalize - garbage collected " + getSecretSessionId()); } public boolean isShutdown() { this.lock.lock(); try { return this.isShutdown; // sync'd because of TimeoutListener? } finally { this.lock.release(); } } public void removeExpiryTimer() { synchronized (this.EXPIRY_TIMER_MONITOR) { if (this.timerKey != null) { this.expiryTimer.removeTimeoutListener(this.timerKey); this.timerKey = null; } } } public void shutdown() { if (log.isLoggable(Level.FINER)) log.finer(ME+": shutdown() of session"); this.glob.unregisterMBean(this.mbeanHandle); this.lock.lock(); try { this.isShutdown = true; removeExpiryTimer(); I_Queue sessionQueue = this.sessionQueue; if (sessionQueue != null) { sessionQueue.shutdown(); //this.sessionQueue = null; Not set to null to support avoid synchronize(this.sessionQueue) } if (this.msgErrorHandler != null) this.msgErrorHandler.shutdown(); DispatchManager dispatchManager = this.dispatchManager; if (dispatchManager != null) dispatchManager.shutdown(); this.subjectInfo = null; // this.securityCtx = null; We need it in finalize() getSecretSessionId() // this.connectQos = null; this.expiryTimer = null; } finally { this.lock.release(); } } /** * @return null if no callback is configured, can change to null on reconfiguration */ public final DispatchManager getDispatchManager() { return this.dispatchManager; } /** * @return never null but empty if no callback is configured */ public final DispatchStatistic getDispatchStatistic() { if (this.statistic == null) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -