⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 multiwait.c

📁 Netscape NSPR库源码
💻 C
📖 第 1 页 / 共 2 页
字号:
        MW_ASSERT(PR_SUCCESS == rv);    }    if (verbosity > quiet)        PR_fprintf(debug, "%s: sleeping\n", shared->title);    while (ops_done < ops_required) PR_Sleep(shared->timeout);    if (verbosity > quiet)        PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title);    for (index = 0; index < worker_threads; ++index)    {        rv = PR_Interrupt(thread[index]);        MW_ASSERT(PR_SUCCESS == rv);        rv = PR_JoinThread(thread[index]);        MW_ASSERT(PR_SUCCESS == rv);    }    PR_DELETE(thread);    CancelGroup(shared);}  /* SomeOpsSomeThreads */static PRStatus ServiceRequest(Shared *shared, PRRecvWait *desc){    PRInt32 bytes_out;    if (verbosity > chatty)        PR_fprintf(            debug, "%s: Service received %d bytes\n",            shared->title, desc->bytesRecv);    if (0 == desc->bytesRecv) goto quitting;    if ((-1 == desc->bytesRecv)    && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted;    bytes_out = PR_Send(        desc->fd, desc->buffer.start, desc->bytesRecv, 0, shared->timeout);    if (verbosity > chatty)        PR_fprintf(            debug, "%s: Service sent %d bytes\n",            shared->title, bytes_out);    if ((-1 == bytes_out)    && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted;    MW_ASSERT(bytes_out == desc->bytesRecv);    return PR_SUCCESS;aborted:quitting:    return PR_FAILURE;}  /* ServiceRequest */static void PR_CALLBACK ServiceThread(void *arg){    PRStatus rv = PR_SUCCESS;    PRRecvWait *desc_out = NULL;    Shared *shared = (Shared*)arg;    do  /* until interrupted */    {        if (NULL != desc_out)        {            desc_out->timeout = PR_INTERVAL_NO_TIMEOUT;            if (verbosity > chatty)                PrintRecvDesc(desc_out, "Service re-adding");            rv = PR_AddWaitFileDesc(shared->group, desc_out);            MW_ASSERT(PR_SUCCESS == rv);        }        desc_out = PR_WaitRecvReady(shared->group);        if (NULL == desc_out)        {            MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());            break;        }        switch (desc_out->outcome)        {            case PR_MW_SUCCESS:            {                PR_AtomicIncrement(&ops_done);                if (verbosity > chatty)                    PrintRecvDesc(desc_out, "Service ready");                rv = ServiceRequest(shared, desc_out);                break;            }            case PR_MW_INTERRUPT:                MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());                rv = PR_FAILURE;  /* if interrupted, then exit */                break;            case PR_MW_TIMEOUT:                MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());            case PR_MW_FAILURE:                if (verbosity > silent)                    PL_FPrintError(debug, "RecvReady failure");                break;            default:                break;        }    } while (PR_SUCCESS == rv);    if (NULL != desc_out) DestroyRecvWait(desc_out);}  /* ServiceThread */static void PR_CALLBACK EnumerationThread(void *arg){    PRStatus rv;    PRIntn count;    PRRecvWait *desc;    Shared *shared = (Shared*)arg;    PRIntervalTime five_seconds = PR_SecondsToInterval(5);    PRMWaitEnumerator *enumerator = PR_CreateMWaitEnumerator(shared->group);    MW_ASSERT(NULL != enumerator);    while (PR_SUCCESS == PR_Sleep(five_seconds))    {        count = 0;        desc = NULL;        while (NULL != (desc = PR_EnumerateWaitGroup(enumerator, desc)))        {            if (verbosity > chatty) PrintRecvDesc(desc, shared->title);            count += 1;        }        if (verbosity > silent)            PR_fprintf(debug,                "%s Enumerated %d objects\n", shared->title, count);    }    MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());    rv = PR_DestroyMWaitEnumerator(enumerator);    MW_ASSERT(PR_SUCCESS == rv);}  /* EnumerationThread */static void PR_CALLBACK ServerThread(void *arg){    PRStatus rv;    PRIntn index;    PRRecvWait *desc_in;    PRThread **worker_thread;    Shared *shared = (Shared*)arg;    PRFileDesc *listener, *service;    PRNetAddr server_address, client_address;    worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);    if (verbosity > quiet)        PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title);    for (index = 0; index < worker_threads; ++index)    {        worker_thread[index] = PR_CreateThread(            PR_USER_THREAD, ServiceThread, shared,            PR_PRIORITY_HIGH, thread_scope,            PR_JOINABLE_THREAD, 16 * 1024);    }    rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address);    MW_ASSERT(PR_SUCCESS == rv);    listener = PR_NewTCPSocket(); MW_ASSERT(NULL != listener);    if (verbosity > chatty)        PR_fprintf(            debug, "%s: Server listener socket @0x%x\n",            shared->title, listener);    rv = PR_Bind(listener, &server_address); MW_ASSERT(PR_SUCCESS == rv);    rv = PR_Listen(listener, 10); MW_ASSERT(PR_SUCCESS == rv);    while (ops_done < ops_required)    {        if (verbosity > quiet)            PR_fprintf(debug, "%s: Server accepting connection\n", shared->title);        service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT);        if (NULL == service)        {            if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) break;            PL_PrintError("Accept failed");            MW_ASSERT(!"Accept failed");        }        else        {            desc_in = CreateRecvWait(service, shared->timeout);            desc_in->timeout = PR_INTERVAL_NO_TIMEOUT;            if (verbosity > chatty)                PrintRecvDesc(desc_in, "Service adding");            rv = PR_AddWaitFileDesc(shared->group, desc_in);            MW_ASSERT(PR_SUCCESS == rv);        }    }    if (verbosity > quiet)        PR_fprintf(debug, "%s: Server interrupting worker_threads\n", shared->title);    for (index = 0; index < worker_threads; ++index)    {        rv = PR_Interrupt(worker_thread[index]);        MW_ASSERT(PR_SUCCESS == rv);        rv = PR_JoinThread(worker_thread[index]);        MW_ASSERT(PR_SUCCESS == rv);    }    PR_DELETE(worker_thread);    PR_Close(listener);    CancelGroup(shared);}  /* ServerThread */static void RealOneGroupIO(Shared *shared){    /*    ** Create a server that listens for connections and then services    ** requests that come in over those connections. The server never    ** deletes a connection and assumes a basic RPC model of operation.    **    ** Use worker_threads threads to service how every many open ports    ** there might be.    **    ** Oh, ya. Almost forget. Create (some) clients as well.    */    PRStatus rv;    PRIntn index;    PRThread *server_thread, *enumeration_thread, **client_thread;    if (verbosity > quiet)        PR_fprintf(debug, "%s: creating server_thread\n", shared->title);    server_thread = PR_CreateThread(        PR_USER_THREAD, ServerThread, shared,        PR_PRIORITY_HIGH, thread_scope,        PR_JOINABLE_THREAD, 16 * 1024);    if (verbosity > quiet)        PR_fprintf(debug, "%s: creating enumeration_thread\n", shared->title);    enumeration_thread = PR_CreateThread(        PR_USER_THREAD, EnumerationThread, shared,        PR_PRIORITY_HIGH, thread_scope,        PR_JOINABLE_THREAD, 16 * 1024);    if (verbosity > quiet)        PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title);    PR_Sleep(5 * shared->timeout);    if (verbosity > quiet)        PR_fprintf(debug, "%s: creating client_threads\n", shared->title);    client_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads);    for (index = 0; index < client_threads; ++index)    {        client_thread[index] = PR_CreateThread(            PR_USER_THREAD, ClientThread, shared,            PR_PRIORITY_NORMAL, thread_scope,            PR_JOINABLE_THREAD, 16 * 1024);    }    while (ops_done < ops_required) PR_Sleep(shared->timeout);    if (verbosity > quiet)        PR_fprintf(debug, "%s: interrupting/joining client_threads\n", shared->title);    for (index = 0; index < client_threads; ++index)    {        rv = PR_Interrupt(client_thread[index]);        MW_ASSERT(PR_SUCCESS == rv);        rv = PR_JoinThread(client_thread[index]);        MW_ASSERT(PR_SUCCESS == rv);    }    PR_DELETE(client_thread);    if (verbosity > quiet)        PR_fprintf(debug, "%s: interrupting/joining enumeration_thread\n", shared->title);    rv = PR_Interrupt(enumeration_thread);    MW_ASSERT(PR_SUCCESS == rv);    rv = PR_JoinThread(enumeration_thread);    MW_ASSERT(PR_SUCCESS == rv);    if (verbosity > quiet)        PR_fprintf(debug, "%s: interrupting/joining server_thread\n", shared->title);    rv = PR_Interrupt(server_thread);    MW_ASSERT(PR_SUCCESS == rv);    rv = PR_JoinThread(server_thread);    MW_ASSERT(PR_SUCCESS == rv);}  /* RealOneGroupIO */static void RunThisOne(    void (*func)(Shared*), const char *name, const char *test_name){    Shared *shared;    if ((NULL == test_name) || (0 == PL_strcmp(name, test_name)))    {        if (verbosity > silent)            PR_fprintf(debug, "%s()\n", name);        shared = MakeShared(name);        ops_done = 0;        func(shared);  /* run the test */        MW_ASSERT(0 == desc_allocated);        DestroyShared(shared);    }}  /* RunThisOne */static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta){    PRIntn verbage = (PRIntn)verbosity;    return (Verbosity)(verbage += delta);}  /* ChangeVerbosity */PRIntn main(PRIntn argc, char **argv){    PLOptStatus os;    const char *test_name = NULL;    PLOptState *opt = PL_CreateOptState(argc, argv, "dqGc:o:p:t:w:");    while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))    {        if (PL_OPT_BAD == os) continue;        switch (opt->option)        {        case 0:            test_name = opt->value;            break;        case 'd':  /* debug mode */            if (verbosity < noisy)                verbosity = ChangeVerbosity(verbosity, 1);            break;        case 'q':  /* debug mode */            if (verbosity > silent)                verbosity = ChangeVerbosity(verbosity, -1);            break;        case 'G':  /* use global threads */            thread_scope = PR_GLOBAL_THREAD;            break;        case 'c':  /* number of client threads */            client_threads = atoi(opt->value);            break;        case 'o':  /* operations to compelete */            ops_required = atoi(opt->value);            break;        case 'p':  /* default port */            default_port = atoi(opt->value);            break;        case 't':  /* number of threads waiting */            worker_threads = atoi(opt->value);            break;        case 'w':  /* number of wait objects */            wait_objects = atoi(opt->value);            break;        default:            break;        }    }    PL_DestroyOptState(opt);    if (verbosity > 0)        debug = PR_GetSpecialFD(PR_StandardError);    RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name);    RunThisOne(OneOpOneThread, "OneOpOneThread", test_name);    RunThisOne(ManyOpOneThread, "ManyOpOneThread", test_name);    RunThisOne(SomeOpsSomeThreads, "SomeOpsSomeThreads", test_name);    RunThisOne(RealOneGroupIO, "RealOneGroupIO", test_name);    return 0;}  /* main *//* multwait.c */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -