📄 socketdriver.java
字号:
/*------------------------------------------------------------------------------Name: SocketDriver.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileComment: SocketDriver class to invoke the xmlBlaster server in the same JVM.------------------------------------------------------------------------------*/package org.xmlBlaster.protocol.socket;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.engine.qos.AddressServer;import org.xmlBlaster.protocol.I_Authenticate;import org.xmlBlaster.protocol.I_XmlBlaster;import org.xmlBlaster.protocol.I_Driver;import org.xmlBlaster.util.plugin.I_PluginConfig;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.protocol.socket.SocketUrl;import org.xmlBlaster.util.xbformat.MsgInfo;import java.net.DatagramSocket;import java.net.DatagramPacket;import java.io.IOException;import java.net.ServerSocket;import java.net.Socket;import java.util.Set;import java.util.HashSet;import java.util.Map;import java.util.HashMap;import java.util.Iterator;import java.io.ByteArrayInputStream;import java.io.InputStream;/** * Socket driver class to invoke the xmlBlaster server over a native message format * <p /> * This "SOCKET:" driver needs to be registered in xmlBlaster.properties * and will be started on xmlBlaster startup, for example: * <pre> * ProtocolPlugin[SOCKET][1.0]=org.xmlBlaster.protocol.socket.SocketDriver * * CbProtocolPlugin[SOCKET][1.0]=org.xmlBlaster.protocol.socket.CallbackSocketDriver * </pre> * * The variable plugin/socket/port (default 7607) sets the socket server port, * you may change it in xmlBlaster.properties or on command line: * <pre> * java -jar lib/xmlBlaster.jar -plugin/socket/port 9090 * </pre> * * The interface I_Driver is needed by xmlBlaster to instantiate and shutdown * this driver implementation. * <p /> * All adjustable parameters are explained in {@link org.xmlBlaster.protocol.socket.SocketDriver#usage()} * @author <a href="mailto:xmlBlaster@marcelruff.info">Marcel Ruff</a> * @author <a href="mailto:bpoka@axelero.hu">Balázs Póka</a> (SSL embedding, zlib compression) * * @see org.xmlBlaster.util.xbformat.MsgInfo * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/protocol.socket.html">The protocol.socket requirement</a> */public class SocketDriver extends Thread implements I_Driver /* which extends I_Plugin */, SocketDriverMBean{ private String ME = "SocketDriver"; /** The global handle */ private Global glob; private static Logger log = Logger.getLogger(SocketDriver.class.getName()); /** The singleton handle for this authentication server */ private I_Authenticate authenticate; /** The singleton handle for this xmlBlaster server */ private I_XmlBlaster xmlBlasterImpl; /** The socket address info object holding hostname (useful for multi homed hosts) and port */ private SocketUrl socketUrl; /** The socket server */ private ServerSocket listen = null; /** The URL which clients need to use to access this server, e.g. "server.mars.univers:6701" */ private DatagramSocket socketUDP = null; /** State of server */ private Thread listenerUDP; private int sslMarker; private boolean running = true; private boolean runningUDP = true; private boolean listenerReady = false; private boolean listenerReadyUDP = false; /** Remember all client connections */ private Set handleClientSet = new HashSet(); private Map handleClientMap = new HashMap(); /** The address configuration */ private AddressServer addressServer; private PluginInfo pluginInfo; private boolean startUdpListener = false; /** * Setting by plugin configuration, see xmlBlasterPlugins.xml, for example * <br /> * <attribute id='useUdpForOneway'>true</attribute> */ private boolean useUdpForOneway = false; /** My JMX registration */ protected Object mbeanHandle; protected ContextNode contextNode; protected boolean isShutdown; void addClient(String sessionId, HandleClient h) { synchronized(handleClientMap) { handleClientMap.put(sessionId, h); } } HandleClient getClient(String sessionId) { synchronized(handleClientMap) { return (HandleClient) handleClientMap.get(sessionId); } } public I_PluginConfig getPluginConfig() { return this.pluginInfo; } /** * There is exactly one UDP listener thread which receives datagrams for all clients. * The datagrams are forwarded to the correct client with the sessionId * Only the methods publishOneway() and updateOneway() may use UDP, you can * choose TCP or UDP for those messages with the * <tt>plugin/socket/useUdpForOneway</tt> setting */ class UDPListener implements Runnable { static final int MAX_PACKET_SIZE = 1024 * 10; public void run() { try { try { socketUDP = new DatagramSocket(socketUrl.getPort(), socketUrl.getInetAddress()); } catch (java.net.SocketException e) { log.severe("Cannot open UDP socket '" + socketUrl.getUrl() + "' : " + e.toString()); return; } int threadPrio = getAddressServer().getEnv("threadPrio", Thread.NORM_PRIORITY).getValue(); try { Thread.currentThread().setPriority(threadPrio); if (log.isLoggable(Level.FINE)) log.fine("-" + getEnvPrefix() + "threadPrio " + threadPrio); } catch (IllegalArgumentException e) { log.warning("Your -" + getEnvPrefix() + "threadPrio " + threadPrio + " is out of range, we continue with default setting " + Thread.NORM_PRIORITY); } log.info("Started successfully " + getType() + " UDP driver on '" + socketUrl.getUrl() + "'"); byte packetBuffer[] = new byte[MAX_PACKET_SIZE]; DatagramPacket packet = new DatagramPacket(packetBuffer, packetBuffer.length); MsgInfo receiver = null; listenerReadyUDP = true; while (runningUDP) { try { socketUDP.receive(packet); } catch (IOException e) { if (e.toString().indexOf("closed") == -1) { log.severe("Error receiving packet from '" + socketUrl.getUrl() + "' : " + e.toString()); } else { if (log.isLoggable(Level.FINE)) log.fine("UDP datagram socket shutdown '" + socketUrl.getUrl() + "' : " + e.toString()); } return; } if (log.isLoggable(Level.FINE)) log.fine("UDP packet arrived, size=" + packet.getLength() + " bytes"); if (!runningUDP) { log.info("Closing server '" + socketUrl.getUrl() + "'"); return; } int actualSize = packet.getLength(); if (packet.getLength() > MAX_PACKET_SIZE) { log.warning("Packet has been truncated, size=" + packet.getLength() + ", MAX_PACKET_SIZE=" + MAX_PACKET_SIZE); actualSize = MAX_PACKET_SIZE; } InputStream iStream = new ByteArrayInputStream(packet.getData(), 0, actualSize); try { receiver = MsgInfo.parse(glob, null, iStream, null/*getMsgInfoParserClassName()*/, getPluginConfig())[0]; } catch (Throwable e) { log.severe("Error parsing data from UDP packet: " + e); continue; } String sessionId = receiver.getSecretSessionId(); HandleClient hh = getClient(sessionId); if (hh == null) log.severe("Request from unknown client, sessionId: " + sessionId); else hh.handleMessage(receiver, true); } // while (runningUDP) { } finally { listenerReadyUDP = false; if (socketUDP != null) { socketUDP.close(); socketUDP = null; } } } } /** * Creates the driver. * Note: getName() is enforced by interface I_Driver, but is already defined in Thread class */ public SocketDriver() { super("XmlBlaster.SocketDriver"); setDaemon(true); } /** * Access the xmlBlaster internal name of the protocol driver. * @return The configured [type] in xmlBlaster.properties, defaults to "SOCKET" */ public String getProtocolId() { return (this.pluginInfo == null) ? "SOCKET" : this.pluginInfo.getType(); } /** * Enforced by I_Plugin * @return The configured type in xmlBlaster.properties, defaults to "SOCKET" */ public String getType() { return getProtocolId(); } /** * The command line key prefix * @return The configured type in xmlBlasterPlugins.xml, defaults to "plugin/socket" */ public String getEnvPrefix() { return (addressServer != null) ? addressServer.getEnvPrefix() : "plugin/"+getType().toLowerCase(); } /** Enforced by I_Plugin */ public String getVersion() { return (this.pluginInfo == null) ? "1.0" : this.pluginInfo.getVersion(); } /** * Switch on/off UDP socket listener */ public boolean startUdpListener() { return this.startUdpListener; } /** * Configuration option to use UDP for updateOneway() calls. * <br /> * Typically a setting from the plugin configuration, see xmlBlasterPlugins.xml, for example * <br /> * <attribute id='useUdpForOneway'>true</attribute> */ public boolean useUdpForOneway() { return this.useUdpForOneway; } /** * This method is called by the PluginManager (enforced by I_Plugin). * @see org.xmlBlaster.util.plugin.I_Plugin#init(org.xmlBlaster.util.Global,org.xmlBlaster.util.plugin.PluginInfo) */ public void init(org.xmlBlaster.util.Global glob, PluginInfo pluginInfo) throws XmlBlasterException { this.pluginInfo = pluginInfo; this.glob = glob; this.ME = getType(); org.xmlBlaster.engine.ServerScope engineGlob = (org.xmlBlaster.engine.ServerScope)glob.getObjectEntry(Constants.OBJECT_ENTRY_ServerScope); if (engineGlob == null) throw new XmlBlasterException(this.glob, ErrorCode.INTERNAL_UNKNOWN, ME + ".init", "could not retreive the ServerNodeScope. Am I really on the server side ?"); // For JMX instanceName may not contain "," String vers = ("1.0".equals(getVersion())) ? "" : getVersion(); this.contextNode = new ContextNode(ContextNode.SERVICE_MARKER_TAG, "SocketDriver[" + getType() + vers + "]", glob.getContextNode()); this.mbeanHandle = this.glob.registerMBean(this.contextNode, this); try { this.authenticate = engineGlob.getAuthenticate(); if (this.authenticate == null) { throw new XmlBlasterException(this.glob, ErrorCode.INTERNAL_UNKNOWN, ME + ".init", "authenticate object is null"); } I_XmlBlaster xmlBlasterImpl = this.authenticate.getXmlBlaster(); if (xmlBlasterImpl == null) { throw new XmlBlasterException(this.glob, ErrorCode.INTERNAL_UNKNOWN, ME + ".init", "xmlBlasterImpl object is null"); } init(glob, new AddressServer(glob, getType(), glob.getId(), pluginInfo.getParameters()), this.authenticate, xmlBlasterImpl); this.useUdpForOneway = this.addressServer.getEnv("useUdpForOneway", this.useUdpForOneway).getValue(); this.startUdpListener = this.addressServer.getEnv("startUdpListener", this.startUdpListener).getValue(); // Now we have logging ... if (log.isLoggable(Level.FINE)) log.fine("Using pluginInfo=" + this.pluginInfo.toString() + ", startUdpListener=" + this.startUdpListener + ", useUdpForOneway=" + this.useUdpForOneway); activate(); } catch (XmlBlasterException ex) { throw ex; } catch (Throwable ex) { throw new XmlBlasterException(this.glob, ErrorCode.INTERNAL_UNKNOWN, ME + ".init", "init. Could'nt initialize the driver.", ex); } } /** * Get the address how to access this driver. * @return "server.mars.univers:6701" */ public String getRawAddress() { return this.socketUrl.getUrl(); // this.socketUrl.getHostname() + ":" + this.socketUrl.getPort();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -