⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mq.c

📁 Android 一些工具
💻 C
📖 第 1 页 / 共 3 页
字号:
/* * Copyright (C) 2007 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *      http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */#define LOG_TAG "mq"#include <assert.h>#include <errno.h>#include <fcntl.h>#include <pthread.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <sys/socket.h>#include <sys/types.h>#include <sys/un.h>#include <sys/uio.h>#include <cutils/array.h>#include <cutils/hashmap.h>#include <cutils/selector.h>#include "loghack.h"#include "buffer.h"/** Number of dead peers to remember. */#define PEER_HISTORY (16)typedef struct sockaddr SocketAddress;typedef struct sockaddr_un UnixAddress;/**  * Process/user/group ID. We don't use ucred directly because it's only  * available on Linux. */typedef struct {    pid_t pid;    uid_t uid;    gid_t gid;} Credentials;/** Listens for bytes coming from remote peers. */typedef void BytesListener(Credentials credentials, char* bytes, size_t size);/** Listens for the deaths of remote peers. */typedef void DeathListener(pid_t pid);/** Types of packets. */typedef enum {    /** Request for a connection to another peer. */    CONNECTION_REQUEST,     /** A connection to another peer. */    CONNECTION,     /** Reports a failed connection attempt. */    CONNECTION_ERROR,     /** A generic packet of bytes. */    BYTES,} PacketType;typedef enum {    /** Reading a packet header. */    READING_HEADER,    /** Waiting for a connection from the master. */    ACCEPTING_CONNECTION,    /** Reading bytes. */    READING_BYTES,} InputState;/** A packet header. */// TODO: Use custom headers for master->peer, peer->master, peer->peer.typedef struct {    PacketType type;    union {        /** Packet size. Used for BYTES. */        size_t size;        /** Credentials. Used for CONNECTION and CONNECTION_REQUEST. */        Credentials credentials;     };} Header;/** A packet which will be sent to a peer. */typedef struct OutgoingPacket OutgoingPacket;struct OutgoingPacket {    /** Packet header. */    Header header;         union {        /** Connection to peer. Used with CONNECTION. */        int socket;                /** Buffer of bytes. Used with BYTES. */        Buffer* bytes;    };    /** Frees all resources associated with this packet. */    void (*free)(OutgoingPacket* packet);       /** Optional context. */    void* context;        /** Next packet in the queue. */    OutgoingPacket* nextPacket;};/** Represents a remote peer. */typedef struct PeerProxy PeerProxy;/** Local peer state. You typically have one peer per process. */typedef struct {    /** This peer's PID. */    pid_t pid;        /**      * Map from pid to peer proxy. The peer has a peer proxy for each remote     * peer it's connected to.      *     * Acquire mutex before use.     */    Hashmap* peerProxies;    /** Manages I/O. */    Selector* selector;       /** Used to synchronize operations with the selector thread. */    pthread_mutex_t mutex;     /** Is this peer the master? */    bool master;    /** Peer proxy for the master. */    PeerProxy* masterProxy;        /** Listens for packets from remote peers. */    BytesListener* onBytes;        /** Listens for deaths of remote peers. */    DeathListener* onDeath;        /** Keeps track of recently dead peers. Requires mutex. */    pid_t deadPeers[PEER_HISTORY];    size_t deadPeerCursor;} Peer;struct PeerProxy {    /** Credentials of the remote process. */    Credentials credentials;    /** Keeps track of data coming in from the remote peer. */    InputState inputState;    Buffer* inputBuffer;    PeerProxy* connecting;    /** File descriptor for this peer. */    SelectableFd* fd;    /**      * Queue of packets to be written out to the remote peer.     *     * Requires mutex.     */    // TODO: Limit queue length.    OutgoingPacket* currentPacket;    OutgoingPacket* lastPacket;        /** Used to write outgoing header. */    Buffer outgoingHeader;        /** True if this is the master's proxy. */    bool master;    /** Reference back to the local peer. */    Peer* peer;    /**     * Used in master only. Maps this peer proxy to other peer proxies to     * which the peer has been connected to. Maps pid to PeerProxy. Helps     * keep track of which connections we've sent to whom.     */    Hashmap* connections;};/** Server socket path. */static const char* MASTER_PATH = "/master.peer";/** Credentials of the master peer. */static const Credentials MASTER_CREDENTIALS = {0, 0, 0};/** Creates a peer proxy and adds it to the peer proxy map. */static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials);/** Sets the non-blocking flag on a descriptor. */static void setNonBlocking(int fd) {    int flags;    if ((flags = fcntl(fd, F_GETFL, 0)) < 0) {         LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno));    }     if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {         LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno));    } }/** Closes a fd and logs a warning if the close fails. */static void closeWithWarning(int fd) {    int result = close(fd);    if (result == -1) {        LOGW("close() error: %s", strerror(errno));    }}/** Hashes pid_t keys. */static int pidHash(void* key) {    pid_t* pid = (pid_t*) key;    return (int) (*pid);}/** Compares pid_t keys. */static bool pidEquals(void* keyA, void* keyB) {    pid_t* a = (pid_t*) keyA;    pid_t* b = (pid_t*) keyB;    return *a == *b;}/** Gets the master address. Not thread safe. */static UnixAddress* getMasterAddress() {    static UnixAddress masterAddress;    static bool initialized = false;    if (initialized == false) {        masterAddress.sun_family = AF_LOCAL;        strcpy(masterAddress.sun_path, MASTER_PATH);         initialized = true;    }    return &masterAddress;}/** Gets exclusive access to the peer for this thread. */static void peerLock(Peer* peer) {    pthread_mutex_lock(&peer->mutex);}/** Releases exclusive access to the peer. */static void peerUnlock(Peer* peer) {    pthread_mutex_unlock(&peer->mutex);}/** Frees a simple, i.e. header-only, outgoing packet. */static void outgoingPacketFree(OutgoingPacket* packet) {    LOGD("Freeing outgoing packet.");	free(packet);}/** * Prepare to read a new packet from the peer. */static void peerProxyExpectHeader(PeerProxy* peerProxy) {    peerProxy->inputState = READING_HEADER;    bufferPrepareForRead(peerProxy->inputBuffer, sizeof(Header));}/** Sets up the buffer for the outgoing header. */static void peerProxyPrepareOutgoingHeader(PeerProxy* peerProxy) {    peerProxy->outgoingHeader.data         = (char*) &(peerProxy->currentPacket->header);    peerProxy->outgoingHeader.size = sizeof(Header);    bufferPrepareForWrite(&peerProxy->outgoingHeader);}/** Adds a packet to the end of the queue. Callers must have the mutex. */static void peerProxyEnqueueOutgoingPacket(PeerProxy* peerProxy,        OutgoingPacket* newPacket) {    newPacket->nextPacket = NULL; // Just in case.    if (peerProxy->currentPacket == NULL) {        // The queue is empty.        peerProxy->currentPacket = newPacket;        peerProxy->lastPacket = newPacket;                peerProxyPrepareOutgoingHeader(peerProxy);     } else {        peerProxy->lastPacket->nextPacket = newPacket;    }}/** Takes the peer lock and enqueues the given packet. */static void peerProxyLockAndEnqueueOutgoingPacket(PeerProxy* peerProxy,        OutgoingPacket* newPacket) {    Peer* peer = peerProxy->peer;    peerLock(peer);    peerProxyEnqueueOutgoingPacket(peerProxy, newPacket);    peerUnlock(peer);}/**  * Frees current packet and moves to the next one. Returns true if there is * a next packet or false if the queue is empty. */static bool peerProxyNextPacket(PeerProxy* peerProxy) {    Peer* peer = peerProxy->peer;    peerLock(peer);        OutgoingPacket* current = peerProxy->currentPacket;        if (current == NULL) {    	// The queue is already empty.        peerUnlock(peer);        return false;    }        OutgoingPacket* next = current->nextPacket;    peerProxy->currentPacket = next;    current->nextPacket = NULL;    current->free(current);    if (next == NULL) {        // The queue is empty.        peerProxy->lastPacket = NULL;        peerUnlock(peer);        return false;    } else {        peerUnlock(peer);        peerProxyPrepareOutgoingHeader(peerProxy);         // TODO: Start writing next packet? It would reduce the number of        // system calls, but we could also starve other peers.        return true;    }}/** * Checks whether a peer died recently. */static bool peerIsDead(Peer* peer, pid_t pid) {    size_t i;    for (i = 0; i < PEER_HISTORY; i++) {        pid_t deadPeer = peer->deadPeers[i];        if (deadPeer == 0) {            return false;        }        if (deadPeer == pid) {            return true;        }    }    return false;}/** * Cleans up connection information. */static bool peerProxyRemoveConnection(void* key, void* value, void* context) {    PeerProxy* deadPeer = (PeerProxy*) context;    PeerProxy* otherPeer = (PeerProxy*) value;    hashmapRemove(otherPeer->connections, &(deadPeer->credentials.pid));    return true;}/** * Called when the peer dies. */static void peerProxyKill(PeerProxy* peerProxy, bool errnoIsSet) {    if (errnoIsSet) {        LOGI("Peer %d died. errno: %s", peerProxy->credentials.pid,                 strerror(errno));    } else {        LOGI("Peer %d died.", peerProxy->credentials.pid);    }        // If we lost the master, we're up a creek. We can't let this happen.    if (peerProxy->master) {            LOG_ALWAYS_FATAL("Lost connection to master.");    }    Peer* localPeer = peerProxy->peer;    pid_t pid = peerProxy->credentials.pid;        peerLock(localPeer);        // Remember for awhile that the peer died.    localPeer->deadPeers[localPeer->deadPeerCursor]         = peerProxy->credentials.pid;    localPeer->deadPeerCursor++;    if (localPeer->deadPeerCursor == PEER_HISTORY) {        localPeer->deadPeerCursor = 0;    }      // Remove from peer map.    hashmapRemove(localPeer->peerProxies, &pid);        // External threads can no longer get to this peer proxy, so we don't     // need the lock anymore.    peerUnlock(localPeer);        // Remove the fd from the selector.    if (peerProxy->fd != NULL) {        peerProxy->fd->remove = true;    }    // Clear outgoing packet queue.    while (peerProxyNextPacket(peerProxy)) {}    bufferFree(peerProxy->inputBuffer);    // This only applies to the master.    if (peerProxy->connections != NULL) {        // We can't leave these other maps pointing to freed memory.        hashmapForEach(peerProxy->connections, &peerProxyRemoveConnection,                 peerProxy);        hashmapFree(peerProxy->connections);    }    // Invoke death listener.    localPeer->onDeath(pid);    // Free the peer proxy itself.    free(peerProxy);}static void peerProxyHandleError(PeerProxy* peerProxy, char* functionName) {    if (errno == EINTR) {        // Log interruptions but otherwise ignore them.        LOGW("%s() interrupted.", functionName);    } else if (errno == EAGAIN) {    	LOGD("EWOULDBLOCK");        // Ignore.    } else {        LOGW("Error returned by %s().", functionName);        peerProxyKill(peerProxy, true);    }}/** * Buffers output sent to a peer. May be called multiple times until the entire * buffer is filled. Returns true when the buffer is empty. */static bool peerProxyWriteFromBuffer(PeerProxy* peerProxy, Buffer* outgoing) {    ssize_t size = bufferWrite(outgoing, peerProxy->fd->fd);    if (size < 0) {        peerProxyHandleError(peerProxy, "write");

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -