⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 doublewrite.c

📁 这是一个针对大数据的反复多线程读写文件系统的测试代码。
💻 C
字号:
/*
 * doublewrite - file system stress test.
 *
 * Two writer threads each open the same output file through their own file
 * descriptor and are repeatedly told by main() to write adjacent chunks of
 * it.  Every 64-bit word written carries its own byte offset, the
 * user-supplied mark byte and a tag identifying the thread that wrote it.
 *
 * Usage: doublewrite <outfilename> <byte-value> <Mbytes>
 */
#define _GNU_SOURCE
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <limits.h>
#include <stdint.h>
#include <sched.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>

/* Tracing is compiled out by default; swap the two definitions to enable. */
#define TRACE(args...) do { } while(0)
//#define TRACE(args...) do { fprintf(stderr, args); } while(0)

/*
 * Per-thread control block shared between main() and one writer thread.
 *
 * main() fills in offset/buf/len (and optionally doclose), clears `done`
 * and then sets `dowrite`; the thread clears `dowrite`, performs the write
 * and sets `done`.
 *
 * NOTE(review): the handshake relies on `volatile` plus busy-waiting with
 * sched_yield(); it is not a formally race-free protocol (no atomics or
 * memory barriers).  Left as in the original design.
 */
struct ctrl {
  int id;                      /* thread number, used in messages only */
  int fd;                      /* thread's own descriptor, -1 when closed */
  char *fname;                 /* path of the file to (re)open */
  unsigned long long offset;   /* byte offset of the next write */
  void *buf;                   /* data to write */
  int len;                     /* number of bytes to write from buf */
  /*
   * doclose: non-zero asks the thread to close fd after the write.
   * dowrite: 1 = write requested, 0 = idle, -1 = terminate.
   * done:    0 = busy, 1 = write finished, negative = thread failed.
   */
  volatile int doclose, dowrite, done;
};

struct ctrl c1, c2;

/*
 * Open `fname` write-only into *fd unless *fd already holds a descriptor
 * (anything other than -1).
 *
 * Returns 0 on success or when already open, otherwise the negated errno
 * of the failed open().
 */
static int myopen(int *fd, char *fname)
{
  if (*fd == -1) {
    TRACE("Opening file\n");
    *fd = open(fname, O_WRONLY);
    if (*fd < 0) {
      /* Capture errno before perror(), which is allowed to modify it. */
      int saved = errno;

      perror("open");
      return -saved;
    }
  }
  return 0;
}

/*
 * Writer thread body.  `arg` points to this thread's struct ctrl.
 *
 * Loops waiting for `dowrite`, then (re)opens the file if needed, seeks to
 * `offset`, writes `len` bytes from `buf`, optionally closes the file and
 * signals completion through `done`.
 *
 * Returns 0 (as a pointer-sized integer) after a terminate request, a
 * negated errno after a failed open/seek/write, or -999 after a short
 * write.  The same value is stored in `done` on exit.
 */
void *writer_thread(void *arg)
{
  struct ctrl *c = (struct ctrl *)arg;
  int err = 0;

  TRACE("Thread %d starting\n", c->id);
  while (1) {
    ssize_t written;

    TRACE("Thread %d waiting for command\n", c->id);
    while (c->dowrite == 0)
      sched_yield();
    if (c->dowrite == -1) {
      TRACE("Thread %d got abort command\n", c->id);
      break;
    }
    TRACE("Thread %d got write command\n", c->id);

    /* Randomly delay one of the threads to vary the interleaving. */
    if ((rand() & 3) == 3) {
      TRACE("Thread %d doing artificial yield\n", c->id);
      sched_yield();
    }
    c->dowrite = 0;

    err = myopen(&c->fd, c->fname);
    if (err)
      break;

    TRACE("Thread %d writing at %12llu len %d\n",
          c->id, c->offset, c->len);

    /*
     * Test the 64-bit result directly: storing it in an int would turn
     * every offset of 2 GiB or more into a bogus negative "error".
     */
    if (lseek64(c->fd, c->offset, SEEK_SET) < 0) {
      err = -errno;  /* save before stdio can overwrite errno */
      fprintf(stderr, "Thread %d: lseek64: %s\n", c->id, strerror(-err));
      fflush(NULL);
      break;
    }

    written = write(c->fd, c->buf, (size_t)c->len);
    if (written < 0) {
      err = -errno;  /* save before stdio can overwrite errno */
      fprintf(stderr, "Thread %d: write: %s\n", c->id, strerror(-err));
      fflush(NULL);
      break;
    } else if (written != c->len) {
      fprintf(stderr, "Thread %d: Short write: %d instead of %d\n",
              c->id, (int)written, c->len);
      fflush(NULL);
      err = -999;
      break;
    }

    if (c->doclose) {
      TRACE("Thread %d closing file\n", c->id);
      close(c->fd);
      c->fd = -1;
      c->doclose = 0;
    }

    TRACE("Thread %d signaling done\n", c->id);
    c->done = 1;
  }

  TRACE("Thread %d finishing, returns %d\n", c->id, err);
  c->done = err;
  /* Go through intptr_t so the int <-> pointer round trip is well defined. */
  return (void *)(intptr_t)err;
}

/* Print the command line synopsis to stderr and exit with status 1. */
void usage(char *progname)
{
  fprintf(stderr, "Usage: %s <outfilename> <byte-value> <Mbytes>\n", progname);
  exit(1);
}

/*
 * Parse `text` as an integer (base auto-detected by strtol) that must lie
 * in [lo, hi] with no trailing characters.
 *
 * Returns 0 and stores the value in *out on success, -1 otherwise.
 */
static int parse_ranged(const char *text, long lo, long hi, long *out)
{
  char *endptr;
  long v;

  errno = 0;
  v = strtol(text, &endptr, 0);
  if (endptr == text || *endptr != 0 || errno == ERANGE)
    return -1;
  if (v < lo || v > hi)
    return -1;
  *out = v;
  return 0;
}

/*
 * Create/truncate the output file, start the two writer threads and feed
 * them pairs of adjacent chunks until <Mbytes> megabytes have been handed
 * out or a thread reports an error.  Every 8th turn (starting with the
 * first) both threads are asked to close their descriptor after writing,
 * forcing a reopen on the following turn.
 *
 * Exits with status 1 on setup failures; otherwise returns 0.
 */
int main(int argc, char *argv[])
{
  int fd, rc, res1, res2;
  void *ret1 = NULL, *ret2 = NULL;
  pthread_t t1, t2;
  unsigned long long buf1[2048], buf2[2048];
  unsigned long long pos, len;
  unsigned char mark;
  long value = 0;
  int turns = 0;

  if (argc != 4)
    usage(argv[0]);

  if (parse_ranged(argv[2], 0, 255, &value) != 0)
    usage(argv[0]);
  mark = (unsigned char)value;

  /* Capped at INT_MAX megabytes so len * 2^20 cannot overflow 64 bits. */
  if (parse_ranged(argv[3], 0, INT_MAX, &value) != 0)
    usage(argv[0]);
  len = (unsigned long long)value;

  c1.id = 1; c2.id = 2;
  c1.fd = c2.fd = -1;
  c1.fname = c2.fname = argv[1];
  c1.doclose = c2.doclose = 0;

  /* Create and truncate the file; the threads open it themselves later. */
  if ((fd = open(argv[1], O_WRONLY|O_CREAT|O_TRUNC, 0666)) < 0) {
    perror("open"); exit(1);
  }

  c1.dowrite = c2.dowrite = 0;
  c1.buf = buf1; c2.buf = buf2;

  /* pthread_* functions return the error number instead of setting errno. */
  if ((rc = pthread_create(&t1, NULL, &writer_thread, (void *)&c1)) != 0) {
    fprintf(stderr, "pthread_create: %s\n", strerror(rc)); exit(1);
  }
  if ((rc = pthread_create(&t2, NULL, &writer_thread, (void *)&c2)) != 0) {
    fprintf(stderr, "pthread_create: %s\n", strerror(rc)); exit(1);
  }

  fprintf(stderr, "Doublewrite: length %lluMB, chunks %d bytes, mark 0x%.2x\n",
          len, (int)sizeof(buf1), (unsigned)mark);

  len *= 1024*1024;
  pos = 0LL;
  while (pos < len) {
    /*
     * Size the chunk in 64-bit arithmetic before narrowing: truncating
     * (len-pos)/2 to int first can yield 0 for large files, after which
     * pos would never advance.
     */
    unsigned long long half = (len - pos) / 2;
    int i, l, words;

    /* Progress output, disabled by the leading 0. */
    if (0 && pos != 0 && pos % (16*1024*1024) == 0)
      fprintf(stderr, "Progress %12llu\n", pos);

    if (half > sizeof(buf1))
      half = sizeof(buf1);
    l = (int)half;

    c1.offset = pos;
    c2.offset = pos + l;
    c1.len = c2.len = l;

    /*
     * Each word holds its own byte offset in the file, the mark byte in
     * bits 48..55 and the writer's tag (0x01 / 0x02) in the top byte.
     */
    words = l / (int)sizeof(buf1[0]);
    for (i = 0; i < words; i++) {
      buf1[i] = pos | ((unsigned long long)mark << 48) | (0x01000000LL << 32);
      pos += sizeof(unsigned long long);
    }
    for (i = 0; i < words; i++) {
      buf2[i] = pos | ((unsigned long long)mark << 48) | (0x02000000LL << 32);
      pos += sizeof(unsigned long long);
    }

    if (turns % 8 == 0) {
      c1.doclose = 1;
      c2.doclose = 1;
    }

    TRACE("Triggering writes for offset %12llu\n", c1.offset);
    c1.done = 0;
    c2.done = 0;
    c1.dowrite = 1;
    c2.dowrite = 1;

    TRACE("Waiting for completion\n");
    while (!c1.done || !c2.done)
      sched_yield();
    TRACE("Both threads completed\n");

    if (c1.done < 0 || c2.done < 0) {
      fprintf(stderr, "Thread reports error\n");
      break;
    }
    turns++;
  }

  TRACE("Telling threads to stop\n");
  c1.dowrite = c2.dowrite = -1;

  TRACE("Joining threads\n");
  fflush(NULL);
  /*
   * Join into real void * objects; passing the address of an int would let
   * pthread_join() write a pointer-sized value over a smaller variable.
   */
  rc = pthread_join(t1, &ret1);
  if (rc == 0)
    rc = pthread_join(t2, &ret2);
  if (rc != 0) {
    fprintf(stderr, "pthread_join: %s\n", strerror(rc)); exit(1);
  }
  res1 = (int)(intptr_t)ret1;
  res2 = (int)(intptr_t)ret2;

  if (res1 != 0)
    fprintf(stderr, "Thread 1 says %s\n", strerror(-res1));
  if (res2 != 0)
    fprintf(stderr, "Thread 2 says %s\n", strerror(-res2));

  if (c1.fd != -1)
    close(c1.fd);
  if (c2.fd != -1)
    close(c2.fd);
  close(fd);

  /* Only claim a graceful exit when neither thread failed. */
  if (res1 == 0 && res2 == 0)
    fprintf(stderr, "Graceful exit\n");
  return 0;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -