w32sock.cpp

来自「一个功能强大的内存数据库源代码,c++编写,有详细的注释」· C++ 代码 · 共 992 行 · 第 1/2 页

CPP
992
字号
//// Local windows sockets//int local_win_socket::read(void* buf, size_t min_size, size_t max_size, 			   time_t timeout){    time_t start = 0;    char* dst = (char*)buf;    size_t size = 0;    Error = ok;    if (timeout != WAIT_FOREVER) { 	start = time(NULL); 	timeout *= 1000; // convert seconds to miliseconds    }    while (size < min_size && state == ss_open) {	        RcvBuf->RcvWaitFlag = true;	serialize();	size_t begin = RcvBuf->DataBeg;	size_t end = RcvBuf->DataEnd;	size_t rcv_size = (begin <= end)	    ? end - begin : sizeof(RcvBuf->Data) - begin;	if (rcv_size > 0) { 	    RcvBuf->RcvWaitFlag = false;            if (rcv_size >= max_size) { 	        memcpy(dst, &RcvBuf->Data[begin], max_size);		begin += max_size;		size += max_size;	    } else { 	        memcpy(dst, &RcvBuf->Data[begin], rcv_size);		begin += rcv_size;		dst += rcv_size;		size += rcv_size;		max_size -= rcv_size;	    } 	    RcvBuf->DataBeg = (begin == sizeof(RcvBuf->Data)) ? 0 : begin;	    if (RcvBuf->SndWaitFlag) { 		SetEvent(Signal[RTR]);    	    }			} else {	    HANDLE h[2];	    h[0] = Signal[RD];	    h[1] = Mutex;	    int rc = WaitForMultipleObjects(2, h, false, timeout);	    RcvBuf->RcvWaitFlag = false;	    if (rc != WAIT_OBJECT_0) {		if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1) { 		    Error = broken_pipe;		    ReleaseMutex(Mutex);		} else if (rc == WAIT_TIMEOUT) { 		    return size;		} else { 		    Error = GetLastError();		}		return -1;	    }	    if (timeout != WAIT_FOREVER) { 		time_t now = time(NULL);		timeout = timeout >= (now - start)*1000 		    ? timeout - (now - start)*1000 : 0;  	    }	}    }			    return size < min_size ? -1 : (int)size;}bool local_win_socket::write(const void* buf, size_t size){    char* src = (char*)buf;    Error = ok;    while (size > 0 && state == ss_open) {	        SndBuf->SndWaitFlag = true;	serialize();	size_t begin = SndBuf->DataBeg;	size_t end = SndBuf->DataEnd;	size_t snd_size = (begin <= end) 	    ? sizeof(SndBuf->Data) - end - (begin == 0)	    : begin - end - 1;	if (snd_size > 0) { 	    SndBuf->SndWaitFlag = false;            if (snd_size >= size) { 	        memcpy(&SndBuf->Data[end], src, size);		end += size;	        size = 0;	    } else { 	        memcpy(&SndBuf->Data[end], src, snd_size);		end += snd_size;		src += snd_size;		size -= snd_size;	    } 	    SndBuf->DataEnd = (end == sizeof(SndBuf->Data)) ? 0 : end;	    if (SndBuf->RcvWaitFlag) { 		SetEvent(Signal[TD]);    	    }			} else {	    HANDLE h[2];	    h[0] = Signal[RTT];	    h[1] = Mutex;	    int rc = WaitForMultipleObjects(2, h, false, INFINITE);	    RcvBuf->SndWaitFlag = false;	    if (rc != WAIT_OBJECT_0) {		if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1) { 		    Error = broken_pipe;		    ReleaseMutex(Mutex);		} else { 		    Error = GetLastError();		}			return false;	    }	}    }				    return size == 0;}#define MAX_ADDRESS_LEN 64local_win_socket::local_win_socket(const char* address){    Name = new char[strlen(address)+1];    strcpy(Name, address);    Error = not_opened;    Mutex = NULL;} bool local_win_socket::open(int){    char buf[MAX_ADDRESS_LEN];	    int  i;    for (i = RD; i <= RTT; i++) {          sprintf(buf, "%s.%c", Name, i + '0');	Signal[i] = CreateEvent(FASTDB_SECURITY_ATTRIBUTES, false, false, buf);	if (GetLastError() == ERROR_ALREADY_EXISTS) { 	    WaitForSingleObject(Signal[i], 0);	}	if (!Signal[i]) {	    Error = GetLastError();	    while (--i >= 0) { 		CloseHandle(Signal[i]);            }	    return false;        }	    }    sprintf(buf, "%s.shr", Name);    BufHnd = CreateFileMapping(INVALID_HANDLE_VALUE, FASTDB_SECURITY_ATTRIBUTES, PAGE_READWRITE,                               0, sizeof(socket_buf)*2, buf);    if (!BufHnd) {	Error = GetLastError();	for (i = RD; i <= RTT; i++) {  	    CloseHandle(Signal[i]);        }	return false;    }    RcvBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);    if (!RcvBuf) {	Error = GetLastError();	CloseHandle(BufHnd);	for (i = RD; i <= RTT; i++) {  	    CloseHandle(Signal[i]);        }	return false;    }	    SndBuf = RcvBuf+1;    RcvBuf->DataBeg = RcvBuf->DataEnd = 0;    SndBuf->DataBeg = SndBuf->DataEnd = 0;	     Error = ok;    state = ss_open;    return true;}local_win_socket::local_win_socket(){    int i;    BufHnd = NULL;    Mutex = NULL;     Name = NULL;    for (i = RD; i <= RTT; i++) {  	Signal[i] = CreateEvent(FASTDB_SECURITY_ATTRIBUTES, false, false, NULL);	if (!Signal[i]) {	    Error = GetLastError();	    while (--i >= 0) { 		CloseHandle(Signal[i]);            }	    return;        }	    }    // create anonymous shared memory section    BufHnd = CreateFileMapping(INVALID_HANDLE_VALUE, FASTDB_SECURITY_ATTRIBUTES, PAGE_READWRITE,                               0, sizeof(socket_buf)*2, NULL);    if (!BufHnd) {	Error = GetLastError();	for (i = RD; i <= RTT; i++) {  	    CloseHandle(Signal[i]);        }	return;    }    RcvBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);    if (!RcvBuf) {	Error = GetLastError();	CloseHandle(BufHnd);	for (i = RD; i <= RTT; i++) {  	    CloseHandle(Signal[i]);        }	BufHnd = NULL;	return;    }	    SndBuf = RcvBuf+1;    RcvBuf->DataBeg = RcvBuf->DataEnd = 0;    SndBuf->DataBeg = SndBuf->DataEnd = 0;	     Error = ok;    state = ss_open;}local_win_socket::~local_win_socket(){    close();    delete[] Name;}	socket_t* local_win_socket::accept(){       HANDLE h[2];    if (state != ss_open) {		return NULL;    }		        connect_data* cdp = (connect_data*)SndBuf->Data;    cdp->Pid = GetCurrentProcessId();    cdp->Mutex = WatchDogMutex;    while (true) { 	SetEvent(Signal[RTR]);	int rc = WaitForSingleObject(Signal[RD], ACCEPT_TIMEOUT);	if (rc == WAIT_OBJECT_0) {	    if (state != ss_open) { 		Error = not_opened;		return NULL;	    }	    Error = ok;	    break;	} else if (rc != WAIT_TIMEOUT) { 	    Error = GetLastError();	    return NULL;	}    }    local_win_socket* sock = new local_win_socket();    sock->Mutex = ((connect_data*)RcvBuf->Data)->Mutex;    accept_data* adp = (accept_data*)SndBuf->Data;    adp->BufHnd = sock->BufHnd;    for (int i = RD; i <= RTT; i++) { 	adp->Signal[(i + TD - RD) & RTT] = sock->Signal[i];     }    SetEvent(Signal[TD]);    h[0] = Signal[RD];    h[1] = sock->Mutex;    int rc = WaitForMultipleObjects(2, h, false, INFINITE);    if (rc != WAIT_OBJECT_0) {	if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1) { 	    Error = broken_pipe;	    ReleaseMutex(Mutex);	} else { 	    Error = GetLastError();	}		delete sock;	return NULL;    }        return sock;}bool local_win_socket::cancel_accept() {    state = ss_shutdown;    SetEvent(Signal[RD]);    SetEvent(Signal[RTT]);    return true;}    char* local_win_socket::get_peer_name(){    if (state != ss_open) { 	Error = not_opened;	return NULL;    }    char* addr = "127.0.0.1";    char* addr_copy = new char[strlen(addr)+1];    strcpy(addr_copy, addr);    Error = ok;    return addr_copy;}bool local_win_socket::is_ok(){    return !Error;}bool local_win_socket::close(){    if (state != ss_close) { 			state = ss_close;	if (Mutex) { 	    CloseHandle(Mutex);	}	for (int i = RD; i <= RTT; i++) { 	    CloseHandle(Signal[i]);        }	UnmapViewOfFile(RcvBuf < SndBuf ? RcvBuf : SndBuf);        CloseHandle(BufHnd);		Error = not_opened;    }    return true;}void local_win_socket::get_error_text(char* buf, size_t buf_size){    switch (Error) {       case ok:        strncpy(buf, "ok", buf_size);	break;      case not_opened:        strncpy(buf, "socket not opened", buf_size);	break;      case broken_pipe:        strncpy(buf, "connection is broken", buf_size);	break;      case timeout_expired:        strncpy(buf, "connection timeout expired", buf_size);	break;      default: 	#ifndef PHAR_LAP	FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,		      NULL,		      Error,		      0,		      buf,		      buf_size,		      NULL);#else	strncpy(buf, "Unknown socket error", buf_size);#endif    }}bool local_win_socket::shutdown(){    if (state == ss_open) {     	state = ss_shutdown;	SetEvent(Signal[RD]); 		SetEvent(Signal[RTT]); 	    }    return true;}bool local_win_socket::connect(int max_attempts, time_t timeout){    char buf[MAX_ADDRESS_LEN];    int  rc, i, error_code;    HANDLE h[2];    for (i = RD; i <= RTT; i++) {          sprintf(buf, "%s.%c", Name, ((i + TD - RD) & RTT) + '0');	Signal[i] = CreateEvent(FASTDB_SECURITY_ATTRIBUTES, false, false, buf);	if (!Signal[i]) {	    Error = GetLastError();	    while (--i >= 0) { 		CloseHandle(Signal[i]);            }	    return false;        }	    }    sprintf(buf, "%s.shr", Name);    BufHnd = CreateFileMapping(INVALID_HANDLE_VALUE, FASTDB_SECURITY_ATTRIBUTES, PAGE_READWRITE,                               0, sizeof(socket_buf)*2, buf);    if (!BufHnd) {	Error = GetLastError();	for (i = RD; i <= RTT; i++) {  	    CloseHandle(Signal[i]);        }	return false;    }    SndBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);    if (!SndBuf) { 	Error = GetLastError();	for (i = RD; i <= RTT; i++) {  	    CloseHandle(Signal[i]);        }	CloseHandle(BufHnd);	return false;    }    RcvBuf = SndBuf+1;    state = ss_shutdown;    Mutex = NULL;    rc = WaitForSingleObject(Signal[RTT],timeout*max_attempts*MILLISECOND);    if (rc != WAIT_OBJECT_0) {	error_code = rc == WAIT_TIMEOUT ? timeout_expired : GetLastError();	close();	Error = error_code;	return false;    }    connect_data* cdp = (connect_data*)RcvBuf->Data;    HANDLE hServer = OpenProcess(STANDARD_RIGHTS_REQUIRED|PROCESS_DUP_HANDLE,				 false, cdp->Pid);    if (!hServer) { 	error_code = GetLastError();	close();	Error = error_code;	return false;    }    HANDLE hSelf = GetCurrentProcess();    if (!DuplicateHandle(hServer, cdp->Mutex, hSelf, &Mutex, 			 0, false, DUPLICATE_SAME_ACCESS) ||	!DuplicateHandle(hSelf, WatchDogMutex, hServer, 			 &((connect_data*)SndBuf->Data)->Mutex, 			 0, false, DUPLICATE_SAME_ACCESS))    {        error_code = GetLastError();	CloseHandle(hServer);	close();	Error = error_code;	return false;    }    SetEvent(Signal[TD]);    h[0] = Signal[RD];    h[1] = Mutex;    rc = WaitForMultipleObjects(2, h, false, INFINITE);    if (rc != WAIT_OBJECT_0) { 	if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1) { 	    error_code = broken_pipe;	    ReleaseMutex(Mutex);	} else { 	    error_code = GetLastError();	}	CloseHandle(hServer);	close();	Error = error_code;	return false;    }    accept_data ad = *(accept_data*)RcvBuf->Data;    SetEvent(Signal[TD]);    for (i = RD; i <= RTT; i++) { 	CloseHandle(Signal[i]);    }    UnmapViewOfFile(SndBuf);    CloseHandle(BufHnd);	    BufHnd = NULL;    if (!DuplicateHandle(hServer, ad.BufHnd, hSelf, &BufHnd, 			 0, false, DUPLICATE_SAME_ACCESS))    {	Error = GetLastError();	CloseHandle(hServer);	CloseHandle(Mutex); 	return false;    } else { 	for (i = RD; i <= RTT; i++) { 	    if (!DuplicateHandle(hServer, ad.Signal[i], 				 hSelf, &Signal[i], 				 0, false, DUPLICATE_SAME_ACCESS))	    {		Error = GetLastError();		CloseHandle(hServer);		CloseHandle(BufHnd); 		CloseHandle(Mutex); 		while (--i >= 0) CloseHandle(Signal[1]);		return false;	    }	}    }    CloseHandle(hServer);    SndBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);    if (!SndBuf) { 	Error = GetLastError();	CloseHandle(BufHnd); 	CloseHandle(Mutex); 	for (i = RD; i <= RTT; i++) {  	    CloseHandle(Signal[i]);        }	return false;    }    RcvBuf = SndBuf+1;    Error = ok;    state = ss_open;     return true;}int local_win_socket::get_handle(){    return -1;}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?