📄 requestreplyexecutor.java
字号:
/**
 * Sets the callback client which receives the update() / updateOneway() invocations.
 * @param cbClient the callback handler, may be null
 */
public final void setCbClient(I_CallbackExtended cbClient) {
   this.cbClient = cbClient;
}

/**
 * Sets the xmlBlaster core handle to which incoming invocations are forwarded.
 * @param xmlBlaster the core implementation, may be null
 */
public final void setXmlBlasterCore(I_XmlBlaster xmlBlaster) {
   this.xmlBlasterImpl = xmlBlaster;
}

/**
 * @return the currently registered callback client or null if none is set
 */
public final I_CallbackExtended getCbClient() {
   return this.cbClient;
}

/**
 * Only traces the garbage collection of this instance (on log level FINE).
 */
public void finalize() {
   if (log.isLoggable(Level.FINE)) {
      log.fine("Garbage Collected");
   }
}

/**
 * Derives the prefix from the given login name.
 * <p/>
 * A non-empty name results in the prefix <code>loginName + ":"</code>,
 * null or an empty name resets the prefix to null.
 * NOTE(review): the prefix is presumably used when building request ids - confirm against the caller.
 * @param loginName the login name or null
 */
protected void setLoginName(String loginName) {
   if (loginName != null && loginName.length() > 0) {
      this.prefix = loginName + ":";
   }
   else {
      this.prefix = null;
   }
}

/**
 * Adds the listener to receive response/exception events.
 * <p/>
 * An already registered listener for the same requestId is replaced (a warning is logged).
 * @param requestId the request to wait for, never null
 * @param l the listener to notify, never null
 * @throws IllegalArgumentException if requestId or l is null
 */
public final void addResponseListener(String requestId, I_ResponseListener l) {
   if (requestId == null) {
      throw new IllegalArgumentException("addResponseListener() with requestId=null");
   }
   if (l == null) {
      // Report the argument which is really missing instead of blaming the requestId
      throw new IllegalArgumentException("addResponseListener() with listener=null, requestId=" + requestId);
   }
   Object previous = this.responseListenerMap.put(requestId, l);
   if (previous == null) {
      if (log.isLoggable(Level.FINE)) {
         log.fine("Added addResponseListener requestId=" + requestId);
      }
   }
   else {
      log.warning("Added addResponseListener requestId=" + requestId + " but there was already one");
   }
}

/**
 * Removes the specified listener.
*/ public final void removeResponseListener(String requestId) { if (requestId == null) { throw new IllegalArgumentException("removeResponseListener() with requestId=null"); } Object o = this.responseListenerMap.remove(requestId); if (o == null) { if (this.responseListenerMapWasCleared) { if (log.isLoggable(Level.FINE)) log.fine("removeResponseListener(" + requestId + ") entry not found, size is " + this.responseListenerMap.size()); } else { log.severe("removeResponseListener(" + requestId + ") entry not found, size is " + this.responseListenerMap.size()); } } else { if (log.isLoggable(Level.FINE)) log.fine("removeResponseListener(" + requestId + ") done"); } } /** * Get the response listener object */ public final I_ResponseListener getResponseListener(String requestId) { if (requestId == null) { throw new IllegalArgumentException("getResponseListener() with requestId=null"); } return (I_ResponseListener)this.responseListenerMap.get(requestId); } public void clearResponseListenerMap() { try { StringBuffer buf = null; int size = 0; synchronized (this.responseListenerMap) { // for iteration we need to sync size = this.responseListenerMap.size(); if (size > 0) { buf = new StringBuffer(256); java .util.Iterator iterator = this.responseListenerMap.keySet().iterator(); while (iterator.hasNext()) { if (buf.length() > 0) buf.append(", "); String key = (String)iterator.next(); buf.append(key); } } } if (size > 0) { if (buf != null) log.warning("There are " + size + " messages pending without a response, request IDs are '" + buf.toString() + "', we remove them now."); this.responseListenerMap.clear(); this.responseListenerMapWasCleared = true; } } catch (Throwable e) { e.printStackTrace(); } } /** * Handle common messages * @return false: for connect() and disconnect() which must be handled by the base class */ public boolean receiveReply(MsgInfo receiver, boolean udp) throws XmlBlasterException, IOException { if (log.isLoggable(Level.FINE)) log.fine("Receiving '" + 
receiver.getTypeStr() + "' message " + receiver.getMethodName() + "(" + receiver.getRequestId() + ")"); if (receiver.isInvoke()) { // handling invocations ... if (MethodName.PUBLISH_ONEWAY == receiver.getMethodName()) { MsgUnitRaw[] arr = receiver.getMessageArr(); if (arr == null || arr.length < 1) { log.severe("Invocation of " + receiver.getMethodName() + "() failed, missing arguments"); return true; } xmlBlasterImpl.publishOneway((AddressServer)this.addressConfig, receiver.getSecretSessionId(), arr); } else if (MethodName.PUBLISH == receiver.getMethodName()) { MsgUnitRaw[] arr = receiver.getMessageArr(); if (arr == null || arr.length < 1) throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "Invocation of " + receiver.getMethodName() + "() failed, missing arguments"); String[] response = xmlBlasterImpl.publishArr((AddressServer)this.addressConfig, receiver.getSecretSessionId(), arr); executeResponse(receiver, response, udp); } else if (MethodName.UPDATE_ONEWAY == receiver.getMethodName()) { try { I_CallbackExtended cbClientTmp = this.cbClient; if (cbClientTmp == null) { throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_CALLBACKSERVER_NOTAVAILABLE, ME, "The " + getType() + " callback driver is not created, can't process the remote invocation. 
Try configuration ' -protocol "+getType()+"'"); } MsgUnitRaw[] arr = receiver.getMessageArr(); if (arr == null || arr.length < 1) { log.severe("Invocation of " + receiver.getMethodName() + "() failed, missing arguments"); return true; } cbClientTmp.updateOneway(receiver.getSecretSessionId(), arr); } catch (XmlBlasterException e) { executeException(receiver, e, udp); return true; } catch (Throwable e) { XmlBlasterException xmlBlasterException = new XmlBlasterException(glob, ErrorCode.USER_UPDATE_INTERNALERROR, ME, "Invocation of " + receiver.getMethodName() + "() failed, missing arguments", e); executeException(receiver, xmlBlasterException, udp); return true; } } else if (MethodName.UPDATE == receiver.getMethodName()) { try { I_CallbackExtended cbClientTmp = this.cbClient; // Remember to avoid synchronized block if (cbClientTmp == null) { throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_CALLBACKSERVER_NOTAVAILABLE, ME, "No "+getType()+" callback driver is available, can't process the remote invocation."); } MsgUnitRaw[] arr = receiver.getMessageArr(); if (arr == null || arr.length < 1) { throw new XmlBlasterException(glob, ErrorCode.USER_UPDATE_INTERNALERROR, ME, "Invocation of " + receiver.getMethodName() + "() failed, missing arguments"); } String[] response = cbClientTmp.update(receiver.getSecretSessionId(), arr); executeResponse(receiver, response, udp); } catch (XmlBlasterException e) { executeException(receiver, e, udp); return true; } catch (Throwable e) { XmlBlasterException xmlBlasterException = new XmlBlasterException(glob, ErrorCode.USER_UPDATE_INTERNALERROR, ME, "Invocation of " + receiver.getMethodName() + "() failed, missing arguments", e); executeException(receiver, xmlBlasterException, udp); return true; } } else if (MethodName.GET == receiver.getMethodName()) { MsgUnitRaw[] arr = receiver.getMessageArr(); if (arr == null || arr.length != 1) throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, 
"Invocation of " + receiver.getMethodName() + "() failed, wrong arguments"); MsgUnitRaw[] response = xmlBlasterImpl.get((AddressServer)this.addressConfig, receiver.getSecretSessionId(), arr[0].getKey(), arr[0].getQos()); executeResponse(receiver, response, udp); } else if (MethodName.PING == receiver.getMethodName()) { MsgUnitRaw[] arr = receiver.getMessageArr(); if (this.cbClient == null && !glob.isServerSide()) { XmlBlasterException xmlBlasterException = new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_CALLBACKSERVER_NOTAVAILABLE, ME, "No "+getType()+" callback driver is available, can't process the remote invocation."); executeException(receiver, xmlBlasterException, udp); return true; } if (xmlBlasterImpl != null) { // Server side: Forward ping to xmlBlaster core String response = xmlBlasterImpl.ping((AddressServer)this.addressConfig, /*receiver.getSecretSessionId(),*/ (arr.length>0) ? arr[0].getQos() : "<qos/>"); executeResponse(receiver, response, udp); // Constants.RET_OK="<qos><state id='OK'/></qos>" or current run level } else { // Client side: answer directly, not forwarded to client code executeResponse(receiver, Constants.RET_OK, udp); // Constants.RET_OK="<qos><state id='OK'/></qos>" or current run level } } else if (MethodName.SUBSCRIBE == receiver.getMethodName()) { MsgUnitRaw[] arr = receiver.getMessageArr(); if (arr == null || arr.length != 1) throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "Invocation of " + receiver.getMethodName() + "() failed, wrong arguments"); String response = xmlBlasterImpl.subscribe((AddressServer)this.addressConfig, receiver.getSecretSessionId(), arr[0].getKey(), arr[0].getQos()); executeResponse(receiver, response, udp); } else if (MethodName.UNSUBSCRIBE == receiver.getMethodName()) { MsgUnitRaw[] arr = receiver.getMessageArr(); if (arr == null || arr.length != 1) throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "Invocation of " + 
receiver.getMethodName() + "() failed, wrong arguments"); String[] response = xmlBlasterImpl.unSubscribe((AddressServer)this.addressConfig, receiver.getSecretSessionId(), arr[0].getKey(), arr[0].getQos()); executeResponse(receiver, response, udp); } else if (MethodName.ERASE == receiver.getMethodName()) { MsgUnitRaw[] arr = receiver.getMessageArr(); if (arr == null || arr.length != 1) throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "Invocation of " + receiver.getMethodName() + "() failed, wrong arguments"); String[] response = xmlBlasterImpl.erase((AddressServer)this.addressConfig, receiver.getSecretSessionId(), arr[0].getKey(), arr[0].getQos()); executeResponse(receiver, response, udp); } else if (MethodName.CONNECT == receiver.getMethodName()) { return false; } else if (MethodName.DISCONNECT == receiver.getMethodName()) { return false; } else { log.warning("Ignoring received invocation message '" + receiver.getMethodName() + "' with requestId=" + receiver.getRequestId() + ", nobody is interested in it: " + receiver.toLiteral()); if (log.isLoggable(Level.FINEST)) log.finest("Ignoring received message, nobody is interested in it:\n>" + receiver.toLiteral() + "<"); } return true; } // Handling response or exception ... I_ResponseListener listener = getResponseListener(receiver.getRequestId()); if (listener == null) { log.warning("Ignoring received '" + receiver.getMethodName() + "' response message, requestId=" + receiver.getRequestId() + ", nobody is interested in it"); if (log.isLoggable(Level.FINEST)) log.finest("Ignoring received message, nobody is interested in it: >" + receiver.toLiteral() + "<"); return true; } removeResponseListener(receiver.getRequestId()); if (receiver.isResponse()) { if (receiver.getMethodName().returnsMsgArr()) { // GET returns MsgUnitRaw[] listener.incomingMessage(receiver.getRequestId(), receiver.getMessageArr()); } else if (receiver.getMethodName().returnsStringArr()) { // PUBLISH etc. 
return String[] listener.incomingMessage(receiver.getRequestId(), receiver.getQosArr()); } else if (receiver.getMethodName().returnsString()) { // SUBSCRIBE, CONNECT etc. return a String listener.incomingMessage(receiver.getRequestId(), receiver.getQos()); } else { // SUBSCRIBE, CONNECT etc. return a String throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "The method " + receiver.getMethodName() + " is not expected in this context"); } } else if (receiver.isException()) { // XmlBlasterException listener.incomingMessage(receiver.getRequestId(), receiver.getException()); } else { log.severe("PANIC: Invalid response message for " + receiver.getMethodName()); listener.incomingMessage(receiver.getRequestId(), new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "Invalid response message '" + receiver.getMethodName())); } return true; } /* * Overwrite on demand. * TODO: Is this needed anymore? * @return */ protected boolean hasConnection() { return true; } /** * Send a message and block until the response arrives. * <p/> * We simulate RPC (remote procedure call) here. * This should be thread save and may be invoked by many * client threads in parallel (though i have not tested it). * @param expectingResponse WAIT_ON_RESPONSE=true or ONEWAY=false * @param udp Some user info which is passed through * @return the response object of the request, of type String(QoS), MsgUnitRaw[] or XmlBlasterException */ public Object requestAndBlockForReply(MsgInfo msgInfo, boolean expectingResponse, boolean udp) throws XmlBlasterException, IOException {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -