⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 thrpool_server.c

📁 Netscape NSPR库源码
💻 C
📖 第 1 页 / 共 2 页
字号:
 *	  Finally, the threadpool is shutdown */static void PR_CALLBACKTCP_Server(void *arg){    PRThreadPool *tp = (PRThreadPool *) arg;    Server_Param *sp;    PRFileDesc *sockfd;    PRNetAddr netaddr;	PRMonitor *sc_mon;	PRJob *jobp;	int i;	PRStatus rval;    /*     * Create a tcp socket     */    if ((sockfd = PR_NewTCPSocket()) == NULL) {        fprintf(stderr,"%s: PR_NewTCPSocket failed\n", program_name);        return;    }    memset(&netaddr, 0 , sizeof(netaddr));    netaddr.inet.family = PR_AF_INET;    netaddr.inet.port = PR_htons(TCP_SERVER_PORT);    netaddr.inet.ip = PR_htonl(PR_INADDR_ANY);    /*     * try a few times to bind server's address, if addresses are in     * use     */	i = 0;    while (PR_Bind(sockfd, &netaddr) < 0) {        if (PR_GetError() == PR_ADDRESS_IN_USE_ERROR) {            netaddr.inet.port += 2;            if (i++ < SERVER_MAX_BIND_COUNT)                continue;        }        fprintf(stderr,"%s: ERROR - PR_Bind failed\n", program_name);        perror("PR_Bind");        failed_already=1;        return;    }    if (PR_Listen(sockfd, 32) < 0) {        fprintf(stderr,"%s: ERROR - PR_Listen failed\n", program_name);        failed_already=1;        return;    }    if (PR_GetSockName(sockfd, &netaddr) < 0) {        fprintf(stderr,"%s: ERROR - PR_GetSockName failed\n", program_name);        failed_already=1;        return;    }    DPRINTF((	"TCP_Server: PR_BIND netaddr.inet.ip = 0x%lx, netaddr.inet.port = %d\n",        netaddr.inet.ip, netaddr.inet.port));	sp = PR_NEW(Server_Param);	if (sp == NULL) {		fprintf(stderr,"%s: PR_NEW failed\n", program_name);		failed_already=1;		return;	}	sp->iod.socket = sockfd;	sp->iod.timeout = PR_SecondsToInterval(60);	sp->datalen = tcp_mesg_size;	sp->exit_mon = sc_mon;	sp->job_counterp = &job_counter;	sp->conn_counter = 0;	sp->tp = tp;	sp->netaddr = netaddr;	/* create and cancel an io job */	jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,							PR_FALSE);	PR_ASSERT(NULL != jobp);	rval = PR_CancelJob(jobp);	PR_ASSERT(PR_SUCCESS == rval);	/*	 * create the client process	 */	{#define MAX_ARGS 4		char *argv[MAX_ARGS + 1];		int index = 0;		char port[32];        char path[1024 + sizeof("/thrpool_client")];        (void)getcwd(path, sizeof(path));        (void)strcat(path, "/thrpool_client");#ifdef XP_PC        (void)strcat(path, ".exe");#endif        argv[index++] = path;		sprintf(port,"%d",PR_ntohs(netaddr.inet.port));        if (_debug_on)        {            argv[index++] = "-d";            argv[index++] = "-p";            argv[index++] = port;            argv[index++] = NULL;        } else {            argv[index++] = "-p";            argv[index++] = port;			argv[index++] = NULL;		}		PR_ASSERT(MAX_ARGS >= (index - 1));                DPRINTF(("creating client process %s ...\n", path));        if (PR_FAILURE == PR_CreateProcessDetached(path, argv, NULL, NULL)) {        	fprintf(stderr,				"thrpool_server: ERROR - PR_CreateProcessDetached failed\n");        	failed_already=1;        	return;		}	}    sc_mon = PR_NewMonitor();    if (sc_mon == NULL) {        fprintf(stderr,"%s: PR_NewMonitor failed\n", program_name);        failed_already=1;        return;    }	sp->iod.socket = sockfd;	sp->iod.timeout = PR_SecondsToInterval(60);	sp->datalen = tcp_mesg_size;	sp->exit_mon = sc_mon;	sp->job_counterp = &job_counter;	sp->conn_counter = 0;	sp->tp = tp;	sp->netaddr = netaddr;	/* create and cancel a timer job */	jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(5000),						print_stats, sp, PR_FALSE);	PR_ASSERT(NULL != jobp);	rval = PR_CancelJob(jobp);	PR_ASSERT(PR_SUCCESS == rval);    DPRINTF(("TCP_Server: Accepting connections \n"));	jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,							PR_FALSE);	PR_ASSERT(NULL != jobp);	return;}static voidTCP_Server_Accept(void *arg){    Server_Param *sp = (Server_Param *) arg;    PRThreadPool *tp = sp->tp;    Serve_Client_Param *scp;	PRFileDesc *newsockfd;	PRJob *jobp;	if ((newsockfd = PR_Accept(sp->iod.socket, &sp->netaddr,		PR_INTERVAL_NO_TIMEOUT)) == NULL) {		fprintf(stderr,"%s: ERROR - PR_Accept failed\n", program_name);		failed_already=1;		goto exit;	}	scp = PR_NEW(Serve_Client_Param);	if (scp == NULL) {		fprintf(stderr,"%s: PR_NEW failed\n", program_name);		failed_already=1;		goto exit;	}	/*	 * Start a Serve_Client job for each incoming connection	 */	scp->iod.socket = newsockfd;	scp->iod.timeout = PR_SecondsToInterval(60);	scp->datalen = tcp_mesg_size;	scp->exit_mon = sp->exit_mon;	scp->job_counterp = sp->job_counterp;	scp->tp = sp->tp;	PR_EnterMonitor(sp->exit_mon);	(*sp->job_counterp)++;	PR_ExitMonitor(sp->exit_mon);	jobp = PR_QueueJob(tp, Serve_Client, scp,						PR_FALSE);	PR_ASSERT(NULL != jobp);	DPRINTF(("TCP_Server: Created Serve_Client = 0x%lx\n", jobp));	/*	 * single-threaded update; no lock needed	 */    sp->conn_counter++;    if (sp->conn_counter <			(num_tcp_clients * num_tcp_connections_per_client)) {		jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,								PR_FALSE);		PR_ASSERT(NULL != jobp);		return;	}	jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500),						print_stats, sp, PR_FALSE);	PR_ASSERT(NULL != jobp);	DPRINTF(("TCP_Server: Created print_stats timer job = 0x%lx\n", jobp));exit:	PR_EnterMonitor(sp->exit_mon);    /* Wait for server jobs to finish */    while (0 != *sp->job_counterp) {        PR_Wait(sp->exit_mon, PR_INTERVAL_NO_TIMEOUT);        DPRINTF(("TCP_Server: conn_counter = %d\n",												*sp->job_counterp));    }    PR_ExitMonitor(sp->exit_mon);    if (sp->iod.socket) {        PR_Close(sp->iod.socket);    }	PR_DestroyMonitor(sp->exit_mon);    printf("%30s","TCP_Socket_Client_Server_Test:");    printf("%2ld Server %2ld Clients %2ld connections_per_client\n",1l,        num_tcp_clients, num_tcp_connections_per_client);    printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n",":",        num_tcp_mesgs_per_connection, tcp_mesg_size);	DPRINTF(("%s: calling PR_ShutdownThreadPool\n", program_name));	PR_ShutdownThreadPool(sp->tp);	PR_DELETE(sp);}/************************************************************************/#define DEFAULT_INITIAL_THREADS		4#define DEFAULT_MAX_THREADS			100#define DEFAULT_STACKSIZE			(512 * 1024)intmain(int argc, char **argv){	PRInt32 initial_threads = DEFAULT_INITIAL_THREADS;	PRInt32 max_threads = DEFAULT_MAX_THREADS;	PRInt32 stacksize = DEFAULT_STACKSIZE;	PRThreadPool *tp = NULL;	PRStatus rv;	PRJob *jobp;    /*     * -d           debug mode     */    PLOptStatus os;    PLOptState *opt;	program_name = argv[0];    opt = PL_CreateOptState(argc, argv, "d");    while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))    {        if (PL_OPT_BAD == os) continue;        switch (opt->option)        {        case 'd':  /* debug mode */            _debug_on = 1;            break;        default:            break;        }    }    PL_DestroyOptState(opt);    PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0);    PR_STDIO_INIT();#ifdef XP_MAC    SetupMacPrintfLog("socket.log");#endif    PR_SetConcurrency(4);	tp = PR_CreateThreadPool(initial_threads, max_threads, stacksize);    if (NULL == tp) {        printf("PR_CreateThreadPool failed\n");        failed_already=1;        goto done;	}	jobp = PR_QueueJob(tp, TCP_Server, tp, PR_TRUE);	rv = PR_JoinJob(jobp);			PR_ASSERT(PR_SUCCESS == rv);	DPRINTF(("%s: calling PR_JoinThreadPool\n", program_name));	rv = PR_JoinThreadPool(tp);	PR_ASSERT(PR_SUCCESS == rv);	DPRINTF(("%s: returning from PR_JoinThreadPool\n", program_name));done:    PR_Cleanup();    if (failed_already) return 1;    else return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -