📄 ipcsocket.c
字号:
( wait_conn->ch_private); ch_private = (struct SOCKET_CH_PRIVATE *)(ch->ch_private); strncpy(ch_private->path_name,conn_private->path_name , sizeof(conn_private->path_name)); } } /* Verify the client authorization information. */ if (ch->ops->verify_auth(ch, auth_info) == IPC_OK) { ch->ch_status = IPC_CONNECT; ch->farside_pid = socket_get_farside_pid(new_sock); return ch; } return NULL;}static voidsocket_destroy_channel(struct IPC_CHANNEL * ch){ socket_disconnect(ch); socket_destroy_queue(ch->send_queue); socket_destroy_queue(ch->recv_queue); if (ch->ch_private != NULL) { g_free((void*)(ch->ch_private)); } memset(ch, 0xff, sizeof(*ch)); g_free((void*)ch);}/* * Called by socket_destory(). Disconnect the connection * and set ch_status to IPC_DISCONNECT. * * parameters : * ch (IN) the pointer to the channel. * * return values : * IPC_OK the connection is disconnected successfully. * IPC_FAIL operation fails.*/static intsocket_disconnect(struct IPC_CHANNEL* ch){ struct SOCKET_CH_PRIVATE* conn_info; conn_info = (struct SOCKET_CH_PRIVATE*) ch->ch_private;#if 0 if (ch->ch_status != IPC_DISCONNECT) { cl_log(LOG_INFO, "forced disconnect for fd %d", conn_info->s); }#endif close(conn_info->s); cl_poll_ignore(conn_info->s); conn_info->s = -1; ch->ch_status = IPC_DISCONNECT; return IPC_OK;}static intsocket_check_disc_pending(struct IPC_CHANNEL* ch){ int rc; struct pollfd sockpoll; if (ch->ch_status == IPC_DISCONNECT) { cl_log(LOG_ERR, "check_disc_pending() already disconnected"); return IPC_BROKEN; } if (ch->recv_queue->current_qlen > 0) { return IPC_OK; } sockpoll.fd = ch->ops->get_recv_select_fd(ch); sockpoll.events = POLLIN; rc = ipc_pollfunc_ptr(&sockpoll, 1, 0); if (rc < 0) { cl_log(LOG_INFO , "socket_check_disc_pending() bad poll call"); ch->ch_status = IPC_DISCONNECT; return IPC_BROKEN; } if (sockpoll.revents & POLLHUP) { if (sockpoll.revents & POLLIN) { ch->ch_status = IPC_DISC_PENDING; }else{#if 0 cl_log(LOG_INFO, "HUP without input");#endif ch->ch_status = IPC_DISCONNECT; return IPC_BROKEN; } } if (sockpoll.revents & POLLIN) { int dummy; socket_resume_io_read(ch, &dummy, FALSE); } return IPC_OK;}static int socket_initiate_connection(struct IPC_CHANNEL * ch){ struct SOCKET_CH_PRIVATE* conn_info; struct sockaddr_un peer_addr; /* connector's address information */ conn_info = (struct SOCKET_CH_PRIVATE*) ch->ch_private; /* Prepare the socket */ memset(&peer_addr, 0, sizeof(peer_addr)); peer_addr.sun_family = AF_LOCAL; /* host byte order */ if (strlen(conn_info->path_name) >= sizeof(peer_addr.sun_path)) { return IPC_FAIL; } strncpy(peer_addr.sun_path, conn_info->path_name, sizeof(peer_addr.sun_path)); /* Send connection request */ if (connect(conn_info->s, (struct sockaddr *)&peer_addr , sizeof(struct sockaddr_un)) == -1) { cl_perror("initiate_connection: connect failure"); return IPC_FAIL; } ch->ch_status = IPC_CONNECT; ch->farside_pid = socket_get_farside_pid(conn_info->s); return IPC_OK;}static int socket_send(struct IPC_CHANNEL * ch, struct IPC_MESSAGE* msg){ if (msg->msg_len < 0 || msg->msg_len > MAXDATASIZE) { return IPC_FAIL; } if (ch->ch_status != IPC_CONNECT){ return IPC_FAIL; } if ( !ch->is_send_blocking && ch->send_queue->current_qlen >= ch->send_queue->max_qlen) { cl_log(LOG_ERR, "send queue maximum length(%d) exceeded", ch->send_queue->max_qlen ); return IPC_FAIL; } do{ ch->ops->resume_io(ch); } while (ch->send_queue->current_qlen >= ch->send_queue->max_qlen); /* add the message into the send queue */ CHECKFOO(0,ch, msg, SavedQueuedBody, "queued message"); SocketIPCStats.noutqueued++; ch->send_queue->queue = g_list_append(ch->send_queue->queue, msg); ch->send_queue->current_qlen++; /* resume io */ ch->ops->resume_io(ch); return IPC_OK; }static int socket_recv(struct IPC_CHANNEL * ch, struct IPC_MESSAGE** message){ GList *element; gboolean started; int result; (void)socket_resume_io(ch); result = socket_resume_io_read(ch, &started, TRUE); *message = NULL; if (ch->recv_queue->current_qlen == 0) { return result != IPC_OK ? result : IPC_FAIL; } element = g_list_first(ch->recv_queue->queue); if (element == NULL) { /* Internal accounting error, but correctable */ cl_log(LOG_ERR , "recv failure: qlen (%d) > 0, but no message found." , ch->recv_queue->current_qlen); ch->recv_queue->current_qlen = 0; return IPC_FAIL; } *message = (struct IPC_MESSAGE *) (element->data); CHECKFOO(1,ch, *message, SavedReadBody, "read message"); SocketIPCStats.nreceived++; ch->recv_queue->queue = g_list_remove(ch->recv_queue->queue , element->data); ch->recv_queue->current_qlen--; return IPC_OK;}static intsocket_check_poll(struct IPC_CHANNEL * ch, struct pollfd * sockpoll){ if (ch->ch_status == IPC_DISCONNECT) { return IPC_OK; } if (sockpoll->revents & POLLHUP) { /* If input present, or this is an output-only poll... */ if (sockpoll->revents & POLLIN || (sockpoll-> events & POLLIN) == 0 ) { ch->ch_status = IPC_DISC_PENDING; return IPC_OK; }#if 0 cl_log(LOG_INFO, "socket_check_poll(): HUP without input");#endif ch->ch_status = IPC_DISCONNECT; return IPC_BROKEN; }else if (sockpoll->revents & (POLLNVAL|POLLERR)) { /* Have we already closed the socket? */ if (fcntl(sockpoll->fd, F_GETFL) < 0) { cl_perror("socket_check_poll(pid %d): bad fd [%d]" , (int) getpid(), sockpoll->fd); ch->ch_status = IPC_DISCONNECT; return IPC_OK; } cl_log(LOG_ERR , "revents failure: fd %d, flags 0x%x" , sockpoll->fd, sockpoll->revents); errno = EINVAL; return IPC_FAIL; } return IPC_OK;}static intsocket_waitfor(struct IPC_CHANNEL * ch, gboolean (*finished)(struct IPC_CHANNEL * ch)){ struct pollfd sockpoll; CHANAUDIT(ch); if (finished(ch)) { return IPC_OK; } if (ch->ch_status == IPC_DISCONNECT) { return IPC_BROKEN; } sockpoll.fd = ch->ops->get_recv_select_fd(ch); while (!finished(ch) && IPC_ISRCONN(ch)) { int rc; sockpoll.events = POLLIN; /* Cannot call resume_io after the call to finished() * and before the call to poll because we might * change the state of the thing finished() is * waiting for. * This means that the poll call below would be * not only pointless, but might * make us hang forever waiting for this * event which has already happened */ if (ch->send_queue->current_qlen > 0) { sockpoll.events |= POLLOUT; } rc = ipc_pollfunc_ptr(&sockpoll, 1, -1); if (rc < 0) { return (errno == EINTR ? IPC_INTR : IPC_FAIL); } rc = socket_check_poll(ch, &sockpoll); if (sockpoll.revents & POLLIN) { socket_resume_io(ch); } if (rc != IPC_OK) { CHANAUDIT(ch); return rc; } } CHANAUDIT(ch); return IPC_OK;}static intsocket_waitin(struct IPC_CHANNEL * ch){ return socket_waitfor(ch, ch->ops->is_message_pending);}static gbooleansocket_is_output_flushed(struct IPC_CHANNEL * ch){ return ! ch->ops->is_sending_blocked(ch);}static intsocket_waitout(struct IPC_CHANNEL * ch){ int rc; CHANAUDIT(ch); rc = socket_waitfor(ch, socket_is_output_flushed); if (rc != IPC_OK) { cl_log(LOG_ERR , "socket_waitout failure: rc = %d", rc); }else if (ch->ops->is_sending_blocked(ch)) { cl_log(LOG_ERR, "socket_waitout output still blocked"); } CHANAUDIT(ch); return rc;}static gbooleansocket_is_message_pending(struct IPC_CHANNEL * ch){ ch->ops->resume_io(ch); if (ch->recv_queue->current_qlen > 0) { return TRUE; } return !IPC_ISRCONN(ch);}static gbooleansocket_is_output_pending(struct IPC_CHANNEL * ch){ socket_resume_io(ch); return ch->ch_status == IPC_CONNECT && ch->send_queue->current_qlen > 0;}static int socket_assert_auth(struct IPC_CHANNEL *ch, GHashTable *auth){ cl_log(LOG_ERR , "the assert_auth function for domain socket is not implemented"); return IPC_FAIL;}static intsocket_resume_io_read(struct IPC_CHANNEL *ch, gboolean* started, gboolean read1anyway){ struct SOCKET_CH_PRIVATE* conn_info; int retcode = IPC_OK; struct pollfd sockpoll; int debug_loopcount = 0; int debug_bytecount = 0; int maxqlen = ch->recv_queue->max_qlen; CHANAUDIT(ch); conn_info = (struct SOCKET_CH_PRIVATE *) ch->ch_private; *started = FALSE; if (maxqlen <= 0 && read1anyway) { maxqlen = 1; } while (ch->recv_queue->current_qlen < maxqlen && retcode == IPC_OK) { gboolean new_msg; void * msg_begin; int msg_len; struct SOCKET_MSG_HEAD head; int len; CHANAUDIT(ch); ++debug_loopcount; new_msg = (conn_info->remaining_data == 0); if (new_msg) { len = sizeof(struct SOCKET_MSG_HEAD); msg_begin = &head; }else{ struct IPC_MESSAGE * msg = conn_info->buf_msg; len = conn_info->remaining_data; msg_begin = ((char*)msg->msg_body) + (msg->msg_len - len); } if (len <= 0 || len > MAXDATASIZE) { cl_log(LOG_ERR , "socket_resume_io_read()" ": bad packet length [%d]", len); ch->ch_status = IPC_DISCONNECT; retcode = IPC_BROKEN; break; } CHANAUDIT(ch); /* Now try to receive some data */ msg_len = recv(conn_info->s, msg_begin, len, MSG_DONTWAIT); SocketIPCStats.last_recv_rc = msg_len; SocketIPCStats.last_recv_errno = errno; ++SocketIPCStats.recv_count;#ifdef DEBUG cl_perror("recv() => %d, errno = %d loopcount = %d, %s" , msg_len, errno, debug_loopcount , (new_msg ? "msg head": "msg body"));#endif CHANAUDIT(ch); /* Did we get an error? */ if (msg_len < 0) { /* What kind of error did we get? */ switch (errno) { case EAGAIN: if (ch->ch_status==IPC_DISC_PENDING){ ch->ch_status =IPC_DISCONNECT; retcode = IPC_BROKEN; } break; case ECONNREFUSED: case ECONNRESET: retcode = socket_check_disc_pending(ch); break; default: cl_perror("socket_resume_io_read" ": unknown recv error"); ch->ch_status = IPC_DISCONNECT; retcode = IPC_FAIL; break; } break; /* out of loop */ } if (msg_len == 0) { if (ch->ch_status == IPC_DISC_PENDING && ch->recv_queue->current_qlen <= 0) { ch->ch_status = IPC_DISCONNECT; retcode = IPC_FAIL; } break; } /* How about that! We read something! */ /* Note that all previous cases break out of the loop */ debug_bytecount += msg_len; *started=TRUE;#if 0 cl_log(LOG_DEBUG, "Got %d byte message", msg_len); cl_log(LOG_DEBUG, "Contents: %s", (char*)msg_begin);#endif /* Is this data for the start of a new message? */ if (new_msg){ /* We assume we read 'len' bytes */ if (head.msg_len <= 0 || head.msg_len > MAXDATASIZE) { cl_log(LOG_CRIT , "socket_resume_io_read()" ": Invalid msg len [%d]" , head.msg_len); ch->ch_status = IPC_DISCONNECT; retcode = IPC_FAIL;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -