📄 broker.c
字号:
/*** Copyright (c) 1995-2001 Hughes Technologies Pty Ltd. All rights** reserved. **** Terms under which this software may be used or copied are** provided in the specific license associated with this product.**** Hughes Technologies disclaims all warranties with regard to this ** software, including all implied warranties of merchantability and ** fitness, in no event shall Hughes Technologies be liable for any ** special, indirect or consequential damages or any damages whatsoever ** resulting from loss of use, data or profits, whether in an action of ** contract, negligence or other tortious action, arising out of or in ** connection with the use or performance of this software.****** $Id: broker.c,v 1.11 2002/07/17 04:48:39 bambi Exp $***//*** Module : broker : broker.c** Purpose : broker related routines** Exports : ** Depends Upon : *//*** Note on terminology - a child (or kid) is a child process created by** the main broker process and is responsible for handling query** requests. The children are the backend database processes. 
A client** is an unrelated process that is connecting to us to use the database*//**************************************************************************** STANDARD INCLUDES**************************************************************************/#include <stdio.h>#include <stdlib.h>#include <sys/types.h>#include <common/config.h>#ifdef HAVE_UNISTD_H# include <unistd.h>#endif#ifdef HAVE_STRING_H# include <string.h>#endif#ifdef HAVE_STRINGS_H# include <strings.h>#endif/**************************************************************************** MODULE SPECIFIC INCLUDES**************************************************************************/#include <signal.h>#include <sys/socket.h>#include <sys/stat.h>#include <sys/wait.h>#include "common/msql_defs.h"#include "common/debug/debug.h"#include "common/config/config.h"#include "msqld/broker/filelib.h"#include "msqld/index/index.h"#include "msqld/includes/msqld.h"#include "msqld/main/main.h"#include "msqld/broker/broker.h"#include "msqld/broker/broker_priv.h"/**************************************************************************** GLOBAL VARIABLES**************************************************************************/static ipc_t *ipcInfo;static int clientSockRefCount[255], numSpareQueueEntries = 0, numKids = -1, initialised = 0;static mMsg_q *spareQueueEntries[BROKER_QUEUE_MAX_SPARE];static char blank[] = "";static char *brokerCommandNames[] = { "???", "Client Open", "Client Close", "Flush Cache", "Client DB", "Run Queue", "Queue End", "???" 
};extern msqld* globalServer;/**************************************************************************** PRIVATE ROUTINES**************************************************************************//*** Private** _initialiseBroker*/static void _initialiseBroker(){ if (initialised) return; numKids = configGetIntEntry("system","num_children"); initialised++;}/*** Private** _getQueueEntry() - Grab a spare queue entry for reuse*/static mMsg_q *_getQueueEntry(){ mMsg_q *new; /* ** If there's a spare return it. Else create one */ if (numSpareQueueEntries > 0) { numSpareQueueEntries--; new = spareQueueEntries[numSpareQueueEntries]; } else { new = (mMsg_q *)malloc(sizeof(mMsg_q)); } new->message.command = new->message.access = new->message.client = 0; *new->message.db = *new->message.table = *new->message.user = *new->message.client_ip = 0; new->next = NULL; new->fd = -1; return(new);}/*** Private** _saveQueueEntry() - Save a now unused queue entry for later use*/static void _saveQueueEntry(entry) mMsg_q *entry;{ /* ** If we can handle more spares, keep the entry. Else free it. */ if (numSpareQueueEntries < BROKER_QUEUE_MAX_SPARE) { spareQueueEntries[numSpareQueueEntries] = entry; numSpareQueueEntries++; } else { free(entry); }}/**************************************************************************** PUBLIC ROUTINES**************************************************************************//*** Public** brokerRunMessageQueue - Send pending messages to kid*//*** We queue up all messages destined for the kids. We don't just write** the message and expect the kernel to handle the queueing. 
Expecting** the kernel to handle an unknown number of broker messages is not an** acceptable solution as a long running queury on a busy system can** exhaust the kernels queue capacity and cause the broker process to** block on a write - resulting in all further attempts to contact the** the broker to be locked out.*/void brokerRunMessageQueue(child) int child;{ ipc_t *curIPC; mMsg_q *curMessage, *tmpMessage; mMsg_t message; msqlDebug1(MOD_BROKER,"Running broker queue for kid %d\n",child); curIPC = ipcInfo + child; curMessage = curIPC->messages_head; while(curMessage) { /* ** Send the message to the child */ msqlDebug2(MOD_BROKER,"Sending %s to kid %d\n", brokerCommandNames[curMessage->message.command],child); write(curIPC->toSock, &(curMessage->message), sizeof(mMsg_t)); /* ** If it's an open, send the socket as well */ if (curMessage->message.command == CMD_CLIENT_OPEN) { msqlDebug3(MOD_BROKER, "Sending fd %d to kid %d pid %d\n", curMessage->fd,child,curIPC->pid); brokerSendFD(curIPC->toSock, curMessage->fd); } /* ** If it's a close, decrement the client's socket ref ** count. 
If the ref count hits zero then shut it down */ if (curMessage->message.command == CMD_CLIENT_CLOSE) { clientSockRefCount[curMessage->message.client]--; msqlDebug2(MOD_BROKER, "Ref count for client %d now %d\n", curMessage->message.client, clientSockRefCount[curMessage->message.client]); if(clientSockRefCount[curMessage->message.client] <= 0) { int sock; sock = curMessage->message.client; shutdown(sock,2); close(sock); freeClientConnection(NULL, sock); msqlDebug1(MOD_BROKER, "Client sock %d closed\n",sock); } } tmpMessage = curMessage; curMessage = curMessage->next; tmpMessage->next = NULL; _saveQueueEntry(tmpMessage); } /* ** Clear out the message queue and send an end of queue message */ curIPC->messages = curIPC->jabbed = 0; curIPC->messages_head = NULL; curIPC->messages_tail = NULL; message.command = CMD_QUEUE_END; write(curIPC->toSock, &message, sizeof(mMsg_t)); msqlDebug1(MOD_BROKER,"Sending EndOfQueue to kid %d\n",child);}/*** Public** brokerNotifyChild - Queue a message for a child*/void brokerNotifyChild(kid,command,sock,user,db,table,access,ipAddr) int kid, command, sock; char *user, *db, *table; int access; char *ipAddr;{ ipc_t *curIPC; mMsg_q *new; char ack=0; /* ** Create a message struct */ curIPC = ipcInfo + kid; msqlDebug3(MOD_BROKER,"Queueing %s for kid %d pid %d\n", brokerCommandNames[command],kid,curIPC->pid); new = _getQueueEntry(); new->message.command = command; new->message.client = sock; new->message.access = access; if (user) strncpy(new->message.user,user, NAME_LEN); if (db) strncpy(new->message.db,db, NAME_LEN); if (table) strncpy(new->message.table,table, NAME_LEN); if (ipAddr) strncpy(new->message.client_ip, ipAddr,15); /* ** Add it to the childs message queue */ if (curIPC->messages_head == NULL) { curIPC->messages_head = new; curIPC->messages_tail = curIPC->messages_head; } else { curIPC->messages_tail->next = new; curIPC->messages_tail = curIPC->messages_tail->next; } curIPC->messages_tail->next = NULL; curIPC->messages_tail->fd = 
-1; if(command == CMD_CLIENT_OPEN) { curIPC->messages_tail->fd = sock; } /* ** If we haven't 'jabbed' the client the do so to ** tell it that it has pending messages */ if (curIPC->jabbed == 0) { msqlDebug2(MOD_BROKER,"Jabbing kid %d pid %d\n", kid,curIPC->pid); write(curIPC->toSock,&ack,1); curIPC->jabbed = 1; } curIPC->messages++; msqlDebug2(MOD_BROKER, "Queued length for kid %d is now %d\n", kid, curIPC->messages);}/*** Public** brokerNotifyAllChildren - Queue a message to all kids*/void brokerNotifyAllChildren(command, exclude, sock, user, db, table, access, ipAddr) int command, exclude, sock; char *user, *db, *table; int access; char *ipAddr;{ int count; /* ** Send this message to all kids expcept the one listed in ** 'exclude' as it is the child that sent it to us in the first ** place. */ msqlDebug1(MOD_BROKER,"Sending %s to all kids\n", brokerCommandNames[command]); for(count=0;count<configGetIntEntry("system","num_children");count++) { if (count == exclude) { msqlDebug1(MOD_BROKER, "Kid %d ignored as it was the message source\n", exclude); continue; } brokerNotifyChild(count,command,sock,user,db, table,access,ipAddr); }}/*** Public** brokerCloseClient - Close a client connection*//*** Note 1:**** We can't just close the socket if a client has dropped it's connection.** If there is an outstanding CLIENT_OPEN for that client pending for** one of the kids then it will fail when we try to send it the FD** (because we closed it). Keep a reference count showing the number of** children that have the socket open and only close it when the ref** count hits 0.**** Note 2:**** If a child is processing a long running query it is possible for may** clients to come and go while it is still busy. This results in a** large number of open sockets in the broker (held open by the** reference count) and a long queue of OPEN/DB/CLOSE message sequences** for the busy child. 
Rather than build up the queue with useless
** messages (the client is gone so why bother telling the kids that it
** even existed), scan each child's queue for an OPEN message associated
** with the client.  If we find an open then we know the child isn't
** holding the socket.  In that case, remove the first OPEN and DB for
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -