⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sessionpersistenceplugin.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*------------------------------------------------------------------------------Name:      SessionPersistencePlugin.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.engine;import java.util.HashMap;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.authentication.ClientEvent;import org.xmlBlaster.authentication.SessionInfo;import org.xmlBlaster.engine.msgstore.I_Map;import org.xmlBlaster.engine.msgstore.I_MapEntry;import org.xmlBlaster.engine.qos.AddressServer;import org.xmlBlaster.engine.qos.ConnectQosServer;import org.xmlBlaster.engine.qos.ConnectReturnQosServer;import org.xmlBlaster.engine.queuemsg.SessionEntry;import org.xmlBlaster.engine.queuemsg.SubscribeEntry;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.key.KeyData;import org.xmlBlaster.util.key.QueryKeyData;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.qos.ConnectQosData;import org.xmlBlaster.util.qos.QueryQosData;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.qos.storage.SubscribeStoreProperty;import org.xmlBlaster.util.qos.storage.SessionStoreProperty;import org.xmlBlaster.util.queue.I_Storage;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_EntryFilter;import org.xmlBlaster.util.queue.I_Entry;/** * SessionPersistencePlugin provides the persistent storage for both sessions * and subscriptions. * * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a> */public class SessionPersistencePlugin implements I_SessionPersistencePlugin {   private final static String ME = "SessionPersistencePlugin";   /** when recovering all subscriptions must be 'noInitialUpdate' because otherwise    * we would get messages which we already got in the past    */   private final static String ORIGINAL_INITIAL_UPDATES = "__originalInitialUpdates";   private PluginInfo info;   private ServerScope global;   private static Logger log = Logger.getLogger(SessionPersistencePlugin.class.getName());   /** flag indicating the status: true means initialized and not yet shut down) */   private boolean isOK;   private I_Map sessionStore;   private I_Map subscribeStore;   private StorageId sessionStorageId;   private StorageId subscribeStorageId;   private AddressServer addressServer;   private Object sync = new Object();   private int duplicateCounter;   private int errorCounter;   /**    *    * @return hash map containing the secret sessionId of the entries recovered    *         as values and as keys the corresponding absolute name for the session (String)    * @throws XmlBlasterException    */   private HashMap recoverSessions() throws XmlBlasterException {      I_MapEntry[] entries = this.sessionStore.getAll(null);      HashMap sessionIds = new HashMap();      for (int i=0; i < entries.length; i++) {    	 try {	         if (entries[i] instanceof SessionEntry) {	            // do connect	            SessionEntry entry = (SessionEntry)entries[i];	            ConnectQosData data = this.global.getConnectQosFactory().readObject(entry.getQos());	            this.addressServer = new AddressServer(this.global, "NATIVE", this.global.getId(), (java.util.Properties)null);	            ConnectQosServer qos = new ConnectQosServer(this.global, data);	            qos.isFromPersistenceRecovery(true);	            qos.setPersistenceUniqueId(entry.getUniqueId());	            qos.setAddressServer(this.addressServer);	            SessionName sessionName = data.getSessionName();	            String sessionId = data.getSessionQos().getSecretSessionId();	            sessionIds.put(sessionName.getAbsoluteName(), sessionId);	            if (log.isLoggable(Level.FINE))	               log.fine("recoverSessions: store in map session='" + sessionName.getAbsoluteName() + "' has secret sessionId='" + sessionId + "' and persistenceUniqueId=" + entry.getUniqueId());	            // if (log.isLoggable(Level.FINE)) log.trace(ME, "recoverSessions: session: '" + data.getSessionName() + "' secretSessionId='" + qos.getSessionQos().getSecretSessionId() + "' qos='" + qos.toXml() + "'");	            ConnectReturnQosServer ret = this.global.getAuthenticate().connect(this.addressServer, qos);	            if (log.isLoggable(Level.FINEST))	               log.finest("recoverSessions: return of connect: returnConnectQos='" + ret.toXml() + "'");	         }	         else {	            throw new XmlBlasterException(this.global, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME + ".recoverSessions: the entry in the storage should be of type 'SessionEntry' but is of type'" + entries[i].getClass().getName() + "'");	         }    	  }	      catch (XmlBlasterException e) { // authenticate password changed? TODO	    	  log.warning("Loading session from persistency failed: " + e.toString());	      }      }      return sessionIds;   }   /**    * When recovering due to a run level change (without shutting down the    * application) this will not work.    *    * @throws XmlBlasterException    */   private void recoverSubscriptions(final HashMap sessionIds) throws XmlBlasterException {      {         boolean checkForDuplicateSubscriptions = this.global.getProperty().get("xmlBlaster/checkForDuplicateSubscriptions", false);         if (checkForDuplicateSubscriptions) {            duplicateCounter = 0;            errorCounter = 0;            final java.util.Map duplicates = new java.util.TreeMap();            /*I_MapEntry[] results = */this.subscribeStore.getAll(new I_EntryFilter() {               public I_Entry intercept(I_Entry entry, I_Storage storage) {                  if (storage.isTransient()) return null;                  try {                     SubscribeEntry subscribeEntry = (SubscribeEntry)entry;                     //QueryKeyData keyData = queryKeyFactory.readObject(subscribeEntry.getKey());                     QueryQosData qosData = global.getQueryQosFactory().readObject(subscribeEntry.getQos());                     //String key = keyData.getOid() + qosData.getSender().getAbsoluteName();                     SessionName sessionName = new SessionName(global, subscribeEntry.getSessionName());                     Object found = sessionIds.get(sessionName.getAbsoluteName());                     if (found == null) {                        if (errorCounter == 0) {                           log.warning("Ignoring invalid entry '" + sessionName.getAbsoluteName() + "' as user is not known");                        }                        errorCounter++;                        return null;                     }                     String key = qosData.getSubscriptionId();                     if (log.isLoggable(Level.FINE))                        log.fine("Cleanup of duplicate subscriptions, key=" + key);                     if (duplicates.containsKey(key)) {                        if (duplicateCounter == 0)                           log.warning("Cleanup of duplicate subscriptions, this may take a while, please wait ...");                        duplicateCounter++;                        //log.warn(ME, "Removing duplicate subscription '" + key + "' oid=" + keyData.getOid());                        //subscribeStore.remove(subscribeEntry);                     }                     else {                        duplicates.put(key, subscribeEntry);                     }                  }                  catch (XmlBlasterException e) {                     log.severe("Ignoring unexpected problem in checkForDuplicateSubscriptions :" + e.toString());                  }                  return null;               }            });            if (duplicateCounter > 0) {               this.subscribeStore.clear();               if (this.subscribeStore.getNumOfEntries() > 0)                     log.severe("Internal prpblem with checkForDuplicateSubscriptions");               java.util.Iterator it = duplicates.keySet().iterator();               while (it.hasNext()) {                  this.subscribeStore.put((I_MapEntry)duplicates.get(it.next()));               }               log.warning("Removed " + (duplicateCounter-duplicates.size()) + " identical subscriptions, keeping " + duplicates.size() + ". Ignored " + errorCounter + " invalid subscriptions as no session was found");            }         }      }      I_MapEntry[] entries = this.subscribeStore.getAll(null);      for (int i=0; i < entries.length; i++) {         if (entries[i] instanceof SubscribeEntry) {            // do connect            SubscribeEntry entry = (SubscribeEntry)entries[i];            String qos = entry.getQos();            QueryQosData qosData = global.getQueryQosFactory().readObject(qos);            ClientProperty clientProperty = qosData.getClientProperty(Constants.PERSISTENCE_ID);            if (clientProperty == null) {               log.severe("SubscribeQos with missing " + Constants.PERSISTENCE_ID + ": " + qosData.toXml());               long uniqueId = new Timestamp().getTimestamp();               qosData.getClientProperties().put(Constants.PERSISTENCE_ID, new ClientProperty(Constants.PERSISTENCE_ID, "long", null, "" + uniqueId));            }            boolean initialUpdates = qosData.getInitialUpdateProp().getValue();            if (initialUpdates) {               qosData.getClientProperties().put(ORIGINAL_INITIAL_UPDATES, new ClientProperty(ORIGINAL_INITIAL_UPDATES, "boolean", null, "true"));            }            SessionName sessionName = new SessionName(this.global, entry.getSessionName());            String sessionId = (String)sessionIds.get(sessionName.getAbsoluteName());            if (sessionId == null) {               log.severe("The secret sessionId was not found for session='" + sessionName.getAbsoluteName() + "', removing persistent subscription " + entry.getLogId());               this.subscribeStore.remove(entry);               continue;               //throw new XmlBlasterException(this.global, ErrorCode.INTERNAL_NULLPOINTER, ME + ".recoverSubscriptions", "The secret sessionId was not found for session='" + sessionName.getAbsoluteName() + "'");            }            // TODO remove the setting of client properties and invoke directly requestBroker.subscribe with subscribeQosServer.inhibitInitialUpdates(true);            // also get the sessionInfo object from authenticate => eliminate sessionIds            this.global.getAuthenticate().getXmlBlaster().subscribe(this.addressServer, sessionId, entry.getKey(), qosData.toXml());         }         else {            throw new XmlBlasterException(this.global, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME + ".recoverSubscriptions: the entry in the storage should be of type 'SubscribeEntry'but is of type'" + entries[i].getClass().getName() + "'");         }      }   }   private void removeAssociatedSubscriptions(SessionInfo sessionInfo)      throws XmlBlasterException {      if (log.isLoggable(Level.FINER)) log.finer("removeAssociatedSubscriptions for session '" + sessionInfo.getId() + "'");      XmlBlasterException e = null;      SubscriptionInfo[] subs = this.global.getRequestBroker().getClientSubscriptions().getSubscriptions(sessionInfo);      for (int i=0; i < subs.length; i++) {         try {            if (log.isLoggable(Level.FINER)) log.finer("removeAssociatedSubscriptions for session '" + sessionInfo.getId() + "' subscription '" + subs[i].getId() + "'");            this.subscriptionRemove(new SubscriptionEvent(subs[i]));         }         catch (XmlBlasterException ex) {            if (e == null) e = ex;            log.severe("removeAssociatedSubscriptions: exception occured for session '" + sessionInfo.getId() + "' and subscriptions '" + subs[i].getId() + "' : ex: " + ex.getMessage());         }      }      // just throw the first exception encountered (if any)      if (e != null) throw e;   }   /**    * Initializes the plugin    * @see org.xmlBlaster.util.plugin.I_Plugin#init(org.xmlBlaster.util.Global, org.xmlBlaster.util.plugin.PluginInfo)    */   public void init(org.xmlBlaster.util.Global glob, PluginInfo pluginInfo)      throws XmlBlasterException {      synchronized (this.sync) {         if (this.isOK) return;         this.info = pluginInfo;         this.global = (org.xmlBlaster.engine.ServerScope)glob;         if (log.isLoggable(Level.FINER))

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -