📄 handleclient.java
字号:
/*------------------------------------------------------------------------------Name: HandleClient.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.protocol.socket;import java.io.IOException;import java.net.DatagramPacket;import java.net.DatagramSocket;import java.net.Socket;import java.util.logging.Level;import java.util.logging.Logger;import org.xmlBlaster.engine.qos.ConnectQosServer;import org.xmlBlaster.engine.qos.ConnectReturnQosServer;import org.xmlBlaster.protocol.I_Authenticate;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.MsgUnitRaw;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.def.MethodName;import org.xmlBlaster.util.dispatch.ConnectionStateEnum;import org.xmlBlaster.util.protocol.socket.SocketExecutor;import org.xmlBlaster.util.protocol.socket.SocketUrl;import org.xmlBlaster.util.qos.address.CallbackAddress;import org.xmlBlaster.util.xbformat.I_ProgressListener;import org.xmlBlaster.util.xbformat.MsgInfo;import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;import edu.emory.mathcs.backport.java.util.concurrent.Executors;/** * Holds one socket connection to a client and handles * all requests from one client with plain socket messaging. * <p /> * <ol> * <li>We block on the socket input stream to read incoming messages * in a separate thread (see run() method)</li> * <li>We send update() and ping() back to the client</li> * </ol> * * @author <a href="mailto:xmlBlaster@marcelruff.info">Marcel Ruff</a>. */public class HandleClient extends SocketExecutor implements Runnable{ private String ME = "HandleClient"; private static Logger log = Logger.getLogger(HandleClient.class.getName()); private SocketDriver driver; /** The singleton handle for this authentication server */ private I_Authenticate authenticate; private CallbackSocketDriver callback; /** The socket connection to/from one client */ protected DatagramSocket sockUDP; //private String cbKey = null; // Remember the key for the Global map /** Holds remote "host:port" for logging */ protected String remoteSocketStr; /** The socket connection to/from one client */ protected Socket sock; /** The unique client sessionId */ private String secretSessionId = null; private boolean callCoreInSeparateThread=true; protected volatile static ExecutorService executorService; protected boolean disconnectIsCalled = false; /** * Creates an instance which serves exactly one client. */ public HandleClient(Global glob, SocketDriver driver, Socket sock, DatagramSocket sockUDP) throws IOException { this.driver = driver; this.sock = sock; this.sockUDP = sockUDP; this.authenticate = driver.getAuthenticate(); this.ME = driver.getType()+"-HandleClient"; if (executorService == null) { synchronized (HandleClient.class) { if (executorService == null) { executorService = Executors.newCachedThreadPool(); } } } super.initialize(glob, driver.getAddressServer(), this.sock.getInputStream(), this.sock.getOutputStream()); super.setXmlBlasterCore(driver.getXmlBlaster()); this.remoteSocketStr = this.sock.getInetAddress().toString() + ":" + this.sock.getPort(); // You should not activate SoTimeout, as this timeouts if InputStream.read() blocks too long. // But we always block on input read() to receive update() messages. setSoTimeout(driver.getAddressServer().getEnv("SoTimeout", 0L).getValue()); // switch off this.sock.setSoTimeout((int)this.soTimeout); if (log.isLoggable(Level.FINE)) log.fine(this.driver.getAddressServer().getEnvLookupKey("SoTimeout") + "=" + this.soTimeout); setSoLingerTimeout(driver.getAddressServer().getEnv("SoLingerTimeout", soLingerTimeout).getValue()); if (log.isLoggable(Level.FINE)) log.fine(this.driver.getAddressServer().getEnvLookupKey("SoLingerTimeout") + "=" + getSoLingerTimeout()); if (getSoLingerTimeout() >= 0L) { // >0: Try to send any unsent data on close(socket) (The UNIX kernel waits very long and ignores the given time) // =0: Discard remaining data on close() <-- CHOOSE THIS TO AVOID BLOCKING close() calls this.sock.setSoLinger(true, (int)this.soLingerTimeout); } else this.sock.setSoLinger(false, 0); // false: default handling, kernel tries to send queued data after close() (the 0 is ignored) this.callCoreInSeparateThread = driver.getAddressServer().getEnv("callCoreInSeparateThread", callCoreInSeparateThread).getValue(); Thread t = new Thread(this, "XmlBlaster."+this.driver.getType() + (this.driver.isSSL()?".SSL":"")); int threadPrio = driver.getAddressServer().getEnv("threadPrio", Thread.NORM_PRIORITY).getValue(); try { t.setPriority(threadPrio); if (log.isLoggable(Level.FINE)) log.fine("-plugin/socket/threadPrio "+threadPrio); } catch (IllegalArgumentException e) { log.warning("Your -plugin/socket/threadPrio " + threadPrio + " is out of range, we continue with default setting " + Thread.NORM_PRIORITY); } t.start(); } public String getType() { return this.driver.getType(); } synchronized public boolean isShutdown() { return (this.running == false); } /* * TODO: Is this needed anymore? * @return */ protected boolean hasConnection() { return (this.sock != null); } /** * Close connection for one specific client */ public void shutdown() { if (!running) return; synchronized (this) { if (!running) return; if (log.isLoggable(Level.FINE)) log.fine("Shutdown cb connection to " + loginName + " ..."); //if (cbKey != null) // driver.getGlobal().removeNativeCallbackDriver(cbKey); running = false; driver.removeClient(this); clearResponseListenerMap(); freePendingThreads(); } closeSocket(); } public String toString() { StringBuffer ret = new StringBuffer(256); ret.append(getType()).append("-").append(this.addressConfig.getName()); if (loginName != null && loginName.length() > 0) ret.append("-").append(loginName); else ret.append("-").append(getSecretSessionId()); ret.append("-").append(remoteSocketStr); return ret.toString(); } private void closeSocket() { try { if (iStream != null) { iStream.close(); /*iStream=null;*/ } } catch (IOException e) { log.warning(e.toString()); } try { if (oStream != null) { oStream.close(); /*oStream=null;*/ } } catch (IOException e) { log.warning(e.toString()); } Socket sock = this.sock; try { if (sock != null) { sock.close(); this.sock=null; } } catch (IOException e) { log.warning(e.toString()); } if (log.isLoggable(Level.FINE)) log.fine("Closed socket for '" + loginName + "'."); } /** * Updating multiple messages in one sweep, callback to client. * <p /> * @param expectingResponse is WAIT_ON_RESPONSE or ONEWAY * @return null if oneway * @see org.xmlBlaster.engine.RequestBroker */ public final String[] sendUpdate(String cbSessionId, MsgUnitRaw[] msgArr, boolean expectingResponse) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("Entering update: id=" + cbSessionId + " numSend=" + msgArr.length + " oneway=" + !expectingResponse); if (!running) throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION, ME, "update() invocation ignored, we are shutdown."); if (msgArr == null || msgArr.length < 1) { log.severe("The argument of method update() are invalid"); throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "Illegal sendUpdate() argument"); } try { if (expectingResponse) { MsgInfo parser = new MsgInfo(glob, MsgInfo.INVOKE_BYTE, MethodName.UPDATE, cbSessionId, progressListener); parser.addMessage(msgArr); Object response = requestAndBlockForReply(parser, SocketExecutor.WAIT_ON_RESPONSE, false); if (log.isLoggable(Level.FINE)) log.fine("Got update response " + response.toString()); return (String[])response; // return the QoS } else { MsgInfo parser = new MsgInfo(glob, MsgInfo.INVOKE_BYTE, MethodName.UPDATE_ONEWAY, cbSessionId, progressListener); parser.addMessage(msgArr); requestAndBlockForReply(parser, SocketExecutor.ONEWAY, this.driver.useUdpForOneway()); return null; } } catch (XmlBlasterException e) { throw XmlBlasterException.tranformCallbackException(e); } catch (IOException e1) { if (log.isLoggable(Level.FINE)) log.fine("IO exception: " + e1.toString()); throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION, ME, "Callback of " + msgArr.length + " messages failed", e1); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -