⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threads.c

📁 PB 熟悉的哥们希望大家可以互相学习一下
💻 C
字号:
/*****************************************************************
 *                                                               *
 * Copyright (c) 2001-2007 McObject LLC. All Right Reserved.     *
 *                                                               *
 *****************************************************************/
#include "platform.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "simple.h"

#define MAXREADERS      10						  /*10*/
#define MAXWRITERS      4	  					  /*4*/
#define DIVIDER		7	  					  /*4*/

#define WRITER_DELAY    100	          /* msec */
#define READER_DELAY    200		  /* msec */
#define REPORTER_DELAY  200						  /* msec */

/* mask values: */
#define M_UPGRADED              1
#define M_IN_TRN                2
#define M_FINISHING_TRN         3				  /* in commit or rollback */
#define M_WAIT			4				  /* in commit or rollback */
#define M_IDLE                  0				  /* not in trn */

#define TEST_TIMEOUT		25000 // 25 sec
/* ******************** SHARED DATA ***************** */
static volatile int repeat = 1;
static int TestMode = 0;
static short maskR[MAXREADERS] = 
{
     /* who is active now (readers) */
    0
};
static short maskW[MAXWRITERS] = 
{
     /* who is active now (writers) */
    0
};

static MUTEX_T mut;
static THREAD_ID_T reader_id[MAXREADERS];
static THREAD_ID_T writer_id[MAXWRITERS];
static THREAD_ID_T stopwait_id;
static THREAD_ID_T statusreporter_id;

static int nwrites = 0;
static int nreads = 0;
static int count = 0;

static int nUpgradeTry = 0, nUpgraded = 0;

int nr = 0;
int nw = 0;

typedef struct ThrParam_
{
    int num; /* 0 .. (MAXREADERS-1)  or 0 .. (MAXWRITERS-1) */
    mco_db_h db;
    int finished;
} ThrParam;


/* start read transaction */
static mco_trans_h trans_R(mco_db_h db)
{
    mco_trans_h t = 0;
    mco_trans_start(db, MCO_READ_ONLY, MCO_TRANS_FOREGROUND, &t);
    return t;
}


/* start write transaction */
static mco_trans_h trans_W(mco_db_h db)
{
    mco_trans_h t = 0;
    mco_trans_start(db, MCO_READ_WRITE, MCO_TRANS_FOREGROUND, &t);
    return t;
}


/* ************************************************** */
static void error(MCO_RET rc)
{
    printf("\n********** error %d ********** \n", rc);
    exit( - 1);
}


/* reader thread procedure */
THREAD_PROC_DEFINE(QuoteReader, p_)
{
    ThrParam* tp = (ThrParam*)p_;
    MCO_RET rc;
    MCO_RET rcu = 0;
    mco_trans_h t = 0;

    mco_db_h db = tp->db;

    while (repeat)
    {

        MUTEX_LOCK(&mut);
        maskR[tp->num] = M_WAIT;
        MUTEX_UNLOCK(&mut);
        mco_trans_start(db, MCO_READ_ONLY, MCO_TRANS_FOREGROUND, &t);
        nreads++;

        MUTEX_LOCK(&mut);
        maskR[tp->num] = M_IN_TRN;
        MUTEX_UNLOCK(&mut);

        /* do the job ... */

        Sleep(10); /* 1000 */
        if ((++count) % DIVIDER == 0)
        {
            Sleep(10);
        }

        if (rand() % 4 == 0)
        {

            MUTEX_LOCK(&mut);
            maskR[tp->num] = M_UPGRADED;
            MUTEX_UNLOCK(&mut);

            rcu = mco_trans_upgrade(t);

            nUpgradeTry++;
            if (rcu == MCO_S_OK)
            {
                nUpgraded++;
            }
            Sleep(10); /* 1000 */
        }

        MUTEX_LOCK(&mut);
        maskR[tp->num] = M_FINISHING_TRN;
        MUTEX_UNLOCK(&mut);
        Sleep(1); /* 1000 */
        MUTEX_LOCK(&mut);

        rc = mco_trans_commit(t);

        maskR[tp->num] = M_IDLE;
        MUTEX_UNLOCK(&mut);

        if (rc)
        {
            error(rc);
        }

        Sleep(READER_DELAY);
    }

    /* disconnect from the database, db is no longer valid */
    mco_trans_start(db, MCO_READ_ONLY, MCO_TRANS_FOREGROUND, &t);
    mco_trans_rollback(t);
    mco_db_disconnect(db);
    tp->finished = 1;
}


/* thread */
THREAD_PROC_DEFINE(QuoteWriter, p_)
{
    ThrParam* tp = (ThrParam*)p_;
    mco_trans_h t = 0;
    MCO_RET rc;

    mco_db_h db = tp->db;

    while (repeat)
    {

        MUTEX_LOCK(&mut);
        maskW[tp->num] = M_WAIT;
        MUTEX_UNLOCK(&mut);
        mco_trans_start(db, MCO_READ_WRITE, MCO_TRANS_FOREGROUND, &t);
        nwrites++;

        MUTEX_LOCK(&mut);
        maskW[tp->num] = M_IN_TRN;
        MUTEX_UNLOCK(&mut);

        /* do the job ... */

        Sleep(10);
        if ((++count) % DIVIDER == 0)
        {
            Sleep(10);
        }

        MUTEX_LOCK(&mut);
        maskW[tp->num] = M_FINISHING_TRN;
        MUTEX_UNLOCK(&mut);

        MUTEX_LOCK(&mut);

        Sleep(1);

        rc = mco_trans_commit(t);

        maskW[tp->num] = M_IDLE;
        MUTEX_UNLOCK(&mut);

        if (rc)
        {
            error(rc);
        }

        Sleep(WRITER_DELAY);
    }
    /* disconnect from the database, db is no longer valid */
    mco_trans_start(db, MCO_READ_WRITE, MCO_TRANS_FOREGROUND, &t);
    mco_trans_rollback(t);
    mco_db_disconnect(db);
    tp->finished = 1;
}


/* thread procedure which waits a key to stop test */
THREAD_PROC_DEFINE(StopWait, p)
{
    ThrParam* tp = (ThrParam*)p;

    printf("\nPress Enter to stop test\n");
    if (TestMode)
    {
        Sleep(TEST_TIMEOUT);
    }
    getchar();
    repeat = 0;
    tp->finished = 1;
}


/* thread which reports reader/writer threads status periodically */
THREAD_PROC_DEFINE(StatusReporter, p)
{
    short mskR[MAXREADERS];
    short mskW[MAXWRITERS];
    ThrParam* tp = (ThrParam*)p;

    /* these were copies */

    while (repeat)
    {
        int i;

        Sleep(REPORTER_DELAY);

        /* make copy of masks for analysis: */
        MUTEX_LOCK(&mut);
        for (i = 0; i < MAXREADERS; i++)
        {
            mskR[i] = maskR[i];
        }

        for (i = 0; i < MAXWRITERS; i++)
        {
            mskW[i] = maskW[i];
        }
        MUTEX_UNLOCK(&mut);

        /* first, check any conflicts: */
        {
            int nr = 0, nw = 0;

            for (i = 0; i < MAXREADERS; i++)
            {
                if ((mskR[i] != M_IDLE) && (mskR[i] != M_UPGRADED) && (mskR[i] != M_WAIT))
                {
                    nr++;
                }
            }

            for (i = 0; i < MAXWRITERS; i++)
            {
                if ((mskW[i] != M_IDLE) && (mskW[i] != M_WAIT))
                {
                    nw++;
                }
            }

            if (nr > 0 && nw > 0)
            {
                error(1000);
            }

            if (nw > 1)
            {
                error(1001);
            }
        }

        /* second, print status: */
        {
            char s[100];
            int ps = 0;

            for (i = 0; i < MAXREADERS; i++)
            {
                switch (mskR[i])
                {
                    case M_IDLE:
                        s[ps] = '-';
                        break;
                    case M_FINISHING_TRN:
                        s[ps] = '@';
                        break;
                    case M_WAIT:
                        s[ps] = '&';
                        break;
                    case M_IN_TRN:
                        s[ps] = '*';
                        break;
                    case M_UPGRADED:
                        s[ps] = '$';
                        break;
                    default:
                        s[ps] = '?';
                        break;
                }

                ps++;
            }

            s[ps++] = ' ';
            s[ps++] = ' ';

            for (i = 0; i < MAXWRITERS; i++)
            {
                switch (mskW[i])
                {
                    case M_IDLE:
                        s[ps] = '-';
                        break;
                    case M_FINISHING_TRN:
                        s[ps] = '@';
                        break;
                    case M_WAIT:
                        s[ps] = '&';
                        break;
                    case M_IN_TRN:
                        s[ps] = '*';
                        break;
                    default:
                        s[ps] = '?';
                        break;
                }

                ps++;
            }

            sprintf(s + ps, " %d writes %d reads %d(%d) upgrades\n", nwrites, nreads, nUpgradeTry, nUpgraded);

            /*s[ps++] = '\n'; 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -