⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 requestbroker.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
            MsgUnit origMsgUnit = null;            if (entry instanceof ReferenceEntry) {               ReferenceEntry referenceEntry = (ReferenceEntry)entry;               origMsgUnit = ((ReferenceEntry)entry).getMsgUnitOrNull();               if (origMsgUnit == null) {                  if (log.isLoggable(Level.FINE)) log.fine("Ignoring dead message for destroyed callback queue entry " + referenceEntry.getLogId());                  continue;               }            }            else {               log.severe("PANIC: Internal error in deadMessage data type");               retArr[ii] = "PANIC";               continue;            }            try {               if (entry.getKeyOid().equals(Constants.OID_DEAD_LETTER)) {  // Check for recursion of dead letters                  log.severe("PANIC: Recursive dead message is lost, no recovery possible - dumping to file not yet coded: " +                                origMsgUnit.toXml() + ": " +                                ((reason != null) ? (": " + reason) : "") );                  retArr[ii] = entry.getKeyOid();                  Thread.dumpStack();                  continue;               }               if (origMsgUnit.getQosData().getClientProperties().get("__isErrorHandled") != null) {  // Check for recursion of dead letters                  log.warning("Recursive message '" + entry.getLogId() + "' is error handled already (sent as dead letter), we ignore it.");                  retArr[ii] = entry.getKeyOid();                  continue;               }               origMsgUnit.getQosData().addClientProperty("__isErrorHandled", true); // Mark the original to avoid looping if failed client is the dead message listener               String text = "Generating dead message '" + entry.getLogId() + "'" +                            " from publisher=" + entry.getSender() +                            " because delivery " +            // entry.getReceiver() is recognized in queueId                            ((queue == null) ? "" : "with queue '"+queue.getStorageId().toString()+"' ") + "failed" +                            ((reason != null) ? (": " + reason) : "");               log.warning(text);               PublishKey publishKey = new PublishKey(glob, Constants.OID_DEAD_LETTER);               //publishKey.setClientTags("<oid>"+entry.getKeyOid()+"</oid>");               // null: use the content from origMsgUnit:               pubQos.addClientProperty(Constants.CLIENTPROPERTY_DEADMSGKEY, origMsgUnit.getKey()); //"__key"               pubQos.addClientProperty(Constants.CLIENTPROPERTY_DEADMSGQOS, origMsgUnit.getQos()); //"__qos"               pubQos.addClientProperty(Constants.CLIENTPROPERTY_OID, origMsgUnit.getKeyOid()); //"__oid"               pubQos.addClientProperty(Constants.CLIENTPROPERTY_RCVTIMESTAMP, origMsgUnit.getQosData().getRcvTimestamp()); //"__rcvTimestamp"               pubQos.addClientProperty(Constants.CLIENTPROPERTY_DEADMSGREASON, text); //"__deadMessageReason"               MsgUnit msgUnit = new MsgUnit(origMsgUnit, publishKey.getData(), null, pubQos.getData());               retArr[ii] = publish(unsecureSessionInfo, msgUnit);            }            catch(Throwable e) {               log.severe("PANIC: " + entry.getKeyOid() + " dead letter is lost, no recovery possible - dumping to file not yet coded: " + e.toString() + "\n" + origMsgUnit.toXml());               e.printStackTrace();               retArr[ii] = entry.getKeyOid();            }         }         return retArr;      }      catch (Throwable e) {         log.severe("PANIC: " + entries.length + " dead letters are lost, no recovery possible:" + e.getMessage());         for (int ii=0; ii<entries.length; ii++) {            MsgQueueEntry entry = entries[ii];            try {               if (entry == null) {                  continue;               }               /*               else if (entry instanceof MsgUnitWrapper) {                  MsgUnitWrapper msgUnitWrapper = (MsgUnitWrapper)entry;                  String fileName = glob.getMsgFileDumper().store(msgUnitWrapper);                  log.warn(ME, "Dumped lost message to file " + fileName);               }               */               else if (entry instanceof MsgQueueHistoryEntry) {                  log.warning("History entry is lost: " + entry.toXml());               }               else if (entry instanceof MsgQueueUpdateEntry) {                  ReferenceEntry referenceEntry = (ReferenceEntry)entry;                  if (referenceEntry.isDestroyed()) {                     if (log.isLoggable(Level.FINE)) log.fine("Ignoring detroyed callback message " + entry.getLogId());                  }                  else {                     log.warning("Callback of message failed unrecoverably: " + entry.toXml());                  }               }               else {                  log.severe("PANIC: Unrecoverable lost message " + entry.toXml());               }            }            catch (Throwable th) {               log.severe("PANIC: Unrecoverable lost message " + entry.toXml() + ": " + th.getMessage());            }         }      }      return new String[0];   }   public String subscribe(SessionInfo sessionInfo, QueryKeyData xmlKey, SubscribeQosServer subscribeQos) throws XmlBlasterException   {      if (!sessionInfo.hasCallback()) {         throw new XmlBlasterException(glob, ErrorCode.USER_SUBSCRIBE_NOCALLBACK, ME, "You can't subscribe to '" + xmlKey.getOid() + "' without having a callback server");      }      try {         SubscriptionInfo.verifySubscriptionId(sessionInfo.getConnectQos().isClusterNode(), sessionInfo.getSessionName(), xmlKey, subscribeQos);         if (log.isLoggable(Level.FINER)) log.finer("Entering subscribe(oid='" + xmlKey.getOid() + "', queryType='" + xmlKey.getQueryType() + "', query='" + xmlKey.getQueryString() + "', domain='" + xmlKey.getDomain() + "') from client '" + sessionInfo.getId() + "' ...");         String returnOid = "";         if (subscribeQos.getMultiSubscribe() == false) {            Vector vec =  clientSubscriptions.getSubscription(sessionInfo, xmlKey);            if (vec != null && vec.size() > 0) {               for (int i=0; i<vec.size(); i++) {                  SubscriptionInfo sub = (SubscriptionInfo)vec.elementAt(i);                  sub.update(subscribeQos);               }               log.info("Ignoring duplicate subscription '" +                       ((xmlKey.getOid()==null)?((xmlKey.getDomain()==null)?xmlKey.getQueryString():xmlKey.getDomain()):xmlKey.getOid()) +                        "' as you have set multiSubscribe to false");               StatusQosData qos = new StatusQosData(glob, MethodName.SUBSCRIBE);               SubscriptionInfo i = (SubscriptionInfo)vec.elementAt(0);               qos.setState(Constants.STATE_WARN);               qos.setSubscriptionId(i.getSubscriptionId());               return qos.toXml();            }         }         SubscriptionInfo subsQuery = null;         if (xmlKey.isQuery()) { // fires event for query subscription, this needs to be remembered for a match check of future published messages         // if (true) { // fires event for query subscription, this needs to be remembered for a match check of future published messages            subsQuery = new SubscriptionInfo(glob, sessionInfo, xmlKey, subscribeQos);            returnOid = subsQuery.getSubscriptionId(); // XPath query            fireSubscribeEvent(subsQuery);         }         KeyData[] keyDataArr = queryMatchingKeys(sessionInfo, xmlKey, subscribeQos.getData());         for (int jj=0; jj<keyDataArr.length; jj++) {            KeyData xmlKeyExact = keyDataArr[jj];            if (xmlKeyExact == null && xmlKey.isExact()) // subscription on a yet unknown topic ...               xmlKeyExact = xmlKey;            else if (xmlKeyExact != null && xmlKey.isDomain()) {               xmlKeyExact.setQueryType(xmlKey.getQueryType());            }            SubscriptionInfo subs = null;            if (sessionInfo.getConnectQos().duplicateUpdates() == false) {               Vector vec =  clientSubscriptions.getSubscriptionByOid(sessionInfo, xmlKeyExact.getOid(), true);               if (vec != null) {                  if (vec.size() > 0) {                     subs = (SubscriptionInfo)vec.firstElement();                     if (log.isLoggable(Level.FINE)) log.fine("Session '" + sessionInfo.getId() +                                    "', topic '" + xmlKeyExact.getOid() + "' is subscribed " +                                    vec.size() + " times with duplicateUpdates==false");                  }                  if (vec.size() > 1)                     log.severe("Internal problem for session '" + sessionInfo.getId() + "', message '" + xmlKeyExact.getOid() + "' is subscribed " + vec.size() + " times but duplicateUpdates==false!");               }            }            if (subs == null) {               if (subsQuery != null) {                  subs = new SubscriptionInfo(glob, sessionInfo, subsQuery, xmlKeyExact);                  subsQuery.addSubscription(subs);               }               else                  subs = new SubscriptionInfo(glob, sessionInfo, xmlKeyExact, subscribeQos);            }            subscribeToOid(subs, false); // fires event for subscription            if (returnOid.equals("")) returnOid = subs.getSubscriptionId();         }         StatusQosData qos = null;         if (this.glob.isClusterManagerReady()) { // cluster support - forward message to master            try {               subscribeQos.setSubscriptionId(returnOid); // force the same subscriptionId on all cluster nodes               SubscribeReturnQos ret = glob.getClusterManager().forwardSubscribe(sessionInfo, xmlKey, subscribeQos);               if (ret != null)                  qos = ret.getData();               //Thread.currentThread().dumpStack();               //if (ret != null) return ret.toXml();            }            catch (XmlBlasterException e) {               if (e.getErrorCode() == ErrorCode.RESOURCE_CONFIGURATION_PLUGINFAILED) {                  this.glob.setUseCluster(false);               }               else {                  e.printStackTrace();                  throw e;               }            }         }         if (qos == null || qos.getSubscriptionId() == null ||  qos.getSubscriptionId().length() < 1) {            // The cluster subId is unique in another cluster node as well e.g.: "__subId:heron-2"            if (qos == null) qos = new StatusQosData(glob, MethodName.SUBSCRIBE);            qos.setSubscriptionId(returnOid);         }         if (log.isLoggable(Level.FINER)) log.finer("Leaving subscribe(oid='" + xmlKey.getOid() + "', queryType='" + xmlKey.getQueryType() +                                          "', query='" + xmlKey.getQueryString() + "', domain='" + xmlKey.getDomain() + "') from client '" +                                          sessionInfo.getId() + "' -> subscriptionId='" + qos.getSubscriptionId() + "'");         return qos.toXml();      }      catch (XmlBlasterException e) {         log.warning(e.getMessage());         throw e;      }      catch (Throwable e) {         e.printStackTrace();         throw XmlBlasterException.convert(glob, ME, ErrorCode.INTERNAL_SUBSCRIBE.toString(), e);      }   }   /**    * Invoked by a client, to access one/many MsgUnit.    * <p />    * Synchronous read-access method.    * <p />    * In the cluster environment all messages are accessed from the master cluster node,    * tuning with XmlBlasterAccess.synchronousCache is not yet implemented.    *    * @param xmlKey  Key allowing XPath or exact selection<br>    *                See XmlKey.dtd for a description    * @param getQos  Quality of Service, flags to control subscription<br>    *                See XmlQoS.dtd for a description, XmlQoS.xml for examples<p />    * @return A sequence of 0 - n MsgUnit structs. 0 if no message matched.    *         They are clones from the internal messageUnit, so native clients can manipulate    *         them without danger    * @exception XmlBlasterException on internal errors    * @see org.xmlBlaster.client.qos.GetQos    * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.get.html">The interface.get requirement</a>    */   public MsgUnit[] get(SessionInfo sessionInfo, QueryKeyData xmlKey, GetQosServer getQos) throws XmlBlasterException   {      try {         if (log.isLoggable(Level.FINER)) log.finer("Entering get(oid='" + xmlKey.getOid() + "', queryType='" + xmlKey.getQueryType() + "', query='" + xmlKey.getQueryString() + "') from client '" + sessionInfo.getId() + " ...");         if ("__refresh".equals(xmlKey.getOid())) {            return new MsgUnit[0]; // get() with oid="__refresh" do only refresh the login session         }         if (xmlKey.isAdministrative()) {            if (!glob.supportAdministrative())               throw new XmlBlasterException(glob, ErrorCode.RESOURCE_ADMIN_UNAVAILABLE, ME, "Sorry administrative get() is not available, try to configure xmlBlaster.");            MsgUnit[] raw = glob.getMomClientGateway().getCommand(sessionInfo, xmlKey, getQos.getData());            if (getQos.getWantContent())  return raw;            MsgUnit[] msgUnitArr = new MsgUnit[raw.length];            for(int i=0; i<raw.length; i++) {               // byte[] cont = (getQos.getWantContent()) ? raw[i].getContent() : new byte[0];               // msgUnitArr[i] = new MsgUnit(key, cont, raw[i].getQos());               msgUnitArr[i] = new MsgUnit(raw[i], null, new byte[0], null);            }            return msgUnitArr;         }         if (Constants.JDBC_OID.equals(xmlKey.getOid()/*"__sys__jdbc"*/)) { // Query RDBMS !!! hack, we need a general service interface            org.xmlBlaster.protocol.jdbc.XmlDBAdapter adap = new org.xmlBlaster.protocol.jdbc.XmlDBAdapter(glob,                        xmlKey.getQueryString().getBytes(), (org.xmlBlaster.protocol.jdbc.NamedConnectionPool)this.glob.getObjectEntry("NamedConnectionPool-"+glob.getId()));            return adap.query();         }         KeyData[] keyDataArr = queryMatchingKeys(sessionInfo, xmlKey, getQos.getData());         ArrayList msgUnitList = new ArrayList(keyDataArr.length);         if (log.isLoggable(Level.FINE)) log.fine("get(): " + ((keyDataArr!=null&&keyDataArr.length>0&&keyDataArr[0]!=null)?"Found local match "+keyDataArr[0].toXml():"No local match"));         // Always forward the get request to the master         // even if there are no matching keys         // In the cluster environment all messages are accessed from the master cluster node,         // tuning with XmlBlasterAccess.synchronousCache is not yet implemented.         if (this.glob.isClusterManagerReady()) { // cluster support - forward erase to master           try {               MsgUnit tmp[] = glob.getClusterManager().forwardGet(sessionInfo, xmlKey, getQos);               if (tmp != null && tmp.length > 0) {                   log.info("get() access of " + tmp.length + " messages from cluster master");                   for (int jj=0; jj<tmp.length; jj++) {                       msgUnitList.add(tmp[jj]);                       // We currently don' cache the message here in the slave !!!                       // We could do it with the xmlBlasterConnection.initCache(int size)                   }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -