⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sbase.c

📁 这是基于C语言开发的分布式搜索源代码
💻 C
字号:
#include "core.h"#include "sbase.h"#include "server.h"#include "utils/timer.h"#include "utils/buffer.h"#include "utils/chunk.h"#include "utils/log.h"#ifndef assignval#define assignval(_dst, _val, _op, _dval)\{\	if( _val _op  _dval)\	{\		_dst = _dval;\	}\	else\	{\		_dst = _val ;\	}\}#endif #ifndef toval#define toval(_dst, _val, _dval){ \	if(_val > 0 && _val < _dval) \		_dst = _val; \	else \		_dst = _dval; \}#endif#ifndef SETRLIMIT#define SETRLIMIT(NAME, RLIM, rlim_set)\{\	struct rlimit rlim;\	rlim.rlim_cur = rlim_set;\	rlim.rlim_max = rlim_set;\	if(setrlimit(RLIM, (&rlim)) != 0) {\		_ERROR_LOG("setrlimit RLIM[%s] cur[%ld] max[%ld] failed, %s",\				NAME, rlim.rlim_cur, rlim.rlim_max, strerror(errno));\		 _exit(-1);\	} else {\		_DEBUG_LOG("setrlimit RLIM[%s] cur[%ld] max[%ld]",\				NAME, rlim.rlim_cur, rlim.rlim_max);\	}\}#define GETRLIMIT(NAME, RLIM)\{\	struct rlimit rlim;\	if(getrlimit(RLIM, &rlim) != 0 ) {\		_ERROR_LOG("regetrlimit RLIM[%s] failed, %s",\				NAME, strerror(errno));\	} else {\		_DEBUG_LOG("getrlimit RLIM[%s] cur[%ld] max[%ld]", \				NAME, rlim.rlim_cur, rlim.rlim_max);\	}\}#endif#define HANDLER_CHECK_RET(handler, ret)\{\	if(handler == NULL)\	{\		_ERROR_LOG("FATAL: handler is NULL\n");\		return ret;\	}\}#define HANDLER_CHECK(handler)\{\        if(handler == NULL)\        {\                _ERROR_LOG("FATAL: handler is NULL\n");\                return ;\        }\}int sbase_init(struct _SBASE *);int sbase_start(struct _SBASE *);int sbase_stop(struct _SBASE *);void sbase_clean(struct _SBASE *);/* push cache */int sbase_push_cache(const HANDLER *, const void *, const size_t);/* recv  CHUNK  from remote */int sbase_recv_chunk(const HANDLER *, const size_t);/* recv FILE CHUNK from remote */int sbase_recv_file(const HANDLER *, const char *, const uint64_t, const uint64_t );/* send a chunk */int sbase_push_chunk(const HANDLER *, const void *, const size_t);/* send file */int sbase_push_file(const HANDLER *, const char *, const uint64_t, const uint64_t );/* close connection */int sbase_close_handler(const HANDLER *);/* caculate data sum */short sbase_datasum(void *, int);/* setting resource limit */int set_rlimit(rlim_t , rlim_t);/* Initialize struct sbase */SBASE *sbase(){	SBASE *sb = (SBASE *)calloc(1, sizeof(SBASE));		if(sb == NULL )	{		_ERROR_LOG("ERROR:Calloc NEW SBASE failed, %s", strerror(errno));		return NULL;	}	/* methods init */        sb->init        	= sbase_init;        sb->start       	= sbase_start;        sb->stop        	= sbase_stop;	sb->clean		= sbase_clean;        sb->push_cache 		= sbase_push_cache;	sb->recv_chunk		= sbase_recv_chunk;	sb->recv_file		= sbase_recv_file;        sb->push_chunk		= sbase_push_chunk;        sb->push_file   	= sbase_push_file;        sb->close_handler 	= sbase_close_handler;	return sb;}/* Initialize SBASE */int sbase_init(SBASE *sb){	SERVER *sv = NULL;	assert(sb);	/* check running status */	if(sb->sv && ((SERVER *)(sb->sv))->running_status )	{		_ERROR_LOG("ERROR:sbase is running");		return -1;	}	else	{		sb->sv = (void *)server_init();	}	sv = (SERVER *)sb->sv;		sv->sb = sb;	/* Server  type */	sv->server_type = (sb->server_type)?sb->server_type: SERVER_NORMAL;	/* Check value */	if( !(sb->sock_t & INET_SOCK_MIX) )	{		_ERROR_LOG("ERROR:Invalide SOCKET TYPE");		return -1;	}	if( !(sb->domain & INET_DOMAIN_MIX) )	{		_ERROR_LOG("ERROR:Invalide DOMAIN TYPE");		return -1;	}	if( sb->port <= 0 )	{		_ERROR_LOG("ERROR:Invalide PORT TYPE");		return -1;	}	sv->sock_t	= sb->sock_t;	sv->domain	= sb->domain;	sv->family	= sb->family;	sv->ip  	= sb->ip;	sv->port  	= sb->port;	/* set log */	sv->log  	= log_init(sb->logfile);		if(sb->packet_t & PACKET_T) sv->packet_t = sb->packet_t;	else sv->packet_t = PACKET_T_DEFAULT;		if(sb->delimiter && sb->delimiter_len > 0)	{		sv->delimiter = sb->delimiter;		sv->delimiter_len = sb->delimiter_len;	}	else	{		sv->delimiter = NULL;                sv->delimiter_len = 1;	}	sv->buf_size = sb->buf_size;	toval(sv->backlog, sb->backlog, INET_BACKLOG);	toval(sv->packet_len, sb->packet_len, MAX_PACKET_LEN);	toval(sv->max_threads, sb->max_threads, MAX_THREADS);	toval(sv->max_connections, sb->max_connections, MAX_CONNECTIONS);	assignval(sv->sleep_usec, sb->sleep_usec, < , MIN_SLEEP_USEC);	assignval(sv->heartbeat_interval, sb->heartbeat_interval, < , MIN_HEARTBEAT_INTERVAL);	assignval(sv->conn_timeout, sb->conn_timeout, < , MIN_CONN_TIMEOUT);	/* set resouce limit */	SETRLIMIT("RLIMIT_AS", RLIMIT_AS, RLIM_INFINITY);        //SETRLIMIT("RLIMIT_CORE", RLIMIT_CORE, RLIM_INFINITY);        SETRLIMIT("RLIMIT_CPU", RLIMIT_CPU, RLIM_INFINITY);        SETRLIMIT("RLIMIT_DATA", RLIMIT_DATA, RLIM_INFINITY);        SETRLIMIT("RLIMIT_FSIZE", RLIMIT_FSIZE, RLIM_INFINITY);        SETRLIMIT("RLIMIT_LOCKS", RLIMIT_LOCKS, RLIM_INFINITY);        SETRLIMIT("RLIMIT_MEMLOCK", RLIMIT_MEMLOCK, RLIM_INFINITY);        SETRLIMIT("RLIMIT_MSGQUEUE", RLIMIT_MSGQUEUE, RLIM_INFINITY);        //SETRLIMIT("RLIMIT_NICE", RLIMIT_NICE, RLIM_INFINITY);        SETRLIMIT("RLIMIT_NOFILE", RLIMIT_NOFILE, MAX_CONNECTIONS);        SETRLIMIT("RLIMIT_NPROC", RLIMIT_NPROC, RLIM_INFINITY);        SETRLIMIT("RLIMIT_RSS", RLIMIT_RSS, RLIM_INFINITY);        //SETRLIMIT("RLIMIT_RTPRIO", RLIMIT_RTPRIO, RLIM_INFINITY);        SETRLIMIT("RLIMIT_SIGPENDING", RLIMIT_SIGPENDING, RLIM_INFINITY);        SETRLIMIT("RLIMIT_STACK", RLIMIT_STACK, RLIM_INFINITY);		/*  Callback Handler */	sv->heartbeat_handler 	= sb->heartbeat_handler;	sv->packet_reader  	= sb->packet_reader;	sv->packet_handler 	= sb->packet_handler;	sv->data_handler	= sb->data_handler;	sv->oob_handler		= sb->oob_handler;	return sv->init(sv);}/* clean SBASE */void sbase_clean(struct _SBASE *sb){	((SERVER *)sb->sv)->clean((SERVER **)&(sb->sv));}/* starting SBASE */int sbase_start(struct _SBASE *sb){	SERVER *sv = NULL;	if(sb && sb->sv)	{		sv = (SERVER *) sb->sv;                sv->start(sv);	}        return -1;}/* stopping SBASE */int sbase_stop(struct _SBASE *sb){	SERVER *sv = NULL;        if(sb && sb->sv)	{		sv = (SERVER *) sb->sv;		//if(sv->running_status)		_DEBUG_LOG("Terminating SBASE ");               	sv->stop(sv);		if(sv) free(sv);		return 0;	}        return -1;}/* caculating data sum */short sbase_datasum(void *data, int len){        int u = 0;        char *p = (char *)data;        int unit_len = sizeof(short);        short sum = 0;	assert(data);        u = (len % unit_len)?((len/unit_len) + 1):(len/unit_len);        while(u-- >  0){                sum += ~((short)*p);                p += unit_len;        }        return sum;}/* push cache */int sbase_push_cache(const HANDLER *handler, const void *data, const size_t size){	HANDLER_CHECK_RET(handler, -1);	if(handler && data && size > 0)	{		((SESSION *)handler)->push_cache(((SESSION *)handler), (void *)data, (size_t)size);		return 0;	}		return -1;}/* recv  CHUNK  from remote */int sbase_recv_chunk(const HANDLER *handler, const size_t len){	HANDLER_CHECK_RET(handler, -1);	((SESSION *)handler)->transaction_state = READ_CHUNK_STATE;	_DEBUG_LOG("Ready for receiving CHUNK SIZE:%u", len);	((SESSION *)handler)->chunk->set(((SESSION *)handler)->chunk,		 ((SESSION *)handler)->transaction_id, MEM_CHUNK, NULL, 0, len); 	return ((SESSION *)handler)->chunk_reader((SESSION *)handler);}/* recv FILE CHUNK from remote */int sbase_recv_file(const HANDLER *handler, const char *filename,	 const uint64_t offset, const uint64_t len){	HANDLER_CHECK_RET(handler, -1);	((SESSION *)handler)->transaction_state = READ_CHUNK_STATE;       	((SESSION *)handler)->chunk->set(((SESSION *)handler)->chunk,		 ((SESSION *)handler)->transaction_id,		 FILE_CHUNK, (char *)filename, (uint64_t)offset, (uint64_t)len);	 	return ((SESSION *)handler)->chunk_reader((SESSION *)handler);}/* push chunk to SESSION via handler */int sbase_push_chunk(const HANDLER *handler, const void *data, const size_t len){	HANDLER_CHECK_RET(handler, -1);	if(data == NULL ) return -1;	return ((SESSION *)handler)->push_chunk(((SESSION *)handler), (void *)data, (size_t) len);	}/* push file */int sbase_push_file(const HANDLER *handler, const char *file,		 const uint64_t offset, const uint64_t len){	HANDLER_CHECK_RET(handler, -1);	return ((SESSION *)handler)->push_file(((SESSION *)handler),			(char *)file, (uint64_t )offset, (uint64_t)len);}/* close SESSION */int sbase_close_handler(const HANDLER *handler){	HANDLER_CHECK_RET(handler, -1);        ((SESSION *)handler)->push_message(((SESSION *)handler), MESSAGE_QUIT);	return 0;}/*reset resource limit */int set_rlimit(rlim_t rlim, rlim_t rlim_set){        int ret;        struct rlimit rl, rlset;        memset(&rl, 0, sizeof(struct rlimit));        if( (ret = getrlimit(rlim, &rl)) != 0 )	{                _ERROR_LOG("ERROR:Getting resource limit failed, %s",strerror(errno));                return -1;        }        if(rl.rlim_max < rlim_set)	{                rl.rlim_cur = rlim_set;                rl.rlim_max = rlim_set;                if((ret = setrlimit(rlim, &rl)) != 0 )		{                        _ERROR_LOG("ERROR:Setting resource limit failed, %s",                                        strerror(errno));                        return -1;                }                if((ret = getrlimit(rlim, &rl)) != 0                                || rl.rlim_max < rlim_set )		{                        _ERROR_LOG("ERROR:setting resource limit result error, %s",                                        strerror(errno));                        return -1;                }		else		{                        _DEBUG_LOG("DEBUG:setting resource[0x%0x] cur[%d] max[%d]",                                        rlim, rl.rlim_cur, rl.rlim_max);                }        }        return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -