📄 cltsrv.c
字号:
rv = CreateWorker(server, &server->pool); TEST_ASSERT(PR_SUCCESS == rv); /* ** From here on this thread is merely hanging around as the contact ** point for the main test driver. It's just waiting for the driver ** to declare the test complete. */ TEST_LOG( cltsrv_log_file, TEST_LOG_VERBOSE, ("\tServer(0x%p): waiting for state change\n", me)); PR_Lock(server->ml); while ((cs_run == server->state) && !Aborted(rv)) { rv = PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT); } PR_Unlock(server->ml); PR_ClearInterrupt(); TEST_LOG( cltsrv_log_file, TEST_LOG_INFO, ("\tServer(0x%p): shutting down workers\n", me)); /* ** Get all the worker threads to exit. They know how to ** clean up after themselves, so this is just a matter of ** waiting for clorine in the pool to take effect. During ** this stage we're ignoring interrupts. */ server->workers.minimum = server->workers.maximum = 0; PR_Lock(server->ml); while (!PR_CLIST_IS_EMPTY(&server->list)) { PRCList *head = PR_LIST_HEAD(&server->list); CSWorker_t *worker = (CSWorker_t*)head; TEST_LOG( cltsrv_log_file, TEST_LOG_VERBOSE, ("\tServer(0x%p): interrupting worker(0x%p)\n", me, worker)); rv = PR_Interrupt(worker->thread); TEST_ASSERT(PR_SUCCESS == rv); PR_REMOVE_AND_INIT_LINK(head); } while (server->pool.workers > 0) { TEST_LOG( cltsrv_log_file, TEST_LOG_NOTICE, ("\tServer(0x%p): waiting for %u workers to exit\n", me, server->pool.workers)); (void)PR_WaitCondVar(server->pool.exiting, PR_INTERVAL_NO_TIMEOUT); } server->state = cs_exit; PR_NotifyCondVar(server->stateChange); PR_Unlock(server->ml); TEST_LOG( cltsrv_log_file, TEST_LOG_ALWAYS, ("\tServer(0x%p): stopped after %u operations and %u bytes\n", me, server->operations, server->bytesTransferred)); if (NULL != server->listener) PR_Close(server->listener); server->stopped = PR_IntervalNow();} /* Server */static void WaitForCompletion(PRIntn execution){ while (execution > 0) { PRIntn dally = (execution > 30) ? 30 : execution; PR_Sleep(PR_SecondsToInterval(dally)); if (pthread_stats) PT_FPrintStats(debug_out, "\nPThread Statistics\n"); execution -= dally; }} /* WaitForCompletion */static void Help(void){ PR_fprintf(debug_out, "cltsrv test program usage:\n"); PR_fprintf(debug_out, "\t-a <n> threads allowed in accept (5)\n"); PR_fprintf(debug_out, "\t-b <n> backlock for listen (5)\n"); PR_fprintf(debug_out, "\t-c <threads> number of clients to create (1)\n"); PR_fprintf(debug_out, "\t-f <low> low water mark for fd caching (0)\n"); PR_fprintf(debug_out, "\t-F <high> high water mark for fd caching (0)\n"); PR_fprintf(debug_out, "\t-w <threads> minimal number of server threads (1)\n"); PR_fprintf(debug_out, "\t-W <threads> maximum number of server threads (1)\n"); PR_fprintf(debug_out, "\t-e <seconds> duration of the test in seconds (10)\n"); PR_fprintf(debug_out, "\t-s <string> dsn name of server (localhost)\n"); PR_fprintf(debug_out, "\t-G use GLOBAL threads (LOCAL)\n"); PR_fprintf(debug_out, "\t-X use XTP as transport (TCP)\n"); PR_fprintf(debug_out, "\t-6 Use IPv6 (IPv4)\n"); PR_fprintf(debug_out, "\t-v verbosity (accumulative) (0)\n"); PR_fprintf(debug_out, "\t-p pthread statistics (FALSE)\n"); PR_fprintf(debug_out, "\t-d debug mode (FALSE)\n"); PR_fprintf(debug_out, "\t-h this message\n");} /* Help */static Verbosity IncrementVerbosity(void){ PRIntn verboge = (PRIntn)verbosity + 1; return (Verbosity)verboge;} /* IncrementVerbosity */PRIntn main(PRIntn argc, char** argv){ PRUintn index; PRBool boolean; CSClient_t *client; PRStatus rv, joinStatus; CSServer_t *server = NULL; PRUintn backlog = DEFAULT_BACKLOG; PRUintn clients = DEFAULT_CLIENTS; const char *serverName = DEFAULT_SERVER; PRBool serverIsLocal = PR_TRUE; PRUintn accepting = ALLOWED_IN_ACCEPT; PRUintn workersMin = DEFAULT_WORKERS_MIN; PRUintn workersMax = DEFAULT_WORKERS_MAX; PRIntn execution = DEFAULT_EXECUTION_TIME; PRIntn low = DEFAULT_LOW, high = DEFAULT_HIGH; /* * -G use global threads * -a <n> threads allowed in accept * -b <n> backlock for listen * -c <threads> number of clients to create * -f <low> low water mark for caching FDs * -F <high> high water mark for caching FDs * -w <threads> minimal number of server threads * -W <threads> maximum number of server threads * -e <seconds> duration of the test in seconds * -s <string> dsn name of server (implies no server here) * -v verbosity */ PLOptStatus os; PLOptState *opt = PL_CreateOptState(argc, argv, "GX6b:a:c:f:F:w:W:e:s:vdhp"); debug_out = PR_GetSpecialFD(PR_StandardError); while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) { if (PL_OPT_BAD == os) continue; switch (opt->option) { case 'G': /* use global threads */ thread_scope = PR_GLOBAL_THREAD; break; case 'X': /* use XTP as transport */ protocol = 36; break; case '6': /* Use IPv6 */ domain = PR_AF_INET6; break; case 'a': /* the value for accepting */ accepting = atoi(opt->value); break; case 'b': /* the value for backlock */ backlog = atoi(opt->value); break; case 'c': /* number of client threads */ clients = atoi(opt->value); break; case 'f': /* low water fd cache */ low = atoi(opt->value); break; case 'F': /* low water fd cache */ high = atoi(opt->value); break; case 'w': /* minimum server worker threads */ workersMin = atoi(opt->value); break; case 'W': /* maximum server worker threads */ workersMax = atoi(opt->value); break; case 'e': /* program execution time in seconds */ execution = atoi(opt->value); break; case 's': /* server's address */ serverName = opt->value; break; case 'v': /* verbosity */ verbosity = IncrementVerbosity(); break; case 'd': /* debug mode */ debug_mode = PR_TRUE; break; case 'p': /* pthread mode */ pthread_stats = PR_TRUE; break; case 'h': default: Help(); return 2; } } PL_DestroyOptState(opt); if (0 != PL_strcmp(serverName, DEFAULT_SERVER)) serverIsLocal = PR_FALSE; if (0 == execution) execution = DEFAULT_EXECUTION_TIME; if (0 == workersMax) workersMax = DEFAULT_WORKERS_MAX; if (0 == workersMin) workersMin = DEFAULT_WORKERS_MIN; if (0 == accepting) accepting = ALLOWED_IN_ACCEPT; if (0 == backlog) backlog = DEFAULT_BACKLOG; if (workersMin > accepting) accepting = workersMin; PR_STDIO_INIT(); TimeOfDayMessage("Client/Server started at", PR_CurrentThread()); cltsrv_log_file = PR_NewLogModule("cltsrv_log"); MY_ASSERT(NULL != cltsrv_log_file); boolean = PR_SetLogFile("cltsrv.log"); MY_ASSERT(boolean);#ifdef XP_MAC debug_mode = PR_TRUE;#endif rv = PR_SetFDCacheSize(low, high); PR_ASSERT(PR_SUCCESS == rv); if (serverIsLocal) { /* Establish the server */ TEST_LOG( cltsrv_log_file, TEST_LOG_INFO, ("main(0x%p): starting server\n", PR_CurrentThread())); server = PR_NEWZAP(CSServer_t); PR_INIT_CLIST(&server->list); server->state = cs_init; server->ml = PR_NewLock(); server->backlog = backlog; server->port = DEFAULT_PORT; server->workers.minimum = workersMin; server->workers.maximum = workersMax; server->workers.accepting = accepting; server->stateChange = PR_NewCondVar(server->ml); server->pool.exiting = PR_NewCondVar(server->ml); server->pool.acceptComplete = PR_NewCondVar(server->ml); TEST_LOG( cltsrv_log_file, TEST_LOG_NOTICE, ("main(0x%p): creating server thread\n", PR_CurrentThread())); server->thread = PR_CreateThread( PR_USER_THREAD, Server, server, PR_PRIORITY_HIGH, thread_scope, PR_JOINABLE_THREAD, 0); TEST_ASSERT(NULL != server->thread); TEST_LOG( cltsrv_log_file, TEST_LOG_VERBOSE, ("main(0x%p): waiting for server init\n", PR_CurrentThread())); PR_Lock(server->ml); while (server->state == cs_init) PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT); PR_Unlock(server->ml); TEST_LOG( cltsrv_log_file, TEST_LOG_VERBOSE, ("main(0x%p): server init complete (port #%d)\n", PR_CurrentThread(), server->port)); } if (clients != 0) { /* Create all of the clients */ PRHostEnt host; char buffer[BUFFER_SIZE]; client = (CSClient_t*)PR_CALLOC(clients * sizeof(CSClient_t)); TEST_LOG( cltsrv_log_file, TEST_LOG_VERBOSE, ("main(0x%p): creating %d client threads\n", PR_CurrentThread(), clients)); if (!serverIsLocal) { rv = PR_GetHostByName(serverName, buffer, BUFFER_SIZE, &host); if (PR_SUCCESS != rv) { PL_FPrintError(PR_STDERR, "PR_GetHostByName"); return 2; } } for (index = 0; index < clients; ++index) { client[index].state = cs_init; client[index].ml = PR_NewLock(); if (serverIsLocal) { if (PR_AF_INET6 != domain) (void)PR_InitializeNetAddr( PR_IpAddrLoopback, DEFAULT_PORT, &client[index].serverAddress); else rv = PR_SetNetAddr(PR_IpAddrLoopback, PR_AF_INET6, DEFAULT_PORT, &client[index].serverAddress); } else { (void)PR_EnumerateHostEnt( 0, &host, DEFAULT_PORT, &client[index].serverAddress); } client[index].stateChange = PR_NewCondVar(client[index].ml); TEST_LOG( cltsrv_log_file, TEST_LOG_INFO, ("main(0x%p): creating client threads\n", PR_CurrentThread())); client[index].thread = PR_CreateThread( PR_USER_THREAD, Client, &client[index], PR_PRIORITY_NORMAL, thread_scope, PR_JOINABLE_THREAD, 0); TEST_ASSERT(NULL != client[index].thread); PR_Lock(client[index].ml); while (cs_init == client[index].state) PR_WaitCondVar(client[index].stateChange, PR_INTERVAL_NO_TIMEOUT); PR_Unlock(client[index].ml); } } /* Then just let them go at it for a bit */ TEST_LOG( cltsrv_log_file, TEST_LOG_ALWAYS, ("main(0x%p): waiting for execution interval (%d seconds)\n", PR_CurrentThread(), execution)); WaitForCompletion(execution); TimeOfDayMessage("Shutting down", PR_CurrentThread()); if (clients != 0) { for (index = 0; index < clients; ++index) { TEST_LOG(cltsrv_log_file, TEST_LOG_STATUS, ("main(0x%p): notifying client(0x%p) to stop\n", PR_CurrentThread(), client[index].thread)); PR_Lock(client[index].ml); if (cs_run == client[index].state) { client[index].state = cs_stop; PR_Interrupt(client[index].thread); while (cs_stop == client[index].state) PR_WaitCondVar( client[index].stateChange, PR_INTERVAL_NO_TIMEOUT); } PR_Unlock(client[index].ml); TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE, ("main(0x%p): joining client(0x%p)\n", PR_CurrentThread(), client[index].thread)); joinStatus = PR_JoinThread(client[index].thread); TEST_ASSERT(PR_SUCCESS == joinStatus); PR_DestroyCondVar(client[index].stateChange); PR_DestroyLock(client[index].ml); } PR_DELETE(client); } if (NULL != server) { /* All clients joined - retrieve the server */ TEST_LOG( cltsrv_log_file, TEST_LOG_NOTICE, ("main(0x%p): notifying server(0x%p) to stop\n", PR_CurrentThread(), server->thread)); PR_Lock(server->ml); server->state = cs_stop; PR_Interrupt(server->thread); while (cs_exit != server->state) PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT); PR_Unlock(server->ml); TEST_LOG( cltsrv_log_file, TEST_LOG_NOTICE, ("main(0x%p): joining server(0x%p)\n", PR_CurrentThread(), server->thread)); joinStatus = PR_JoinThread(server->thread); TEST_ASSERT(PR_SUCCESS == joinStatus); PR_DestroyCondVar(server->stateChange); PR_DestroyCondVar(server->pool.exiting); PR_DestroyCondVar(server->pool.acceptComplete); PR_DestroyLock(server->ml); PR_DELETE(server); } TEST_LOG( cltsrv_log_file, TEST_LOG_ALWAYS, ("main(0x%p): test complete\n", PR_CurrentThread())); PT_FPrintStats(debug_out, "\nPThread Statistics\n"); TimeOfDayMessage("Test exiting at", PR_CurrentThread()); PR_Cleanup(); return 0;} /* main *//* cltsrv.c */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -