📄 subjectinfo.java
字号:
determineNodeId = false; if (this.subjectName.getLoginName().startsWith(org.xmlBlaster.engine.RequestBroker.internalLoginNamePrefix)) return null; // don't check for internal logins if (glob.isClusterManagerReady()) { // Is the client a well known, configured cluster node? ClusterNode clusterNode = glob.getClusterManager().getClusterNode(this.subjectName.getLoginName()); // is null if not found if (clusterNode != null) { nodeId = clusterNode.getNodeId(); } else { // Does the client send a tag which marks it as a cluster node? SessionInfo ses = getFirstSession(); if (ses != null) { if (ses.getConnectQos().isClusterNode()) nodeId = new NodeId(this.subjectName.getLoginName()); } } } } return nodeId; } /** * @return true if this client is an xmlBlaster cluster node */ public boolean isCluster() throws XmlBlasterException { return getNodeId() != null; } /** * Allows to overwrite queue property. * <p> * It will be only written if prop!= null. * </p> * @param prop CbQueueProperty transports subject queue property as well * TODO: we should have a clear named SubjectQueueProperty */ public final void setSubjectQueueProperty(CbQueueProperty prop) throws XmlBlasterException { CbQueueProperty origProp = (CbQueueProperty)this.subjectQueue.getProperties(); if (origProp == null) { log.severe(ME+": Existing subject queue properties are null"); return; } if (prop == null) prop = new CbQueueProperty(glob, Constants.RELATING_SUBJECT, glob.getId()); this.lock.lock(); try { if (prop.getTypeVersion().equals(origProp.getTypeVersion())) { this.subjectQueue.setProperties(prop); return; } // TODO: Extend CACHE queue to handle reconfigurations hidden so we don't need to do anything here if (!this.subjectQueue.isTransient()) { I_Queue newQueue = createSubjectQueue(prop); if (newQueue.isTransient()) { log.info(ME+": Reconfiguring subject queue: Copying " + this.subjectQueue.getNumOfEntries() + " entries from old " + origProp.getType() + " queue to " + prop.getTypeVersion() + " queue"); java.util.ArrayList list = null; int lastSize = -99; while (this.subjectQueue.getNumOfEntries() > 0) { try { list = this.subjectQueue.peek(-1, -1); if (this.subjectQueue.getNumOfEntries() == lastSize) { log.severe(ME+": PANIC: " + this.subjectQueue.getNumOfEntries() + " entries from old queue " + this.subjectQueue.getStorageId() + " can't be copied, giving up!"); break; } lastSize = (int)this.subjectQueue.getNumOfEntries(); } catch (XmlBlasterException e) { log.severe(ME+": PANIC: Can't copy from subject queue '" + this.subjectQueue.getStorageId() + "' with " + this.subjectQueue.getNumOfEntries() + " entries: " + e.getMessage()); e.printStackTrace(); continue; } MsgQueueEntry[] queueEntries = (MsgQueueEntry[])list.toArray(new MsgQueueEntry[list.size()]); // On error we send them as dead letters, as we don't know what to do with them in our holdback queue try { newQueue.put(queueEntries, false); } catch (XmlBlasterException e) { log.warning(ME+": flushHoldbackQueue() failed: " + e.getMessage()); // errorCode == "ONOVERFLOW" getMsgErrorHandler().handleError(new MsgErrorInfo(glob, queueEntries, null, e)); } try { long num = this.subjectQueue.remove(list.size(), -1); if (num != list.size()) { log.severe(ME+": PANIC: Expected to remove from subject queue '" + this.subjectQueue.getStorageId() + "' with " + this.subjectQueue.getNumOfEntries() + " entries " + list.size() + " entries, but only " + num + " where removed"); } } catch (XmlBlasterException e) { log.severe(ME+": PANIC: Expected to remove from subject queue '" + this.subjectQueue.getStorageId() + "' with " + this.subjectQueue.getNumOfEntries() + " entries " + list.size() + " entries: " + e.getMessage()); } } this.subjectQueue.clear(); this.subjectQueue.shutdown(); this.subjectQueue = newQueue; return; } } } // synchronized finally { this.lock.release(); } log.severe(ME+": Can't reconfigure subject queue type '" + origProp.getTypeVersion() + "' to '" + prop.getTypeVersion() + "'"); return; //throw new XmlBlasterException(glob, ErrorCode.USER_CONFIGURATION, ME+".setSubjectQueueProperty()", "Can't reconfigure subject queue type '" + origProps.getTypeVersion() + "' to '" + props.getTypeVersion() + "'"); } /** * This queue holds all messages which where addressed to destination loginName * @return never null */ public I_Queue getSubjectQueue() { return this.subjectQueue; } /** * Subject specific informations from the security framework * @return null if created without login (for example with a PtP message) */ public I_Subject getSecurityCtx() { return this.securityCtx; } public void setSecurityCtx(I_Subject securityCtx) { this.securityCtx = securityCtx; } /* * Check if this subject is permitted to do something * <p/> * @param String The action the user tries to perfrom * @param String whereon the user tries to perform the action * * EXAMPLE: * isAuthorized("PUBLISH", "thisIsAMessageKey"); * * The above line checks if this subject is permitted to >>publish<< * a message under the key >>thisIsAMessageKey<< * * Known action keys: * PUBLISH, SUBSCRIBE, GET, ERASE, public boolean isAuthorized(MethodName actionKey, String key) { if (this.securityCtx == null) { log.warning("No authorization for '" + actionKey + "' and msg=" + key); return false; } return this.securityCtx.isAuthorized(actionKey, key); }*/ /** * PtP mode: If the qos is set to forceQueuing the message is queued. * @param msgUnit The message. Only called in sync mode on publish (TopicHandler) * @param destination The Destination object of the receiver */ public final void queueMessage(MsgQueueEntry entry) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer(ME+": Queuing message for destination " + entry.getReceiver()); if (log.isLoggable(Level.FINEST)) log.finest(ME+": Putting PtP message to queue: " + entry.toXml("")); this.subjectQueue.put(entry, I_Queue.USE_PUT_INTERCEPTOR); //forwardToSessionQueue(); // returns here with no waiting ... this.glob.getSubjectInfoShuffler().shuffle(this); } /** * Forward entries in subject queue to all session queues, * if no entries are available * we return 0 without doing anything. * @return number of messages taken from queue and forwarded */ public final long forwardToSessionQueue() { if (getSessions().length < 1 || this.subjectQueue.getNumOfEntries() < 1) return 0; long numMsgs = 0; MsgQueueUpdateEntry entry = null; if (log.isLoggable(Level.FINE)) log.fine(ME+": Trying to forward " + this.subjectQueue.getNumOfEntries() + " messages in subject queue to session queue ..."); while (true) { try { try { entry = (MsgQueueUpdateEntry)this.subjectQueue.peek(); // non-blocking } catch (Throwable ex) { log.severe(ME+": Can't get entry from subject queue when trying to forward it to session queue " + ex.getMessage()); // TODO toDead from the subject may be necessary to avoid looping break; } if (entry == null) break; if (entry.isDestroyed()) { log.info(ME+": Message " + entry.getLogId() + " is destroyed, ignoring it"); this.subjectQueue.removeRandom(entry); // Remove the destroyed entry } else { int countForwarded = forwardToSessionQueue(entry); if (countForwarded > 0) { this.subjectQueue.removeRandom(entry); // Remove the forwarded entry (blocking) numMsgs++; } else if (countForwarded == -1) { // There are sessions but they don't want PtP break; } } } catch(Throwable e) { MsgQueueEntry[] msgQueueEntries = new MsgQueueEntry[] { entry }; MsgErrorInfo msgErrorInfo = new MsgErrorInfo(glob, msgQueueEntries, null, e); // this.subjectQueue getMsgErrorHandler().handleError(msgErrorInfo); try { this.subjectQueue.removeRandom(entry); // Remove the entry } catch (XmlBlasterException ex) { log.severe(ME+": Can't empty queue when removing '" + entry.getLogId() + "' " + ex.getMessage()); // TODO toDead from the subject may be necessary to avoid looping break; } } } if (log.isLoggable(Level.FINE)) log.fine(ME+": Forwarded " + numMsgs + " messages from subject queue to session queue"); if (!isLoggedIn()) { // Check if we can shutdown now shutdown(false, false); } return numMsgs; } /** * Forward the given message to session queue. * @return Number of session queues this message is forwarded to. * -1 if not delivered because the available sessions don't want PtP * @throws XmlBlasterException if not delivered at all. */ private final int forwardToSessionQueue(MsgQueueEntry entry) throws XmlBlasterException { if (getSessions().length < 1) return -1; int countForwarded = 0; SessionName destination = entry.getReceiver(); if (destination.isSession()) { // send to a specific session, it should never happen to have such messages in the subject queue ... String tmp = "Can't forward msg " + entry.getLogId() + " from " + this.subjectQueue.getStorageId() + " size=" + this.subjectQueue.getNumOfEntries() + " to unknown session '" + entry.getReceiver().getAbsoluteName() + "'"; log.warning(ME+": "+tmp); throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, tmp); } // ... or send to ALL sessions SessionInfo[] sessions = getSessions(); for (int i=0; i<sessions.length; i++) { SessionInfo sessionInfo = sessions[i]; I_Queue sessionQueue = sessionInfo.getSessionQueue(); if (sessionInfo.getConnectQos().isPtpAllowed() && sessionInfo.hasCallback() && sessionQueue != null) { if (log.isLoggable(Level.FINE)) log.fine(ME+": Forwarding msg " + entry.getLogId() + " from " + this.subjectQueue.getStorageId() + " size=" + this.subjectQueue.getNumOfEntries() + " to session queue " + sessionQueue.getStorageId() + " size=" + sessionQueue.getNumOfEntries() + " ..."); try { MsgQueueUpdateEntry entryCb = new MsgQueueUpdateEntry((MsgQueueUpdateEntry)entry, sessionQueue.getStorageId()); sessionInfo.queueMessage(entryCb); countForwarded++; } catch (XmlBlasterException e) { if (log.isLoggable(Level.FINE)) log.fine(ME+": Can't forward message from subject queue '" + this.subjectQueue.getStorageId() + "' to session '" + sessionInfo.getId() + "', we keep it in the subject queue: " + e.getMessage()); } catch (Throwable e) { e.printStackTrace(); log.warning(ME+": Can't forward message from subject queue '" + this.subjectQueue.getStorageId() + "' to session '" + sessionInfo.getId() + "', we keep it in the subject queue: " + e.toString()); } } } if (countForwarded > 0) { return countForwarded; } return -1; } public final I_MsgErrorHandler getMsgErrorHandler() { if (this.msgErrorHandler == null) { this.lock.lock(); try { if (this.msgErrorHandler == null) { log.severe(ME+": INTERNAL: Support for MsgErrorHandler is not implemented"); this.msgErrorHandler = new MsgErrorHandler(glob, null); } } finally { this.lock.release(); } } return this.msgErrorHandler; } /** * Is the client currently logged in? * @return true yes * false client is not on line */ public final boolean isLoggedIn() { synchronized (this.sessionMap) { return this.sessionMap.size() > 0; } //return getSessions().length > 0; } /** * Access the collection containing all SessionInfo objects of this user. */ public final SessionInfo[] getSessions() { if (this.sessionArrCache == null) { synchronized (this.sessionMap) { if (this.sessionArrCache == null) { this.sessionArrCache = (SessionInfo[])this.sessionMap.values().toArray(new SessionInfo[this.sessionMap.size()]); } } } return this.sessionArrCache; } /** * Find a session by its absolute name. * @param absoluteName e.g. "/node/heron/client/joe/2" * @return SessionInfo or null if not found */ public final SessionInfo getSessionByAbsoluteName(String absoluteName) { synchronized (this.sessionMap) { return (SessionInfo)this.sessionMap.get(absoluteName); } } /** * Find a session by its public session ID. * @param sessionName * @return SessionInfo or null if not found */ public final SessionInfo getSession(SessionName sessionName) { synchronized (this.sessionMap) { return (SessionInfo)this.sessionMap.get(sessionName.getAbsoluteName()); } } public final SessionInfo getFirstSession() { SessionInfo[] sessions = getSessions(); return (sessions.length > 0) ? sessions[0] : null; } /** * Get the callback addresses for this subjectQueue, every session * callback may have decided to receive subject messages */ public final CallbackAddress[] getCallbackAddresses() { if (this.callbackAddressCache == null) { SessionInfo[] sessions = getSessions(); Set set = new HashSet(); for (int i=0; i<sessions.length; i++) { SessionInfo ses = sessions[i]; if (ses.hasCallback()) { CallbackAddress[] arr = ((CbQueueProperty)ses.getSessionQueue().getProperties()).getCallbackAddresses(); for (int ii=0; arr!=null && ii<arr.length; ii++) { if (arr[ii].useForSubjectQueue() == true) set.add(arr[ii]); } } } this.callbackAddressCache = (CallbackAddress[])set.toArray(new CallbackAddress[set.size()]);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -