📄 mq.c
字号:
/* * Copyright (C) 2007 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */#define LOG_TAG "mq"#include <assert.h>#include <errno.h>#include <fcntl.h>#include <pthread.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <sys/socket.h>#include <sys/types.h>#include <sys/un.h>#include <sys/uio.h>#include <cutils/array.h>#include <cutils/hashmap.h>#include <cutils/selector.h>#include "loghack.h"#include "buffer.h"/** Number of dead peers to remember. */#define PEER_HISTORY (16)typedef struct sockaddr SocketAddress;typedef struct sockaddr_un UnixAddress;/** * Process/user/group ID. We don't use ucred directly because it's only * available on Linux. */typedef struct { pid_t pid; uid_t uid; gid_t gid;} Credentials;/** Listens for bytes coming from remote peers. */typedef void BytesListener(Credentials credentials, char* bytes, size_t size);/** Listens for the deaths of remote peers. */typedef void DeathListener(pid_t pid);/** Types of packets. */typedef enum { /** Request for a connection to another peer. */ CONNECTION_REQUEST, /** A connection to another peer. */ CONNECTION, /** Reports a failed connection attempt. */ CONNECTION_ERROR, /** A generic packet of bytes. */ BYTES,} PacketType;typedef enum { /** Reading a packet header. */ READING_HEADER, /** Waiting for a connection from the master. */ ACCEPTING_CONNECTION, /** Reading bytes. */ READING_BYTES,} InputState;/** A packet header. */// TODO: Use custom headers for master->peer, peer->master, peer->peer.typedef struct { PacketType type; union { /** Packet size. Used for BYTES. */ size_t size; /** Credentials. Used for CONNECTION and CONNECTION_REQUEST. */ Credentials credentials; };} Header;/** A packet which will be sent to a peer. */typedef struct OutgoingPacket OutgoingPacket;struct OutgoingPacket { /** Packet header. */ Header header; union { /** Connection to peer. Used with CONNECTION. */ int socket; /** Buffer of bytes. Used with BYTES. */ Buffer* bytes; }; /** Frees all resources associated with this packet. */ void (*free)(OutgoingPacket* packet); /** Optional context. */ void* context; /** Next packet in the queue. */ OutgoingPacket* nextPacket;};/** Represents a remote peer. */typedef struct PeerProxy PeerProxy;/** Local peer state. You typically have one peer per process. */typedef struct { /** This peer's PID. */ pid_t pid; /** * Map from pid to peer proxy. The peer has a peer proxy for each remote * peer it's connected to. * * Acquire mutex before use. */ Hashmap* peerProxies; /** Manages I/O. */ Selector* selector; /** Used to synchronize operations with the selector thread. */ pthread_mutex_t mutex; /** Is this peer the master? */ bool master; /** Peer proxy for the master. */ PeerProxy* masterProxy; /** Listens for packets from remote peers. */ BytesListener* onBytes; /** Listens for deaths of remote peers. */ DeathListener* onDeath; /** Keeps track of recently dead peers. Requires mutex. */ pid_t deadPeers[PEER_HISTORY]; size_t deadPeerCursor;} Peer;struct PeerProxy { /** Credentials of the remote process. */ Credentials credentials; /** Keeps track of data coming in from the remote peer. */ InputState inputState; Buffer* inputBuffer; PeerProxy* connecting; /** File descriptor for this peer. */ SelectableFd* fd; /** * Queue of packets to be written out to the remote peer. * * Requires mutex. */ // TODO: Limit queue length. OutgoingPacket* currentPacket; OutgoingPacket* lastPacket; /** Used to write outgoing header. */ Buffer outgoingHeader; /** True if this is the master's proxy. */ bool master; /** Reference back to the local peer. */ Peer* peer; /** * Used in master only. Maps this peer proxy to other peer proxies to * which the peer has been connected to. Maps pid to PeerProxy. Helps * keep track of which connections we've sent to whom. */ Hashmap* connections;};/** Server socket path. */static const char* MASTER_PATH = "/master.peer";/** Credentials of the master peer. */static const Credentials MASTER_CREDENTIALS = {0, 0, 0};/** Creates a peer proxy and adds it to the peer proxy map. */static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials);/** Sets the non-blocking flag on a descriptor. */static void setNonBlocking(int fd) { int flags; if ((flags = fcntl(fd, F_GETFL, 0)) < 0) { LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno)); } if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) { LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno)); } }/** Closes a fd and logs a warning if the close fails. */static void closeWithWarning(int fd) { int result = close(fd); if (result == -1) { LOGW("close() error: %s", strerror(errno)); }}/** Hashes pid_t keys. */static int pidHash(void* key) { pid_t* pid = (pid_t*) key; return (int) (*pid);}/** Compares pid_t keys. */static bool pidEquals(void* keyA, void* keyB) { pid_t* a = (pid_t*) keyA; pid_t* b = (pid_t*) keyB; return *a == *b;}/** Gets the master address. Not thread safe. */static UnixAddress* getMasterAddress() { static UnixAddress masterAddress; static bool initialized = false; if (initialized == false) { masterAddress.sun_family = AF_LOCAL; strcpy(masterAddress.sun_path, MASTER_PATH); initialized = true; } return &masterAddress;}/** Gets exclusive access to the peer for this thread. */static void peerLock(Peer* peer) { pthread_mutex_lock(&peer->mutex);}/** Releases exclusive access to the peer. */static void peerUnlock(Peer* peer) { pthread_mutex_unlock(&peer->mutex);}/** Frees a simple, i.e. header-only, outgoing packet. */static void outgoingPacketFree(OutgoingPacket* packet) { LOGD("Freeing outgoing packet."); free(packet);}/** * Prepare to read a new packet from the peer. */static void peerProxyExpectHeader(PeerProxy* peerProxy) { peerProxy->inputState = READING_HEADER; bufferPrepareForRead(peerProxy->inputBuffer, sizeof(Header));}/** Sets up the buffer for the outgoing header. */static void peerProxyPrepareOutgoingHeader(PeerProxy* peerProxy) { peerProxy->outgoingHeader.data = (char*) &(peerProxy->currentPacket->header); peerProxy->outgoingHeader.size = sizeof(Header); bufferPrepareForWrite(&peerProxy->outgoingHeader);}/** Adds a packet to the end of the queue. Callers must have the mutex. */static void peerProxyEnqueueOutgoingPacket(PeerProxy* peerProxy, OutgoingPacket* newPacket) { newPacket->nextPacket = NULL; // Just in case. if (peerProxy->currentPacket == NULL) { // The queue is empty. peerProxy->currentPacket = newPacket; peerProxy->lastPacket = newPacket; peerProxyPrepareOutgoingHeader(peerProxy); } else { peerProxy->lastPacket->nextPacket = newPacket; }}/** Takes the peer lock and enqueues the given packet. */static void peerProxyLockAndEnqueueOutgoingPacket(PeerProxy* peerProxy, OutgoingPacket* newPacket) { Peer* peer = peerProxy->peer; peerLock(peer); peerProxyEnqueueOutgoingPacket(peerProxy, newPacket); peerUnlock(peer);}/** * Frees current packet and moves to the next one. Returns true if there is * a next packet or false if the queue is empty. */static bool peerProxyNextPacket(PeerProxy* peerProxy) { Peer* peer = peerProxy->peer; peerLock(peer); OutgoingPacket* current = peerProxy->currentPacket; if (current == NULL) { // The queue is already empty. peerUnlock(peer); return false; } OutgoingPacket* next = current->nextPacket; peerProxy->currentPacket = next; current->nextPacket = NULL; current->free(current); if (next == NULL) { // The queue is empty. peerProxy->lastPacket = NULL; peerUnlock(peer); return false; } else { peerUnlock(peer); peerProxyPrepareOutgoingHeader(peerProxy); // TODO: Start writing next packet? It would reduce the number of // system calls, but we could also starve other peers. return true; }}/** * Checks whether a peer died recently. */static bool peerIsDead(Peer* peer, pid_t pid) { size_t i; for (i = 0; i < PEER_HISTORY; i++) { pid_t deadPeer = peer->deadPeers[i]; if (deadPeer == 0) { return false; } if (deadPeer == pid) { return true; } } return false;}/** * Cleans up connection information. */static bool peerProxyRemoveConnection(void* key, void* value, void* context) { PeerProxy* deadPeer = (PeerProxy*) context; PeerProxy* otherPeer = (PeerProxy*) value; hashmapRemove(otherPeer->connections, &(deadPeer->credentials.pid)); return true;}/** * Called when the peer dies. */static void peerProxyKill(PeerProxy* peerProxy, bool errnoIsSet) { if (errnoIsSet) { LOGI("Peer %d died. errno: %s", peerProxy->credentials.pid, strerror(errno)); } else { LOGI("Peer %d died.", peerProxy->credentials.pid); } // If we lost the master, we're up a creek. We can't let this happen. if (peerProxy->master) { LOG_ALWAYS_FATAL("Lost connection to master."); } Peer* localPeer = peerProxy->peer; pid_t pid = peerProxy->credentials.pid; peerLock(localPeer); // Remember for awhile that the peer died. localPeer->deadPeers[localPeer->deadPeerCursor] = peerProxy->credentials.pid; localPeer->deadPeerCursor++; if (localPeer->deadPeerCursor == PEER_HISTORY) { localPeer->deadPeerCursor = 0; } // Remove from peer map. hashmapRemove(localPeer->peerProxies, &pid); // External threads can no longer get to this peer proxy, so we don't // need the lock anymore. peerUnlock(localPeer); // Remove the fd from the selector. if (peerProxy->fd != NULL) { peerProxy->fd->remove = true; } // Clear outgoing packet queue. while (peerProxyNextPacket(peerProxy)) {} bufferFree(peerProxy->inputBuffer); // This only applies to the master. if (peerProxy->connections != NULL) { // We can't leave these other maps pointing to freed memory. hashmapForEach(peerProxy->connections, &peerProxyRemoveConnection, peerProxy); hashmapFree(peerProxy->connections); } // Invoke death listener. localPeer->onDeath(pid); // Free the peer proxy itself. free(peerProxy);}static void peerProxyHandleError(PeerProxy* peerProxy, char* functionName) { if (errno == EINTR) { // Log interruptions but otherwise ignore them. LOGW("%s() interrupted.", functionName); } else if (errno == EAGAIN) { LOGD("EWOULDBLOCK"); // Ignore. } else { LOGW("Error returned by %s().", functionName); peerProxyKill(peerProxy, true); }}/** * Buffers output sent to a peer. May be called multiple times until the entire * buffer is filled. Returns true when the buffer is empty. */static bool peerProxyWriteFromBuffer(PeerProxy* peerProxy, Buffer* outgoing) { ssize_t size = bufferWrite(outgoing, peerProxy->fd->fd); if (size < 0) { peerProxyHandleError(peerProxy, "write");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -