📄 tcpserviceconnector.java
字号:
package net.sf.dz.daemon.tcp.client;import java.io.BufferedReader;import java.io.InputStreamReader;import java.io.IOException;import java.io.PrintWriter;import java.net.InetAddress;import java.net.Socket;import java.util.LinkedList;import java.util.List;import javax.net.ssl.SSLException;import org.freehold.jukebox.logger.LogChannel;import org.freehold.jukebox.service.ActiveService;import net.sf.dz.util.SSLContextFactory;/** * TCP service connector. * * <p> * * Connect the the given service and keeps the information going in both * directions. * * @author Copyright © <a href="mailto:vt@freehold.crocodile.org">Vadim Tkachenko</a> 2001-2004 * @version $Id: TcpServiceConnector.java,v 1.2 2004/06/28 20:35:48 vtt Exp $ */public class TcpServiceConnector extends ActiveService implements ServiceConnector { public static final LogChannel CH_TSC = new LogChannel("TcpServiceConnector"); /** * Service provider that created this connector. * * <p> * * VT: FIXME: This reference breaks the interface and creates a side * effect. Currently, the service provider is fed with the data received * from the connection directly. This must be replaced with * interface-compliant listener mechanism. */ private final ServiceProvider serviceProvider; /** * Remote host to connect to. */ private final InetAddress remoteHost; /** * Port on the remote host to connect to. */ private final int remotePort; /** * True if secure connection was requested. */ private boolean secure; /** * Password to access the certificate. */ private String password; private Socket socket; private BufferedReader br; private PrintWriter pw; private List readQueue = new LinkedList(); private int locked = 0; public TcpServiceConnector(ServiceProvider serviceProvider, InetAddress remoteHost, int remotePort, boolean secure, String password) { this.serviceProvider = serviceProvider; this.remoteHost = remoteHost; this.remotePort = remotePort; this.secure = secure; this.password = password; } public synchronized void send(String address, String message) throws IOException { if ( pw == null ) { throw new IOException("Stream to " + remoteHost + ":" + remotePort + " not available"); } // VT: FIXME: Make sure the service breaks and shuts down if there's // a problem sending data over pw.println(address + " " + message); pw.flush(); } private synchronized void getLock() { while ( locked > 0 ) { try { wait(); } catch ( InterruptedException iex ) { complain(LOG_WARNING, CH_TSC, "Interrupted, ignored:", iex); } } locked++; complain(LOG_DEBUG, CH_TSC, "getLock(): " + locked); } private synchronized void releaseLock() { if ( locked == 0 ) { throw new IllegalStateException("Lock is already released"); } locked--; complain(LOG_DEBUG, CH_TSC, "releaseLock(): " + locked); notify(); } public synchronized String sendExclusive(String address, String message) throws IOException { try { getLock(); flushReadQueue(); send(address, message); while ( readQueue.isEmpty() ) { wait(); if ( !isEnabled() || pw == null || br == null ) { throw new IOException("Shutting down?"); } } String response = (String)readQueue.remove(0); if ( response == null ) { // VT: FIXME: Can LinkedList contain nulls? pw = null; br = null; socket.close(); throw new IOException("EOF"); } complain(LOG_INFO, CH_TSC, "Command '" + address + " " + message + "', response " + response); return response; } catch ( InterruptedException iex ) { IOException ioex = new IOException("Interrupted"); ioex.initCause(iex); throw ioex; } finally { flushReadQueue(); releaseLock(); } } private synchronized void flushReadQueue() { while ( !readQueue.isEmpty() ) { synchronized ( readQueue ) { serviceProvider.serviceConnectorUpdate(this, (String)readQueue.remove(0)); } } } public String toString() { // VT: FIXME: Tell them about the current status and whether we're // currently secure and whether we were requested to be secure return "TCP[" + remoteHost + ":" + remotePort + ", " + (secure ? "secure" : "insecure") + "]"; } protected void startup() throws Throwable { if ( secure ) { complain(LOG_NOTICE, CH_TSC, "Secure connection requested"); try { socket = SSLContextFactory.createContext(password).getSocketFactory().createSocket(remoteHost, remotePort); } catch ( SSLException sslex ) { complain(LOG_WARNING, CH_TSC, "Can't establish a secure connection to " + remoteHost + ":" + remotePort, sslex); complain(LOG_WARNING, CH_TSC, "Reverting to insecure connection"); } } if ( socket == null ) { socket = new Socket(remoteHost, remotePort); } br = new BufferedReader(new InputStreamReader(socket.getInputStream())); if ( !secure ) { // In case we're talking to a secure socket implementation, // we'll get stuck reading the input stream - they won't tell us // anything, nor would they break away (well, maybe after a // really long timeout). Therefore, we'll have to give them a // kick so they kick us out and we'll get a visible indication // of a failure. pw = new PrintWriter(socket.getOutputStream()); pw.println(""); pw.println(""); pw.println(""); pw.println(""); pw.println(""); pw.println(""); pw.flush(); // If they're secure, and we're not, we'll get kicked out right // here... } } protected void execute() throws Throwable { while ( isEnabled() ) { // VT: NOTE: Screw the errors, we can afford to be dumb - the // connector factory will take care of us next time someone // requests this service // VT: FIXME: The only thing to take care of is the SSL // connection. There is just one problem - when we want the // other end to support SSL and it doesn't, in this case we'll // get the SSL exception right here. Have to notify our service // provider. String line = br.readLine(); if ( line == null ) { complain(LOG_ERR, CH_TSC, "Socket broken, exiting loop"); break; } complain(LOG_DEBUG, CH_TSC, "Line read: " + line); // Now, queue the data and let everybody fight over it synchronized ( this ) { readQueue.add(line); notify(); } // Including us - but instead of just waiting, we'll have to // keep rolling - 'cause *we* supply the data synchronized ( this ) { if ( locked == 0 ) { try { getLock(); flushReadQueue(); } finally { releaseLock(); } } } } } protected void shutdown(Throwable cause) throws Throwable { complain(LOG_NOTICE, CH_TSC, "Shutting down", cause); // We need to notify the service provider, which in turn must make // sure that all the consumers of devices formerly provided by this // connector are notified and this connector is removed from its // references. serviceProvider.connectorIsDead(this); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -