📄 spidercore.cpp
字号:
fo.WriteToFileLn(spider_log_file, logstr, 1); return 100;//can not find map_name } int subnum_i=0; //检查额外节点的子表达式节点是否存在 while(++subnum_i <= atoi(it_subnum->second.c_str())) { sprintf(num,"%d",subnum_i); string extra_mapname_sub = extra_mapname; extra_mapname_sub.append("_"); extra_mapname_sub.append(num); if(this->config_map.find(extra_mapname_sub) == this->config_map.end()) { string logstr="In function analytics_map_extra:can not find map node " + extra_mapname_sub + " in config file(spider.ini)!"; cout << logstr << endl; fo.WriteToFileLn(spider_log_file, logstr, 1); return 100;//can not find map_name } } } return 0;}//对sql语句把问号替换掉void SpiderCore::addslashes(string &sql, vector<string> vec, mysqlpp::Query &query){ vector<string>::iterator it; string::size_type pos=0; string back = sql; for (it=vec.begin(); it != vec.end(); it++) { pos = back.find("?"); query << back.substr(0, pos); back = back.substr(pos+1); query << mysqlpp::escape << *it; } query << back;}//插入数据库操作int SpiderCore::InsertData(string sql, vector<string> vec, mysqlpp::Connection *conn){ if(!conn->connected()) { DataBase dbase(this->db_host, this->db_user, this->db_pwd, this->db_name); dbase.Connectdb(conn, this->encode); } int insert_rs=0; mysqlpp::Query query=conn->query(); SpiderCore::addslashes(sql, vec, query); if (!query.exec()) { string logstr="In function InsertData: DataBase error, sql:"+query.str()+",error_str:"+conn->error(); fo.WriteToFileLn(spider_insert_error_log_file, logstr, 1); insert_rs = 1; } return insert_rs;}//发送socket请求int SpiderCore::SendSocket(string &content, string domain, string path, unsigned int port){ int socket_time=0; // pthread_mutex_lock(&count_lock);; //检查路径是否为空,为空直接退出 if(path.empty()) { return 2; } //如果出问题请求结果相应码不为200,重复请求,最多3次 while (socket_time++ < 3) { HttpSocket hsock( domain, port); hsock.Request(content, path); /* if(hsock.encode.substr(0,2) == "GB"){ string response = content; content = ""; size_t start=0; while(start < response.size()){ string a = response.substr(start, 60000); start += 60000; CodeConverter cc("GB2312", "UTF-8"); cc.convert(a); content.append(a); } }*/ //wsock.GetHeaderAttribute(); //fo.WriteToFile("b.txt", content, 0); if (hsock.reponse_number != "200") { string logstr="In function FindUrl:socket error, domain:" + domain + ",path:" + path + ", response_number:" + hsock.reponse_number + ",head_info" + hsock.header_info; fo.WriteToFileLn(spider_log_file, logstr, 1); // pthread_mutex_unlock(&mutex); sleep(5); continue; } break; } // pthread_mutex_unlock(&mutex); if (socket_time >= 3) { return 1; } return 0;}//发送http请求获取页面中的链接/* @param domain 域名 * @param path 路径 * @param port 端口号 * @param mapname 节点名称 * @param table 要插入的数据库表名 * @param url_flag 插入数据库中需要设置的url标志,通常是改线程对应的编号 * @param parent_id 插入数据的双亲id(数据库中的id) * @param conn 数据库连接标识符 * @param isfirst 是否是第一次,如果不是第一次说明数据库中已经有相应的数据,可能是spider"挂了"后重新启动 * @param step_param 参数,传递到下一线程的参数 */int SpiderCore::FindUrl( string domain, string path, unsigned int port, string mapname, string table, string url_flag, string parent_id, mysqlpp::Connection *conn, bool isfirst, string step_param, int max_page){ string content(""); map<string, string>::iterator it, it_sub; vector<string> param; it = this->config_map.find(mapname); if (it==this->config_map.end()) { string logstr="In function FindUrl:can not find map node " + mapname ; fo.WriteToFileLn(spider_log_file, logstr, 1); return 1; } int insert_rs=0; string sql(""); string mapname_value(""); string mapname_regex(""); int pageNum=1; int samePage = 0; string spage(""); mapname_regex = it->second; //检查节点的连接节点是否存在 it = this->config_map.find(mapname + "_path"); if (it == this->config_map.end()) { string logstr="In function FindUrl:can not find map node " + mapname + "_path"; fo.WriteToFileLn(spider_log_file, logstr, 1); return 2; } //获取连接信息是正则中的第几个子表达式 int page_path = atoi(it->second.c_str()); //检查节点的页码节点是否存在 it = this->config_map.find(mapname + "_number"); if (it == this->config_map.end()) { string logstr="In function FindUrl:can not find map node " + mapname + "_number"; fo.WriteToFileLn(spider_log_file, logstr, 1); return 3; } //获取页码是正则中的第几个子表达式 int page_number = atoi(it->second.c_str()); int fetch_data_num=0; string max_page_path = path; string path_tmp = path; int num=0; //是否是第一次的抓取 if (isfirst) { //sint pcre_pos[10] = {-1}; map<int, string> first_match_result; int first_offset = 0; //不是第一次抓取,获取已经抓取的最大页码 if(Functions::pcre_match(mapname_regex, path, &first_offset, first_match_result)==0) { pageNum=atoi(first_match_result[page_number].c_str()); isfirst = false; } } while (true) { //检查是否有停止标记 if(SPIDER_STOP) { break; } path_tmp = path; //发送socket请求 if (this->SendSocket(content, domain, path, port)) { break; } samePage = 0; //匹配文本的起始位置 int now_page=0; int offset=0; map<int, string> match_result; //循环进行正则表达式的匹配 while (Functions::pcre_match(mapname_regex, content, &offset, match_result)==0) { //找出页数 spage = match_result[page_number]; param.clear(); //记录最大的页数 now_page = atoi(spage.c_str()); //如果抓取的页码(now_page)大于当前最大页码,并且小于当前最大页面+2(防止直接抓到末页中去,而中间的却没有被抓取),插入数据库中,保存连接 if (pageNum < now_page && pageNum + 2 >= now_page) { path = match_result[page_path]; pageNum = now_page; sql = "INSERT INTO " + table + " SET url='?', step_param='?', url_flag=" + url_flag + ", parent='?'" ; mapname_value = match_result[1]; param.push_back(mapname_value); param.push_back(step_param); param.push_back(parent_id); insert_rs=this->InsertData(sql, param, conn); if (insert_rs == 0) { fetch_data_num++;//插入成功,记录增加 } } //如果两次抓相同页面就停止 } //每新插入20条记录,发送信息给等待此数据的线程,唤醒它们 if(fetch_data_num - num >= 20) { num = fetch_data_num;//更新插入记录 string mapname_url_tmp = table + "_url";//此线程的名称 pthread_mutex_lock((pthread_mutex_t *)thread_event_lock[mapname_url_tmp]); pthread_cond_broadcast((pthread_cond_t *)thread_event[mapname_url_tmp]);//广播线程信息 pthread_mutex_unlock((pthread_mutex_t *)thread_event_lock[mapname_url_tmp]); } //抓取的同一个页面说明已经抓到末页,结束抓取链接 if (path == path_tmp) { string logstr="In function FindUrl:fetch the same page: " + domain + " " + path + " mapname:" + mapname; fo.WriteToFileLn(spider_log_file, logstr, 1); break; } if(max_page && num >= max_page) { break; } } return num;}/* *@param lpParameter 线程参数指针 *说明:获取首节点的导航链接 */int SpiderCore::GetMainURLProc(void *lpParameter){ string response(""); //节点名称 string mapname = ((geturlParam *)lpParameter)->mapname; //数据源的表名 string table = ((geturlParam *)lpParameter)->table; //链接的标志,通常为线程的编号 string url_flag = ((geturlParam *)lpParameter)->url_flag; //spider引擎对象指针 SpiderCore *sc = ((geturlParam *)lpParameter)->sc; //线程的标号 unsigned int t_number = ((geturlParam *)lpParameter)->thread_number; //抓取对象的域名 string domain = sc->domain; //抓取对象的ip string ipaddr = sc->ipaddr; string path = sc->path; unsigned int port = sc->port; string step_param = ""; mysqlpp::Connection conn(false); mysqlpp::Row row; DataBase db(sc->db_host, sc->db_user, sc->db_pwd, sc->db_name); db.Connectdb(&conn, sc->encode); if (!conn.connected()) { string logstr="In function FunGetMainURLProc:Thread exit, mapname: " + mapname; fo.WriteToFileLn(spider_log_file, logstr, 1); return(1); } //先从数据库中读取出数据,然后分析根据数据库数据,继续抓取,如果数据库中没有信息则从配置文件中的路径开始抓取 mysqlpp::Query query=conn.query(); query << "SELECT * FROM " << table << " ORDER BY id DESC" << " LIMIT 1"; string tmp_sql = query.str(); mysqlpp::StoreQueryResult res; if (!(res =query.store())) { string logstr="In function FunGetMainURLProc:Thread exit, mapname: " + mapname + ", DataBase error:" + conn.error(); fo.WriteToFileLn(spider_log_file, logstr, 1); return(1) ;//query failed } //如果数据库中有数据则从数据库中开始抓取 if (res.num_rows() > 0) { path = (string)res[0][1]; step_param = (string)res[0][2]; } else { //数据库中没有数据,把配置文件中的path插入到数据库中 vector<string> param; string sql = "INSERT INTO " + table + " SET url='?', url_flag=" + url_flag; param.push_back(path); int insert_rs=0; insert_rs=sc->InsertData(sql, param, &conn); } //如果没有配置导航链接,则不启动抓去导航链接 if(!sc->config_map[mapname + "_page"].empty()) { sc->FindUrl(domain, path, port, mapname + "_page", table, "0", "1", &conn, true, step_param, 0); } //将线程编号池中的该线程除去 pthread_mutex_lock(&count_lock); for (map<string, unsigned int>::iterator it=thread_number.begin(); it != thread_number.end(); it++) { if (it->second==t_number) { thread_number.erase(it); break; } } //广播唤醒等待此线程提供数据的阻塞线程 pthread_mutex_unlock(&count_lock); string mapname_url_tmp = mapname + "_url"; pthread_mutex_lock((pthread_mutex_t *)thread_event_lock[mapname_url_tmp]); pthread_cond_broadcast((pthread_cond_t *)thread_event[mapname_url_tmp]); pthread_mutex_unlock((pthread_mutex_t *)thread_event_lock[mapname_url_tmp]); //free((geturlParam *)lpParameter); string logstr="In function FunGetMainURLProc:Thread exit, mapname: " + mapname + ",sql=" + tmp_sql; fo.WriteToFileLn(spider_log_file, logstr, 1); cout << "Thread exit, mapname:" << mapname << "_url" << " thread_number:" << t_number << endl; return(1) ;}/* *@param lpParameter 线程参数指针 *说明:获取非首节点的导航链接 */int SpiderCore::GetSubURLProc(void *lpParameter){ string response(""); string mapname = ((geturlParam *)lpParameter)->mapname; string table = ((geturlParam *)lpParameter)->table; string url_flag = ((geturlParam *)lpParameter)->url_flag; SpiderCore *sc = ((geturlParam *)lpParameter)->sc; int num_per_time = ((geturlParam *)lpParameter)->num_per_time; unsigned int t_number = ((geturlParam *)lpParameter)->thread_number;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -