📄 requestbroker.java
字号:
MsgUnit origMsgUnit = null; if (entry instanceof ReferenceEntry) { ReferenceEntry referenceEntry = (ReferenceEntry)entry; origMsgUnit = ((ReferenceEntry)entry).getMsgUnitOrNull(); if (origMsgUnit == null) { if (log.isLoggable(Level.FINE)) log.fine("Ignoring dead message for destroyed callback queue entry " + referenceEntry.getLogId()); continue; } } else { log.severe("PANIC: Internal error in deadMessage data type"); retArr[ii] = "PANIC"; continue; } try { if (entry.getKeyOid().equals(Constants.OID_DEAD_LETTER)) { // Check for recursion of dead letters log.severe("PANIC: Recursive dead message is lost, no recovery possible - dumping to file not yet coded: " + origMsgUnit.toXml() + ": " + ((reason != null) ? (": " + reason) : "") ); retArr[ii] = entry.getKeyOid(); Thread.dumpStack(); continue; } if (origMsgUnit.getQosData().getClientProperties().get("__isErrorHandled") != null) { // Check for recursion of dead letters log.warning("Recursive message '" + entry.getLogId() + "' is error handled already (sent as dead letter), we ignore it."); retArr[ii] = entry.getKeyOid(); continue; } origMsgUnit.getQosData().addClientProperty("__isErrorHandled", true); // Mark the original to avoid looping if failed client is the dead message listener String text = "Generating dead message '" + entry.getLogId() + "'" + " from publisher=" + entry.getSender() + " because delivery " + // entry.getReceiver() is recognized in queueId ((queue == null) ? "" : "with queue '"+queue.getStorageId().toString()+"' ") + "failed" + ((reason != null) ? (": " + reason) : ""); log.warning(text); PublishKey publishKey = new PublishKey(glob, Constants.OID_DEAD_LETTER); //publishKey.setClientTags("<oid>"+entry.getKeyOid()+"</oid>"); // null: use the content from origMsgUnit: pubQos.addClientProperty(Constants.CLIENTPROPERTY_DEADMSGKEY, origMsgUnit.getKey()); //"__key" pubQos.addClientProperty(Constants.CLIENTPROPERTY_DEADMSGQOS, origMsgUnit.getQos()); //"__qos" pubQos.addClientProperty(Constants.CLIENTPROPERTY_OID, origMsgUnit.getKeyOid()); //"__oid" pubQos.addClientProperty(Constants.CLIENTPROPERTY_RCVTIMESTAMP, origMsgUnit.getQosData().getRcvTimestamp()); //"__rcvTimestamp" pubQos.addClientProperty(Constants.CLIENTPROPERTY_DEADMSGREASON, text); //"__deadMessageReason" MsgUnit msgUnit = new MsgUnit(origMsgUnit, publishKey.getData(), null, pubQos.getData()); retArr[ii] = publish(unsecureSessionInfo, msgUnit); } catch(Throwable e) { log.severe("PANIC: " + entry.getKeyOid() + " dead letter is lost, no recovery possible - dumping to file not yet coded: " + e.toString() + "\n" + origMsgUnit.toXml()); e.printStackTrace(); retArr[ii] = entry.getKeyOid(); } } return retArr; } catch (Throwable e) { log.severe("PANIC: " + entries.length + " dead letters are lost, no recovery possible:" + e.getMessage()); for (int ii=0; ii<entries.length; ii++) { MsgQueueEntry entry = entries[ii]; try { if (entry == null) { continue; } /* else if (entry instanceof MsgUnitWrapper) { MsgUnitWrapper msgUnitWrapper = (MsgUnitWrapper)entry; String fileName = glob.getMsgFileDumper().store(msgUnitWrapper); log.warn(ME, "Dumped lost message to file " + fileName); } */ else if (entry instanceof MsgQueueHistoryEntry) { log.warning("History entry is lost: " + entry.toXml()); } else if (entry instanceof MsgQueueUpdateEntry) { ReferenceEntry referenceEntry = (ReferenceEntry)entry; if (referenceEntry.isDestroyed()) { if (log.isLoggable(Level.FINE)) log.fine("Ignoring detroyed callback message " + entry.getLogId()); } else { log.warning("Callback of message failed unrecoverably: " + entry.toXml()); } } else { log.severe("PANIC: Unrecoverable lost message " + entry.toXml()); } } catch (Throwable th) { log.severe("PANIC: Unrecoverable lost message " + entry.toXml() + ": " + th.getMessage()); } } } return new String[0]; } public String subscribe(SessionInfo sessionInfo, QueryKeyData xmlKey, SubscribeQosServer subscribeQos) throws XmlBlasterException { if (!sessionInfo.hasCallback()) { throw new XmlBlasterException(glob, ErrorCode.USER_SUBSCRIBE_NOCALLBACK, ME, "You can't subscribe to '" + xmlKey.getOid() + "' without having a callback server"); } try { SubscriptionInfo.verifySubscriptionId(sessionInfo.getConnectQos().isClusterNode(), sessionInfo.getSessionName(), xmlKey, subscribeQos); if (log.isLoggable(Level.FINER)) log.finer("Entering subscribe(oid='" + xmlKey.getOid() + "', queryType='" + xmlKey.getQueryType() + "', query='" + xmlKey.getQueryString() + "', domain='" + xmlKey.getDomain() + "') from client '" + sessionInfo.getId() + "' ..."); String returnOid = ""; if (subscribeQos.getMultiSubscribe() == false) { Vector vec = clientSubscriptions.getSubscription(sessionInfo, xmlKey); if (vec != null && vec.size() > 0) { for (int i=0; i<vec.size(); i++) { SubscriptionInfo sub = (SubscriptionInfo)vec.elementAt(i); sub.update(subscribeQos); } log.info("Ignoring duplicate subscription '" + ((xmlKey.getOid()==null)?((xmlKey.getDomain()==null)?xmlKey.getQueryString():xmlKey.getDomain()):xmlKey.getOid()) + "' as you have set multiSubscribe to false"); StatusQosData qos = new StatusQosData(glob, MethodName.SUBSCRIBE); SubscriptionInfo i = (SubscriptionInfo)vec.elementAt(0); qos.setState(Constants.STATE_WARN); qos.setSubscriptionId(i.getSubscriptionId()); return qos.toXml(); } } SubscriptionInfo subsQuery = null; if (xmlKey.isQuery()) { // fires event for query subscription, this needs to be remembered for a match check of future published messages // if (true) { // fires event for query subscription, this needs to be remembered for a match check of future published messages subsQuery = new SubscriptionInfo(glob, sessionInfo, xmlKey, subscribeQos); returnOid = subsQuery.getSubscriptionId(); // XPath query fireSubscribeEvent(subsQuery); } KeyData[] keyDataArr = queryMatchingKeys(sessionInfo, xmlKey, subscribeQos.getData()); for (int jj=0; jj<keyDataArr.length; jj++) { KeyData xmlKeyExact = keyDataArr[jj]; if (xmlKeyExact == null && xmlKey.isExact()) // subscription on a yet unknown topic ... xmlKeyExact = xmlKey; else if (xmlKeyExact != null && xmlKey.isDomain()) { xmlKeyExact.setQueryType(xmlKey.getQueryType()); } SubscriptionInfo subs = null; if (sessionInfo.getConnectQos().duplicateUpdates() == false) { Vector vec = clientSubscriptions.getSubscriptionByOid(sessionInfo, xmlKeyExact.getOid(), true); if (vec != null) { if (vec.size() > 0) { subs = (SubscriptionInfo)vec.firstElement(); if (log.isLoggable(Level.FINE)) log.fine("Session '" + sessionInfo.getId() + "', topic '" + xmlKeyExact.getOid() + "' is subscribed " + vec.size() + " times with duplicateUpdates==false"); } if (vec.size() > 1) log.severe("Internal problem for session '" + sessionInfo.getId() + "', message '" + xmlKeyExact.getOid() + "' is subscribed " + vec.size() + " times but duplicateUpdates==false!"); } } if (subs == null) { if (subsQuery != null) { subs = new SubscriptionInfo(glob, sessionInfo, subsQuery, xmlKeyExact); subsQuery.addSubscription(subs); } else subs = new SubscriptionInfo(glob, sessionInfo, xmlKeyExact, subscribeQos); } subscribeToOid(subs, false); // fires event for subscription if (returnOid.equals("")) returnOid = subs.getSubscriptionId(); } StatusQosData qos = null; if (this.glob.isClusterManagerReady()) { // cluster support - forward message to master try { subscribeQos.setSubscriptionId(returnOid); // force the same subscriptionId on all cluster nodes SubscribeReturnQos ret = glob.getClusterManager().forwardSubscribe(sessionInfo, xmlKey, subscribeQos); if (ret != null) qos = ret.getData(); //Thread.currentThread().dumpStack(); //if (ret != null) return ret.toXml(); } catch (XmlBlasterException e) { if (e.getErrorCode() == ErrorCode.RESOURCE_CONFIGURATION_PLUGINFAILED) { this.glob.setUseCluster(false); } else { e.printStackTrace(); throw e; } } } if (qos == null || qos.getSubscriptionId() == null || qos.getSubscriptionId().length() < 1) { // The cluster subId is unique in another cluster node as well e.g.: "__subId:heron-2" if (qos == null) qos = new StatusQosData(glob, MethodName.SUBSCRIBE); qos.setSubscriptionId(returnOid); } if (log.isLoggable(Level.FINER)) log.finer("Leaving subscribe(oid='" + xmlKey.getOid() + "', queryType='" + xmlKey.getQueryType() + "', query='" + xmlKey.getQueryString() + "', domain='" + xmlKey.getDomain() + "') from client '" + sessionInfo.getId() + "' -> subscriptionId='" + qos.getSubscriptionId() + "'"); return qos.toXml(); } catch (XmlBlasterException e) { log.warning(e.getMessage()); throw e; } catch (Throwable e) { e.printStackTrace(); throw XmlBlasterException.convert(glob, ME, ErrorCode.INTERNAL_SUBSCRIBE.toString(), e); } } /** * Invoked by a client, to access one/many MsgUnit. * <p /> * Synchronous read-access method. * <p /> * In the cluster environment all messages are accessed from the master cluster node, * tuning with XmlBlasterAccess.synchronousCache is not yet implemented. * * @param xmlKey Key allowing XPath or exact selection<br> * See XmlKey.dtd for a description * @param getQos Quality of Service, flags to control subscription<br> * See XmlQoS.dtd for a description, XmlQoS.xml for examples<p /> * @return A sequence of 0 - n MsgUnit structs. 0 if no message matched. * They are clones from the internal messageUnit, so native clients can manipulate * them without danger * @exception XmlBlasterException on internal errors * @see org.xmlBlaster.client.qos.GetQos * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.get.html">The interface.get requirement</a> */ public MsgUnit[] get(SessionInfo sessionInfo, QueryKeyData xmlKey, GetQosServer getQos) throws XmlBlasterException { try { if (log.isLoggable(Level.FINER)) log.finer("Entering get(oid='" + xmlKey.getOid() + "', queryType='" + xmlKey.getQueryType() + "', query='" + xmlKey.getQueryString() + "') from client '" + sessionInfo.getId() + " ..."); if ("__refresh".equals(xmlKey.getOid())) { return new MsgUnit[0]; // get() with oid="__refresh" do only refresh the login session } if (xmlKey.isAdministrative()) { if (!glob.supportAdministrative()) throw new XmlBlasterException(glob, ErrorCode.RESOURCE_ADMIN_UNAVAILABLE, ME, "Sorry administrative get() is not available, try to configure xmlBlaster."); MsgUnit[] raw = glob.getMomClientGateway().getCommand(sessionInfo, xmlKey, getQos.getData()); if (getQos.getWantContent()) return raw; MsgUnit[] msgUnitArr = new MsgUnit[raw.length]; for(int i=0; i<raw.length; i++) { // byte[] cont = (getQos.getWantContent()) ? raw[i].getContent() : new byte[0]; // msgUnitArr[i] = new MsgUnit(key, cont, raw[i].getQos()); msgUnitArr[i] = new MsgUnit(raw[i], null, new byte[0], null); } return msgUnitArr; } if (Constants.JDBC_OID.equals(xmlKey.getOid()/*"__sys__jdbc"*/)) { // Query RDBMS !!! hack, we need a general service interface org.xmlBlaster.protocol.jdbc.XmlDBAdapter adap = new org.xmlBlaster.protocol.jdbc.XmlDBAdapter(glob, xmlKey.getQueryString().getBytes(), (org.xmlBlaster.protocol.jdbc.NamedConnectionPool)this.glob.getObjectEntry("NamedConnectionPool-"+glob.getId())); return adap.query(); } KeyData[] keyDataArr = queryMatchingKeys(sessionInfo, xmlKey, getQos.getData()); ArrayList msgUnitList = new ArrayList(keyDataArr.length); if (log.isLoggable(Level.FINE)) log.fine("get(): " + ((keyDataArr!=null&&keyDataArr.length>0&&keyDataArr[0]!=null)?"Found local match "+keyDataArr[0].toXml():"No local match")); // Always forward the get request to the master // even if there are no matching keys // In the cluster environment all messages are accessed from the master cluster node, // tuning with XmlBlasterAccess.synchronousCache is not yet implemented. if (this.glob.isClusterManagerReady()) { // cluster support - forward erase to master try { MsgUnit tmp[] = glob.getClusterManager().forwardGet(sessionInfo, xmlKey, getQos); if (tmp != null && tmp.length > 0) { log.info("get() access of " + tmp.length + " messages from cluster master"); for (int jj=0; jj<tmp.length; jj++) { msgUnitList.add(tmp[jj]); // We currently don' cache the message here in the slave !!! // We could do it with the xmlBlasterConnection.initCache(int size) }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -