📄 ex_rq_net.c
字号:
struct sockaddr_in si; int si_len; int s, ns; if ((proto = getprotobyname("tcp")) == NULL) return (-1); if ((s = socket(AF_INET, SOCK_STREAM, proto->p_proto)) < 0) return (-1); memset(&si, 0, sizeof(si)); si.sin_family = AF_INET; si.sin_addr.s_addr = htonl(INADDR_ANY); si.sin_port = htons(port); if (bind(s, (struct sockaddr *)&si, sizeof(si)) != 0) goto err; if (listen(s, 5) != 0) goto err; memset(&si, 0, sizeof(si)); si_len = sizeof(si); ns = accept(s, (struct sockaddr *)&si, &si_len); return (ns);err: fprintf(stderr, "%s: %s", progname, strerror(errno)); close (s); return (-1);}/* * get_connected_socket -- * Connect to the specified port of the specified remote machine, * and return a file descriptor when we have accepted a connection on it. * Add this connection to the machtab. If we already have a connection * open to this machine, then don't create another one, return the eid * of the connection (in *eidp) and set is_open to 1. Return 0. */intget_connected_socket(machtab, progname, remotehost, port, is_open, eidp) machtab_t *machtab; const char *progname, *remotehost; int port, *is_open, *eidp;{ int ret, s; struct hostent *hp; struct protoent *proto; struct sockaddr_in si; u_int32_t addr; *is_open = 0; if ((proto = getprotobyname("tcp")) == NULL) return (-1); if ((hp = gethostbyname(remotehost)) == NULL) { fprintf(stderr, "%s: host not found: %s\n", progname, strerror(errno)); return (-1); } if ((s = socket(AF_INET, SOCK_STREAM, proto->p_proto)) < 0) return (-1); memset(&si, 0, sizeof(si)); memcpy((char *)&si.sin_addr, hp->h_addr, hp->h_length); addr = ntohl(si.sin_addr.s_addr); ret = machtab_add(machtab, s, addr, port, eidp); if (ret == EEXIST) { *is_open = 1; close(s); return (0); } else if (ret != 0) { close (s); return (-1); } si.sin_family = AF_INET; si.sin_port = htons(port); if (connect(s, (struct sockaddr *)&si, sizeof(si)) < 0) { fprintf(stderr, "%s: connection failed: %s", progname, strerror(errno)); (void)machtab_rem(machtab, *eidp, 1); return (-1); } return (s);}/* * get_next_message -- * Read a single message from the specified file descriptor, and * return it in the format used by rep functions (two DBTs and a type). * * This function is called in a loop by both clients and masters, and * the resulting DBTs are manually dispatched to DB_ENV->rep_process_message(). */intget_next_message(fd, rec, control) int fd; DBT *rec, *control;{ size_t nr; u_int32_t rsize, csize; u_int8_t *recbuf, *controlbuf; /* * The protocol we use on the wire is dead simple: * * 4 bytes - rec->size * (# read above) - rec->data * 4 bytes - control->size * (# read above) - control->data */ /* Read rec->size. */ nr = readn(fd, &rsize, 4); if (nr != 4) return (1); /* Read the record itself. */ if (rsize > 0) { if (rec->size < rsize) rec->data = realloc(rec->data, rsize); recbuf = rec->data; nr = readn(fd, recbuf, rsize); } else { if (rec->data != NULL) free(rec->data); rec->data = NULL; } rec->size = rsize; /* Read control->size. */ nr = readn(fd, &csize, 4); if (nr != 4) return (1); /* Read the control struct itself. */ if (csize > 0) { controlbuf = control->data; if (control->size < csize) controlbuf = realloc(controlbuf, csize); nr = readn(fd, controlbuf, csize); if (nr != csize) return (1); } else { if (control->data != NULL) free(control->data); controlbuf = NULL; } control->data = controlbuf; control->size = csize; return (0);}/* * readn -- * Read a full n characters from a file descriptor, unless we get an error * or EOF. */ssize_treadn(fd, vptr, n) int fd; void *vptr; size_t n;{ size_t nleft; ssize_t nread; char *ptr; ptr = vptr; nleft = n; while (nleft > 0) { if ( (nread = read(fd, ptr, nleft)) < 0) { /* * Call read() again on interrupted system call; * on other errors, bail. */ if (errno == EINTR) nread = 0; else return (-1); } else if (nread == 0) break; /* EOF */ nleft -= nread; ptr += nread; } return (n - nleft);}/* * quote_send -- * The f_send function for DB_ENV->set_rep_transport. */intquote_send(dbenv, control, rec, eid, flags) DB_ENV *dbenv; const DBT *control, *rec; int eid; u_int32_t flags;{ int fd, n, ret, t_ret; machtab_t *machtab; member_t *m; machtab = (machtab_t *)dbenv->app_private; if (eid == DB_EID_BROADCAST) { /* * Right now, we do not require successful transmission. * I'd like to move this requiring at least one successful * transmission on PERMANENT requests. */ n = quote_send_broadcast(machtab, rec, control, flags); if (n < 0 /*|| (n == 0 && LF_ISSET(DB_REP_PERMANENT))*/) return (DB_REP_UNAVAIL); return (0); } if ((ret = pthread_mutex_lock(&machtab->mtmutex)) != 0) return (ret); fd = 0; for (m = LIST_FIRST(&machtab->machlist); m != NULL; m = LIST_NEXT(m, links)) { if (m->eid == eid) { fd = m->fd; break; } } if (fd == 0) { dbenv->err(dbenv, DB_REP_UNAVAIL, "quote_send: cannot find machine ID %d", eid); return (DB_REP_UNAVAIL); } ret = quote_send_one(rec, control, fd, flags); if ((t_ret = (pthread_mutex_unlock(&machtab->mtmutex))) != 0 && ret == 0) ret = t_ret; return (ret);}/* * quote_send_broadcast -- * Send a message to everybody. * Returns the number of sites to which this message was successfully * communicated. A -1 indicates a fatal error. */static intquote_send_broadcast(machtab, rec, control, flags) machtab_t *machtab; const DBT *rec, *control; u_int32_t flags;{ int ret, sent; member_t *m, *next; if ((ret = pthread_mutex_lock(&machtab->mtmutex)) != 0) return (0); sent = 0; for (m = LIST_FIRST(&machtab->machlist); m != NULL; m = next) { next = LIST_NEXT(m, links); if ((ret = quote_send_one(rec, control, m->fd, flags)) != 0) { (void)machtab_rem(machtab, m->eid, 0); } else sent++; } if (pthread_mutex_unlock(&machtab->mtmutex) != 0) return (-1); return (sent);}/* * quote_send_one -- * Send a message to a single machine, given that machine's file * descriptor. * * !!! * Note that the machtab mutex should be held through this call. * It doubles as a synchronizer to make sure that two threads don't * intersperse writes that are part of two single messages. */static intquote_send_one(rec, control, fd, flags) const DBT *rec, *control; int fd; u_int32_t flags;{ int retry; ssize_t bytes_left, nw; u_int8_t *wp; COMPQUIET(flags, 0); /* * The protocol is simply: write rec->size, write rec->data, * write control->size, write control->data. */ nw = write(fd, &rec->size, 4); if (nw != 4) return (DB_REP_UNAVAIL); if (rec->size > 0) { nw = write(fd, rec->data, rec->size); if (nw < 0) return (DB_REP_UNAVAIL); if (nw != (ssize_t)rec->size) { /* Try a couple of times to finish the write. */ wp = (u_int8_t *)rec->data + nw; bytes_left = rec->size - nw; for (retry = 0; bytes_left > 0 && retry < 3; retry++) { nw = write(fd, wp, bytes_left); if (nw < 0) return (DB_REP_UNAVAIL); bytes_left -= nw; wp += nw; } if (bytes_left > 0) return (DB_REP_UNAVAIL); } } nw = write(fd, &control->size, 4); if (nw != 4) return (DB_REP_UNAVAIL); if (control->size > 0) { nw = write(fd, control->data, control->size); if (nw != (ssize_t)control->size) return (DB_REP_UNAVAIL); } return (0);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -