⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mq.c

📁 Android 一些工具
💻 C
📖 第 1 页 / 共 3 页
字号:
        return false;    } else {        return bufferWriteComplete(outgoing);    }}/** Writes packet bytes to peer. */static void peerProxyWriteBytes(PeerProxy* peerProxy) {		Buffer* buffer = peerProxy->currentPacket->bytes;	if (peerProxyWriteFromBuffer(peerProxy, buffer)) {        LOGD("Bytes written.");        peerProxyNextPacket(peerProxy);    }    }/** Sends a socket to the peer. */static void peerProxyWriteConnection(PeerProxy* peerProxy) {    int socket = peerProxy->currentPacket->socket;    // Why does sending and receiving fds have to be such a PITA?    struct msghdr msg;    struct iovec iov[1];        union {        struct cmsghdr cm;        char control[CMSG_SPACE(sizeof(int))];    } control_un;       struct cmsghdr *cmptr;        msg.msg_control = control_un.control;    msg.msg_controllen = sizeof(control_un.control);    cmptr = CMSG_FIRSTHDR(&msg);    cmptr->cmsg_len = CMSG_LEN(sizeof(int));    cmptr->cmsg_level = SOL_SOCKET;    cmptr->cmsg_type = SCM_RIGHTS;       // Store the socket in the message.    *((int *) CMSG_DATA(cmptr)) = peerProxy->currentPacket->socket;    msg.msg_name = NULL;    msg.msg_namelen = 0;    iov[0].iov_base = "";    iov[0].iov_len = 1;    msg.msg_iov = iov;    msg.msg_iovlen = 1;    ssize_t result = sendmsg(peerProxy->fd->fd, &msg, 0);        if (result < 0) {        peerProxyHandleError(peerProxy, "sendmsg");    } else {        // Success. Queue up the next packet.        peerProxyNextPacket(peerProxy);            }}/** * Writes some outgoing data. */static void peerProxyWrite(SelectableFd* fd) {    // TODO: Try to write header and body with one system call.    PeerProxy* peerProxy = (PeerProxy*) fd->data;    OutgoingPacket* current = peerProxy->currentPacket;        if (current == NULL) {        // We have nothing left to write.        return;    }    // Write the header.    Buffer* outgoingHeader = &peerProxy->outgoingHeader;    bool headerWritten = bufferWriteComplete(outgoingHeader);    if (!headerWritten) {        LOGD("Writing header...");        headerWritten = peerProxyWriteFromBuffer(peerProxy, outgoingHeader);        if (headerWritten) {            LOGD("Header written.");        }    }        // Write body.    if (headerWritten) {        PacketType type = current->header.type;        switch (type) {            case CONNECTION:                peerProxyWriteConnection(peerProxy);                break;            case BYTES:                peerProxyWriteBytes(peerProxy);                break;            case CONNECTION_REQUEST:            case CONNECTION_ERROR:                // These packets consist solely of a header.                peerProxyNextPacket(peerProxy);                break;            default:                LOG_ALWAYS_FATAL("Unknown packet type: %d", type);         }    }}/** * Sets up a peer proxy's fd before we try to select() it. */static void peerProxyBeforeSelect(SelectableFd* fd) {    LOGD("Before select...");    PeerProxy* peerProxy = (PeerProxy*) fd->data;      peerLock(peerProxy->peer);    bool hasPackets = peerProxy->currentPacket != NULL;    peerUnlock(peerProxy->peer);        if (hasPackets) {        LOGD("Packets found. Setting onWritable().");                    fd->onWritable = &peerProxyWrite;    } else {        // We have nothing to write.        fd->onWritable = NULL;    }}/** Prepare to read bytes from the peer. */static void peerProxyExpectBytes(PeerProxy* peerProxy, Header* header) {	LOGD("Expecting %d bytes.", header->size);		peerProxy->inputState = READING_BYTES;    if (bufferPrepareForRead(peerProxy->inputBuffer, header->size) == -1) {        LOGW("Couldn't allocate memory for incoming data. Size: %u",                (unsigned int) header->size);                    // TODO: Ignore the packet and log a warning?        peerProxyKill(peerProxy, false);    }}/** * Gets a peer proxy for the given ID. Creates a peer proxy if necessary. * Sends a connection request to the master if desired. * * Returns NULL if an error occurs. Sets errno to EHOSTDOWN if the peer died * or ENOMEM if memory couldn't be allocated. */static PeerProxy* peerProxyGetOrCreate(Peer* peer, pid_t pid,         bool requestConnection) {    if (pid == peer->pid) {        errno = EINVAL;        return NULL;    }        if (peerIsDead(peer, pid)) {        errno = EHOSTDOWN;        return NULL;    }        PeerProxy* peerProxy = hashmapGet(peer->peerProxies, &pid);    if (peerProxy != NULL) {        return peerProxy;    }    // If this is the master peer, we already know about all peers.    if (peer->master) {        errno = EHOSTDOWN;        return NULL;    }    // Try to create a peer proxy.    Credentials credentials;    credentials.pid = pid;    // Fake gid and uid until we have the real thing. The real creds are    // filled in by masterProxyExpectConnection(). These fake creds will    // never be exposed to the user.    credentials.uid = 0;    credentials.gid = 0;    // Make sure we can allocate the connection request packet.    OutgoingPacket* packet = NULL;    if (requestConnection) {        packet = calloc(1, sizeof(OutgoingPacket));        if (packet == NULL) {            errno = ENOMEM;            return NULL;        }        packet->header.type = CONNECTION_REQUEST;        packet->header.credentials = credentials;        packet->free = &outgoingPacketFree;    }        peerProxy = peerProxyCreate(peer, credentials);    if (peerProxy == NULL) {        free(packet);        errno = ENOMEM;        return NULL;    } else {        // Send a connection request to the master.        if (requestConnection) {            PeerProxy* masterProxy = peer->masterProxy;            peerProxyEnqueueOutgoingPacket(masterProxy, packet);        }                return peerProxy;    }}/** * Switches the master peer proxy into a state where it's waiting for a * connection from the master. */static void masterProxyExpectConnection(PeerProxy* masterProxy,        Header* header) {    // TODO: Restructure things so we don't need this check.    // Verify that this really is the master.    if (!masterProxy->master) {        LOGW("Non-master process %d tried to send us a connection.",             masterProxy->credentials.pid);        // Kill off the evil peer.        peerProxyKill(masterProxy, false);        return;    }        masterProxy->inputState = ACCEPTING_CONNECTION;    Peer* localPeer = masterProxy->peer;       // Create a peer proxy so we have somewhere to stash the creds.    // See if we already have a proxy set up.    pid_t pid = header->credentials.pid;    peerLock(localPeer);    PeerProxy* peerProxy = peerProxyGetOrCreate(localPeer, pid, false);    if (peerProxy == NULL) {        LOGW("Peer proxy creation failed: %s", strerror(errno));    } else {        // Fill in full credentials.        peerProxy->credentials = header->credentials;    }    peerUnlock(localPeer);        // Keep track of which peer proxy we're accepting a connection for.    masterProxy->connecting = peerProxy;}/** * Reads input from a peer process. */static void peerProxyRead(SelectableFd* fd);/** Sets up fd callbacks. */static void peerProxySetFd(PeerProxy* peerProxy, SelectableFd* fd) {    peerProxy->fd = fd;    fd->data = peerProxy;    fd->onReadable = &peerProxyRead;    fd->beforeSelect = &peerProxyBeforeSelect;        // Make the socket non-blocking.    setNonBlocking(fd->fd);}/** * Accepts a connection sent by the master proxy. */static void masterProxyAcceptConnection(PeerProxy* masterProxy) {    struct msghdr msg;    struct iovec iov[1];    ssize_t size;    char ignored;    int incomingFd;        // TODO: Reuse code which writes the connection. Who the heck designed    // this API anyway?    union {        struct cmsghdr cm;        char control[CMSG_SPACE(sizeof(int))];    } control_un;    struct cmsghdr *cmptr;    msg.msg_control = control_un.control;    msg.msg_controllen = sizeof(control_un.control);    msg.msg_name = NULL;    msg.msg_namelen = 0;    // We sent 1 byte of data so we can detect EOF.    iov[0].iov_base = &ignored;    iov[0].iov_len = 1;    msg.msg_iov = iov;    msg.msg_iovlen = 1;    size = recvmsg(masterProxy->fd->fd, &msg, 0);    if (size < 0) {        if (errno == EINTR) {            // Log interruptions but otherwise ignore them.            LOGW("recvmsg() interrupted.");            return;        } else if (errno == EAGAIN) {            // Keep waiting for the connection.            return;        } else {            LOG_ALWAYS_FATAL("Error reading connection from master: %s",                    strerror(errno));        }    } else if (size == 0) {        // EOF.        LOG_ALWAYS_FATAL("Received EOF from master.");    }    // Extract fd from message.    if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL             && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {        if (cmptr->cmsg_level != SOL_SOCKET) {            LOG_ALWAYS_FATAL("Expected SOL_SOCKET.");        }        if (cmptr->cmsg_type != SCM_RIGHTS) {            LOG_ALWAYS_FATAL("Expected SCM_RIGHTS.");        }        incomingFd = *((int*) CMSG_DATA(cmptr));    } else {        LOG_ALWAYS_FATAL("Expected fd.");    }        // The peer proxy this connection is for.    PeerProxy* peerProxy = masterProxy->connecting;    if (peerProxy == NULL) {        LOGW("Received connection for unknown peer.");        closeWithWarning(incomingFd);    } else {        Peer* peer = masterProxy->peer;                SelectableFd* selectableFd = selectorAdd(peer->selector, incomingFd);        if (selectableFd == NULL) {            LOGW("Error adding fd to selector for %d.",                    peerProxy->credentials.pid);            closeWithWarning(incomingFd);            peerProxyKill(peerProxy, false);        }        peerProxySetFd(peerProxy, selectableFd);    }        peerProxyExpectHeader(masterProxy);}/** * Frees an outgoing packet containing a connection. */static void outgoingPacketFreeSocket(OutgoingPacket* packet) {    closeWithWarning(packet->socket);    outgoingPacketFree(packet);}/** * Connects two known peers. */static void masterConnectPeers(PeerProxy* peerA, PeerProxy* peerB) {    int sockets[2];    int result = socketpair(AF_LOCAL, SOCK_STREAM, 0, sockets);    if (result == -1) {        LOGW("socketpair() error: %s", strerror(errno));        // TODO: Send CONNECTION_FAILED packets to peers.        return;    }    OutgoingPacket* packetA = calloc(1, sizeof(OutgoingPacket));    OutgoingPacket* packetB = calloc(1, sizeof(OutgoingPacket));    if (packetA == NULL || packetB == NULL) {        free(packetA);        free(packetB);        LOGW("malloc() error. Failed to tell process %d that process %d is"                " dead.", peerA->credentials.pid, peerB->credentials.pid);        return;    }       packetA->header.type = CONNECTION;    packetB->header.type = CONNECTION;        packetA->header.credentials = peerB->credentials;    packetB->header.credentials = peerA->credentials;    packetA->socket = sockets[0];    packetB->socket = sockets[1];    packetA->free = &outgoingPacketFreeSocket;    packetB->free = &outgoingPacketFreeSocket;       peerLock(peerA->peer);    peerProxyEnqueueOutgoingPacket(peerA, packetA);       peerProxyEnqueueOutgoingPacket(peerB, packetB);       peerUnlock(peerA->peer);}/** * Informs a peer that the peer they're trying to connect to couldn't be * found. */static void masterReportConnectionError(PeerProxy* peerProxy,        Credentials credentials) {    OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));    if (packet == NULL) {        LOGW("malloc() error. Failed to tell process %d that process %d is"                " dead.", peerProxy->credentials.pid, credentials.pid);        return;    }       packet->header.type = CONNECTION_ERROR;    packet->header.credentials = credentials;    packet->free = &outgoingPacketFree;        peerProxyLockAndEnqueueOutgoingPacket(peerProxy, packet);   }/** * Handles a request to be connected to another peer. */static void masterHandleConnectionRequest(PeerProxy* peerProxy,         Header* header) {    Peer* master = peerProxy->peer;    pid_t targetPid = header->credentials.pid;    if (!hashmapContainsKey(peerProxy->connections, &targetPid)) {        // We haven't connected these peers yet.        PeerProxy* targetPeer             = (PeerProxy*) hashmapGet(master->peerProxies, &targetPid);        if (targetPeer == NULL) {            // Unknown process.            masterReportConnectionError(peerProxy, header->credentials);        } else {            masterConnectPeers(peerProxy, targetPeer);        }    }        // This packet is complete. Get ready for the next one.    peerProxyExpectHeader(peerProxy);}/** * The master told us this peer is dead. */static void masterProxyHandleConnectionError(PeerProxy* masterProxy,        Header* header) {    Peer* peer = masterProxy->peer;        // Look up the peer proxy.    pid_t pid = header->credentials.pid;    PeerProxy* peerProxy = NULL;    peerLock(peer);    peerProxy = hashmapGet(peer->peerProxies, &pid);    peerUnlock(peer);    if (peerProxy != NULL) {        LOGI("Couldn't connect to %d.", pid);        peerProxyKill(peerProxy, false);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -