📄 xmlblasteraccess.java
字号:
} this.disconnectInProgress = true; this.glob.unregisterMBean(this.mbeanHandle); if (disconnectQos == null) disconnectQos = new DisconnectQos(glob); if (isConnected()) { if (this.clientQueue != null) { long remainingEntries = this.clientQueue.getNumOfEntries(); if (remainingEntries > 0) { if (disconnectQos.clearClientQueue()) log.warning(getLogId()+"You called disconnect(). Please note that there are " + remainingEntries + " unsent invocations/messages in the queue which are discarded now."); else log.info(getLogId()+"You called disconnect(). Please note that there are " + remainingEntries + " unsent invocations/messages in the queue which are sent on next connect of the same client with the same public session ID."); } } String[] subscriptionIdArr = this.updateDispatcher.getSubscriptionIds(); for (int ii=0; ii<subscriptionIdArr.length; ii++) { String subscriptionId = subscriptionIdArr[ii]; UnSubscribeKey key = new UnSubscribeKey(glob, subscriptionId); try { unSubscribe(key, null); } catch(XmlBlasterException e) { if (e.isCommunication()) { break; } log.warning(getLogId()+"Couldn't unsubscribe '" + subscriptionId + "' : " + e.getMessage()); } } this.updateDispatcher.clear(); // Now send the disconnect() to the server ... if (!disconnectQos.isLeaveServer() && this.clientQueue != null) { try { MsgQueueDisconnectEntry entry = new MsgQueueDisconnectEntry(this.glob, this.clientQueue.getStorageId(), disconnectQos); queueMessage(entry); // disconnects are always transient log.info(getLogId()+"Successful disconnect from " + getServerNodeId()); } catch(Throwable e) { e.printStackTrace(); log.warning(e.toString()); } } } if (this.synchronousCache != null) { this.synchronousCache.clear(); } if (this.clientQueue != null && disconnectQos.clearClientQueue()) { this.clientQueue.clear(); } if (disconnectQos.shutdownDispatcher()) { if (this.dispatchManager != null) { this.dispatchManager.shutdown(); this.dispatchManager = null; } if (this.clientQueue != null) { this.clientQueue.shutdown(); // added to make hsqldb shutdown this.clientQueue = null; } } if (disconnectQos.shutdownCbServer() && this.cbServer != null) { try { this.cbServer.shutdown(); this.cbServer = null; } catch (Throwable e) { e.printStackTrace(); log.warning(e.toString()); } } if (this.secPlgn != null) { this.secPlgn = null; } this.connectQos = null; this.connectReturnQos = null; this.disconnectInProgress = false; this.msgErrorHandler = null; this.updateListener = null; this.streamingCb = null; super.glob.shutdown(); this.shutdown = true; return true; } /** * @return true if shutdown was called, typically by disconnect() */ public boolean isShutdown() { return this.shutdown; } /** * Access the callback server. * @return null if no callback server is established */ public I_CallbackServer getCbServer() { return this.cbServer; } /** * Create a descriptive ME, for logging only * @return e.g. "/node/heron/client/joe/3" or "UNKNOWN_SESSION" if connect() was not successful */ public String getId() { SessionName sessionName = getSessionName(); return (sessionName == null) ? "UNKNOWN_SESSION" : sessionName.getAbsoluteName(); } /** * Useful as a logging prefix. * @return For example "client/TheDesperate/-6: " */ public String getLogId() { SessionName sessionName = getSessionName(); return (sessionName == null) ? "" : sessionName.getRelativeName() + ": "; } /** * The public session ID of this login session. */ public SessionName getSessionName() { if (this.connectReturnQos != null) return this.connectReturnQos.getSessionName(); if (this.connectQos != null) { return this.connectQos.getSessionName(); } return null; } /** * @see I_XmlBlasterAccess#getStorageIdStr() */ public String getStorageIdStr() { return this.storageIdPrefix; } /** * @see I_XmlBlasterAccess#setStorageIdStr(String) */ public void setStorageIdStr(String prefix) { this.storageIdPrefix = Global.getStrippedString(prefix); } /** * Allows to set the node name for nicer logging. * Typically used by cluster clients and not by ordinary clients * @param serverNodeId For example "/node/heron/instanceId/1233435" or "/node/heron" */ public void setServerNodeId(String nodeId) { if (nodeId == null) return; if (nodeId.startsWith("/node") || nodeId.startsWith("/xmlBlaster/node")) this.serverNodeId = nodeId; else this.serverNodeId = "/node/" + nodeId; } /** * The cluster node id (name) to which we want to connect. * <p /> * Needed only for nicer logging when running in a cluster.<br /> * Is configurable with "-server.node.id golan" until a successful connect * @return e.g. "/node/golan" or /xmlBlaster/node/heron" */ public String getServerNodeId() { if (this.contextNode != null) return this.contextNode.getParent(ContextNode.CLUSTER_MARKER_TAG).getAbsoluteName(); if (this.serverNodeId != null) return this.serverNodeId; return this.glob.getInstanceId(); // Changes for each restart } /** * Set my identity. * @param serverNodeId For example "/node/heron/instanceId/1233435" or "/node/heron" */ private void setContextNodeId(String nodeId) { if (nodeId == null) return; if (nodeId.indexOf("/") == -1) nodeId = "/node/"+nodeId; // add CLUSTER_MARKER_TAG to e.g. "/node/avalon.mycomp.com" String oldClusterObjectName = ""; // e.g. "org.xmlBlaster:nodeClass=node,node=clientSUB1" String oldServerNodeInstanceName = ""; // e.g. "clientSUB1" ContextNode clusterContext = null; if (this.contextNode != null) { // same instance as glob.getContextNode(): clusterContext = this.contextNode.getParent(ContextNode.CLUSTER_MARKER_TAG); oldServerNodeInstanceName = clusterContext.getInstanceName(); oldClusterObjectName = clusterContext.getAbsoluteName(ContextNode.SCHEMA_JMX); } // Verify the publicSessionId ... try { if (this.mbeanHandle != null && this.jmxPublicSessionId != getPublicSessionId()) { /*int count = */this.glob.getJmxWrapper().renameMBean(this.mbeanHandle.getObjectInstance().getObjectName().toString(), ContextNode.SESSION_MARKER_TAG, ""+getPublicSessionId()); this.mbeanHandle.getContextNode().setInstanceName(""+getPublicSessionId()); this.jmxPublicSessionId = getPublicSessionId(); } if (this.mbeanHandle == null && this.contextNode != null && !this.contextNode.getInstanceName().equals(""+getPublicSessionId())) { this.contextNode.setInstanceName(""+getPublicSessionId()); } } catch (XmlBlasterException e) { log.warning(getLogId()+"Ignoring problem during JMX session registration: " + e.toString()); } // parse new cluster node name ... ContextNode tmp = ContextNode.valueOf(nodeId); ContextNode tmpClusterContext = (tmp==null)?null:tmp.getParent(ContextNode.CLUSTER_MARKER_TAG); if (tmpClusterContext == null) { log.severe(getLogId()+"Ignoring unknown serverNodeId '" + nodeId + "'"); return; } String newServerNodeInstanceName = tmpClusterContext.getInstanceName(); // e.g. "heron" if (oldServerNodeInstanceName.equals(newServerNodeInstanceName)) { return; // nothing to do, same cluster name } this.glob.getContextNode().setInstanceName(newServerNodeInstanceName); if (clusterContext == null) { clusterContext = this.glob.getContextNode(); if (getLoginName() != null && getLoginName().length() > 0) { String instanceName = this.glob.validateJmxValue(getLoginName()); ContextNode contextNodeSubject = new ContextNode(ContextNode.CONNECTION_MARKER_TAG, instanceName, clusterContext); this.contextNode = new ContextNode(ContextNode.SESSION_MARKER_TAG, ""+getPublicSessionId(), contextNodeSubject); } } else { clusterContext.setInstanceName(newServerNodeInstanceName); } this.glob.setScopeContextNode(this.contextNode); try { // Query all "org.xmlBlaster:nodeClass=node,node=clientSUB1" + ",*" sub-nodes and replace the name by "heron" // For example our connectionQueue or our plugins like Pop3Driver if (oldClusterObjectName.length() > 0) { int num = this.glob.getJmxWrapper().renameMBean(oldClusterObjectName, ContextNode.CLUSTER_MARKER_TAG, this.contextNode); if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Renamed " + num + " jmx nodes to new '" + nodeId + "'"); } if (this.mbeanHandle == null && this.contextNode != null) { // "org.xmlBlaster:nodeClass=node,node=heron" this.mbeanHandle = this.glob.registerMBean(this.contextNode, this); } } catch (XmlBlasterException e) { log.warning(getLogId()+"Ignoring problem during JMX registration: " + e.toString()); } } /** * Put the given message entry into the queue */ private Object queueMessage(MsgQueueEntry entry) throws XmlBlasterException { try { this.clientQueue.put(entry, I_Queue.USE_PUT_INTERCEPTOR); if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Forwarded one '" + entry.getEmbeddedType() + "' message, current state is " + getState().toString()); return entry.getReturnObj(); } catch (XmlBlasterException e) { if (log.isLoggable(Level.FINE)) log.fine(e.getMessage()); throw e; } catch (Throwable e) { if (log.isLoggable(Level.FINE)) log.fine(e.toString()); XmlBlasterException xmlBlasterException = XmlBlasterException.convert(glob,null,null,e); //msgErrorHandler.handleError(new MsgErrorInfo(glob, entry, null, xmlBlasterException)); throw xmlBlasterException; // internal errors or not in failsafe mode: throw back to client } } /** * @see I_XmlBlasterAccess#subscribe(SubscribeKey, SubscribeQos) */ public SubscribeReturnQos subscribe(java.lang.String xmlKey, java.lang.String qos) throws XmlBlasterException { return subscribe(new SubscribeKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)), new SubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)) ); } /** * @see I_XmlBlasterAccess#subscribe(SubscribeKey, SubscribeQos) */ public SubscribeReturnQos subscribe(SubscribeKey subscribeKey, SubscribeQos subscribeQos) throws XmlBlasterException { if (!this.isValid) throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "subscribe"); if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME); if (getSessionName().isPubSessionIdUser() && subscribeQos.getData().getMultiSubscribe()==false && !subscribeQos.getData().hasSubscriptionId()) { // For failsave clients we generate on client side the subscriptionId // In case of offline/clientSideQueued operation we guarantee like this a not changing // subscriptionId and the client code can reliably use the subscriptionId for further dispatching // of update() messages. subscribeQos.getData().generateSubscriptionId(getSessionName(), subscribeKey.getData()); } MsgQueueSubscribeEntry entry = new MsgQueueSubscribeEntry(glob, this.clientQueue.getStorageId(), subscribeKey.getData(), subscribeQos.getData());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -