📄 broker.c
字号:
** that client from the queue (note - only the first). If we don't find an** open then the child is already holding the client socket. In that case** we have to queue up a CLOSE message and increment the ref count for that** socket.*/void brokerCloseClient(sock, sourceChild) int sock, sourceChild;{ ipc_t *curIPC; int curKid, command, state, refCount; mMsg_t message; mMsg_q *curMessage, *prevMessage, *tmpMessage; if (!initialised) _initialiseBroker(); if (clientSockRefCount[sock] > 0) { msqlDebug1(MOD_BROKER, "Ignoring close on client %d. Close already in queue\n", sock); return; } message.command = CMD_CLIENT_CLOSE; message.client = sock; refCount = 0; curKid = 0; while (curKid < numKids) { /* ** Do not process the close for the child that sent ** us the close command */ if (curKid == sourceChild) { curKid++; continue; } /* ** Scan the queue looking for only the first OPEN and DB ** for this client. Keep a track of our state so we ** don't blindly remove messages we shouldn't */ curIPC = ipcInfo + curKid; prevMessage = NULL; curMessage = curIPC->messages_head; state = 0; while(curMessage) { if (curMessage->message.client != sock) { prevMessage = curMessage; curMessage = curMessage->next; continue; } command = curMessage->message.command; if (state == 0 && command != CMD_CLIENT_OPEN) { /* We are looking for an OPEN */ prevMessage = curMessage; curMessage = curMessage->next; continue; } if (state == 1 && command != CMD_CLIENT_DB) { /* We are looking for a DB */ prevMessage = curMessage; curMessage = curMessage->next; continue; } msqlDebug2(MOD_BROKER, "Removing queued %s from kid %d\n", brokerCommandNames[command], curKid); state++; if (prevMessage == NULL) { curIPC->messages_head = curMessage->next; } else { prevMessage->next = curMessage->next; } tmpMessage = curMessage; curMessage = curMessage->next; tmpMessage->next = NULL; if (curIPC->messages_tail == tmpMessage) curIPC->messages_tail = prevMessage; _saveQueueEntry(tmpMessage); curIPC->messages--; msqlDebug2(MOD_BROKER, "Queued length for kid %d is now %d\n", curKid, curIPC->messages); } if (state == 0) { /* ** We didn't find the CLIENT_OPEN so the ** kid has already processed it. We need ** to send it a CLOSE and jack up the ** ref count */ brokerNotifyChild(curKid,CMD_CLIENT_CLOSE, sock, blank, blank, blank, 0, blank); refCount++; } curKid++; } msqlDebug2(MOD_BROKER,"Ref count for client %d set to %d\n", sock, refCount); clientSockRefCount[sock] = refCount; if (refCount == 0) { shutdown(sock,2); close(sock); freeClientConnection(NULL, sock); msqlDebug1(MOD_BROKER, "No pending closes for sock %d. Socket closed\n",sock); }}/*** Public** brokerCloseChildren*/void brokerCloseChildren(){ int count; ipc_t *curIPC; if (!initialised) _initialiseBroker(); for (count = 0; count < numKids; count++) { curIPC = ipcInfo + count; kill(curIPC->pid, SIGQUIT); count++; } while(1) { /* Wait for all the backends to terminate */ if (wait(NULL) < 0) break; }}/*** Public** brokerStartChildren*/int brokerStartChildren(server, socks, argc,argv) msqld *server; fd_set *socks; int argc; char *argv[];{ int count, pid, to[2], from[2], oob[2], max = 0; ipc_t *curIPC; char path[255]; if (!initialised) _initialiseBroker(); sprintf(path,"%s/trace", globalServer->config.instDir); mkdir(path,0770); fprintf(stderr,"\tStarting %d backends\n", numKids); ipcInfo = (ipc_t *)malloc(sizeof(ipc_t) * numKids); bzero(ipcInfo,sizeof(ipc_t) * numKids); count = 0; while(count < numKids) { curIPC = ipcInfo + count; if (socketpair(AF_UNIX, SOCK_STREAM,0,to) < 0) { perror("Failed to create socket pair"); exit(1); } if (socketpair(AF_UNIX, SOCK_STREAM,0,from) < 0) { perror("Failed to create socket pair"); exit(1); } if (socketpair(AF_UNIX, SOCK_STREAM,0,oob) < 0) { perror("Failed to create socket pair"); exit(1); }#ifdef NOTDEF setsockopt(to[0],SOL_SOCKET,SO_PASSCRED,&on, sizeof(on)); setsockopt(to[1],SOL_SOCKET, SO_PASSCRED,&on, sizeof(on)); setsockopt(from[0],SOL_SOCKET, SO_PASSCRED,&on, sizeof(on)); setsockopt(from[1],SOL_SOCKET, SO_PASSCRED,&on, sizeof(on));#endif /* ** DEBUGGING : change dir to a new directory so we ** can catch core dumps and profile outputs for this ** child */ sprintf(path,"%s/trace/%d",globalServer->config.instDir,count); mkdir(path,0770); chdir(path); pid = fork(); if (pid < 0) { perror("Fork failed"); exit(1); } if (pid == 0) { /* Child */ close(0); dup2(to[0],3); close(to[0]); close(to[1]); dup2(from[1],4); close(from[0]); close(from[1]); dup2(oob[0],5); close(oob[0]); close(oob[1]); childStartup(server); fprintf(stderr,"\nChild %d (%d) returned! Bailing\n\n", count, (int)getpid()); exit(1); } else { /* Parent */ curIPC->toSock = to[1]; curIPC->fromSock = from[0]; curIPC->oobSock = oob[1]; curIPC->pid = pid; curIPC->messages = 0; curIPC->jabbed = 0; curIPC->messages_head = NULL; curIPC->messages_tail = NULL; FD_SET(from[1], socks); if (curIPC->toSock > max) max = curIPC->toSock; if (curIPC->fromSock > max) max = curIPC->fromSock; close(to[0]); close(from[1]); } count++; } sprintf(path,"%s/trace/broker",globalServer->config.instDir); mkdir(path,0770); chdir(path); return(max);}/*** Public** brokerAddChildSocket*/void brokerAddChildSockets(fds) fd_set *fds;{ int count; ipc_t *curIPC; if (!initialised) _initialiseBroker(); for (count = 0; count < numKids; count++) { curIPC = ipcInfo + count; FD_SET(curIPC->fromSock, fds); } return;}/*** public** brokerReadChildMessage*/void brokerReadChildMessage(child) int child;{ mMsg_t message; char ack = 0; ipc_t *curIPC; int remain, numBytes; char *cp; if (!initialised) _initialiseBroker(); curIPC = ipcInfo + child; remain = sizeof(message); cp = (char *)&message; while(remain) { numBytes = read(curIPC->fromSock, cp, remain); if (numBytes < 0) { fprintf(stderr, "\nChild termination - shuting down\n\n"); puntServer(-1); exit(1); } cp = cp + numBytes; remain = remain - numBytes; } msqlDebug3(MOD_BROKER,"Got child message from %d pid %d (%s)\n", child, curIPC->pid, brokerCommandNames[message.command]); switch(message.command) { case CMD_CLIENT_CLOSE: puntClient(0); brokerCloseClient(message.client,child); break; case CMD_FLUSH_CACHE: case CMD_CLIENT_DB: brokerNotifyAllChildren(message.command,child, message.client, message.user, message.db, message.table, message.access, blank); break; case CMD_RUN_QUEUE: msqlDebug0(MOD_BROKER,"Sending ACK to child\n"); write(curIPC->oobSock, &ack, 1); brokerRunMessageQueue(child); break; } if (message.command != CMD_RUN_QUEUE) { /* ** We must ack a run_queue straight away otherwise ** we can fill the kernel's message queue to the ** requesting client while it waits for the ack. ** There's no race condition on a run queue so ** there's no problems doing this */ msqlDebug0(MOD_BROKER,"Sending ACK to child\n"); write(curIPC->oobSock, &ack, 1); }}/*** Public** brokerCheckChildren*/void brokerCheckChildren(fds) fd_set *fds;{ int count; ipc_t *curIPC; if (!initialised) _initialiseBroker(); for(count=0; count<numKids; count++) { curIPC = ipcInfo + count; if (FD_ISSET(curIPC->fromSock, fds)) brokerReadChildMessage(count); }}char *brokerGetCommandName(cmd) int cmd;{ return(brokerCommandNames[cmd]);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -