⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ex_thread.c

📁 File system using stacked.
💻 C
字号:
/*- * See the file LICENSE for redistribution information. * * Copyright (c) 1997-2002 *	Sleepycat Software.  All rights reserved. * * $Id: ex_thread.c,v 1.1.1.1 2004/08/19 23:53:56 gopalan Exp $ */#include <sys/types.h>#include <sys/time.h>#include <errno.h>#include <pthread.h>#include <signal.h>#include <stdio.h>#include <stdlib.h>#include <string.h>#include <time.h>#ifdef _WIN32extern int getopt(int, char * const *, const char *);#else#include <unistd.h>#endif#include <db.h>/* * NB: This application is written using POSIX 1003.1b-1993 pthreads * interfaces, which may not be portable to your system. */extern int sched_yield __P((void));		/* Pthread yield function. */int	db_init __P((const char *));void   *deadlock __P((void *));void	fatal __P((const char *, int, int));void	onint __P((int));int	main __P((int, char *[]));int	reader __P((int));void	stats __P((void));void   *trickle __P((void *));void   *tstart __P((void *));int	usage __P((void));void	word __P((void));int	writer __P((int));int	quit;					/* Interrupt handling flag. */struct _statistics {	int aborted;				/* Write. */	int aborts;				/* Read/write. */	int adds;				/* Write. */	int deletes;				/* Write. */	int txns;				/* Write. */	int found;				/* Read. */	int notfound;				/* Read. */} *perf;const char	*progname = "ex_thread";		/* Program name. */#define	DATABASE	"access.db"		/* Database name. */#define	WORDLIST	"../test/wordlist"	/* Dictionary. *//* * We can seriously increase the number of collisions and transaction * aborts by yielding the scheduler after every DB call.  Specify the * -p option to do this. */int	punish;					/* -p */int	nlist;					/* -n */int	nreaders;				/* -r */int	verbose;				/* -v */int	nwriters;				/* -w */DB     *dbp;					/* Database handle. */DB_ENV *dbenv;					/* Database environment. */int	nthreads;				/* Total threads. */char  **list;					/* Word list. *//* * ex_thread -- *	Run a simple threaded application of some numbers of readers and *	writers competing for a set of words. * * Example UNIX shell script to run this program: *	% rm -rf TESTDIR *	% mkdir TESTDIR *	% ex_thread -h TESTDIR */intmain(argc, argv)	int argc;	char *argv[];{	extern char *optarg;	extern int errno, optind;	DB_TXN *txnp;	pthread_t *tids;	int ch, i, ret;	const char *home;	void *retp;	txnp = NULL;	nlist = 1000;	nreaders = nwriters = 4;	home = "TESTDIR";	while ((ch = getopt(argc, argv, "h:pn:r:vw:")) != EOF)		switch (ch) {		case 'h':			home = optarg;			break;		case 'p':			punish = 1;			break;		case 'n':			nlist = atoi(optarg);			break;		case 'r':			nreaders = atoi(optarg);			break;		case 'v':			verbose = 1;			break;		case 'w':			nwriters = atoi(optarg);			break;		case '?':		default:			return (usage());		}	argc -= optind;	argv += optind;	/* Initialize the random number generator. */	srand(getpid() | time(NULL));	/* Register the signal handler. */	(void)signal(SIGINT, onint);	/* Build the key list. */	word();	/* Remove the previous database. */	(void)remove(DATABASE);	/* Initialize the database environment. */	if ((ret = db_init(home)) != 0)		return (ret);	/* Initialize the database. */	if ((ret = db_create(&dbp, dbenv, 0)) != 0) {		dbenv->err(dbenv, ret, "db_create");		(void)dbenv->close(dbenv, 0);		return (EXIT_FAILURE);	}	if ((ret = dbp->set_pagesize(dbp, 1024)) != 0) {		dbp->err(dbp, ret, "set_pagesize");		goto err;	}	if ((ret = dbenv->txn_begin(dbenv, NULL, &txnp, 0)) != 0)		fatal("txn_begin", ret, 1);	if ((ret = dbp->open(dbp, txnp,	     DATABASE, NULL, DB_BTREE, DB_CREATE | DB_THREAD, 0664)) != 0) {		dbp->err(dbp, ret, "%s: open", DATABASE);		goto err;	} else {		ret = txnp->commit(txnp, 0);		txnp = NULL;		if (ret != 0)			goto err;	}	nthreads = nreaders + nwriters + 2;	printf("Running: readers %d, writers %d\n", nreaders, nwriters);	fflush(stdout);	/* Create statistics structures, offset by 1. */	if ((perf = calloc(nreaders + nwriters + 1, sizeof(*perf))) == NULL)		fatal(NULL, errno, 1);	/* Create thread ID structures. */	if ((tids = malloc(nthreads * sizeof(pthread_t))) == NULL)		fatal(NULL, errno, 1);	/* Create reader/writer threads. */	for (i = 0; i < nreaders + nwriters; ++i)		if ((ret =		    pthread_create(&tids[i], NULL, tstart, (void *)i)) != 0)			fatal("pthread_create", ret > 0 ? ret : errno, 1);	/* Create buffer pool trickle thread. */	if (pthread_create(&tids[i], NULL, trickle, &i))		fatal("pthread_create", errno, 1);	++i;	/* Create deadlock detector thread. */	if (pthread_create(&tids[i], NULL, deadlock, &i))		fatal("pthread_create", errno, 1);	/* Wait for the threads. */	for (i = 0; i < nthreads; ++i)		(void)pthread_join(tids[i], &retp);	printf("Exiting\n");	stats();err:	if (txnp != NULL)		(void)txnp->abort(txnp);	(void)dbp->close(dbp, 0);	(void)dbenv->close(dbenv, 0);	return (EXIT_SUCCESS);}intreader(id)	int id;{	DBT key, data;	int n, ret;	char buf[64];	/*	 * DBT's must use local memory or malloc'd memory if the DB handle	 * is accessed in a threaded fashion.	 */	memset(&key, 0, sizeof(DBT));	memset(&data, 0, sizeof(DBT));	data.flags = DB_DBT_MALLOC;	/*	 * Read-only threads do not require transaction protection, unless	 * there's a need for repeatable reads.	 */	while (!quit) {		/* Pick a key at random, and look it up. */		n = rand() % nlist;		key.data = list[n];		key.size = strlen(key.data);		if (verbose) {			sprintf(buf, "reader: %d: list entry %d\n", id, n);			write(STDOUT_FILENO, buf, strlen(buf));		}		switch (ret = dbp->get(dbp, NULL, &key, &data, 0)) {		case DB_LOCK_DEADLOCK:		/* Deadlock. */			++perf[id].aborts;			break;		case 0:				/* Success. */			++perf[id].found;			free(data.data);			break;		case DB_NOTFOUND:		/* Not found. */			++perf[id].notfound;			break;		default:			sprintf(buf,			    "reader %d: dbp->get: %s", id, (char *)key.data);			fatal(buf, ret, 0);		}	}	return (0);}intwriter(id)	int id;{	DBT key, data;	DB_TXN *tid;	time_t now, then;	int n, ret;	char buf[256], dbuf[10000];	time(&now);	then = now;	/*	 * DBT's must use local memory or malloc'd memory if the DB handle	 * is accessed in a threaded fashion.	 */	memset(&key, 0, sizeof(DBT));	memset(&data, 0, sizeof(DBT));	data.data = dbuf;	data.ulen = sizeof(dbuf);	data.flags = DB_DBT_USERMEM;	while (!quit) {		/* Pick a random key. */		n = rand() % nlist;		key.data = list[n];		key.size = strlen(key.data);		if (verbose) {			sprintf(buf, "writer: %d: list entry %d\n", id, n);			write(STDOUT_FILENO, buf, strlen(buf));		}		/* Abort and retry. */		if (0) {retry:			if ((ret = tid->abort(tid)) != 0)				fatal("DB_TXN->abort", ret, 1);			++perf[id].aborts;			++perf[id].aborted;		}		/* Thread #1 prints out the stats every 20 seconds. */		if (id == 1) {			time(&now);			if (now - then >= 20) {				stats();				then = now;			}		}		/* Begin the transaction. */		if ((ret = dbenv->txn_begin(dbenv, NULL, &tid, 0)) != 0)			fatal("txn_begin", ret, 1);		/*		 * Get the key.  If it doesn't exist, add it.  If it does		 * exist, delete it.		 */		switch (ret = dbp->get(dbp, tid, &key, &data, 0)) {		case DB_LOCK_DEADLOCK:			goto retry;		case 0:			goto delete;		case DB_NOTFOUND:			goto add;		}		sprintf(buf, "writer: %d: dbp->get", id);		fatal(buf, ret, 1);		/* NOTREACHED */delete:		/* Delete the key. */		switch (ret = dbp->del(dbp, tid, &key, 0)) {		case DB_LOCK_DEADLOCK:			goto retry;		case 0:			++perf[id].deletes;			goto commit;		}		sprintf(buf, "writer: %d: dbp->del", id);		fatal(buf, ret, 1);		/* NOTREACHED */add:		/* Add the key.  1 data item in 30 is an overflow item. */		data.size = 20 + rand() % 128;		if (rand() % 30 == 0)			data.size += 8192;		switch (ret = dbp->put(dbp, tid, &key, &data, 0)) {		case DB_LOCK_DEADLOCK:			goto retry;		case 0:			++perf[id].adds;			goto commit;		default:			sprintf(buf, "writer: %d: dbp->put", id);			fatal(buf, ret, 1);		}commit:		/* The transaction finished, commit it. */		if ((ret = tid->commit(tid, 0)) != 0)			fatal("DB_TXN->commit", ret, 1);		/*		 * Every time the thread completes 20 transactions, show		 * our progress.		 */		if (++perf[id].txns % 20 == 0) {			sprintf(buf,"writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d\n",			    id, perf[id].adds, perf[id].deletes,			    perf[id].aborts, perf[id].txns);			write(STDOUT_FILENO, buf, strlen(buf));		}		/*		 * If this thread was aborted more than 5 times before		 * the transaction finished, complain.		 */		if (perf[id].aborted > 5) {			sprintf(buf,"writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d: ABORTED: %2d\n",			    id, perf[id].adds, perf[id].deletes,			    perf[id].aborts, perf[id].txns, perf[id].aborted);			write(STDOUT_FILENO, buf, strlen(buf));		}		perf[id].aborted = 0;	}	return (0);}/* * stats -- *	Display reader/writer thread statistics.  To display the statistics *	for the mpool trickle or deadlock threads, use db_stat(1). */voidstats(){	int id;	char *p, buf[8192];	p = buf + sprintf(buf, "-------------\n");	for (id = 0; id < nreaders + nwriters;)		if (id++ < nwriters)			p += sprintf(p,	"writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d\n",			    id, perf[id].adds,			    perf[id].deletes, perf[id].aborts, perf[id].txns);		else			p += sprintf(p,	"reader: %2d: found: %5d: notfound: %5d: aborts: %4d\n",			    id, perf[id].found,			    perf[id].notfound, perf[id].aborts);	p += sprintf(p, "-------------\n");	write(STDOUT_FILENO, buf, p - buf);}/* * db_init -- *	Initialize the environment. */intdb_init(home)	const char *home;{	int ret;	if ((ret = db_env_create(&dbenv, 0)) != 0) {		fprintf(stderr,		    "%s: db_env_create: %s\n", progname, db_strerror(ret));		return (EXIT_FAILURE);	}	if (punish) {		(void)dbenv->set_flags(dbenv, DB_YIELDCPU, 1);		(void)db_env_set_func_yield(sched_yield);	}	dbenv->set_errfile(dbenv, stderr);	dbenv->set_errpfx(dbenv, progname);	(void)dbenv->set_cachesize(dbenv, 0, 100 * 1024, 0);	(void)dbenv->set_lg_max(dbenv, 200000);	if ((ret = dbenv->open(dbenv, home,	    DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG |	    DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD, 0)) != 0) {		dbenv->err(dbenv, ret, NULL);		(void)dbenv->close(dbenv, 0);		return (EXIT_FAILURE);	}	return (0);}/* * tstart -- *	Thread start function for readers and writers. */void *tstart(arg)	void *arg;{	pthread_t tid;	u_int id;	id = (u_int)arg + 1;	tid = pthread_self();	if (id <= (u_int)nwriters) {		printf("write thread %d starting: tid: %lu\n", id, (u_long)tid);		fflush(stdout);		writer(id);	} else {		printf("read thread %d starting: tid: %lu\n", id, (u_long)tid);		fflush(stdout);		reader(id);	}	/* NOTREACHED */	return (NULL);}/* * deadlock -- *	Thread start function for DB_ENV->lock_detect. */void *deadlock(arg)	void *arg;{	struct timeval t;	pthread_t tid;	arg = arg;				/* XXX: shut the compiler up. */	tid = pthread_self();	printf("deadlock thread starting: tid: %lu\n", (u_long)tid);	fflush(stdout);	t.tv_sec = 0;	t.tv_usec = 100000;	while (!quit) {		(void)dbenv->lock_detect(dbenv, 0, DB_LOCK_YOUNGEST, NULL);		/* Check every 100ms. */		(void)select(0, NULL, NULL, NULL, &t);	}	return (NULL);}/* * trickle -- *	Thread start function for memp_trickle. */void *trickle(arg)	void *arg;{	pthread_t tid;	int wrote;	char buf[64];	arg = arg;				/* XXX: shut the compiler up. */	tid = pthread_self();	printf("trickle thread starting: tid: %lu\n", (u_long)tid);	fflush(stdout);	while (!quit) {		(void)dbenv->memp_trickle(dbenv, 10, &wrote);		if (verbose) {			sprintf(buf, "trickle: wrote %d\n", wrote);			write(STDOUT_FILENO, buf, strlen(buf));		}		if (wrote == 0) {			sleep(1);			sched_yield();		}	}	return (NULL);}/* * word -- *	Build the dictionary word list. */voidword(){	FILE *fp;	int cnt;	char buf[256];	if ((fp = fopen(WORDLIST, "r")) == NULL)		fatal(WORDLIST, errno, 1);	if ((list = malloc(nlist * sizeof(char *))) == NULL)		fatal(NULL, errno, 1);	for (cnt = 0; cnt < nlist; ++cnt) {		if (fgets(buf, sizeof(buf), fp) == NULL)			break;		if ((list[cnt] = strdup(buf)) == NULL)			fatal(NULL, errno, 1);	}	nlist = cnt;		/* In case nlist was larger than possible. */}/* * fatal -- *	Report a fatal error and quit. */voidfatal(msg, err, syserr)	const char *msg;	int err, syserr;{	fprintf(stderr, "%s: ", progname);	if (msg != NULL) {		fprintf(stderr, "%s", msg);		if (syserr)			fprintf(stderr, ": ");	}	if (syserr)		fprintf(stderr, "%s", strerror(err));	fprintf(stderr, "\n");	exit(EXIT_FAILURE);	/* NOTREACHED */}/* * usage -- *	Usage message. */intusage(){	(void)fprintf(stderr,    "usage: %s [-pv] [-h home] [-n words] [-r readers] [-w writers]\n",	    progname);	return (EXIT_FAILURE);}/* * onint -- *	Interrupt signal handler. */voidonint(signo)	int signo;{	signo = 0;		/* Quiet compiler. */	quit = 1;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -