📄 threads.c
字号:
/*****************************************************************
* *
* Copyright (c) 2001-2007 McObject LLC. All Right Reserved. *
* *
*****************************************************************/
#include "platform.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "simple.h"
/* Sizes and pacing of the test. */
#define MAXREADERS 10 /* size of maskR[] and reader_id[]: one slot per reader thread */
#define MAXWRITERS 4 /* size of maskW[] and writer_id[]: one slot per writer thread */
#define DIVIDER 7 /* a worker sleeps an extra 10 units whenever the shared `count` hits a multiple of this */
#define WRITER_DELAY 100 /* msec; pause at the end of every QuoteWriter iteration */
#define READER_DELAY 200 /* msec; pause at the end of every QuoteReader iteration */
#define REPORTER_DELAY 200 /* msec; pause at the start of every StatusReporter iteration */
/* Per-thread state values stored in maskR[] / maskW[]: */
#define M_UPGRADED 1 /* reader only: set just before mco_trans_upgrade() is called */
#define M_IN_TRN 2 /* set after mco_trans_start() has returned */
#define M_FINISHING_TRN 3 /* set just before the transaction is committed */
#define M_WAIT 4 /* set just before mco_trans_start() is called */
#define M_IDLE 0 /* not in trn */
#define TEST_TIMEOUT 25000 /* 25 sec; how long StopWait sleeps first when TestMode is non-zero */
/* ******************** SHARED DATA ***************** */
/* Thread loops keep running while this is non-zero; StopWait sets it to 0. */
static volatile int repeat = 1;
/* When non-zero, StopWait sleeps for TEST_TIMEOUT before it waits for input. */
static int TestMode = 0;
/* Current M_* state of every reader thread, indexed by ThrParam.num. */
static short maskR[MAXREADERS] =
{
/* who is active now (readers) */
0
};
/* Current M_* state of every writer thread, indexed by ThrParam.num. */
static short maskW[MAXWRITERS] =
{
/* who is active now (writers) */
0
};
/* Mutex held whenever maskR[] / maskW[] are written or copied. */
static MUTEX_T mut;
/*
 * Thread identifiers. They are not referenced in the visible part of this
 * file; presumably they are filled in by the code that creates the threads
 * (TODO confirm).
 */
static THREAD_ID_T reader_id[MAXREADERS];
static THREAD_ID_T writer_id[MAXWRITERS];
static THREAD_ID_T stopwait_id;
static THREAD_ID_T statusreporter_id;
/* Statistics printed by StatusReporter. */
static int nwrites = 0; /* incremented once per QuoteWriter iteration */
static int nreads = 0; /* incremented once per QuoteReader iteration */
static int count = 0; /* incremented once per reader or writer iteration; see DIVIDER */
static int nUpgradeTry = 0, nUpgraded = 0; /* mco_trans_upgrade() calls / calls that returned MCO_S_OK */
/*
 * NOTE(review): these two have external linkage and are not used by the
 * visible code; StatusReporter declares locals with the same names that
 * shadow them.
 */
int nr = 0;
int nw = 0;
/* Argument block handed to each thread procedure (they cast their argument to ThrParam*). */
typedef struct ThrParam_
{
int num; /* 0 .. (MAXREADERS-1) or 0 .. (MAXWRITERS-1); index into maskR[] / maskW[] */
mco_db_h db; /* database handle used for the thread's transactions; QuoteReader/QuoteWriter disconnect it before returning */
int finished; /* set to 1 by the thread procedure as its last action */
} ThrParam;
/*
 * Start a read transaction on `db` (MCO_READ_ONLY, MCO_TRANS_FOREGROUND).
 *
 * Returns the transaction handle written by mco_trans_start(). The status
 * code of that call is not inspected; the handle starts out as 0, so 0 is
 * returned if mco_trans_start() stores nothing.
 */
static mco_trans_h trans_R(mco_db_h db)
{
    mco_trans_h read_trn = 0;

    mco_trans_start(db, MCO_READ_ONLY, MCO_TRANS_FOREGROUND, &read_trn);

    return read_trn;
}
/*
 * Start a write transaction on `db` (MCO_READ_WRITE, MCO_TRANS_FOREGROUND).
 *
 * Returns the transaction handle written by mco_trans_start(). The status
 * code of that call is not inspected; the handle starts out as 0, so 0 is
 * returned if mco_trans_start() stores nothing.
 */
static mco_trans_h trans_W(mco_db_h db)
{
    mco_trans_h write_trn = 0;

    mco_trans_start(db, MCO_READ_WRITE, MCO_TRANS_FOREGROUND, &write_trn);

    return write_trn;
}
/* ************************************************** */
/*
 * Fatal-error handler: prints the numeric code `rc` on stdout inside a
 * banner line and terminates the process with exit status -1.
 * This function does not return.
 */
static void error(MCO_RET rc)
{
    printf("\n********** error %d ********** \n", rc);

    exit(-1);
}
/*
 * Reader thread procedure.
 *
 * p_ points to a ThrParam: `num` selects this thread's slot in maskR[] and
 * `db` is the database handle used for every transaction.
 *
 * While `repeat` is non-zero, each iteration:
 *   1. publishes M_WAIT and starts a read-only transaction;
 *   2. publishes M_IN_TRN and simulates work with Sleep();
 *   3. when rand() % 4 == 0, publishes M_UPGRADED and calls
 *      mco_trans_upgrade(), counting the attempt and its success;
 *   4. publishes M_FINISHING_TRN, commits, publishes M_IDLE;
 *   5. sleeps READER_DELAY.
 * A non-zero result from mco_trans_start() or mco_trans_commit() is fatal:
 * error() prints the code and exits the process.
 *
 * Every write to maskR[] and every update of the shared counters (nreads,
 * count, nUpgradeTry, nUpgraded) happens while `mut` is held, so concurrent
 * worker threads cannot lose each other's increments.
 *
 * On exit the thread disconnects `db` and sets tp->finished to 1.
 */
THREAD_PROC_DEFINE(QuoteReader, p_)
{
    ThrParam* tp = (ThrParam*)p_;
    MCO_RET rc;
    MCO_RET rcu = 0;
    mco_trans_h t = 0;
    mco_db_h db = tp->db;
    int extra_delay;

    while (repeat)
    {
        MUTEX_LOCK(&mut);
        maskR[tp->num] = M_WAIT;
        MUTEX_UNLOCK(&mut);

        /* Without this check a failed start would commit a stale handle below. */
        rc = mco_trans_start(db, MCO_READ_ONLY, MCO_TRANS_FOREGROUND, &t);
        if (rc)
        {
            error(rc);
        }

        MUTEX_LOCK(&mut);
        nreads++;
        maskR[tp->num] = M_IN_TRN;
        MUTEX_UNLOCK(&mut);

        /* do the job ... */
        Sleep(10); /* 1000 */

        /* `count` is shared with all other workers: bump it under the mutex,
         * but sleep only after the mutex has been released. */
        MUTEX_LOCK(&mut);
        extra_delay = ((++count) % DIVIDER == 0);
        MUTEX_UNLOCK(&mut);
        if (extra_delay)
        {
            Sleep(10);
        }

        if (rand() % 4 == 0)
        {
            MUTEX_LOCK(&mut);
            maskR[tp->num] = M_UPGRADED;
            MUTEX_UNLOCK(&mut);

            /* The upgrade itself is made without holding the mutex. */
            rcu = mco_trans_upgrade(t);

            MUTEX_LOCK(&mut);
            nUpgradeTry++;
            if (rcu == MCO_S_OK)
            {
                nUpgraded++;
            }
            MUTEX_UNLOCK(&mut);

            Sleep(10); /* 1000 */
        }

        MUTEX_LOCK(&mut);
        maskR[tp->num] = M_FINISHING_TRN;
        MUTEX_UNLOCK(&mut);

        Sleep(1); /* 1000 */

        /* Commit and the switch to M_IDLE are done under one lock, so a
         * snapshot of the masks taken under `mut` never sees this thread
         * as active after its commit has completed. */
        MUTEX_LOCK(&mut);
        rc = mco_trans_commit(t);
        maskR[tp->num] = M_IDLE;
        MUTEX_UNLOCK(&mut);
        if (rc)
        {
            error(rc);
        }

        Sleep(READER_DELAY);
    }

    /* disconnect from the database, db is no longer valid */
    /* NOTE(review): one more transaction is started and rolled back before
     * disconnecting; the reason is not evident from this file. The rollback
     * is skipped if the start failed, since `t` would then be stale. */
    rc = mco_trans_start(db, MCO_READ_ONLY, MCO_TRANS_FOREGROUND, &t);
    if (!rc)
    {
        mco_trans_rollback(t);
    }
    mco_db_disconnect(db);
    tp->finished = 1;
}
/*
 * Writer thread procedure.
 *
 * p_ points to a ThrParam: `num` selects this thread's slot in maskW[] and
 * `db` is the database handle used for every transaction.
 *
 * While `repeat` is non-zero, each iteration:
 *   1. publishes M_WAIT and starts a read-write transaction;
 *   2. publishes M_IN_TRN and simulates work with Sleep();
 *   3. publishes M_FINISHING_TRN, commits, publishes M_IDLE;
 *   4. sleeps WRITER_DELAY.
 * A non-zero result from mco_trans_start() or mco_trans_commit() is fatal:
 * error() prints the code and exits the process.
 *
 * Every write to maskW[] and every update of the shared counters (nwrites,
 * count) happens while `mut` is held, so concurrent worker threads cannot
 * lose each other's increments. No Sleep() is made while holding `mut`, so
 * other threads are never blocked for the duration of a delay.
 *
 * On exit the thread disconnects `db` and sets tp->finished to 1.
 */
THREAD_PROC_DEFINE(QuoteWriter, p_)
{
    ThrParam* tp = (ThrParam*)p_;
    mco_trans_h t = 0;
    MCO_RET rc;
    mco_db_h db = tp->db;
    int extra_delay;

    while (repeat)
    {
        MUTEX_LOCK(&mut);
        maskW[tp->num] = M_WAIT;
        MUTEX_UNLOCK(&mut);

        /* Without this check a failed start would commit a stale handle below. */
        rc = mco_trans_start(db, MCO_READ_WRITE, MCO_TRANS_FOREGROUND, &t);
        if (rc)
        {
            error(rc);
        }

        MUTEX_LOCK(&mut);
        nwrites++;
        maskW[tp->num] = M_IN_TRN;
        MUTEX_UNLOCK(&mut);

        /* do the job ... */
        Sleep(10);

        /* `count` is shared with all other workers: bump it under the mutex,
         * but sleep only after the mutex has been released. */
        MUTEX_LOCK(&mut);
        extra_delay = ((++count) % DIVIDER == 0);
        MUTEX_UNLOCK(&mut);
        if (extra_delay)
        {
            Sleep(10);
        }

        MUTEX_LOCK(&mut);
        maskW[tp->num] = M_FINISHING_TRN;
        MUTEX_UNLOCK(&mut);

        /* Short pre-commit pause, taken outside the mutex. */
        Sleep(1);

        /* Commit and the switch to M_IDLE are done under one lock, so a
         * snapshot of the masks taken under `mut` never sees this thread
         * as active after its commit has completed. */
        MUTEX_LOCK(&mut);
        rc = mco_trans_commit(t);
        maskW[tp->num] = M_IDLE;
        MUTEX_UNLOCK(&mut);
        if (rc)
        {
            error(rc);
        }

        Sleep(WRITER_DELAY);
    }

    /* disconnect from the database, db is no longer valid */
    /* NOTE(review): one more transaction is started and rolled back before
     * disconnecting; the reason is not evident from this file. The rollback
     * is skipped if the start failed, since `t` would then be stale. */
    rc = mco_trans_start(db, MCO_READ_WRITE, MCO_TRANS_FOREGROUND, &t);
    if (!rc)
    {
        mco_trans_rollback(t);
    }
    mco_db_disconnect(db);
    tp->finished = 1;
}
/*
 * Thread procedure that ends the test.
 *
 * Prints a prompt, then (only when TestMode is non-zero) sleeps for
 * TEST_TIMEOUT before blocking in getchar(). Once getchar() returns, it
 * clears `repeat`, the flag polled by the other thread loops, and marks
 * its own ThrParam (passed as `p`) as finished.
 */
THREAD_PROC_DEFINE(StopWait, p)
{
    ThrParam* self = (ThrParam*)p;

    printf("\nPress Enter to stop test\n");

    if (TestMode != 0)
    {
        Sleep(TEST_TIMEOUT);
    }

    getchar();

    repeat = 0;
    self->finished = 1;
}
/* thread which reports reader/writer threads status periodically */
THREAD_PROC_DEFINE(StatusReporter, p)
{
short mskR[MAXREADERS];
short mskW[MAXWRITERS];
ThrParam* tp = (ThrParam*)p;
/* these were copies */
while (repeat)
{
int i;
Sleep(REPORTER_DELAY);
/* make copy of masks for analysis: */
MUTEX_LOCK(&mut);
for (i = 0; i < MAXREADERS; i++)
{
mskR[i] = maskR[i];
}
for (i = 0; i < MAXWRITERS; i++)
{
mskW[i] = maskW[i];
}
MUTEX_UNLOCK(&mut);
/* first, check any conflicts: */
{
int nr = 0, nw = 0;
for (i = 0; i < MAXREADERS; i++)
{
if ((mskR[i] != M_IDLE) && (mskR[i] != M_UPGRADED) && (mskR[i] != M_WAIT))
{
nr++;
}
}
for (i = 0; i < MAXWRITERS; i++)
{
if ((mskW[i] != M_IDLE) && (mskW[i] != M_WAIT))
{
nw++;
}
}
if (nr > 0 && nw > 0)
{
error(1000);
}
if (nw > 1)
{
error(1001);
}
}
/* second, print status: */
{
char s[100];
int ps = 0;
for (i = 0; i < MAXREADERS; i++)
{
switch (mskR[i])
{
case M_IDLE:
s[ps] = '-';
break;
case M_FINISHING_TRN:
s[ps] = '@';
break;
case M_WAIT:
s[ps] = '&';
break;
case M_IN_TRN:
s[ps] = '*';
break;
case M_UPGRADED:
s[ps] = '$';
break;
default:
s[ps] = '?';
break;
}
ps++;
}
s[ps++] = ' ';
s[ps++] = ' ';
for (i = 0; i < MAXWRITERS; i++)
{
switch (mskW[i])
{
case M_IDLE:
s[ps] = '-';
break;
case M_FINISHING_TRN:
s[ps] = '@';
break;
case M_WAIT:
s[ps] = '&';
break;
case M_IN_TRN:
s[ps] = '*';
break;
default:
s[ps] = '?';
break;
}
ps++;
}
sprintf(s + ps, " %d writes %d reads %d(%d) upgrades\n", nwrites, nreads, nUpgradeTry, nUpgraded);
/*s[ps++] = '\n';
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -