📄 thrpool_server.c
字号:
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- *//* * The contents of this file are subject to the Mozilla Public * License Version 1.1 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of * the License at http://www.mozilla.org/MPL/ * * Software distributed under the License is distributed on an "AS * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or * implied. See the License for the specific language governing * rights and limitations under the License. * * The Original Code is the Netscape Portable Runtime (NSPR). * * The Initial Developer of the Original Code is Netscape * Communications Corporation. Portions created by Netscape are * Copyright (C) 1999-2000 Netscape Communications Corporation. All * Rights Reserved. * * Contributor(s): * * Alternatively, the contents of this file may be used under the * terms of the GNU General Public License Version 2 or later (the * "GPL"), in which case the provisions of the GPL are applicable * instead of those above. If you wish to allow use of your * version of this file only under the terms of the GPL and not to * allow others to use your version of this file under the MPL, * indicate your decision by deleting the provisions above and * replace them with the notice and other provisions required by * the GPL. If you do not delete the provisions above, a recipient * may use your version of this file under either the MPL or the * GPL. *//*************************************************************************** Name: thrpool.c**** Description: Test threadpool functionality.**** Modification History:*/#include "primpl.h"#include "plgetopt.h"#include <stdio.h>#include <string.h>#include <errno.h>#ifdef XP_UNIX#include <sys/mman.h>#endif#if defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS)#include <pthread.h>#endif#ifdef WIN32#include <process.h>#endifstatic int _debug_on = 0;static char *program_name = NULL;static void serve_client_write(void *arg);#ifdef XP_MAC#include "prlog.h"#include "prsem.h"int fprintf(FILE *stream, const char *fmt, ...){ PR_LogPrint(fmt); return 0;}#define printf PR_LogPrintextern void SetupMacPrintfLog(char *logFile);#else#include "obsolete/prsem.h"#endif#ifdef XP_PC#define mode_t int#endif#define DPRINTF(arg) if (_debug_on) printf arg#define BUF_DATA_SIZE (2 * 1024)#define TCP_MESG_SIZE 1024#define NUM_TCP_CLIENTS 10 /* for a listen queue depth of 5 */#define NUM_TCP_CONNECTIONS_PER_CLIENT 10#define NUM_TCP_MESGS_PER_CONNECTION 10#define TCP_SERVER_PORT 10000#define SERVER_MAX_BIND_COUNT 100static PRInt32 num_tcp_clients = NUM_TCP_CLIENTS;static PRInt32 num_tcp_connections_per_client = NUM_TCP_CONNECTIONS_PER_CLIENT;static PRInt32 tcp_mesg_size = TCP_MESG_SIZE;static PRInt32 num_tcp_mesgs_per_connection = NUM_TCP_MESGS_PER_CONNECTION;static void TCP_Server_Accept(void *arg);int failed_already=0;typedef struct buffer { char data[BUF_DATA_SIZE];} buffer;typedef struct Server_Param { PRJobIoDesc iod; /* socket to read from/write to */ PRInt32 datalen; /* bytes of data transfered in each read/write */ PRNetAddr netaddr; PRMonitor *exit_mon; /* monitor to signal on exit */ PRInt32 *job_counterp; /* counter to decrement, before exit */ PRInt32 conn_counter; /* counter to decrement, before exit */ PRThreadPool *tp;} Server_Param;typedef struct Serve_Client_Param { PRJobIoDesc iod; /* socket to read from/write to */ PRInt32 datalen; /* bytes of data transfered in each read/write */ PRMonitor *exit_mon; /* monitor to signal on exit */ PRInt32 *job_counterp; /* counter to decrement, before exit */ PRThreadPool *tp;} Serve_Client_Param;typedef struct Session { PRJobIoDesc iod; /* socket to read from/write to */ buffer *in_buf; PRInt32 bytes; PRInt32 msg_num; PRInt32 bytes_read; PRMonitor *exit_mon; /* monitor to signal on exit */ PRInt32 *job_counterp; /* counter to decrement, before exit */ PRThreadPool *tp;} Session;static voidserve_client_read(void *arg){ Session *sp = (Session *) arg; int rem; int bytes; int offset; PRFileDesc *sockfd; char *buf; PRJob *jobp; PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT; sockfd = sp->iod.socket; buf = sp->in_buf->data; PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection); PR_ASSERT(sp->bytes_read < sp->bytes); offset = sp->bytes_read; rem = sp->bytes - offset; bytes = PR_Recv(sockfd, buf + offset, rem, 0, timeout); if (bytes < 0) { return; } sp->bytes_read += bytes; sp->iod.timeout = PR_SecondsToInterval(60); if (sp->bytes_read < sp->bytes) { jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp, PR_FALSE); PR_ASSERT(NULL != jobp); return; } PR_ASSERT(sp->bytes_read == sp->bytes); DPRINTF(("serve_client: read complete, msg(%d) \n", sp->msg_num)); sp->iod.timeout = PR_SecondsToInterval(60); jobp = PR_QueueJob_Write(sp->tp, &sp->iod, serve_client_write, sp, PR_FALSE); PR_ASSERT(NULL != jobp); return;}static voidserve_client_write(void *arg){ Session *sp = (Session *) arg; int bytes; PRFileDesc *sockfd; char *buf; PRJob *jobp; sockfd = sp->iod.socket; buf = sp->in_buf->data; PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection); bytes = PR_Send(sockfd, buf, sp->bytes, 0, PR_INTERVAL_NO_TIMEOUT); PR_ASSERT(bytes == sp->bytes); if (bytes < 0) { return; } DPRINTF(("serve_client: write complete, msg(%d) \n", sp->msg_num)); sp->msg_num++; if (sp->msg_num < num_tcp_mesgs_per_connection) { sp->bytes_read = 0; sp->iod.timeout = PR_SecondsToInterval(60); jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp, PR_FALSE); PR_ASSERT(NULL != jobp); return; } DPRINTF(("serve_client: read/write complete, msg(%d) \n", sp->msg_num)); if (PR_Shutdown(sockfd, PR_SHUTDOWN_BOTH) < 0) { fprintf(stderr,"%s: ERROR - PR_Shutdown\n", program_name); } PR_Close(sockfd); PR_EnterMonitor(sp->exit_mon); --(*sp->job_counterp); PR_Notify(sp->exit_mon); PR_ExitMonitor(sp->exit_mon); PR_DELETE(sp->in_buf); PR_DELETE(sp); return;}/* * Serve_Client * Thread, started by the server, for serving a client connection. * Reads data from socket and writes it back, unmodified, and * closes the socket */static void PR_CALLBACKServe_Client(void *arg){ Serve_Client_Param *scp = (Serve_Client_Param *) arg; buffer *in_buf; Session *sp; PRJob *jobp; sp = PR_NEW(Session); sp->iod = scp->iod; in_buf = PR_NEW(buffer); if (in_buf == NULL) { fprintf(stderr,"%s: failed to alloc buffer struct\n",program_name); failed_already=1; return; } sp->in_buf = in_buf; sp->bytes = scp->datalen; sp->msg_num = 0; sp->bytes_read = 0; sp->tp = scp->tp; sp->exit_mon = scp->exit_mon; sp->job_counterp = scp->job_counterp; sp->iod.timeout = PR_SecondsToInterval(60); jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp, PR_FALSE); PR_ASSERT(NULL != jobp); PR_DELETE(scp);}static voidprint_stats(void *arg){ Server_Param *sp = (Server_Param *) arg; PRThreadPool *tp = sp->tp; PRInt32 counter; PRJob *jobp; PR_EnterMonitor(sp->exit_mon); counter = (*sp->job_counterp); PR_ExitMonitor(sp->exit_mon); printf("PRINT_STATS: #client connections = %d\n",counter); jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500), print_stats, sp, PR_FALSE); PR_ASSERT(NULL != jobp);}static int job_counter = 0;/* * TCP Server * Server binds an address to a socket, starts a client process and * listens for incoming connections. * Each client connects to the server and sends a chunk of data * Starts a Serve_Client job for each incoming connection, to read * the data from the client and send it back to the client, unmodified. * Each client checks that data received from server is same as the * data it sent to the server.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -