📄 xmlblasteraccess.java
字号:
return (SubscribeReturnQos)queueMessage(entry); } /** * @see I_XmlBlasterAccess#subscribe(SubscribeKey, SubscribeQos, I_Callback) */ public SubscribeReturnQos subscribe(java.lang.String xmlKey, java.lang.String qos, I_Callback cb) throws XmlBlasterException { return subscribe(new SubscribeKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)), new SubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)), cb ); } /** * @see I_XmlBlasterAccess#subscribe(SubscribeKey, SubscribeQos, I_Callback) */ public SubscribeReturnQos subscribe(SubscribeKey subscribeKey, SubscribeQos subscribeQos, I_Callback cb) throws XmlBlasterException { if (!this.isValid) throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "subscribe"); if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME); if (this.updateListener == null) { String text = "No callback listener is registered. " + " Please use XmlBlasterAccess.connect() with default I_Callback given."; throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, text); } // sync subscribe & put against update()'s check for entry // otherwise if the update was faster then the subscribe to return we miss the entry synchronized (this.updateDispatcher) { SubscribeReturnQos subscribeReturnQos = subscribe(subscribeKey, subscribeQos); this.updateDispatcher.addCallback(subscribeReturnQos.getSubscriptionId(), cb, subscribeQos.getPersistent()); if (!subscribeReturnQos.isFakedReturn()) { this.updateDispatcher.ackSubscription(subscribeReturnQos.getSubscriptionId()); } return subscribeReturnQos; } } /** * @see I_XmlBlasterAccess#get(GetKey, GetQos) */ public MsgUnit[] get(java.lang.String xmlKey, java.lang.String qos) throws XmlBlasterException { return get(new GetKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)), new GetQos(glob, glob.getQueryQosFactory().readObject(qos)) ); } /** * @see I_XmlBlasterAccess#getCached(GetKey, GetQos) */ public MsgUnit[] getCached(GetKey getKey, GetQos getQos) throws XmlBlasterException { if (!this.isValid) throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "getCached"); if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME); if (this.synchronousCache == null) { //Is synchronousCache installed? throw new XmlBlasterException(glob, ErrorCode.USER_CONFIGURATION, ME, "Can't handle getCached(), please install a cache with createSynchronousCache() first"); } MsgUnit[] msgUnitArr = null; msgUnitArr = this.synchronousCache.get(getKey, getQos); if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"CacheDump: msgUnitArr=" + msgUnitArr + ": '" + getKey.toXml().trim() + "' \n" + getQos.toXml() + this.synchronousCache.toXml("")); //not found in this.synchronousCache if(msgUnitArr == null) { msgUnitArr = get(getKey, getQos); //get messages from xmlBlaster (synchronous) SubscribeKey subscribeKey = new SubscribeKey(glob, getKey.getData()); SubscribeQos subscribeQos = new SubscribeQos(glob, getQos.getData()); SubscribeReturnQos subscribeReturnQos = null; synchronized (this.synchronousCache) { subscribeReturnQos = subscribe(subscribeKey, subscribeQos); //subscribe to this messages (asynchronous) this.synchronousCache.newEntry(subscribeReturnQos.getSubscriptionId(), getKey, msgUnitArr); //fill messages to this.synchronousCache } log.info(getLogId()+"New entry in this.synchronousCache created (subscriptionId="+subscribeReturnQos.getSubscriptionId()+")"); } return msgUnitArr; } /** * @see I_XmlBlasterAccess#get(GetKey, GetQos) */ public MsgUnit[] get(GetKey getKey, GetQos getQos) throws XmlBlasterException { if (!this.isValid) throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "get"); if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME); MsgQueueGetEntry entry = new MsgQueueGetEntry(glob, this.clientQueue.getStorageId(), getKey, getQos); MsgUnit[] arr = (MsgUnit[])queueMessage(entry); return (arr == null) ? new MsgUnit[0] : arr; } /** * @see I_XmlBlasterAccess#unSubscribe(UnSubscribeKey, UnSubscribeQos) */ public UnSubscribeReturnQos[] unSubscribe(UnSubscribeKey unSubscribeKey, UnSubscribeQos unSubscribeQos) throws XmlBlasterException { if (!this.isValid) throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "unSubscribe"); if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME); MsgQueueUnSubscribeEntry entry = new MsgQueueUnSubscribeEntry(glob, this.clientQueue.getStorageId(), unSubscribeKey, unSubscribeQos); UnSubscribeReturnQos[] arr = (UnSubscribeReturnQos[])queueMessage(entry); this.updateDispatcher.removeCallback(unSubscribeKey.getOid()); return (arr == null) ? new UnSubscribeReturnQos[0] : arr; } /** * @see I_XmlBlasterAccess#unSubscribe(UnSubscribeKey, UnSubscribeQos) */ public UnSubscribeReturnQos[] unSubscribe(java.lang.String xmlKey, java.lang.String qos) throws XmlBlasterException { return unSubscribe(new UnSubscribeKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)), new UnSubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)) ); } /** * @see I_XmlBlasterAccess#publish(MsgUnit) */ public PublishReturnQos publish(MsgUnit msgUnit) throws XmlBlasterException { if (!this.isValid) throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "publish"); if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME); MsgQueuePublishEntry entry = new MsgQueuePublishEntry(glob, msgUnit, this.clientQueue.getStorageId()); return (PublishReturnQos)queueMessage(entry); } /** * @see I_XmlBlasterAccess#publishOneway(MsgUnit[]) */ public void publishOneway(org.xmlBlaster.util.MsgUnit [] msgUnitArr) throws XmlBlasterException { if (!this.isValid) throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "publishOneway"); if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME); final boolean ONEWAY = true; for (int ii=0; ii<msgUnitArr.length; ii++) { MsgQueuePublishEntry entry = new MsgQueuePublishEntry(glob, msgUnitArr[ii], this.clientQueue.getStorageId(), ONEWAY); queueMessage(entry); } } // rename to publish() public PublishReturnQos[] publishArr(org.xmlBlaster.util.MsgUnit[] msgUnitArr) throws XmlBlasterException { if (!this.isValid) throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "publishArr"); if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME); if (this.firstWarn) { log.warning(getLogId()+"Publishing arrays is not atomic implemented - TODO"); this.firstWarn = false; } PublishReturnQos[] retQos = new PublishReturnQos[msgUnitArr.length]; for (int ii=0; ii<msgUnitArr.length; ii++) { MsgQueuePublishEntry entry = new MsgQueuePublishEntry(glob, msgUnitArr[ii], this.clientQueue.getStorageId()); retQos[ii] = (PublishReturnQos)queueMessage(entry); } return retQos; } /** * @see I_XmlBlasterAccess#erase(EraseKey, EraseQos) */ public EraseReturnQos[] erase(EraseKey eraseKey, EraseQos eraseQos) throws XmlBlasterException { if (!this.isValid) throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "erase"); if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME); MsgQueueEraseEntry entry = new MsgQueueEraseEntry(glob, this.clientQueue.getStorageId(), eraseKey, eraseQos); EraseReturnQos[] arr = (EraseReturnQos[])queueMessage(entry); return (arr == null) ? new EraseReturnQos[0] : arr; } /** * @see I_XmlBlasterAccess#erase(EraseKey, EraseQos) */ public EraseReturnQos[] erase(java.lang.String xmlKey, java.lang.String qos) throws XmlBlasterException { return erase(new EraseKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)), new EraseQos(glob, glob.getQueryQosFactory().readObject(qos)) ); } /** * This is the callback method invoked from xmlBlaster * delivering us a new asynchronous message. * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos) */ public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Entering update(updateKey=" + updateKey.getOid() + ", subscriptionId=" + updateQos.getSubscriptionId() + ", " + ((this.synchronousCache != null) ? "using synchronousCache" : "no synchronousCache") + ") ..."); if (this.synchronousCache != null) { boolean retVal; synchronized (this.synchronousCache) { retVal = this.synchronousCache.update(updateQos.getSubscriptionId(), updateKey, content, updateQos); } if (retVal) { if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Putting update message " + updateQos.getSubscriptionId() + " into cache"); return Constants.RET_OK; // "<qos><state id='OK'/></qos>"; } if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Update message " + updateQos.getSubscriptionId() + " is not for cache"); } Object obj = null; // sync against subscribe & put // otherwise if the update was faster then the subscribe to return we miss the entry synchronized (this.updateDispatcher) { obj = this.updateDispatcher.getCallback(updateQos.getSubscriptionId()); } if (obj != null) { // If a special callback was specified for this subscription: I_Callback cb = (I_Callback)obj; return cb.update(cbSessionId, updateKey, content, updateQos); // deliver the update to our client } else if (this.updateListener != null) { // If a general callback was specified on login: return this.updateListener.update(cbSessionId, updateKey, content, updateQos); // deliver the update to our client } else { log.severe(getLogId()+"Ignoring unexpected update message as client has not registered a callback: " + updateKey.toXml() + "" + updateQos.toXml()); } return Constants.RET_OK; // "<qos><state id='OK'/></qos>"; } /** * Call by DispatchManager on connection state transition. * <p /> * Enforced by interface I_ConnectionStatusListener */ public void toAlive(DispatchManager dispatchManager, ConnectionStateEnum oldState) { if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Changed from connection state " + oldState + " to " + ConnectionStateEnum.ALIVE + " connectInProgress=" + this.connectInProgress); if (this.clientQueue != null && this.clientQueue.getNumOfEntries() > 0) { log.info(getLogId()+"Changed from connection state " + oldState + " to " + ConnectionStateEnum.ALIVE + " connectInProgress=" + this.connectInProgress + " with " + this.clientQueue.getNumOfEntries() + " client side queued messages"); } if (this.connectInProgress) { dispatchManager.trySyncMode(true); if (this.clientQueue != null && this.clientQueue.getNumOfEntries() > 0) { try { MsgQueueEntry entry = (MsgQueueEntry)this.clientQueue.peek(); if (entry.getMethodName() == MethodName.CONNECT) { this.clientQueue.remove(); log.info(getLogId()+"Removed queued connect message, our new connect has precedence"); } } catch (XmlBlasterException e) { log.severe(getLogId()+"Removing connect entry in client tail back queue failed: " + e.getMessage() + "\n" + toXml()); } } return; } if (this.clientQueue == null || this.clientQueue.getNumOfEntries() == 0) { dispatchManager.trySyncMode(true); } if (this.connectReturnQos == null || !this.connectReturnQos.isReconnected()) { cleanupForNewServer(); } if (this.connectionListener != null) { this.connectionListener.reachedAlive(oldState, this); } } /** * If we have reconnected to xmlBlaster and the xmlBlaster server instance * is another one which does not know our session state and subscribes we need to clear all * cached subscribes etc. */ private void cleanupForNewServer() { if (this.updateDispatcher.size() > 0) { int num = this.updateDispatcher.clearAckNonPersistentSubscriptions(); // to avoid memory leaks, subscribes pending in the queue are not cleared if (num > 0) { log.info(getLogId()+"Removed " + num + " subscribe specific callback registrations"); } // TODO: On switch to sync delivery and the client has // cleared subscribes from the queue manually we have still a memory leak here: // We would need to call clearNAKSubscriptions() } if (this.synchronousCache != null) { this.synchronousCache.clear(); // we need to re-subscribe } } /** * Call by DispatchManager on connection state transition. * <p /> * Enforced by interface I_ConnectionStatusListener */ public void toPolling(DispatchManager dispatchManager, ConnectionStateEnum oldState) { if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Changed from connection state " + oldState + " to " + ConnectionStateEnum.POLLING + " connectInProgress=" + this.connectInProgress); if (this.connectInProgress) return; if (this.connectionListener != null) { this.connectionListener.reachedPolling(oldState, this); } } /** * Call by DispatchManager on connection state transition. * <p>Enforced by interface I_ConnectionStatusListener</p> */ public void toDead(DispatchManager dispatchManager, ConnectionStateEnum oldState, String errorText) { if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD + " connectInProgress=" + this.connectInProgress); if (this.connectionListener != null) { this.connectionListener.reachedDead(oldState, this); } } /** * Access the environment settings of this connection. * <p>Enforced by interface I_XmlBlasterAccess</p> * @return The global handle (like a stack with local variables for this connection) */ public Global getGlobal() { return this.glob; } /**
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -