📄 provider.c
字号:
filebytes = 0; while (filebytes < descbytes) { netbytes = sizeof(buffer); if ((descbytes - filebytes) < netbytes) netbytes = descbytes - filebytes; TEST_LOG( cltsrv_log_file, TEST_LOG_VERBOSE, ("\tProcessRequest(0x%p): receive %d bytes\n", me, netbytes)); bytes = PR_Recv(fd, buffer, netbytes, RECV_FLAGS, timeout); if (-1 == bytes) { rv = PR_FAILURE; if (Aborted(rv)) goto aborted; if (PR_IO_TIMEOUT_ERROR == PR_GetError()) { TEST_LOG( cltsrv_log_file, TEST_LOG_ERROR, ("\t\tProcessRequest(0x%p): receive data timeout\n", me)); goto aborted; } /* * XXX: I got (PR_CONNECT_RESET_ERROR, ERROR_NETNAME_DELETED) * on NT here. This is equivalent to ECONNRESET on Unix. * -wtc */ TEST_LOG( cltsrv_log_file, TEST_LOG_WARNING, ("\t\tProcessRequest(0x%p): unexpected error (%d, %d)\n", me, PR_GetError(), PR_GetOSError())); goto aborted; } if(0 == bytes) { TEST_LOG( cltsrv_log_file, TEST_LOG_WARNING, ("\t\tProcessRequest(0x%p): unexpected end of stream\n", me)); rv = PR_FAILURE; goto aborted; } filebytes += bytes; netbytes = bytes; /* The byte count for PR_Write should be positive */ MY_ASSERT(netbytes > 0); TEST_LOG( cltsrv_log_file, TEST_LOG_VERBOSE, ("\tProcessRequest(0x%p): write %d bytes to file\n", me, netbytes)); bytes = PR_Write(file, buffer, netbytes); if (netbytes != bytes) { rv = PR_FAILURE; if (Aborted(rv)) goto aborted; if (PR_IO_TIMEOUT_ERROR == PR_GetError()) { TEST_LOG( cltsrv_log_file, TEST_LOG_ERROR, ("\t\tProcessRequest(0x%p): write file timeout\n", me)); goto aborted; } } TEST_ASSERT(bytes > 0); } PR_Lock(server->ml); server->operations += 1; server->bytesTransferred += filebytes; PR_Unlock(server->ml); rv = PR_Close(file); file = NULL; if (Aborted(rv)) goto aborted; TEST_ASSERT(PR_SUCCESS == rv); TEST_LOG( cltsrv_log_file, TEST_LOG_VERBOSE, ("\t\tProcessRequest(0x%p): opening %s\n", me, descriptor->filename)); file = PR_Open(descriptor->filename, PR_RDONLY, 0); if (NULL == file) { rv = PR_FAILURE; if (Aborted(rv)) goto aborted; if (PR_IO_TIMEOUT_ERROR == PR_GetError()) { TEST_LOG( cltsrv_log_file, TEST_LOG_ERROR, ("\t\tProcessRequest(0x%p): open file timeout\n", PR_CurrentThread())); goto aborted; } TEST_LOG( cltsrv_log_file, TEST_LOG_ERROR, ("\t\tProcessRequest(0x%p): other file open error (%u, %u)\n", me, PR_GetError(), PR_GetOSError())); goto aborted; } TEST_ASSERT(NULL != file); netbytes = 0; while (netbytes < descbytes) { filebytes = sizeof(buffer); if ((descbytes - netbytes) < filebytes) filebytes = descbytes - netbytes; TEST_LOG( cltsrv_log_file, TEST_LOG_VERBOSE, ("\tProcessRequest(0x%p): read %d bytes from file\n", me, filebytes)); bytes = PR_Read(file, buffer, filebytes); if (filebytes != bytes) { rv = PR_FAILURE; if (Aborted(rv)) goto aborted; if (PR_IO_TIMEOUT_ERROR == PR_GetError()) TEST_LOG( cltsrv_log_file, TEST_LOG_ERROR, ("\t\tProcessRequest(0x%p): read file timeout\n", me)); else TEST_LOG( cltsrv_log_file, TEST_LOG_ERROR, ("\t\tProcessRequest(0x%p): other file error (%d, %d)\n", me, PR_GetError(), PR_GetOSError())); goto aborted; } TEST_ASSERT(bytes > 0); netbytes += bytes; filebytes = bytes; TEST_LOG( cltsrv_log_file, TEST_LOG_VERBOSE, ("\t\tProcessRequest(0x%p): sending %d bytes\n", me, filebytes)); bytes = PR_Send(fd, buffer, filebytes, SEND_FLAGS, timeout); if (filebytes != bytes) { rv = PR_FAILURE; if (Aborted(rv)) goto aborted; if (PR_IO_TIMEOUT_ERROR == PR_GetError()) { TEST_LOG( cltsrv_log_file, TEST_LOG_ERROR, ("\t\tProcessRequest(0x%p): send data timeout\n", me)); goto aborted; } break; } TEST_ASSERT(bytes > 0); } PR_Lock(server->ml); server->bytesTransferred += filebytes; PR_Unlock(server->ml); rv = PR_Shutdown(fd, PR_SHUTDOWN_BOTH); if (Aborted(rv)) goto aborted; rv = PR_Close(file); file = NULL; if (Aborted(rv)) goto aborted; TEST_ASSERT(PR_SUCCESS == rv);aborted: PR_ClearInterrupt(); if (NULL != file) PR_Close(file); drv = PR_Delete(descriptor->filename); TEST_ASSERT(PR_SUCCESS == drv);exit: TEST_LOG( cltsrv_log_file, TEST_LOG_VERBOSE, ("\t\tProcessRequest(0x%p): Finished\n", me)); PR_DELETE(descriptor);#if defined(WIN95) PR_Sleep(PR_MillisecondsToInterval(200)); /* lth. see note [1] */#endif return rv;} /* ProcessRequest */typedef void (*StartFn)(void*);typedef struct StartObject{ StartFn start; void *arg;} StartObject;#if defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS)#include "md/_pth.h"#include <pthread.h>static void *pthread_start(void *arg){ StartObject *so = (StartObject*)arg; StartFn start = so->start; void *data = so->arg; PR_Free(so); start(data); return NULL;} /* pthread_start */#endif /* defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS) */#if defined(IRIX) && !defined(_PR_PTHREADS)#include <sys/types.h>#include <sys/prctl.h>static void sproc_start(void *arg, PRSize size){ StartObject *so = (StartObject*)arg; StartFn start = so->start; void *data = so->arg; PR_Free(so); start(data);} /* sproc_start */#endif /* defined(IRIX) && !defined(_PR_PTHREADS) */#if defined(WIN32)#include <process.h> /* for _beginthreadex() */static PRUintn __stdcall windows_start(void *arg){ StartObject *so = (StartObject*)arg; StartFn start = so->start; void *data = so->arg; PR_Free(so); start(data); return 0;} /* windows_start */#endif /* defined(WIN32) */static PRStatus JoinThread(PRThread *thread){ PRStatus rv; switch (thread_provider) { case thread_nspr: rv = PR_JoinThread(thread); break; case thread_pthread:#if defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS) rv = PR_SUCCESS; break;#endif /* defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS) */ case thread_win32:#if defined(WIN32) rv = PR_SUCCESS; break;#endif default: rv = PR_FAILURE; break; } return rv; } /* JoinThread */static PRStatus NewThread( StartFn start, void *arg, PRThreadPriority prio, PRThreadState state){ PRStatus rv; switch (thread_provider) { case thread_nspr: { PRThread *thread = PR_CreateThread( PR_USER_THREAD, start, arg, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, 0); rv = (NULL == thread) ? PR_FAILURE : PR_SUCCESS; } break; case thread_pthread:#if defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS) { int rv; pthread_t id; pthread_attr_t tattr; StartObject *start_object; start_object = PR_NEW(StartObject); PR_ASSERT(NULL != start_object); start_object->start = start; start_object->arg = arg; rv = _PT_PTHREAD_ATTR_INIT(&tattr); PR_ASSERT(0 == rv); rv = pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED); PR_ASSERT(0 == rv);#if !defined(LINUX) rv = pthread_attr_setstacksize(&tattr, 64 * 1024); PR_ASSERT(0 == rv);#endif rv = _PT_PTHREAD_CREATE(&id, tattr, pthread_start, start_object); (void)_PT_PTHREAD_ATTR_DESTROY(&tattr); return (0 == rv) ? PR_SUCCESS : PR_FAILURE; }#else PR_SetError(PR_NOT_IMPLEMENTED_ERROR, 0); rv = PR_FAILURE;#endif /* defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS) */ break; case thread_sproc:#if defined(IRIX) && !defined(_PR_PTHREADS) { PRInt32 pid; StartObject *start_object; start_object = PR_NEW(StartObject); PR_ASSERT(NULL != start_object); start_object->start = start; start_object->arg = arg; pid = sprocsp( sproc_start, PR_SALL, start_object, NULL, 64 * 1024); rv = (0 < pid) ? PR_SUCCESS : PR_FAILURE; }#else PR_SetError(PR_NOT_IMPLEMENTED_ERROR, 0); rv = PR_FAILURE;#endif /* defined(IRIX) && !defined(_PR_PTHREADS) */ break; case thread_win32:#if defined(WIN32) { void *th; PRUintn id; StartObject *start_object; start_object = PR_NEW(StartObject); PR_ASSERT(NULL != start_object); start_object->start = start; start_object->arg = arg; th = (void*)_beginthreadex( NULL, /* LPSECURITY_ATTRIBUTES - pointer to thread security attributes */ 0U, /* DWORD - initial thread stack size, in bytes */ windows_start, /* LPTHREAD_START_ROUTINE - pointer to thread function */ start_object, /* LPVOID - argument for new thread */ 0U, /*DWORD dwCreationFlags - creation flags */ &id /* LPDWORD - pointer to returned thread identifier */ ); rv = (NULL == th) ? PR_FAILURE : PR_SUCCESS; }#else PR_SetError(PR_NOT_IMPLEMENTED_ERROR, 0); rv = PR_FAILURE;#endif break; default: PR_SetError(PR_NOT_IMPLEMENTED_ERROR, 0); rv = PR_FAILURE; } return rv;} /* NewThread */static PRStatus CreateWorker(CSServer_t *server, CSPool_t *pool){ PRStatus rv; CSWorker_t *worker = PR_NEWZAP(CSWorker_t); worker->server = server; PR_INIT_CLIST(&worker->element); rv = NewThread( Worker, worker, DEFAULT_SERVER_PRIORITY, PR_UNJOINABLE_THREAD); if (PR_FAILURE == rv) PR_DELETE(worker); TEST_LOG(cltsrv_log_file, TEST_LOG_STATUS, ("\tCreateWorker(0x%p): create new worker (0x%p)\n", PR_CurrentThread(), worker->thread)); return rv;} /* CreateWorker */static void PR_CALLBACK Worker(void *arg){ PRStatus rv; PRNetAddr from; PRFileDesc *fd = NULL; CSWorker_t *worker = (CSWorker_t*)arg; CSServer_t *server = worker->server; CSPool_t *pool = &server->pool; PRThread *me = worker->thread = PR_CurrentThread(); TEST_LOG( cltsrv_log_file, TEST_LOG_NOTICE, ("\t\tWorker(0x%p): started [%u]\n", me, pool->workers + 1)); PR_Lock(server->ml); PR_APPEND_LINK(&worker->element, &server->list); pool->workers += 1; /* define our existance */ while (cs_run == server->state) { while (pool->accepting >= server->workers.accepting) { TEST_LOG( cltsrv_log_file, TEST_LOG_VERBOSE, ("\t\tWorker(0x%p): waiting for accept slot[%d]\n", me, pool->accepting)); rv = PR_WaitCondVar(pool->acceptComplete, PR_INTERVAL_NO_TIMEOUT); if (Aborted(rv) || (cs_run != server->state)) { TEST_LOG( cltsrv_log_file, TEST_LOG_NOTICE, ("\tWorker(0x%p): has been %s\n", me, (Aborted(rv) ? "interrupted" : "stopped"))); goto exit; } } pool->accepting += 1; /* how many are really in accept */ PR_Unlock(server->ml); TEST_LOG( cltsrv_log_file, TEST_LOG_VERBOSE, ("\t\tWorker(0x%p): calling accept\n", me)); fd = PR_Accept(server->listener, &from, PR_INTERVAL_NO_TIMEOUT); PR_Lock(server->ml); pool->accepting -= 1; PR_NotifyCondVar(pool->acceptComplete); if ((NULL == fd) && Aborted(PR_FAILURE)) { if (NULL != server->listener) { PR_Close(server->listener); server->listener = NULL; } goto exit; } if (NULL != fd) { /* ** Create another worker of the total number of workers is ** less than the minimum specified or we have none left in ** accept() AND we're not over the maximum. ** This sort of presumes that the number allowed in accept ** is at least as many as the minimum. Otherwise we'll keep ** creating new threads and deleting them soon after. */ PRBool another = ((pool->workers < server->workers.minimum) || ((0 == pool->accepting) && (pool->workers < server->workers.maximum))) ? PR_TRUE : PR_FALSE; pool->active += 1; PR_Unlock(server->ml); if (another) (void)CreateWorker(server, pool); rv = ProcessRequest(fd, server); if (PR_SUCCESS != rv) TEST_LOG( cltsrv_log_file, TEST_LOG_ERROR, ("\t\tWorker(0x%p): server process ended abnormally\n", me)); (void)PR_Close(fd); fd = NULL; PR_Lock(server->ml); pool->active -= 1; } }exit: PR_ClearInterrupt(); PR_Unlock(server->ml); if (NULL != fd) { (void)PR_Shutdown(fd, PR_SHUTDOWN_BOTH); (void)PR_Close(fd); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -