📄 provider.c
字号:
TEST_LOG( cltsrv_log_file, TEST_LOG_NOTICE, ("\t\tWorker(0x%p): exiting [%u]\n", PR_CurrentThread(), pool->workers)); PR_Lock(server->ml); pool->workers -= 1; /* undefine our existance */ PR_REMOVE_AND_INIT_LINK(&worker->element); PR_NotifyCondVar(pool->exiting); PR_Unlock(server->ml); PR_DELETE(worker); /* destruction of the "worker" object */} /* Worker */static void PR_CALLBACK Server(void *arg){ PRStatus rv; PRNetAddr serverAddress; CSServer_t *server = (CSServer_t*)arg; PRThread *me = server->thread = PR_CurrentThread(); server->listener = PR_Socket(domain, SOCK_STREAM, protocol); memset(&serverAddress, 0, sizeof(serverAddress)); rv = PR_InitializeNetAddr(PR_IpAddrAny, DEFAULT_PORT, &serverAddress); rv = PR_Bind(server->listener, &serverAddress); TEST_ASSERT(PR_SUCCESS == rv); rv = PR_Listen(server->listener, server->backlog); TEST_ASSERT(PR_SUCCESS == rv); server->started = PR_IntervalNow(); TimeOfDayMessage("Server started at", me); PR_Lock(server->ml); server->state = cs_run; PR_NotifyCondVar(server->stateChange); PR_Unlock(server->ml); /* ** Create the first worker (actually, a thread that accepts ** connections and then processes the work load as needed). ** From this point on, additional worker threads are created ** as they are needed by existing worker threads. */ rv = CreateWorker(server, &server->pool); TEST_ASSERT(PR_SUCCESS == rv); /* ** From here on this thread is merely hanging around as the contact ** point for the main test driver. It's just waiting for the driver ** to declare the test complete. */ TEST_LOG( cltsrv_log_file, TEST_LOG_VERBOSE, ("\tServer(0x%p): waiting for state change\n", me)); PR_Lock(server->ml); while ((cs_run == server->state) && !Aborted(rv)) { rv = PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT); } PR_Unlock(server->ml); PR_ClearInterrupt(); TEST_LOG( cltsrv_log_file, TEST_LOG_INFO, ("\tServer(0x%p): shutting down workers\n", me)); /* ** Get all the worker threads to exit. They know how to ** clean up after themselves, so this is just a matter of ** waiting for clorine in the pool to take effect. During ** this stage we're ignoring interrupts. */ server->workers.minimum = server->workers.maximum = 0; PR_Lock(server->ml); while (!PR_CLIST_IS_EMPTY(&server->list)) { PRCList *head = PR_LIST_HEAD(&server->list); CSWorker_t *worker = (CSWorker_t*)head; TEST_LOG( cltsrv_log_file, TEST_LOG_VERBOSE, ("\tServer(0x%p): interrupting worker(0x%p)\n", me, worker)); rv = PR_Interrupt(worker->thread); TEST_ASSERT(PR_SUCCESS == rv); PR_REMOVE_AND_INIT_LINK(head); } while (server->pool.workers > 0) { TEST_LOG( cltsrv_log_file, TEST_LOG_NOTICE, ("\tServer(0x%p): waiting for %u workers to exit\n", me, server->pool.workers)); (void)PR_WaitCondVar(server->pool.exiting, PR_INTERVAL_NO_TIMEOUT); } server->state = cs_exit; PR_NotifyCondVar(server->stateChange); PR_Unlock(server->ml); TEST_LOG( cltsrv_log_file, TEST_LOG_ALWAYS, ("\tServer(0x%p): stopped after %u operations and %u bytes\n", me, server->operations, server->bytesTransferred)); if (NULL != server->listener) PR_Close(server->listener); server->stopped = PR_IntervalNow();} /* Server */static void WaitForCompletion(PRIntn execution){ while (execution > 0) { PRIntn dally = (execution > 30) ? 30 : execution; PR_Sleep(PR_SecondsToInterval(dally)); if (pthread_stats) PT_FPrintStats(debug_out, "\nPThread Statistics\n"); execution -= dally; }} /* WaitForCompletion */static void Help(void){ PR_fprintf(debug_out, "cltsrv test program usage:\n"); PR_fprintf(debug_out, "\t-a <n> threads allowed in accept (5)\n"); PR_fprintf(debug_out, "\t-b <n> backlock for listen (5)\n"); PR_fprintf(debug_out, "\t-c <threads> number of clients to create (1)\n"); PR_fprintf(debug_out, "\t-w <threads> minimal number of server threads (1)\n"); PR_fprintf(debug_out, "\t-W <threads> maximum number of server threads (1)\n"); PR_fprintf(debug_out, "\t-e <seconds> duration of the test in seconds (10)\n"); PR_fprintf(debug_out, "\t-s <string> dsn name of server (localhost)\n"); PR_fprintf(debug_out, "\t-G use GLOBAL threads (LOCAL)\n"); PR_fprintf(debug_out, "\t-T <string> thread provider ('n' | 'p' | 'w')(n)\n"); PR_fprintf(debug_out, "\t-X use XTP as transport (TCP)\n"); PR_fprintf(debug_out, "\t-6 Use IPv6 (IPv4)\n"); PR_fprintf(debug_out, "\t-v verbosity (accumulative) (0)\n"); PR_fprintf(debug_out, "\t-p pthread statistics (FALSE)\n"); PR_fprintf(debug_out, "\t-d debug mode (FALSE)\n"); PR_fprintf(debug_out, "\t-h this message\n");} /* Help */static Verbosity IncrementVerbosity(void){ PRIntn verboge = (PRIntn)verbosity + 1; return (Verbosity)verboge;} /* IncrementVerbosity */PRIntn main(PRIntn argc, char** argv){ PRUintn index; PRBool boolean; CSClient_t *client; PRStatus rv, joinStatus; CSServer_t *server = NULL; char *thread_type; PRUintn backlog = DEFAULT_BACKLOG; PRUintn clients = DEFAULT_CLIENTS; const char *serverName = DEFAULT_SERVER; PRBool serverIsLocal = PR_TRUE; PRUintn accepting = ALLOWED_IN_ACCEPT; PRUintn workersMin = DEFAULT_WORKERS_MIN; PRUintn workersMax = DEFAULT_WORKERS_MAX; PRIntn execution = DEFAULT_EXECUTION_TIME; /* * -G use global threads * -a <n> threads allowed in accept * -b <n> backlock for listen * -c <threads> number of clients to create * -w <threads> minimal number of server threads * -W <threads> maximum number of server threads * -e <seconds> duration of the test in seconds * -s <string> dsn name of server (implies no server here) * -v verbosity */ PLOptStatus os; PLOptState *opt = PL_CreateOptState(argc, argv, "GX6b:a:c:w:W:e:s:T:vdhp");#if defined(WIN32) thread_provider = thread_win32;#elif defined(_PR_PTHREADS) thread_provider = thread_pthread;#elif defined(IRIX) thread_provider = thread_sproc;#else thread_provider = thread_nspr;#endif debug_out = PR_GetSpecialFD(PR_StandardError); while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) { if (PL_OPT_BAD == os) continue; switch (opt->option) { case 'G': /* use global threads */ thread_scope = PR_GLOBAL_THREAD; break; case 'X': /* use XTP as transport */ protocol = 36; break; case '6': /* Use IPv6 */ domain = PR_AF_INET6; break; case 'a': /* the value for accepting */ accepting = atoi(opt->value); break; case 'b': /* the value for backlock */ backlog = atoi(opt->value); break; case 'T': /* the thread provider */ if ('n' == *opt->value) thread_provider = thread_nspr; else if ('p' == *opt->value) thread_provider = thread_pthread; else if ('w' == *opt->value) thread_provider = thread_win32; else {Help(); return 2; } break; case 'c': /* number of client threads */ clients = atoi(opt->value); break; case 'w': /* minimum server worker threads */ workersMin = atoi(opt->value); break; case 'W': /* maximum server worker threads */ workersMax = atoi(opt->value); break; case 'e': /* program execution time in seconds */ execution = atoi(opt->value); break; case 's': /* server's address */ serverName = opt->value; break; case 'v': /* verbosity */ verbosity = IncrementVerbosity(); break; case 'd': /* debug mode */ debug_mode = PR_TRUE; break; case 'p': /* pthread mode */ pthread_stats = PR_TRUE; break; case 'h': default: Help(); return 2; } } PL_DestroyOptState(opt); if (0 != PL_strcmp(serverName, DEFAULT_SERVER)) serverIsLocal = PR_FALSE; if (0 == execution) execution = DEFAULT_EXECUTION_TIME; if (0 == workersMax) workersMax = DEFAULT_WORKERS_MAX; if (0 == workersMin) workersMin = DEFAULT_WORKERS_MIN; if (0 == accepting) accepting = ALLOWED_IN_ACCEPT; if (0 == backlog) backlog = DEFAULT_BACKLOG; if (workersMin > accepting) accepting = workersMin; PR_STDIO_INIT(); TimeOfDayMessage("Client/Server started at", PR_CurrentThread()); cltsrv_log_file = PR_NewLogModule("cltsrv_log"); MY_ASSERT(NULL != cltsrv_log_file); boolean = PR_SetLogFile("cltsrv.log"); MY_ASSERT(boolean);#ifdef XP_MAC debug_mode = PR_TRUE;#endif if (serverIsLocal) { /* Establish the server */ TEST_LOG( cltsrv_log_file, TEST_LOG_INFO, ("main(0x%p): starting server\n", PR_CurrentThread())); server = PR_NEWZAP(CSServer_t); PR_INIT_CLIST(&server->list); server->state = cs_init; server->ml = PR_NewLock(); server->backlog = backlog; server->port = DEFAULT_PORT; server->workers.minimum = workersMin; server->workers.maximum = workersMax; server->workers.accepting = accepting; server->stateChange = PR_NewCondVar(server->ml); server->pool.exiting = PR_NewCondVar(server->ml); server->pool.acceptComplete = PR_NewCondVar(server->ml); TEST_LOG( cltsrv_log_file, TEST_LOG_NOTICE, ("main(0x%p): creating server thread\n", PR_CurrentThread())); rv = NewThread( Server, server, PR_PRIORITY_HIGH, PR_JOINABLE_THREAD); TEST_ASSERT(PR_SUCCESS == rv); TEST_LOG( cltsrv_log_file, TEST_LOG_VERBOSE, ("main(0x%p): waiting for server init\n", PR_CurrentThread())); PR_Lock(server->ml); while (server->state == cs_init) PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT); PR_Unlock(server->ml); TEST_LOG( cltsrv_log_file, TEST_LOG_VERBOSE, ("main(0x%p): server init complete (port #%d)\n", PR_CurrentThread(), server->port)); } if (clients != 0) { /* Create all of the clients */ PRHostEnt host; char buffer[BUFFER_SIZE]; client = (CSClient_t*)PR_CALLOC(clients * sizeof(CSClient_t)); TEST_LOG( cltsrv_log_file, TEST_LOG_VERBOSE, ("main(0x%p): creating %d client threads\n", PR_CurrentThread(), clients)); if (!serverIsLocal) { rv = PR_GetHostByName(serverName, buffer, BUFFER_SIZE, &host); if (PR_SUCCESS != rv) { PL_FPrintError(PR_STDERR, "PR_GetHostByName"); return 2; } } for (index = 0; index < clients; ++index) { client[index].state = cs_init; client[index].ml = PR_NewLock(); if (serverIsLocal) { (void)PR_InitializeNetAddr( PR_IpAddrLoopback, DEFAULT_PORT, &client[index].serverAddress); } else { (void)PR_EnumerateHostEnt( 0, &host, DEFAULT_PORT, &client[index].serverAddress); } client[index].stateChange = PR_NewCondVar(client[index].ml); TEST_LOG( cltsrv_log_file, TEST_LOG_INFO, ("main(0x%p): creating client threads\n", PR_CurrentThread())); rv = NewThread( Client, &client[index], PR_PRIORITY_NORMAL, PR_JOINABLE_THREAD); TEST_ASSERT(PR_SUCCESS == rv); PR_Lock(client[index].ml); while (cs_init == client[index].state) PR_WaitCondVar(client[index].stateChange, PR_INTERVAL_NO_TIMEOUT); PR_Unlock(client[index].ml); } } /* Then just let them go at it for a bit */ TEST_LOG( cltsrv_log_file, TEST_LOG_ALWAYS, ("main(0x%p): waiting for execution interval (%d seconds)\n", PR_CurrentThread(), execution)); WaitForCompletion(execution); TimeOfDayMessage("Shutting down", PR_CurrentThread()); if (clients != 0) { for (index = 0; index < clients; ++index) { TEST_LOG(cltsrv_log_file, TEST_LOG_STATUS, ("main(0x%p): notifying client(0x%p) to stop\n", PR_CurrentThread(), client[index].thread)); PR_Lock(client[index].ml); if (cs_run == client[index].state) { client[index].state = cs_stop; PR_Interrupt(client[index].thread); while (cs_stop == client[index].state) PR_WaitCondVar( client[index].stateChange, PR_INTERVAL_NO_TIMEOUT); } PR_Unlock(client[index].ml); TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE, ("main(0x%p): joining client(0x%p)\n", PR_CurrentThread(), client[index].thread)); joinStatus = JoinThread(client[index].thread); TEST_ASSERT(PR_SUCCESS == joinStatus); PR_DestroyCondVar(client[index].stateChange); PR_DestroyLock(client[index].ml); } PR_DELETE(client); } if (NULL != server) { /* All clients joined - retrieve the server */ TEST_LOG( cltsrv_log_file, TEST_LOG_NOTICE, ("main(0x%p): notifying server(0x%p) to stop\n", PR_CurrentThread(), server->thread)); PR_Lock(server->ml); server->state = cs_stop; PR_Interrupt(server->thread); while (cs_exit != server->state) PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT); PR_Unlock(server->ml); TEST_LOG( cltsrv_log_file, TEST_LOG_NOTICE, ("main(0x%p): joining server(0x%p)\n", PR_CurrentThread(), server->thread)); joinStatus = JoinThread(server->thread); TEST_ASSERT(PR_SUCCESS == joinStatus); PR_DestroyCondVar(server->stateChange); PR_DestroyCondVar(server->pool.exiting); PR_DestroyCondVar(server->pool.acceptComplete); PR_DestroyLock(server->ml); PR_DELETE(server); } TEST_LOG( cltsrv_log_file, TEST_LOG_ALWAYS, ("main(0x%p): test complete\n", PR_CurrentThread())); if (thread_provider == thread_win32) thread_type = "\nWin32 Thread Statistics\n"; else if (thread_provider == thread_pthread) thread_type = "\npthread Statistics\n"; else if (thread_provider == thread_sproc) thread_type = "\nsproc Statistics\n"; else { PR_ASSERT(thread_provider == thread_nspr); thread_type = "\nPRThread Statistics\nn"; } PT_FPrintStats(debug_out, thread_type); TimeOfDayMessage("Test exiting at", PR_CurrentThread()); PR_Cleanup(); return 0;} /* main *//* cltsrv.c */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -