⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xmlblasteraccess.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
      return (SubscribeReturnQos)queueMessage(entry);   }   /**    * @see I_XmlBlasterAccess#subscribe(SubscribeKey, SubscribeQos, I_Callback)    */   public SubscribeReturnQos subscribe(java.lang.String xmlKey, java.lang.String qos, I_Callback cb) throws XmlBlasterException {      return subscribe(new SubscribeKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)),                       new SubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)),                       cb );   }   /**    * @see I_XmlBlasterAccess#subscribe(SubscribeKey, SubscribeQos, I_Callback)    */   public SubscribeReturnQos subscribe(SubscribeKey subscribeKey, SubscribeQos subscribeQos, I_Callback cb) throws XmlBlasterException {      if (!this.isValid)         throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "subscribe");      if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);      if (this.updateListener == null) {         String text = "No callback listener is registered. " +                       " Please use XmlBlasterAccess.connect() with default I_Callback given.";         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, text);      }      // sync subscribe & put against update()'s check for entry      // otherwise if the update was faster then the subscribe to return we miss the entry      synchronized (this.updateDispatcher) {         SubscribeReturnQos subscribeReturnQos = subscribe(subscribeKey, subscribeQos);         this.updateDispatcher.addCallback(subscribeReturnQos.getSubscriptionId(), cb, subscribeQos.getPersistent());         if (!subscribeReturnQos.isFakedReturn()) {            this.updateDispatcher.ackSubscription(subscribeReturnQos.getSubscriptionId());         }         return subscribeReturnQos;      }   }   /**    * @see I_XmlBlasterAccess#get(GetKey, GetQos)    */   public MsgUnit[] get(java.lang.String xmlKey, java.lang.String qos) throws XmlBlasterException {      return get(new GetKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)),                 new GetQos(glob, glob.getQueryQosFactory().readObject(qos)) );   }   /**    * @see I_XmlBlasterAccess#getCached(GetKey, GetQos)    */   public MsgUnit[] getCached(GetKey getKey, GetQos getQos) throws XmlBlasterException {      if (!this.isValid)         throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "getCached");      if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);      if (this.synchronousCache == null) {  //Is synchronousCache installed?         throw new XmlBlasterException(glob, ErrorCode.USER_CONFIGURATION, ME,              "Can't handle getCached(), please install a cache with createSynchronousCache() first");      }      MsgUnit[] msgUnitArr = null;      msgUnitArr = this.synchronousCache.get(getKey, getQos);      if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"CacheDump: msgUnitArr=" + msgUnitArr + ": '" + getKey.toXml().trim() + "' \n" + getQos.toXml() + this.synchronousCache.toXml(""));      //not found in this.synchronousCache      if(msgUnitArr == null) {         msgUnitArr = get(getKey, getQos);  //get messages from xmlBlaster (synchronous)         SubscribeKey subscribeKey = new SubscribeKey(glob, getKey.getData());         SubscribeQos subscribeQos = new SubscribeQos(glob, getQos.getData());         SubscribeReturnQos subscribeReturnQos = null;         synchronized (this.synchronousCache) {            subscribeReturnQos = subscribe(subscribeKey, subscribeQos); //subscribe to this messages (asynchronous)            this.synchronousCache.newEntry(subscribeReturnQos.getSubscriptionId(), getKey, msgUnitArr);     //fill messages to this.synchronousCache         }         log.info(getLogId()+"New entry in this.synchronousCache created (subscriptionId="+subscribeReturnQos.getSubscriptionId()+")");      }      return msgUnitArr;   }   /**    * @see I_XmlBlasterAccess#get(GetKey, GetQos)    */   public MsgUnit[] get(GetKey getKey, GetQos getQos) throws XmlBlasterException {      if (!this.isValid)         throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "get");      if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);      MsgQueueGetEntry entry  = new MsgQueueGetEntry(glob,                                      this.clientQueue.getStorageId(), getKey, getQos);      MsgUnit[] arr = (MsgUnit[])queueMessage(entry);      return (arr == null) ? new MsgUnit[0] : arr;   }   /**    * @see I_XmlBlasterAccess#unSubscribe(UnSubscribeKey, UnSubscribeQos)    */   public UnSubscribeReturnQos[] unSubscribe(UnSubscribeKey unSubscribeKey, UnSubscribeQos unSubscribeQos) throws XmlBlasterException {      if (!this.isValid)         throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "unSubscribe");      if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);      MsgQueueUnSubscribeEntry entry  = new MsgQueueUnSubscribeEntry(glob,                                      this.clientQueue.getStorageId(), unSubscribeKey, unSubscribeQos);      UnSubscribeReturnQos[] arr = (UnSubscribeReturnQos[])queueMessage(entry);      this.updateDispatcher.removeCallback(unSubscribeKey.getOid());      return (arr == null) ? new UnSubscribeReturnQos[0] : arr;   }   /**    * @see I_XmlBlasterAccess#unSubscribe(UnSubscribeKey, UnSubscribeQos)    */   public UnSubscribeReturnQos[] unSubscribe(java.lang.String xmlKey, java.lang.String qos) throws XmlBlasterException {      return unSubscribe(new UnSubscribeKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)),                       new UnSubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)) );   }   /**    * @see I_XmlBlasterAccess#publish(MsgUnit)    */   public PublishReturnQos publish(MsgUnit msgUnit) throws XmlBlasterException {      if (!this.isValid)         throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "publish");      if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);      MsgQueuePublishEntry entry  = new MsgQueuePublishEntry(glob, msgUnit, this.clientQueue.getStorageId());      return (PublishReturnQos)queueMessage(entry);   }   /**    * @see I_XmlBlasterAccess#publishOneway(MsgUnit[])    */   public void publishOneway(org.xmlBlaster.util.MsgUnit [] msgUnitArr) throws XmlBlasterException {      if (!this.isValid)         throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "publishOneway");      if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);      final boolean ONEWAY = true;      for (int ii=0; ii<msgUnitArr.length; ii++) {         MsgQueuePublishEntry entry  = new MsgQueuePublishEntry(glob, msgUnitArr[ii],                                          this.clientQueue.getStorageId(), ONEWAY);         queueMessage(entry);      }   }   // rename to publish()   public PublishReturnQos[] publishArr(org.xmlBlaster.util.MsgUnit[] msgUnitArr) throws XmlBlasterException {      if (!this.isValid)         throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "publishArr");      if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);      if (this.firstWarn) {         log.warning(getLogId()+"Publishing arrays is not atomic implemented - TODO");         this.firstWarn = false;      }      PublishReturnQos[] retQos = new PublishReturnQos[msgUnitArr.length];      for (int ii=0; ii<msgUnitArr.length; ii++) {         MsgQueuePublishEntry entry  = new MsgQueuePublishEntry(glob, msgUnitArr[ii],                                          this.clientQueue.getStorageId());         retQos[ii] = (PublishReturnQos)queueMessage(entry);      }      return retQos;   }   /**    * @see I_XmlBlasterAccess#erase(EraseKey, EraseQos)    */   public EraseReturnQos[] erase(EraseKey eraseKey, EraseQos eraseQos) throws XmlBlasterException {      if (!this.isValid)         throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "erase");      if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);      MsgQueueEraseEntry entry  = new MsgQueueEraseEntry(glob,                                      this.clientQueue.getStorageId(), eraseKey, eraseQos);      EraseReturnQos[] arr = (EraseReturnQos[])queueMessage(entry);      return (arr == null) ? new EraseReturnQos[0] : arr;   }   /**    * @see I_XmlBlasterAccess#erase(EraseKey, EraseQos)    */   public EraseReturnQos[] erase(java.lang.String xmlKey, java.lang.String qos) throws XmlBlasterException {      return erase(new EraseKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)),                       new EraseQos(glob, glob.getQueryQosFactory().readObject(qos)) );   }   /**    * This is the callback method invoked from xmlBlaster    * delivering us a new asynchronous message.    * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)    */   public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {      if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Entering update(updateKey=" + updateKey.getOid() +                    ", subscriptionId=" + updateQos.getSubscriptionId() + ", " + ((this.synchronousCache != null) ? "using synchronousCache" : "no synchronousCache") + ") ...");      if (this.synchronousCache != null) {         boolean retVal;         synchronized (this.synchronousCache) {            retVal = this.synchronousCache.update(updateQos.getSubscriptionId(), updateKey, content, updateQos);         }         if (retVal) {            if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Putting update message " + updateQos.getSubscriptionId() + " into cache");            return Constants.RET_OK; // "<qos><state id='OK'/></qos>";         }         if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Update message " + updateQos.getSubscriptionId() + " is not for cache");      }      Object obj = null;      // sync against subscribe & put      // otherwise if the update was faster then the subscribe to return we miss the entry      synchronized (this.updateDispatcher) {         obj = this.updateDispatcher.getCallback(updateQos.getSubscriptionId());      }      if (obj != null) {  // If a special callback was specified for this subscription:         I_Callback cb = (I_Callback)obj;         return cb.update(cbSessionId, updateKey, content, updateQos); // deliver the update to our client      }      else if (this.updateListener != null) {         // If a general callback was specified on login:         return this.updateListener.update(cbSessionId, updateKey, content, updateQos); // deliver the update to our client      }      else {         log.severe(getLogId()+"Ignoring unexpected update message as client has not registered a callback: " + updateKey.toXml() + "" + updateQos.toXml());      }      return Constants.RET_OK; // "<qos><state id='OK'/></qos>";   }   /**    * Call by DispatchManager on connection state transition.    * <p />    * Enforced by interface I_ConnectionStatusListener    */   public void toAlive(DispatchManager dispatchManager, ConnectionStateEnum oldState) {      if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Changed from connection state " + oldState + " to " + ConnectionStateEnum.ALIVE + " connectInProgress=" + this.connectInProgress);      if (this.clientQueue != null && this.clientQueue.getNumOfEntries() > 0) {         log.info(getLogId()+"Changed from connection state " + oldState + " to " + ConnectionStateEnum.ALIVE +                      " connectInProgress=" + this.connectInProgress +                      " with " + this.clientQueue.getNumOfEntries() + " client side queued messages");      }      if (this.connectInProgress) {         dispatchManager.trySyncMode(true);         if (this.clientQueue != null && this.clientQueue.getNumOfEntries() > 0) {            try {               MsgQueueEntry entry = (MsgQueueEntry)this.clientQueue.peek();               if (entry.getMethodName() == MethodName.CONNECT) {                  this.clientQueue.remove();                  log.info(getLogId()+"Removed queued connect message, our new connect has precedence");               }            }            catch (XmlBlasterException e) {               log.severe(getLogId()+"Removing connect entry in client tail back queue failed: " + e.getMessage() + "\n" + toXml());            }         }         return;      }      if (this.clientQueue == null || this.clientQueue.getNumOfEntries() == 0) {         dispatchManager.trySyncMode(true);      }      if (this.connectReturnQos == null || !this.connectReturnQos.isReconnected()) {         cleanupForNewServer();      }      if (this.connectionListener != null) {         this.connectionListener.reachedAlive(oldState, this);      }   }   /**    * If we have reconnected to xmlBlaster and the xmlBlaster server instance    * is another one which does not know our session state and subscribes we need to clear all    * cached subscribes etc.    */   private void cleanupForNewServer() {      if (this.updateDispatcher.size() > 0) {         int num = this.updateDispatcher.clearAckNonPersistentSubscriptions(); // to avoid memory leaks, subscribes pending in the queue are not cleared         if (num > 0) {            log.info(getLogId()+"Removed " + num + " subscribe specific callback registrations");         }         // TODO: On switch to sync delivery and the client has         // cleared subscribes from the queue manually we have still a memory leak here:         // We would need to call clearNAKSubscriptions()      }      if (this.synchronousCache != null) {         this.synchronousCache.clear(); // we need to re-subscribe      }   }   /**    * Call by DispatchManager on connection state transition.    * <p />    * Enforced by interface I_ConnectionStatusListener    */   public void toPolling(DispatchManager dispatchManager, ConnectionStateEnum oldState) {      if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Changed from connection state " + oldState + " to " + ConnectionStateEnum.POLLING + " connectInProgress=" + this.connectInProgress);      if (this.connectInProgress) return;      if (this.connectionListener != null) {         this.connectionListener.reachedPolling(oldState, this);      }   }   /**    * Call by DispatchManager on connection state transition.    * <p>Enforced by interface I_ConnectionStatusListener</p>    */   public void toDead(DispatchManager dispatchManager, ConnectionStateEnum oldState, String errorText) {      if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD + " connectInProgress=" + this.connectInProgress);      if (this.connectionListener != null) {         this.connectionListener.reachedDead(oldState, this);      }   }   /**    * Access the environment settings of this connection.    * <p>Enforced by interface I_XmlBlasterAccess</p>    * @return The global handle (like a stack with local variables for this connection)    */   public Global getGlobal() {      return this.glob;   }   /**

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -