📄 itgrecv.cpp
字号:
paraThread[flowPosition].meter = *(unsigned int *) ptrMeter;
strncpy(paraThread[flowPosition].serial, ptrSerialReceiver, DIM_NAME_SERIAL_INTERFACE);
paraThread[flowPosition].fileLog = fileLog;
paraThread[flowPosition].logSock = logSock;
paraThread[flowPosition].logHost = logHost;
#ifdef DEBUG
printf("Signal Manager : Meter Received : %s\n",
invFindMeter(paraThread[flowPosition].meter));
#endif
switch (flowIdNum[flowPosition].l4Proto) {
case L4_PROTO_UDP :
if ( CREATE_THREAD(¶Thread[flowPosition], udpSock, NULL,
hThr[flowPosition]) < 0)
reportErrorAndExit("typeParser","CREATE_THREAD - type = TSP_SEND_FLOW(3",
"Cannot create thread for udpSock");
#ifdef DEBUG
printf("Return value CREATE_THREAD udpSock hThr[flowPosition] : %d \n",(int)hThr[flowPosition]);
#endif
break;
case L4_PROTO_TCP :
if ( CREATE_THREAD(¶Thread[flowPosition], tcpSock, NULL,
hThr[flowPosition]) < 0)
reportErrorAndExit("typeParser","CREATE_THREAD - type = TSP_SEND_FLOW(3",
"Cannot create thread for tcpSock");
#ifdef DEBUG
printf("Return value CREATE_THREAD tcpSock hThr[flowPosition] : %d \n",(int)hThr[flowPosition]);
#endif
break;
case L4_PROTO_ICMP :
if ( CREATE_THREAD(¶Thread[flowPosition], icmpSock, NULL,
hThr[flowPosition]) < 0)
reportErrorAndExit("typeParser","CREATE_THREAD - type = TSP_SEND_FLOW(3",
"Cannot create thread for icmpSock");
#ifdef DEBUG
printf("Return value CREATE_THREAD icmpSock hThr[flowPosition] : %d \n",(int)hThr[flowPosition]);
#endif
break;
default :
break;
}
#ifdef WIN32
if (setPriority)
if (SetThreadPriority(hThr[flowPosition], THREAD_PRIORITY_TIME_CRITICAL) == 0)
printf("Error - Impossible set priority for thread - %d \n", GetLastError());
#endif
numFlow++;
}
else
reportErrorAndExit("typeParser","flowPosition","Too many thread");
} else if (type == TSP_CLOSED_FLOW) {
#ifdef DEBUG
printf("Signal Manager : Received TSP_CLOSED_FLOW(4) message\n");
#endif
int sizeMsg = sizeof(int);
char buffer[sizeMsg];
char *ptrFlowId = buffer;
if ( recv(newSockSignaling, (char *) buffer, sizeof(buffer), 0) < 0)
reportErrorAndExit("typeParser","recv - type = TSP_CLOSED_FLOW(4)","Cannot receive data");
#ifdef DEBUG
printf("Buffer with TYPE = TSP_CLOSED_FLOW(4) received\n");
#endif
int k = 0;
while (*(int *) ptrFlowId != flowIdNum[k].flowId) {
k++;
}
if ((logCheck == 1) || (logCheck == 2))
flushBuffer((ofstream *) paraThread[k].fileLog, paraThread[k].addressInfos, paraThread[k].count);
else if ((logRemote ==1) || (logRemote == 2)) {
int size = 0;
if ( (size = sendto(logSock, (char *) paraThread[k].addressInfos,
paraThread[k].count * sizeof(struct info), 0,
logHost->ai_addr, logHost->ai_addrlen)) < 0)
reportErrorAndExit("typeParser","sent infos to LOG_SERVER",
"Cannot send infos to logServer");
#ifdef DEBUG
printf("size send %d \n", size);
printf("TCP Pkt Receiver : Sent infos to LogServer\n");
#endif
}
if (logCheck == 2)
closeFileLog((ofstream *)paraThread[k].fileLog);
if (flowIdNum[k].l4Proto == L4_PROTO_ICMP)
printf("Finish ICMP packets!\n");
else {
#ifdef DEBUG
if (flowIdNum[k].l4Proto == L4_PROTO_TCP)
printf("Finish TCP packets!\n");
if (flowIdNum[k].l4Proto == L4_PROTO_UDP)
printf("Finish UDP packets!\n");
#endif
in_port_t tmpPort = 0;
GET_PORT((&(paraThread[k].destHost)), tmpPort);
printf("Finish on port : %d\n",ntohs(tmpPort));
}
sleep(1);
if ( terminateThread(hThr[k]) < 0)
reportErrorAndExit("typeParser","terminateThread - type = 4",
"Cannot terminate thread hThr[k]");
if ( closeSock(paraThread[k].socketClose) < 0)
reportErrorAndExit("typeParser","closeSock - type = 4",
"Cannot close socket socketClose");
free(paraThread[k].addressInfos);
flowIdNum[k].flowId = -1;
numFlow--;
if (sendAckFlow(newSockSignaling, TSP_ACK_CLOSED_FLOW, *(int *) ptrFlowId) < 0)
reportErrorAndExit("typeParser"," - type = TSP_ACK_CLOSED_FLOW(6)",
"Cannot send ack closed flow id on newSockSignaling");
#ifdef DEBUG
printf("Signal Manager : Sent TSP_ACK_CLOSED_FLOW(6) message\n");
printf("Closed Flow %d \n",*(int*)ptrFlowId);
#endif
} else if (type == TSP_CLOSED_ERR) {
#ifdef DEBUG
printf("Signal Manager : Received TSP_CLOSED_ERR(17) message\n");
#endif
int sizeMsg = sizeof(int);
char buffer[sizeMsg];
char *ptrFlowId = buffer;
if ( recv(newSockSignaling, (char *) buffer, sizeof(buffer), 0) < 0)
reportErrorAndExit("typeParser","recv - type = TSP_CLOSED_FLOW(17)","Cannot receive data");
#ifdef DEBUG
printf("Buffer with TYPE = TSP_CLOSED_ERR(17) received\n");
#endif
int k = 0;
while (*(int *) ptrFlowId != flowIdNum[k].flowId) {
k++;
}
if ( terminateThread(hThr[k]) < 0)
reportErrorAndExit("typeParser","terminateThread - type = 4",
"Cannot terminate thread hThr[k]");
if ( closeSock(paraThread[k].socketClose) < 0)
reportErrorAndExit("typeParser","closeSock - type = 4",
"Cannot close socket socketClose");
if (logCheck == 2)
closeFileLog((ofstream *)paraThread[k].fileLog);
free(paraThread[k].addressInfos);
flowIdNum[k].flowId = -1;
numFlow--;
#ifdef DEBUG
printf("Closed Flow %d \n",*(int*)ptrFlowId);
#endif
} else if (type == TSP_RELEASE) {
#ifdef DEBUG
printf("Signal Manager : Received TSP_RELEASE(11) message\n");
#endif
return -1;
} else if (type == TSP_ERR_MSG_1) {
#ifdef DEBUG
printf("Signal Manager : Received TSP_ERR_MSG_1(14) message : authentication not successful\n");
#endif
return -1;
} else if (type == TSP_SENDER_DOWN) {
#ifdef DEBUG
printf("Signal Manager : Received TSP_SENDER_DOWN(21) message\n");
#endif
return -1;
}
return 0;
}
inline void flushBuffer(ofstream *pointerLog, struct info *infos , int count)
{
if ( MUTEX_THREAD_LOCK(mutexLog) < 0)
reportErrorAndExit("flushBuffer","MUTEX_THREAD_LOCK","Cannot lock mutex");
(*pointerLog).write((const char *) infos,count * sizeof(struct info));
if ( MUTEX_THREAD_UNLOCK(mutexLog) < 0)
reportErrorAndExit("flusBuffer","MUTEX_THREAD_UNLOCK","Cannot unlock mutex");
}
void copia(const struct addrinfo* src, struct addrinfo & dst)
{
if (src->ai_family == PF_INET) {
dst.ai_family = src->ai_family;
dst.ai_addrlen = sizeof(struct sockaddr_in);
dst.ai_addr =
(struct sockaddr *) malloc(sizeof(struct sockaddr_in));
((struct sockaddr_in *) dst.ai_addr)->sin_family = AF_INET;
((struct sockaddr_in *) dst.ai_addr)->sin_addr.s_addr =
((struct sockaddr_in *) src->ai_addr)->sin_addr.s_addr;
((struct sockaddr_in *) dst.ai_addr)->sin_port =
((struct sockaddr_in *) src->ai_addr)->sin_port;
}
}
void parserRecv(int argc , char *argv[])
{
while (argc > 0) {
if (argv[0][0] == '-') {
switch (argv[0][1]) {
case 'l':
logCheck = 1;
if ((argc < 2) || (argv[1][0] == '-')) {
strcpy(logFile, DEFAULT_LOG_FILE);
argc -= 1;
argv += 1;
} else {
strcpy(logFile, argv[1]);
argc -= 2;
argv += 2;
}
break;
case 'L':
logRemote = 1;
if ((argc < 2) || (argv[1][0] == '-')) {
argc -= 1;
argv += 1;
} else {
if (findL4Proto(argv[1]) == LX_ERROR_BYTE){
if (globaleLogHost)
freeaddrinfo(globaleLogHost);
if (getaddrinfo(argv[1], NULL, NULL, &globaleLogHost))
reportErrorAndExit("main","general parser","Invalid log-server address or protocol");
argc -= 1;
argv += 1;
} else {
l4ProtoLog = findL4Proto(argv[1]);
argc -= 1;
argv += 1;
}
if ((argc >= 2) && (argv[1][0] != '-') && (findL7Proto(argv[1]) == LX_ERROR_BYTE)) {
if (findL4Proto(argv[1]) == LX_ERROR_BYTE){
if (globaleLogHost)
freeaddrinfo(globaleLogHost);
if (getaddrinfo(argv[1], NULL, NULL, &globaleLogHost))
reportErrorAndExit("main","general parser","Invalid log-server address or protocol");
argc -= 2;
argv += 2;
} else {
l4ProtoLog = findL4Proto(argv[1]);
argc -= 2;
argv += 2;
}
}
else {
argc -= 1;
argv += 1;
}
}
break;
#ifdef WIN32
case 'P':
if (SetPriorityClass(GetCurrentProcess(), HIGH_PRIORITY_CLASS) == 0){
printf("Error - Impossible set priority class - %d \n", GetLastError());
}
#ifdef DEBUG
printf(" Enabled thread priority \n");
#endif
setPriority = true;
argv += 1;
argc -= 1;
break;
#endif
case 'h':
printHelpAndExit();
break;
default:
char* tail = (char *) malloc(sizeof("Unknown option ") + sizeof(argv[0][1]));
if (tail == NULL)
reportErrorAndExit("main","malloc1 - general parser case h",
"Insufficient memory available");
sprintf(tail,"Unknow option : %s",argv[0]);
#ifdef LINUX_OS
printf("Type: ./ITGRecv -h for help\n");
#endif
#ifdef WIN32
printf("Type: ITGRecv.exe -h for help\n");
#endif
reportErrorAndExit("main","general parser",tail);
break;
}
} else {
char* tail = (char *) malloc(sizeof("Unknown option ") + sizeof(argv[0]));
if (tail == NULL)
reportErrorAndExit("main","malloc2 - general parser case h",
"Insufficient memory available");
sprintf(tail,"Unknow option : %s",argv[0]);
#ifdef LINUX_OS
printf("Type: ./ITGRecv -h for help\n");
#endif
#ifdef WIN32
printf("Type: ITGRecv.exe -h for help\n");
#endif
reportErrorAndExit("main","general parser",tail);
}
}
}
int main(int argc, char *argv[])
{
paramThread para[MAX_NUM_THREAD];
pthread_t hThr[MAX_NUM_THREAD];
int newSockSignaling = 0;
struct addrinfo* sockAddress = 0;
signal(SIGINT, terminate);
for (int i = 0; i < MAX_NUM_THREAD; i++) {
hThr[i] = 0;
para[i].flowId = 0;
para[i].count = 0;
para[i].socket = 0;
para[i].socketClose = 0;
memLogFile[i].num = -1;
strcpy(memLogFile[i].logFile, " ");
}
recvInit();
argv++;
argc--;
parserRecv(argc,argv);
if ((logCheck) && (logRemote))
logCheck =0;
if ((logCheck) && (logRemote == 0)) {
out.open(logFile, ios::out | ios::binary | ios::trunc);
if (!out) {
char* tail = (char *) malloc(sizeof("Error into open this file : ") + sizeof(logFile));
if (tail == NULL)
reportErrorAndExit("main","malloc3",
"Insufficient memory available");
sprintf(tail,"Error into open this file : %s",logFile);
reportErrorAndExit("main","open",tail);
}
}
else if (logRemote)
createRemoteLogFile((*globaleLogHost), logFile, l4ProtoLog, globaleLogSockSignaling, globaleLogSock);
printf("Press Ctrl-C to terminate\n");
#ifdef LINUX_OS
if ( getaddrinfo("::", NULL, NULL, &sockAddress) ||
((sockSignaling = socket(sockAddress->ai_family, SOCK_STREAM, 0)) < 0 ) ) {
if (sockAddress) freeaddrinfo(sockAddress);
if ( getaddrinfo("0.0.0.0", NULL, NULL, &sockAddress) ||
((sockSignaling = socket(sockAddress->ai_family, SOCK_STREAM, 0)) < 0 ) ) {
reportErrorAndExit("main","socket","Cannot create a socket for signaling");
}
}
#endif
#ifdef WIN32
#ifdef IPv6RECV
if ( getaddrinfo("::", NULL, NULL, &sockAddress) ||
#else
if ( getaddrinfo("0.0.0.0", NULL, NULL, &sockAddress) ||
#endif
((sockSignaling = socket(sockAddress->ai_family, SOCK_STREAM, 0)) < 0 ) ) {
reportErrorAndExit("main","socket","Cannot create a socket for signaling");
}
#endif
int reuse = 1;
int optlen = sizeof(reuse);
if (setsockopt(sockSignaling, SOL_SOCKET, SO_REUSEADDR, (char*)&reuse, optlen)<0){
reportErrorAndExit("main","socket","Error setting socket option");
}
SET_PORT(sockAddress, htons(DEFAULT_PORT_SIGNALING));
if (bind(sockSignaling, sockAddress->ai_addr, sockAddress->ai_addrlen)) {
char* tail = (char *) malloc(sizeof("Cannot bind a socket on port ") + sizeof(DEFAULT_PORT_SIGNALING) +
sizeof(" for signaling"));
if (tail == NULL)
reportErrorAndExit("main","malloc4 - bind","Insufficient memory available");
sprintf(tail,"Cannot bind a socket on port %d for signaling",DEFAULT_PORT_SIGNALING);
reportErrorAndExit("main","general parser",tail);
}
if (listen(sockSignaling, SOMAXCONN) < 0) {
char* tail = (char *) malloc(sizeof("Cannot listen for signaling connections on port ")
+ sizeof(DEFAULT_PORT_SIGNALING));
if (tail == NULL)
reportErrorAndExit("main","malloc5 - listen","Insufficient memory available");
sprintf(tail,"Cannot listen for signaling connections on port %d",DEFAULT_PORT_SIGNALING);
reportErrorAndExit("main","general parser",tail);
}
int i = 0;
while (1) {
if ((newSockSignaling = accept(sockSignaling, NULL, NULL)) < 0) {
reportErrorAndExit("main","accept","Connection fault on port for signaling");
}
#ifdef WIN32
#ifdef IPv6RECV
printf("*** New Socket IPv6 created for signaling ***\n");
#else
printf("*** New Socket IPv4 created for signaling ***\n");
#endif
#endif
para[i].socket = newSockSignaling;
if ( CREATE_THREAD(¶[i], signalManager, NULL, hThr[i]) < 0)
reportErrorAndExit("main","createThread","Cannot create thread");
#ifdef DEBUG
printf("Return value CREATE_THREAD signalManager hThr[i] : %d \n",(int)hThr[i]);
#endif
i++;
}
return 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -