📄 broadcaster.java
字号:
package net.sf.dz.daemon.tcp;import java.io.BufferedReader;import java.io.PrintWriter;import java.net.Socket;import java.util.Iterator;import java.util.Map;import java.util.Set;import java.util.TreeMap;import java.util.TreeSet;import org.freehold.jukebox.logger.LogChannel;import net.sf.dz.daemon.onewire.OneWireServer;import net.sf.dz.daemon.onewire.DeviceContainer;import net.sf.dz.daemon.onewire.TemperatureContainer;import net.sf.dz.daemon.onewire.SwitchContainerListener;import net.sf.dz.daemon.onewire.TemperatureContainerListener;/** * The TCP broadcaster. * * Broadcasts the device state change notifications over the TCP connection. * * @author Copyright © <a href="mailto:vt@freehold.crocodile.org">Vadim Tkachenko</a> 2001-2002 * @version $Id: Broadcaster.java,v 1.10 2004/06/28 20:35:47 vtt Exp $ */public class Broadcaster extends Connector implements TemperatureContainerListener { public static final LogChannel CH_BC = new LogChannel("TCP/Broadcaster"); /** * Set of known temperature sensor addresses. */ private Set deviceSet = new TreeSet(); /** * Mapping from the sensor address to the last known temperature value. */ private Map temperatureCache = new TreeMap(); protected LogChannel getLogChannel() { return CH_BC; } protected String getAnnounce() { StringBuffer sb = new StringBuffer(); sb.append("DZ DAC Sensors").append(super.getAnnounce()).append("/"); synchronized ( deviceSet ) { for ( Iterator i = deviceSet.iterator(); i.hasNext(); ) { sb.append(" ").append(i.next().toString()); } } return sb.toString(); } protected String getServiceSignature() { return "DZ DAC Sensors"; } protected int getDefaultListenPort() { return 5000; } protected int getDefaultBroadcastPort() { return 5001; } protected void shutdown2(Throwable cause) throws Throwable { } public synchronized void currentTemperatureChanged(String address, double temperature) { temperatureCache.put(address, new Double(temperature)); for ( Iterator i = iterator(); i.hasNext(); ) { ConnectionHandler ch = (ConnectionHandler)i.next(); try { ch.send(address + " " + temperature); } catch ( IllegalStateException ex ) { // VT: NOTE: This usually happens when the client has // already disconnected and the handler is stopped, but not // removed from the list yet. When time permits, more // elegant solution is due. i.remove(); } } } protected synchronized void broadcastArrival(String address, String type) { // VT: Type is ignored at this point for ( Iterator si = serverSet.iterator(); si.hasNext(); ) { OneWireServer s = (OneWireServer)si.next(); DeviceContainer dc = s.getDeviceContainer(address); // The only containers we care about are temperature containers if ( (dc != null) && (dc instanceof TemperatureContainer) ) { deviceSet.add(address); announce(getAnnounce()); for ( Iterator i = iterator(); i.hasNext(); ) { ConnectionHandler ch = (ConnectionHandler)i.next(); ch.iHave(); ch.send("+ " + address); } return; } } } protected synchronized void broadcastDeparture(String address) { if ( !deviceSet.contains(address) ) { // This is not ours return; } deviceSet.remove(address); temperatureCache.remove(address); announce(getAnnounce()); for ( Iterator i = iterator(); i.hasNext(); ) { ConnectionHandler ch = (ConnectionHandler)i.next(); ch.iHave(); ch.send("- " + address); } } protected synchronized void broadcastFault(String address, String message) { for ( Iterator i = iterator(); i.hasNext(); ) { ConnectionHandler ch = (ConnectionHandler)i.next(); ch.send("E " + address + " " + message); } } protected ConnectionHandler createHandler(Socket socket, BufferedReader br, PrintWriter pw) { return new BroadcastHandler(socket, br, pw); } protected boolean isUnique() { return false; } protected void cleanup() { } protected class BroadcastHandler extends ConnectionHandler { public BroadcastHandler(Socket socket, BufferedReader br, PrintWriter pw) { super(socket, br, pw); } protected void sayHello() { synchronized ( temperatureCache ) { // Tell them what we have iHave(); // Send the set of available temperatures for ( Iterator i = temperatureCache.keySet().iterator(); i.hasNext(); ) { String address = (String)i.next(); Double temperature = (Double)temperatureCache.get(address); send(address + " " + temperature); } } } /** * Send the 'IHAVE' string. * * This method should be called every time there's an arrival or a * departure. */ public void iHave() { StringBuffer sb = new StringBuffer(); Set addressSet = null; sb.append("IHAVE ").append(Integer.toString(deviceSet.size())).append(":"); for ( Iterator i = deviceSet.iterator(); i.hasNext(); ) { if ( i.hasNext() ) { sb.append(" "); } sb.append(i.next().toString()); } send(sb.toString()); } protected CommandParser createParser() { return new BroadcastParser(); } protected class BroadcastParser extends CommandParser { protected void parse2(String line) throws Throwable { // The broadcaster doesn't really parse anything, all the // input is ignored } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -