⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 multiwait.c

📁 Netscape NSPR库源码
💻 C
📖 第 1 页 / 共 2 页
字号:
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* 
 * The contents of this file are subject to the Mozilla Public
 * License Version 1.1 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of
 * the License at http://www.mozilla.org/MPL/
 * 
 * Software distributed under the License is distributed on an "AS
 * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * rights and limitations under the License.
 * 
 * The Original Code is the Netscape Portable Runtime (NSPR).
 * 
 * The Initial Developer of the Original Code is Netscape
 * Communications Corporation.  Portions created by Netscape are 
 * Copyright (C) 1998-2000 Netscape Communications Corporation.  All
 * Rights Reserved.
 * 
 * Contributor(s):
 * 
 * Alternatively, the contents of this file may be used under the
 * terms of the GNU General Public License Version 2 or later (the
 * "GPL"), in which case the provisions of the GPL are applicable 
 * instead of those above.  If you wish to allow use of your 
 * version of this file only under the terms of the GPL and not to
 * allow others to use your version of this file under the MPL,
 * indicate your decision by deleting the provisions above and
 * replace them with the notice and other provisions required by
 * the GPL.  If you do not delete the provisions above, a recipient
 * may use your version of this file under either the MPL or the
 * GPL.
 */

#include "prio.h"
#include "prprf.h"
#include "prlog.h"
#include "prmem.h"
#include "pratom.h"
#include "prlock.h"
#include "prmwait.h"
#include "prclist.h"
#include "prerror.h"
#include "prinrval.h"
#include "prnetdb.h"
#include "prthread.h"

#include "plstr.h"
#include "plerror.h"
#include "plgetopt.h"

#include <string.h>

/*
 * State handed to every test routine and thread.
 *   title     - prefix used in the diagnostic messages
 *   list_lock - lock created in MakeShared() and released in DestroyShared()
 *   group     - wait group the receive-wait descriptors are added to
 *   timeout   - timeout given to descriptors; also the client's sleep time
 */
typedef struct Shared
{
    const char *title;
    PRLock *list_lock;
    PRWaitGroup *group;
    PRIntervalTime timeout;
} Shared;

/* Diagnostic levels; the code compares with '>' so the order matters. */
typedef enum Verbosity {silent, quiet, chatty, noisy} Verbosity;

static PRFileDesc *debug = NULL;        /* destination of diagnostics */
static PRInt32 desc_allocated = 0;      /* live PRRecvWait descriptors */
static PRUint16 default_port = 12273;   /* loopback port clients connect to */
static enum Verbosity verbosity = quiet;
static PRInt32 ops_required = 1000, ops_done = 0;
static PRThreadScope thread_scope = PR_LOCAL_THREAD;
static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50;

#if defined(DEBUG)
/*
 * Debug builds: report the current NSPR error on the diagnostic stream
 * (when one is open) before handing the failure to PR_Assert().
 */
#define MW_ASSERT(_expr) \
    ((_expr)?((void)0):_MW_Assert(# _expr,__FILE__,__LINE__))
static void _MW_Assert(const char *s, const char *file, PRIntn ln)
{
    if (NULL != debug) PL_FPrintError(debug, NULL);
    PR_Assert(s, file, ln);
}  /* _MW_Assert */
#else
/* Non-debug builds: the assertion (and its expression) compiles away. */
#define MW_ASSERT(_expr)
#endif

/*
 * Print a one-line description of a receive-wait descriptor, prefixed
 * with 'msg'.
 */
static void PrintRecvDesc(PRRecvWait *desc, const char *msg)
{
    /*
     * Indexed by (outcome + 3), so the order of the names must track the
     * numeric outcome values.
     * NOTE(review): assumes outcome is in the range -3..1 - confirm
     * against prmwait.h.
     */
    const char *tag[] = {
        "PR_MW_INTERRUPT", "PR_MW_TIMEOUT",
        "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING"};

    /* Pointers are formatted with %p; %x expects an integer argument. */
    PR_fprintf(
        debug, "%s: PRRecvWait(@0x%p): {fd: 0x%p, outcome: %s, tmo: %u}\n",
        msg, desc, desc->fd, tag[desc->outcome + 3], desc->timeout);
}  /* PrintRecvDesc */

/*
 * Allocate and initialize the shared state: a new wait group, a one
 * second timeout, a new lock and the caller's title.
 * Returns NULL if the structure itself cannot be allocated.
 */
static Shared *MakeShared(const char *title)
{
    Shared *shared = PR_NEWZAP(Shared);
    if (NULL == shared) return NULL;

    shared->group = PR_CreateWaitGroup(1);
    shared->timeout = PR_SecondsToInterval(1);
    shared->list_lock = PR_NewLock();
    shared->title = title;
    return shared;
}  /* MakeShared */

/*
 * Destroy the wait group and the lock, then free the shared state.
 */
static void DestroyShared(Shared *shared)
{
    PRStatus rv;

    if (verbosity > quiet)
        PR_fprintf(debug, "%s: destroying group\n", shared->title);
    rv = PR_DestroyWaitGroup(shared->group);
    MW_ASSERT(PR_SUCCESS == rv);

    PR_DestroyLock(shared->list_lock);
    PR_DELETE(shared);
}  /* DestroyShared */

/*
 * Allocate a receive-wait descriptor for 'fd' with the given timeout and
 * a zeroed 120 byte buffer, and count it in desc_allocated.
 */
static PRRecvWait *CreateRecvWait(PRFileDesc *fd, PRIntervalTime timeout)
{
    PRRecvWait *desc_out = PR_NEWZAP(PRRecvWait);
    MW_ASSERT(NULL != desc_out);
    MW_ASSERT(NULL != fd);

    desc_out->fd = fd;
    desc_out->timeout = timeout;
    desc_out->buffer.length = 120;
    desc_out->buffer.start = PR_CALLOC(120);

    (void)PR_AtomicIncrement(&desc_allocated);

    if (verbosity > chatty)
        PrintRecvDesc(desc_out, "Allocated");
    return desc_out;
}  /* CreateRecvWait */

/*
 * Close the descriptor's file descriptor, free its buffer and the
 * descriptor itself, and remove it from the desc_allocated count.
 */
static void DestroyRecvWait(PRRecvWait *desc_out)
{
    if (verbosity > chatty)
        PrintRecvDesc(desc_out, "Destroying");

    PR_Close(desc_out->fd);
    if (NULL != desc_out->buffer.start)
        PR_DELETE(desc_out->buffer.start);
    PR_Free(desc_out);

    (void)PR_AtomicDecrement(&desc_allocated);
}  /* DestroyRecvWait */

/*
 * Cancel the wait group and destroy each descriptor it hands back until
 * it returns NULL.
 */
static void CancelGroup(Shared *shared)
{
    PRRecvWait *desc_out;

    if (verbosity > quiet)
        PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title);

    while (NULL != (desc_out = PR_CancelWaitGroup(shared->group)))
        DestroyRecvWait(desc_out);

    MW_ASSERT(0 == desc_allocated);
    MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError());
}  /* CancelGroup */

/*
 * Client thread body. 'arg' is the Shared state.
 *
 * Connects to default_port on the loopback address, then sends and
 * receives a 100 byte buffer, sleeping shared->timeout between rounds,
 * until ops_done reaches ops_required or the thread is interrupted.
 */
static void PR_CALLBACK ClientThread(void* arg)
{
    PRStatus rv;
    PRInt32 bytes;
    PRIntn empty_flags = 0;
    PRNetAddr server_address;
    unsigned char buffer[100];
    Shared *shared = (Shared*)arg;
    PRFileDesc *server = PR_NewTCPSocket();

    if ((NULL == server)
    && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) return;
    MW_ASSERT(NULL != server);
    if (NULL == server)
    {
        /* MW_ASSERT is empty in non-debug builds; don't connect on NULL */
        if (verbosity > silent)
            PL_FPrintError(debug, "Client socket creation failed");
        return;
    }

    if (verbosity > chatty)
        PR_fprintf(debug, "%s: Server socket @0x%p\n", shared->title, server);

    /* Initialize the buffer so that Purify won't complain */
    memset(buffer, 0, sizeof(buffer));

    rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address);
    MW_ASSERT(PR_SUCCESS == rv);

    if (verbosity > quiet)
        PR_fprintf(debug, "%s: Client opening connection\n", shared->title);
    rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT);

    if (PR_FAILURE == rv)
    {
        /* report first: closing the socket may replace the error code */
        if (verbosity > silent) PL_FPrintError(debug, "Client connect failed");
        (void)PR_Close(server);  /* don't leak the socket */
        return;
    }

    while (ops_done < ops_required)
    {
        bytes = PR_Send(
            server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
        if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break;
        MW_ASSERT((PRInt32)sizeof(buffer) == bytes);
        if (verbosity > chatty)
            PR_fprintf(
                debug, "%s: Client sent %d bytes\n",
                shared->title, bytes);

        bytes = PR_Recv(
            server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
        if (verbosity > chatty)
            PR_fprintf(
                debug, "%s: Client received %d bytes\n",
                shared->title, bytes);
        if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break;
        MW_ASSERT((PRInt32)sizeof(buffer) == bytes);

        PR_Sleep(shared->timeout);
    }
    rv = PR_Close(server);
    MW_ASSERT(PR_SUCCESS == rv);

}  /* ClientThread */

/*
 * Add a single descriptor with no timeout, cancel it, and check that
 * waiting on the group hands the same descriptor back with an
 * interrupted outcome.  Note that this overwrites shared->timeout.
 */
static void OneInThenCancelled(Shared *shared)
{
    PRStatus rv;
    PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);

    shared->timeout = PR_INTERVAL_NO_TIMEOUT;

    desc_in->fd = PR_NewTCPSocket();
    desc_in->timeout = shared->timeout;

    if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc");

    rv = PR_AddWaitFileDesc(shared->group, desc_in);
    MW_ASSERT(PR_SUCCESS == rv);

    if (verbosity > chatty) PrintRecvDesc(desc_in, "Cancelling");
    rv = PR_CancelWaitFileDesc(shared->group, desc_in);
    MW_ASSERT(PR_SUCCESS == rv);

    desc_out = PR_WaitRecvReady(shared->group);
    MW_ASSERT(desc_out == desc_in);
    MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome);
    MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
    if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");

    rv = PR_Close(desc_in->fd);
    MW_ASSERT(PR_SUCCESS == rv);

    if (verbosity > quiet)
        PR_fprintf(debug, "%s: destroying group\n", shared->title);

    PR_DELETE(desc_in);
}  /* OneInThenCancelled */

/*
 * Add a single descriptor using shared->timeout and check that waiting
 * on the group hands the same descriptor back with a timeout outcome.
 */
static void OneOpOneThread(Shared *shared)
{
    PRStatus rv;
    PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);

    desc_in->fd = PR_NewTCPSocket();
    desc_in->timeout = shared->timeout;

    if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc");

    rv = PR_AddWaitFileDesc(shared->group, desc_in);
    MW_ASSERT(PR_SUCCESS == rv);
    desc_out = PR_WaitRecvReady(shared->group);
    MW_ASSERT(desc_out == desc_in);
    MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
    MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
    if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");

    rv = PR_Close(desc_in->fd);
    MW_ASSERT(PR_SUCCESS == rv);

    PR_DELETE(desc_in);
}  /* OneOpOneThread */

/*
 * Add wait_objects descriptors to the group, then on this one thread
 * wait for each to time out and re-add it until ops_done reaches
 * ops_required.  Finishes by cancelling the group.
 */
static void ManyOpOneThread(Shared *shared)
{
    PRStatus rv;
    PRIntn index;
    PRRecvWait *desc_in;
    PRRecvWait *desc_out;

    if (verbosity > quiet)
        PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects);

    for (index = 0; index < wait_objects; ++index)
    {
        desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);

        rv = PR_AddWaitFileDesc(shared->group, desc_in);
        MW_ASSERT(PR_SUCCESS == rv);
    }

    while (ops_done < ops_required)
    {
        desc_out = PR_WaitRecvReady(shared->group);
        MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
        MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
        if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready/readding");
        rv = PR_AddWaitFileDesc(shared->group, desc_out);
        MW_ASSERT(PR_SUCCESS == rv);
        (void)PR_AtomicIncrement(&ops_done);
    }

    CancelGroup(shared);
}  /* ManyOpOneThread */

/*
 * Worker thread body. 'arg' is the Shared state.
 *
 * Repeatedly waits on the group and re-adds whatever descriptor comes
 * back.  Stops when the wait returns NULL, when re-adding fails, or once
 * the operation count has passed ops_required.
 */
static void PR_CALLBACK SomeOpsThread(void *arg)
{
    PRRecvWait *desc_out;
    PRStatus rv = PR_SUCCESS;
    Shared *shared = (Shared*)arg;
    do  /* until interrupted */
    {
        desc_out = PR_WaitRecvReady(shared->group);
        if (NULL == desc_out)
        {
            MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
            if (verbosity > quiet) PR_fprintf(debug, "Aborted\n");
            break;
        }
        MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
        MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
        if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");

        if (verbosity > chatty) PrintRecvDesc(desc_out, "Re-Adding");
        desc_out->timeout = shared->timeout;
        rv = PR_AddWaitFileDesc(shared->group, desc_out);
        /*
         * Test the value produced by the increment itself instead of
         * re-reading ops_done, which other workers update concurrently.
         */
        if (PR_AtomicIncrement(&ops_done) > ops_required) break;
    } while (PR_SUCCESS == rv);
    MW_ASSERT(PR_SUCCESS == rv);
}  /* SomeOpsThread */

static void SomeOpsSomeThreads(Shared *shared)
{
    PRStatus rv;
    PRThread **thread;
    PRIntn index;
    PRRecvWait *desc_in;

    thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);

    /* Create some threads */

    if (verbosity > quiet)
        PR_fprintf(debug, "%s: creating threads\n", shared->title);
    for (index = 0; index < worker_threads; ++index)
    {
        thread[index] = PR_CreateThread(
            PR_USER_THREAD, SomeOpsThread, shared,
            PR_PRIORITY_HIGH, thread_scope,
            PR_JOINABLE_THREAD, 16 * 1024);
    }

    /* then create some operations */

    if (verbosity > quiet)
        PR_fprintf(debug, "%s: creating desc\n", shared->title);
    for (index = 0; index < wait_objects; ++index)
    {
        desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
        rv = PR_AddWaitFileDesc(shared->group, desc_in);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -