📄 thrpool_server.c
字号:
* Finally, the threadpool is shutdown */static void PR_CALLBACKTCP_Server(void *arg){ PRThreadPool *tp = (PRThreadPool *) arg; Server_Param *sp; PRFileDesc *sockfd; PRNetAddr netaddr; PRMonitor *sc_mon; PRJob *jobp; int i; PRStatus rval; /* * Create a tcp socket */ if ((sockfd = PR_NewTCPSocket()) == NULL) { fprintf(stderr,"%s: PR_NewTCPSocket failed\n", program_name); return; } memset(&netaddr, 0 , sizeof(netaddr)); netaddr.inet.family = PR_AF_INET; netaddr.inet.port = PR_htons(TCP_SERVER_PORT); netaddr.inet.ip = PR_htonl(PR_INADDR_ANY); /* * try a few times to bind server's address, if addresses are in * use */ i = 0; while (PR_Bind(sockfd, &netaddr) < 0) { if (PR_GetError() == PR_ADDRESS_IN_USE_ERROR) { netaddr.inet.port += 2; if (i++ < SERVER_MAX_BIND_COUNT) continue; } fprintf(stderr,"%s: ERROR - PR_Bind failed\n", program_name); perror("PR_Bind"); failed_already=1; return; } if (PR_Listen(sockfd, 32) < 0) { fprintf(stderr,"%s: ERROR - PR_Listen failed\n", program_name); failed_already=1; return; } if (PR_GetSockName(sockfd, &netaddr) < 0) { fprintf(stderr,"%s: ERROR - PR_GetSockName failed\n", program_name); failed_already=1; return; } DPRINTF(( "TCP_Server: PR_BIND netaddr.inet.ip = 0x%lx, netaddr.inet.port = %d\n", netaddr.inet.ip, netaddr.inet.port)); sp = PR_NEW(Server_Param); if (sp == NULL) { fprintf(stderr,"%s: PR_NEW failed\n", program_name); failed_already=1; return; } sp->iod.socket = sockfd; sp->iod.timeout = PR_SecondsToInterval(60); sp->datalen = tcp_mesg_size; sp->exit_mon = sc_mon; sp->job_counterp = &job_counter; sp->conn_counter = 0; sp->tp = tp; sp->netaddr = netaddr; /* create and cancel an io job */ jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp, PR_FALSE); PR_ASSERT(NULL != jobp); rval = PR_CancelJob(jobp); PR_ASSERT(PR_SUCCESS == rval); /* * create the client process */ {#define MAX_ARGS 4 char *argv[MAX_ARGS + 1]; int index = 0; char port[32]; char path[1024 + sizeof("/thrpool_client")]; (void)getcwd(path, sizeof(path)); (void)strcat(path, "/thrpool_client");#ifdef XP_PC (void)strcat(path, ".exe");#endif argv[index++] = path; sprintf(port,"%d",PR_ntohs(netaddr.inet.port)); if (_debug_on) { argv[index++] = "-d"; argv[index++] = "-p"; argv[index++] = port; argv[index++] = NULL; } else { argv[index++] = "-p"; argv[index++] = port; argv[index++] = NULL; } PR_ASSERT(MAX_ARGS >= (index - 1)); DPRINTF(("creating client process %s ...\n", path)); if (PR_FAILURE == PR_CreateProcessDetached(path, argv, NULL, NULL)) { fprintf(stderr, "thrpool_server: ERROR - PR_CreateProcessDetached failed\n"); failed_already=1; return; } } sc_mon = PR_NewMonitor(); if (sc_mon == NULL) { fprintf(stderr,"%s: PR_NewMonitor failed\n", program_name); failed_already=1; return; } sp->iod.socket = sockfd; sp->iod.timeout = PR_SecondsToInterval(60); sp->datalen = tcp_mesg_size; sp->exit_mon = sc_mon; sp->job_counterp = &job_counter; sp->conn_counter = 0; sp->tp = tp; sp->netaddr = netaddr; /* create and cancel a timer job */ jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(5000), print_stats, sp, PR_FALSE); PR_ASSERT(NULL != jobp); rval = PR_CancelJob(jobp); PR_ASSERT(PR_SUCCESS == rval); DPRINTF(("TCP_Server: Accepting connections \n")); jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp, PR_FALSE); PR_ASSERT(NULL != jobp); return;}static voidTCP_Server_Accept(void *arg){ Server_Param *sp = (Server_Param *) arg; PRThreadPool *tp = sp->tp; Serve_Client_Param *scp; PRFileDesc *newsockfd; PRJob *jobp; if ((newsockfd = PR_Accept(sp->iod.socket, &sp->netaddr, PR_INTERVAL_NO_TIMEOUT)) == NULL) { fprintf(stderr,"%s: ERROR - PR_Accept failed\n", program_name); failed_already=1; goto exit; } scp = PR_NEW(Serve_Client_Param); if (scp == NULL) { fprintf(stderr,"%s: PR_NEW failed\n", program_name); failed_already=1; goto exit; } /* * Start a Serve_Client job for each incoming connection */ scp->iod.socket = newsockfd; scp->iod.timeout = PR_SecondsToInterval(60); scp->datalen = tcp_mesg_size; scp->exit_mon = sp->exit_mon; scp->job_counterp = sp->job_counterp; scp->tp = sp->tp; PR_EnterMonitor(sp->exit_mon); (*sp->job_counterp)++; PR_ExitMonitor(sp->exit_mon); jobp = PR_QueueJob(tp, Serve_Client, scp, PR_FALSE); PR_ASSERT(NULL != jobp); DPRINTF(("TCP_Server: Created Serve_Client = 0x%lx\n", jobp)); /* * single-threaded update; no lock needed */ sp->conn_counter++; if (sp->conn_counter < (num_tcp_clients * num_tcp_connections_per_client)) { jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp, PR_FALSE); PR_ASSERT(NULL != jobp); return; } jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500), print_stats, sp, PR_FALSE); PR_ASSERT(NULL != jobp); DPRINTF(("TCP_Server: Created print_stats timer job = 0x%lx\n", jobp));exit: PR_EnterMonitor(sp->exit_mon); /* Wait for server jobs to finish */ while (0 != *sp->job_counterp) { PR_Wait(sp->exit_mon, PR_INTERVAL_NO_TIMEOUT); DPRINTF(("TCP_Server: conn_counter = %d\n", *sp->job_counterp)); } PR_ExitMonitor(sp->exit_mon); if (sp->iod.socket) { PR_Close(sp->iod.socket); } PR_DestroyMonitor(sp->exit_mon); printf("%30s","TCP_Socket_Client_Server_Test:"); printf("%2ld Server %2ld Clients %2ld connections_per_client\n",1l, num_tcp_clients, num_tcp_connections_per_client); printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n",":", num_tcp_mesgs_per_connection, tcp_mesg_size); DPRINTF(("%s: calling PR_ShutdownThreadPool\n", program_name)); PR_ShutdownThreadPool(sp->tp); PR_DELETE(sp);}/************************************************************************/#define DEFAULT_INITIAL_THREADS 4#define DEFAULT_MAX_THREADS 100#define DEFAULT_STACKSIZE (512 * 1024)intmain(int argc, char **argv){ PRInt32 initial_threads = DEFAULT_INITIAL_THREADS; PRInt32 max_threads = DEFAULT_MAX_THREADS; PRInt32 stacksize = DEFAULT_STACKSIZE; PRThreadPool *tp = NULL; PRStatus rv; PRJob *jobp; /* * -d debug mode */ PLOptStatus os; PLOptState *opt; program_name = argv[0]; opt = PL_CreateOptState(argc, argv, "d"); while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) { if (PL_OPT_BAD == os) continue; switch (opt->option) { case 'd': /* debug mode */ _debug_on = 1; break; default: break; } } PL_DestroyOptState(opt); PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0); PR_STDIO_INIT();#ifdef XP_MAC SetupMacPrintfLog("socket.log");#endif PR_SetConcurrency(4); tp = PR_CreateThreadPool(initial_threads, max_threads, stacksize); if (NULL == tp) { printf("PR_CreateThreadPool failed\n"); failed_already=1; goto done; } jobp = PR_QueueJob(tp, TCP_Server, tp, PR_TRUE); rv = PR_JoinJob(jobp); PR_ASSERT(PR_SUCCESS == rv); DPRINTF(("%s: calling PR_JoinThreadPool\n", program_name)); rv = PR_JoinThreadPool(tp); PR_ASSERT(PR_SUCCESS == rv); DPRINTF(("%s: returning from PR_JoinThreadPool\n", program_name));done: PR_Cleanup(); if (failed_already) return 1; else return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -