📄 jump_os_linux.c
字号:
{ struct jump_message_queue *jmq; mutex_lock(&queue_list_mutex); jmq = get_message_queue(messageType); if (jmq != NULL) { jmq->useCount++; *code = JUMP_MQ_SUCCESS; } else { jmq = message_queue_create(jumpProcessGetId(), messageType, code, 1); if (jmq != NULL) { jmq->useCount = 1; put_message_queue(jmq); } } mutex_unlock(&queue_list_mutex);}intjumpMessageQueueDestroy(JUMPPlatformCString messageType) { struct jump_message_queue *jmq; int ret; mutex_lock(&queue_list_mutex); jmq = get_message_queue(messageType); if (jmq == NULL) { ret = -1; } else { ret = decrement_usecount_maybe_free(jmq); } mutex_unlock(&queue_list_mutex); return ret;}JUMPMessageQueueHandle jumpMessageQueueOpen(int processId, JUMPPlatformCString type, JUMPMessageQueueStatusCode* code){ return message_queue_create(processId, type, code, 0);}void jumpMessageQueueClose(JUMPMessageQueueHandle handle) { struct jump_message_queue *jmq = (struct jump_message_queue *) handle; message_queue_destroy(jmq);}int jumpMessageQueueDataOffset(void) { return 0;}int jumpMessageQueueSend(JUMPMessageQueueHandle handle, char *buffer, int messageDataSize, JUMPMessageQueueStatusCode* code){ struct jump_message_queue *jmq = (struct jump_message_queue *) handle; struct iovec iovec[2]; if (messageDataSize < 0) { *code = JUMP_MQ_BAD_MESSAGE_SIZE; return -1; } /* Fail if the message is too large to be written atomically, i.e., without being interleaved with writes from other processes. */ if (messageDataSize > JUMP_MESSAGE_QUEUE_MAX_MESSAGE_SIZE) { *code = JUMP_MQ_BAD_MESSAGE_SIZE; return -1; } /* Prepare to write the length followed by the message. */ iovec[0].iov_base = &messageDataSize; iovec[0].iov_len = sizeof(messageDataSize); iovec[1].iov_base = buffer; iovec[1].iov_len = messageDataSize; /* This write is non-blocking. If it would block, we return JUMP_MQ_WOULD_BLOCK. It's atomic, so there are no issues with partial writes. */ while (1) { ssize_t status = writev(jmq->fd, iovec, 2); if (status != -1) { /* All data written successfully. */ *code = JUMP_MQ_SUCCESS; return 0; } if (errno == EINTR) { /* Interrupted before data was written, retry. */ continue; } /* write failed or would block. */ if (errno == EAGAIN) { *code = JUMP_MQ_WOULD_BLOCK; } else { *code = JUMP_MQ_FAILURE; } return -1; }}intjumpMessageQueueWaitForMessage(JUMPPlatformCString type, int32 timeout_millis, JUMPMessageQueueStatusCode* code){ struct timeval deadline; struct jump_message_queue *jmq; int ret; /* If we're using a timeout, calculate deadline = absolute timeout time. */ if (timeout_millis != 0) { int timeout_usec = (timeout_millis % 1000) * 1000; int timeout_sec = timeout_millis / 1000; if (gettimeofday(&deadline, NULL) == -1) { *code = JUMP_MQ_FAILURE; return -1; } deadline.tv_usec += timeout_usec; if (deadline.tv_usec >= 1000000) { deadline.tv_usec -= 1000000; deadline.tv_sec++; } deadline.tv_sec += timeout_sec; } jmq = lock_and_acquire_message_queue(type); if (jmq == NULL) { *code = JUMP_MQ_NO_SUCH_QUEUE; return -1; } /* NOTE: for JUMP_MQ_THREADSAFE we may read a stale false value for jmq->error since we're not locking jmq->read_mutex, but that won't hurt anything since we're not actually reading. */ if (jmq->error) { *code = JUMP_MQ_FAILURE; goto fail; } /* Wait for the fd to become readable, or for timeout. */ while (1) { fd_set readfds; struct timeval timeout; int status; FD_ZERO(&readfds); FD_SET(jmq->fd, &readfds); if (timeout_millis != 0) { /* Calculate the timeout time = deadline - current time. */ if (gettimeofday(&timeout, NULL) == -1) { *code = JUMP_MQ_FAILURE; goto fail; } timeout.tv_usec = deadline.tv_usec - timeout.tv_usec; if (timeout.tv_usec < 0) { timeout.tv_usec += 1000000; timeout.tv_sec++; } timeout.tv_sec = deadline.tv_sec - timeout.tv_sec; if (timeout.tv_sec < 0) { /* Timed out. */ *code = JUMP_MQ_TIMEOUT; goto fail; } } status = select(jmq->fd + 1, &readfds, NULL, NULL, (timeout_millis != 0) ? &timeout : NULL); switch (status) { case 1: /* Ready to read. */ *code = JUMP_MQ_SUCCESS; goto succeed; case 0: /* Timed out. */ *code = JUMP_MQ_TIMEOUT; goto fail; case -1: if (errno == EINTR) { /* Try again. */ continue; } break; default: break; } /* Some error. */ *code = JUMP_MQ_FAILURE; goto fail; } fail: ret = -1; goto out; succeed: ret = 0; out: lock_and_release_message_queue(jmq); return ret;}/* Returns 1 for success, 0 for would block, -1 for error. */static intread_fully(int fd, void *buf, size_t count){ char *cbuf = buf; int first = 1; while (count != 0) { ssize_t status = read(fd, cbuf, count); if (status > 0) { cbuf += status; count -= status; first = 0; continue; } if (status == -1) { if (errno == EINTR) { /* Interrupted, try again. */ continue; } if (errno == EAGAIN) { /* Read would block. */ if (first) { return 0; } } } /* read failed, or would block after a partial read. */ return -1; } return 1;}int jumpMessageQueueReceive(JUMPPlatformCString type, char *buffer, int bufferLength, JUMPMessageQueueStatusCode* code){ struct jump_message_queue *jmq; int messageDataSize; ssize_t status; int ret; jmq = lock_and_acquire_message_queue(type); if (jmq == NULL) { *code = JUMP_MQ_NO_SUCH_QUEUE; return -1; } /* Use the read_mutex to ensure only one thread is reading, so we can do separate reads for the length and the message, and partial reads if necessary. */ mutex_lock(&jmq->read_mutex); /* If this jump_message_queue failed earlier, just fail again. */ if (jmq->error) { goto unrecoverable_error; } /* Read the message size, and fail on error or if the read would block. */ status = read_fully(jmq->fd, &messageDataSize, sizeof(messageDataSize)); if (status == -1) { goto unrecoverable_error; } else if (status == 0) { /* read_fully would block, so there is no message after all. The jump_message_queue is ok, just fail. */ *code = JUMP_MQ_WOULD_BLOCK; goto recoverable_error; } /* Check for "unblock" message. */ if (messageDataSize == -1) { *code = JUMP_MQ_UNBLOCKED; goto recoverable_error; } /* Sanity check. */ if (messageDataSize < 0 || messageDataSize > PIPE_BUF - sizeof(messageDataSize)) { goto unrecoverable_error; } if (messageDataSize > bufferLength) { /* The buffer is not big enough. About all we can do is attempt to recover by reading the message in small pieces and discarding it. */ char buf[128]; while (messageDataSize > 0) { int size = messageDataSize; if (size > sizeof(buf)) { size = sizeof(buf); } status = read_fully(jmq->fd, buf, size); if (status != 1) { goto unrecoverable_error; } messageDataSize -= size; } *code = JUMP_MQ_BUFFER_SMALL; goto recoverable_error; } /* Read the message. */ status = read_fully(jmq->fd, buffer, messageDataSize); if (status != 1) { goto unrecoverable_error; } ret = messageDataSize; *code = JUMP_MQ_SUCCESS; goto out; unrecoverable_error: jmq->error = 1; *code = JUMP_MQ_FAILURE; recoverable_error: ret = -1; out: mutex_unlock(&jmq->read_mutex); lock_and_release_message_queue(jmq); return ret;}voidjumpMessageQueueUnblock(JUMPPlatformCString messageType, JUMPMessageQueueStatusCode* code){ /* The reader is unblocked by sending it a message of length -1, which it checks for. */ struct jump_message_queue *jmq; /* Use the read queue's fd. This avoids out of memory problems which could happen if we tried to create a new write queue and use its fd. */ jmq = lock_and_acquire_message_queue(messageType); if (jmq == NULL) { *code = JUMP_MQ_NO_SUCH_QUEUE; return; } while (1) { int length = -1; /* This is a non-blocking atomic write. */ ssize_t status = write(jmq->fd, &length, sizeof(length)); if (status != -1) { *code = JUMP_MQ_SUCCESS; break; } if (errno == EINTR) { /* Interrupted before data was written, retry. */ continue; } /* If write would block consider it a success, since it means the reader has something to read and will unblock. */ if (errno == EAGAIN) { *code = JUMP_MQ_SUCCESS; break; } /* write failed. */ *code = JUMP_MQ_FAILURE; break; } lock_and_release_message_queue(jmq);}intjumpMessageQueueGetFd(JUMPPlatformCString messageType){ struct jump_message_queue *jmq; int fd; jmq = lock_and_acquire_message_queue(messageType); if (jmq == NULL) { fd = -1; } else { fd = jmq->fd; } lock_and_release_message_queue(jmq); return fd;}/* * Destroy all message queues created by this process, regardless of * useCount. */voidjumpMessageQueueInterfaceDestroy(void){ struct jump_message_queue *p; mutex_lock(&queue_list_mutex); p = queue_list.next; while (p != &queue_list) { struct jump_message_queue *pnext = p->next; remove_message_queue(p); message_queue_destroy(p); p = pnext; } mutex_unlock(&queue_list_mutex);}voidjumpMessageQueueCleanQueuesOf(int cpid){ char prefix[sizeof(JUMP_MQ_PATH_PREFIX) + 20]; int prefixLen; DIR* dir; struct dirent* ptr; if (jumpProcessIsAlive(cpid) == 0) { return; } prefixLen = snprintf(prefix, sizeof(prefix), JUMP_MQ_PATH_PREFIX, cpid); dir = opendir(JUMP_MQ_PATH_DIR); if (dir == NULL) { perror("opendir"); return; } while ((ptr = readdir(dir)) != NULL) { if (strncmp(ptr->d_name, prefix, prefixLen) == 0) { char filename[sizeof(JUMP_MQ_PATH_DIR) + 1 + NAME_MAX + 1]; int len; len = snprintf(filename, sizeof(filename), JUMP_MQ_PATH_DIR "/%s", ptr->d_name); if (len < 0 || len >= sizeof(filename)) { continue; } printf("Exiting process %d has message queue %s\n", cpid, filename); if (unlink(filename) == -1) { perror("mq file delete"); } else { printf(" removed file %s\n", filename); } } } closedir(dir);}/* * The thread porting layer */intjumpThreadGetId(void){ return (int)pthread_self();}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -