📄 rep_net.c
字号:
/*- * See the file LICENSE for redistribution information. * * Copyright (c) 2001-2005 * Oracle Corporation. All rights reserved. * * $Id: rep_net.c,v 12.12 2006/08/24 14:45:45 bostic Exp $ */#include <sys/types.h>#include <assert.h>#include <errno.h>#include <stdio.h>#include <stdlib.h>#include <string.h>#include <db.h>#include "rep_base.h"#include "../common/rep_common.h"#ifndef _SYS_QUEUE_H/* * Some *BSD Unix variants include the Queue macros in their libraries and * these might already have been included. In that case, it would be bad * to include them again. */#include <dbinc/queue.h> /* !!!: for the LIST_XXX macros. */#endifint machtab_add __P((machtab_t *, socket_t, u_int32_t, int, int *));#ifdef DIAGNOSTICvoid machtab_print __P((machtab_t *));#endifssize_t readn __P((socket_t, void *, size_t));/* * This file defines the communication infrastructure for the ex_repquote * sample application. * * This application uses TCP/IP for its communication. In an N-site * replication group, this means that there are N * N communication * channels so that every site can communicate with every other site * (this allows elections to be held when the master fails). We do * not require that anyone know about all sites when the application * starts up. In order to communicate, the application should know * about someone, else it has no idea how to ever get in the game. * * Communication is handled via a number of different threads. These * thread functions are implemented in rep_util.c In this file, we * define the data structures that maintain the state that describes * the comm infrastructure, the functions that manipulates this state * and the routines used to actually send and receive data over the * sockets. *//* * The communication infrastructure is represented by a machine table, * machtab_t, which is essentially a mutex-protected linked list of members * of the group. The machtab also contains the parameters that are needed * to call for an election. We hardwire values for these parameters in the * init function, but these could be set via some configuration setup in a * real application. We reserve the machine-id 1 to refer to ourselves and * make the machine-id 0 be invalid. */#define MACHID_INVALID 0#define MACHID_SELF 1struct __machtab { LIST_HEAD(__machlist, __member) machlist; int nextid; mutex_t mtmutex; u_int32_t timeout_time; int current; int max; int nsites;};/* Data structure that describes each entry in the machtab. */struct __member { u_int32_t hostaddr; /* Host IP address. */ int port; /* Port number. */ int eid; /* Application-specific machine id. */ socket_t fd; /* File descriptor for the socket. */ LIST_ENTRY(__member) links; /* For linked list of all members we know of. */};static int quote_send_broadcast __P((machtab_t *, const DBT *, const DBT *, u_int32_t));static int quote_send_one __P((const DBT *, const DBT *, socket_t, u_int32_t));/* * machtab_init -- * Initialize the machine ID table. * XXX Right now we treat the number of sites as the maximum * number we've ever had on the list at one time. We probably * want to make that smarter. */intmachtab_init(machtabp, nsites) machtab_t **machtabp; int nsites;{ int ret; machtab_t *machtab; if ((machtab = malloc(sizeof(machtab_t))) == NULL) { fprintf(stderr, "can't allocate memory\n"); return (ENOMEM); } LIST_INIT(&machtab->machlist); /* Reserve eid's 0 and 1. */ machtab->nextid = 2; machtab->timeout_time = 2 * 1000000; /* 2 seconds. */ machtab->current = machtab->max = 0; machtab->nsites = nsites; ret = mutex_init(&machtab->mtmutex, NULL); *machtabp = machtab; return (ret);}/* * machtab_add -- * Add a file descriptor to the table of machines, returning * a new machine ID. */intmachtab_add(machtab, fd, hostaddr, port, idp) machtab_t *machtab; socket_t fd; u_int32_t hostaddr; int port, *idp;{ int ret; member_t *m, *member; ret = 0; if ((member = malloc(sizeof(member_t))) == NULL) { fprintf(stderr, "can't allocate memory\n"); return (ENOMEM); } member->fd = fd; member->hostaddr = hostaddr; member->port = port; if ((ret = mutex_lock(&machtab->mtmutex)) != 0) { fprintf(stderr, "can't lock mutex"); return (ret); } for (m = LIST_FIRST(&machtab->machlist); m != NULL; m = LIST_NEXT(m, links)) if (m->hostaddr == hostaddr && m->port == port) break; if (m == NULL) { member->eid = machtab->nextid++; LIST_INSERT_HEAD(&machtab->machlist, member, links); } else member->eid = m->eid; if ((ret = mutex_unlock(&machtab->mtmutex)) != 0) { fprintf(stderr, "can't unlock mutex\n"); return (ret); } if (idp != NULL) *idp = member->eid; if (m == NULL) { if (++machtab->current > machtab->max) machtab->max = machtab->current; } else { free(member); ret = EEXIST; }#ifdef DIAGNOSTIC printf("Exiting machtab_add\n"); machtab_print(machtab);#endif return (ret);}/* * machtab_getinfo -- * Return host and port information for a particular machine id. */intmachtab_getinfo(machtab, eid, hostp, portp) machtab_t *machtab; int eid; u_int32_t *hostp; int *portp;{ int ret; member_t *member; if ((ret = mutex_lock(&machtab->mtmutex)) != 0) { fprintf(stderr, "can't lock mutex\n"); return (ret); } for (member = LIST_FIRST(&machtab->machlist); member != NULL; member = LIST_NEXT(member, links)) if (member->eid == eid) { *hostp = member->hostaddr; *portp = member->port; break; } if ((ret = mutex_unlock(&machtab->mtmutex)) != 0) { fprintf(stderr, "can't unlock mutex\n"); return (ret); } return (member != NULL ? 0 : EINVAL);}/* * machtab_rem -- * Remove a mapping from the table of machines. Lock indicates * whether we need to lock the machtab or not (0 indicates we do not * need to lock; non-zero indicates that we do need to lock). */intmachtab_rem(machtab, eid, lock) machtab_t *machtab; int eid; int lock;{ int found, ret; member_t *member; ret = 0; if (lock && (ret = mutex_lock(&machtab->mtmutex)) != 0) { fprintf(stderr, "can't lock mutex\n"); return (ret); } for (found = 0, member = LIST_FIRST(&machtab->machlist); member != NULL; member = LIST_NEXT(member, links)) if (member->eid == eid) { found = 1; LIST_REMOVE(member, links); (void)closesocket(member->fd); free(member); machtab->current--; break; } if (LIST_FIRST(&machtab->machlist) == NULL) machtab->nextid = 2; if (lock && (ret = mutex_unlock(&machtab->mtmutex)) != 0) fprintf(stderr, "can't unlock mutex\n");#ifdef DIAGNOSTIC printf("Exiting machtab_rem\n"); machtab_print(machtab);#endif return (ret);}voidmachtab_parm(machtab, nump, timeoutp) machtab_t *machtab; int *nump; u_int32_t *timeoutp;{ if (machtab->nsites == 0) *nump = machtab->max; else *nump = machtab->nsites; *timeoutp = machtab->timeout_time;}#ifdef DIAGNOSTICvoidmachtab_print(machtab) machtab_t *machtab;{ member_t *m; if (mutex_lock(&machtab->mtmutex) != 0) { fprintf(stderr, "can't lock mutex\n"); abort(); } for (m = LIST_FIRST(&machtab->machlist); m != NULL; m = LIST_NEXT(m, links)) { printf("IP: %lx Port: %6d EID: %2d FD: %3d\n", (long)m->hostaddr, m->port, m->eid, m->fd); } if (mutex_unlock(&machtab->mtmutex) != 0) { fprintf(stderr, "can't unlock mutex\n"); abort(); }}#endif/* * listen_socket_init -- * Initialize a socket for listening on the specified port. Returns * a file descriptor for the socket, ready for an accept() call * in a thread that we're happy to let block. */socket_tlisten_socket_init(progname, port) const char *progname; int port;{ socket_t s; int sockopt; struct sockaddr_in si; COMPQUIET(progname, NULL); if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { perror("can't create listen socket"); return (-1); } memset(&si, 0, sizeof(si)); si.sin_family = AF_INET; si.sin_addr.s_addr = htonl(INADDR_ANY); si.sin_port = htons((unsigned short)port); /* * When using this example for testing, it's common to kill and restart * regularly. On some systems, this causes bind to fail with "address * in use" errors unless this option is set. */ sockopt = 1; setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char *)&sockopt, sizeof (sockopt)); if (bind(s, (struct sockaddr *)&si, sizeof(si)) != 0) { perror("can't bind listen socket"); goto err; } if (listen(s, 5) != 0) { perror("can't establish listen queue"); goto err; } return (s);err: closesocket(s); return (-1);}/* * listen_socket_accept -- * Accept a connection on a socket. This is essentially just a wrapper * for accept(3). */socket_tlisten_socket_accept(machtab, progname, s, eidp) machtab_t *machtab; const char *progname; socket_t s; int *eidp;{ struct sockaddr_in si; int si_len; int host, ret;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -