📄 streamingcallback.java
字号:
if (this.queue == null) return; final boolean ignorePutInterceptor = false; if (cbSessId != null) { String oldCbSessionId = qos.getClientProperty(ENTRY_CB_SESSION_ID, (String)null); if (oldCbSessionId != null && !oldCbSessionId.equals(cbSessId)) { log.warning("the client property '" + ENTRY_CB_SESSION_ID + "' is a reserved word, we will overwrite its value='" + oldCbSessionId + "' to be '" + cbSessionId + "'"); ClientProperty prop = new ClientProperty(ENTRY_CB_SESSION_ID, null, null, cbSessId); qos.getClientProperties().put(prop.getName(), prop); } } MsgUnit msgUnit = new MsgUnit(key.getData(), cont, qos.getData()); MsgQueuePublishEntry entry = new MsgQueuePublishEntry(this.global, msgUnit, this.queue.getStorageId()); this.queue.put(entry, ignorePutInterceptor); } /** * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos) */ public String updateStraight(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos) throws XmlBlasterException, IOException { log.fine("cbSessionId='" + cbSessId + "'"); ByteArrayInputStream bais = new ByteArrayInputStream(cont); return this.callback.update(cbSessId, updKey, bais, updQos); } /** * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos) */ public String updateNewMessage(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos) throws XmlBlasterException, IOException { log.fine("cbSessionId='" + cbSessId + "'"); return this.callback.update(cbSessId, updKey, in, updQos); } private final boolean isFirstChunk(UpdateQos qos) { int seq = qos.getClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_SEQ, log), 0); return seq == 0; } private final boolean isLastChunk(UpdateQos qos) { boolean hasGroupId = qos.getClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_ID, log), (String)null) != null; if (!hasGroupId) return true; return qos.getClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_EOF, log), false); } private final ClientProperty getProp(String key, UpdateQos qos) { return qos.getClientProperty(Constants.addJmsPrefix(key, log)); } /** * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos) */ public String update(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos) throws XmlBlasterException { boolean sendInitial = this.queue != null && this.lastMessageCompleted && this.queue.getNumOfEntries() > 0; if (sendInitial) sendInitialQueueEntries(); final boolean isExternal = true; log.fine("cbSessionId='" + cbSessId + "'"); return updateInternal(cbSessId, updKey, cont, updQos, isExternal); } /** * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos) */ private final String updateInternal(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos, boolean isExternal) throws XmlBlasterException { this.lastMessageCompleted = false; boolean doStore = isExternal; boolean isLastChunk = false; try { log.fine("entering with cbSessionId='" + cbSessId + "'"); if (this.timer != null && this.timestamp != null) { // no need to be threadsafe since update is single thread this.timer.removeTimeoutListener(this.timestamp); this.timestamp = null; } ClientProperty exProp = getProp(XBConnectionMetaData.JMSX_GROUP_EX, updQos); // TODO Check if this exception really should be thrown: I think it shall not be thrown since it is an exception // which occured when publishing and this is the information that the update should return if (exProp != null) throw new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_INTERNALERROR, "update", "An exception occured on a chunk when updating. " + updQos.toXml()); isLastChunk = isLastChunk(updQos); synchronized(this) { consumeExceptionIfNotNull(); if (this.ret != null) { clearQueue(); return ret; } } if (isLastChunk) { // no need to store the last message since sync return if (isFirstChunk(updQos)) { // TODO a sync to wait until cleared (the updateStraight after the sync, not inside). try { return updateStraight(cbSessId, updKey, cont, updQos); } catch (IOException e) { throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update: exception occured.", e); } } try { if (cont != null && cont.length > 0) { this.writer.write(this.out, cont); } this.writer.close(this.out); // wait until the client has returned his method. try { mutex.acquire(); consumeExceptionIfNotNull(); clearQueue(); return this.ret; } finally { mutex.release(); } } catch (InterruptedException e) { throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update", e); } finally { reset(); } } else { // it is not the last message if (this.timer != null) this.timestamp = this.timer.addTimeoutListener(this, this.waitForChunksTimeout, null); try { if (isFirstChunk(updQos)) { this.mutex.acquire(); this.cbSessionId = cbSessId; this.out = new PipedOutputStream(); this.in = new PipedInputStream(this.out); ExecutionThread thread = new ExecutionThread(cbSessId, updKey, cont, updQos); thread.start(); } else { // check if the message is complete /* if (this.oldGroupId == null) { try { mutex.acquire(); throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update: The message is not the first of a group but the previous one was already completed."); } finally { mutex.release(); } } */ } this.writer.write(this.out, cont); } catch (InterruptedException e) { throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update", e); } catch (IOException e) { throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update", e); } if (doStore) storeEntry(cbSessId, updKey, cont, updQos); // and return a fake positive response. return Constants.RET_OK; } } catch (XmlBlasterException e) { try { this.writer.close(this.out); } catch (InterruptedException e1) { e1.printStackTrace(); } this.lastMessageCompleted = true; throw e; } catch (Throwable e) { e.printStackTrace(); throw new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_HOLDBACK, "throwable in updateInternal", "", e); } finally { if (isLastChunk) { this.lastMessageCompleted = true; } log.fine("Leaving method"); } } /** * It is used here to inform the user update method that a timeout occured, it will throw * an IOException when reading the in stream of the update method. * @see org.xmlBlaster.util.I_Timeout#timeout(java.lang.Object) */ public void timeout(Object userData) { try { this.writer.close(this.out); } catch (Throwable e) { // we can not make it threadsafe so we must protect against possible NPE Exceptions e.printStackTrace(); } } private final void clearQueue() { if (queue != null) { log.fine("Clear the queue " + this.queue.getStorageId()); queue.clear(); } } /** * Always makes a USER_UPDATE_HOLDBACK Exception out of it, no matter what the original exception * was. * @param ex */ private synchronized void setException(Throwable ex) { if (ex instanceof XmlBlasterException) { XmlBlasterException tmp = (XmlBlasterException)ex; if (tmp.getErrorCode().equals(ErrorCode.USER_UPDATE_HOLDBACK)) this.ex = tmp; else this.ex = new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "StreamingCallback", "update: exception occured.", ex); } else { this.ex = new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "StreamingCallback", "update: exception occured.", ex); } } /** * returns the exception (if any) and resets it. * @return */ private synchronized void consumeExceptionIfNotNull() throws XmlBlasterException { XmlBlasterException e = this.ex; if (e != null) { this.ex = null; throw e; } } // implementation of interface I_ConnectionStateListener /** * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedAlive(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess) */ public synchronized void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) { log.fine("I am alive now"); // only used on first connect after it is ignored. if (this.initialized) return; if (this.useQueue) { log.info("going to instance the queue"); ConnectQos connectQos = connection.getConnectQos(); ClientQueueProperty prop = connectQos.getClientQueueProperty(); // The storageId must remain the same after a client restart String storageIdStr = connection.getId(); if (((XmlBlasterAccess)connection).getPublicSessionId() == 0 ) { // having no public sessionId we need to generate a unique queue name storageIdStr += System.currentTimeMillis()+Global.getCounter(); } StorageId queueId = new StorageId(Constants.RELATING_CLIENT_UPDATE, storageIdStr); try { this.queue = this.global.getQueuePluginManager().getPlugin(prop.getType(), prop.getVersion(), queueId, connectQos.getClientQueueProperty()); if (((XmlBlasterAccess)connection).isCallbackDispatcherActive()) sendInitialQueueEntries(); } catch (XmlBlasterException e) { log.severe("An exception occured when trying to initialize the callback client queue: " + e.getMessage()); e.printStackTrace(); } } this.initialized = true; } /* (non-Javadoc) * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedDead(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess) */ public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) { // TODO Auto-generated method stub } /* (non-Javadoc) * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedPolling(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess) */ public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) { // TODO Auto-generated method stub } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -