📄 mq.c
字号:
return false; } else { return bufferWriteComplete(outgoing); }}/** Writes packet bytes to peer. */static void peerProxyWriteBytes(PeerProxy* peerProxy) { Buffer* buffer = peerProxy->currentPacket->bytes; if (peerProxyWriteFromBuffer(peerProxy, buffer)) { LOGD("Bytes written."); peerProxyNextPacket(peerProxy); } }/** Sends a socket to the peer. */static void peerProxyWriteConnection(PeerProxy* peerProxy) { int socket = peerProxy->currentPacket->socket; // Why does sending and receiving fds have to be such a PITA? struct msghdr msg; struct iovec iov[1]; union { struct cmsghdr cm; char control[CMSG_SPACE(sizeof(int))]; } control_un; struct cmsghdr *cmptr; msg.msg_control = control_un.control; msg.msg_controllen = sizeof(control_un.control); cmptr = CMSG_FIRSTHDR(&msg); cmptr->cmsg_len = CMSG_LEN(sizeof(int)); cmptr->cmsg_level = SOL_SOCKET; cmptr->cmsg_type = SCM_RIGHTS; // Store the socket in the message. *((int *) CMSG_DATA(cmptr)) = peerProxy->currentPacket->socket; msg.msg_name = NULL; msg.msg_namelen = 0; iov[0].iov_base = ""; iov[0].iov_len = 1; msg.msg_iov = iov; msg.msg_iovlen = 1; ssize_t result = sendmsg(peerProxy->fd->fd, &msg, 0); if (result < 0) { peerProxyHandleError(peerProxy, "sendmsg"); } else { // Success. Queue up the next packet. peerProxyNextPacket(peerProxy); }}/** * Writes some outgoing data. */static void peerProxyWrite(SelectableFd* fd) { // TODO: Try to write header and body with one system call. PeerProxy* peerProxy = (PeerProxy*) fd->data; OutgoingPacket* current = peerProxy->currentPacket; if (current == NULL) { // We have nothing left to write. return; } // Write the header. Buffer* outgoingHeader = &peerProxy->outgoingHeader; bool headerWritten = bufferWriteComplete(outgoingHeader); if (!headerWritten) { LOGD("Writing header..."); headerWritten = peerProxyWriteFromBuffer(peerProxy, outgoingHeader); if (headerWritten) { LOGD("Header written."); } } // Write body. if (headerWritten) { PacketType type = current->header.type; switch (type) { case CONNECTION: peerProxyWriteConnection(peerProxy); break; case BYTES: peerProxyWriteBytes(peerProxy); break; case CONNECTION_REQUEST: case CONNECTION_ERROR: // These packets consist solely of a header. peerProxyNextPacket(peerProxy); break; default: LOG_ALWAYS_FATAL("Unknown packet type: %d", type); } }}/** * Sets up a peer proxy's fd before we try to select() it. */static void peerProxyBeforeSelect(SelectableFd* fd) { LOGD("Before select..."); PeerProxy* peerProxy = (PeerProxy*) fd->data; peerLock(peerProxy->peer); bool hasPackets = peerProxy->currentPacket != NULL; peerUnlock(peerProxy->peer); if (hasPackets) { LOGD("Packets found. Setting onWritable()."); fd->onWritable = &peerProxyWrite; } else { // We have nothing to write. fd->onWritable = NULL; }}/** Prepare to read bytes from the peer. */static void peerProxyExpectBytes(PeerProxy* peerProxy, Header* header) { LOGD("Expecting %d bytes.", header->size); peerProxy->inputState = READING_BYTES; if (bufferPrepareForRead(peerProxy->inputBuffer, header->size) == -1) { LOGW("Couldn't allocate memory for incoming data. Size: %u", (unsigned int) header->size); // TODO: Ignore the packet and log a warning? peerProxyKill(peerProxy, false); }}/** * Gets a peer proxy for the given ID. Creates a peer proxy if necessary. * Sends a connection request to the master if desired. * * Returns NULL if an error occurs. Sets errno to EHOSTDOWN if the peer died * or ENOMEM if memory couldn't be allocated. */static PeerProxy* peerProxyGetOrCreate(Peer* peer, pid_t pid, bool requestConnection) { if (pid == peer->pid) { errno = EINVAL; return NULL; } if (peerIsDead(peer, pid)) { errno = EHOSTDOWN; return NULL; } PeerProxy* peerProxy = hashmapGet(peer->peerProxies, &pid); if (peerProxy != NULL) { return peerProxy; } // If this is the master peer, we already know about all peers. if (peer->master) { errno = EHOSTDOWN; return NULL; } // Try to create a peer proxy. Credentials credentials; credentials.pid = pid; // Fake gid and uid until we have the real thing. The real creds are // filled in by masterProxyExpectConnection(). These fake creds will // never be exposed to the user. credentials.uid = 0; credentials.gid = 0; // Make sure we can allocate the connection request packet. OutgoingPacket* packet = NULL; if (requestConnection) { packet = calloc(1, sizeof(OutgoingPacket)); if (packet == NULL) { errno = ENOMEM; return NULL; } packet->header.type = CONNECTION_REQUEST; packet->header.credentials = credentials; packet->free = &outgoingPacketFree; } peerProxy = peerProxyCreate(peer, credentials); if (peerProxy == NULL) { free(packet); errno = ENOMEM; return NULL; } else { // Send a connection request to the master. if (requestConnection) { PeerProxy* masterProxy = peer->masterProxy; peerProxyEnqueueOutgoingPacket(masterProxy, packet); } return peerProxy; }}/** * Switches the master peer proxy into a state where it's waiting for a * connection from the master. */static void masterProxyExpectConnection(PeerProxy* masterProxy, Header* header) { // TODO: Restructure things so we don't need this check. // Verify that this really is the master. if (!masterProxy->master) { LOGW("Non-master process %d tried to send us a connection.", masterProxy->credentials.pid); // Kill off the evil peer. peerProxyKill(masterProxy, false); return; } masterProxy->inputState = ACCEPTING_CONNECTION; Peer* localPeer = masterProxy->peer; // Create a peer proxy so we have somewhere to stash the creds. // See if we already have a proxy set up. pid_t pid = header->credentials.pid; peerLock(localPeer); PeerProxy* peerProxy = peerProxyGetOrCreate(localPeer, pid, false); if (peerProxy == NULL) { LOGW("Peer proxy creation failed: %s", strerror(errno)); } else { // Fill in full credentials. peerProxy->credentials = header->credentials; } peerUnlock(localPeer); // Keep track of which peer proxy we're accepting a connection for. masterProxy->connecting = peerProxy;}/** * Reads input from a peer process. */static void peerProxyRead(SelectableFd* fd);/** Sets up fd callbacks. */static void peerProxySetFd(PeerProxy* peerProxy, SelectableFd* fd) { peerProxy->fd = fd; fd->data = peerProxy; fd->onReadable = &peerProxyRead; fd->beforeSelect = &peerProxyBeforeSelect; // Make the socket non-blocking. setNonBlocking(fd->fd);}/** * Accepts a connection sent by the master proxy. */static void masterProxyAcceptConnection(PeerProxy* masterProxy) { struct msghdr msg; struct iovec iov[1]; ssize_t size; char ignored; int incomingFd; // TODO: Reuse code which writes the connection. Who the heck designed // this API anyway? union { struct cmsghdr cm; char control[CMSG_SPACE(sizeof(int))]; } control_un; struct cmsghdr *cmptr; msg.msg_control = control_un.control; msg.msg_controllen = sizeof(control_un.control); msg.msg_name = NULL; msg.msg_namelen = 0; // We sent 1 byte of data so we can detect EOF. iov[0].iov_base = &ignored; iov[0].iov_len = 1; msg.msg_iov = iov; msg.msg_iovlen = 1; size = recvmsg(masterProxy->fd->fd, &msg, 0); if (size < 0) { if (errno == EINTR) { // Log interruptions but otherwise ignore them. LOGW("recvmsg() interrupted."); return; } else if (errno == EAGAIN) { // Keep waiting for the connection. return; } else { LOG_ALWAYS_FATAL("Error reading connection from master: %s", strerror(errno)); } } else if (size == 0) { // EOF. LOG_ALWAYS_FATAL("Received EOF from master."); } // Extract fd from message. if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) { if (cmptr->cmsg_level != SOL_SOCKET) { LOG_ALWAYS_FATAL("Expected SOL_SOCKET."); } if (cmptr->cmsg_type != SCM_RIGHTS) { LOG_ALWAYS_FATAL("Expected SCM_RIGHTS."); } incomingFd = *((int*) CMSG_DATA(cmptr)); } else { LOG_ALWAYS_FATAL("Expected fd."); } // The peer proxy this connection is for. PeerProxy* peerProxy = masterProxy->connecting; if (peerProxy == NULL) { LOGW("Received connection for unknown peer."); closeWithWarning(incomingFd); } else { Peer* peer = masterProxy->peer; SelectableFd* selectableFd = selectorAdd(peer->selector, incomingFd); if (selectableFd == NULL) { LOGW("Error adding fd to selector for %d.", peerProxy->credentials.pid); closeWithWarning(incomingFd); peerProxyKill(peerProxy, false); } peerProxySetFd(peerProxy, selectableFd); } peerProxyExpectHeader(masterProxy);}/** * Frees an outgoing packet containing a connection. */static void outgoingPacketFreeSocket(OutgoingPacket* packet) { closeWithWarning(packet->socket); outgoingPacketFree(packet);}/** * Connects two known peers. */static void masterConnectPeers(PeerProxy* peerA, PeerProxy* peerB) { int sockets[2]; int result = socketpair(AF_LOCAL, SOCK_STREAM, 0, sockets); if (result == -1) { LOGW("socketpair() error: %s", strerror(errno)); // TODO: Send CONNECTION_FAILED packets to peers. return; } OutgoingPacket* packetA = calloc(1, sizeof(OutgoingPacket)); OutgoingPacket* packetB = calloc(1, sizeof(OutgoingPacket)); if (packetA == NULL || packetB == NULL) { free(packetA); free(packetB); LOGW("malloc() error. Failed to tell process %d that process %d is" " dead.", peerA->credentials.pid, peerB->credentials.pid); return; } packetA->header.type = CONNECTION; packetB->header.type = CONNECTION; packetA->header.credentials = peerB->credentials; packetB->header.credentials = peerA->credentials; packetA->socket = sockets[0]; packetB->socket = sockets[1]; packetA->free = &outgoingPacketFreeSocket; packetB->free = &outgoingPacketFreeSocket; peerLock(peerA->peer); peerProxyEnqueueOutgoingPacket(peerA, packetA); peerProxyEnqueueOutgoingPacket(peerB, packetB); peerUnlock(peerA->peer);}/** * Informs a peer that the peer they're trying to connect to couldn't be * found. */static void masterReportConnectionError(PeerProxy* peerProxy, Credentials credentials) { OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket)); if (packet == NULL) { LOGW("malloc() error. Failed to tell process %d that process %d is" " dead.", peerProxy->credentials.pid, credentials.pid); return; } packet->header.type = CONNECTION_ERROR; packet->header.credentials = credentials; packet->free = &outgoingPacketFree; peerProxyLockAndEnqueueOutgoingPacket(peerProxy, packet); }/** * Handles a request to be connected to another peer. */static void masterHandleConnectionRequest(PeerProxy* peerProxy, Header* header) { Peer* master = peerProxy->peer; pid_t targetPid = header->credentials.pid; if (!hashmapContainsKey(peerProxy->connections, &targetPid)) { // We haven't connected these peers yet. PeerProxy* targetPeer = (PeerProxy*) hashmapGet(master->peerProxies, &targetPid); if (targetPeer == NULL) { // Unknown process. masterReportConnectionError(peerProxy, header->credentials); } else { masterConnectPeers(peerProxy, targetPeer); } } // This packet is complete. Get ready for the next one. peerProxyExpectHeader(peerProxy);}/** * The master told us this peer is dead. */static void masterProxyHandleConnectionError(PeerProxy* masterProxy, Header* header) { Peer* peer = masterProxy->peer; // Look up the peer proxy. pid_t pid = header->credentials.pid; PeerProxy* peerProxy = NULL; peerLock(peer); peerProxy = hashmapGet(peer->peerProxies, &pid); peerUnlock(peer); if (peerProxy != NULL) { LOGI("Couldn't connect to %d.", pid); peerProxyKill(peerProxy, false);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -