📄 postmaster.c
字号:
fd_set rmask; struct timeval timeout; /* * The timeout for the select() below is normally set on the basis * of the time to the next checkpoint. However, if for some * reason we don't have a next-checkpoint time, time out after 60 * seconds. This keeps checkpoint scheduling from locking up when * we get new connection requests infrequently (since we are * likely to detect checkpoint completion just after enabling * signals below, after we've already made the decision about how * long to wait this time). */ timeout.tv_sec = 60; timeout.tv_usec = 0; if (CheckPointPID == 0 && checkpointed && Shutdown == NoShutdown && !FatalError && random_seed != 0) { time_t now = time(NULL); if (CheckPointTimeout + checkpointed > now) { /* * Not time for checkpoint yet, so set select timeout */ timeout.tv_sec = CheckPointTimeout + checkpointed - now; } else { /* Time to make the checkpoint... */ CheckPointPID = CheckPointDataBase(); /* * if fork failed, schedule another try at 0.1 normal * delay */ if (CheckPointPID == 0) { timeout.tv_sec = CheckPointTimeout / 10; checkpointed = now + timeout.tv_sec - CheckPointTimeout; } } } /* * Wait for something to happen. */ memcpy((char *) &rmask, (char *) &readmask, sizeof(fd_set)); PG_SETMASK(&UnBlockSig); if (select(nSockets, &rmask, (fd_set *) NULL, (fd_set *) NULL, &timeout) < 0) { PG_SETMASK(&BlockSig); if (errno == EINTR || errno == EWOULDBLOCK) continue; ereport(LOG, (errcode_for_socket_access(), errmsg("select() failed in postmaster: %m"))); return STATUS_ERROR; } /* * Block all signals until we wait again. (This makes it safe for * our signal handlers to do nontrivial work.) */ PG_SETMASK(&BlockSig); /* * Select a random seed at the time of first receiving a request. */ while (random_seed == 0) { gettimeofday(&later, &tz); /* * We are not sure how much precision is in tv_usec, so we * swap the nibbles of 'later' and XOR them with 'now'. On the * off chance that the result is 0, we loop until it isn't. */ random_seed = now.tv_usec ^ ((later.tv_usec << 16) | ((later.tv_usec >> 16) & 0xffff)); } /* * New connection pending on any of our sockets? If so, fork a * child process to deal with it. */ for (i = 0; i < MAXLISTEN; i++) { if (ListenSocket[i] == -1) break; if (FD_ISSET(ListenSocket[i], &rmask)) { port = ConnCreate(ListenSocket[i]); if (port) { BackendStartup(port); /* * We no longer need the open socket or port structure * in this process */ StreamClose(port->sock); ConnFree(port); } } } /* If we have lost the stats collector, try to start a new one */ if (!pgstat_is_running) pgstat_start(); }}/* * Initialise the masks for select() for the ports * we are listening on. Return the number of sockets to listen on. */static intinitMasks(fd_set *rmask){ int nsocks = -1; int i; FD_ZERO(rmask); for (i = 0; i < MAXLISTEN; i++) { int fd = ListenSocket[i]; if (fd == -1) break; FD_SET(fd, rmask); if (fd > nsocks) nsocks = fd; } return nsocks + 1;}/* * Read the startup packet and do something according to it. * * Returns STATUS_OK or STATUS_ERROR, or might call ereport(FATAL) and * not return at all. * * (Note that ereport(FATAL) stuff is sent to the client, so only use it * if that's what you want. Return STATUS_ERROR if you don't want to * send anything to the client, which would typically be appropriate * if we detect a communications failure.) */static intProcessStartupPacket(Port *port, bool SSLdone){ enum CAC_state cac; int32 len; void *buf; ProtocolVersion proto; MemoryContext oldcontext; if (pq_getbytes((char *) &len, 4) == EOF) { /* * EOF after SSLdone probably means the client didn't like our * response to NEGOTIATE_SSL_CODE. That's not an error condition, * so don't clutter the log with a complaint. */ if (!SSLdone) ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("incomplete startup packet"))); return STATUS_ERROR; } len = ntohl(len); len -= 4; if (len < (int32) sizeof(ProtocolVersion) || len > MAX_STARTUP_PACKET_LENGTH) { ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("invalid length of startup packet"))); return STATUS_ERROR; } /* * Allocate at least the size of an old-style startup packet, plus one * extra byte, and make sure all are zeroes. This ensures we will * have null termination of all strings, in both fixed- and * variable-length packet layouts. */ if (len <= (int32) sizeof(StartupPacket)) buf = palloc0(sizeof(StartupPacket) + 1); else buf = palloc0(len + 1); if (pq_getbytes(buf, len) == EOF) { ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("incomplete startup packet"))); return STATUS_ERROR; } /* * The first field is either a protocol version number or a special * request code. */ port->proto = proto = ntohl(*((ProtocolVersion *) buf)); if (proto == CANCEL_REQUEST_CODE) { processCancelRequest(port, buf); return 127; /* XXX */ } if (proto == NEGOTIATE_SSL_CODE && !SSLdone) { char SSLok;#ifdef USE_SSL /* No SSL when disabled or on Unix sockets */ if (!EnableSSL || IS_AF_UNIX(port->laddr.addr.ss_family)) SSLok = 'N'; else SSLok = 'S'; /* Support for SSL */#else SSLok = 'N'; /* No support for SSL */#endif if (send(port->sock, &SSLok, 1, 0) != 1) { ereport(COMMERROR, (errcode_for_socket_access(), errmsg("failed to send SSL negotiation response: %m"))); return STATUS_ERROR; /* close the connection */ }#ifdef USE_SSL if (SSLok == 'S' && secure_open_server(port) == -1) return STATUS_ERROR;#endif /* regular startup packet, cancel, etc packet should follow... */ /* but not another SSL negotiation request */ return ProcessStartupPacket(port, true); } /* Could add additional special packet types here */ /* * Set FrontendProtocol now so that ereport() knows what format to * send if we fail during startup. */ FrontendProtocol = proto; /* Check we can handle the protocol the frontend is using. */ if (PG_PROTOCOL_MAJOR(proto) < PG_PROTOCOL_MAJOR(PG_PROTOCOL_EARLIEST) || PG_PROTOCOL_MAJOR(proto) > PG_PROTOCOL_MAJOR(PG_PROTOCOL_LATEST) || (PG_PROTOCOL_MAJOR(proto) == PG_PROTOCOL_MAJOR(PG_PROTOCOL_LATEST) && PG_PROTOCOL_MINOR(proto) > PG_PROTOCOL_MINOR(PG_PROTOCOL_LATEST))) ereport(FATAL, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("unsupported frontend protocol %u.%u: server supports %u.0 to %u.%u", PG_PROTOCOL_MAJOR(proto), PG_PROTOCOL_MINOR(proto), PG_PROTOCOL_MAJOR(PG_PROTOCOL_EARLIEST), PG_PROTOCOL_MAJOR(PG_PROTOCOL_LATEST), PG_PROTOCOL_MINOR(PG_PROTOCOL_LATEST)))); /* * Now fetch parameters out of startup packet and save them into the * Port structure. All data structures attached to the Port struct * must be allocated in TopMemoryContext so that they won't disappear * when we pass them to PostgresMain (see BackendFork). We need not * worry about leaking this storage on failure, since we aren't in the * postmaster process anymore. */ oldcontext = MemoryContextSwitchTo(TopMemoryContext); if (PG_PROTOCOL_MAJOR(proto) >= 3) { int32 offset = sizeof(ProtocolVersion); /* * Scan packet body for name/option pairs. We can assume any * string beginning within the packet body is null-terminated, * thanks to zeroing extra byte above. */ port->guc_options = NIL; while (offset < len) { char *nameptr = ((char *) buf) + offset; int32 valoffset; char *valptr; if (*nameptr == '\0') break; /* found packet terminator */ valoffset = offset + strlen(nameptr) + 1; if (valoffset >= len) break; /* missing value, will complain below */ valptr = ((char *) buf) + valoffset; if (strcmp(nameptr, "database") == 0) port->database_name = pstrdup(valptr); else if (strcmp(nameptr, "user") == 0) port->user_name = pstrdup(valptr); else if (strcmp(nameptr, "options") == 0) port->cmdline_options = pstrdup(valptr); else { /* Assume it's a generic GUC option */ port->guc_options = lappend(port->guc_options, pstrdup(nameptr)); port->guc_options = lappend(port->guc_options, pstrdup(valptr)); } offset = valoffset + strlen(valptr) + 1; } /* * If we didn't find a packet terminator exactly at the end of the * given packet length, complain. */ if (offset != len - 1) ereport(FATAL, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("invalid startup packet layout: expected terminator as last byte"))); } else { /* * Get the parameters from the old-style, fixed-width-fields * startup packet as C strings. The packet destination was * cleared first so a short packet has zeros silently added. We * have to be prepared to truncate the pstrdup result for oversize * fields, though. */ StartupPacket *packet = (StartupPacket *) buf; port->database_name = pstrdup(packet->database); if (strlen(port->database_name) > sizeof(packet->database)) port->database_name[sizeof(packet->database)] = '\0'; port->user_name = pstrdup(packet->user); if (strlen(port->user_name) > sizeof(packet->user)) port->user_name[sizeof(packet->user)] = '\0'; port->cmdline_options = pstrdup(packet->options); if (strlen(port->cmdline_options) > sizeof(packet->options)) port->cmdline_options[sizeof(packet->options)] = '\0'; port->guc_options = NIL; } /* Check a user name was given. */ if (port->user_name == NULL || port->user_name[0] == '\0') ereport(FATAL, (errcode(ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION), errmsg("no PostgreSQL user name specified in startup packet"))); /* The database defaults to the user name. */ if (port->database_name == NULL || port->database_name[0] == '\0') port->database_name = pstrdup(port->user_name); if (Db_user_namespace) { /* * If user@, it is a global user, remove '@'. We only want to do * this if there is an '@' at the end and no earlier in the user * string or they may fake as a local user of another database * attaching to this database. */ if (strchr(port->user_name, '@') == port->user_name + strlen(port->user_name) - 1) *strchr(port->user_name, '@') = '\0'; else { /* Append '@' and dbname */ char *db_user; db_user = palloc(strlen(port->user_name) + strlen(port->database_name) + 2); sprintf(db_user, "%s@%s", port->user_name, port->database_name); port->user_name = db_user; } } /* * Truncate given database and user names to length of a Postgres * name. This avoids lookup failures when overlength names are given. */ if (strlen(port->database_name) >= NAMEDATALEN) port->database_name[NAMEDATALEN - 1] = '\0'; if (strlen(port->user_name) >= NAMEDATALEN) port->user_name[NAMEDATALEN - 1] = '\0'; /* * Done putting stuff in TopMemoryContext. */ MemoryContextSwitchTo(oldcontext); /* * If we're going to reject the connection due to database state, say * so now instead of wasting cycles on an authentication exchange. * (This also allows a pg_ping utility to be written.) */ cac = canAcceptConnections(); switch (cac) { case CAC_STARTUP: ereport(FATAL, (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("the database system is starting up"))); break; case CAC_SHUTDOWN: ereport(FATAL, (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("the database system is shutting down"))); break; case CAC_RECOVERY: ereport(FATAL, (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("the database system is in recovery mode"))); break; case CAC_TOOMANY: ereport(FATAL, (errcode(ERRCODE_TOO_MANY_CONNECTIONS), errmsg("sorry, too many clients already"))); break; case CAC_OK: default: break; } return STATUS_OK;}/* * The client has sent a cancel request packet, not a normal * start-a-new-connection packet. Perform the necessary processing. * Nothing is sent back to the client. */static voidprocessCancelRequest(Port *port, void *pkt){ CancelRequestPacket *canc = (CancelRequestPacket *) pkt; int backendPID; long cancelAuthCode; Dlelem *curr; Backend *bp; backendPID = (int) ntohl(canc->backendPID); cancelAuthCode = (long) ntohl(canc->cancelAuthCode); if (backendPID == CheckPointPID) { ereport(DEBUG2, (errmsg_internal("ignoring cancel request for checkpoint process %d", backendPID))); return; } else if (ExecBackend) AttachSharedMemoryAndSemaphores(); /* See if we have a matching backend */ for (curr = DLGetHead(BackendList); curr; curr = DLGetSucc(curr)) { bp = (Backend *) DLE_VAL(curr); if (bp->pid == backendPID) { if (bp->cancel_key == cancelAuthCode) { /* Found a match; signal that backend to cancel current op */ ereport(DEBUG2, (errmsg_internal("processing cancel request: sending SIGINT to process %d", backendPID))); kill(bp->pid, SIGINT); } else /* Right PID, wrong key: no way, Jose */ ereport(DEBUG2, (errmsg_internal("bad key in cancel request for process %d", backendPID))); return; } } /* No matching backend */ ereport(DEBUG2, (errmsg_internal("bad pid in cancel request for process %d", backendPID)));}/* * canAcceptConnections --- check to see if database state allows connections. */static enum CAC_statecanAcceptConnections(void){ /* Can't start backends when in startup/shutdown/recovery state. */ if (Shutdown > NoShutdown)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -