⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 discoverythread.java

📁 Network Administration Visualized 网络管理可视化源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
import java.io.*;import java.util.*;import java.util.regex.*;import java.net.*;import java.text.*;import java.sql.*;import no.ntnu.nav.logger.*;import no.ntnu.nav.ConfigParser.*;import no.ntnu.nav.Database.*;import no.ntnu.nav.SimpleSnmp.*;import no.ntnu.nav.event.*;import no.ntnu.nav.netboxinfo.*;import no.ntnu.nav.util.*;/** * This class schedules the netboxes, assigns them to threads and runs * the plugins. */public class DiscoveryThread extends Thread{	private static ConfigParser navCp;	private static ConfigParser myCp;	private static Timer timer;	private static Timer updateFromARPTimer;	private static CheckRunQTask checkRunQTask;	private static SortedMap ipRunQ;	private static Stack idleThreads;	private static int maxThreadCnt;	private static int threadCnt;	private static Integer idleThreadLock = new Integer(0);	private static Integer insertNetboxLock = new Integer(0);	private static Integer updateMacLock = new Integer(0);	private static Integer checkRouterLock = new Integer(0);	private static Integer fileLock = new Integer(0);	private static LinkedList threadIdList = new LinkedList();	private static int curMaxTid = 0;	private static int netboxCnt;	private static int netboxHigh;	private static long nbProcessedCnt;	private static List csList = new ArrayList();	private static int baseCheckInterval;	private static int maxMacChecks;	private static double checkMultiplier;	private static boolean ipScanMode;	private static LinkedList nextIpQueue = new LinkedList();	private static Set seenMacs = Collections.synchronizedSet(new HashSet());	private static Set macsInQueue = Collections.synchronizedSet(new HashSet());	private static int numFound = 0;	private static String allNetboxesBulkFile = "all-netboxes.bulk";	private static String newNetboxesBulkFile = "new-netboxes.bulk";	private static Set netboxesInAllFile = Collections.synchronizedSet(new HashSet());	private static Set netboxesInNewFile = Collections.synchronizedSet(new HashSet());	private static String DEFAULT_ROOMID = "dummy";	private static String DEFAULT_ORGID = "dummy";	private static String DEFAULT_LOCATIONID = "dummy";	private static boolean autoInsertNetbox = false;	// Object data	String tid;	IP ip;	SimpleSnmp sSnmp;	// Static init	public static void init(int numThreads, ConfigParser _myCp, ConfigParser _navCp) {		maxThreadCnt = numThreads;		myCp = _myCp;		navCp = _navCp;		myCp.setObject("navCp", navCp);		// Create the netbox map and the run queue		ipRunQ = new TreeMap();		// Read config		String cs = myCp.get("communitys");		String[] csA = cs.split(",");		for (int i=0; i < csA.length; i++) {			csList.add(csA[i].trim());		}		baseCheckInterval = Integer.parseInt(myCp.get("baseCheckInterval"));		maxMacChecks = Integer.parseInt(myCp.get("maxMacChecks"));		checkMultiplier = Double.parseDouble(myCp.get("checkMultiplier"));		timer = new Timer();		ipScanMode = true;		if (myCp.get("allNetboxesBulkFile") != null) allNetboxesBulkFile = myCp.get("allNetboxesBulkFile");		if (myCp.get("newNetboxesBulkFile") != null) newNetboxesBulkFile = myCp.get("newNetboxesBulkFile");		if (myCp.get("autoInsertNetbox") != null) {			try {				autoInsertNetbox = myCp.get("autoInsertNetbox").startsWith("t");			} catch (Exception e) {			}		}		parseBulkFile(allNetboxesBulkFile, netboxesInAllFile);		parseBulkFile(newNetboxesBulkFile, netboxesInNewFile);		// For dupe checking		loadSeenMacs();		// Fill the runq from db		fillRunQ();		updateFromARPTimer = new Timer();		long updateFromARPFrequency = 1800;		try {			updateFromARPFrequency = Integer.parseInt(myCp.get("updateFromARPFrequency"));		} catch (Exception e) {		}		updateFromARPFrequency *= 1000;		updateFromARPTimer.schedule(new UpdateFromARPTask(), updateFromARPFrequency, updateFromARPFrequency);		Log.d("INIT", "Starting timer for IP query scheduling");		scheduleCheckRunQ(0);	}	private static void loadSeenMacs() {		try {			ResultSet rs = Database.query("SELECT mac FROM autodisc_mac_scanned WHERE attempts IS NULL");			while (rs.next()) {				seenMacs.add(rs.getString("mac"));			}		} catch (SQLException e) {			e.printStackTrace(System.err);		}	}	private static void updateFromARP() {		if (true) return;		try {			// All macs we haven't tried before			ResultSet rs = Database.query("SELECT DISTINCT ip, mac FROM arp WHERE end_time='infinity' AND mac NOT IN (SELECT mac FROM autodisc_mac_scanned)");			while (rs.next()) {				IP ip = new IP(rs.getString("ip"), rs.getString("mac"));				addToRunQ(ip);			}		} catch (SQLException e) {			e.printStackTrace(System.err);		}	}	private static void fillRunQ() {		updateFromARP();		try {			// All macs we haven't given up on			ResultSet rs = Database.query("SELECT ip, autodisc_mac_scanned.mac, time, attempts FROM autodisc_mac_scanned JOIN arp ON (autodisc_mac_scanned.mac = arp.mac AND arp.end_time='infinity') WHERE attempts < " + maxMacChecks);			while (rs.next()) {				IP ip = new IP(rs.getString("ip"), rs.getString("mac"), rs.getLong("time"), rs.getInt("attempts"));				addToRunQ(ip);			}		} catch (SQLException e) {			e.printStackTrace(System.err);		}	}	private static void parseBulkFile(String fn, Set nbIpSet) {		try {			char sep = File.separatorChar;			File f = new File(navCp.get("NAVROOT") + sep + "var" + sep + "log" + sep + fn);			if (!f.exists()) return;			BufferedReader in = new BufferedReader(new FileReader(f));			String s;			while ( (s=in.readLine()) != null) {				String[] g = s.split(":");				if (g.length >= 2) {					nbIpSet.add(g[1]);				}			}		} catch (Exception e) {			e.printStackTrace(System.err);		}	}	private void writeBulkFile(String fn, String ip, String catid, String ro, String dns, String sysname, String sysobjectid, String sysdescr) {		synchronized (fileLock) {			try {				char sep = File.separatorChar;				File f = new File(navCp.get("NAVROOT") + sep + "var" + sep + "log" + sep + fn);				PrintStream out = new PrintStream(new FileOutputStream(f, true), true);						// # dnsnavn, sysobjectid, system.sysdescr (avkappet om den er lang)				String descr = "# " + dns + " ("+sysname+", " + ro + "), " + sysobjectid  + ", " + sysdescr.substring(0, Math.min(70, sysdescr.length()));				out.println(descr);				// #roomid:ip:orgid:catid:[ro:serial:rw:function:subcat1:subcat2..]				String s = DEFAULT_ROOMID + ":" + ip + ":" + DEFAULT_ORGID + ":" + catid + ":" + ro;				out.println(s);				out.close();			} catch (Exception e) {				e.printStackTrace(System.err);			}		}	}	private static IP getNextIp() {		synchronized (nextIpQueue) {			if (nextIpQueue.isEmpty()) {				// Init local IPs				try {					Enumeration e = NetworkInterface.getNetworkInterfaces();					while(e.hasMoreElements()) {						NetworkInterface netface = (NetworkInterface)e.nextElement();						Enumeration e2 = netface.getInetAddresses();						while (e2.hasMoreElements()){							InetAddress ip = (InetAddress) e2.nextElement();							if (ip instanceof Inet4Address && !ip.isLoopbackAddress()) {								long ipnum = ipToNum(ip.getHostAddress());								ipnum = ipToNum("129.241.190.1");								nextIpQueue.add(new String[] { numToIp(ipnum), "u" } );								nextIpQueue.add(new String[] { numToIp(ipnum-1), "d" } );							}						}					}				} catch (SocketException e) {					e.printStackTrace(System.err);				}				if (nextIpQueue.isEmpty()) return null;			}			// Fetch next IP from queue			String[] s = (String[])nextIpQueue.removeFirst();			String ipaddr = s[0];			long ipnum = ipToNum(ipaddr);			if (s[1].equals("u")) ipnum++;			else ipnum--;			nextIpQueue.add(new String[] { numToIp(ipnum), s[1] } );			return new IP(ipaddr, null);						}	}	private static void scheduleCheckRunQ(long l)	{		synchronized (timer) {			if (checkRunQTask != null) checkRunQTask.cancel();			checkRunQTask = new CheckRunQTask();			// The delay can actually be negative due to inaccuracy in the Java timer			l = Math.max(l, 0);			Log.d("QUERY_NETBOX", "SCHEDULE_CHECK_RUN_Q", "Schedule check runq in " + l + " ms");			try {				timer.schedule(checkRunQTask, l);			} catch (IllegalStateException e) {				timer = new Timer();				checkRunQTask = new CheckRunQTask();				timer.schedule(checkRunQTask, l);			}		}	}	private static void checkRunQ()	{		Log.setDefaultSubsystem("QUERY_IP");		// Try to get a free IP to check		Object o;		while ((o = removeRunQHead()) instanceof IP) {			IP ip = (IP)o;			Log.d("CHECK_RUN_Q", "Got ip: " + ip);			// Try to get a free thread			String tid = requestThread();			if (tid == null) {				Log.d("CHECK_RUN_Q", "IP is available, but no threads are idle");				// Re-insert into queue				addToRunQFront(ip);				return;			}			// OK, start a new thread			Log.d("CHECK_RUN_Q", "Starting new IP thread with id: " + tid + " to handle " + ip);			new DiscoveryThread(tid, ip).start();		} 		// No more IPs, schedule re-run when the next is ready		Long nextRun = (Long)o;		Log.d("CHECK_RUN_Q", "No available IP, scheduling next check in " + nextRun + " ms (" + (nextRun.longValue()/1000) + " s)");		scheduleCheckRunQ(nextRun.longValue());	}	private static void addToRunQ(IP ip) {		addToRunQ(ip, false);	}	private static void addToRunQFront(IP ip) {		addToRunQ(ip, true);	}	private static void addToRunQ(IP ip, boolean front) {		Long nextRun = new Long(ip.getNextRun());		synchronized (ipRunQ) {			if (ip.getMac() != null) {				// MAC should only be in queue once				if (macsInQueue.contains(ip.getMac())) return;			} 			LinkedList l;			if ( (l = (LinkedList)ipRunQ.get(nextRun)) == null) ipRunQ.put(nextRun, l = new LinkedList());			if (front) {				l.addFirst(ip);			} else {				l.add(ip);			}		}		if (ip.getMac() != null) {			seenMacs.add(ip.getMac());		}	}	private static Object removeRunQHead() {		Object o;		if ((o = removeRunQHeadNoCheck()) instanceof IP) {			IP ip = (IP)o;			return ip;		}		return o;	}	private static int ipRunQSize() {		synchronized (ipRunQ) {			return ipRunQ.size();		}	}	private static Object removeRunQHeadNoCheck() {		synchronized (ipRunQ) {			if (ipRunQ.isEmpty()) {				if (ipScanMode) {					return getNextIp();				}				return new Long(Long.MAX_VALUE / 2); // Infinity...			}			Long nextRun = (Long)ipRunQ.firstKey();			if (nextRun.longValue() > System.currentTimeMillis()) {				// Head of queue is not yet ready to be run				return new Long(nextRun.longValue() - System.currentTimeMillis());			}			LinkedList l = (LinkedList)ipRunQ.get(nextRun);			IP ip  = (IP)l.removeFirst();			if (ip.getMac() != null) {				macsInQueue.remove(ip.getMac());			}			if (l.isEmpty()) ipRunQ.remove(nextRun);			return ip;		}	}	private static long ipToNum(String ip) {		if (ip.length() == 0) return 0;		String[] s = ip.split("\\.");		long l = 0;		long mul = 1;		for (int i=0; i < s.length-1; i++) mul *= 255;		for (int i=0; i < s.length; i++) {			l += Integer.parseInt(s[i]) * mul;			mul /= 255;		}		return l;	}	private static String numToIp(long num) {		if (num == 0) return "";		String s = "";		long mul = 1;		int dig=0;		while (mul < num) mul *= 255;		while (num > 0) {			long l = num / mul;			if (num == l*mul && dig < 3) l--; // IPs must be 4 digits			if (l > 0) {				s += l + ".";				dig++;			}			num -= l * mul;			mul /= 255;		}		return s.substring(0, s.length()-1);	}	private static String requestThread() {		synchronized (idleThreadLock) {			if (threadCnt < maxThreadCnt) {				int tid = getFreeTid();				threadCnt++;				//System.err.println("New thread, cnt="+(threadCnt+1)+" max="+max);				return format(tid, String.valueOf(maxThreadCnt-1).length());			}			return null;		}	}	private static void incNumFound() {		synchronized (idleThreadLock) {			numFound++;		}	}	private static void threadIdle(String tid) {		synchronized (idleThreadLock) {			returnTid(Integer.parseInt(tid));			//System.err.println("Del thread, cnt="+(threadCnt-1));			threadCnt--;		}		scheduleCheckRunQ(0);	}	private static int getFreeTid() {		if (threadIdList.isEmpty()) {			threadIdList.add(new Integer(curMaxTid++));		}		return ((Integer)threadIdList.removeFirst()).intValue();	}	private static void returnTid(int tid) {		threadIdList.add(new Integer(tid));	}	// Constructor	public DiscoveryThread(String tid, IP ip)	{		this.tid = tid;		this.ip = ip;	}	public boolean runningSNMP(String host) {		try {			//Log.d("RUNNING_SNMP", "Checking " + host + " for open SNMP port");			InetAddress inet = InetAddress.getByName(host);			DatagramSocket s = new DatagramSocket();			s.setSoTimeout(100);			try {				s.connect(inet, 161);				DatagramPacket p = new DatagramPacket(new byte[0], 0);				s.send(p);				s.receive(p);			} catch (SocketTimeoutException e) {				return true;			} finally {				s.close();			}		} catch (Exception e) {			// We get java.net.PortUnreachableException: ICMP Port Unreachable if no SNMP is running		}		return false;	}	public String accessSNMP() {		sSnmp.setSocketTimeout(100);		for (Iterator it = csList.iterator(); it.hasNext();) {			String cs = (String) it.next();			sSnmp.setCs_ro(cs);			//Log.d("ACCESS_SNMP", "Trying SNMP with cs " + cs);			try {				sSnmp.setTimeoutLimit(1);				sSnmp.getNext("1", 1, false, true);				// Ok!				return cs;			} catch (Exception e) {			}		}		return null;	}	public void run()	{		Log.setDefaultSubsystem("QUERY_NETBOX_T"+tid);		Log.setThreadId(tid);		Log.d("RUN", "Thread " + tid + " starting work on ("+ip+")");		long beginTime = System.currentTimeMillis();		try {			while (true) {				String host = ip.getIp();				Log.setNetbox(ip.getIp());				sSnmp = SimpleSnmp.simpleSnmpFactory();				sSnmp.setHost(ip.getIp());				Log.d("RUN", "Testing ("+ip.getIp()+"): " + ip.getMac() + " (IPs in queue: " + ipRunQSize() + " Num found: " + numFound + ")");				long boksBeginTime = System.currentTimeMillis();				boolean accessible = false;				if (runningSNMP(ip.getIp())) {					//Log.d("RUN", "Host " + host + " has an open SNMP port");					String cs = accessSNMP();					if (cs != null) {						accessible = true;						boolean foundArp = collectARP(cs);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -