📄 sql_pool.c
字号:
/* * 实现了 mysql 连接管理,先申请者先获得连接,一个连接某一时刻只能被一个线程使用。 * * 实现机制:mysql连接队列 和 申请连接队列。队列中空闲的连接会每隔一段时间刷新一 * 次,防止mysqld断开连接。 * * 申请连接:检查连接队列,有空闲连接则返回,否则加入申请连接队列末尾。 * * 释放连接:检查申请连接队列,有申请唤醒,否则连接队列中对应的连接置为空闲状态。 * * 连接管理将被预分配线程池程序用到,各队列数据结构应该用共享存储。 * */#include <stdio.h>#include <string.h>#include <sys/ipc.h>#include <sys/shm.h>#include <sys/types.h>#include <unistd.h>#include <semaphore.h>#include <mysql/mysql.h>#include "sql_pool.h"int num;struct queue_mem *queue_ptr;struct sql_struct *sql_pool_head;struct wait_struct *wait_pool_head;/* * 刷新连接池。 */int sql_flush(){ int i; char *tmp = ""; struct sql_struct *ptr_s; for (i = 0, ptr_s = sql_pool_head; i < num; i++, ptr_s++) { sem_wait(&(ptr_s->sem)); if (ptr_s->status == IDLE) { printf("i = %d is IDLE\n", i); mysql_real_query(&(ptr_s->sql), tmp, strlen(tmp)); } else { printf("i = %d is BUSY\n", i); } sem_post(&(ptr_s->sem)); } printf("in = %d out = %d\n", queue_ptr->in - wait_pool_head, queue_ptr->out - wait_pool_head); return 0;}/* * 创建mysql连接队列及申请连接队列 */int init_sql_pool(void *args){ int i; pid_t pid; struct sql_struct *ptr_s; struct wait_struct *ptr_w; int shmid1, shmid2, shmid3; num = ((struct sql_par *)args)->num; if (num <= 0) return -1; // mysql连接池,共享内存。 shmid1 = shmget(IPC_PRIVATE, num * sizeof(struct sql_struct), 0600 | IPC_CREAT); if (shmid1 < 0) return -1; ptr_s = sql_pool_head = shmat(shmid1, NULL, 0); if (ptr_s == NULL) return -1; for (i = 0; i < num; i++, ptr_s++) { mysql_init(&(ptr_s->sql)); mysql_real_connect(&(ptr_s->sql), ((struct sql_par *)args)->host, "root", "", "", 0, NULL, 0); ptr_s->status = IDLE; //连接空闲 sem_init(&(ptr_s->sem), 1, 1); } // 这样所有的进程退出后将自动清空 shmctl(shmid1, IPC_RMID, 0); // 等待队列,共享内存。 shmid2 = shmget(IPC_PRIVATE, MAX_WAIT_NUM * sizeof(struct wait_struct), 0600 | IPC_CREAT); if (shmid2 < 0) return -1; ptr_w = wait_pool_head = shmat(shmid2, NULL, 0); if (ptr_w == NULL) return -1; for (i = 0; i < MAX_WAIT_NUM; i++, ptr_w++) { ptr_w->ptr = NULL; sem_init(&(ptr_w->sem), 1, 1); } shmctl(shmid2, IPC_RMID, 0); shmid3 = shmget(IPC_PRIVATE, sizeof(struct queue_mem), 0600 | IPC_CREAT); if (shmid3 < 0) return -1; queue_ptr = shmat(shmid3, NULL, 0); queue_ptr->in = wait_pool_head; queue_ptr->out = wait_pool_head; sem_init(&(queue_ptr->sem_in), 1, 1); sem_init(&(queue_ptr->sem_out), 1, 1); shmctl(shmid3, IPC_RMID, 0); printf("in = %d out = %d\n", queue_ptr->in - wait_pool_head, queue_ptr->out - wait_pool_head); pid = fork(); if (pid < 0) { return -1; } else if (pid == 0) { // 子进程,刷新空闲连接。 while (1) { sleep(10); sql_flush(); } } return 0;}/* * 从连接池获取一个连接,若无连接阻塞,直到其他地方归还连接。 */MYSQL *get_sql_connection(){ int i; int sval; MYSQL *sql; struct sql_struct *ptr_s; struct wait_struct *tmp; // 从连接池中获得未使用连接。 for (i = 0, ptr_s = sql_pool_head; i < num; i++, ptr_s++) { sem_wait(&(ptr_s->sem)); if (ptr_s->status == IDLE) { ptr_s->status = BUSY; sem_post(&(ptr_s->sem)); return &(ptr_s->sql); // 返回未被使用连接。 } sem_post(&(ptr_s->sem)); } // 连接池被用完,将在申请队列上睡眠。 sem_wait(&(queue_ptr->sem_in)); if (queue_ptr->in >= wait_pool_head + MAX_WAIT_NUM) queue_ptr->in = wait_pool_head; sem_getvalue(&(queue_ptr->in->sem), &sval); if (sval == 1) { sem_wait(&(queue_ptr->in->sem)); // 加锁 tmp = queue_ptr->in; queue_ptr->in++; printf("in = %d \n", queue_ptr->in - wait_pool_head); tmp->ptr = NULL; sem_wait(&(tmp->sem)); // 在此睡眠,等待其它地方唤醒。 printf("in..... = %d \n", queue_ptr->in - wait_pool_head); sql = &(tmp->ptr->sql); sem_post(&(queue_ptr->sem_in)); return sql; } sem_post(&(queue_ptr->sem_in)); // 申请连接队列已满。 return NULL;}/* * 释放连接到连接池 */int ret_sql_connection(MYSQL *sql){ int sval; struct sql_struct *ptr_s; ptr_s = (struct sql_struct *)sql; // 唤醒睡眠的申请。 sem_wait(&(queue_ptr->sem_out)); if (queue_ptr->out >= wait_pool_head + MAX_WAIT_NUM) queue_ptr->out = wait_pool_head; sem_getvalue(&(queue_ptr->out->sem), &sval); if (sval == 0) { queue_ptr->out->ptr = ptr_s; sem_post(&(queue_ptr->out->sem)); // 唤醒! sem_post(&(queue_ptr->out->sem)); // 解锁 queue_ptr->out++; sem_post(&(queue_ptr->sem_out)); return 0; } sem_post(&(queue_ptr->sem_out)); // 释放连接到连接池。 sem_wait(&(ptr_s->sem)); ptr_s->status = IDLE; sem_post(&(ptr_s->sem)); return 0;}/* * 释放所有的mysql连接 */int free_sql_pool(){ int i; struct sql_struct *ptr; for (i = 0, ptr = sql_pool_head; i < num; i++, ptr++) mysql_close(&(ptr->sql)); printf("free sql pool\n"); return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -