⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 v9_packet.java

📁 接收网络设备上NetFlow工具导出的NetFlow数据
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/**
 *
 */
package cai.flow.packets;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Enumeration;
import java.util.Vector;

import cai.flow.packets.v9.OptionFlow;
import cai.flow.packets.v9.OptionTemplate;
import cai.flow.packets.v9.OptionTemplateManager;
import cai.flow.packets.v9.Template;
import cai.flow.packets.v9.TemplateManager;
import cai.flow.struct.Scheme_DataPrefix;
import cai.sql.SQL;
import cai.utils.*;

/**
 * @author CaiMao V9 Flow Packet UDP包的解析,内部包含多个FlowSet DataSet的流对象
 *
 * -------*---------------*------------------------------------------------------* |
 * Bytes | Contents | Description |
 * -------*---------------*------------------------------------------------------* |
 * 0-1 | version | The version of NetFlow records exported 009 |
 * -------*---------------*------------------------------------------------------* |
 * 2-3 | count | Number of flows exported in this packet (1-30) |
 * -------*---------------*------------------------------------------------------* |
 * 4-7 | SysUptime | Current time in milliseconds since the export device | | | |
 * booted |
 * -------*---------------*------------------------------------------------------* |
 * 8-11 | unix_secs | Current count of seconds since 0000 UTC 1970 |
 * -------*---------------*------------------------------------------------------* |
 * 12-15 |PackageSequence| pk id of all flows |
 * -------*---------------*------------------------------------------------------* |
 * 16-19 | Source ID | engine type+engine id |
 * -------*---------------*------------------------------------------------------* |
 * 20- | others | Unused (zero) bytes |
 * -------*---------------*------------------------------------------------------*
 *
 */
public class V9_Packet implements FlowPacket {
    long count;

    String routerIP;

    long SysUptime, unix_secs, packageSequence;

    long sourceId;

    Vector flows = null;

    Vector optionFlows = null;

    public static final int V9_Header_Size = 20;
    public static void main(String[] args) {
//        for (int i = 0; i < 10; i++) {
//            Runnable run = new Runnable() {
//                public void run() {
                    new V9_Packet("127.0.0.0", new byte[] {}, 0);
//                }
//            };
//            Thread t1 = new Thread(run);
//            Thread t2 = new Thread(run);
//            Thread t3 = new Thread(run);
//            t1.start();
//            t2.start();
//            t3.start();
//        }
    }

    /**
     * 解析UDP包头,把所有的flows解析存储到内存Vector中
     *
     * @param RouterIP
     * @param buf
     * @param len
     * @throws DoneException
     */
    public V9_Packet(String RouterIP, byte[] buf, int len) throws DoneException {
        if (Params.DEBUG) {
            // 仅仅实验
//            File tmpFile = new File("D:\\Dev\\netflow\\jnca\\savePacketT_211.98.0.147_256.cache.tmp");
            File tmpFile = new File(
                    "D:\\Dev\\netflow\\jnca\\savePacketT_211.98.0.147_256.cache.tmp");
            if (tmpFile.exists()) {
                try {
                    ObjectInputStream fIn = new ObjectInputStream(
                            new FileInputStream(tmpFile));
                    System.out.println("Directly read from " + fIn);
                    try {
                        buf = (byte[]) fIn.readObject();
                        len = ((Integer) fIn.readObject()).intValue();
                    } catch (ClassNotFoundException e) {
                        e.printStackTrace();
                    }
                    fIn.close();
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } else {
                try {
                    ObjectOutputStream fOut;
                    fOut = new ObjectOutputStream(new FileOutputStream(tmpFile));
                    fOut.writeObject(buf);
                    fOut.writeObject(new Integer(len));
                    fOut.flush();
                    fOut.close();
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
            // 仅仅实验
        }
        if (len < V9_Header_Size) {
            throw new DoneException("    * incomplete header *");
        }

        this.routerIP = RouterIP;
        count = Util.to_number(buf, 2, 2); // 包括template data flowset数目

        SysUptime = Util.to_number(buf, 4, 4);
        Variation vrat = Variation.getInstance();
        vrat.setVary(Util.convertIPS2Long(RouterIP), SysUptime);
        unix_secs = Util.to_number(buf, 8, 4);
        packageSequence = Util.to_number(buf, 12, 4);
        sourceId = Util.to_number(buf, 16, 4);

        flows = new Vector((int) count * 30); // Let's first make some space
        optionFlows = new Vector();
        // 链表
        long flowsetLength = 0l;
        // 处理flowset的循环
        for (int flowsetCounter = 0, packetOffset = V9_Header_Size;
                flowsetCounter < count
                && packetOffset < len; flowsetCounter++,
                packetOffset += flowsetLength) {
            // 处理flowset内部
            long flowsetId = Util.to_number(buf, packetOffset, 2);
            flowsetLength = Util.to_number(buf, packetOffset + 2, 2);
            if (flowsetLength == 0) {
                throw new DoneException(
                        "there is a flowset len=0,packet invalid");
            }
            if (flowsetId == 0) {
                // template flowset,这里templateid是一个内容,不像data flowset
                // 处理template flowset
                int thisTemplateOffset = packetOffset + 4;
                do {
                    // 定义一个template
                    long templateId = Util
                                      .to_number(buf, thisTemplateOffset, 2);
                    long fieldCount = Util.to_number(buf,
                            thisTemplateOffset + 2, 2);
                    if (TemplateManager.getTemplateManager().getTemplate(
                            this.routerIP, (int) templateId) == null
                        || Params.v9TemplateOverwrite) {
                        try {
                            TemplateManager.getTemplateManager().acceptTemplate(
                                    this.routerIP, buf, thisTemplateOffset);
                        } catch (Exception e) {
                            if (Params.DEBUG) {
                                e.printStackTrace();
                            }
                            if ((e.toString() != null)
                                && (!e.toString().equals(""))) {
                                if (e.toString().startsWith("savePacket")) {
                                    try {
                                        ObjectOutputStream fOut;
                                        fOut = new ObjectOutputStream(new
                                                FileOutputStream("./" +
                                                e.toString() + ".cache.tmp"));
                                        fOut.writeObject(buf);
                                        fOut.writeObject(new Integer(len));
                                        fOut.flush();
                                        fOut.close();
                                        System.err.println("Saved ");
                                    } catch (FileNotFoundException e2) {
                                        e2.printStackTrace();
                                    } catch (IOException e1) {
                                        e1.printStackTrace();
                                    }
                                } else {
                                    System.err.println("An Error without save:" +
                                            e.toString());
                                }
                            }
                        }

                    }
                    thisTemplateOffset += fieldCount * 4 + 4; //这里似乎有问题
                } while (thisTemplateOffset - packetOffset < flowsetLength);
            } else if (flowsetId == 1) { // options flowset
                continue;
//                int thisOptionTemplateOffset = packetOffset + 4;
//                // bypass flowsetID and flowset length
//                do {
//                    // 定义一个template
//                    long optionTemplateId = Util.to_number(buf,
//                            thisOptionTemplateOffset, 2);
//                    long scopeLen = Util.to_number(buf,
//                            thisOptionTemplateOffset + 2, 2);
//                    long optionLen = Util.to_number(buf,
//                            thisOptionTemplateOffset + 4, 2);
//                    if (OptionTemplateManager.getOptionTemplateManager()
//                        .getOptionTemplate(this.routerIP,
//                                           (int) optionTemplateId) == null
//                        || Params.v9TemplateOverwrite) {
//                        OptionTemplateManager.getOptionTemplateManager()
//                                .acceptOptionTemplate(this.routerIP, buf,
//                                thisOptionTemplateOffset);
//                    }
//                    thisOptionTemplateOffset += scopeLen + optionLen + 6;
//                } while (thisOptionTemplateOffset -
//                         packetOffset < flowsetLength);
            } else if (flowsetId > 255) {
                // data flowset
                // templateId==flowsetId
                Template tOfData = TemplateManager.getTemplateManager()
                                   .getTemplate(this.routerIP, (int) flowsetId); // flowsetId==templateId
                if (tOfData != null) {
                    int dataRecordLen = tOfData.getTypeOffset( -1); // 每个流记录的长度
                    // packetOffset+4 让出flowsetId 和 length空间
                    for (int idx = 0, p = packetOffset + 4;
                                          (p - packetOffset + dataRecordLen) <
                                          flowsetLength; //consider padding
                                          idx++, p += dataRecordLen) { //+5 makes OK
                        // 对当前IP网络管理,v9的数据仍然仅仅利用其v5就具有的部分

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -