📄 ipcsocket.c
字号:
break; } conn_info->buf_msg = socket_message_new(ch, head.msg_len); conn_info->remaining_data = head.msg_len; /* Next time we'll read the message body */ continue; } /* No, not the start of a new message. Therefore we */ /* must have received (more) data from an old message */ conn_info->remaining_data = conn_info->remaining_data - msg_len; if (conn_info->remaining_data < 0){ cl_log(LOG_CRIT , "received more data than expected"); conn_info->remaining_data = 0; retcode = IPC_FAIL; }else if (conn_info->remaining_data == 0){#if 0 cl_log(LOG_DEBUG, "channel: 0x%lx" , (unsigned long)ch); cl_log(LOG_DEBUG, "New recv_queue = 0x%lx" , (unsigned long)ch->recv_queue); cl_log(LOG_DEBUG, "buf_msg: len = %ld, body = 0x%lx" , (unsigned long)conn_info->buf_msg->msg_len , (unsigned long)conn_info->buf_msg->msg_body); cl_log(LOG_DEBUG, "buf_msg: contents: %s" , (char *)conn_info->buf_msg->msg_body);#endif /* Got the last of the message! */ /* Append gotten message to receive queue */ CHECKFOO(2,ch, conn_info->buf_msg, SavedReceivedBody , "received message"); ch->recv_queue->queue = g_list_append ( ch->recv_queue->queue, conn_info->buf_msg); ch->recv_queue->current_qlen++; SocketIPCStats.ninqueued++; conn_info->buf_msg = NULL; } } /* Check for errors uncaught by recv() */ /* NOTE: It doesn't seem right we have to do this every time */ /* FIXME?? */ if ((retcode == IPC_OK) && (sockpoll.fd = conn_info->s) >= 0) { /* Just check for errors, not for data */ sockpoll.events = 0; ipc_pollfunc_ptr(&sockpoll, 1, 0); retcode = socket_check_poll(ch, &sockpoll); } CHANAUDIT(ch); if (retcode != IPC_OK) { return retcode; } return IPC_ISRCONN(ch) ? IPC_OK : IPC_BROKEN;}static intsocket_resume_io_write(struct IPC_CHANNEL *ch, int* nmsg){ int retcode = IPC_OK; struct SOCKET_CH_PRIVATE* conn_info; CHANAUDIT(ch); conn_info = (struct SOCKET_CH_PRIVATE *) ch->ch_private; *nmsg = 0; while (ch->ch_status == IPC_CONNECT && retcode == IPC_OK && ch->send_queue->current_qlen > 0) { GList * element; struct IPC_MESSAGE * msg; struct SOCKET_MSG_HEAD head; int sendrc = 0; char* p; int len; int head_len = sizeof(struct SOCKET_MSG_HEAD); int bytes_send; int i; CHANAUDIT(ch); element = g_list_first(ch->send_queue->queue); if (element == NULL) { /* OOPS! - correct consistency problem */ ch->send_queue->current_qlen = 0; break; } msg = (struct IPC_MESSAGE *) (element->data); head.msg_len = msg->msg_len; if (ch->bytes_remaining == 0){ /*start to send a new message*/ bytes_send = 0; }else { bytes_send = head.msg_len + head_len - ch->bytes_remaining; } if (bytes_send < 0){ cl_log(LOG_ERR, "socket_resume_io_write: wrong length" "head.msg_len =%d, head_len =%d, ch->bytes_remaining=%d", head.msg_len, head_len, ch->bytes_remaining); return IPC_FAIL; } for ( i = 0 ; i < 2; i++){ switch (i){ case 0: /* for sending out head*/ if (bytes_send < head_len ){ /*we need send head or partial head*/ p= ((char*)&head) + bytes_send; len = head_len - bytes_send; }else{ /*head is already sent out*/ continue; } break; default: /* for sending out the message body*/ p = (char*)msg->msg_body + bytes_send - head_len; len = msg->msg_len + head_len - bytes_send; } do{ sendrc = send(conn_info->s, p, len, (MSG_DONTWAIT|MSG_NOSIGNAL)); SocketIPCStats.last_send_rc = sendrc; SocketIPCStats.last_send_errno = errno; ++SocketIPCStats.send_count; if (sendrc < 0) { switch (errno) { case EAGAIN: retcode = IPC_OK; break; case EPIPE: socket_check_disc_pending(ch); retcode = IPC_BROKEN; break; default: cl_perror("socket_resume_io_write: send1 bad errno"); ch->ch_status = IPC_DISCONNECT; retcode = IPC_FAIL; break; } ch->bytes_remaining = msg->msg_len + head_len - bytes_send; goto out; } p += sendrc; len -= sendrc; bytes_send += sendrc; }while( len > 0); } if (bytes_send > head_len + msg->msg_len ){ cl_log(LOG_INFO, "socket_resme_io_write:" " sending out more bytes then the message has"); return IPC_FAIL; } if (bytes_send == head_len + msg->msg_len ){ ch->send_queue->queue = g_list_remove(ch->send_queue->queue, msg); if (msg->msg_done != NULL) { msg->msg_done(msg); } SocketIPCStats.nsent++; ch->send_queue->current_qlen--; (*nmsg)++; ch->bytes_remaining = 0; }else { ch->bytes_remaining = msg->msg_len + head_len - bytes_send; break; } } out: CHANAUDIT(ch); if (retcode != IPC_OK) { return retcode; } return IPC_ISRCONN(ch) ? IPC_OK : IPC_BROKEN;}static intsocket_resume_io(struct IPC_CHANNEL *ch){ int rc1 = IPC_OK; int rc2 = IPC_OK; gboolean rstarted; gboolean wstarted; gboolean OKonce = FALSE; int nwmsg = 1;#ifdef DEBUG int count = 0;#endif CHANAUDIT(ch); if (!IPC_ISRCONN(ch)) { return IPC_BROKEN; } do { rc1 = socket_resume_io_read(ch, &rstarted, FALSE); CHANAUDIT(ch); if (ch->ch_status == IPC_DISC_PENDING) { rc2 = IPC_OK; wstarted = FALSE; } if (nwmsg > 0){ nwmsg = 0; rc2 = socket_resume_io_write(ch, &nwmsg); } CHANAUDIT(ch); if (rc1 == IPC_OK || rc2 == IPC_OK) { OKonce = TRUE; }#ifdef DEBUG ++count; if (rc1 == IPC_OK && rc2 == IPC_OK && (rstarted||wstarted)) { cl_log(LOG_DEBUG , "continuing: rstarted = %d wstarted = %d count: %d" , rstarted, wstarted, count); }#endif } while ((rstarted||nwmsg > 0) && IPC_ISRCONN(ch)); if (IPC_ISRCONN(ch)) { if (rc1 != IPC_OK) { cl_log(LOG_ERR , "socket_resume_io_read() failure"); } if (rc2 != IPC_OK) { cl_log(LOG_ERR , "socket_resume_io_write() failure"); } }else{ return (OKonce ? IPC_OK : IPC_BROKEN); } return (rc1 != IPC_OK ? rc1 : rc2);}static intsocket_get_recv_fd(struct IPC_CHANNEL *ch){ struct SOCKET_CH_PRIVATE* chp = ch ->ch_private; return (chp == NULL ? -1 : chp->s);}static intsocket_get_send_fd(struct IPC_CHANNEL *ch){ return socket_get_recv_fd(ch);}static intsocket_set_send_qlen (struct IPC_CHANNEL* ch, int q_len){ /* This seems more like an assertion failure than a normal error */ if (ch->send_queue == NULL) { return IPC_FAIL; } ch->send_queue->max_qlen = q_len; return IPC_OK; }static intsocket_set_recv_qlen (struct IPC_CHANNEL* ch, int q_len){ /* This seems more like an assertion failure than a normal error */ if (ch->recv_queue == NULL) { return IPC_FAIL; } ch->recv_queue->max_qlen = q_len; return IPC_OK;}/* socket object of the function table */static struct IPC_WAIT_OPS socket_wait_ops = { socket_destroy_wait_conn, socket_wait_selectfd, socket_accept_connection,};/* * create a new ipc queue whose length = 0 and inner queue = NULL. * return the pointer to a new ipc queue or NULL is the queue can't be created. */static struct IPC_QUEUE*socket_queue_new(void){ struct IPC_QUEUE *temp_queue; /* temp queue with length = 0 and inner queue = NULL. */ temp_queue = g_new(struct IPC_QUEUE, 1); temp_queue->current_qlen = 0; temp_queue->max_qlen = DEFAULT_MAX_QLEN; temp_queue->queue = NULL; return temp_queue;}/* * destory a ipc queue and clean all memory space assigned to this queue. * parameters: * q (IN) the pointer to the queue which should be destroied. * * FIXME: This function does not free up messages that might * be in the queue. */ voidsocket_destroy_queue(struct IPC_QUEUE * q){ g_list_free(q->queue); g_free((void *) q);}/* * socket_wait_conn_new: * Called by ipc_wait_conn_constructor to get a new socket * waiting connection. * (better explanation of this role might be nice) * * Parameters : * ch_attrs (IN) the attributes used to create this connection. * * Return : * the pointer to the new waiting connection or NULL if the connection * can't be created. * * NOTE : * for domain socket implementation, the only attribute needed is path name. * so the user should * create the hash table like this: * GHashTable * attrs; * attrs = g_hash_table_new(g_str_hash, g_str_equal); * g_hash_table_insert(attrs, PATH_ATTR, path_name); * here PATH_ATTR is defined as "path". */struct IPC_WAIT_CONNECTION *socket_wait_conn_new(GHashTable *ch_attrs){ struct IPC_WAIT_CONNECTION * temp_ch; char *path_name; char *mode_attr; struct sockaddr_un my_addr; int s; struct SOCKET_WAIT_CONN_PRIVATE *wait_private; mode_t s_mode; path_name = (char *) g_hash_table_lookup(ch_attrs, IPC_PATH_ATTR); mode_attr = (char *) g_hash_table_lookup(ch_attrs, IPC_MODE_ATTR); if (mode_attr != NULL) { s_mode = (mode_t)strtoul((const char *)mode_attr, NULL, 8); }else{ s_mode = 0777; } if (path_name == NULL) { return NULL; } /* prepare the unix domain socket */ if ((s = socket(AF_LOCAL, SOCK_STREAM, 0)) == -1) { cl_perror("socket_wait_conn_new: socket() failure"); return NULL; } if (unlink(path_name) < 0 && errno != ENOENT) { cl_perror("socket_wait_conn_new: unlink failure"); } memset(&my_addr, 0, sizeof(my_addr)); my_addr.sun_family = AF_LOCAL; /* host byte order */ if (strlen(path_name) >= sizeof(my_addr.sun_path)) { close(s); return NULL; } strncpy(my_addr.sun_path, path_name, sizeof(my_addr.sun_path)); if (bind(s, (struct sockaddr *)&my_addr, sizeof(my_addr)) == -1) { cl_perror("socket_wait_conn_new: trying to create in %s bind:" , path_name); close(s); return NULL; } /* Change the permission of the socket */ if (chmod(path_name,s_mode) < 0){ cl_perror("socket_wait_conn_new: failure trying to chmod %s" , path_name); close(s); return NULL; } /* listen to the socket */ if (listen(s, MAX_LISTEN_NUM) == -1) { cl_perror("socket_wait_conn_new: listen(MAX_LISTEN_NUM)"); close(s); return NULL; } if (fcntl(s, F_SETFL, O_NONBLOCK) < 0) { cl_perror("socket_wait_conn_new: cannot set O_NONBLOCK"); close(s); return NULL; } wait_private = g_new(struct SOCKET_WAIT_CONN_PRIVATE, 1); wait_private->s = s; strncpy(wait_private->path_name, path_name, sizeof(wait_private->path_name)); temp_ch = g_new(struct IPC_WAIT_CONNECTION, 1); temp_ch->ch_private = (void *) wait_private; temp_ch->ch_status = IPC_WAIT; temp_ch->ops = (struct IPC_WAIT_OPS *)&socket_wait_ops; return temp_ch;}/* * will be called by ipc_channel_constructor to create a new socket channel. * parameters : * attrs (IN) the hash table of the attributes used to create this channel. * * return: * the pointer to the new waiting channel or NULL if the channel can't be created.*/struct IPC_CHANNEL * socket_client_channel_new(GHashTable *ch_attrs) { struct IPC_CHANNEL * temp_ch; struct SOCKET_CH_PRIVATE* conn_info; char *path_name; int sockfd; /* * I don't really understand why the client and the server use different * parameter names... * * It's a really bad idea to store both integers and strings * in the same table. * * Maybe we need an internal function with a different set of parameters? */ /* * if we want to seperate them. I suggest
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -