📄 handleclient.java
字号:
} public void handleMessage(MsgInfo receiver, boolean udp) { try { if (log.isLoggable(Level.FINE)) log.fine("Receiving message " + receiver.getMethodName() + "(" + receiver.getRequestId() + ")"); // receive() processes all invocations, only connect()/disconnect() we do locally ... if (receiveReply(receiver, udp) == false) { if (MethodName.CONNECT == receiver.getMethodName()) { // TODO: crypt.importMessage(receiver.getQos()); see also ClientDispatchConnection.java:440 Socket socket = this.sock; if (socket == null) return; // Is possible when EOF arrived inbetween ConnectQosServer conQos = new ConnectQosServer(driver.getGlobal(), receiver.getQos()); if (conQos.getSecurityQos() == null) throw new XmlBlasterException(glob, ErrorCode.USER_SECURITY_AUTHENTICATION_ILLEGALARGUMENT, ME, "connect() without securityQos"); conQos.getSecurityQos().setClientIp (socket.getInetAddress().getHostAddress()); conQos.setAddressServer(this.driver.getAddressServer()); setLoginName(conQos.getSessionName().getRelativeName()); Thread.currentThread().setName("XmlBlaster." + this.driver.getType() + (this.driver.isSSL()?".SSL":"") + ".tcpListener-" + conQos.getUserId()); this.ME = this.driver.getType() + "-HandleClient-" + this.loginName; // getInetAddress().toString() does no reverse DNS lookup (no blocking danger) ... log.info(ME+": Client connected, coming from host=" + socket.getInetAddress().toString() + " port=" + socket.getPort()); CallbackAddress[] cbArr = conQos.getSessionCbQueueProperty().getCallbackAddresses(); for (int ii=0; cbArr!=null && ii<cbArr.length; ii++) { SocketUrl cbUrl = new SocketUrl(glob, cbArr[ii].getRawAddress()); SocketUrl remoteUrl = new SocketUrl(glob, socket.getInetAddress().getHostAddress(), socket.getPort()); if (driver.getAddressServer() != null) { driver.getAddressServer().setRemoteAddress(remoteUrl); } if (log.isLoggable(Level.FINE)) log.fine(ME+": remoteUrl='" + remoteUrl.getUrl() + "' cbUrl='" + cbUrl.getUrl() + "'"); if (true) { // !!!!! 
TODO remoteUrl.equals(cbUrl)) { if (log.isLoggable(Level.FINE)) log.fine(ME+": Tunneling callback messages through same SOCKET to '" + remoteUrl.getUrl() + "'"); this.callback = new CallbackSocketDriver(this.loginName, this); cbArr[ii].setCallbackDriver(this.callback); } else { log.severe(ME+": Creating SEPARATE callback " + this.driver.getType() + " connection to '" + remoteUrl.getUrl() + "'"); this.callback = new CallbackSocketDriver(this.loginName); // DispatchConnection.initialize() -> CbDispatchConnection.connectLowlevel() // will later call callback.initialize(loginName, callbackAddress) } } ConnectReturnQosServer retQos = authenticate.connect(driver.getAddressServer(), conQos); this.secretSessionId = retQos.getSecretSessionId(); receiver.setSecretSessionId(retQos.getSecretSessionId()); // executeResponse needs it executeResponse(receiver, retQos.toXml(), SocketUrl.SOCKET_TCP); driver.addClient(this.secretSessionId, this); } else if (MethodName.DISCONNECT == receiver.getMethodName()) { this.disconnectIsCalled = true; executeResponse(receiver, Constants.RET_OK, SocketUrl.SOCKET_TCP); // ACK the disconnect to the client and then proceed to the server core // Note: the disconnect will call over the CbInfo our shutdown as well // setting sessionId = null prevents that our shutdown calls disconnect() again. 
authenticate.disconnect(driver.getAddressServer(), receiver.getSecretSessionId(), receiver.getQos()); shutdown(); } } } catch (XmlBlasterException e) { if (log.isLoggable(Level.FINE)) log.fine("Can't handle message, throwing exception back to client: " + e.toString()); try { if (receiver.getMethodName() != MethodName.PUBLISH_ONEWAY) executeException(receiver, e, false); else log.warning("Can't handle publishOneway message, ignoring exception: " + e.toString()); if (e.getErrorCode().equals(ErrorCode.USER_SECURITY_AUTHENTICATION_ACCESSDENIED) || e.getErrorCode().equals(ErrorCode.USER_SECURITY_AUTHENTICATION_ILLEGALARGUMENT)) { shutdown(); // cleanup to avoid thread/memory leak for a client trying again an again } } catch (Throwable e2) { log.severe("Lost connection, can't deliver exception message: " + e.toString() + " Reason is: " + e2.toString()); shutdown(); } } catch (IOException e) { if (running != false) { // Only if not triggered by our shutdown:sock.close() if (log.isLoggable(Level.FINE)) log.fine("Lost connection to client: " + e.toString()); shutdown(); } } catch (Throwable e) { e.printStackTrace(); log.severe("Lost connection to client: " + e.toString()); shutdown(); } } /** * Flush the data to the socket. 
* Overwrites SocketExecutor.sendMessage() */ protected void sendMessage(byte[] msg, boolean udp) throws IOException { I_ProgressListener listener = this.progressListener; try { if (listener != null) { listener.progressWrite("", 0, msg.length); } else log.fine("The progress listener is null"); if (udp && this.sockUDP!=null && this.sock!=null) { DatagramPacket dp = new DatagramPacket(msg, msg.length, this.sock.getInetAddress(), this.sock.getPort()); //DatagramPacket dp = new DatagramPacket(msg, msg.length, sock.getInetAddress(), 32001); this.sockUDP.send(dp); if (log.isLoggable(Level.FINE)) log.fine("UDP datagram is send"); } else { int bytesLeft = msg.length; int bytesRead = 0; synchronized (oStream) { while (bytesLeft > 0) { int toRead = bytesLeft > this.maxChunkSize ? this.maxChunkSize : bytesLeft; oStream.write(msg, bytesRead, toRead); oStream.flush(); bytesRead += toRead; bytesLeft -= toRead; if (listener != null) listener.progressWrite("", bytesRead, msg.length); } } } if (listener != null) { listener.progressWrite("", msg.length, msg.length); } } catch (IOException ex) { if (listener != null) { listener.clearCurrentWrites(); listener.clearCurrentReads(); } throw ex; } } /** * Serve a client, we block until a message arrives ... 
* <p>
 * The loop runs while {@code running} is true. It ends when empty data is parsed
 * or when reading/parsing throws; in the latter case the authentication layer is
 * told the connection is DEAD and shutdown() is called. In every case the client
 * is finally removed from the driver and the socket is closed.
 */
public void run() {
    if (log.isLoggable(Level.FINER)) log.finer("Handling client request ...");
    try {
        if (log.isLoggable(Level.FINE)) {
            Socket socket = this.sock;
            if (socket != null) {
                log.fine("Client accepted, coming from host=" + socket.getInetAddress().toString() + " port=" + socket.getPort());
            }
        }

        while (running) {
            try {
                // blocks until a message arrives (see XbfParser.java)
                final MsgInfo[] msgInfoArr = MsgInfo.parse(glob, progressListener, iStream, getMsgInfoParserClassName(), driver.getPluginConfig());
                if (msgInfoArr.length < 1) {
                    log.warning(toString() + ": Got unexpected empty data from SOCKET, closing connection now");
                    break;
                }

                // Only the first parsed message is dispatched
                final MsgInfo msgInfo = msgInfoArr[0];
                if (this.callCoreInSeparateThread) {
                    executorService.execute(new Runnable() {
                        public void run() {
                            handleMessage(msgInfo, false);
                        }
                    });
                } else {
                    handleMessage(msgInfo, false);
                }
            } catch (Throwable e) {
                // The failure kind is recognised from the exception text
                final String reason = e.toString();
                final boolean socketClosed = reason.contains("closed")
                        || (e instanceof java.net.SocketException && reason.contains("Connection reset"));
                if (socketClosed) {
                    if (log.isLoggable(Level.FINE)) log.fine(toString() + ": TCP socket is shutdown: " + reason);
                } else if (reason.contains("EOF")) {
                    if (this.disconnectIsCalled) {
                        log.fine(toString() + ": Lost TCP connection after sending disconnect(): " + reason);
                    } else {
                        log.warning(toString() + ": Lost TCP connection: " + reason);
                    }
                } else {
                    log.warning(toString() + ": Error parsing TCP data from '" + remoteSocketStr + "', check if client and server have identical compression or SSL settings: " + reason);
                }

                // Dump the stack once if the exception or its direct cause is an
                // OutOfMemoryError or IllegalArgumentException. Two separate checks
                // would print the same trace twice when both match.
                final Throwable cause = e.getCause();
                final boolean dumpStack = e instanceof OutOfMemoryError
                        || e instanceof IllegalArgumentException
                        || cause instanceof OutOfMemoryError
                        || cause instanceof IllegalArgumentException;
                if (dumpStack) {
                    e.printStackTrace();
                }

                I_Authenticate auth = this.authenticate;
                if (auth != null) {
                    // From the point of view of the incoming client connection we are dead
                    // The callback dispatch framework may have another point of view (which is not of interest here)
                    auth.connectionState(this.secretSessionId, ConnectionStateEnum.DEAD);
                }
                shutdown();
                break;
            }
        }
    } finally {
        driver.removeClient(this);
        closeSocket();
        if (log.isLoggable(Level.FINE)) log.fine("Deleted thread for '" + loginName + "'.");
    }
}

/**
 * @return Returns the secretSessionId.
 */
public String getSecretSessionId() {
    return this.secretSessionId;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -