📄 rf_pscript.c
字号:
/* * rf_pscript.c * * Jim Zelenka, CMU/SCS, 13 Jun 1996 *//* * Copyright (c) 1996 Carnegie-Mellon University. * All rights reserved. * * Author: Jim Zelenka * * Permission to use, copy, modify and distribute this software and * its documentation is hereby granted, provided that both the copyright * notice and this permission notice appear in all copies of the * software, derivative works or modified versions, and any portions * thereof, and that both notices appear in supporting documentation. * * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. * * Carnegie Mellon requests users of this software to return to * * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU * School of Computer Science * Carnegie Mellon University * Pittsburgh PA 15213-3890 * * any improvements or extensions that they make and grant Carnegie the * rights to redistribute these changes. */#include <stdio.h>#include <errno.h>#include <fcntl.h>#include <unistd.h>#include <limits.h>#include <signal.h>#if defined(__osf__) || defined(AIX)#include <aio.h>#endif /* __osf__ || AIX */#include <sys/types.h>#ifdef __osf__#include <sys/utctime.h>#endif /* __osf__ */#include <sys/time.h>#include <sys/ioctl.h>#ifdef __osf__#include <sys/siginfo.h>#endif /* __osf__ */#include "rf_types.h"#include "rf_threadstuff.h"#include "rf_general.h"#if RF_CMU_PDL > 0 && !defined(SIMULATE)extern char *rf_gp_progname;#define DIR_READ 0#define DIR_WRITE 1static char *ops_str[] = {"read", "write"};#define RAIO_MAX AIO_LISTIO_MAXstatic struct raio { struct aiocb cb; struct raio *next;} raio_a[RAIO_MAX], *raio, *raio_active;RF_DECLARE_STATIC_MUTEX(raio_mutex)RF_DECLARE_STATIC_MUTEX(raio_mutex_bogus_ready)RF_DECLARE_STATIC_COND(raio_cond)RF_DECLARE_STATIC_COND(raio_cond_ready)static RF_Thread_t raio_check_thread;#define RAIO_LOCK() { RF_LOCK_MUTEX(raio_mutex); }#define RAIO_UNLOCK() { RF_UNLOCK_MUTEX(raio_mutex); }#define RAIO_WAIT() RF_WAIT_COND(raio_cond, raio_mutex)#define RAIO_SIGNAL() RF_SIGNAL_COND(raio_cond)#define RAIO_WAIT_READY() { \ RF_LOCK_MUTEX(raio_mutex_bogus_ready); \ RF_WAIT_COND(raio_cond_ready, raio_mutex_bogus_ready); \ RF_UNLOCK_MUTEX(raio_mutex_bogus_ready); \}#define RAIO_SIGNAL_READY() RF_SIGNAL_COND(raio_cond_ready)static RF_ThreadGroup_t scr_group;static RF_ThreadGroup_t raio_check_group;#define MAX_CONCURRENT 64#ifdef MAX#undef MAX#endif /* MAX */#define MAX(a,b) (((a)>(b))?(a):(b))#define DEBUG_NONE 0#define DEBUG_DELAY (1<<1) /* 2 */#define DEBUG_RAIO (1<<2) /* 4 */#define DEBUG_WORKLOOP (1<<3) /* 8 */#define DEBUG_SYNCOP (1<<4) /* 16 */#define DEBUG_ASYNCOP (1<<5) /* 32 */#define DEBUG_VPARSE (1<<6) /* 64 */static int debug;static int rf_bugcompat = 0;static int naio = 0;static int num_ios = 4096;static int concurrency = 1;static int blocksize = 4096;static int max_blocksize = 0;static off_t max_off = 0UL;struct rs_trace_entry { int blkno; int size; double delay; short pid; char op; char async_flag;};struct threadwork { int fd; RF_Thread_t thread; int ios; char *buf; int first_io; int offset; /* for reading tracefile */ double total_delay; /* debugging */};struct iowork { off_t off; int len; int dir; double delay; int async;};#define TWRITE(_b_,_l_) { \ int rc; \ rc = write(trace_fd, _b_, _l_); \ if (rc != _l_) { \ fprintf(stderr, "ERROR: %s got %d, wanted %d writing %s line %d\n", \ rf_gp_progname, rc, _l_, tracefile, __LINE__); \ fflush(stderr); \ exit(1); \ } \}#define TREAD(_b_,_l_) { \ int rc; \ rc = read(trace_fd, _b_, _l_); \ if (rc != _l_) { \ fprintf(stderr, "ERROR: %s got %d, wanted %d reading %s line %d\n", \ rf_gp_progname, rc, _l_, tracefile, __LINE__); \ fflush(stderr); \ exit(1); \ } \}static struct threadwork tw_a[MAX_CONCURRENT];static struct iowork *ios;/* * Call with raio lock held */static int raio_check(){ struct raio *r, *n, *p; struct aiocb *aio; int rc, f; if (debug&DEBUG_RAIO) { printf("check aio status\n"); fflush(stdout); } for(f=0,p=NULL,r=raio_active;r;r=n) { n = r->next; aio = &r->cb; rc = aio_error(aio); if (rc == EINPROGRESS) { p = r; continue; } f++; if (rc) { if (rc < 0) perror("aio_error"); fprintf(stderr, "ERROR: got rc=%d for failed aio, aio=%lx, raio_a=%lx\n", rc, aio, raio_a); fprintf(stderr, "fd %d offset %ld len %ld buf=%lx\n", aio->aio_fildes, aio->aio_offset, (long)aio->aio_nbytes, aio->aio_buf); fflush(stderr); exit(1); } if (debug&DEBUG_RAIO) { printf("completed %lx, raio_active=%lx, p=%lx\n", aio, raio_active, p); fflush(stdout); } aio_return(aio); /* cleanup */ if (p) p->next = r->next; else raio_active = r->next; r->next = raio; raio = r; } if (debug&DEBUG_RAIO) { printf("done check of aio status, f=%d\n", f); fflush(stdout); } return(f);}static void raio_check_worker(){ unsigned long i = 0; int f; RF_THREADGROUP_RUNNING(&raio_check_group); while(1) { if (debug&DEBUG_RAIO) { printf("wait_ready\n"); fflush(stdout); } RAIO_WAIT_READY(); if (debug&DEBUG_RAIO) { i++; printf("done wait %ld\n", i); fflush(stdout); } if (debug&DEBUG_RAIO) { printf("raio_check_worker: awake\n"); fflush(stdout); } RAIO_LOCK(); if (debug&DEBUG_RAIO) { printf("raio_check_worker: got lock\n"); fflush(stdout); } f = raio_check(); if (debug&DEBUG_RAIO) { printf("raio_check_worker: f=%d\n", f); fflush(stdout); } RAIO_UNLOCK(); if (f) RAIO_SIGNAL(); }}static void usr1handler(int sig, siginfo_t *info, struct sigcontext *scp){ if (debug&DEBUG_RAIO) { printf("usr1handler\n"); fflush(stdout); } RAIO_SIGNAL_READY();}static int random_op(fd, dir, offset, len, buf) int fd; int dir; off_t offset; int len; char *buf;{ off_t off; int rc; off = lseek(fd, offset, SEEK_SET); if (off != offset) { fprintf(stderr, "ERROR: seeked to %ld, wanted %ld\n", off, offset); fflush(stderr); return(-1); } if (dir == DIR_READ) { rc = read(fd, buf, len); } else { rc = write(fd, buf, len); }#if 1 if (rc != len) { char *s, t[1024]; sprintf(t, "%d", dir); if ((dir == DIR_READ) || (dir == DIR_WRITE)) s = ops_str[dir]; else s = t; fprintf(stderr, "%s(%d, %lx, %d)\n", s, fd, buf, len); fprintf(stderr, "random_op(%d, %d, %ld, %d, %lx)\n", fd, dir, offset, len, buf); fflush(stderr); }#endif return(rc);}static void work_loop(arg) RF_ThreadArg_t arg;{ int i, rc, ind, len, arr; struct threadwork *tw; struct aiocb *aio; struct raio *r; RF_THREADGROUP_RUNNING(&scr_group); signal(SIGUSR1, usr1handler); ind = (int)arg; tw = &tw_a[ind]; if (debug&DEBUG_WORKLOOP) { printf("work_loop for %d\n", ind); fflush(stdout); } for(i=0;i<tw->ios;i++) { arr = i + tw->first_io; len = ios[arr].len; if (debug&DEBUG_SYNCOP) { printf("%d doing random_op(%d, %d, %ld, %d, %lx)\n", ind, tw->fd, ios[arr].dir, ios[arr].off, len, tw->buf); fflush(stdout); } if ((ios[arr].delay != (double)0.0) && (!(rf_bugcompat && ios[arr].async))) { int secs = (int)ios[arr].delay; struct timespec ts; double d; ts.tv_sec = secs; d = ios[arr].delay - (double)secs; d *= (double)1000000000.0; ts.tv_nsec = (int)d; if (rf_bugcompat) { /* round off a la raidframe */ ts.tv_nsec /= 1000000; ts.tv_nsec *= 1000000; } if (debug&DEBUG_DELAY) { printf("%d delaying %d:%09d\n", ind, ts.tv_sec, ts.tv_nsec); fflush(stdout); } RF_DELAY_THREAD_TS(ts); if (debug&DEBUG_DELAY) { printf("%d done delaying\n", ind); fflush(stdout); } } if (ios[arr].async) { if (debug&DEBUG_ASYNCOP) { int n = naio++; printf("%d encountered async io %d\n", ind, n); fflush(stdout); } RAIO_LOCK(); if (debug&DEBUG_RAIO) { printf("%d took RAIO lock\n", ind); fflush(stdout); } while(raio == NULL) { if (debug&DEBUG_RAIO) { fprintf(stderr, "%d waiting for raio\n", ind); fflush(stderr); } RAIO_WAIT(); } r = raio; raio = r->next; aio = &r->cb; RAIO_UNLOCK(); if (debug&DEBUG_RAIO) { printf("%d has aio struct\n", ind); fflush(stdout); } bzero(aio, sizeof(aio)); aio->aio_fildes = tw->fd; aio->aio_offset = ios[arr].off; aio->aio_buf = tw->buf; aio->aio_nbytes = ios[arr].len; aio->aio_reqprio = AIO_PRIO_DFL; aio->aio_sigevent.sigev_signo = SIGUSR1; aio->aio_sigevent.sigev_notify = SIGEV_SIGNAL; if (ios[arr].dir == DIR_READ) { aio->aio_lio_opcode = LIO_READ; rc = aio_read(aio); } else { aio->aio_lio_opcode = LIO_WRITE; rc = aio_write(aio); } if (rc) { perror("aio op"); fprintf(stderr, "%d failed aio_%s io %d\n", ind, ops_str[ios[arr].dir], i); fflush(stderr); exit(1); } if (debug&DEBUG_ASYNCOP) { printf("%d launched aio %lx\n", ind, aio); fflush(stdout); } RAIO_LOCK(); r->next = raio_active; raio_active = r; RAIO_UNLOCK(); if (debug&DEBUG_RAIO) { printf("%d marked aio %lx active\n", ind, aio); fflush(stdout); } } else { rc = random_op(tw->fd, ios[arr].dir, ios[arr].off, len, tw->buf); if (rc != len) { if (rc < 0) { perror("random_op"); fprintf(stderr, "thread %d i=%d arr=%d\n", ind, i, arr); } else { char *s, t[1024]; sprintf(t, "%d", ios[arr].dir); if ((ios[arr].dir == DIR_READ) || (ios[arr].dir == DIR_WRITE)) s = ops_str[ios[arr].dir]; else s = t; fprintf(stderr, "ERROR: %s did %s of %d, wanted %d\n", rf_gp_progname, s, rc, len); fprintf(stderr, "random_op was fd=%d dir=%d off=%ld len=%d buf=%lx\n", tw->fd, ios[arr].dir, ios[i+tw->first_io].off, len, tw->buf); } fflush(stderr); exit(1); } if (debug&DEBUG_SYNCOP) { printf("%d completed random_op\n", ind); fflush(stdout); } } } if (debug&DEBUG_WORKLOOP) { printf("[%d] completed work_loop\n", ind); fflush(stdout); } RF_THREADGROUP_DONE(&scr_group);}void play_trace_init(){ int rc; rc = rf_mutex_init(&raio_mutex); if (rc) { fprintf(stderr, "ERROR: cannot init raio mutex\n"); exit(1); } rc = rf_mutex_init(&raio_mutex_bogus_ready); if (rc) { fprintf(stderr, "ERROR: cannot init raio mutex-2\n"); exit(1); } rc = rf_cond_init(&raio_cond); if (rc) { fprintf(stderr, "ERROR: cannot init raio_cond\n"); exit(1); } rc = rf_cond_init(&raio_cond_ready); if (rc) { fprintf(stderr, "ERROR: cannot init raio_cond_ready\n"); exit(1); } bzero((char *)&raio_check_group, sizeof(RF_ThreadGroup_t)); rc = rf_mutex_init(&raio_check_group.mutex); if (rc) { fprintf(stderr, "ERROR: cannot init raio_check_group mutex\n"); exit(1); } rc = rf_cond_init(&raio_check_group.cond); if (rc) { fprintf(stderr, "ERROR: cannot init raio_check_group cond\n"); exit(1); } rc = RF_CREATE_THREAD(raio_check_thread, raio_check_worker, NULL); if (rc) { fprintf(stderr, "ERROR: could not create raio check thread\n"); fflush(stderr); exit(1); } RF_THREADGROUP_STARTED(&raio_check_group); RF_THREADGROUP_WAIT_START(&raio_check_group);}int play_trace( char *tracefile, char *device){ int trace_fd, i, j, tmp, rc; struct rs_trace_entry ent; unsigned long status; bzero((char *)&scr_group, sizeof(RF_ThreadGroup_t)); rc = rf_mutex_init(&scr_group.mutex); if (rc) { fprintf(stderr, "ERROR: cannot init scr_group mutex\n"); exit(1); } rc = rf_cond_init(&scr_group.cond); if (rc) { fprintf(stderr, "ERROR: cannot init scr_group cond\n"); exit(1); } trace_fd = open(tracefile, O_RDONLY); if (trace_fd < 0) { perror(tracefile); exit(1); } TREAD(&concurrency, 4); if (concurrency > MAX_CONCURRENT) { fprintf(stderr, "%s: increase MAX_CONCURRENT (hack)\n"); exit(1); } for(i=0;i<concurrency;i++) { TREAD(&tw_a[i].offset, 4); } for(num_ios=0,i=0;i<concurrency;i++) { tw_a[i].first_io = num_ios; TREAD(&tw_a[i].ios, 4); num_ios += tw_a[i].ios; tw_a[i].total_delay = (double)0.0; } tmp=tw_a[0].ios; if (tw_a[0].offset != (4+(8*concurrency))) { fprintf(stderr, "ERROR: %s does not like this tracefile\n", rf_gp_progname); fprintf(stderr, " %s expected ios packed, in-order\n"); exit(1); } for(i=1;i<concurrency;i++) { if ((4+(8*concurrency) + (tmp*sizeof(ent))) != tw_a[i].offset) { fprintf(stderr, "ERROR: %s does not like this tracefile\n", rf_gp_progname); fprintf(stderr, " %s expected ios packed, in-order\n"); exit(1); } tmp += tw_a[i].ios; } ios = (struct iowork *)malloc(num_ios * sizeof(struct iowork)); if (ios == NULL) { fprintf(stderr, "ERROR: %s could not allocate %d iowork structs\n", rf_gp_progname, num_ios); exit(1); } max_off = 0L; for(i=0;i<num_ios;i++) { TREAD(&ent, sizeof(ent)); ios[i].off = (long)ent.blkno; if (ios[i].off > max_off) max_off = ios[i].off; blocksize = ent.size; if (i == 0) { max_blocksize = blocksize; } ios[i].len = ent.size; ios[i].delay = ent.delay; ios[i].async = ent.async_flag; if (ent.op == 'r') ios[i].dir = DIR_READ; else if (ent.op == 'w') ios[i].dir = DIR_WRITE; else { fprintf(stderr, "ERROR: %s got io direction %02x unknown\n", rf_gp_progname, ent.op); exit(1); } if (debug&DEBUG_VPARSE) { printf("ent %d:\n", i); printf(" blkno=%d\n", ent.blkno); printf(" size=%d\n", ent.size); printf(" delay=%lf\n", ent.delay); printf(" pid=%d\n", (int)ent.pid); printf(" op=%c\n", ent.op); printf(" async_flag=%d\n", ent.async_flag); } max_blocksize = MAX(blocksize, max_blocksize); } max_off -= max_blocksize; for(i=0;i<num_ios;i++) { ios[i].off %= (max_off/512L); ios[i].off *= 512L; if (ios[i].off < 0) { fprintf(stderr, "ERROR: %s got off=%ld from io %d\n", rf_gp_progname, ios[i].off, i); exit(1); } if ((ios[i].off + ios[i].len) >= max_off) { ios[i].off -= ((((ios[i].len)/512L)+1)*512L); } } for(i=0;i<concurrency;i++) { for(j=0;j<tw_a[i].ios;j++) { tw_a[i].total_delay += ios[j+tw_a[i].first_io].delay; } tw_a[i].fd = open(device, O_RDWR); if (tw_a[i].fd < 0) { perror(device); exit(1); } tw_a[i].buf = (char *)malloc(max_blocksize); if (tw_a[i].buf == NULL) { fprintf(stderr, "ERROR: %s cannot allocate %d byte buf\n", rf_gp_progname, max_blocksize); exit(1); } } printf("Read %d ios from %s. %d processes, max_blocksize=%d\n", num_ios, tracefile, concurrency, max_blocksize); for(i=0;i<concurrency;i++) { rc = RF_CREATE_THREAD(tw_a[i].thread, work_loop, (RF_ThreadArg_t)i); if (rc) { fprintf(stderr, "ERROR: could not create thread %d\n", i); exit(1); } RF_THREADGROUP_STARTED(&scr_group); } RF_THREADGROUP_WAIT_START(&scr_group); RF_THREADGROUP_WAIT_STOP(&scr_group); rf_mutex_destroy(&scr_group.mutex); rf_mutex_destroy(&scr_group.cond);}#endif /* RF_CMU_PDL > 0 && !defined(SIMULATE) */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -