📄 ipcsocket.c
字号:
int diff; CHANAUDIT(ch); element = g_list_first(ch->send_queue->queue); if (element == NULL) { /* OOPS! - correct consistency problem */ ch->send_queue->current_qlen = 0; break; } msg = (struct IPC_MESSAGE *) (element->data); diff = 0; if (msg->msg_buf ){ diff = (char*)msg->msg_body - (char*)msg->msg_buf; } if ( diff < sizeof(struct SOCKET_MSG_HEAD) ){ /* either we don't have msg->msg_buf set * or we don't have enough bytes for socket head * we delete this message and creates * a new one and delete the old one */ newmsg= socket_message_new(ch, msg->msg_len); if (newmsg == NULL){ cl_log(LOG_ERR, "socket_resume_io_write: " "allocating memory for new ipc msg failed"); return IPC_FAIL; } memcpy(newmsg->msg_body, msg->msg_body, msg->msg_len); oldmsg = msg; msg = newmsg; } head = (struct SOCKET_MSG_HEAD*) msg->msg_buf; head->msg_len = msg->msg_len; head->magic = HEADMAGIC; if (ch->bytes_remaining == 0){ bytes_remaining = msg->msg_len + ch->msgpad; p = msg->msg_buf; }else { bytes_remaining = ch->bytes_remaining; p = ((char*)msg->msg_buf) + msg->msg_len + ch->msgpad - bytes_remaining; } sendrc = 0; do { CHANAUDIT(ch); sendrc = send(conn_info->s, p , bytes_remaining, (MSG_DONTWAIT|MSG_NOSIGNAL)); SocketIPCStats.last_send_rc = sendrc; SocketIPCStats.last_send_errno = errno; ++SocketIPCStats.send_count; if (sendrc <= 0){ break; }else { p = p + sendrc; bytes_remaining -= sendrc; } } while(bytes_remaining > 0 ); ch->bytes_remaining = bytes_remaining; if (sendrc < 0) { switch (errno) { case EAGAIN: retcode = IPC_OK; break; case EPIPE: socket_check_disc_pending(ch); retcode = IPC_BROKEN; break; default: cl_perror("socket_resume_io_write" ": send2 bad errno"); ch->ch_status = IPC_DISCONNECT; retcode = IPC_FAIL; break; } break; }else{ int orig_qlen; CHECKFOO(3,ch, msg, SavedSentBody, "sent message") if (oldmsg){ if (msg->msg_done != NULL) { msg->msg_done(msg); } msg=oldmsg; } if(ch->bytes_remaining ==0){ ch->send_queue->queue = g_list_remove(ch->send_queue->queue, msg); if (msg->msg_done != NULL) { msg->msg_done(msg); } SocketIPCStats.nsent++; orig_qlen = ch->send_queue->current_qlen--; socket_check_flow_control(ch, orig_qlen, orig_qlen -1 ); (*nmsg)++; } } } CHANAUDIT(ch); if (retcode != IPC_OK) { return retcode; } return IPC_ISRCONN(ch) ? IPC_OK : IPC_BROKEN;}static intsocket_resume_io(struct IPC_CHANNEL *ch){ int rc1 = IPC_OK; int rc2 = IPC_OK; int nwmsg = 1; int nbytes_r = 1; gboolean OKonce = FALSE; CHANAUDIT(ch); if (!IPC_ISRCONN(ch)) { return IPC_BROKEN; } do { if (nbytes_r > 0){ rc1 = socket_resume_io_read(ch, &nbytes_r, FALSE); } if (nwmsg > 0){ nwmsg = 0; rc2 = socket_resume_io_write(ch, &nwmsg); } if (rc1 == IPC_OK || rc2 == IPC_OK) { OKonce = TRUE; } } while ((nbytes_r > 0 || nwmsg > 0) && IPC_ISRCONN(ch)); if (IPC_ISRCONN(ch)) { if (rc1 != IPC_OK) { cl_log(LOG_ERR , "socket_resume_io_read() failure"); } if (rc2 != IPC_OK) { cl_log(LOG_ERR , "socket_resume_io_write() failure"); } }else{ return (OKonce ? IPC_OK : IPC_BROKEN); } return (rc1 != IPC_OK ? rc1 : rc2);}static intsocket_get_recv_fd(struct IPC_CHANNEL *ch){ struct SOCKET_CH_PRIVATE* chp = ch ->ch_private; return (chp == NULL ? -1 : chp->s);}static intsocket_get_send_fd(struct IPC_CHANNEL *ch){ return socket_get_recv_fd(ch);}static intsocket_set_send_qlen (struct IPC_CHANNEL* ch, int q_len){ /* This seems more like an assertion failure than a normal error */ if (ch->send_queue == NULL) { return IPC_FAIL; } ch->send_queue->max_qlen = q_len; return IPC_OK; }static intsocket_set_recv_qlen (struct IPC_CHANNEL* ch, int q_len){ /* This seems more like an assertion failure than a normal error */ if (ch->recv_queue == NULL) { return IPC_FAIL; } ch->recv_queue->max_qlen = q_len; return IPC_OK;}static voidsocket_del_ipcmsg(IPC_Message* m){ if (m == NULL){ cl_log(LOG_ERR, "socket_del_ipcmsg:" "msg is NULL"); return; } if (m->msg_body){ memset(m->msg_body, 0, m->msg_len); } if (m->msg_buf){ g_free(m->msg_buf); } memset(m, 0, sizeof(*m)); g_free(m); return;}static IPC_Message*socket_new_ipcmsg(IPC_Channel* ch, const void* data, int len, void* private){ IPC_Message* hdr; char* copy = NULL; char* buf; char* body; if (ch == NULL || len < 0){ cl_log(LOG_ERR, "socket_new_ipcmsg:" " invalid parameter"); return NULL; } if (ch->msgpad > MAX_MSGPAD){ cl_log(LOG_ERR, "socket_new_ipcmsg: too many pads " "something is wrong"); return NULL; } if ((hdr = (IPC_Message*)g_malloc(sizeof(*hdr))) == NULL) { return NULL; } memset(hdr, 0, sizeof(*hdr)); if (len > 0){ if ((copy = (char*)g_malloc(ch->msgpad + len)) == NULL) { g_free(hdr); return NULL; } if (data){ memcpy(copy + ch->msgpad, data, len); } buf = copy; body = copy + ch->msgpad;; }else { len = 0; buf = body = NULL; } hdr->msg_len = len; hdr->msg_buf = buf; hdr->msg_body = body; hdr->msg_ch = ch; hdr->msg_done = socket_del_ipcmsg; hdr->msg_private = private; return hdr;}/* socket object of the function table */static struct IPC_WAIT_OPS socket_wait_ops = { socket_destroy_wait_conn, socket_wait_selectfd, socket_accept_connection,};/* * create a new ipc queue whose length = 0 and inner queue = NULL. * return the pointer to a new ipc queue or NULL is the queue can't be created. */static struct IPC_QUEUE*socket_queue_new(void){ struct IPC_QUEUE *temp_queue; /* temp queue with length = 0 and inner queue = NULL. */ temp_queue = g_new(struct IPC_QUEUE, 1); temp_queue->current_qlen = 0; temp_queue->max_qlen = DEFAULT_MAX_QLEN; temp_queue->queue = NULL; return temp_queue;}/* * destory a ipc queue and clean all memory space assigned to this queue. * parameters: * q (IN) the pointer to the queue which should be destroied. * * FIXME: This function does not free up messages that might * be in the queue. */ voidsocket_destroy_queue(struct IPC_QUEUE * q){ g_list_free(q->queue); g_free((void *) q);}/* * socket_wait_conn_new: * Called by ipc_wait_conn_constructor to get a new socket * waiting connection. * (better explanation of this role might be nice) * * Parameters : * ch_attrs (IN) the attributes used to create this connection. * * Return : * the pointer to the new waiting connection or NULL if the connection * can't be created. * * NOTE : * for domain socket implementation, the only attribute needed is path name. * so the user should * create the hash table like this: * GHashTable * attrs; * attrs = g_hash_table_new(g_str_hash, g_str_equal); * g_hash_table_insert(attrs, PATH_ATTR, path_name); * here PATH_ATTR is defined as "path". */struct IPC_WAIT_CONNECTION *socket_wait_conn_new(GHashTable *ch_attrs){ struct IPC_WAIT_CONNECTION * temp_ch; char *path_name; char *mode_attr; struct sockaddr_un my_addr; int s, flags; struct SOCKET_WAIT_CONN_PRIVATE *wait_private; mode_t s_mode; path_name = (char *) g_hash_table_lookup(ch_attrs, IPC_PATH_ATTR); mode_attr = (char *) g_hash_table_lookup(ch_attrs, IPC_MODE_ATTR); if (mode_attr != NULL) { s_mode = (mode_t)strtoul((const char *)mode_attr, NULL, 8); }else{ s_mode = 0777; } if (path_name == NULL) { return NULL; } /* prepare the unix domain socket */ if ((s = socket(AF_LOCAL, SOCK_STREAM, 0)) == -1) { cl_perror("socket_wait_conn_new: socket() failure"); return NULL; } if (unlink(path_name) < 0 && errno != ENOENT) { cl_perror("socket_wait_conn_new: unlink failure"); } memset(&my_addr, 0, sizeof(my_addr)); my_addr.sun_family = AF_LOCAL; /* host byte order */ if (strlen(path_name) >= sizeof(my_addr.sun_path)) { close(s); return NULL; } strncpy(my_addr.sun_path, path_name, sizeof(my_addr.sun_path)); if (bind(s, (struct sockaddr *)&my_addr, sizeof(my_addr)) == -1) { cl_perror("socket_wait_conn_new: trying to create in %s bind:" , path_name); close(s); return NULL; } /* Change the permission of the socket */ if (chmod(path_name,s_mode) < 0){ cl_perror("socket_wait_conn_new: failure trying to chmod %s" , path_name); close(s); return NULL; } /* listen to the socket */ if (listen(s, MAX_LISTEN_NUM) == -1) { cl_perror("socket_wait_conn_new: listen(MAX_LISTEN_NUM)"); close(s); return NULL; } flags = fcntl(s, F_GETFL, O_NONBLOCK); if (flags == -1) { cl_perror("socket_wait_conn_new: cannot read file descriptor flags"); close(s); return NULL; } flags |= O_NONBLOCK; if (fcntl(s, F_SETFL, flags) < 0) { cl_perror("socket_wait_conn_new: cannot set O_NONBLOCK"); close(s); return NULL; } wait_private = g_new(struct SOCKET_WAIT_CONN_PRIVATE, 1); wait_private->s = s; strncpy(wait_private->path_name, path_name, sizeof(wait_private->path_name)); temp_ch = g_new(struct IPC_WAIT_CONNECTION, 1); temp_ch->ch_private = (void *) wait_private; temp_ch->ch_status = IPC_WAIT; temp_ch->ops = (struct IPC_WAIT_OPS *)&socket_wait_ops; return temp_ch;}/* * will be called by ipc_channel_constructor to create a new socket channel. * parameters : * attrs (IN) the hash table of the attributes used to create this channel. * * return: * the pointer to the new waiting channel or NULL if the channel can't be created.*/struct IPC_CHANNEL * socket_client_channel_new(GHashTable *ch_attrs) { struct IPC_CHANNEL * temp_ch; struct SOCKET_CH_PRIVATE* conn_info; char *path_name; int sockfd, flags;#ifdef USE_BINDSTAT_CREDS char rand_id[16]; char uuid_str_tmp[40]; struct sockaddr_un sock_addr;#endif /* * I don't really understand why the client and the server use different * parameter names... * * It's a really bad idea to store both integers and strings * in the same table. * * Maybe we need an internal function with a different set of parameters? */ /* * if we want to seperate them. I suggest * <client side> * user call ipc_channel_constructor(ch_type,attrs) to create a new channel. * ipc_channel_constructor() call socket_channel_new(GHashTable*)to * create a new socket channel. * <server side> * wait_conn->accept_connection() will call another function to create a * new channel. This function will take socketfd as the parameter to * create a socket channel. */ path_name = (char *) g_hash_table_lookup(ch_attrs, IPC_PATH_ATTR); if (path_name != NULL) { if (strlen(path_name) >= sizeof(conn_info->path_name)) { return NULL; } /* prepare the socket */ if ((sockfd = socket(AF_LOCAL, SOCK_STREAM, 0)) == -1) { cl_perror("socket_client_channel_new: socket"); return NULL; } }else{ return NULL; } temp_ch = g_new(struct IPC_CHANNEL, 1); if (temp_ch == NULL){ cl_log(LOG_ERR, "socket_server_channel_new:" " allocating memory for channel failed"); return NULL; } memset(temp_ch, 0, sizeof(struct IPC_CHANNEL)); conn_info = g_new(struct SOCKET_CH_PRIVATE, 1); conn_info->peer_addr = NULL; #ifdef USE_BINDSTAT_CREDS /* Prepare the socket */ memset(&sock_addr, 0, sizeof(sock_addr)); sock_addr.sun_family = AF_UNIX; /* make sure socket paths never clash */ uuid_generate(rand_id); uuid_unparse(rand_id, uuid_str_tmp); snprintf(sock_addr.sun_path, sizeof(sock_addr.sun_path), "%s/%s/%s", HA_VARLIBDIR, PACKAGE, uuid_str_tmp); unlink(sock_addr.sun_path); if(bind(sockfd, (struct sockaddr*)&sock_addr, SUN_LEN(&sock_addr)) < 0) { perror("Client bind() failure"); close(sockfd); g_free(conn_info); conn_info = NULL; g_free(temp_ch); return NULL; }#endif flags = fcntl(sockfd, F_GETFL, O_NONBLOCK); if (flags == -1) { cl_perror("socket_client_channel_new: cannot read file descriptor flags"); g_free(conn_info); conn_info = NULL; g_free(temp_ch); close(sockfd); return NULL; } flags |= O_NONBLOCK; if (fcntl(sockfd, F_SETFL, flags) < 0) { cl_perror("socket_client_channel_new: cannot set O_NONBLOCK"); close(sockfd); g_free(conn_info); g_free(temp_ch); return NULL; } conn_info->s = sockfd; conn_info->remaining_data = 0; conn_info->buf_msg = NULL; strncpy(conn_info->path_name, path_name, sizeof(conn_info->path_name)); temp_ch->ch_status = IPC_DISCONNECT;#ifdef DEBUG cl_log(LOG_INFO, "Initializing client socket %d to DISCONNECT", sockfd);#endif temp_ch->ch_private = (void*) conn_info; temp_ch->ops = (struct IPC_OPS *)&socket_ops; temp_ch->msgpad = sizeof(struct SOCKET_MSG_HEAD);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -