📄 spidercore.cpp
字号:
string domain = sc->domain; string ipaddr = sc->ipaddr; string path = sc->path; unsigned int port = sc->port; string step_param = ""; string parent_id = ""; int max_page = 0; //以上变量同GetMainURLProc中的一致 //获取该节点的双亲节点名称 string mapname_parent = sc->config_map[mapname + "_parent"]; map<string, string>::iterator map_it; map_it = sc->config_map.find(mapname+"_page_maxnum"); if (map_it != sc->config_map.end()) { max_page = atoi(sc->config_map[mapname+"_page_maxnum"].c_str()); } mysqlpp::Connection conn(false); mysqlpp::StoreQueryResult res; mysqlpp::Row row; DataBase db(sc->db_host, sc->db_user, sc->db_pwd, sc->db_name); string tmp_sql = ""; while (1) { if (!conn.connected()) { db.Connectdb(&conn, sc->encode); } mysqlpp::Query query=conn.query(); //query << "SELECT * FROM " << table << " WHERE 0=url_flag & " << t_number << " ORDER BY id ASC" << " LIMIT " << num_per_time; //查找出每个上级链接,已经抓取的链接 query << "SELECT c.id,c.url,c.step_param,d.parent_id,d.num FROM " << table << " c, ( SELECT MAX(a.id) AS maxid, b.id AS parent_id, count(b.id) AS num FROM " << table << " a ,( SELECT id FROM " << table << " WHERE 0=url_flag&"<< t_number << " AND parent=0 ORDER BY id ASC LIMIT "<< num_per_time << ") b WHERE a.parent=b.id or a.id=b.id GROUP BY b.id ) d WHERE c.id=d.maxid"; tmp_sql = query.str(); if (!(res =query.store())) { string logstr="In function FunGetSubURLProc:Thread exit, mapname: " + mapname + ", DataBase error:" + conn.error()+",sql:" + tmp_sql; fo.WriteToFileLn(spider_log_file, logstr, 1); return (1) ;//query failed } vector<string> id; vector<mysqlpp::Row >::size_type rows = res.num_rows(); vector<mysqlpp::Row >::size_type rows_i=0; //是否有数据,并且没有停止信号有没有被设置 if (rows > 0 && !SPIDER_STOP) { if (!sc->config_map[mapname + "_page"].empty()) { while (rows_i < rows && !SPIDER_STOP) { row=res[rows_i]; path =(string)row.at(1); step_param = (string)row.at(2); parent_id = (string)row.at(3); string row_fetch_num = (string)row.at(4); //record the id to update the database id.push_back(parent_id); if (max_page && atoi(row_fetch_num.c_str()) < max_page) { continue; } int geturl_rs=0; map<string, string>::iterator map_it; map_it = sc->config_map.find(mapname); if (map_it==sc->config_map.end()) { string logstr="In function FunGetSubURLProc:can not find mapname: " + mapname; fo.WriteToFileLn(spider_log_file, logstr, 1); return (1) ;//can not find mapname } geturl_rs = sc->FindUrl(domain, path, port, mapname + "_page", mapname , url_flag, parent_id, &conn, true, step_param, max_page); rows_i++; } }else{ //唤醒等待此线程提供数据的线程,并阻塞自己等待上级线程提供数据唤醒自己 string mapname_url_tmp = mapname + "_url"; pthread_mutex_lock((pthread_mutex_t *)thread_event_lock[mapname_url_tmp]); pthread_cond_broadcast((pthread_cond_t *)thread_event[mapname_url_tmp]); pthread_mutex_unlock((pthread_mutex_t *)thread_event_lock[mapname_url_tmp]); string map_name_parent_url = mapname_parent + "_content"; pthread_mutex_lock((pthread_mutex_t *)thread_event_lock[map_name_parent_url]); pthread_cond_wait( (pthread_cond_t *)thread_event[map_name_parent_url], (pthread_mutex_t *)thread_event_lock[map_name_parent_url]); pthread_mutex_unlock((pthread_mutex_t *)thread_event_lock[map_name_parent_url]); } if (!id.empty() && !SPIDER_STOP) { if (!conn.connected()) { db.Connectdb(&conn, sc->encode); } mysqlpp::Query query=conn.query(); query << "UPDATE " << table << " SET url_flag=url_flag | " << t_number << " WHERE id IN( "; for (vector<string>::iterator id_i=id.begin(); id_i != id.end(); id_i ++ ) { if (id_i!=id.begin()) query << ","; query << (*id_i); } query << ")"; query.exec(); } } else//没有数据 { map<string, unsigned int>::iterator it; pthread_mutex_lock(&count_lock); it = thread_number.find(mapname_parent+"_url"); //查看为该线程提供数据的线程有没有退出 if (thread_number.find(mapname_parent+"_url") != thread_number.end() || thread_number.find(mapname_parent+"_content") != thread_number.end()) { pthread_mutex_unlock(&count_lock); string map_name_parent_url = mapname_parent + "_content"; pthread_mutex_lock((pthread_mutex_t *)thread_event_lock[map_name_parent_url]); pthread_cond_wait( (pthread_cond_t *)thread_event[map_name_parent_url], (pthread_mutex_t *)thread_event_lock[map_name_parent_url]); pthread_mutex_unlock((pthread_mutex_t *)thread_event_lock[map_name_parent_url]); //pthread_cond_signal((pthread_cond_t *)thread_event[map_name_parent_url]); continue; } //线程池中擦除该线程 thread_number.erase(mapname + "_url"); pthread_mutex_unlock(&count_lock); break; } } //唤醒所有等待此线程提供数据的线程 string mapname_url_tmp = mapname + "_url"; pthread_mutex_lock((pthread_mutex_t *)thread_event_lock[mapname_url_tmp]); pthread_cond_broadcast((pthread_cond_t *)thread_event[mapname_url_tmp]); pthread_mutex_unlock((pthread_mutex_t *)thread_event_lock[mapname_url_tmp]); string logstr="In function FunGetSubURLProc:Thread exit, mapname: " + mapname + ",sql=" + tmp_sql ; fo.WriteToFileLn(spider_log_file, logstr, 1); cout << "Thread exit, mapname:" << mapname << "_url" << " thread_number:" << t_number << endl; return (0) ;}/* *@param lpParameter 线程参数指针 *说明:获取内容链接 */int SpiderCore::GetContentURLProc(void *lpParameter){ string response(""); string mapname = ((geturlParam *)lpParameter)->mapname; string table = ((geturlParam *)lpParameter)->table; string url_flag = ((geturlParam *)lpParameter)->url_flag; SpiderCore *sc = ((geturlParam *)lpParameter)->sc; int num_per_time = ((geturlParam *)lpParameter)->num_per_time; unsigned int t_number = ((geturlParam *)lpParameter)->thread_number; string domain = sc->domain; string ipaddr = sc->ipaddr; string path = sc->path; unsigned int port = sc->port; string step_param = ""; //变量参考GetMainURLProc中相应的变量 map<string, string>::iterator map_it; map_it = sc->config_map.find(mapname); if (map_it==sc->config_map.end()) return (1) ;//can not find mapname mysqlpp::Connection conn(false); mysqlpp::StoreQueryResult res; mysqlpp::Row row; DataBase db(sc->db_host, sc->db_user, sc->db_pwd, sc->db_name); string tmp_sql = ""; while (1) { if (!conn.connected()) { db.Connectdb(&conn, sc->encode); } mysqlpp::Query query=conn.query(); query << "SELECT * FROM " << table << " WHERE 0=content_flag & " << t_number << " ORDER BY id ASC" << " LIMIT " << num_per_time; tmp_sql = query.str(); if (!(res =query.store())) { string logstr="In function FunGetContentURLProc:Thread exit, mapname: " + mapname + ", DataBase error:" + conn.error(); fo.WriteToFileLn(spider_log_file, logstr, 1); return (1) ;//query failed } vector<string> id; vector<mysqlpp::Row >::size_type rows = res.num_rows(); vector<mysqlpp::Row >::size_type rows_i=0; if (rows > 0 && !SPIDER_STOP) { while (rows_i < rows) { row=res[rows_i]; path = (string)row.at(1); step_param = (string)row.at(2); int geturl_rs=0; geturl_rs = sc->FindContentURL(domain, path, port, mapname , mapname, &conn, step_param); //if(geturl_rs == 0) { string id_str=(string)row.at(0); id.push_back(id_str); } rows_i++; } if (!id.empty()) { if (!conn.connected()) { db.Connectdb(&conn, sc->encode); } mysqlpp::Query query=conn.query(); query << "UPDATE " << table << " SET content_flag=content_flag | " << t_number << " WHERE id IN( "; for (vector<string>::iterator id_i=id.begin(); id_i != id.end(); id_i ++ ) { if (id_i!=id.begin()) query << ","; query << (*id_i); } query << ")"; query.exec(); } string mapname_page_tmp = mapname+"_content"; pthread_mutex_lock((pthread_mutex_t *)thread_event_lock[mapname_page_tmp]); pthread_cond_broadcast((pthread_cond_t *)thread_event[mapname_page_tmp]); pthread_mutex_unlock((pthread_mutex_t *)thread_event_lock[mapname_page_tmp]); } else { pthread_mutex_lock(&count_lock); if ( thread_number.find(mapname+"_url") != thread_number.end() ) { pthread_mutex_unlock(&count_lock); string mapname_content_tmp = mapname + "_url"; pthread_mutex_lock((pthread_mutex_t *)thread_event_lock[mapname_content_tmp]); pthread_cond_wait( (pthread_cond_t *)thread_event[mapname_content_tmp], (pthread_mutex_t *)thread_event_lock[mapname_content_tmp]); pthread_mutex_unlock((pthread_mutex_t *)thread_event_lock[mapname_content_tmp]); continue; } thread_number.erase(mapname + "_content"); pthread_mutex_unlock(&count_lock); break; } } //free((geturlParam *)lpParameter); string mapname_page_tmp = mapname+"_content"; pthread_mutex_lock((pthread_mutex_t *)thread_event_lock[mapname_page_tmp]); pthread_cond_broadcast((pthread_cond_t *)thread_event[mapname_page_tmp]); pthread_mutex_unlock((pthread_mutex_t *)thread_event_lock[mapname_page_tmp]); string logstr="In function FunGetContentURLProc:Thread exit, mapname: " + mapname + ",sql=" + tmp_sql ; fo.WriteToFileLn(spider_log_file, logstr, 1); cout << "Thread exit, mapname:" << mapname << "_content" << " thread_number:" << t_number << endl; return(0) ;}/* @param domain 域名 * @param path 路径 * @param port 端口号 * @param mapname 节点名称 * @param table 要插入的数据库表名 * @param url_flag 插入数据库中需要设置的url标志,通常是改线程对应的编号 * @param parent_id 插入数据的双亲id(数据库中的id) * @param conn 数据库连接标识符 * @param step_param 参数,传递到下一线程的参数 */int SpiderCore::FindContentURL(string domain, string path, unsigned int port, string mapname, string table, mysqlpp::Connection *conn, string step_param){ string content(""); map<string, string>::iterator map_it,map_child; char child_number[12]={0}; //soket response 内容开始位置 int insert_rs=0; //带"?"的sql语句 string sql(""); //sql中"?"的值 vector<string> param; //正则表达式 string mapname_regex(""); mapname_regex = this->config_map[mapname]; //解析正则表达式 if (this->SendSocket(content, domain, path, port)) { return 2; } bool get_param = false; //是否需要获取参数 if(!get_param) { map<int, string> param_match_result; int param_i = 0; char ch_param_i[12] = {0}; string param_mapname = mapname; int param_offset = 0; //依次遍历参数节点,参数节点存在则匹配出来,并插入到数据库中保存起来 while(param_i++ < 20) { sprintf(ch_param_i, "%d", param_i); param_mapname.append("_param_"); param_mapname.append(ch_param_i); map<string, string>::iterator param_it; //查找参数节点 param_it = this->config_map.find(param_mapname); if(param_it==this->config_map.end()) { break;//不存在,则跳出循环,默认已经没有参数节点了 } //匹配出参数 if(Functions::pcre_match(param_it->second.c_str(), content, ¶m_offset, param_match_result)==0) { step_param.append(param_match_result[0]); step_param.append("#$#"); get_param = true; } } } string childmap(""); int offset=0; map<int, string> match_result; while (Functions::pcre_match(mapname_regex, content, &offset, match_result)==0) { unsigned int childnum_i = 0; string url(""); size_t subnum=atoi(this->config_map[mapname + "_subnum"].c_str()); while (++childnum_i <= subnum) { param.clear(); sprintf(child_number, "%d", childnum_i); childmap = this->config_map[mapname + "_" + child_number]; //如果有链接,则插入数据库 if (Functions::getString(childmap, 1)=="1") { table = Functions::getString(childmap, 2); url = match_result[childnum_i]; sql = "INSERT INTO " + table + " SET url='?', step_param='?', content_flag=0"; param.push_back(url); param.push_back(step_param); insert_rs=this->InsertData(sql, param, conn); } } } return 0;}/* *@param lpParameter 线程参数指针 *说明:获取内容链接 */int SpiderCore::GetContentProc(void *lpParameter){ string response(""); string mapname = ((geturlParam *)lpParameter)->mapname;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -