📄 spidercore.cpp
字号:
string table = ((geturlParam *)lpParameter)->table; SpiderCore *sc = ((geturlParam *)lpParameter)->sc; int num_per_time = ((geturlParam *)lpParameter)->num_per_time; unsigned int t_number = ((geturlParam *)lpParameter)->thread_number; string domain = sc->domain; string ipaddr = sc->ipaddr; string path = sc->path; unsigned int port = sc->port; string step_param = ""; //变量参考GetMainURLProc中相应的变量 mysqlpp::Connection conn(false); mysqlpp::StoreQueryResult res; mysqlpp::Row row; DataBase db(sc->db_host, sc->db_user, sc->db_pwd, sc->db_name); string tmp_sql = ""; while (1) { if (!conn.connected()) { db.Connectdb(&conn, sc->encode); } pthread_mutex_lock((pthread_mutex_t *)thread_event_lock[mapname]);; mysqlpp::Query query=conn.query(); query << "SELECT * FROM " << table << " WHERE 0=content_flag & " << t_number << " ORDER BY id ASC" << " LIMIT " << num_per_time; tmp_sql = query.str(); if (!(res =query.store())) { string logstr="In function FunGetContentProc:Thread exit, mapname: " + mapname + ", DataBase error:" + conn.error(); fo.WriteToFileLn(spider_log_file, logstr, 1); return(1) ;//query failed } vector<string> id; vector<mysqlpp::Row >::size_type rows = res.num_rows(); vector<mysqlpp::Row >::size_type rows_i=0; query.reset(); query << "UPDATE " << table << " SET content_flag=content_flag | " << t_number << " WHERE id IN( "; while (rows_i < rows) { if (rows_i != 0) query << ","; query << res[rows_i][0]; rows_i++; } query << ")"; query.exec(); pthread_mutex_unlock((pthread_mutex_t *)thread_event_lock[mapname]); rows_i = 0; if (rows > 0 && !SPIDER_STOP) { while (rows_i < rows) { row=res[rows_i]; path = (string)row.at(1); step_param = (string)row.at(2); int geturl_rs=0; map<string, string>::iterator map_it; map_it = sc->config_map.find(mapname); if (map_it==sc->config_map.end()) { string logstr="In function FunGetContentProc:can not find mapname: " + mapname; fo.WriteToFileLn(spider_log_file, logstr, 1); return(1);//can not find mapname } if("pic"==sc->config_map[mapname + "_type"])//如果节点类型为pic则启动抓取图片模块 { geturl_rs = sc->GetFile(domain, path, port, mapname, &conn, step_param); }else{ geturl_rs = sc->FindContent(domain, path, port, mapname, &conn, step_param); } //if(geturl_rs == 0) { string id_str = (string)row.at(0); id.push_back(id_str); } rows_i++; } } else { string mapname_parent = sc->config_map[mapname + "_parent"]; map<string, unsigned int>::iterator it; pthread_mutex_lock(&count_lock);; it = thread_number.find(mapname_parent+"_content"); if (thread_number.find(mapname_parent+"_content") != thread_number.end() || thread_number.find(mapname_parent+"_url") != thread_number.end() ) { pthread_mutex_unlock(&count_lock); string mapname_parent_content_tmp = mapname_parent + "_content"; pthread_mutex_lock((pthread_mutex_t *)thread_event_lock[mapname_parent_content_tmp]); pthread_cond_wait((pthread_cond_t *)thread_event[mapname_parent_content_tmp], (pthread_mutex_t *)thread_event_lock[mapname_parent_content_tmp]); pthread_mutex_unlock((pthread_mutex_t *)thread_event_lock[mapname_parent_content_tmp]); //pthread_cond_signal((pthread_cond_t *)thread_event[mapname_parent_content_tmp]); continue; } thread_number.erase(mapname); pthread_mutex_unlock(&count_lock); break; } } string logstr = "In function FunGetContentProc:Thread exit, mapname: " + mapname +",sql=" + tmp_sql; fo.WriteToFileLn(spider_log_file, logstr, 1); cout << "Thread exit, mapname:" << mapname << "_content" << " thread_number:" << t_number << endl; return(0);}int SpiderCore::FindContent(string domain, string path, unsigned int port, string mapname, mysqlpp::Connection *conn, string step_param){ string sql(""); string response(""); unsigned int subnum_i = 0, insert_rs=0; char subnum_char[12]={0}; vector<string> param; int i = 0; //找到插入数据库表名 map<string, string>::iterator map_table = this->config_map.find(mapname + "_table"); if (map_table==this->config_map.end()) { string logstr = "In function FindContent:can not find mapname: " + mapname; fo.WriteToFileLn(spider_log_file, logstr, 1); return 3; } string table = map_table->second; sql = "INSERT INTO " + table +" SET step_param='?'"; i=1; param.push_back(step_param); if (this->SendSocket(response, domain, path, port)) { return 2; } this->FindContentNode(domain, path, port, mapname, sql, param, &i, response);//抓取节点 while (1) { //依次遍历该节点的额外节点并抓取,直到无则退出 sprintf(subnum_char, "%d", ++subnum_i); map<string, string>::iterator map_child = this->config_map.find(mapname + "_extra_" + subnum_char); if (map_child == this->config_map.end()) { break; } this->FindContentNode(domain, path, port, map_child->first, sql, param, &i, response); } //如果匹配出至少一个内容信息元,插入数据库中 if(param.size() > 1) { insert_rs = this->InsertData(sql, param, conn); } if (insert_rs) { string logstr = "In function FindContent:insert error, url:" + domain + "/" + path ; fo.WriteToFileLn(spider_log_file, logstr, 1); } return insert_rs;}int SpiderCore::FindContentNode(string domain, string path, unsigned int port, string mapname, string &sql, vector<string> ¶m, int *pi, string &response){ //socket 返回的内容 string expression = this->config_map[mapname]; string childmapname(""); //解析正则表达式 char subnum_char[12]={0}; unsigned int subnum_i = 0; map<int, string> match_result; int offset=0; if (Functions::pcre_match(expression, response, &offset, match_result)==0) { string value(""); unsigned int subnum = 0; subnum = atoi(this->config_map[mapname + "_subnum"].c_str()); //依次遍历子表达式节点 while (++subnum_i <= subnum) { sprintf(subnum_char, "%d", subnum_i); childmapname = this->config_map[mapname + "_" + subnum_char]; //判断子表达式是否需要保存 if (Functions::getString(childmapname, 3) == "1") { if (Functions::getString(childmapname, 5) != "1")//如果不需要重新socket { string field = Functions::getString(childmapname, 4);//获取字段名 value = match_result[subnum_i]; if ( (*pi)++ !=0 ) { sql = sql + ","; } sql = sql + field + "='?'"; param.push_back(value); } else//获取在另一个页面的内容信息 { string sub_content(""); path = match_result[subnum_i]; if(this->SendSocket(sub_content, domain, path, port)) { continue; } //查找socket节点 string childmapname_child = Functions::getString(childmapname, 6); map<string, string>::iterator mapname_child_name_it; mapname_child_name_it = this->config_map.find(childmapname_child); if(mapname_child_name_it == this->config_map.end()) { continue; } int sub_offset = 0; map<int, string> sub_match_result; string mapname_child_name = mapname_child_name_it->first; string mapname_child_regex = mapname_child_name_it->second; if(Functions::pcre_match(mapname_child_regex, sub_content, &sub_offset, sub_match_result)==0) { int sub_link_num = atoi(this->config_map[mapname_child_name + "_subnum"].c_str()); int sub_link_i = 0; //检查socket节点子表达式节点是否存在 while(++sub_link_i <= sub_link_num) { sprintf(subnum_char, "%d", sub_link_i); mapname_child_name_it = this->config_map.find(childmapname_child + "_" + subnum_char); if(mapname_child_name_it == this->config_map.end()) { continue; } if(Functions::getString(mapname_child_name_it->second, 3) == "1")//socket节点的子表达式是否保存 { string field = Functions::getString(mapname_child_name_it->second, 4);//socket节点的子表达式对应的字段名 value = sub_match_result[sub_link_i]; sql = sql + "," + field + " ='?'"; param.push_back(value); } }//end while(++sub_link_i < sub_link_num) } int subchildnum_i = 0; int subchild_extra_subnum = 0; int subchild_extra_subnum_i = 0; string subchild_extra_sub_name = ""; string mapname_child_extra_name = ""; //在socket节点中处理额外节点 while(true) { sub_offset = 0; sprintf(subnum_char, "%d", ++subchildnum_i); mapname_child_extra_name = mapname_child_name + "_extra_" + subnum_char; map<string, string>::iterator map_child_extra_it = this->config_map.find(mapname_child_extra_name); if(map_child_extra_it == this->config_map.end()) { break; } if(Functions::pcre_match(map_child_extra_it->second, sub_content, &sub_offset, sub_match_result)==0) { subchild_extra_subnum = atoi(this->config_map[mapname_child_extra_name + "_subnum"].c_str()); subchild_extra_subnum_i=0; while(++subchild_extra_subnum_i <= subchild_extra_subnum) { sprintf(subnum_char, "%d", subchild_extra_subnum_i); subchild_extra_sub_name = mapname_child_extra_name + "_" + subnum_char; if(Functions::getString(this->config_map[subchild_extra_sub_name], 3) == "1") { string field = Functions::getString(this->config_map[subchild_extra_sub_name], 4); value = sub_match_result[subchild_extra_subnum_i]; sql = sql + "," + field + "='?'"; param.push_back(value); } } } } }//end }else{ }//end if(Functions::getString(childmapname, 3) == "1"){ }//end while (++subnum_i <= subnum){ }//end pcre_match return 0;}int SpiderCore::GetFile(string domain, string path, unsigned int port, string mapname, mysqlpp::Connection *conn, string step_param){ vector<string> param; string sql(""); string filename(""); int insert_rs=0; string badimg = this->config_map[mapname+"_badimg"]; //找到插入数据库表名 map<string, string>::iterator map_table = this->config_map.find(mapname + "_table"); if (map_table==this->config_map.end()) { string logstr = "In function GetFile:can not find mapname: " + mapname; fo.WriteToFileLn(spider_log_file, logstr, 1); return 3; } string table = map_table->second; sql = "INSERT INTO " + table +" SET step_param='?'"; param.push_back(step_param); if(path==badimg) { return 1; } HttpSocket hsock( domain, port); hsock.GetFile(path,filename); sql = sql + ",oldfile='?',newfile='?'"; param.push_back(path); param.push_back(filename); insert_rs = this->InsertData(sql, param, conn); if (insert_rs) { string logstr = "In function GetFile:insert error, url:" + domain + "/" + path ; fo.WriteToFileLn(spider_log_file, logstr, 1); } return insert_rs;}void* FunGetMainURLProc(void* lpParameter){ SpiderCore::GetMainURLProc(lpParameter); return 0;}void* FunGetSubURLProc(void* lpParameter){ SpiderCore::GetSubURLProc(lpParameter); return 0;}void* FunGetContentProc(void* lpParameter){ SpiderCore::GetContentProc(lpParameter); return 0;}void* FunGetContentURLProc(void* lpParameter){ SpiderCore::GetContentURLProc(lpParameter); return 0;}/* *Threads Controle below */int SpiderCore::DoCreateThread(void* plp){ int ret=0; geturlParam *lp = (geturlParam *)plp; pthread_t *phandle = (pthread_t *)new pthread_t; ret = pthread_create(phandle, NULL, lp->pFunc, (void *)lp); lp->pthread_handle = phandle; if(lp->thread_type!=1) { thread_number.insert(make_pair(lp->thread_name, lp->thread_number)); } if(ret==0) { cout << "Congratulations, Thread start up succeed!"; } return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -