📄 main_broker.c
字号:
} if (IPsock >= 0) { shutdown(IPsock,2); close(IPsock); } if (UNIXsock >= 0) { shutdown(UNIXsock,2); close(UNIXsock); unlink(unixPort); free(unixPort); } /* ** Close off the children */ sync(); fprintf(stderr,"Closing backend connections.\n\n"); brokerCloseChildren(); printf("\n"); abort();}void setupSignals(){#ifdef SIGSEGV signal(SIGSEGV,sigTrap);#endif#ifdef SIGBUS signal(SIGBUS,sigTrap);#endif#ifdef SIGINT signal(SIGINT,puntServer);#endif#ifdef SIGQUIT signal(SIGQUIT,puntServer);#endif#ifdef SIGKILL signal(SIGKILL,puntServer);#endif#ifdef SIGPIPE signal(SIGPIPE,puntClient);#endif#ifdef SIGTERM signal(SIGTERM,puntServer);#endif#ifdef SIGHUP signal(SIGHUP,SIG_IGN);#endif#ifdef SIGCHLD signal(SIGCHLD,childHandler);#endif}void childStartup(server) msqld *server;{ mQuery_t *query, dummyQuery; int count, clientCommand, remain, numBytes, offset, comSock, endOfQueue, haveLock, clientsProcessed; char dbname[255], *arg, *arg2, *cp, go, tmpChar; fd_set readFDs, clientFDs; mMsg_t message; struct timeval timeout; /* Ensure there's no wierd FD's open */ for ( count = BROKER_OOB_FD + 1; count<=255; count++) { close(count); } /* Try to set the locale but don't bail if it fails (freeBSD) */ setlocale(LC_ALL, ""); /* ** Initialise the connection array */ count = 0; while (count < 255) { conArray[count].sock = -1; conArray[count].db = NULL; count++; } /* ** OK, on with the show */ bzero(&dummyQuery, sizeof(dummyQuery)); strcpy(PROGNAME,"child"); yytext = NULL; umask(0077); setupSignals(); aclLoadFile(0); (void)bzero(&readFDs,sizeof(fd_set)); msqlDebug0(MOD_GENERAL, "miniSQL debug mode. Waiting for connections.\n"); FD_ZERO(&clientFDs); while(1) { /* ** Before we waste cycles getting the IPC lock (which ** we only need if we select() on the client connection ** sockets), check to see if there's anything waiting ** on the broker socket (which is dedicated to us and ** we can do with as we please). */ haveLock = 0; FD_ZERO(&readFDs); FD_SET(BROKER_FROM_FD,&readFDs); timeout.tv_sec = 0; timeout.tv_usec = 0; if(select(255,&readFDs,0,0,&timeout) == 1) { /* ** The broker has jabbed us. Run the queue ** without worrying about locking for access ** to the client sockets. We won't touch them ** this time. */ msqlDebug0(MOD_GENERAL, "Found something on the broker sock\n"); } else { /* ** OK, a poll of the broker socket didn't get ** us anywhere. Grab the lock and check out all ** the descriptors */ bcopy(&clientFDs, &readFDs, sizeof(readFDs)); FD_SET(BROKER_FROM_FD,&readFDs); lockGetIpcLock(server); haveLock = 1; if(select(255,&readFDs,0,0,0) < 0) { if (errno == EINTR) continue; puntServer(0); } msqlDebug0(MOD_GENERAL,"Select broken\n"); } /* ** Is there something in the broker's queue? */ if (FD_ISSET(BROKER_FROM_FD,&readFDs)) { /* ** Hmmm, something came in while we were ** sleeping in the select. ** ** The broker has jabbed us. Drop the lock, ** read the jab byte, and ask it for any ** queued messages. */ if (haveLock) lockReleaseIpcLock(server); read(BROKER_FROM_FD,&go,1); _zeroMessageStruct(&message); message.command = CMD_RUN_QUEUE; message.client = 0; brokerChildSendMessage(&message); endOfQueue = 0; while(endOfQueue == 0) { remain = sizeof(mMsg_t); offset = 0; while(remain) { cp = ((char *)&message) + offset; numBytes = read(BROKER_FROM_FD,cp, remain); if (numBytes <= 0) puntServer(SIGQUIT); remain -= numBytes; offset += numBytes; } switch(message.command) { case CMD_QUEUE_END: msqlDebug0(MOD_BROKER, "Processing Broker QUEUE_END\n"); endOfQueue = 1; break; case CMD_CLIENT_OPEN: comSock = brokerRecvFD(BROKER_FROM_FD); if (comSock < 1) { puntServer(0); } msqlDebug0(MOD_BROKER, "Processing Broker CLIENT_OPEN\n"); msqlDebug2(MOD_BROKER, "Got new connection (%d on %d)\n", message.client, comSock); FD_SET(comSock, &clientFDs); conArray[message.client].sock = comSock; conArray[message.client].user = (char *)strdup(message.user); conArray[message.client].connectTime = time(NULL); conCount++; numCons++; break; case CMD_FLUSH_CACHE: msqlDebug2(MOD_BROKER, "Processing Broker FLUSH(%s.%s)\n", message.db, message.table); if (*message.table) cacheInvalidateTable(server, message.db, message.table); else cacheInvalidateDatabase( server, message.db); break; case CMD_CLIENT_DB: msqlDebug0(MOD_BROKER, "Processing Broker CLIENT_DB\n"); if(conArray[message.client].db) free(conArray[message.client].db); conArray[message.client].db = strdup(message.db); conArray[message.client].access = message.access; break; case CMD_CLIENT_CLOSE: count = message.client; comSock = conArray[count].sock; msqlDebug2(MOD_BROKER, "Processing Broker CLOSE (%d on %d)\n", count, comSock); if (comSock < 0) { msqlDebug2(MOD_BROKER, "Close ignored (%d on %d)\n", count, comSock); break; } FD_CLR(comSock,&clientFDs); shutdown(comSock,2); close(comSock); conArray[count].sock = -1; if(conArray[count].user) { free(conArray[count].user); conArray[count].user = NULL; } if(conArray[count].host) { free(conArray[count].host); conArray[count].host = NULL; } if(conArray[count].db) { free(conArray[count].db); conArray[count].db = NULL; } numCons--; break; } } continue; } /* ** Must be coming from an active client */ count = 0; clientsProcessed = 0; while(count <= 255) { if (conArray[count].sock == -1) { count++; continue; } if (!FD_ISSET(conArray[count].sock,&readFDs)) { count++; continue; } /* Got one ! */ clientsProcessed++; comSock = conArray[count].sock; if (netReadPacket(comSock) <= 0) { msqlDebug1(MOD_GENERAL, "Command read on sock %d failed!\n", comSock); clientCommand = QUIT; } else { clientCommand = atoi(packet); } msqlDebug3(MOD_GENERAL,"Command on sock %d = %d (%s)\n", comSock, clientCommand, comTable[clientCommand]); /* ** with it. if (clientCommand != QUIT && clientCommand != INIT_DB) lockReleaseIpcLock(server); */ switch(clientCommand) { case QUIT: msqlDebug0(MOD_GENERAL,"DB QUIT!\n"); FD_CLR(comSock,&clientFDs); shutdown(comSock,2); close(comSock); if(conArray[count].user) { free(conArray[count].user); conArray[count].user = NULL; } if(conArray[count].host) { free(conArray[count].host); conArray[count].host = NULL; } if(conArray[count].db) { free(conArray[count].db); conArray[count].db = NULL; } conArray[count].sock = -1; /* Send broker update request */ _zeroMessageStruct(&message); message.command = CMD_CLIENT_CLOSE; message.client = count; brokerChildSendMessage(&message); break; case INIT_DB: cp=(char *)strtok(packet+2,"\n\r"); if (!cp) { netError(comSock,NO_DB_ERROR); break; } strcpy(dbname,cp); msqlDebug1(MOD_GENERAL,"DBName = %s\n", dbname); conArray[count].access = aclCheckAccess( dbname, conArray + count); if(conArray[count].access == NO_ACCESS) { netError(comSock,ACCESS_DENIED_ERROR); break; } if(conArray[count].db) { free(conArray[count].db); conArray[count].db = NULL; } conArray[count].db = (char *)strdup(dbname); if (utilCheckDB(server, dbname) < 0) { netError(comSock,errMsg); break; } /* Send broker update */ _zeroMessageStruct(&message); message.command = CMD_CLIENT_DB; message.access = conArray[count].access; message.client = count; strcpy(message.db,dbname); brokerChildSendMessage(&message); netOK(comSock); break; case QUERY: if (!conArray[count].db) { netError(comSock,NO_DB_ERROR); break; } curSock = comSock; cp=(char *)(packet+2); arg = (char *)strdup(cp); tmpChar = 0; if (strlen(arg) > 4096) { tmpChar = *(arg+4096); *(arg+4096) = 0; } msqlDebug1(MOD_QUERY,"Query = %s",arg); if (tmpChar) *(arg+4096) = tmpChar; aclSetPerms(conArray[count].access); query = parseQuery(server, arg,comSock, conArray[count].user, conArray[count].db); if (query) { processQuery(server,query,count,arg); parseCleanQuery(query); } if(arg) free(arg); /* cacheSyncCache(); */ break; case EXPLAIN: if (!conArray[count].db) { netError(comSock,NO_DB_ERROR); break; } curSock = comSock; cp=(char *)(packet+2); while(*cp != ':') cp++; arg = (char *)strdup(cp + 1); tmpChar = 0; if (strlen(arg) > 4096) { tmpChar = *(arg+4096); *(arg+4096) = 0; } msqlDebug1(MOD_QUERY,"Explain = %s",arg); if (tmpChar) *(arg+4096) = tmpChar; aclSetPerms(conArray[count].access); query = parseQuery(server, arg,comSock, conArray[count].user, conArray[count].db); if (query) { query->explainOnly = 1; processQuery(server,query,count,arg); parseCleanQuery(query); } if(arg) free(arg); /* cacheSyncCache(); */ break; case DB_LIST: curSock = comSock; processListDBs(server, comSock); break; case SEQ_INFO: if (!conArray[count].db) { netError(comSock,NO_DB_ERROR); break; } cp = (char *)index(packet,':'); cp=(char *)strtok(cp+1, "\n\r"); arg = (char *)strdup(cp); curSock = comSock; processSequenceInfo(server, comSock, arg,conArray[count].db); if(arg) free(arg); break; case TABLE_LIST: if (!conArray[count].db) { netError(comSock, NO_DB_ERROR); break; } curSock = comSock; processListTables(server, comSock, conArray[count].db); break; case FIELD_LIST: if (!conArray[count].db) { netError(comSock,NO_DB_ERROR); break; } cp=(char *)strtok(packet+2, "\n\r"); arg = (char *)strdup(cp); curSock = comSock; processListFields(server, comSock, arg, conArray[count].db); if(arg) free(arg); break; case INDEX_LIST: if (!conArray[count].db) { netError(comSock,NO_DB_ERROR); break; } cp=(char *)strtok(packet+2, ":\n\r"); arg = (char *)strdup(cp); cp=(char *)strtok(NULL,"\n\r"); arg2 = (char *)strdup(cp); curSock = comSock; processListIndex(server, comSock, arg2, arg, conArray[count].db); if(arg) free(arg); if(arg2) free(arg2); break; case EXPORT: if (!conArray[count].db) { netError(comSock,NO_DB_ERROR); break; } cp=(char *)strtok(packet+2, ":\n\r"); arg = (char *)strdup(cp); cp=(char *)strtok(NULL,":\n\r"); arg2 = (char *)strdup(cp); curSock = comSock; dummyQuery.curUser = conArray[count].user; dummyQuery.curDB = conArray[count].db; processExportTable(server,comSock, conArray[count].db, arg,arg2, &dummyQuery); if(arg) free(arg); if(arg2) free(arg2); break; case CREATE_DB: if (!aclCheckLocal(conArray + count)) { netError(comSock,PERM_DENIED_ERROR); break; } cp=(char *)strtok(packet+2, "\n\r"); arg = (char *)strdup(cp); processCreateDB(server, comSock,arg); if(arg) free(arg); break; case COPY_DB: if (!aclCheckLocal(conArray + count)) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -