📄 mq.c
字号:
} else { LOGW("Peer proxy for %d not found. This shouldn't happen.", pid); } peerProxyExpectHeader(masterProxy);}/** * Handles a packet header. */static void peerProxyHandleHeader(PeerProxy* peerProxy, Header* header) { switch (header->type) { case CONNECTION_REQUEST: masterHandleConnectionRequest(peerProxy, header); break; case CONNECTION: masterProxyExpectConnection(peerProxy, header); break; case CONNECTION_ERROR: masterProxyHandleConnectionError(peerProxy, header); break; case BYTES: peerProxyExpectBytes(peerProxy, header); break; default: LOGW("Invalid packet type from %d: %d", peerProxy->credentials.pid, header->type); peerProxyKill(peerProxy, false); }}/** * Buffers input sent by peer. May be called multiple times until the entire * buffer is filled. Returns true when the buffer is full. */static bool peerProxyBufferInput(PeerProxy* peerProxy) { Buffer* in = peerProxy->inputBuffer; ssize_t size = bufferRead(in, peerProxy->fd->fd); if (size < 0) { peerProxyHandleError(peerProxy, "read"); return false; } else if (size == 0) { // EOF. LOGI("EOF"); peerProxyKill(peerProxy, false); return false; } else if (bufferReadComplete(in)) { // We're done! return true; } else { // Continue reading. return false; }}/** * Reads input from a peer process. */static void peerProxyRead(SelectableFd* fd) { LOGD("Reading..."); PeerProxy* peerProxy = (PeerProxy*) fd->data; int state = peerProxy->inputState; Buffer* in = peerProxy->inputBuffer; switch (state) { case READING_HEADER: if (peerProxyBufferInput(peerProxy)) { LOGD("Header read."); // We've read the complete header. Header* header = (Header*) in->data; peerProxyHandleHeader(peerProxy, header); } break; case READING_BYTES: LOGD("Reading bytes..."); if (peerProxyBufferInput(peerProxy)) { LOGD("Bytes read."); // We have the complete packet. Notify bytes listener. peerProxy->peer->onBytes(peerProxy->credentials, in->data, in->size); // Get ready for the next packet. peerProxyExpectHeader(peerProxy); } break; case ACCEPTING_CONNECTION: masterProxyAcceptConnection(peerProxy); break; default: LOG_ALWAYS_FATAL("Unknown state: %d", state); }}static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials) { PeerProxy* peerProxy = calloc(1, sizeof(PeerProxy)); if (peerProxy == NULL) { return NULL; } peerProxy->inputBuffer = bufferCreate(sizeof(Header)); if (peerProxy->inputBuffer == NULL) { free(peerProxy); return NULL; } peerProxy->peer = peer; peerProxy->credentials = credentials; // Initial state == expecting a header. peerProxyExpectHeader(peerProxy); // Add this proxy to the map. Make sure the key points to the stable memory // inside of the peer proxy itself. pid_t* pid = &(peerProxy->credentials.pid); hashmapPut(peer->peerProxies, pid, peerProxy); return peerProxy;}/** Accepts a connection to the master peer. */static void masterAcceptConnection(SelectableFd* listenerFd) { // Accept connection. int socket = accept(listenerFd->fd, NULL, NULL); if (socket == -1) { LOGW("accept() error: %s", strerror(errno)); return; } LOGD("Accepted connection as fd %d.", socket); // Get credentials. Credentials credentials; struct ucred ucredentials; socklen_t credentialsSize = sizeof(struct ucred); int result = getsockopt(socket, SOL_SOCKET, SO_PEERCRED, &ucredentials, &credentialsSize); // We might want to verify credentialsSize. if (result == -1) { LOGW("getsockopt() error: %s", strerror(errno)); closeWithWarning(socket); return; } // Copy values into our own structure so we know we have the types right. credentials.pid = ucredentials.pid; credentials.uid = ucredentials.uid; credentials.gid = ucredentials.gid; LOGI("Accepted connection from process %d.", credentials.pid); Peer* masterPeer = (Peer*) listenerFd->data; peerLock(masterPeer); // Make sure we don't already have a connection from that process. PeerProxy* peerProxy = hashmapGet(masterPeer->peerProxies, &credentials.pid); if (peerProxy != NULL) { peerUnlock(masterPeer); LOGW("Alread connected to process %d.", credentials.pid); closeWithWarning(socket); return; } // Add connection to the selector. SelectableFd* socketFd = selectorAdd(masterPeer->selector, socket); if (socketFd == NULL) { peerUnlock(masterPeer); LOGW("malloc() failed."); closeWithWarning(socket); return; } // Create a peer proxy. peerProxy = peerProxyCreate(masterPeer, credentials); peerUnlock(masterPeer); if (peerProxy == NULL) { LOGW("malloc() failed."); socketFd->remove = true; closeWithWarning(socket); } peerProxy->connections = hashmapCreate(10, &pidHash, &pidEquals); peerProxySetFd(peerProxy, socketFd);}/** * Creates the local peer. */static Peer* peerCreate() { Peer* peer = calloc(1, sizeof(Peer)); if (peer == NULL) { LOG_ALWAYS_FATAL("malloc() error."); } peer->peerProxies = hashmapCreate(10, &pidHash, &pidEquals); peer->selector = selectorCreate(); pthread_mutexattr_t attributes; if (pthread_mutexattr_init(&attributes) != 0) { LOG_ALWAYS_FATAL("pthread_mutexattr_init() error."); } if (pthread_mutexattr_settype(&attributes, PTHREAD_MUTEX_RECURSIVE) != 0) { LOG_ALWAYS_FATAL("pthread_mutexattr_settype() error."); } if (pthread_mutex_init(&peer->mutex, &attributes) != 0) { LOG_ALWAYS_FATAL("pthread_mutex_init() error."); } peer->pid = getpid(); return peer;}/** The local peer. */static Peer* localPeer;/** Frees a packet of bytes. */static void outgoingPacketFreeBytes(OutgoingPacket* packet) { LOGD("Freeing outgoing packet."); bufferFree(packet->bytes); free(packet);}/** * Sends a packet of bytes to a remote peer. Returns 0 on success. * * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno * to EINVAL if pid is the same as the local pid. */int peerSendBytes(pid_t pid, const char* bytes, size_t size) { Peer* peer = localPeer; assert(peer != NULL); OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket)); if (packet == NULL) { errno = ENOMEM; return -1; } Buffer* copy = bufferCreate(size); if (copy == NULL) { free(packet); errno = ENOMEM; return -1; } // Copy data. memcpy(copy->data, bytes, size); copy->size = size; packet->bytes = copy; packet->header.type = BYTES; packet->header.size = size; packet->free = outgoingPacketFreeBytes; bufferPrepareForWrite(packet->bytes); peerLock(peer); PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true); if (peerProxy == NULL) { // The peer is already dead or we couldn't alloc memory. Either way, // errno is set. peerUnlock(peer); packet->free(packet); return -1; } else { peerProxyEnqueueOutgoingPacket(peerProxy, packet); peerUnlock(peer); selectorWakeUp(peer->selector); return 0; }}/** Keeps track of how to free shared bytes. */typedef struct { void (*free)(void* context); void* context;} SharedBytesFreer;/** Frees shared bytes. */static void outgoingPacketFreeSharedBytes(OutgoingPacket* packet) { SharedBytesFreer* sharedBytesFreer = (SharedBytesFreer*) packet->context; sharedBytesFreer->free(sharedBytesFreer->context); free(sharedBytesFreer); free(packet);}/** * Sends a packet of bytes to a remote peer without copying the bytes. Calls * free() with context after the bytes have been sent. * * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno * to EINVAL if pid is the same as the local pid. */int peerSendSharedBytes(pid_t pid, char* bytes, size_t size, void (*free)(void* context), void* context) { Peer* peer = localPeer; assert(peer != NULL); OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket)); if (packet == NULL) { errno = ENOMEM; return -1; } Buffer* wrapper = bufferWrap(bytes, size, size); if (wrapper == NULL) { free(packet); errno = ENOMEM; return -1; } SharedBytesFreer* sharedBytesFreer = malloc(sizeof(SharedBytesFreer)); if (sharedBytesFreer == NULL) { free(packet); free(wrapper); errno = ENOMEM; return -1; } sharedBytesFreer->free = free; sharedBytesFreer->context = context; packet->bytes = wrapper; packet->context = sharedBytesFreer; packet->header.type = BYTES; packet->header.size = size; packet->free = &outgoingPacketFreeSharedBytes; bufferPrepareForWrite(packet->bytes); peerLock(peer); PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true); if (peerProxy == NULL) { // The peer is already dead or we couldn't alloc memory. Either way, // errno is set. peerUnlock(peer); packet->free(packet); return -1; } else { peerProxyEnqueueOutgoingPacket(peerProxy, packet); peerUnlock(peer); selectorWakeUp(peer->selector); return 0; }}/** * Starts the master peer. The master peer differs from other peers in that * it is responsible for connecting the other peers. You can only have one * master peer. * * Goes into an I/O loop and does not return. */void masterPeerInitialize(BytesListener* bytesListener, DeathListener* deathListener) { // Create and bind socket. int listenerSocket = socket(AF_LOCAL, SOCK_STREAM, 0); if (listenerSocket == -1) { LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno)); } unlink(MASTER_PATH); int result = bind(listenerSocket, (SocketAddress*) getMasterAddress(), sizeof(UnixAddress)); if (result == -1) { LOG_ALWAYS_FATAL("bind() error: %s", strerror(errno)); } LOGD("Listener socket: %d", listenerSocket); // Queue up to 16 connections. result = listen(listenerSocket, 16); if (result != 0) { LOG_ALWAYS_FATAL("listen() error: %s", strerror(errno)); } // Make socket non-blocking. setNonBlocking(listenerSocket); // Create the peer for this process. Fail if we already have one. if (localPeer != NULL) { LOG_ALWAYS_FATAL("Peer is already initialized."); } localPeer = peerCreate(); if (localPeer == NULL) { LOG_ALWAYS_FATAL("malloc() failed."); } localPeer->master = true; localPeer->onBytes = bytesListener; localPeer->onDeath = deathListener; // Make listener socket selectable. SelectableFd* listenerFd = selectorAdd(localPeer->selector, listenerSocket); if (listenerFd == NULL) { LOG_ALWAYS_FATAL("malloc() error."); } listenerFd->data = localPeer; listenerFd->onReadable = &masterAcceptConnection;}/** * Starts a local peer. * * Goes into an I/O loop and does not return. */void peerInitialize(BytesListener* bytesListener, DeathListener* deathListener) { // Connect to master peer. int masterSocket = socket(AF_LOCAL, SOCK_STREAM, 0); if (masterSocket == -1) { LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno)); } int result = connect(masterSocket, (SocketAddress*) getMasterAddress(), sizeof(UnixAddress)); if (result != 0) { LOG_ALWAYS_FATAL("connect() error: %s", strerror(errno)); } // Create the peer for this process. Fail if we already have one. if (localPeer != NULL) { LOG_ALWAYS_FATAL("Peer is already initialized."); } localPeer = peerCreate(); if (localPeer == NULL) { LOG_ALWAYS_FATAL("malloc() failed."); } localPeer->onBytes = bytesListener; localPeer->onDeath = deathListener; // Make connection selectable. SelectableFd* masterFd = selectorAdd(localPeer->selector, masterSocket); if (masterFd == NULL) { LOG_ALWAYS_FATAL("malloc() error."); } // Create a peer proxy for the master peer. PeerProxy* masterProxy = peerProxyCreate(localPeer, MASTER_CREDENTIALS); if (masterProxy == NULL) { LOG_ALWAYS_FATAL("malloc() error."); } peerProxySetFd(masterProxy, masterFd); masterProxy->master = true; localPeer->masterProxy = masterProxy;}/** Starts the master peer I/O loop. Doesn't return. */void peerLoop() { assert(localPeer != NULL); // Start selector. selectorLoop(localPeer->selector);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -