📄 threads.c
字号:
/***************************************************************** * * * Copyright (c) 2001-2006 McObject LLC. All Right Reserved. * * * *****************************************************************/#include "platform.h"#include <stdio.h>#include <stdlib.h>#include <string.h>#include "simple.h"#define MAXREADERS 10 /*10*/#define MAXWRITERS 4 /*4*/#define DIVIDER 7 /*4*/#define WRITER_DELAY 100 /* msec */#define READER_DELAY 200 /* msec */#define REPORTER_DELAY 200 /* msec *//* mask values: */#define M_UPGRADED 1#define M_IN_TRN 2#define M_FINISHING_TRN 3 /* in commit or rollback */#define M_WAIT 4 /* in commit or rollback */#define M_IDLE 0 /* not in trn */#define TEST_TIMEOUT 25000 // 25 sec/* ******************** SHARED DATA ***************** */static volatile int repeat = 1;static int TestMode = 0;static short maskR[MAXREADERS] = { /* who is active now (readers) */ 0};static short maskW[MAXWRITERS] = { /* who is active now (writers) */ 0};static MUTEX_T mut;static THREAD_ID_T reader_id[MAXREADERS];static THREAD_ID_T writer_id[MAXWRITERS];static THREAD_ID_T stopwait_id;static THREAD_ID_T statusreporter_id;static int nwrites = 0;static int nreads = 0;static int count = 0;static int nUpgradeTry = 0, nUpgraded = 0; int nr = 0; int nw = 0;typedef struct ThrParam_{ int num; /* 0 .. (MAXREADERS-1) or 0 .. (MAXWRITERS-1) */ mco_db_h db; int finished;} ThrParam;/* start read transaction */static mco_trans_h trans_R( mco_db_h db ) { mco_trans_h t = 0; mco_trans_start( db, MCO_READ_ONLY, MCO_TRANS_FOREGROUND, &t ); return t;}/* start write transaction */static mco_trans_h trans_W( mco_db_h db ) { mco_trans_h t = 0; mco_trans_start( db, MCO_READ_WRITE, MCO_TRANS_FOREGROUND, &t ); return t;}/* ************************************************** */static void error( MCO_RET rc ) { printf( "\n********** error %d ********** \n", rc ); exit( -1 );}/* reader thread procedure */THREAD_PROC_DEFINE(QuoteReader,p_) { ThrParam *tp = ( ThrParam * ) p_; MCO_RET rc; MCO_RET rcu = 0; mco_trans_h t = 0; mco_db_h db = tp->db; while ( repeat ) { MUTEX_LOCK(&mut); maskR[tp->num] = M_WAIT; MUTEX_UNLOCK(&mut); mco_trans_start( db, MCO_READ_ONLY, MCO_TRANS_FOREGROUND, &t ); nreads++; MUTEX_LOCK(&mut); maskR[tp->num] = M_IN_TRN; MUTEX_UNLOCK(&mut); /* do the job ... */ Sleep( 10 ); /* 1000 */ if((++count)%DIVIDER == 0) Sleep(10); if ( rand() % 4 == 0 ) { MUTEX_LOCK(&mut); maskR[tp->num] = M_UPGRADED; MUTEX_UNLOCK(&mut); rcu = mco_trans_upgrade( t ); nUpgradeTry++; if ( rcu == MCO_S_OK ) nUpgraded++; Sleep( 10 ); /* 1000 */ } MUTEX_LOCK(&mut); maskR[tp->num] = M_FINISHING_TRN; MUTEX_UNLOCK(&mut); Sleep( 1 ); /* 1000 */ MUTEX_LOCK(&mut); rc = mco_trans_commit( t ); maskR[tp->num] = M_IDLE; MUTEX_UNLOCK(&mut); if ( rc ) { error( rc ); } Sleep( READER_DELAY ); } /* disconnect from the database, db is no longer valid */ mco_trans_start( db, MCO_READ_ONLY, MCO_TRANS_FOREGROUND, &t ); mco_trans_rollback( t ); mco_db_disconnect( db ); tp->finished = 1;}/* thread */THREAD_PROC_DEFINE(QuoteWriter,p_) { ThrParam *tp = ( ThrParam * ) p_; mco_trans_h t = 0; MCO_RET rc; mco_db_h db = tp->db; while ( repeat ) { MUTEX_LOCK(&mut); maskW[tp->num] = M_WAIT; MUTEX_UNLOCK(&mut); mco_trans_start( db, MCO_READ_WRITE, MCO_TRANS_FOREGROUND, &t ); nwrites++; MUTEX_LOCK(&mut); maskW[tp->num] = M_IN_TRN; MUTEX_UNLOCK(&mut); /* do the job ... */ Sleep( 10 ); if((++count)%DIVIDER == 0) Sleep(10); MUTEX_LOCK(&mut); maskW[tp->num] = M_FINISHING_TRN; MUTEX_UNLOCK(&mut); MUTEX_LOCK(&mut); Sleep( 1 ); rc = mco_trans_commit( t ); maskW[tp->num] = M_IDLE; MUTEX_UNLOCK(&mut); if ( rc ) error( rc ); Sleep( WRITER_DELAY ); } /* disconnect from the database, db is no longer valid */ mco_trans_start( db, MCO_READ_WRITE, MCO_TRANS_FOREGROUND, &t ); mco_trans_rollback( t ); mco_db_disconnect( db ); tp->finished = 1;}/* thread procedure which waits a key to stop test */THREAD_PROC_DEFINE(StopWait,p) { ThrParam *tp = ( ThrParam * ) p; printf( "\nPress Enter to stop test\n" ); if (TestMode) Sleep(TEST_TIMEOUT); getchar(); repeat = 0; tp->finished = 1;}/* thread which reports reader/writer threads status periodically */THREAD_PROC_DEFINE(StatusReporter,p) { short mskR[MAXREADERS]; short mskW[MAXWRITERS]; ThrParam *tp = ( ThrParam * ) p; /* these were copies */ while ( repeat ) { int i; Sleep( REPORTER_DELAY ); /* make copy of masks for analysis: */ MUTEX_LOCK(&mut); for ( i = 0; i < MAXREADERS; i++ ) { mskR[i] = maskR[i]; } for ( i = 0; i < MAXWRITERS; i++ ) { mskW[i] = maskW[i]; } MUTEX_UNLOCK(&mut); /* first, check any conflicts: */ { int nr = 0, nw = 0; for ( i = 0; i < MAXREADERS; i++ ) { if ( (mskR[i] != M_IDLE) && (mskR[i] != M_UPGRADED) && (mskR[i] != M_WAIT) ) nr++; } for ( i = 0; i < MAXWRITERS; i++ ) { if ( ( mskW[i] != M_IDLE ) && ( mskW[i] != M_WAIT ) ) nw++; } if ( nr > 0 && nw > 0 ) { error( 1000 ); } if ( nw > 1 ) error( 1001 ); } /* second, print status: */ { char s[100]; int ps = 0; for ( i = 0; i < MAXREADERS; i++ ) { switch ( mskR[i] ) { case M_IDLE: s[ps] = '-'; break; case M_FINISHING_TRN: s[ps] = '@'; break; case M_WAIT: s[ps] = '&'; break; case M_IN_TRN: s[ps] = '*'; break; case M_UPGRADED: s[ps] = '$'; break; default: s[ps] = '?'; break; } ps++; } s[ps++] = ' '; s[ps++] = ' '; for ( i = 0; i < MAXWRITERS; i++ ) { switch ( mskW[i] ) { case M_IDLE: s[ps] = '-'; break; case M_FINISHING_TRN: s[ps] = '@'; break; case M_WAIT: s[ps] = '&'; break; case M_IN_TRN: s[ps] = '*'; break; default: s[ps] = '?'; break; } ps++; } sprintf( s + ps, " %d writes %d reads %d(%d) upgrades\n", nwrites, nreads, nUpgradeTry, nUpgraded ); /*s[ps++] = '\n';
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -