⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mq.c

📁 Android 一些工具
💻 C
📖 第 1 页 / 共 3 页
字号:
    } else {        LOGW("Peer proxy for %d not found. This shouldn't happen.", pid);    }        peerProxyExpectHeader(masterProxy);}/** * Handles a packet header. */static void peerProxyHandleHeader(PeerProxy* peerProxy, Header* header) {    switch (header->type) {        case CONNECTION_REQUEST:            masterHandleConnectionRequest(peerProxy, header);            break;        case CONNECTION:            masterProxyExpectConnection(peerProxy, header);            break;        case CONNECTION_ERROR:            masterProxyHandleConnectionError(peerProxy, header);            break;        case BYTES:                peerProxyExpectBytes(peerProxy, header);            break;        default:            LOGW("Invalid packet type from %d: %d", peerProxy->credentials.pid,                     header->type);            peerProxyKill(peerProxy, false);    }}/** * Buffers input sent by peer. May be called multiple times until the entire * buffer is filled. Returns true when the buffer is full. */static bool peerProxyBufferInput(PeerProxy* peerProxy) {    Buffer* in = peerProxy->inputBuffer;    ssize_t size = bufferRead(in, peerProxy->fd->fd);    if (size < 0) {        peerProxyHandleError(peerProxy, "read");        return false;    } else if (size == 0) {        // EOF.    	LOGI("EOF");        peerProxyKill(peerProxy, false);        return false;    } else if (bufferReadComplete(in)) {        // We're done!        return true;    } else {        // Continue reading.        return false;    }}/** * Reads input from a peer process. */static void peerProxyRead(SelectableFd* fd) {    LOGD("Reading...");    PeerProxy* peerProxy = (PeerProxy*) fd->data;    int state = peerProxy->inputState;    Buffer* in = peerProxy->inputBuffer;    switch (state) {        case READING_HEADER:            if (peerProxyBufferInput(peerProxy)) {                LOGD("Header read.");                // We've read the complete header.                Header* header = (Header*) in->data;                peerProxyHandleHeader(peerProxy, header);            }            break;        case READING_BYTES:            LOGD("Reading bytes...");            if (peerProxyBufferInput(peerProxy)) {                LOGD("Bytes read.");                // We have the complete packet. Notify bytes listener.                peerProxy->peer->onBytes(peerProxy->credentials,                    in->data, in->size);                                        // Get ready for the next packet.                peerProxyExpectHeader(peerProxy);            }            break;        case ACCEPTING_CONNECTION:            masterProxyAcceptConnection(peerProxy);            break;        default:            LOG_ALWAYS_FATAL("Unknown state: %d", state);    }}static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials) {    PeerProxy* peerProxy = calloc(1, sizeof(PeerProxy));    if (peerProxy == NULL) {        return NULL;    }       peerProxy->inputBuffer = bufferCreate(sizeof(Header));    if (peerProxy->inputBuffer == NULL) {        free(peerProxy);        return NULL;    }    peerProxy->peer = peer;    peerProxy->credentials = credentials;    // Initial state == expecting a header.    peerProxyExpectHeader(peerProxy);       // Add this proxy to the map. Make sure the key points to the stable memory    // inside of the peer proxy itself.    pid_t* pid = &(peerProxy->credentials.pid);    hashmapPut(peer->peerProxies, pid, peerProxy);    return peerProxy;}/** Accepts a connection to the master peer. */static void masterAcceptConnection(SelectableFd* listenerFd) {    // Accept connection.    int socket = accept(listenerFd->fd, NULL, NULL);    if (socket == -1) {        LOGW("accept() error: %s", strerror(errno));        return;    }        LOGD("Accepted connection as fd %d.", socket);        // Get credentials.    Credentials credentials;    struct ucred ucredentials;    socklen_t credentialsSize = sizeof(struct ucred);    int result = getsockopt(socket, SOL_SOCKET, SO_PEERCRED,                 &ucredentials, &credentialsSize);    // We might want to verify credentialsSize.    if (result == -1) {        LOGW("getsockopt() error: %s", strerror(errno));        closeWithWarning(socket);        return;    }    // Copy values into our own structure so we know we have the types right.    credentials.pid = ucredentials.pid;    credentials.uid = ucredentials.uid;    credentials.gid = ucredentials.gid;        LOGI("Accepted connection from process %d.", credentials.pid);       Peer* masterPeer = (Peer*) listenerFd->data;        peerLock(masterPeer);        // Make sure we don't already have a connection from that process.    PeerProxy* peerProxy         = hashmapGet(masterPeer->peerProxies, &credentials.pid);    if (peerProxy != NULL) {        peerUnlock(masterPeer);        LOGW("Alread connected to process %d.", credentials.pid);        closeWithWarning(socket);        return;    }       // Add connection to the selector.    SelectableFd* socketFd = selectorAdd(masterPeer->selector, socket);    if (socketFd == NULL) {        peerUnlock(masterPeer);        LOGW("malloc() failed.");        closeWithWarning(socket);        return;    }    // Create a peer proxy.    peerProxy = peerProxyCreate(masterPeer, credentials);    peerUnlock(masterPeer);    if (peerProxy == NULL) {        LOGW("malloc() failed.");        socketFd->remove = true;        closeWithWarning(socket);    }    peerProxy->connections = hashmapCreate(10, &pidHash, &pidEquals);    peerProxySetFd(peerProxy, socketFd);}/** * Creates the local peer. */static Peer* peerCreate() {    Peer* peer = calloc(1, sizeof(Peer));    if (peer == NULL) {        LOG_ALWAYS_FATAL("malloc() error.");    }    peer->peerProxies = hashmapCreate(10, &pidHash, &pidEquals);    peer->selector = selectorCreate();        pthread_mutexattr_t attributes;    if (pthread_mutexattr_init(&attributes) != 0) {        LOG_ALWAYS_FATAL("pthread_mutexattr_init() error.");    }    if (pthread_mutexattr_settype(&attributes, PTHREAD_MUTEX_RECURSIVE) != 0) {        LOG_ALWAYS_FATAL("pthread_mutexattr_settype() error.");    }    if (pthread_mutex_init(&peer->mutex, &attributes) != 0) {        LOG_ALWAYS_FATAL("pthread_mutex_init() error.");    }        peer->pid = getpid();    return peer;}/** The local peer. */static Peer* localPeer;/** Frees a packet of bytes. */static void outgoingPacketFreeBytes(OutgoingPacket* packet) {    LOGD("Freeing outgoing packet.");    bufferFree(packet->bytes);    free(packet);}/** * Sends a packet of bytes to a remote peer. Returns 0 on success. * * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno * to EINVAL if pid is the same as the local pid. */int peerSendBytes(pid_t pid, const char* bytes, size_t size) {	Peer* peer = localPeer;    assert(peer != NULL);    OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));    if (packet == NULL) {        errno = ENOMEM;        return -1;    }    Buffer* copy = bufferCreate(size);     if (copy == NULL) {        free(packet);        errno = ENOMEM;        return -1;    }    // Copy data.    memcpy(copy->data, bytes, size);    copy->size = size;        packet->bytes = copy;    packet->header.type = BYTES;    packet->header.size = size;    packet->free = outgoingPacketFreeBytes;    bufferPrepareForWrite(packet->bytes);        peerLock(peer);        PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true);    if (peerProxy == NULL) {        // The peer is already dead or we couldn't alloc memory. Either way,        // errno is set.        peerUnlock(peer);        packet->free(packet);         return -1;    } else {        peerProxyEnqueueOutgoingPacket(peerProxy, packet);        peerUnlock(peer);        selectorWakeUp(peer->selector);        return 0;     }}/** Keeps track of how to free shared bytes. */typedef struct {    void (*free)(void* context);    void* context;} SharedBytesFreer;/** Frees shared bytes. */static void outgoingPacketFreeSharedBytes(OutgoingPacket* packet) {    SharedBytesFreer* sharedBytesFreer         = (SharedBytesFreer*) packet->context;    sharedBytesFreer->free(sharedBytesFreer->context);    free(sharedBytesFreer);    free(packet);}/** * Sends a packet of bytes to a remote peer without copying the bytes. Calls * free() with context after the bytes have been sent. * * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno * to EINVAL if pid is the same as the local pid. */int peerSendSharedBytes(pid_t pid, char* bytes, size_t size,        void (*free)(void* context), void* context) {    Peer* peer = localPeer;    assert(peer != NULL);    OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));    if (packet == NULL) {        errno = ENOMEM;        return -1;    }    Buffer* wrapper = bufferWrap(bytes, size, size);    if (wrapper == NULL) {        free(packet);        errno = ENOMEM;        return -1;    }    SharedBytesFreer* sharedBytesFreer = malloc(sizeof(SharedBytesFreer));    if (sharedBytesFreer == NULL) {        free(packet);        free(wrapper);        errno = ENOMEM;        return -1;    }    sharedBytesFreer->free = free;    sharedBytesFreer->context = context;        packet->bytes = wrapper;    packet->context = sharedBytesFreer;    packet->header.type = BYTES;    packet->header.size = size;    packet->free = &outgoingPacketFreeSharedBytes;    bufferPrepareForWrite(packet->bytes);        peerLock(peer);        PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true);    if (peerProxy == NULL) {        // The peer is already dead or we couldn't alloc memory. Either way,        // errno is set.        peerUnlock(peer);        packet->free(packet);         return -1;    } else {        peerProxyEnqueueOutgoingPacket(peerProxy, packet);        peerUnlock(peer);        selectorWakeUp(peer->selector);        return 0;     }}/** * Starts the master peer. The master peer differs from other peers in that * it is responsible for connecting the other peers. You can only have one * master peer. * * Goes into an I/O loop and does not return. */void masterPeerInitialize(BytesListener* bytesListener,         DeathListener* deathListener) {    // Create and bind socket.    int listenerSocket = socket(AF_LOCAL, SOCK_STREAM, 0);    if (listenerSocket == -1) {        LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno));    }    unlink(MASTER_PATH);    int result = bind(listenerSocket, (SocketAddress*) getMasterAddress(),             sizeof(UnixAddress));    if (result == -1) {        LOG_ALWAYS_FATAL("bind() error: %s", strerror(errno));    }    LOGD("Listener socket: %d",  listenerSocket);           // Queue up to 16 connections.    result = listen(listenerSocket, 16);    if (result != 0) {        LOG_ALWAYS_FATAL("listen() error: %s", strerror(errno));    }    // Make socket non-blocking.    setNonBlocking(listenerSocket);    // Create the peer for this process. Fail if we already have one.    if (localPeer != NULL) {        LOG_ALWAYS_FATAL("Peer is already initialized.");    }    localPeer = peerCreate();    if (localPeer == NULL) {        LOG_ALWAYS_FATAL("malloc() failed.");    }    localPeer->master = true;    localPeer->onBytes = bytesListener;    localPeer->onDeath = deathListener;        // Make listener socket selectable.    SelectableFd* listenerFd = selectorAdd(localPeer->selector, listenerSocket);    if (listenerFd == NULL) {        LOG_ALWAYS_FATAL("malloc() error.");    }    listenerFd->data = localPeer;    listenerFd->onReadable = &masterAcceptConnection;}/** * Starts a local peer. * * Goes into an I/O loop and does not return. */void peerInitialize(BytesListener* bytesListener,         DeathListener* deathListener) {    // Connect to master peer.    int masterSocket = socket(AF_LOCAL, SOCK_STREAM, 0);    if (masterSocket == -1) {        LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno));    }    int result = connect(masterSocket, (SocketAddress*) getMasterAddress(),            sizeof(UnixAddress));    if (result != 0) {        LOG_ALWAYS_FATAL("connect() error: %s", strerror(errno));    }    // Create the peer for this process. Fail if we already have one.    if (localPeer != NULL) {        LOG_ALWAYS_FATAL("Peer is already initialized.");    }    localPeer = peerCreate();    if (localPeer == NULL) {        LOG_ALWAYS_FATAL("malloc() failed.");    }    localPeer->onBytes = bytesListener;    localPeer->onDeath = deathListener;        // Make connection selectable.    SelectableFd* masterFd = selectorAdd(localPeer->selector, masterSocket);    if (masterFd == NULL) {        LOG_ALWAYS_FATAL("malloc() error.");    }    // Create a peer proxy for the master peer.    PeerProxy* masterProxy = peerProxyCreate(localPeer, MASTER_CREDENTIALS);    if (masterProxy == NULL) {        LOG_ALWAYS_FATAL("malloc() error.");    }    peerProxySetFd(masterProxy, masterFd);    masterProxy->master = true;    localPeer->masterProxy = masterProxy;}/** Starts the master peer I/O loop. Doesn't return. */void peerLoop() {    assert(localPeer != NULL);        // Start selector.    selectorLoop(localPeer->selector);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -