📄 packetprocessor.cc
字号:
} else if (fi->type == "Binary") { // insert length first *((unsigned int *) &rmvalues[rmval_len]) = fi->len; rmval_len += sizeof(unsigned int); memcpy(&rmvalues[rmval_len], tmp, fi->len); rmval_len += fi->len; } else { memcpy(&rmvalues[rmval_len], tmp, fi->len); rmval_len += fi->len; } } } if (ra->seppaths) { mvalues[mval_len] = 1; mval_len++; rmvalues[rmval_len] = 1; rmval_len++; } flowInfo_t *fi = ra->flows->getFlow(mvalues, mval_len); if ((fi == NULL) && ra->bidir) { // try reverse match fi = ra->flows->getFlow(rmvalues, rmval_len); if (fi != NULL) { // set backward indication meta->reverse = 1; if (ra->seppaths) { // generate extra flow entry for reverse path rmvalues[rmval_len-1] = 2; fi = ra->flows->getFlow(rmvalues, rmval_len); memcpy(mvalues, rmvalues, rmval_len); mval_len = rmval_len; } } else { // this is the first packet of a new flow if (meta->reverse) { // according to the classifier its in reverse direction -> swap directions memcpy(mvalues, rmvalues, rmval_len); mval_len = rmval_len; }#ifdef SWAP_HACK else if ((meta->layers[2] == T_UDP) || (meta->layers[2] == T_TCP)) { // use heuristic: swap direction if udp/tcp source port is well-known but dst port is not unsigned short src_port = ntohs(*((unsigned short *) &meta->payload[meta->offs[2]])); unsigned short dst_port = ntohs(*((unsigned short *) &meta->payload[meta->offs[2]+2])); if ( (src_port < 1024) && (dst_port >= 1024) ) { memcpy(mvalues, rmvalues, rmval_len); mval_len = rmval_len; // and change direction to backward! meta->reverse = 1; } }#endif } } if (fi == NULL) { // add new flow fi = ra->flows->addFlow(mvalues, mval_len); // initialize proc modules for (ppactionListIter_t i = ra->actions.begin(); i != ra->actions.end(); i++) { ppaction_t a; a.mapi = i->mapi; a.module = i->module; a.params = i->params; a.flowData = NULL; (a.mapi)->initFlowRec(a.params, &a.flowData); fi->actions.push_back(a); } } ra->flows->setLastTime(fi, meta->tv_sec + 1); ra->flowKeyLen = mval_len; acts = &fi->actions; flow = fi; } else { acts = &ra->actions; } int reset = 0; // apply all registered evaluation modules for the specified rule for (ppactionListIter_t i = acts->begin(); i != acts->end(); i++) {#ifdef PROFILING ini = PerfTimer::readTSC();#endif int doExport = (i->mapi)->processPacket((char *)meta->payload, meta, i->flowData); //#if 0 // flow can trigger its immediate export if (doExport == 1) { int size = 0; unsigned char *data = NULL; FlowRecord *frec = new FlowRecord(ruleId, ra->rule->getRuleName(), 1); MetricData *md = new MetricData(i->module->getModName(), i->module->getExportLists(), ra->flowKeyList, 0, NULL, 0, NULL);#ifdef DEBUG log->dlog(ch, "querying processing module '%s' for rule %i", i->module->getModName().c_str(), ruleId);#endif // fetch export data from processing module i->mapi->exportData((void* *)&data, &size, i->flowData); if (ra->auto_flows) { assert(flow != NULL); md->addFlowData(size, data, ra->flowKeyLen, flow->keyData, flow->newFlow, flow->flowId); } else { md->addFlowData(size, data, ra->flowKeyLen, NULL, 0, 0); } // reset module data i->mapi->destroyFlowRec(i->flowData); // add data to flow record frec->addData(md); // get all export modules names (FIXME put this list together before) exportList_t *exps = ra->rule->getExport(); expnames_t enames; for (exportListIter_t e = exps->begin(); e != exps->end(); e++) { enames.insert(e->name); } // give to Exporter if (expt != NULL) { expt->storeData(ruleId, enames, frec); } // flows that export here are reset after all action modules are // executed reset = 1; } //endif #ifdef PROFILING end = PerfTimer::readTSC(); perf->account(MS_MODULE, end - ini);#endif } // #if 0 if (reset == 1) { if (ra->auto_flows) { // delete flow assert(flow != NULL); flow->newFlow = 0; ra->flows->deleteFlow(flow); } else { ra->newFlow = 0; } } // #endif // save time for last packet of this flow (for idle flow detection) ra->lastPkt = meta->tv_sec + 1; // (rounded up) } }#ifdef PROFILING { static int n = 0; if (++n == 100000) { cerr << "process packet in " << perf->latest(MS_MODULE) << "(" << perf->avg(MS_MODULE) << ") ns " << endl; n = 0; } }#endif return 0;}int PacketProcessor::handleFDEvent(eventVec_t *e, fd_set *rset, fd_set *wset, fd_sets_t *fds){ metaData_t *meta; // get next entry from packet queue meta = queue->readBuffer(threaded); if (meta) { processPacket(meta); queue->releaseBuffer(); // restart waiting meter#if ENABLE_THREADS if (threaded && (queue->getUsedBuffers() == 0)) { threadCondSignal(&doneCond); }#endif return 1; } return 0;}void PacketProcessor::main(){ // this function will be run as a single thread inside the packet processor log->log(ch, "PacketProcessor thread running"); for (;;) { handleFDEvent(NULL, NULL,NULL, NULL); }} void PacketProcessor::waitUntilDone(void){#ifdef ENABLE_THREADS AUTOLOCK(threaded, &maccess); if (threaded) { while (queue->getUsedBuffers() > 0) { threadCondWait(&doneCond, &maccess); } }#endif}// This functions triggers the export of all rules into the flow record// if now is > 0 only idle flows are exported and their flow data reset// assert: if now > 0 than at least one flow has timeoutint PacketProcessor::exportRule( FlowRecord *frec, time_t now, unsigned long ival){ int size = 0; unsigned char *data = NULL; ruleActions_t *ra; AUTOLOCK(threaded, &maccess); ra = &rules[frec->getRuleId()]; if (ra->auto_flows) { int cnt = 0, flow = 0; MetricData *md[ra->actions.size()]; // fetch flow data from registered packet processing modules for this rule for (ppactionListIter_t i = ra->actions.begin(); i != ra->actions.end(); i++) { md[cnt] = new MetricData(i->module->getModName(), i->module->getExportLists(), ra->flowKeyList, 0, NULL, 0, NULL); cnt++; } flowListIter_t tmp; flowListIter_t f = ra->flows->getFlows()->begin(); while (f != ra->flows->getFlows()->end()) { cnt = 0; tmp = f; f++; if ((now == 0) || ((time_t)(tmp->second.lastPkt + ival) <= now)) { for (ppactionListIter_t j = tmp->second.actions.begin(); j != tmp->second.actions.end(); ++j) {#ifdef DEBUG log->dlog(ch, "querying processing module '%s' for rule %i and flow %i", j->module->getModName().c_str(), frec->getRuleId(), flow);#endif // fetch export data from processing module j->mapi->exportData((void* *)&data, &size, j->flowData); md[cnt]->addFlowData(size, data, ra->flowKeyLen, tmp->second.keyData, tmp->second.newFlow, tmp->second.flowId); tmp->second.newFlow = 0; if (now > 0) { j->mapi->destroyFlowRec(j->flowData); } cnt++; } if (now > 0) { // delete flow ra->flows->deleteFlow(tmp); } } flow++; } cnt = 0; for (ppactionListIter_t i = ra->actions.begin(); i != ra->actions.end(); i++) { frec->addData(md[cnt]); cnt++; } } else { MetricData *md; // fetch flow data from registered packet processing modules for this rule for (ppactionListIter_t i = ra->actions.begin(); i != ra->actions.end(); i++) {#ifdef DEBUG log->dlog(ch, "querying processing module '%s' for rule %i", i->module->getModName().c_str(), frec->getRuleId());#endif // fetch export data from processing module i->mapi->exportData((void* *)&data, &size, i->flowData); // store export data into flow record container object md = new MetricData(i->module->getModName(), i->module->getExportLists(), NULL, size, data, 0, NULL, ra->newFlow); // if requested: reset intermediate flow data using processing module // and reset flow to idle status if (now > 0) { i->mapi->resetFlowRec(i->flowData); } frec->addData(md); } ra->newFlow = 0; } if ((time_t)(ra->lastPkt + ival) <= now) { ra->lastPkt = 0; ra->flowKeyLen = 0; } return 0;}FlowRecord *PacketProcessor::exportRule(int rid, string rname, time_t now, unsigned long ival){ FlowRecord *f = new FlowRecord(rid, rname); exportRule(f, now, ival); return f;}/* ------------------------- ruleTimeout ------------------------- */// return 0 (if timeout), 1 (stays idle), >1 (active and no timeout yet)unsigned long PacketProcessor::ruleTimeout(int ruleID, unsigned long ival, time_t now){ AUTOLOCK(threaded, &maccess); time_t last = rules[ruleID].lastPkt; if (last > 0) { ruleActions_t *ra = &rules[ruleID]; if (ra->auto_flows) { // has any of the auto flows expired? (this is only accurate to +-1s) flowListIter_t f = ra->flows->getFlows()->begin(); while (f != ra->flows->getFlows()->end()) { if ((time_t)(f->second.lastPkt + ival) <= now) { // expired -> export#ifdef DEBUG log->dlog(ch,"auto flow idle, export: YES");#endif return 0; } f++; } } else { // check if timeout hasn't expired for the rule if ((time_t)(last + ival) > now) { #ifdef DEBUG log->dlog(ch,"flow idle for %d seconds, export: NO", (int)(now-last));#endif return last; } else { #ifdef DEBUG log->dlog(ch, "flow idle for %d seconds, export: YES", (int)(now-last));#endif return 0; } } } return 1;}string PacketProcessor::getInfo(){ ostringstream s; AUTOLOCK(threaded, &maccess); /* FIXME delete? * * uncomment to print out which task uses what modules for (unsigned int j = 0; j<rules.size(); j++) { if (rules[ j ].actions.size()>0) { s << "rule with id #" << j << " uses " << rules[ j ].actions.size() << " actions :"; s << " "; for (ppactionListIter_t i = rules[j].actions.begin(); i != rules[j].actions.end(); i++) { s << (i->module)->getModName(); s << ","; } s << endl; } } */ s << loader->getInfo(); // get the list of loaded modules return s.str();}/* -------------------- addTimerEvents -------------------- */void PacketProcessor::addTimerEvents( int ruleID, int actID, ppaction_t &act, EventScheduler &es ){ timers_t *timers = (act.mapi)->getTimers(act.flowData); if (timers != NULL) { while (timers->flags != TM_END) { es.addEvent(new ProcTimerEvent(ruleID, actID, timers++)); } }}// handle module timeoutsvoid PacketProcessor::timeout(int rid, int actid, unsigned int tmID){ ppaction_t *a; ruleActions_t *ra; AUTOLOCK(threaded, &maccess); ra = &rules[rid]; if (!ra->auto_flows) { a = &ra->actions[actid]; a->mapi->timeout(tmID, a->flowData); } else { for (flowListIter_t f = ra->flows->getFlows()->begin(); f != ra->flows->getFlows()->end(); ++f) { a = &f->second.actions[actid]; a->mapi->timeout(tmID, a->flowData); } }}/* -------------------- getModuleInfoXML -------------------- */string PacketProcessor::getModuleInfoXML( string modname ){ AUTOLOCK(threaded, &maccess); return loader->getModuleInfoXML( modname );}/* ------------------------- dump ------------------------- */void PacketProcessor::dump( ostream &os ){ os << "PacketProcessor dump :" << endl; os << getInfo() << endl;}/* ------------------------- operator<< ------------------------- */ostream& operator<< ( ostream &os, PacketProcessor &pe ){ pe.dump(os); return os;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -