📄 xmlblasteraccess.java
字号:
synchronized (this) { if (this.startupTime == 0) { this.startupTime = System.currentTimeMillis(); } if (isConnected() || this.connectInProgress) { String text = "connect() rejected, you are connected already, please check your code"; throw new XmlBlasterException(glob, ErrorCode.USER_CONNECT_MULTIPLE, ME, text); } this.connectInProgress = true; try { this.connectQos = (qos==null) ? new ConnectQos(glob) : qos; ClientProperty tmp = this.connectQos.getClientProperty(Constants.UPDATE_BULK_ACK); if (tmp != null) { if (tmp.getBooleanValue()) { log.info("Setting the flag '" + Constants.UPDATE_BULK_ACK + "' to 'true' since specified in ConnectQos"); this.updateBulkAck = true; } } // We need to set a unique ID for this client so that global.getId() is unique // which is used e.g. in the JDBC plugin SessionName sn = getSessionName(); if (sn != null) { if (sn.isPubSessionIdUser()) { this.glob.setId(sn.toString()); } else { this.glob.setId(sn.toString() + System.currentTimeMillis()); // Not secure if two clients start simultaneously } } else { this.glob.setId(getLoginName() + System.currentTimeMillis()); // Not secure if two clients start simultaneously } this.glob.resetInstanceId(); this.connectQos.getData().setInstanceId(this.glob.getInstanceId()); if (connectQos.getData().getGlobal().isServerSide()) { String text = "Your ConnectQos.getData() contains a ServerScope instead of a Global instance, this is not allowed"; throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, text); } this.updateListener = updateListener; // TODO: This is done by ConnectQos already, isn't it? initSecuritySettings(this.connectQos.getData().getClientPluginType(), this.connectQos.getData().getClientPluginVersion()); this.ME = "XmlBlasterAccess-" + getId(); setContextNodeId(getServerNodeId()); try { ClientQueueProperty prop = this.connectQos.getClientQueueProperty(); // The storageId must remain the same after a client restart String storageIdStr = getId(); if (getPublicSessionId() == 0 ) { // having no public sessionId we need to generate a unique queue name storageIdStr += System.currentTimeMillis()+Global.getCounter(); } else { if (getStorageIdStr() != null && getStorageIdStr().length() > 0) { // client code forces a named client side storageId - dangerous if the name conflicts with server name in same DB storageIdStr = getStorageIdStr(); } } StorageId queueId = new StorageId(Constants.RELATING_CLIENT, storageIdStr); this.clientQueue = glob.getQueuePluginManager().getPlugin(prop.getType(), prop.getVersion(), queueId, this.connectQos.getClientQueueProperty()); if (this.clientQueue == null) { String text = "The client queue plugin is not found with this configuration, please check your connect QoS: " + prop.toXml(); throw new XmlBlasterException(glob, ErrorCode.USER_CONFIGURATION, ME, text); } if (this.msgErrorHandler == null) { this.msgErrorHandler = new ClientErrorHandler(glob, this); } this.dispatchManager = new DispatchManager(glob, this.msgErrorHandler, getSecurityPlugin(), this.clientQueue, this, this.connectQos.getAddresses(), sn); getDispatchStatistic(); // Force creation of dispatchStatistic as this syncs on 'this' and could deadlock if don later from a update() this.dispatchManager.getDispatchConnectionsHandler().registerPostSendListener(this); if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Switching to synchronous delivery mode ..."); this.dispatchManager.trySyncMode(true); if (this.updateListener != null) { // Start a default callback server using same protocol createDefaultCbServer(); } if (this.connectQos.doSendConnect()) { // Try to connect to xmlBlaster ... sendConnectQos(); } else { log.info(getLogId()+"Initialized client library, but no connect() is send to xmlBlaster, a delegate should do any subscribe if required"); } } catch (XmlBlasterException e) { if (isConnected()) disconnect(null); throw e; } catch (Throwable e) { if (isConnected()) disconnect(null); throw XmlBlasterException.convert(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "Connection failed", e); } } finally { this.connectInProgress = false; } } // synchronized if (this.connectQos.getRefreshSession()) { startSessionRefresher(); } if (isAlive()) { if (this.connectionListener != null) { this.connectionListener.reachedAlive(ConnectionStateEnum.UNDEF, this); } log.info(glob.getReleaseId() + ": Successful " + this.connectQos.getAddress().getType() + " login as " + getId()); if (this.clientQueue.getNumOfEntries() > 0) { long num = this.clientQueue.getNumOfEntries(); log.info(getLogId()+"We have " + num + " client side queued tail back messages"); this.dispatchManager.switchToASyncMode(); while (this.clientQueue.getNumOfEntries() > 0) { try { Thread.sleep(20L); } catch( InterruptedException i) {} } log.info((num-this.clientQueue.getNumOfEntries()) + " client side queued tail back messages sent"); this.dispatchManager.switchToSyncMode(); } } else { if (this.connectionListener != null) { this.connectionListener.reachedPolling(ConnectionStateEnum.UNDEF, this); } log.info(glob.getReleaseId() + ": Login request as " + getId() + " is queued"); } if (this.connectReturnQos != null) { setContextNodeId(this.connectReturnQos.getServerInstanceId()); } return this.connectReturnQos; // new ConnectReturnQos(glob, ""); } /** * Sends the current connectQos to xmlBlaster and stores the connectReturnQos. * @throws XmlBlasterException */ public void sendConnectQos() throws XmlBlasterException { MsgQueueConnectEntry entry = new MsgQueueConnectEntry(this.glob, this.clientQueue.getStorageId(), this.connectQos.getData()); // Try to connect to xmlBlaster ... this.connectReturnQos = (ConnectReturnQos)queueMessage(entry); this.connectReturnQos.getData().setInitialConnectionState(this.dispatchManager.getDispatchConnectionsHandler().getState()); } public boolean isConnected() { return this.connectReturnQos != null; } private void startSessionRefresher() { if (this.connectQos == null) return; long sessionTimeout = this.connectQos.getSessionQos().getSessionTimeout(); final long MIN = 2000L; // Sessions which live less than 2 seconds are not supported if (sessionTimeout >= MIN) { long gap = (sessionTimeout < 60*1000L) ? sessionTimeout/2 : sessionTimeout-30*1000L; final long refreshTimeout = sessionTimeout - gap; final Timeout timeout = this.glob.getPingTimer(); this.sessionRefreshTimeoutHandle = timeout.addTimeoutListener(new I_Timeout() { public void timeout(Object userData) { if (isAlive()) { if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Refreshing session to not expire"); try { refreshSession(); } catch (XmlBlasterException e) { log.warning(getLogId()+"Can't refresh the login session '" + getId() + "': " + e.toString()); } } else { if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Can't refresh session as we have no connection"); } try { sessionRefreshTimeoutHandle = timeout.addOrRefreshTimeoutListener(this, refreshTimeout, null, sessionRefreshTimeoutHandle) ; } catch (XmlBlasterException e) { log.warning(getLogId()+"Can't refresh the login session '" + getId() + "': " + e.toString()); } } }, refreshTimeout, null); } else { log.warning(getLogId()+"Auto-refreshing session is not supported for session timeouts smaller " + MIN + " seconds"); } } /** * @see I_XmlBlasterAccess#refreshSession() */ public void refreshSession() throws XmlBlasterException { GetKey gk = new GetKey(glob, "__refresh"); GetQos gq = new GetQos(glob); get(gk, gq); } /** * Extracts address data from ConnectQos (or adds default if missing) * and instantiate a callback server as specified in ConnectQos */ private void createDefaultCbServer() throws XmlBlasterException { CbQueueProperty prop = connectQos.getSessionCbQueueProperty(); // Creates a default property for us if none is available CallbackAddress addr = prop.getCurrentCallbackAddress(); // may return null if (addr == null) addr = new CallbackAddress(glob); this.cbServer = initCbServer(getLoginName(), addr); addr.setType(this.cbServer.getCbProtocol()); addr.setRawAddress(this.cbServer.getCbAddress()); //addr.setVersion(this.cbServer.getVersion()); //addr.setSecretSessionId(cbSessionId); prop.setCallbackAddress(addr); log.info(getLogId()+"Callback settings: " + prop.getSettings()); } /** * @see I_XmlBlasterAccess#initCbServer(String, CallbackAddress) */ public I_CallbackServer initCbServer(String loginName, CallbackAddress callbackAddress) throws XmlBlasterException { if (callbackAddress == null) callbackAddress = new CallbackAddress(glob); callbackAddress.setSessionName(this.getSessionName()); if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Using 'client.cbProtocol=" + callbackAddress.getType() + "' to be used by " + getServerNodeId() + ", trying to create the callback server ..."); I_CallbackServer server = glob.getCbServerPluginManager().getPlugin(callbackAddress.getType(), callbackAddress.getVersion()); server.initialize(this.glob, loginName, callbackAddress, this); return server; } /** * Initializes the little client helper framework for authentication. * <p /> * The first goal is a proper loginQoS xml string for authentication. * <p /> * The second goal is to intercept the messages for encryption (or whatever the * plugin supports). * <p /> * See xmlBlaster.properties, for example: * <pre> * Security.Client.DefaultPlugin=gui,1.0 * Security.Client.Plugin[gui][1.0]=org.xmlBlaster.authentication.plugins.gui.ClientSecurityHelper * </pre> */ private void initSecuritySettings(String secMechanism, String secVersion) { PluginLoader secPlgnMgr = glob.getClientSecurityPluginLoader(); try { this.secPlgn = secPlgnMgr.getClientPlugin(secMechanism, secVersion); if (secMechanism != null) // to avoid double logging for login() log.info(getLogId()+"Loaded security plugin=" + secMechanism + " version=" + secVersion); } catch (XmlBlasterException e) { log.severe(getLogId()+"Security plugin '" + secMechanism + "/" + secVersion + "' initialization failed. Reason: "+e.getMessage()); this.secPlgn = null; } } public I_ClientPlugin getSecurityPlugin() { return this.secPlgn; } /** * @see org.xmlBlaster.client.I_XmlBlasterAccess#disconnect(DisconnectQos) * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.disconnect.html">interface.disconnect requirement</a> */ public synchronized boolean disconnect(DisconnectQos disconnectQos) { if (!this.isValid) return false; // Relaxed check to allow shutdown of database without successful connection if (this.connectQos == null /*!isConnected()*/) { log.warning(getLogId()+"You called disconnect() but you are are not logged in, we ignore it."); return false; } if (disconnectQos == null) disconnectQos = new DisconnectQos(glob); if (!disconnectQos.getClearClientQueueProp().isModified()) { boolean clearClientQueue = true; if (this.connectQos != null) { if (this.connectQos.getSessionName().isPubSessionIdUser()) clearClientQueue = false; // Keep tail back messages } disconnectQos.clearClientQueue(clearClientQueue); } return shutdown(disconnectQos); } private synchronized boolean shutdown(DisconnectQos disconnectQos) { if (this.disconnectInProgress) { log.warning(getLogId()+"Calling disconnect again is ignored, you are in shutdown progress already"); return false;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -