📄 sessionpersistenceplugin.java
字号:
// NOTE(review): the statements down to the second closing brace continue the initialisation method that begins above this section.
            log.finer("init");

            // init the storages
            QueuePropertyBase sessionProp = new SessionStoreProperty(this.global, this.global.getStrippedId());
            if (sessionProp.getMaxEntries() > 0L) {
                String type = sessionProp.getType();
                String version = sessionProp.getVersion();
                this.sessionStorageId = new StorageId(Constants.RELATING_SESSION, this.global.getStrippedId() + "/" + this.info.getId());
                this.sessionStore = this.global.getStoragePluginManager().getPlugin(type, version, this.sessionStorageId, sessionProp);
            } else {
                // sessionStore is not assigned on this path, the other methods therefore check it before use
                if (log.isLoggable(Level.FINE)) {
                    log.fine(Constants.RELATING_SESSION + " persistence for session is switched off with maxEntries=0");
                }
            }

            QueuePropertyBase subscribeProp = new SubscribeStoreProperty(this.global, this.global.getStrippedId());
            if (subscribeProp.getMaxEntries() > 0L) {
                String type = subscribeProp.getType();
                String version = subscribeProp.getVersion();
                this.subscribeStorageId = new StorageId(Constants.RELATING_SUBSCRIBE, this.global.getStrippedId() + "/" + this.info.getId());
                this.subscribeStore = this.global.getStoragePluginManager().getPlugin(type, version, this.subscribeStorageId, subscribeProp);
            } else if (log.isLoggable(Level.FINE)) {
                // subscribeStore is not assigned on this path either
                log.fine(Constants.RELATING_SUBSCRIBE + " persistence for subscribe is switched off with maxEntries=0");
            }
            this.isOK = true;

            // register before having retrieved the data since needed to fill info objects with persistenceId
            this.global.getRequestBroker().getAuthenticate().addClientListener(this);
            this.global.getRequestBroker().addSubscriptionListener(this);
            log.fine("Recovering Sessions");
            HashMap sessionIds = recoverSessions();
            log.fine("Recovering Subscriptions");
            recoverSubscriptions(sessionIds);
            log.fine("Recovering of Subscriptions finished");
        }

        // The topics restored from persistence didn't switch on the destroyTimeout to not disappear until we are finished
        this.global.getTopicAccessor().spanTopicDestroyTimeout();
    }

    /**
     * Returns the plugin type.
     *
     * @return the type reported by the plugin info, or <code>null</code> if no plugin info is set
     * @see org.xmlBlaster.util.plugin.I_Plugin#getType()
     */
    public String getType() {
        if (this.info != null) {
            return this.info.getType();
        }
        return null;
    }

    /**
     * Returns the plugin version.
     *
     * @return the version reported by the plugin info, or <code>null</code> if no plugin info is set
     * @see org.xmlBlaster.util.plugin.I_Plugin#getVersion()
     */
    public String getVersion() {
        if (this.info != null) {
            return this.info.getVersion();
        }
        return null;
    }

    /**
     * Shuts down the plugin: marks it as not usable, unregisters it as client and
     * subscription listener and shuts down the storages which were created.
     * <p>
     * A storage is only created when its maxEntries is above zero, so each one is
     * checked for <code>null</code>. The subscribe storage is shut down even if
     * shutting down the session storage throws.
     * </p>
     *
     * @throws XmlBlasterException declared by the plugin contract
     * @see org.xmlBlaster.util.plugin.I_Plugin#shutdown()
     */
    public void shutdown() throws XmlBlasterException {
        if (log.isLoggable(Level.FINER)) {
            log.finer("shutdown");
        }
        synchronized (this.sync) {
            this.isOK = false;
            this.global.getRequestBroker().getAuthenticate().removeClientListener(this);
            this.global.getRequestBroker().removeSubscriptionListener(this);
            try {
                if (this.sessionStore != null) {
                    this.sessionStore.shutdown();
                }
            } finally {
                if (this.subscribeStore != null) {
                    this.subscribeStore.shutdown();
                }
            }
        }
    }

    /**
     * A new session is added, checks if it shall be persisted.
     * <p>
     * Nothing is stored when the session is transient, when it originates from a
     * persistence recovery, or when no session storage is available.
     * </p>
     *
     * @param sessionInfo the session to persist
     * @throws XmlBlasterException if building or storing the entry fails
     */
    private void addSession(SessionInfo sessionInfo) throws XmlBlasterException {
        ConnectQosData connectQosData = sessionInfo.getConnectQos().getData();

        // Is transient?
        if (connectQosData.getPersistentProp() == null || !connectQosData.getPersistentProp().getValue()) {
            return;
        }

        // Avoid recursion
        if (sessionInfo.getConnectQos().isFromPersistenceRecovery()) {
            return;
        }

        if (sessionInfo.getSessionName().isPubSessionIdInternal()) { // negative pubSessionId?
            log.warning("To use persistent session/subscriptions you should login with a given publicSessionId, see http://www.xmlblaster.org/xmlBlaster/doc/requirements/engine.persistence.session.html");
        }

        // The storage only exists when it was configured with maxEntries > 0
        if (this.sessionStore == null) {
            log.warning("addSession: the session storage is not available, the persistent session is not stored");
            return;
        }

        // Persist it
        long uniqueId = new Timestamp().getTimestamp();
        SessionEntry entry = new SessionEntry(connectQosData.toXml(), uniqueId, connectQosData.size());
        if (log.isLoggable(Level.FINE)) {
            log.fine("addSession (persistent) for NEW uniqueId: '" + entry.getUniqueId() + "'");
        }
        sessionInfo.setPersistenceUniqueId(uniqueId);
        this.sessionStore.put(entry);
    }

    /**
     * Persists the newly added session if it is a persistent one.
     *
     * @param e the event carrying the added session
     * @throws XmlBlasterException with RESOURCE_UNAVAILABLE if the plugin is not
     *         (or no longer) operational, or if persisting fails
     * @see org.xmlBlaster.authentication.I_ClientListener#sessionAdded(org.xmlBlaster.authentication.ClientEvent)
     */
    public void sessionAdded(ClientEvent e) throws XmlBlasterException {
        if (!this.isOK) {
            throw new XmlBlasterException(this.global, ErrorCode.RESOURCE_UNAVAILABLE, ME + ".sessionAdded: invoked when plugin already shut down");
        }
        addSession(e.getSessionInfo());
    }

    /**
     * does nothing, the cleanup is done in {@link #sessionPreRemoved(ClientEvent)}
     *
     * @see org.xmlBlaster.authentication.I_ClientListener#sessionRemoved(org.xmlBlaster.authentication.ClientEvent)
     */
    public void sessionRemoved(ClientEvent e) throws XmlBlasterException {
    }

    /**
     * Removes a persistent session and its associated subscriptions from the persistence.
     * <p>
     * Transient sessions are ignored. The log texts still say "sessionRemoved".
     * NOTE(review): presumably this is the pre-removal callback of I_ClientListener - confirm.
     * </p>
     *
     * @param e the event carrying the session which is about to be removed
     * @throws XmlBlasterException with RESOURCE_UNAVAILABLE if the plugin is not
     *         (or no longer) operational, or if the removal fails
     */
    public void sessionPreRemoved(ClientEvent e) throws XmlBlasterException {
        if (log.isLoggable(Level.FINER)) {
            log.finer("sessionRemoved '" + e.getSessionInfo().getId() + "'");
        }
        if (!this.isOK) {
            throw new XmlBlasterException(this.global, ErrorCode.RESOURCE_UNAVAILABLE, ME + ".sessionRemoved: invoked when plugin already shut down");
        }

        SessionInfo sessionInfo = e.getSessionInfo();
        ConnectQosData connectQosData = sessionInfo.getConnectQos().getData();
        // Same transient check as in addSession(), a missing property counts as transient
        if (connectQosData.getPersistentProp() == null || !connectQosData.getPersistentProp().getValue()) {
            return;
        }

        removeAssociatedSubscriptions(sessionInfo);

        // Without a session storage there is nothing to remove
        if (this.sessionStore == null) {
            if (log.isLoggable(Level.FINE)) {
                log.fine("sessionRemoved: the session storage is not available, nothing to remove");
            }
            return;
        }

        // TODO add a method I_Queue.removeRandom(long uniqueId)
        long uniqueId = sessionInfo.getPersistenceUniqueId();
        if (log.isLoggable(Level.FINE)) {
            log.fine("sessionRemoved (persistent) for uniqueId: '" + uniqueId + "'");
        }
        // String sessionId = getOriginalSessionId(connectQosData.getSessionQos().getSecretSessionId());
        SessionEntry entry = new SessionEntry(connectQosData.toXml(), uniqueId, 0L);
        int num = this.sessionStore.remove(entry);
        if (num != 1) {
            // A recorded transport connect failure lowers the report from severe to fine
            XmlBlasterException ex = sessionInfo.getTransportConnectFail();
            if (ex != null) {
                log.fine("sessionRemoved (persistent) for uniqueId: '" + uniqueId + "' failed, entry not found.");
            } else {
                log.severe("sessionRemoved (persistent) for uniqueId: '" + uniqueId + "' failed, entry not found.");
            }
        }
    }

    /**
     * This event is invoked even by child subscriptions. However only
     * parent subscriptions should be stored, so all child subscriptions are
     * ignored.
     * <p>
     * A persistent subscription of a session which is not yet persistent makes
     * that session persistent first. A subscription without the
     * <code>Constants.PERSISTENCE_ID</code> client property is stored as a new
     * entry; one which carries the property only gets its persistence id filled
     * into the subscription info.
     * </p>
     *
     * @param e the event carrying the added subscription
     * @throws XmlBlasterException with RESOURCE_UNAVAILABLE if the plugin is not
     *         (or no longer) operational, or if persisting fails
     * @see org.xmlBlaster.engine.I_SubscriptionListener#subscriptionAdd(org.xmlBlaster.engine.SubscriptionEvent)
     */
    public void subscriptionAdd(SubscriptionEvent e) throws XmlBlasterException {
        if (log.isLoggable(Level.FINER)) {
            log.finer("subscriptionAdd '" + e.getSubscriptionInfo().getId() + "'");
        }
        if (!this.isOK) {
            throw new XmlBlasterException(this.global, ErrorCode.RESOURCE_UNAVAILABLE, ME + ".subscriptionAdded: invoked when plugin already shut down");
        }

        SubscriptionInfo subscriptionInfo = e.getSubscriptionInfo();
        KeyData data = subscriptionInfo.getKeyData();
        // this filters away child subscriptions
        if (subscriptionInfo.isCreatedByQuerySubscription()) {
            return;
        }

        // TODO add a method I_Queue.removeRandom(long uniqueId)
        QueryQosData subscribeQosData = subscriptionInfo.getQueryQosData();
        if (log.isLoggable(Level.FINEST)) {
            log.finest("subscriptionAdd: key='" + data.toXml() + "'");
            if (subscribeQosData != null) {
                log.finest("subscriptionAdd: qos='" + subscribeQosData.toXml() + "'");
            }
        }
        if (subscribeQosData == null || !subscribeQosData.isPersistent()) {
            return;
        }

        SessionInfo sessionInfo = subscriptionInfo.getSessionInfo();
        if (!sessionInfo.getConnectQos().getData().isPersistent()) {
            sessionInfo.getConnectQos().getData().setPersistent(true);
            this.addSession(sessionInfo);
        }

        // is it a remote connect ?
        ClientProperty clientProperty = subscribeQosData.getClientProperty(Constants.PERSISTENCE_ID);
        if (clientProperty == null) {
            // The storage only exists when it was configured with maxEntries > 0
            if (this.subscribeStore == null) {
                log.warning("subscriptionAdd: the subscribe storage is not available, the persistent subscription is not stored");
                return;
            }
            long uniqueId = new Timestamp().getTimestamp();
            subscribeQosData.getClientProperties().put(Constants.PERSISTENCE_ID, new ClientProperty(Constants.PERSISTENCE_ID, "long", null, "" + uniqueId));
            QueryKeyData subscribeKeyData = (QueryKeyData)data;
            // to be found when the client unsubscribes after a server crash ...
            subscribeQosData.setSubscriptionId(subscriptionInfo.getSubscriptionId());
            SubscribeEntry entry = new SubscribeEntry(subscribeKeyData.toXml(), subscribeQosData.toXml(), sessionInfo.getConnectQos().getSessionName().getAbsoluteName(), uniqueId, 0L);
            if (log.isLoggable(Level.FINE)) {
                log.fine("subscriptionAdd: putting to persistence NEW entry '" + entry.getUniqueId() + "' key='" + subscribeKeyData.toXml() + "' qos='" + subscribeQosData.toXml() + "' secretSessionId='" + sessionInfo.getSecretSessionId() + "'");
            }
            subscriptionInfo.setPersistenceId(uniqueId);
            this.subscribeStore.put(entry);
        } else {
            // ... or from a recovery ?
            // TODO handle by recoverSubscriptions(..)
            // No remove: To avoid danger of looping we keep the marker (Marcel 2005-08-08)
            // subscribeQosData.getClientProperties().remove(Constants.PERSISTENCE_ID);
            long uniqueId = clientProperty.getLongValue();
            if (log.isLoggable(Level.FINE)) {
                log.fine("subscriptionAdd: filling OLD uniqueId into subscriptionInfo '" + uniqueId + "'");
            }
            subscriptionInfo.setPersistenceId(uniqueId);
            ClientProperty prop = subscribeQosData.getClientProperty(ORIGINAL_INITIAL_UPDATES);
            if (prop != null && subscriptionInfo.getSubscribeQosServer() != null) {
                subscriptionInfo.getSubscribeQosServer().inhibitInitalUpdates(true);
                subscribeQosData.getClientProperties().remove(ORIGINAL_INITIAL_UPDATES);
            }
        }
    }

    /**
     * Removes the entry of a persistent subscription from the persistence.
     * <p>
     * Nothing is removed when the key is not a query key, when the subscription
     * has no persistence id (smaller than 1), when its qos is not persistent, or
     * when no subscribe storage is available.
     * </p>
     *
     * @param e the event carrying the removed subscription
     * @throws XmlBlasterException with RESOURCE_UNAVAILABLE if the plugin is not
     *         (or no longer) operational, or if the removal fails
     * @see org.xmlBlaster.engine.I_SubscriptionListener#subscriptionRemove(org.xmlBlaster.engine.SubscriptionEvent)
     */
    public void subscriptionRemove(SubscriptionEvent e) throws XmlBlasterException {
        if (log.isLoggable(Level.FINER)) {
            log.finer("subscriptionRemove '" + e.getSubscriptionInfo().getId() + "'");
        }
        if (!this.isOK) {
            throw new XmlBlasterException(this.global, ErrorCode.RESOURCE_UNAVAILABLE, ME + ".subscriptionRemove: invoked when plugin already shut down");
        }

        SubscriptionInfo subscriptionInfo = e.getSubscriptionInfo();
        KeyData keyData = subscriptionInfo.getKeyData();
        if (!(keyData instanceof QueryKeyData)) {
            if (log.isLoggable(Level.FINE)) {
                log.fine("subscriptionRemove keyData wrong instance'");
            }
            return;
        }
        if (subscriptionInfo.getPersistenceId() < 1L) {
            return;
        }

        // TODO add a method I_Queue.removeRandom(long uniqueId)
        QueryQosData qosData = subscriptionInfo.getQueryQosData();
        if (qosData == null || qosData.getPersistentProp() == null || !qosData.getPersistentProp().getValue()) {
            return;
        }

        // Without a subscribe storage there is nothing to remove
        if (this.subscribeStore == null) {
            return;
        }
        // The entry is removed by its persistence id, no SubscribeEntry needs to be rebuilt for this
        this.subscribeStore.remove(subscriptionInfo.getPersistenceId());
    }

    /**
     * does nothing
     */
    public void subjectAdded(ClientEvent e) throws XmlBlasterException {
    }

    /**
     * does nothing
     */
    public void subjectRemoved(ClientEvent e) throws XmlBlasterException {
    }

    /**
     * @return the constant PRIO_10
     * @see org.xmlBlaster.engine.I_SubscriptionListener#getPriority()
     */
    public Integer getPriority() {
        return PRIO_10;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -