⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 broker.c

📁 uClinux下用的数据库
💻 C
📖 第 1 页 / 共 2 页
字号:
** that client from the queue (note - only the first).  If we don't find an** open then the child is already holding the client socket.  In that case** we have to queue up a CLOSE message and increment the ref count for that** socket.*/void brokerCloseClient(sock, sourceChild)	int	sock,		sourceChild;{	ipc_t	*curIPC;	int	curKid,		command,		state,		refCount;	mMsg_t	message;	mMsg_q	*curMessage,		*prevMessage,		*tmpMessage;	if (!initialised)		_initialiseBroker();	if (clientSockRefCount[sock] > 0)	{		msqlDebug1(MOD_BROKER,		    "Ignoring close on client %d.  Close already in queue\n",		    sock);		return;	}	message.command = CMD_CLIENT_CLOSE;	message.client = sock;	refCount = 0;	curKid = 0;	while (curKid < numKids)	{		/*		** Do not process the close for the child that sent		** us the close command		*/		if (curKid == sourceChild)		{			curKid++;			continue;		}		/*		** Scan the queue looking for only the first OPEN and DB		** for this client.  Keep a track of our state so we		** don't blindly remove messages we shouldn't		*/		curIPC = ipcInfo + curKid;		prevMessage = NULL;		curMessage = curIPC->messages_head;		state = 0;		while(curMessage)		{			if (curMessage->message.client != sock)			{				prevMessage = curMessage;				curMessage = curMessage->next;				continue;			}			command = curMessage->message.command;			if (state == 0 && command != CMD_CLIENT_OPEN)			{				/* We are looking for an OPEN */				prevMessage = curMessage;				curMessage = curMessage->next;				continue;			}			if (state == 1 && command != CMD_CLIENT_DB)			{				/* We are looking for a DB */				prevMessage = curMessage;				curMessage = curMessage->next;				continue;			}			msqlDebug2(MOD_BROKER,				"Removing queued %s from kid %d\n",				brokerCommandNames[command], curKid);			state++;			if (prevMessage == NULL)			{				curIPC->messages_head = curMessage->next;			}			else			{				prevMessage->next = curMessage->next;			}			tmpMessage = curMessage;			curMessage = curMessage->next;			tmpMessage->next = NULL;			if (curIPC->messages_tail == tmpMessage)				curIPC->messages_tail = prevMessage;			_saveQueueEntry(tmpMessage);			curIPC->messages--;			msqlDebug2(MOD_BROKER,				"Queued length for kid %d is now %d\n",				curKid, curIPC->messages);		}		if (state == 0)		{			/*			** We didn't find the CLIENT_OPEN so the			** kid has already processed it.  We need			** to send it a CLOSE and jack up the			** ref count			*/			brokerNotifyChild(curKid,CMD_CLIENT_CLOSE,				sock, blank, blank, blank, 0, blank);			refCount++;		}		curKid++;	}	msqlDebug2(MOD_BROKER,"Ref count for client %d set to %d\n",		sock, refCount);	clientSockRefCount[sock] = refCount;	if (refCount == 0)	{		shutdown(sock,2);		close(sock);		freeClientConnection(NULL, sock);		msqlDebug1(MOD_BROKER,			"No pending closes for sock %d. Socket closed\n",sock);	}}/*** Public** brokerCloseChildren*/void brokerCloseChildren(){	int	count;	ipc_t	*curIPC;	if (!initialised)		_initialiseBroker();	for (count = 0; count < numKids; count++)	{                curIPC = ipcInfo + count;                kill(curIPC->pid, SIGQUIT);                count++;	}        while(1)        {                /* Wait for all the backends to terminate */                if (wait(NULL) < 0)                        break;        }}/*** Public** brokerStartChildren*/int brokerStartChildren(server, socks, argc,argv)	msqld	*server;	fd_set	*socks;	int	argc;	char	*argv[];{	int	count,		pid,		to[2],		from[2],		oob[2],		max = 0;	ipc_t	*curIPC;	char	path[255];	if (!initialised)		_initialiseBroker();	sprintf(path,"%s/trace", globalServer->config.instDir);	mkdir(path,0770);	fprintf(stderr,"\tStarting %d backends\n", numKids);	ipcInfo = (ipc_t *)malloc(sizeof(ipc_t) * numKids);	bzero(ipcInfo,sizeof(ipc_t) * numKids);	count = 0;	while(count < numKids)	{		curIPC = ipcInfo + count;		if (socketpair(AF_UNIX, SOCK_STREAM,0,to) < 0)		{			perror("Failed to create socket pair");			exit(1);		}		if (socketpair(AF_UNIX, SOCK_STREAM,0,from) < 0)		{			perror("Failed to create socket pair");			exit(1);		}		if (socketpair(AF_UNIX, SOCK_STREAM,0,oob) < 0)		{			perror("Failed to create socket pair");			exit(1);		}#ifdef NOTDEF		setsockopt(to[0],SOL_SOCKET,SO_PASSCRED,&on, sizeof(on));		setsockopt(to[1],SOL_SOCKET, SO_PASSCRED,&on, sizeof(on));		setsockopt(from[0],SOL_SOCKET, SO_PASSCRED,&on, sizeof(on));		setsockopt(from[1],SOL_SOCKET, SO_PASSCRED,&on, sizeof(on));#endif		/*		** DEBUGGING : change dir to a new directory so we		** can catch core dumps and profile outputs for this		** child		*/		sprintf(path,"%s/trace/%d",globalServer->config.instDir,count);		mkdir(path,0770);		chdir(path);		pid = fork();		if (pid < 0)		{			perror("Fork failed");			exit(1);		}		if (pid == 0)		{			/* Child */			close(0);			dup2(to[0],3);			close(to[0]);			close(to[1]);			dup2(from[1],4);			close(from[0]);			close(from[1]);			dup2(oob[0],5);			close(oob[0]);			close(oob[1]);			childStartup(server);			fprintf(stderr,"\nChild %d (%d) returned!  Bailing\n\n",				count, (int)getpid());			exit(1);		}		else		{			/* Parent */			curIPC->toSock = to[1];			curIPC->fromSock = from[0];			curIPC->oobSock = oob[1];			curIPC->pid = pid;			curIPC->messages = 0;			curIPC->jabbed = 0;			curIPC->messages_head = NULL;			curIPC->messages_tail = NULL;			FD_SET(from[1], socks);			if (curIPC->toSock > max)				max = curIPC->toSock;			if (curIPC->fromSock > max)				max = curIPC->fromSock;			close(to[0]);			close(from[1]);		}		count++;	}	sprintf(path,"%s/trace/broker",globalServer->config.instDir);	mkdir(path,0770);	chdir(path);	return(max);}/*** Public** brokerAddChildSocket*/void brokerAddChildSockets(fds)	fd_set	*fds;{	int	count;	ipc_t	*curIPC;	if (!initialised)		_initialiseBroker();	for (count = 0; count < numKids; count++)	{		curIPC = ipcInfo + count;		FD_SET(curIPC->fromSock, fds);	}	return;}/*** public** brokerReadChildMessage*/void brokerReadChildMessage(child)	int	child;{	mMsg_t	message;	char	ack = 0;	ipc_t	*curIPC;	int	remain,		numBytes;	char	*cp;	if (!initialised)		_initialiseBroker();	curIPC = ipcInfo + child;	remain = sizeof(message);	cp = (char *)&message;	while(remain)	{		numBytes = read(curIPC->fromSock, cp, remain);		if (numBytes < 0)		{			fprintf(stderr,				"\nChild termination - shuting down\n\n");			puntServer(-1);			exit(1);		}		cp = cp + numBytes;		remain = remain - numBytes;	}		msqlDebug3(MOD_BROKER,"Got child message from %d pid %d (%s)\n",		child, curIPC->pid, brokerCommandNames[message.command]);	switch(message.command)	{		case CMD_CLIENT_CLOSE:			puntClient(0);			brokerCloseClient(message.client,child);			break;		case CMD_FLUSH_CACHE:		case CMD_CLIENT_DB:			brokerNotifyAllChildren(message.command,child,				message.client, message.user, message.db, 				message.table, message.access, blank);			break;		case CMD_RUN_QUEUE:			msqlDebug0(MOD_BROKER,"Sending ACK to child\n");			write(curIPC->oobSock, &ack, 1);			brokerRunMessageQueue(child);			break;	}	if (message.command != CMD_RUN_QUEUE)	{		/* 		** We must ack a run_queue straight away otherwise		** we can fill the kernel's message queue to the		** requesting client while it waits for the ack.		** There's no race condition on a run queue so		** there's no problems doing this		*/		msqlDebug0(MOD_BROKER,"Sending ACK to child\n");		write(curIPC->oobSock, &ack, 1);	}}/*** Public** brokerCheckChildren*/void brokerCheckChildren(fds)	fd_set	*fds;{	int	count;	ipc_t	*curIPC;	if (!initialised)		_initialiseBroker();	for(count=0; count<numKids; count++)	{		curIPC = ipcInfo + count;		if (FD_ISSET(curIPC->fromSock, fds))			brokerReadChildMessage(count);	}}char *brokerGetCommandName(cmd)	int	cmd;{	return(brokerCommandNames[cmd]);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -