📄 discoverythread.java
字号:
import java.io.*;import java.util.*;import java.util.regex.*;import java.net.*;import java.text.*;import java.sql.*;import no.ntnu.nav.logger.*;import no.ntnu.nav.ConfigParser.*;import no.ntnu.nav.Database.*;import no.ntnu.nav.SimpleSnmp.*;import no.ntnu.nav.event.*;import no.ntnu.nav.netboxinfo.*;import no.ntnu.nav.util.*;/** * This class schedules the netboxes, assigns them to threads and runs * the plugins. */public class DiscoveryThread extends Thread{ private static ConfigParser navCp; private static ConfigParser myCp; private static Timer timer; private static Timer updateFromARPTimer; private static CheckRunQTask checkRunQTask; private static SortedMap ipRunQ; private static Stack idleThreads; private static int maxThreadCnt; private static int threadCnt; private static Integer idleThreadLock = new Integer(0); private static Integer insertNetboxLock = new Integer(0); private static Integer updateMacLock = new Integer(0); private static Integer checkRouterLock = new Integer(0); private static Integer fileLock = new Integer(0); private static LinkedList threadIdList = new LinkedList(); private static int curMaxTid = 0; private static int netboxCnt; private static int netboxHigh; private static long nbProcessedCnt; private static List csList = new ArrayList(); private static int baseCheckInterval; private static int maxMacChecks; private static double checkMultiplier; private static boolean ipScanMode; private static LinkedList nextIpQueue = new LinkedList(); private static Set seenMacs = Collections.synchronizedSet(new HashSet()); private static Set macsInQueue = Collections.synchronizedSet(new HashSet()); private static int numFound = 0; private static String allNetboxesBulkFile = "all-netboxes.bulk"; private static String newNetboxesBulkFile = "new-netboxes.bulk"; private static Set netboxesInAllFile = Collections.synchronizedSet(new HashSet()); private static Set netboxesInNewFile = Collections.synchronizedSet(new HashSet()); private static String DEFAULT_ROOMID = "dummy"; private static String DEFAULT_ORGID = "dummy"; private static String DEFAULT_LOCATIONID = "dummy"; private static boolean autoInsertNetbox = false; // Object data String tid; IP ip; SimpleSnmp sSnmp; // Static init public static void init(int numThreads, ConfigParser _myCp, ConfigParser _navCp) { maxThreadCnt = numThreads; myCp = _myCp; navCp = _navCp; myCp.setObject("navCp", navCp); // Create the netbox map and the run queue ipRunQ = new TreeMap(); // Read config String cs = myCp.get("communitys"); String[] csA = cs.split(","); for (int i=0; i < csA.length; i++) { csList.add(csA[i].trim()); } baseCheckInterval = Integer.parseInt(myCp.get("baseCheckInterval")); maxMacChecks = Integer.parseInt(myCp.get("maxMacChecks")); checkMultiplier = Double.parseDouble(myCp.get("checkMultiplier")); timer = new Timer(); ipScanMode = true; if (myCp.get("allNetboxesBulkFile") != null) allNetboxesBulkFile = myCp.get("allNetboxesBulkFile"); if (myCp.get("newNetboxesBulkFile") != null) newNetboxesBulkFile = myCp.get("newNetboxesBulkFile"); if (myCp.get("autoInsertNetbox") != null) { try { autoInsertNetbox = myCp.get("autoInsertNetbox").startsWith("t"); } catch (Exception e) { } } parseBulkFile(allNetboxesBulkFile, netboxesInAllFile); parseBulkFile(newNetboxesBulkFile, netboxesInNewFile); // For dupe checking loadSeenMacs(); // Fill the runq from db fillRunQ(); updateFromARPTimer = new Timer(); long updateFromARPFrequency = 1800; try { updateFromARPFrequency = Integer.parseInt(myCp.get("updateFromARPFrequency")); } catch (Exception e) { } updateFromARPFrequency *= 1000; updateFromARPTimer.schedule(new UpdateFromARPTask(), updateFromARPFrequency, updateFromARPFrequency); Log.d("INIT", "Starting timer for IP query scheduling"); scheduleCheckRunQ(0); } private static void loadSeenMacs() { try { ResultSet rs = Database.query("SELECT mac FROM autodisc_mac_scanned WHERE attempts IS NULL"); while (rs.next()) { seenMacs.add(rs.getString("mac")); } } catch (SQLException e) { e.printStackTrace(System.err); } } private static void updateFromARP() { if (true) return; try { // All macs we haven't tried before ResultSet rs = Database.query("SELECT DISTINCT ip, mac FROM arp WHERE end_time='infinity' AND mac NOT IN (SELECT mac FROM autodisc_mac_scanned)"); while (rs.next()) { IP ip = new IP(rs.getString("ip"), rs.getString("mac")); addToRunQ(ip); } } catch (SQLException e) { e.printStackTrace(System.err); } } private static void fillRunQ() { updateFromARP(); try { // All macs we haven't given up on ResultSet rs = Database.query("SELECT ip, autodisc_mac_scanned.mac, time, attempts FROM autodisc_mac_scanned JOIN arp ON (autodisc_mac_scanned.mac = arp.mac AND arp.end_time='infinity') WHERE attempts < " + maxMacChecks); while (rs.next()) { IP ip = new IP(rs.getString("ip"), rs.getString("mac"), rs.getLong("time"), rs.getInt("attempts")); addToRunQ(ip); } } catch (SQLException e) { e.printStackTrace(System.err); } } private static void parseBulkFile(String fn, Set nbIpSet) { try { char sep = File.separatorChar; File f = new File(navCp.get("NAVROOT") + sep + "var" + sep + "log" + sep + fn); if (!f.exists()) return; BufferedReader in = new BufferedReader(new FileReader(f)); String s; while ( (s=in.readLine()) != null) { String[] g = s.split(":"); if (g.length >= 2) { nbIpSet.add(g[1]); } } } catch (Exception e) { e.printStackTrace(System.err); } } private void writeBulkFile(String fn, String ip, String catid, String ro, String dns, String sysname, String sysobjectid, String sysdescr) { synchronized (fileLock) { try { char sep = File.separatorChar; File f = new File(navCp.get("NAVROOT") + sep + "var" + sep + "log" + sep + fn); PrintStream out = new PrintStream(new FileOutputStream(f, true), true); // # dnsnavn, sysobjectid, system.sysdescr (avkappet om den er lang) String descr = "# " + dns + " ("+sysname+", " + ro + "), " + sysobjectid + ", " + sysdescr.substring(0, Math.min(70, sysdescr.length())); out.println(descr); // #roomid:ip:orgid:catid:[ro:serial:rw:function:subcat1:subcat2..] String s = DEFAULT_ROOMID + ":" + ip + ":" + DEFAULT_ORGID + ":" + catid + ":" + ro; out.println(s); out.close(); } catch (Exception e) { e.printStackTrace(System.err); } } } private static IP getNextIp() { synchronized (nextIpQueue) { if (nextIpQueue.isEmpty()) { // Init local IPs try { Enumeration e = NetworkInterface.getNetworkInterfaces(); while(e.hasMoreElements()) { NetworkInterface netface = (NetworkInterface)e.nextElement(); Enumeration e2 = netface.getInetAddresses(); while (e2.hasMoreElements()){ InetAddress ip = (InetAddress) e2.nextElement(); if (ip instanceof Inet4Address && !ip.isLoopbackAddress()) { long ipnum = ipToNum(ip.getHostAddress()); ipnum = ipToNum("129.241.190.1"); nextIpQueue.add(new String[] { numToIp(ipnum), "u" } ); nextIpQueue.add(new String[] { numToIp(ipnum-1), "d" } ); } } } } catch (SocketException e) { e.printStackTrace(System.err); } if (nextIpQueue.isEmpty()) return null; } // Fetch next IP from queue String[] s = (String[])nextIpQueue.removeFirst(); String ipaddr = s[0]; long ipnum = ipToNum(ipaddr); if (s[1].equals("u")) ipnum++; else ipnum--; nextIpQueue.add(new String[] { numToIp(ipnum), s[1] } ); return new IP(ipaddr, null); } } private static void scheduleCheckRunQ(long l) { synchronized (timer) { if (checkRunQTask != null) checkRunQTask.cancel(); checkRunQTask = new CheckRunQTask(); // The delay can actually be negative due to inaccuracy in the Java timer l = Math.max(l, 0); Log.d("QUERY_NETBOX", "SCHEDULE_CHECK_RUN_Q", "Schedule check runq in " + l + " ms"); try { timer.schedule(checkRunQTask, l); } catch (IllegalStateException e) { timer = new Timer(); checkRunQTask = new CheckRunQTask(); timer.schedule(checkRunQTask, l); } } } private static void checkRunQ() { Log.setDefaultSubsystem("QUERY_IP"); // Try to get a free IP to check Object o; while ((o = removeRunQHead()) instanceof IP) { IP ip = (IP)o; Log.d("CHECK_RUN_Q", "Got ip: " + ip); // Try to get a free thread String tid = requestThread(); if (tid == null) { Log.d("CHECK_RUN_Q", "IP is available, but no threads are idle"); // Re-insert into queue addToRunQFront(ip); return; } // OK, start a new thread Log.d("CHECK_RUN_Q", "Starting new IP thread with id: " + tid + " to handle " + ip); new DiscoveryThread(tid, ip).start(); } // No more IPs, schedule re-run when the next is ready Long nextRun = (Long)o; Log.d("CHECK_RUN_Q", "No available IP, scheduling next check in " + nextRun + " ms (" + (nextRun.longValue()/1000) + " s)"); scheduleCheckRunQ(nextRun.longValue()); } private static void addToRunQ(IP ip) { addToRunQ(ip, false); } private static void addToRunQFront(IP ip) { addToRunQ(ip, true); } private static void addToRunQ(IP ip, boolean front) { Long nextRun = new Long(ip.getNextRun()); synchronized (ipRunQ) { if (ip.getMac() != null) { // MAC should only be in queue once if (macsInQueue.contains(ip.getMac())) return; } LinkedList l; if ( (l = (LinkedList)ipRunQ.get(nextRun)) == null) ipRunQ.put(nextRun, l = new LinkedList()); if (front) { l.addFirst(ip); } else { l.add(ip); } } if (ip.getMac() != null) { seenMacs.add(ip.getMac()); } } private static Object removeRunQHead() { Object o; if ((o = removeRunQHeadNoCheck()) instanceof IP) { IP ip = (IP)o; return ip; } return o; } private static int ipRunQSize() { synchronized (ipRunQ) { return ipRunQ.size(); } } private static Object removeRunQHeadNoCheck() { synchronized (ipRunQ) { if (ipRunQ.isEmpty()) { if (ipScanMode) { return getNextIp(); } return new Long(Long.MAX_VALUE / 2); // Infinity... } Long nextRun = (Long)ipRunQ.firstKey(); if (nextRun.longValue() > System.currentTimeMillis()) { // Head of queue is not yet ready to be run return new Long(nextRun.longValue() - System.currentTimeMillis()); } LinkedList l = (LinkedList)ipRunQ.get(nextRun); IP ip = (IP)l.removeFirst(); if (ip.getMac() != null) { macsInQueue.remove(ip.getMac()); } if (l.isEmpty()) ipRunQ.remove(nextRun); return ip; } } private static long ipToNum(String ip) { if (ip.length() == 0) return 0; String[] s = ip.split("\\."); long l = 0; long mul = 1; for (int i=0; i < s.length-1; i++) mul *= 255; for (int i=0; i < s.length; i++) { l += Integer.parseInt(s[i]) * mul; mul /= 255; } return l; } private static String numToIp(long num) { if (num == 0) return ""; String s = ""; long mul = 1; int dig=0; while (mul < num) mul *= 255; while (num > 0) { long l = num / mul; if (num == l*mul && dig < 3) l--; // IPs must be 4 digits if (l > 0) { s += l + "."; dig++; } num -= l * mul; mul /= 255; } return s.substring(0, s.length()-1); } private static String requestThread() { synchronized (idleThreadLock) { if (threadCnt < maxThreadCnt) { int tid = getFreeTid(); threadCnt++; //System.err.println("New thread, cnt="+(threadCnt+1)+" max="+max); return format(tid, String.valueOf(maxThreadCnt-1).length()); } return null; } } private static void incNumFound() { synchronized (idleThreadLock) { numFound++; } } private static void threadIdle(String tid) { synchronized (idleThreadLock) { returnTid(Integer.parseInt(tid)); //System.err.println("Del thread, cnt="+(threadCnt-1)); threadCnt--; } scheduleCheckRunQ(0); } private static int getFreeTid() { if (threadIdList.isEmpty()) { threadIdList.add(new Integer(curMaxTid++)); } return ((Integer)threadIdList.removeFirst()).intValue(); } private static void returnTid(int tid) { threadIdList.add(new Integer(tid)); } // Constructor public DiscoveryThread(String tid, IP ip) { this.tid = tid; this.ip = ip; } public boolean runningSNMP(String host) { try { //Log.d("RUNNING_SNMP", "Checking " + host + " for open SNMP port"); InetAddress inet = InetAddress.getByName(host); DatagramSocket s = new DatagramSocket(); s.setSoTimeout(100); try { s.connect(inet, 161); DatagramPacket p = new DatagramPacket(new byte[0], 0); s.send(p); s.receive(p); } catch (SocketTimeoutException e) { return true; } finally { s.close(); } } catch (Exception e) { // We get java.net.PortUnreachableException: ICMP Port Unreachable if no SNMP is running } return false; } public String accessSNMP() { sSnmp.setSocketTimeout(100); for (Iterator it = csList.iterator(); it.hasNext();) { String cs = (String) it.next(); sSnmp.setCs_ro(cs); //Log.d("ACCESS_SNMP", "Trying SNMP with cs " + cs); try { sSnmp.setTimeoutLimit(1); sSnmp.getNext("1", 1, false, true); // Ok! return cs; } catch (Exception e) { } } return null; } public void run() { Log.setDefaultSubsystem("QUERY_NETBOX_T"+tid); Log.setThreadId(tid); Log.d("RUN", "Thread " + tid + " starting work on ("+ip+")"); long beginTime = System.currentTimeMillis(); try { while (true) { String host = ip.getIp(); Log.setNetbox(ip.getIp()); sSnmp = SimpleSnmp.simpleSnmpFactory(); sSnmp.setHost(ip.getIp()); Log.d("RUN", "Testing ("+ip.getIp()+"): " + ip.getMac() + " (IPs in queue: " + ipRunQSize() + " Num found: " + numFound + ")"); long boksBeginTime = System.currentTimeMillis(); boolean accessible = false; if (runningSNMP(ip.getIp())) { //Log.d("RUN", "Host " + host + " has an open SNMP port"); String cs = accessSNMP(); if (cs != null) { accessible = true; boolean foundArp = collectARP(cs);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -