⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 subjectinfo.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
         determineNodeId = false;         if (this.subjectName.getLoginName().startsWith(org.xmlBlaster.engine.RequestBroker.internalLoginNamePrefix))            return null; // don't check for internal logins         if (glob.isClusterManagerReady()) {            // Is the client a well known, configured cluster node?            ClusterNode clusterNode = glob.getClusterManager().getClusterNode(this.subjectName.getLoginName()); // is null if not found            if (clusterNode != null) {               nodeId = clusterNode.getNodeId();            }            else {               // Does the client send a tag which marks it as a cluster node?               SessionInfo ses = getFirstSession();               if (ses != null) {                  if (ses.getConnectQos().isClusterNode())                     nodeId = new NodeId(this.subjectName.getLoginName());               }            }         }      }      return nodeId;   }   /**    * @return true if this client is an xmlBlaster cluster node    */   public boolean isCluster() throws XmlBlasterException {      return getNodeId() != null;   }   /**    * Allows to overwrite queue property.    * <p>    * It will be only written if prop!= null.    * </p>    * @param prop CbQueueProperty transports subject queue property as well    *        TODO: we should have a clear named SubjectQueueProperty    */   public final void setSubjectQueueProperty(CbQueueProperty prop) throws XmlBlasterException {      CbQueueProperty origProp = (CbQueueProperty)this.subjectQueue.getProperties();      if (origProp == null) {         log.severe(ME+": Existing subject queue properties are null");         return;      }      if (prop == null) prop = new CbQueueProperty(glob, Constants.RELATING_SUBJECT, glob.getId());      this.lock.lock();      try {         if (prop.getTypeVersion().equals(origProp.getTypeVersion())) {            this.subjectQueue.setProperties(prop);            return;         }         // TODO: Extend CACHE queue to handle reconfigurations hidden so we don't need to do anything here         if (!this.subjectQueue.isTransient()) {            I_Queue newQueue = createSubjectQueue(prop);            if (newQueue.isTransient()) {               log.info(ME+": Reconfiguring subject queue: Copying " + this.subjectQueue.getNumOfEntries() + " entries from old " + origProp.getType() + " queue to " + prop.getTypeVersion() + " queue");               java.util.ArrayList list = null;               int lastSize = -99;               while (this.subjectQueue.getNumOfEntries() > 0) {                  try {                     list = this.subjectQueue.peek(-1, -1);                     if (this.subjectQueue.getNumOfEntries() == lastSize) {                        log.severe(ME+": PANIC: " + this.subjectQueue.getNumOfEntries() + " entries from old queue " + this.subjectQueue.getStorageId() + " can't be copied, giving up!");                        break;                     }                     lastSize = (int)this.subjectQueue.getNumOfEntries();                  }                  catch (XmlBlasterException e) {                     log.severe(ME+": PANIC: Can't copy from subject queue '" + this.subjectQueue.getStorageId() + "' with " + this.subjectQueue.getNumOfEntries() + " entries: " + e.getMessage());                     e.printStackTrace();                     continue;                  }                  MsgQueueEntry[] queueEntries = (MsgQueueEntry[])list.toArray(new MsgQueueEntry[list.size()]);                  // On error we send them as dead letters, as we don't know what to do with them in our holdback queue                  try {                     newQueue.put(queueEntries, false);                  }                  catch (XmlBlasterException e) {                     log.warning(ME+": flushHoldbackQueue() failed: " + e.getMessage());                     // errorCode == "ONOVERFLOW"                     getMsgErrorHandler().handleError(new MsgErrorInfo(glob, queueEntries, null, e));                  }                  try {                     long num = this.subjectQueue.remove(list.size(), -1);                     if (num != list.size()) {                        log.severe(ME+": PANIC: Expected to remove from subject queue '" + this.subjectQueue.getStorageId() + "' with " + this.subjectQueue.getNumOfEntries() + " entries " + list.size() + " entries, but only " + num + " where removed");                     }                  }                  catch (XmlBlasterException e) {                     log.severe(ME+": PANIC: Expected to remove from subject queue '" + this.subjectQueue.getStorageId() + "' with " + this.subjectQueue.getNumOfEntries() + " entries " + list.size() + " entries: " + e.getMessage());                  }               }               this.subjectQueue.clear();               this.subjectQueue.shutdown();               this.subjectQueue = newQueue;               return;            }         }      } // synchronized      finally {         this.lock.release();      }      log.severe(ME+": Can't reconfigure subject queue type '" + origProp.getTypeVersion() + "' to '" + prop.getTypeVersion() + "'");      return;      //throw new XmlBlasterException(glob, ErrorCode.USER_CONFIGURATION, ME+".setSubjectQueueProperty()", "Can't reconfigure subject queue type '" + origProps.getTypeVersion() + "' to '" + props.getTypeVersion() + "'");   }   /**    * This queue holds all messages which where addressed to destination loginName    * @return never null    */   public I_Queue getSubjectQueue() {      return this.subjectQueue;   }   /**    * Subject specific informations from the security framework    * @return null if created without login (for example with a PtP message)    */   public I_Subject getSecurityCtx() {      return this.securityCtx;   }   public void setSecurityCtx(I_Subject securityCtx) {      this.securityCtx = securityCtx;   }   /*    * Check if this subject is permitted to do something    * <p/>    * @param String The action the user tries to perfrom    * @param String whereon the user tries to perform the action    *    * EXAMPLE:    *    isAuthorized("PUBLISH", "thisIsAMessageKey");    *    * The above line checks if this subject is permitted to >>publish<<    * a message under the key >>thisIsAMessageKey<<    *    * Known action keys:    *    PUBLISH, SUBSCRIBE, GET, ERASE,   public boolean isAuthorized(MethodName actionKey, String key) {      if (this.securityCtx == null) {         log.warning("No authorization for '" + actionKey + "' and msg=" + key);         return false;      }      return this.securityCtx.isAuthorized(actionKey, key);   }*/   /**    * PtP mode: If the qos is set to forceQueuing the message is queued.    * @param msgUnit The message. Only called in sync mode on publish (TopicHandler)    * @param destination The Destination object of the receiver    */   public final void queueMessage(MsgQueueEntry entry) throws XmlBlasterException {      if (log.isLoggable(Level.FINER)) log.finer(ME+": Queuing message for destination " + entry.getReceiver());      if (log.isLoggable(Level.FINEST)) log.finest(ME+": Putting PtP message to queue: " + entry.toXml(""));      this.subjectQueue.put(entry, I_Queue.USE_PUT_INTERCEPTOR);      //forwardToSessionQueue();      // returns here with no waiting ...      this.glob.getSubjectInfoShuffler().shuffle(this);   }   /**    * Forward entries in subject queue to all session queues,    * if no entries are available    * we return 0 without doing anything.    * @return number of messages taken from queue and forwarded    */   public final long forwardToSessionQueue() {      if (getSessions().length < 1 || this.subjectQueue.getNumOfEntries() < 1) return 0;      long numMsgs = 0;      MsgQueueUpdateEntry entry = null;      if (log.isLoggable(Level.FINE)) log.fine(ME+": Trying to forward " + this.subjectQueue.getNumOfEntries() + " messages in subject queue to session queue ...");      while (true) {         try {            try {               entry = (MsgQueueUpdateEntry)this.subjectQueue.peek(); // non-blocking            }            catch (Throwable ex) {               log.severe(ME+": Can't get entry from subject queue when trying to forward it to session queue " + ex.getMessage());               // TODO toDead from the subject may be necessary to avoid looping               break;            }            if (entry == null)               break;            if (entry.isDestroyed()) {               log.info(ME+": Message " + entry.getLogId() + " is destroyed, ignoring it");               this.subjectQueue.removeRandom(entry); // Remove the destroyed entry            }            else {               int countForwarded = forwardToSessionQueue(entry);               if (countForwarded > 0) {                  this.subjectQueue.removeRandom(entry); // Remove the forwarded entry (blocking)                  numMsgs++;               }               else if (countForwarded == -1) { // There are sessions but they don't want PtP                  break;               }            }         }         catch(Throwable e) {            MsgQueueEntry[] msgQueueEntries = new MsgQueueEntry[] { entry };            MsgErrorInfo msgErrorInfo = new MsgErrorInfo(glob, msgQueueEntries, null, e);  // this.subjectQueue            getMsgErrorHandler().handleError(msgErrorInfo);            try {               this.subjectQueue.removeRandom(entry); // Remove the entry            }            catch (XmlBlasterException ex) {               log.severe(ME+": Can't empty queue when removing '" + entry.getLogId() + "' " + ex.getMessage());               // TODO toDead from the subject may be necessary to avoid looping               break;            }         }      }      if (log.isLoggable(Level.FINE)) log.fine(ME+": Forwarded " + numMsgs + " messages from subject queue to session queue");      if (!isLoggedIn()) { // Check if we can shutdown now         shutdown(false, false);      }      return numMsgs;   }   /**    * Forward the given message to session queue.    * @return Number of session queues this message is forwarded to.    *         -1 if not delivered because the available sessions don't want PtP    * @throws XmlBlasterException if not delivered at all.    */   private final int forwardToSessionQueue(MsgQueueEntry entry) throws XmlBlasterException {      if (getSessions().length < 1) return -1;      int countForwarded = 0;      SessionName destination = entry.getReceiver();      if (destination.isSession()) {         // send to a specific session, it should never happen to have such messages in the subject queue ...         String tmp = "Can't forward msg " + entry.getLogId() + " from " +                      this.subjectQueue.getStorageId() + " size=" +                      this.subjectQueue.getNumOfEntries() + " to unknown session '" +                      entry.getReceiver().getAbsoluteName() + "'";         log.warning(ME+": "+tmp);         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, tmp);      }      // ... or send to ALL sessions      SessionInfo[] sessions = getSessions();      for (int i=0; i<sessions.length; i++) {         SessionInfo sessionInfo = sessions[i];         I_Queue sessionQueue = sessionInfo.getSessionQueue();         if (sessionInfo.getConnectQos().isPtpAllowed() && sessionInfo.hasCallback() && sessionQueue != null) {            if (log.isLoggable(Level.FINE)) log.fine(ME+": Forwarding msg " + entry.getLogId() + " from " +                          this.subjectQueue.getStorageId() + " size=" + this.subjectQueue.getNumOfEntries() +                          " to session queue " + sessionQueue.getStorageId() +                          " size=" + sessionQueue.getNumOfEntries() + " ...");            try {               MsgQueueUpdateEntry entryCb = new MsgQueueUpdateEntry((MsgQueueUpdateEntry)entry, sessionQueue.getStorageId());               sessionInfo.queueMessage(entryCb);               countForwarded++;            }            catch (XmlBlasterException e) {               if (log.isLoggable(Level.FINE)) log.fine(ME+": Can't forward message from subject queue '" + this.subjectQueue.getStorageId() + "' to session '" + sessionInfo.getId() + "', we keep it in the subject queue: " + e.getMessage());            }            catch (Throwable e) {               e.printStackTrace();               log.warning(ME+": Can't forward message from subject queue '" + this.subjectQueue.getStorageId() + "' to session '" + sessionInfo.getId() + "', we keep it in the subject queue: " + e.toString());            }         }      }      if (countForwarded > 0) {         return countForwarded;      }      return -1;   }   public final I_MsgErrorHandler getMsgErrorHandler() {      if (this.msgErrorHandler == null) {         this.lock.lock();         try {            if (this.msgErrorHandler == null) {               log.severe(ME+": INTERNAL: Support for MsgErrorHandler is not implemented");               this.msgErrorHandler = new MsgErrorHandler(glob, null);            }         }         finally {            this.lock.release();         }      }      return this.msgErrorHandler;   }   /**    * Is the client currently logged in?    * @return true yes    *         false client is not on line    */   public final boolean isLoggedIn() {      synchronized (this.sessionMap) {         return this.sessionMap.size() > 0;      }      //return getSessions().length > 0;   }   /**    * Access the collection containing all SessionInfo objects of this user.    */   public final SessionInfo[] getSessions() {      if (this.sessionArrCache == null) {         synchronized (this.sessionMap) {            if (this.sessionArrCache == null) {               this.sessionArrCache = (SessionInfo[])this.sessionMap.values().toArray(new SessionInfo[this.sessionMap.size()]);            }         }      }      return this.sessionArrCache;   }   /**    * Find a session by its absolute name.    * @param absoluteName e.g. "/node/heron/client/joe/2"    * @return SessionInfo or null if not found    */   public final SessionInfo getSessionByAbsoluteName(String absoluteName) {      synchronized (this.sessionMap) {         return (SessionInfo)this.sessionMap.get(absoluteName);      }   }   /**    * Find a session by its public session ID.    * @param sessionName    * @return SessionInfo or null if not found    */   public final SessionInfo getSession(SessionName sessionName) {      synchronized (this.sessionMap) {         return (SessionInfo)this.sessionMap.get(sessionName.getAbsoluteName());      }   }   public final SessionInfo getFirstSession() {      SessionInfo[] sessions = getSessions();      return (sessions.length > 0) ? sessions[0] : null;   }   /**    * Get the callback addresses for this subjectQueue, every session    * callback may have decided to receive subject messages    */   public final CallbackAddress[] getCallbackAddresses() {      if (this.callbackAddressCache == null) {         SessionInfo[] sessions = getSessions();         Set set = new HashSet();         for (int i=0; i<sessions.length; i++) {            SessionInfo ses = sessions[i];            if (ses.hasCallback()) {               CallbackAddress[] arr = ((CbQueueProperty)ses.getSessionQueue().getProperties()).getCallbackAddresses();               for (int ii=0; arr!=null && ii<arr.length; ii++) {                  if (arr[ii].useForSubjectQueue() == true)                     set.add(arr[ii]);               }            }         }         this.callbackAddressCache = (CallbackAddress[])set.toArray(new CallbackAddress[set.size()]);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -