📄 sessionpersistenceplugin.java
字号:
/*------------------------------------------------------------------------------Name: SessionPersistencePlugin.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.engine;import java.util.HashMap;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.authentication.ClientEvent;import org.xmlBlaster.authentication.SessionInfo;import org.xmlBlaster.engine.msgstore.I_Map;import org.xmlBlaster.engine.msgstore.I_MapEntry;import org.xmlBlaster.engine.qos.AddressServer;import org.xmlBlaster.engine.qos.ConnectQosServer;import org.xmlBlaster.engine.qos.ConnectReturnQosServer;import org.xmlBlaster.engine.queuemsg.SessionEntry;import org.xmlBlaster.engine.queuemsg.SubscribeEntry;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.key.KeyData;import org.xmlBlaster.util.key.QueryKeyData;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.qos.ConnectQosData;import org.xmlBlaster.util.qos.QueryQosData;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.qos.storage.SubscribeStoreProperty;import org.xmlBlaster.util.qos.storage.SessionStoreProperty;import org.xmlBlaster.util.queue.I_Storage;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_EntryFilter;import org.xmlBlaster.util.queue.I_Entry;/** * SessionPersistencePlugin provides the persistent storage for both sessions * and subscriptions. * * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a> */public class SessionPersistencePlugin implements I_SessionPersistencePlugin { private final static String ME = "SessionPersistencePlugin"; /** when recovering all subscriptions must be 'noInitialUpdate' because otherwise * we would get messages which we already got in the past */ private final static String ORIGINAL_INITIAL_UPDATES = "__originalInitialUpdates"; private PluginInfo info; private ServerScope global; private static Logger log = Logger.getLogger(SessionPersistencePlugin.class.getName()); /** flag indicating the status: true means initialized and not yet shut down) */ private boolean isOK; private I_Map sessionStore; private I_Map subscribeStore; private StorageId sessionStorageId; private StorageId subscribeStorageId; private AddressServer addressServer; private Object sync = new Object(); private int duplicateCounter; private int errorCounter; /** * * @return hash map containing the secret sessionId of the entries recovered * as values and as keys the corresponding absolute name for the session (String) * @throws XmlBlasterException */ private HashMap recoverSessions() throws XmlBlasterException { I_MapEntry[] entries = this.sessionStore.getAll(null); HashMap sessionIds = new HashMap(); for (int i=0; i < entries.length; i++) { try { if (entries[i] instanceof SessionEntry) { // do connect SessionEntry entry = (SessionEntry)entries[i]; ConnectQosData data = this.global.getConnectQosFactory().readObject(entry.getQos()); this.addressServer = new AddressServer(this.global, "NATIVE", this.global.getId(), (java.util.Properties)null); ConnectQosServer qos = new ConnectQosServer(this.global, data); qos.isFromPersistenceRecovery(true); qos.setPersistenceUniqueId(entry.getUniqueId()); qos.setAddressServer(this.addressServer); SessionName sessionName = data.getSessionName(); String sessionId = data.getSessionQos().getSecretSessionId(); sessionIds.put(sessionName.getAbsoluteName(), sessionId); if (log.isLoggable(Level.FINE)) log.fine("recoverSessions: store in map session='" + sessionName.getAbsoluteName() + "' has secret sessionId='" + sessionId + "' and persistenceUniqueId=" + entry.getUniqueId()); // if (log.isLoggable(Level.FINE)) log.trace(ME, "recoverSessions: session: '" + data.getSessionName() + "' secretSessionId='" + qos.getSessionQos().getSecretSessionId() + "' qos='" + qos.toXml() + "'"); ConnectReturnQosServer ret = this.global.getAuthenticate().connect(this.addressServer, qos); if (log.isLoggable(Level.FINEST)) log.finest("recoverSessions: return of connect: returnConnectQos='" + ret.toXml() + "'"); } else { throw new XmlBlasterException(this.global, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME + ".recoverSessions: the entry in the storage should be of type 'SessionEntry' but is of type'" + entries[i].getClass().getName() + "'"); } } catch (XmlBlasterException e) { // authenticate password changed? TODO log.warning("Loading session from persistency failed: " + e.toString()); } } return sessionIds; } /** * When recovering due to a run level change (without shutting down the * application) this will not work. * * @throws XmlBlasterException */ private void recoverSubscriptions(final HashMap sessionIds) throws XmlBlasterException { { boolean checkForDuplicateSubscriptions = this.global.getProperty().get("xmlBlaster/checkForDuplicateSubscriptions", false); if (checkForDuplicateSubscriptions) { duplicateCounter = 0; errorCounter = 0; final java.util.Map duplicates = new java.util.TreeMap(); /*I_MapEntry[] results = */this.subscribeStore.getAll(new I_EntryFilter() { public I_Entry intercept(I_Entry entry, I_Storage storage) { if (storage.isTransient()) return null; try { SubscribeEntry subscribeEntry = (SubscribeEntry)entry; //QueryKeyData keyData = queryKeyFactory.readObject(subscribeEntry.getKey()); QueryQosData qosData = global.getQueryQosFactory().readObject(subscribeEntry.getQos()); //String key = keyData.getOid() + qosData.getSender().getAbsoluteName(); SessionName sessionName = new SessionName(global, subscribeEntry.getSessionName()); Object found = sessionIds.get(sessionName.getAbsoluteName()); if (found == null) { if (errorCounter == 0) { log.warning("Ignoring invalid entry '" + sessionName.getAbsoluteName() + "' as user is not known"); } errorCounter++; return null; } String key = qosData.getSubscriptionId(); if (log.isLoggable(Level.FINE)) log.fine("Cleanup of duplicate subscriptions, key=" + key); if (duplicates.containsKey(key)) { if (duplicateCounter == 0) log.warning("Cleanup of duplicate subscriptions, this may take a while, please wait ..."); duplicateCounter++; //log.warn(ME, "Removing duplicate subscription '" + key + "' oid=" + keyData.getOid()); //subscribeStore.remove(subscribeEntry); } else { duplicates.put(key, subscribeEntry); } } catch (XmlBlasterException e) { log.severe("Ignoring unexpected problem in checkForDuplicateSubscriptions :" + e.toString()); } return null; } }); if (duplicateCounter > 0) { this.subscribeStore.clear(); if (this.subscribeStore.getNumOfEntries() > 0) log.severe("Internal prpblem with checkForDuplicateSubscriptions"); java.util.Iterator it = duplicates.keySet().iterator(); while (it.hasNext()) { this.subscribeStore.put((I_MapEntry)duplicates.get(it.next())); } log.warning("Removed " + (duplicateCounter-duplicates.size()) + " identical subscriptions, keeping " + duplicates.size() + ". Ignored " + errorCounter + " invalid subscriptions as no session was found"); } } } I_MapEntry[] entries = this.subscribeStore.getAll(null); for (int i=0; i < entries.length; i++) { if (entries[i] instanceof SubscribeEntry) { // do connect SubscribeEntry entry = (SubscribeEntry)entries[i]; String qos = entry.getQos(); QueryQosData qosData = global.getQueryQosFactory().readObject(qos); ClientProperty clientProperty = qosData.getClientProperty(Constants.PERSISTENCE_ID); if (clientProperty == null) { log.severe("SubscribeQos with missing " + Constants.PERSISTENCE_ID + ": " + qosData.toXml()); long uniqueId = new Timestamp().getTimestamp(); qosData.getClientProperties().put(Constants.PERSISTENCE_ID, new ClientProperty(Constants.PERSISTENCE_ID, "long", null, "" + uniqueId)); } boolean initialUpdates = qosData.getInitialUpdateProp().getValue(); if (initialUpdates) { qosData.getClientProperties().put(ORIGINAL_INITIAL_UPDATES, new ClientProperty(ORIGINAL_INITIAL_UPDATES, "boolean", null, "true")); } SessionName sessionName = new SessionName(this.global, entry.getSessionName()); String sessionId = (String)sessionIds.get(sessionName.getAbsoluteName()); if (sessionId == null) { log.severe("The secret sessionId was not found for session='" + sessionName.getAbsoluteName() + "', removing persistent subscription " + entry.getLogId()); this.subscribeStore.remove(entry); continue; //throw new XmlBlasterException(this.global, ErrorCode.INTERNAL_NULLPOINTER, ME + ".recoverSubscriptions", "The secret sessionId was not found for session='" + sessionName.getAbsoluteName() + "'"); } // TODO remove the setting of client properties and invoke directly requestBroker.subscribe with subscribeQosServer.inhibitInitialUpdates(true); // also get the sessionInfo object from authenticate => eliminate sessionIds this.global.getAuthenticate().getXmlBlaster().subscribe(this.addressServer, sessionId, entry.getKey(), qosData.toXml()); } else { throw new XmlBlasterException(this.global, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME + ".recoverSubscriptions: the entry in the storage should be of type 'SubscribeEntry'but is of type'" + entries[i].getClass().getName() + "'"); } } } private void removeAssociatedSubscriptions(SessionInfo sessionInfo) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("removeAssociatedSubscriptions for session '" + sessionInfo.getId() + "'"); XmlBlasterException e = null; SubscriptionInfo[] subs = this.global.getRequestBroker().getClientSubscriptions().getSubscriptions(sessionInfo); for (int i=0; i < subs.length; i++) { try { if (log.isLoggable(Level.FINER)) log.finer("removeAssociatedSubscriptions for session '" + sessionInfo.getId() + "' subscription '" + subs[i].getId() + "'"); this.subscriptionRemove(new SubscriptionEvent(subs[i])); } catch (XmlBlasterException ex) { if (e == null) e = ex; log.severe("removeAssociatedSubscriptions: exception occured for session '" + sessionInfo.getId() + "' and subscriptions '" + subs[i].getId() + "' : ex: " + ex.getMessage()); } } // just throw the first exception encountered (if any) if (e != null) throw e; } /** * Initializes the plugin * @see org.xmlBlaster.util.plugin.I_Plugin#init(org.xmlBlaster.util.Global, org.xmlBlaster.util.plugin.PluginInfo) */ public void init(org.xmlBlaster.util.Global glob, PluginInfo pluginInfo) throws XmlBlasterException { synchronized (this.sync) { if (this.isOK) return; this.info = pluginInfo; this.global = (org.xmlBlaster.engine.ServerScope)glob; if (log.isLoggable(Level.FINER))
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -