📄 multiwait.c
字号:
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- *//* * The contents of this file are subject to the Mozilla Public * License Version 1.1 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of * the License at http://www.mozilla.org/MPL/ * * Software distributed under the License is distributed on an "AS * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or * implied. See the License for the specific language governing * rights and limitations under the License. * * The Original Code is the Netscape Portable Runtime (NSPR). * * The Initial Developer of the Original Code is Netscape * Communications Corporation. Portions created by Netscape are * Copyright (C) 1998-2000 Netscape Communications Corporation. All * Rights Reserved. * * Contributor(s): * * Alternatively, the contents of this file may be used under the * terms of the GNU General Public License Version 2 or later (the * "GPL"), in which case the provisions of the GPL are applicable * instead of those above. If you wish to allow use of your * version of this file only under the terms of the GPL and not to * allow others to use your version of this file under the MPL, * indicate your decision by deleting the provisions above and * replace them with the notice and other provisions required by * the GPL. If you do not delete the provisions above, a recipient * may use your version of this file under either the MPL or the * GPL. */#include "prio.h"#include "prprf.h"#include "prlog.h"#include "prmem.h"#include "pratom.h"#include "prlock.h"#include "prmwait.h"#include "prclist.h"#include "prerror.h"#include "prinrval.h"#include "prnetdb.h"#include "prthread.h"#include "plstr.h"#include "plerror.h"#include "plgetopt.h"#include <string.h>typedef struct Shared{ const char *title; PRLock *list_lock; PRWaitGroup *group; PRIntervalTime timeout;} Shared;typedef enum Verbosity {silent, quiet, chatty, noisy} Verbosity;static PRFileDesc *debug = NULL;static PRInt32 desc_allocated = 0;static PRUint16 default_port = 12273;static enum Verbosity verbosity = quiet;static PRInt32 ops_required = 1000, ops_done = 0;static PRThreadScope thread_scope = PR_LOCAL_THREAD;static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50;#if defined(DEBUG)#define MW_ASSERT(_expr) \ ((_expr)?((void)0):_MW_Assert(# _expr,__FILE__,__LINE__))static void _MW_Assert(const char *s, const char *file, PRIntn ln){ if (NULL != debug) PL_FPrintError(debug, NULL); PR_Assert(s, file, ln);} /* _MW_Assert */#else#define MW_ASSERT(_expr)#endifstatic void PrintRecvDesc(PRRecvWait *desc, const char *msg){ const char *tag[] = { "PR_MW_INTERRUPT", "PR_MW_TIMEOUT", "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING"}; PR_fprintf( debug, "%s: PRRecvWait(@0x%x): {fd: 0x%x, outcome: %s, tmo: %u}\n", msg, desc, desc->fd, tag[desc->outcome + 3], desc->timeout);} /* PrintRecvDesc */static Shared *MakeShared(const char *title){ Shared *shared = PR_NEWZAP(Shared); shared->group = PR_CreateWaitGroup(1); shared->timeout = PR_SecondsToInterval(1); shared->list_lock = PR_NewLock(); shared->title = title; return shared;} /* MakeShared */static void DestroyShared(Shared *shared){ PRStatus rv; if (verbosity > quiet) PR_fprintf(debug, "%s: destroying group\n", shared->title); rv = PR_DestroyWaitGroup(shared->group); MW_ASSERT(PR_SUCCESS == rv); PR_DestroyLock(shared->list_lock); PR_DELETE(shared);} /* DestroyShared */static PRRecvWait *CreateRecvWait(PRFileDesc *fd, PRIntervalTime timeout){ PRRecvWait *desc_out = PR_NEWZAP(PRRecvWait); MW_ASSERT(NULL != desc_out); MW_ASSERT(NULL != fd); desc_out->fd = fd; desc_out->timeout = timeout; desc_out->buffer.length = 120; desc_out->buffer.start = PR_CALLOC(120); PR_AtomicIncrement(&desc_allocated); if (verbosity > chatty) PrintRecvDesc(desc_out, "Allocated"); return desc_out;} /* CreateRecvWait */static void DestroyRecvWait(PRRecvWait *desc_out){ if (verbosity > chatty) PrintRecvDesc(desc_out, "Destroying"); PR_Close(desc_out->fd); if (NULL != desc_out->buffer.start) PR_DELETE(desc_out->buffer.start); PR_Free(desc_out); (void)PR_AtomicDecrement(&desc_allocated);} /* DestroyRecvWait */static void CancelGroup(Shared *shared){ PRRecvWait *desc_out; if (verbosity > quiet) PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title); do { desc_out = PR_CancelWaitGroup(shared->group); if (NULL != desc_out) DestroyRecvWait(desc_out); } while (NULL != desc_out); MW_ASSERT(0 == desc_allocated); MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError());} /* CancelGroup */static void PR_CALLBACK ClientThread(void* arg){ PRStatus rv; PRInt32 bytes; PRIntn empty_flags = 0; PRNetAddr server_address; unsigned char buffer[100]; Shared *shared = (Shared*)arg; PRFileDesc *server = PR_NewTCPSocket(); if ((NULL == server) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) return; MW_ASSERT(NULL != server); if (verbosity > chatty) PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server); /* Initialize the buffer so that Purify won't complain */ memset(buffer, 0, sizeof(buffer)); rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address); MW_ASSERT(PR_SUCCESS == rv); if (verbosity > quiet) PR_fprintf(debug, "%s: Client opening connection\n", shared->title); rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT); if (PR_FAILURE == rv) { if (verbosity > silent) PL_FPrintError(debug, "Client connect failed"); return; } while (ops_done < ops_required) { bytes = PR_Send( server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT); if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break; MW_ASSERT(sizeof(buffer) == bytes); if (verbosity > chatty) PR_fprintf( debug, "%s: Client sent %d bytes\n", shared->title, sizeof(buffer)); bytes = PR_Recv( server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT); if (verbosity > chatty) PR_fprintf( debug, "%s: Client received %d bytes\n", shared->title, sizeof(buffer)); if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break; MW_ASSERT(sizeof(buffer) == bytes); PR_Sleep(shared->timeout); } rv = PR_Close(server); MW_ASSERT(PR_SUCCESS == rv);} /* ClientThread */static void OneInThenCancelled(Shared *shared){ PRStatus rv; PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait); shared->timeout = PR_INTERVAL_NO_TIMEOUT; desc_in->fd = PR_NewTCPSocket(); desc_in->timeout = shared->timeout; if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc"); rv = PR_AddWaitFileDesc(shared->group, desc_in); MW_ASSERT(PR_SUCCESS == rv); if (verbosity > chatty) PrintRecvDesc(desc_in, "Cancelling"); rv = PR_CancelWaitFileDesc(shared->group, desc_in); MW_ASSERT(PR_SUCCESS == rv); desc_out = PR_WaitRecvReady(shared->group); MW_ASSERT(desc_out == desc_in); MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome); MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready"); rv = PR_Close(desc_in->fd); MW_ASSERT(PR_SUCCESS == rv); if (verbosity > quiet) PR_fprintf(debug, "%s: destroying group\n", shared->title); PR_DELETE(desc_in);} /* OneInThenCancelled */static void OneOpOneThread(Shared *shared){ PRStatus rv; PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait); desc_in->fd = PR_NewTCPSocket(); desc_in->timeout = shared->timeout; if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc"); rv = PR_AddWaitFileDesc(shared->group, desc_in); MW_ASSERT(PR_SUCCESS == rv); desc_out = PR_WaitRecvReady(shared->group); MW_ASSERT(desc_out == desc_in); MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready"); rv = PR_Close(desc_in->fd); MW_ASSERT(PR_SUCCESS == rv); PR_DELETE(desc_in);} /* OneOpOneThread */static void ManyOpOneThread(Shared *shared){ PRStatus rv; PRIntn index; PRRecvWait *desc_in; PRRecvWait *desc_out; if (verbosity > quiet) PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects); for (index = 0; index < wait_objects; ++index) { desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout); rv = PR_AddWaitFileDesc(shared->group, desc_in); MW_ASSERT(PR_SUCCESS == rv); } while (ops_done < ops_required) { desc_out = PR_WaitRecvReady(shared->group); MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready/readding"); rv = PR_AddWaitFileDesc(shared->group, desc_out); MW_ASSERT(PR_SUCCESS == rv); (void)PR_AtomicIncrement(&ops_done); } CancelGroup(shared);} /* ManyOpOneThread */static void PR_CALLBACK SomeOpsThread(void *arg){ PRRecvWait *desc_out; PRStatus rv = PR_SUCCESS; Shared *shared = (Shared*)arg; do /* until interrupted */ { desc_out = PR_WaitRecvReady(shared->group); if (NULL == desc_out) { MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); if (verbosity > quiet) PR_fprintf(debug, "Aborted\n"); break; } MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready"); if (verbosity > chatty) PrintRecvDesc(desc_out, "Re-Adding"); desc_out->timeout = shared->timeout; rv = PR_AddWaitFileDesc(shared->group, desc_out); PR_AtomicIncrement(&ops_done); if (ops_done > ops_required) break; } while (PR_SUCCESS == rv); MW_ASSERT(PR_SUCCESS == rv);} /* SomeOpsThread */static void SomeOpsSomeThreads(Shared *shared){ PRStatus rv; PRThread **thread; PRIntn index; PRRecvWait *desc_in; thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads); /* Create some threads */ if (verbosity > quiet) PR_fprintf(debug, "%s: creating threads\n", shared->title); for (index = 0; index < worker_threads; ++index) { thread[index] = PR_CreateThread( PR_USER_THREAD, SomeOpsThread, shared, PR_PRIORITY_HIGH, thread_scope, PR_JOINABLE_THREAD, 16 * 1024); } /* then create some operations */ if (verbosity > quiet) PR_fprintf(debug, "%s: creating desc\n", shared->title); for (index = 0; index < wait_objects; ++index) { desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout); rv = PR_AddWaitFileDesc(shared->group, desc_in);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -