⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcp_cache_server.cpp

📁 C++ web POSIX framewark
💻 CPP
字号:
#include "asio_config.h"// MUST BE FIRST TO COMPILE CORRECTLY UNDER CYGWIN#include "tcp_cache_protocol.h"#include "session_storage.h"#include "archive.h"#include "thread_cache.h"#include <boost/bind.hpp>#include <boost/shared_ptr.hpp>#include <boost/enable_shared_from_this.hpp>#include <ctime>#include <cstdlib>#include <pthread.h>#include <signal.h>#include <boost/date_time/posix_time/posix_time.hpp>#include "session_file_storage.h"#ifdef EN_SQLITE_SESSIONS#include "session_sqlite_storage.h"#endifusing namespace std;using namespace cppcms;using boost::shared_ptr;class session : public boost::enable_shared_from_this<session> {	vector<char> data_in;	string data_out;	cppcms::tcp_operation_header hout;	cppcms::tcp_operation_header hin;public:	tcp::socket socket_;	base_cache &cache;	session_server_storage &sessions;	session(aio::io_service &srv,base_cache &c,session_server_storage &s) : 		socket_(srv), cache(c),sessions(s)	{	}	void run()	{		aio::async_read(socket_,aio::buffer(&hin,sizeof(hin)),				boost::bind(&session::on_header_in,shared_from_this(),						aio::placeholders::error));	}	void on_header_in(error_code const &e)	{		data_in.clear();		data_in.resize(hin.size);		aio::async_read(socket_,aio::buffer(data_in,hin.size),				boost::bind(&session::on_data_in,shared_from_this(),						aio::placeholders::error));	}	void fetch_page()	{		string key;		key.assign(data_in.begin(),data_in.end());		if(cache.fetch_page(key,data_out,hin.operations.fetch_page.gzip)) {			hout.opcode=opcodes::page_data;			hout.size=data_out.size();			hout.operations.page_data.strlen=data_out.size();		}		else {			hout.opcode=opcodes::no_data;		}	}	void fetch()	{		archive a;		set<string> tags;		string key;		key.assign(data_in.begin(),data_in.end());		if(!cache.fetch(key,a,tags)) {			hout.opcode=opcodes::no_data;		}		else {			hout.opcode=opcodes::data;			data_out=a.get();			hout.operations.data.data_len=data_out.size();			for(set<string>::iterator p=tags.begin(),e=tags.end();p!=e;++p) {				data_out.append(p->c_str(),p->size()+1);			}			hout.operations.data.triggers_len=data_out.size()-hout.operations.data.data_len;			hout.size=data_out.size();		}	}	void rise()	{		string key;		key.assign(data_in.begin(),data_in.end());		cache.rise(key);		hout.opcode=opcodes::done;	}	void clear()	{		cache.clear();		hout.opcode=opcodes::done;	}	void stats()	{		unsigned k,t;		cache.stats(k,t);		hout.opcode=opcodes::out_stats;		hout.operations.out_stats.keys=k;		hout.operations.out_stats.triggers=t;	}	bool load_triggers(set<string> &triggers,char const *start,unsigned len)	{		int slen=len;		while(slen>0) {			unsigned size=strlen(start);			if(size==0) {				return false;			}			string tmp;			tmp.assign(start,size);			slen-=size+1;			start+=size+1;			triggers.insert(tmp);		}		return true;	}	void store()	{		set<string> triggers;		if(	hin.operations.store.key_len			+hin.operations.store.data_len			+hin.operations.store.triggers_len != hin.size			|| hin.operations.store.key_len == 0)		{			hout.opcode=opcodes::error;			return;		}		string ts;		vector<char>::iterator p=data_in.begin()			+hin.operations.store.key_len			+hin.operations.store.data_len;		ts.assign(p,p + hin.operations.store.triggers_len);		if(!load_triggers(triggers,ts.c_str(),					hin.operations.store.triggers_len))		{			hout.opcode=opcodes::error;			return;		}		time_t now;		std::time(&now);		time_t timeout=now+(time_t)hin.operations.store.timeout;		string key;		key.assign(data_in.begin(),data_in.begin()+hin.operations.store.key_len);		string data;		data.assign(data_in.begin()+hin.operations.store.key_len,				data_in.begin() + hin.operations.store.key_len + hin.operations.store.data_len);		archive a(data);		cache.store(key,triggers,timeout,a);		hout.opcode=opcodes::done;	}		void save()	{		if(hin.size <= 32)		{			hout.opcode=opcodes::error;			return;		}		time_t timeout=hin.operations.session_save.timeout + time(NULL);		string sid(data_in.begin(),data_in.begin()+32);		string value(data_in.begin()+32,data_in.end());		sessions.save(sid,timeout,value);		hout.opcode=opcodes::done;	}	void load()	{		if(hin.size!=32) {			hout.opcode=opcodes::error;			return;		}		time_t timeout;		int toffset;		string sid(data_in.begin(),data_in.end());		if(!sessions.load(sid,&timeout,data_out) && (toffset=(timeout-time(NULL))) < 0) {			hout.opcode=opcodes::no_data;			return;		}		hout.opcode=opcodes::session_load_data;		hout.size=data_out.size();		hout.operations.session_data.timeout=toffset;	}	void remove()	{		if(hin.size!=32) {			hout.opcode=opcodes::error;			return;		}		string sid(data_in.begin(),data_in.end());		sessions.remove(sid);	}	void on_data_in(error_code const &e)	{		if(e) return;		memset(&hout,0,sizeof(hout));		switch(hin.opcode){		case opcodes::fetch_page:	fetch_page(); break;		case opcodes::fetch:		fetch(); break;		case opcodes::rise:		rise(); break;		case opcodes::clear:		clear(); break;		case opcodes::store:		store(); break;		case opcodes::stats:		stats(); break;		case opcodes::session_save:	save(); break;		case opcodes::session_load:	load(); break;		case opcodes::session_remove:	remove(); break;		default:			hout.opcode=opcodes::error;		}		async_write(socket_,aio::buffer(&hout,sizeof(hout)),			boost::bind(&session::on_header_out,shared_from_this(),				aio::placeholders::error));	}	void on_header_out(error_code const &e)	{		if(e) return;		if(hout.size==0) {			run();			return ;		}		async_write(socket_,aio::buffer(data_out.c_str(),hout.size),			boost::bind(&session::on_data_out,shared_from_this(),				aio::placeholders::error));	}	void on_data_out(error_code const &e)	{		if(e) return;		run();	}};class tcp_cache_server  {	tcp::acceptor acceptor_;	base_cache &cache;	session_server_storage &sessions;	void on_accept(error_code const &e,shared_ptr<session> s)	{		if(!e) {			tcp::no_delay nd(true);			s->socket_.set_option(nd);			s->run();			start_accept();		}	}	void start_accept()	{		shared_ptr<session> s(new session(acceptor_.io_service(),cache,sessions));		acceptor_.async_accept(s->socket_,boost::bind(&tcp_cache_server::on_accept,this,aio::placeholders::error,s));	}public:	tcp_cache_server(	aio::io_service &io,				string ip,				int port,				base_cache &c,				session_server_storage &s) : 		acceptor_(io,			  tcp::endpoint(aio::ip::address::from_string(ip),			  port)),		cache(c),		sessions(s)	{		start_accept();	}};struct params {	bool en_cache;	enum { none , files , sqlite3 } en_sessions;	string session_backend;	string session_file;	string session_dir;	int items_limit;	int gc_frequency;	int files_no;	int port;	string ip;	int threads;	void help()	{		cerr<<	"Usage cppcms_tcp_scale [parameter]\n"			"    --bind IP          ipv4/ipv6 IPto bind (default 0.0.0.0)\n"			"    --port N           port to bind -- MANDATORY\n"			"    --threads N        number of threads, default 1\n"			"    --cache            Enable cache module\n"			"    --limit N          maximal Number of items to store\n"			"                       mandatory if cache enabled\n"			"    --session-files    Enable files bases session backend\n"			"    --dir              Directory where files stored\n"			"                       mandatory if session-files enabled\n"			"    --gc N             gc frequencty seconds (default 600)\n"			"                       it is enabled if threads > 1\n"#ifdef EN_SQLITE_SESSIONS			"    --session-sqlite3  Enable sqlite session backend\n"			"    --file             Sqlite3 DB file. Mandatory for sqlite\n"			"                       session backend\n"			"    --dbfiles          Number of DB files, default 0\n"			"                       0->1 file, 1-> 2 files, 2 -> 4 files, etc\n"#endif			"\n"			"    At least one of   --session-files,"#ifdef EN_SQLITE_SESSIONS			" --session-sqlite3,"#endif			" --cache\n"			"    should be defined\n"			"\n";	}	params(int argc,char **argv) :		en_cache(false),		en_sessions(none),		items_limit(-1),		gc_frequency(-1),		files_no(0),		port(-1),		ip("0.0.0.0"),		threads(1)	{		argv++;		while(*argv) {			string param=*argv;			char *next= *(argv+1);			if(param=="--bind" && next) {				ip=next;				argv++;			}			else if(param=="--port" && next) {				port=atoi(next);				argv++;			}			else if(param=="--threads" && next) {				threads=atoi(next);				argv++;			}			else if(param=="--gc" && next) {				gc_frequency=atoi(next);				argv++;			}			else if(param=="--limit" && next) {				items_limit=atoi(next);				argv++;			}			else if(param=="--session-files") {				en_sessions=files;			}			else if(param=="--dir" && next) {				session_dir=next;				argv++;			}#ifdef EN_SQLITE_SESSIONS			else if(param=="--file" && next) {				session_file=next;				argv++;			}			else if(param=="--dbfiles" && next) {				files_no=atoi(next);				argv++;			}			else if(param=="--session-sqlite3") {				en_sessions=sqlite3;			}#endif			else if(param=="--cache") {				en_cache=true;			}			else {				help();				throw runtime_error("Incorrect parameter:"+param);			}			argv++;		}		if(!en_cache && !en_sessions) {			help();			throw runtime_error("Neither cache nor sessions mods are defined");		}		if(en_sessions == files && session_dir.empty()) {			help();			throw runtime_error("parameter --dir undefined");		}		if(en_sessions == sqlite3 && session_file.empty()) {			help();			throw runtime_error("patameter --file undefined");		}		if(files_no == -1) files_no=1;		if(port==-1) {			help();			throw runtime_error("parameter --port undefined");		}		if(en_cache && items_limit == -1) {			help();			throw runtime_error("parameter --limit undefined");		}		if(gc_frequency != -1) {			if(threads == 1) {				throw runtime_error("You have to use more then one thread to enable gc");			}		}		if(threads > 1 && gc_frequency==-1) {			gc_frequency = 600;		}	}};class garbage_collector{	aio::deadline_timer timer;	boost::shared_ptr<storage::io> io;	int seconds;	void submit()	{		timer.expires_from_now(boost::posix_time::seconds(seconds));		timer.async_wait(boost::bind(&garbage_collector::gc,this,_1));	}	void gc(error_code const &e)	{		session_file_storage::gc(io);		submit();	}public:	garbage_collector(aio::io_service &srv,int sec,boost::shared_ptr<storage::io> io_) :		timer(srv),		seconds(sec),		io(io_)	{		submit();		}};void *thread_function(void *ptr){	aio::io_service &io=*(aio::io_service *)(ptr);	bool stop=false;	try{		while(!stop) {			try {				io.run();				stop=true;			}			catch(cppcms_error const &e) {				// Not much to do...				// Object will be destroyed automatically 				// Because it does not resubmit itself				fprintf(stderr,"CppCMS Error %s\n",e.what());			}		}	}	catch(exception const &e)	{		fprintf(stderr,"Fatal:%s",e.what());	}	catch(...){		fprintf(stderr,"Unknown exception");	}	io.stop();	return NULL;}int main(int argc,char **argv){	try 	{		params par(argc,argv);		aio::io_service io;		auto_ptr<base_cache> cache;		auto_ptr<session_server_storage> storage;		auto_ptr <garbage_collector> gc;		if(par.en_cache)			cache.reset(new thread_cache(par.items_limit));		else			cache.reset(new base_cache());		if(par.en_sessions==params::files) {			boost::shared_ptr<storage::io> storage_io(new storage::thread_io(par.session_dir));			storage.reset(new session_file_storage(storage_io));			if(par.threads > 1 && par.gc_frequency > 0) {				gc.reset(new garbage_collector(io,par.gc_frequency,storage_io));			}		}#ifdef EN_SQLITE_SESSIONS		else if(par.en_sessions==params::sqlite3) {			boost::shared_ptr<storage::sqlite_N>				sql(new storage::sqlite_N(par.session_file,1<<par.files_no,true,1000,5));			storage.reset(new session_sqlite_storage(sql));		}#endif		else {			storage.reset(new empty_session_server_storage());		}				tcp_cache_server srv_cache(io,par.ip,par.port,*cache,*storage);		sigset_t new_mask;		sigfillset(&new_mask);		sigset_t old_mask;		pthread_sigmask(SIG_BLOCK, &new_mask, &old_mask);		vector<pthread_t> threads;		threads.resize(par.threads);		int i;		for(i=0;i<par.threads;i++){			if(pthread_create(&threads[i],NULL,thread_function,&io)!=0) {				perror("pthread_create failed:");				io.stop();				for(i=i-1;i>=0;i--) {					pthread_join(threads[i],NULL);				}			}		}		// Restore previous mask		pthread_sigmask(SIG_SETMASK,&old_mask,0);		// Wait for signlas for exit		sigset_t wait_mask;		sigemptyset(&wait_mask);		sigaddset(&wait_mask, SIGINT);		sigaddset(&wait_mask, SIGQUIT);		sigaddset(&wait_mask, SIGTERM);		pthread_sigmask(SIG_BLOCK, &wait_mask, 0);		int sig = 0;		sigwait(&wait_mask, &sig);				cout<<"Catched signal:"<<sig<<" exiting..."<<endl;		io.stop();		for(i=0;i<par.threads;i++) {			pthread_join(threads[i],NULL);		}	}	catch(std::exception const &e) {		cerr<<"Error:"<<e.what()<<endl;		return 1;	}	cout<<"Done"<<endl;	return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -