⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cbroadcast.cpp

📁 window下的多线程编程参考书。值得一读
💻 CPP
字号:
#include "CMcl.h"
#include <stdio.h>
#include <assert.h>

class CBroadcastChannel {
private:
    enum { MAX_CLIENTS = 32 };

    typedef struct {
        DWORD dwClientMask;
        BYTE abData[1];    
    } SampleHeader;

private:
    DWORD m_dwMaxSamples;
    DWORD m_cbDataSize;
    DWORD m_cbSampleSize;
    SampleHeader *m_pSample;
    DWORD m_dwNextWrite;
    DWORD m_adwNextRead[MAX_CLIENTS];
    DWORD m_dwClientMask;
    CMclSemaphore m_csBuffersEmpty;    
    CMclSemaphoreAutoPtr m_apcsBuffersFull[MAX_CLIENTS];
    CMclMutex m_cmUpdateGuard;

public:
    CBroadcastChannel( DWORD dwMaxSamples, DWORD cbDataSize) : 
                m_csBuffersEmpty( dwMaxSamples, dwMaxSamples),
                m_cmUpdateGuard() {
        m_dwMaxSamples = dwMaxSamples;
        m_cbDataSize = cbDataSize;
        m_cbSampleSize = cbDataSize + sizeof(SampleHeader) - 1;
        m_pSample = (SampleHeader *) new BYTE[m_cbSampleSize * m_dwMaxSamples];
        ZeroMemory( m_pSample, m_cbSampleSize * m_dwMaxSamples);
        m_dwClientMask = 0;
        m_dwNextWrite = 0;
        ZeroMemory( m_adwNextRead, sizeof(DWORD) * MAX_CLIENTS);
    };

    DWORD Register(void) {
        // grab the guard mutex...
        CMclAutoLock lock(m_cmUpdateGuard);

        // find the next available ID,
        // MAX_CLIENTS ID's are available, at each bit position
        // of a DWORD...
        for (DWORD dwMask = 1, dwId = 0; dwMask; dwMask <<= 1, dwId++) {
            if (!(dwMask & m_dwClientMask)) {
                // mark the bit in the client mask...
                m_dwClientMask |= dwMask;

                // allocate a semaphore...
                m_apcsBuffersFull[dwId] = new CMclSemaphore( 0, m_dwMaxSamples);

                // reading will start with the NEXT sample written
                // into the broadcast channel...
                m_adwNextRead[dwId] = m_dwNextWrite;

                // return the id...
                return dwId;
            }
        }

        // return 0xFFFFFFFF if we are servicing
        // the maximum number of clients...
        return 0xFFFFFFFF;
    };

    void Unregister(DWORD dwId) {
        // grab the guard mutex...
        CMclAutoLock lock(m_cmUpdateGuard);

        // remove this client from all of the samples...        
        DWORD dwSample;
        DWORD dwReleaseCount = 0;
        DWORD dwMask = 1 << dwId;
        for (dwSample = 0; dwSample < m_dwMaxSamples; dwSample++) {
            SampleHeader *pNextSample = (SampleHeader *) ((LPBYTE)m_pSample + m_cbSampleSize * dwSample);
            if (pNextSample->dwClientMask & dwMask) {
                pNextSample->dwClientMask &= ~dwMask;
                if (pNextSample->dwClientMask == 0) {
                    // this sample has no waiting clients
                    // and can be freed...
                    dwReleaseCount++;
                }
            }

        }

        // release the semaphore equal to the number
        // of samples freed...
        if (dwReleaseCount > 0) {
            m_csBuffersEmpty.Release(dwReleaseCount);
        }

        // zero out the bit for this client ID...
        m_dwClientMask &= ~dwMask;
    };

    BOOL ReadSample( DWORD dwId, LPVOID pSample) {
        // compute the semaphore index from the id
        // and wait until some buffers are ready to be read...
        m_apcsBuffersFull[dwId]->Wait(INFINITE);
        
        // read the next sample for this client...
        DWORD dwSample = m_adwNextRead[dwId];
        SampleHeader *pNextSample = (SampleHeader *) ((LPBYTE)m_pSample + m_cbSampleSize * dwSample);

        DWORD dwMask = 1 << dwId;
        if (pNextSample->dwClientMask & dwMask) {
            // found a sample, copy out the data...
            // this sample can be read by multiple
            // clients at the same time...
            CopyMemory( pSample, pNextSample->abData, m_cbDataSize);

            // now we need to modify and check the client
            // mask for this same, so we must acquire the
            // guard mutex...
            m_cmUpdateGuard.Wait(INFINITE);

            // mark the sample as read by this client...
            pNextSample->dwClientMask &= ~dwMask;

            // release this sample if everyone has read it...
            if (pNextSample->dwClientMask == 0) {
                m_csBuffersEmpty.Release(1);
            }

            // increment the read pointer...
            m_adwNextRead[dwId] = (m_adwNextRead[dwId] + 1) % m_dwMaxSamples;

            // release the guard mutex...
            m_cmUpdateGuard.Release();

            // return success...
            return TRUE;
        }

        // something went wrong...
        return FALSE;
    };

    BOOL WriteSample( LPVOID pSample) {
        // wait for a empty sample buffer to become available...
        // we will be modifying the sample so we
        // need to acquire the guard mutex as well...
        m_csBuffersEmpty.WaitForTwo( m_cmUpdateGuard, TRUE, INFINITE);

        // only broadcast this sample if there are clients...
        BOOL bSampleWritten;
        if (m_dwClientMask) {
            // compute the pointer to the next sample...
            SampleHeader *pNextSample = (SampleHeader *) ((LPBYTE)m_pSample + m_cbSampleSize * m_dwNextWrite);
        
            // set all the current client bits...
            pNextSample->dwClientMask = m_dwClientMask;

            // copy the data into the sample...
            CopyMemory( pNextSample->abData, pSample, m_cbDataSize);

            // increment the write index...
            m_dwNextWrite = (m_dwNextWrite + 1) % m_dwMaxSamples;

            // increase the semaphore counts of all the readers...
            DWORD dwIndex;
            DWORD dwMask;
            for (dwIndex = 0, dwMask = 1; dwIndex < MAX_CLIENTS; dwMask <<= 1, dwIndex++) {
                if (m_dwClientMask & dwMask)
                    m_apcsBuffersFull[dwIndex]->Release(1);
            }

            bSampleWritten = TRUE;
        }
        else {
            // throw away the sample, release the buffer...
            m_csBuffersEmpty.Release(1);
            bSampleWritten = FALSE;
        }

        // release the guard mutex...
        m_cmUpdateGuard.Release();

        // return status indicates if sample was written...
        return bSampleWritten;
    };
};

class MultiConsumerHandler : public CMclThreadHandler {
private:
    CMclThreadAutoPtr m_apThread;
    BOOL m_bRun;
    CBroadcastChannel *m_pBroadcast;
    DWORD m_dwId;

public:
    MultiConsumerHandler( CBroadcastChannel *pBroadcast) {
        m_bRun = TRUE;
        m_pBroadcast = pBroadcast;
        m_apThread = new CMclThread(this);
    };

    void Stop(void) {
        m_bRun = FALSE;
        m_apThread->Wait(INFINITE);
    };

    unsigned ThreadHandlerProc(void) {
        // register with the broadcast channel
        // before we read from it...
        m_dwId = m_pBroadcast->Register();

        printf( "Consumer #%d Started.\n", m_dwId);

        while (m_bRun) {
            DWORD dwSample;
            BOOL bStatus = m_pBroadcast->ReadSample( m_dwId, &dwSample);
            assert(bStatus);
            printf( "Handler ID=%d has read sample <%d>.\n", m_dwId, dwSample);
            Sleep(m_dwId*100);
        }

        // unregister when we are done with the
        // broadcast channel...
        m_pBroadcast->Unregister(m_dwId);

        printf( "Consumer #%d Stopped.\n", m_dwId);

        return 0;
    };
    
};

class ProducerHandler : public CMclThreadHandler {
private:
    CMclThreadAutoPtr m_apThread;
    BOOL m_bRun;
    CBroadcastChannel *m_pBroadcast;

public:
    ProducerHandler( CBroadcastChannel *pBroadcast) {
        m_bRun = TRUE;
        m_pBroadcast = pBroadcast;
        m_apThread = new CMclThread(this);
    };

    void Stop(void) {
        m_bRun = FALSE;
        m_apThread->Wait(INFINITE);
    };

    unsigned ThreadHandlerProc(void) {
        printf( "Producer Started.\n");

        DWORD dwData = 0;
        while (m_bRun) {
            // write data into the broadcast channel
            // until we are told to stop...
            m_pBroadcast->WriteSample( &dwData);
            dwData++;
        }

        printf( "Producer Stopped.\n");
        
        return 0;
    }
};

// define some constants for the main() function.,.
#define NUMBER_CONSUMERS 8
#define NUMBER_CHANNEL_BUFFERS 4

int main( int argc, char *argv[]) {
    CBroadcastChannel broadcast( NUMBER_CHANNEL_BUFFERS, sizeof(DWORD));
    ProducerHandler *pProducer;
    MultiConsumerHandler *pConsumer[NUMBER_CONSUMERS];
    
    // create the producer...
    printf("Creating the Producer...\n");
    pProducer = new ProducerHandler( &broadcast);

    // create nConsumers consumers...
    int i;
    for (i = 0; i < NUMBER_CONSUMERS; i++) {
        printf( "Creating Consumer #%d...\n", i);
        pConsumer[i] = new MultiConsumerHandler( &broadcast);
    }

    // let everything run for a little while...
    Sleep(2000);

    // stop the consumers...
    for (i = 0; i < NUMBER_CONSUMERS; i++) {
        printf( "Stopping Consumer #%d...\n", i);
        pConsumer[i]->Stop();
        delete pConsumer[i];
    }

    // stop the producer...
    printf("Stopping the Producer...\n");
    pProducer->Stop();
    delete pProducer;

    // all done...
    printf("Exiting.\n");
    return 0;
}


⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -