📄 helper.c
字号:
#include "squid.h"#define HELPER_MAX_ARGS 64static PF helperHandleRead;static PF helperServerFree;static void Enqueue(helper * hlp, helper_request *);static helper_request *Dequeue(helper * hlp);static helper_server *GetFirstAvailable(helper * hlp);static void helperDispatch(helper_server * srv, helper_request * r);static void helperKickQueue(helper * hlp);static void helperRequestFree(helper_request * r);voidhelperOpenServers(helper * hlp){ char *s; char *progname; char *shortname; char *procname; char *args[HELPER_MAX_ARGS]; char fd_note_buf[FD_DESC_SZ]; helper_server *srv; int nargs = 0; int k; int x; int rfd; int wfd; wordlist *w; if (hlp->cmdline == NULL) return; progname = hlp->cmdline->key; if ((s = strrchr(progname, '/'))) shortname = xstrdup(s + 1); else shortname = xstrdup(progname); debug(29, 1) ("helperOpenServers: Starting %d '%s' processes\n", hlp->n_to_start, shortname); procname = xmalloc(strlen(shortname) + 3); snprintf(procname, strlen(shortname) + 3, "(%s)", shortname); args[nargs++] = procname; for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) args[nargs++] = w->key; args[nargs++] = NULL; assert(nargs <= HELPER_MAX_ARGS); for (k = 0; k < hlp->n_to_start; k++) { getCurrentTime(); rfd = wfd = -1; x = ipcCreate(hlp->ipc_type, progname, args, shortname, &rfd, &wfd); if (x < 0) { debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname); continue; } hlp->n_running++; srv = memAllocate(MEM_HELPER_SERVER); cbdataAdd(srv, memFree, MEM_HELPER_SERVER); srv->flags.alive = 1; srv->index = k; srv->rfd = rfd; srv->wfd = wfd; srv->buf = memAllocate(MEM_8K_BUF); srv->buf_sz = 8192; srv->offset = 0; srv->parent = hlp; cbdataLock(hlp); /* lock because of the parent backlink */ dlinkAddTail(srv, &srv->link, &hlp->servers); if (rfd == wfd) { snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1); fd_note(rfd, fd_note_buf); } else { snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1); fd_note(rfd, fd_note_buf); snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1); fd_note(wfd, fd_note_buf); } commSetNonBlocking(rfd); if (wfd != rfd) commSetNonBlocking(wfd); comm_add_close_handler(rfd, helperServerFree, srv); } safe_free(shortname); safe_free(procname); helperKickQueue(hlp);}voidhelperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data){ helper_request *r = memAllocate(MEM_HELPER_REQUEST); helper_server *srv; if (hlp == NULL) { debug(29, 3) ("helperSubmit: hlp == NULL\n"); callback(data, NULL); return; } r->callback = callback; r->data = data; r->buf = xstrdup(buf); cbdataLock(r->data); if ((srv = GetFirstAvailable(hlp))) helperDispatch(srv, r); else Enqueue(hlp, r);}voidhelperStats(StoreEntry * sentry, helper * hlp){ helper_server *srv; dlink_node *link; double tt; storeAppendPrintf(sentry, "number running: %d of %d\n", hlp->n_running, hlp->n_to_start); storeAppendPrintf(sentry, "requests sent: %d\n", hlp->stats.requests); storeAppendPrintf(sentry, "replies received: %d\n", hlp->stats.replies); storeAppendPrintf(sentry, "queue length: %d\n", hlp->stats.queue_size); storeAppendPrintf(sentry, "avg service time: %d msec\n", hlp->stats.avg_svc_time); storeAppendPrintf(sentry, "\n"); storeAppendPrintf(sentry, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\n", "#", "FD", "# Requests", "Flags", "Time", "Offset"); for (link = hlp->servers.head; link; link = link->next) { srv = link->data; tt = 0.001 * tvSubMsec(srv->dispatch_time, current_time); storeAppendPrintf(sentry, "%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\n", srv->index + 1, srv->rfd, srv->stats.uses, srv->flags.alive ? 'A' : ' ', srv->flags.busy ? 'B' : ' ', srv->flags.closing ? 'C' : ' ', srv->flags.shutdown ? 'S' : ' ', tt < 0.0 ? 0.0 : tt, (int) srv->offset); } storeAppendPrintf(sentry, "\nFlags key:\n\n"); storeAppendPrintf(sentry, " A = ALIVE\n"); storeAppendPrintf(sentry, " B = BUSY\n"); storeAppendPrintf(sentry, " C = CLOSING\n"); storeAppendPrintf(sentry, " S = SHUTDOWN\n");}voidhelperShutdown(helper * hlp){ dlink_node *link = hlp->servers.head; helper_server *srv; while (link) { srv = link->data; link = link->next; if (!srv->flags.alive) { debug(34, 3) ("helperShutdown: %s #%d is NOT ALIVE.\n", hlp->id_name, srv->index + 1); continue; } srv->flags.shutdown = 1; /* request it to shut itself down */ if (srv->flags.busy) { debug(34, 3) ("helperShutdown: %s #%d is BUSY.\n", hlp->id_name, srv->index + 1); continue; } if (srv->flags.closing) { debug(34, 3) ("helperShutdown: %s #%d is CLOSING.\n", hlp->id_name, srv->index + 1); continue; } srv->flags.closing = 1; comm_close(srv->rfd); }}helper *helperCreate(const char *name){ helper *hlp = memAllocate(MEM_HELPER); cbdataAdd(hlp, memFree, MEM_HELPER); hlp->id_name = name; return hlp;}voidhelperFree(helper * hlp){ /* note, don't free hlp->name, it probably points to static memory */ if (hlp->queue.head) debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n", hlp->id_name, hlp->stats.queue_size); cbdataFree(hlp);}/* ====================================================================== *//* LOCAL FUNCTIONS *//* ====================================================================== */static voidhelperServerFree(int fd, void *data){ helper_server *srv = data; helper *hlp = srv->parent; helper_request *r; assert(srv->rfd == fd); if (srv->buf) { memFree(srv->buf, MEM_8K_BUF); srv->buf = NULL; } if ((r = srv->request)) { if (cbdataValid(r->data)) r->callback(r->data, srv->buf); helperRequestFree(r); srv->request = NULL; } if (srv->wfd != srv->rfd) comm_close(srv->wfd); dlinkDelete(&srv->link, &hlp->servers); hlp->n_running--; assert(hlp->n_running >= 0); if (!srv->flags.shutdown) { debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n", hlp->id_name, srv->index + 1, fd); assert(hlp->n_running >= hlp->n_to_start / 2); if (hlp->n_running < hlp->n_to_start / 2) fatalf("Too few %s processes are running", hlp->id_name); } cbdataUnlock(srv->parent); cbdataFree(srv);}static voidhelperHandleRead(int fd, void *data){ int len; char *t = NULL; helper_server *srv = data; helper_request *r; helper *hlp = srv->parent; assert(fd == srv->rfd); assert(cbdataValid(data)); Counter.syscalls.sock.reads++; len = read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset); fd_bytes(fd, len, FD_READ); debug(29, 5) ("helperHandleRead: %d bytes from %s #%d.\n", len, hlp->id_name, srv->index + 1); if (len <= 0) { if (len < 0) debug(50, 1) ("helperHandleRead: FD %d read: %s\n", fd, xstrerror()); comm_close(fd); return; } srv->offset += len; srv->buf[srv->offset] = '\0'; r = srv->request; if (r == NULL) { /* someone spoke without being spoken to */ debug(29, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes\n", hlp->id_name, srv->index + 1, len); srv->offset = 0; } else if ((t = strchr(srv->buf, '\n'))) { /* end of reply found */ debug(29, 3) ("helperHandleRead: end of reply found\n"); *t = '\0'; if (cbdataValid(r->data)) r->callback(r->data, srv->buf); srv->flags.busy = 0; srv->offset = 0; helperRequestFree(r); srv->request = NULL; hlp->stats.replies++; hlp->stats.avg_svc_time = intAverage(hlp->stats.avg_svc_time, tvSubMsec(srv->dispatch_time, current_time), hlp->stats.replies, REDIRECT_AV_FACTOR); if (srv->flags.shutdown) comm_close(srv->wfd); else helperKickQueue(hlp); } else { commSetSelect(srv->rfd, COMM_SELECT_READ, helperHandleRead, srv, 0); }}static voidEnqueue(helper * hlp, helper_request * r){ dlink_node *link = memAllocate(MEM_DLINK_NODE); dlinkAddTail(r, link, &hlp->queue); hlp->stats.queue_size++; if (hlp->stats.queue_size < hlp->n_running) return; if (squid_curtime - hlp->last_queue_warn < 600) return; if (shutting_down || reconfiguring) return; hlp->last_queue_warn = squid_curtime; debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name); debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size); if (hlp->stats.queue_size > hlp->n_running * 2) fatalf("Too many queued %s requests", hlp->id_name); debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);}static helper_request *Dequeue(helper * hlp){ dlink_node *link; helper_request *r = NULL; if ((link = hlp->queue.head)) { r = link->data; dlinkDelete(link, &hlp->queue); memFree(link, MEM_DLINK_NODE); hlp->stats.queue_size--; } return r;}static helper_server *GetFirstAvailable(helper * hlp){ dlink_node *n; helper_server *srv = NULL; if (hlp->n_running == 0) return NULL; for (n = hlp->servers.head; n != NULL; n = n->next) { srv = n->data; if (srv->flags.busy) continue; if (!srv->flags.alive) continue; return srv; } return NULL;}static voidhelperDispatch(helper_server * srv, helper_request * r){ helper *hlp = srv->parent; if (!cbdataValid(r->data)) { debug(29, 1) ("helperDispatch: invalid callback data\n"); helperRequestFree(r); return; } assert(!srv->flags.busy); srv->flags.busy = 1; srv->request = r; srv->dispatch_time = current_time; comm_write(srv->wfd, r->buf, strlen(r->buf), NULL, /* Handler */ NULL, /* Handler-data */ NULL); /* free */ commSetSelect(srv->rfd, COMM_SELECT_READ, helperHandleRead, srv, 0); debug(29, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n", hlp->id_name, srv->index + 1, strlen(r->buf)); srv->stats.uses++; hlp->stats.requests++;}static voidhelperKickQueue(helper * hlp){ helper_request *r; helper_server *srv; while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp))) helperDispatch(srv, r);}static voidhelperRequestFree(helper_request * r){ cbdataUnlock(r->data); xfree(r->buf); memFree(r, MEM_HELPER_REQUEST);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -