⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 streamingcallback.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
      if (this.queue == null)         return;      final boolean ignorePutInterceptor = false;      if (cbSessId != null) {         String oldCbSessionId = qos.getClientProperty(ENTRY_CB_SESSION_ID, (String)null);         if (oldCbSessionId != null && !oldCbSessionId.equals(cbSessId)) {            log.warning("the client property '" + ENTRY_CB_SESSION_ID + "' is a reserved word, we will overwrite its value='" + oldCbSessionId + "' to be '" + cbSessionId + "'");            ClientProperty prop = new ClientProperty(ENTRY_CB_SESSION_ID, null, null, cbSessId);            qos.getClientProperties().put(prop.getName(), prop);         }      }      MsgUnit msgUnit = new MsgUnit(key.getData(), cont, qos.getData());      MsgQueuePublishEntry entry = new MsgQueuePublishEntry(this.global, msgUnit, this.queue.getStorageId());      this.queue.put(entry, ignorePutInterceptor);   }      /**    * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)    */   public String updateStraight(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos) throws XmlBlasterException, IOException {      log.fine("cbSessionId='" + cbSessId + "'");      ByteArrayInputStream bais = new ByteArrayInputStream(cont);      return this.callback.update(cbSessId, updKey, bais, updQos);   }      /**    * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)    */   public String updateNewMessage(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos) throws XmlBlasterException, IOException {      log.fine("cbSessionId='" + cbSessId + "'");      return this.callback.update(cbSessId, updKey, in, updQos);   }   private final boolean isFirstChunk(UpdateQos qos) {      int seq = qos.getClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_SEQ, log), 0);      return seq == 0;   }      private final boolean isLastChunk(UpdateQos qos) {      boolean hasGroupId = qos.getClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_ID, log), (String)null) != null;      if (!hasGroupId)         return true;      return qos.getClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_EOF, log), false);   }      private final ClientProperty getProp(String key, UpdateQos qos) {      return qos.getClientProperty(Constants.addJmsPrefix(key, log));   }      /**    * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)    */   public String update(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos) throws XmlBlasterException {      boolean sendInitial = this.queue != null && this.lastMessageCompleted && this.queue.getNumOfEntries() > 0;       if (sendInitial)         sendInitialQueueEntries();            final boolean isExternal = true;      log.fine("cbSessionId='" + cbSessId + "'");      return updateInternal(cbSessId, updKey, cont, updQos, isExternal);   }      /**    * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)    */   private final String updateInternal(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos, boolean isExternal) throws XmlBlasterException {      this.lastMessageCompleted = false;      boolean doStore = isExternal;      boolean isLastChunk = false;      try {         log.fine("entering with cbSessionId='" + cbSessId + "'");         if (this.timer != null && this.timestamp != null) { // no need to be threadsafe since update is single thread            this.timer.removeTimeoutListener(this.timestamp);            this.timestamp = null;         }         ClientProperty exProp = getProp(XBConnectionMetaData.JMSX_GROUP_EX, updQos);         // TODO Check if this exception really should be thrown: I think it shall not be thrown since it is an exception         // which occured when publishing and this is the information that the update should return         if (exProp != null)            throw new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_INTERNALERROR, "update", "An exception occured on a chunk when updating. " + updQos.toXml());         isLastChunk = isLastChunk(updQos);                  synchronized(this) {            consumeExceptionIfNotNull();            if (this.ret != null) {               clearQueue();               return ret;            }         }                  if (isLastChunk) { // no need to store the last message since sync return            if (isFirstChunk(updQos)) {               // TODO a sync to wait until cleared (the updateStraight after the sync, not inside).               try {                  return updateStraight(cbSessId, updKey, cont, updQos);               }               catch (IOException e) {                  throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update: exception occured.", e);               }                           }                        try {               if (cont != null && cont.length > 0) {                  this.writer.write(this.out, cont);               }                              this.writer.close(this.out);               // wait until the client has returned his method.               try {                  mutex.acquire();                  consumeExceptionIfNotNull();                  clearQueue();                  return this.ret;               }               finally {                  mutex.release();               }            }            catch (InterruptedException e) {               throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update", e);            }            finally {               reset();            }         }         else { // it is not the last message            if (this.timer != null)               this.timestamp = this.timer.addTimeoutListener(this, this.waitForChunksTimeout, null);            try {               if (isFirstChunk(updQos)) {                  this.mutex.acquire();                  this.cbSessionId = cbSessId;                  this.out = new PipedOutputStream();                  this.in = new PipedInputStream(this.out);                  ExecutionThread thread = new ExecutionThread(cbSessId, updKey, cont, updQos);                  thread.start();               }               else { // check if the message is complete                  /*                  if (this.oldGroupId == null) {                     try {                        mutex.acquire();                        throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update: The message is not the first of a group but the previous one was already completed.");                     }                     finally {                        mutex.release();                     }                  }                  */               }               this.writer.write(this.out, cont);            }            catch (InterruptedException e) {               throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update", e);            }            catch (IOException e) {               throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update", e);            }            if (doStore)               storeEntry(cbSessId, updKey, cont, updQos);            // and return a fake positive response.            return Constants.RET_OK;         }               }      catch (XmlBlasterException e) {         try {            this.writer.close(this.out);         }         catch (InterruptedException e1) {            e1.printStackTrace();         }         this.lastMessageCompleted = true;         throw e;      }      catch (Throwable e) {         e.printStackTrace();         throw new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_HOLDBACK, "throwable in updateInternal", "", e);      }      finally {         if (isLastChunk) {            this.lastMessageCompleted = true;         }         log.fine("Leaving method");      }   }   /**    * It is used here to inform the user update method that a timeout occured, it will throw    * an IOException when reading the in stream of the update method.    * @see org.xmlBlaster.util.I_Timeout#timeout(java.lang.Object)    */   public void timeout(Object userData) {      try {         this.writer.close(this.out);      }      catch (Throwable e) {         // we can not make it threadsafe so we must protect against possible NPE Exceptions         e.printStackTrace();      }         }   private final void clearQueue() {      if (queue != null) {         log.fine("Clear the queue " + this.queue.getStorageId());         queue.clear();      }   }   /**    * Always makes a USER_UPDATE_HOLDBACK Exception out of it, no matter what the original exception     * was.    * @param ex    */   private synchronized void setException(Throwable ex) {      if (ex instanceof XmlBlasterException) {         XmlBlasterException tmp = (XmlBlasterException)ex;         if (tmp.getErrorCode().equals(ErrorCode.USER_UPDATE_HOLDBACK))            this.ex = tmp;         else            this.ex = new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "StreamingCallback", "update: exception occured.", ex);      }      else {         this.ex = new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "StreamingCallback", "update: exception occured.", ex);      }   }      /**    * returns the exception (if any) and resets it.    * @return    */   private synchronized void consumeExceptionIfNotNull() throws XmlBlasterException {      XmlBlasterException e = this.ex;      if (e != null) {         this.ex = null;         throw e;      }   }      // implementation of interface I_ConnectionStateListener      /**    * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedAlive(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)    */   public synchronized void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {      log.fine("I am alive now");      // only used on first connect after it is ignored.      if (this.initialized)         return;           if (this.useQueue) {         log.info("going to instance the queue");         ConnectQos connectQos = connection.getConnectQos();         ClientQueueProperty prop = connectQos.getClientQueueProperty();         // The storageId must remain the same after a client restart         String storageIdStr = connection.getId();         if (((XmlBlasterAccess)connection).getPublicSessionId() == 0 ) {            // having no public sessionId we need to generate a unique queue name            storageIdStr += System.currentTimeMillis()+Global.getCounter();         }         StorageId queueId = new StorageId(Constants.RELATING_CLIENT_UPDATE, storageIdStr);         try {            this.queue = this.global.getQueuePluginManager().getPlugin(prop.getType(), prop.getVersion(), queueId,                  connectQos.getClientQueueProperty());            if (((XmlBlasterAccess)connection).isCallbackDispatcherActive())               sendInitialQueueEntries();         }         catch (XmlBlasterException e) {            log.severe("An exception occured when trying to initialize the callback client queue: " + e.getMessage());            e.printStackTrace();         }      }            this.initialized = true;   }   /* (non-Javadoc)    * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedDead(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)    */   public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {      // TODO Auto-generated method stub         }   /* (non-Javadoc)    * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedPolling(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)    */   public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {      // TODO Auto-generated method stub         }   }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -