⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 scheme_aggregator.java

📁 接收网络设备上NetFlow工具导出的NetFlow数据
💻 JAVA
字号:
package cai.flow.collector;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Enumeration;
import java.util.Hashtable;

import cai.flow.packets.FlowPacket;
import cai.sql.SQL;
import cai.utils.ServiceThread;
import cai.utils.Syslog;
import cai.utils.Util;
/**
 * 所有归并的父类
 * @author CaiMao
 *
 */
public abstract class Scheme_Aggregator extends Hashtable {
	long Start, Stop;

	long interval;

	ServiceThread th;

	SQL sql;

	String scheme;

	protected PreparedStatement add_stm = null;

	protected String add_sql = null;

	public Scheme_Aggregator(SQL sql, String scheme, long interval) {
		this.scheme = scheme;//like "SrcAS"
		this.interval = interval;

		if (interval != 0) {
			add_sql = SQL.resources.getAndTrim("SQL.Add." + scheme);
			add_stm = sql.prepareStatement("准备插入" + scheme
					+ "表", add_sql);
			//入库线程
			th = new ServiceThread(this, Syslog.log, "Scheme " + scheme
					+ " with " + Util.toInterval(interval) + " interval",
					scheme) {
				public void exec() throws Throwable {
					((Scheme_Aggregator) o).save_loop();
				}
			};

			th.start();
		} else
			Syslog.log.syslog(Syslog.LOG_NOTICE, "Scheme " + scheme
					+ " disabled");
	}
	/**
	 * 由子类在add(FlowPacket packet)中调用
	 * @param it
	 */
	public void add(Scheme_Item it) {
		Integer hash = new Integer(it.hashCode());

		if (it.getData().RouterIP == null)
			throw new RuntimeException("it.getData().RouterIP == null for "
					+ it.toString());

		synchronized (this) {
			Object o = get(hash);

			if (o == null)
				put(hash, it);//增加这个scheme item到列表中
			else
				((Scheme_Item) o).add(it);//增加流量数据
		}
	}
	/**
	 * 由collector来的包到达这里
	 * @param packet
	 */
	public abstract void add(FlowPacket packet);

	private void init_times() {
		Start = System.currentTimeMillis() / 1000;
		Stop = Start + interval;
	}
	/**
	 * 入库线程的主循环
	 *
	 */
	public void save_loop() {
		init_times();//像系统时间对齐

		while (true) {
			try {
				long wait = Stop - Start;

				if (wait >= 0)
					Thread.sleep(wait * 1000);

				synchronized (this) {//和add同步
					for (Enumeration f = elements(); f.hasMoreElements();) {
						Scheme_Item item = (Scheme_Item) f.nextElement();

						try {
							//头四个字段是固定的
							add_stm.setDate(1, new java.sql.Date(Start * 1000));
							add_stm.setTime(2, new java.sql.Time(Start * 1000));
							add_stm.setDate(3, new java.sql.Date(Stop * 1000));
							add_stm.setTime(4, new java.sql.Time(Stop * 1000));
							item.fill(add_stm, 5);
							add_stm.executeUpdate();
						} catch (SQLException e) {
							SQL.error_msg("INSERT to " + scheme + " table", e,
									add_sql);
						}
					}

					clear();//包入库以后,清除Hashtable
				}

				init_times();
			} catch (InterruptedException e) {
			}
		}
	}

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -