📄 fetchp~1.cc
字号:
// Larbin// Sebastien Ailleret// 15-11-99 -> 10-04-00#include <iostream.h>#include <unistd.h>#include <errno.h>#include <assert.h>#include <time.h>#include <string.h>#include <sys/types.h>#include <unistd.h>#include <sys/time.h>#include <sys/socket.h>#include "types.h"#include "global.h"#include "xutils/url.h"#include "xutils/text.h"#include "xutils/Site.h"#include "xutils/string.h"#include "xutils/connexion.h"#include "xfetcher/file.h"#include "xinterf/output.h"#include "xutils/debug.h"static void checkAll ();static void checkTimeout (time_t now);static void pipeRead (Connexion *conn);static void pipeWrite (Connexion *conn);static void endOfFile (Connexion *conn, FetchError err);/** A kind of select between all opened sockets * This function is launch in a new thread by the main thread */void *startFetchPipe (void *none) { crash("FetchPipe on"); time_t oldtime = time(NULL); time_t now; for (;;) { now = time(NULL); if (now == oldtime) { // Check read and writes checkAll(); } else { // Check timeout oldtime = now; checkTimeout(now); } } return NULL;}/** Check timeout */static void checkTimeout (time_t now) { for (uint i=0; i<global::nb_conn; i++) { Connexion *conn = global::connexions[i]; if (conn->state != EMPTY && now > conn->timeout + conn->pos / 1000) { // This server doesn't answer (time out) endOfFile(conn, timeout); } }}/** Read and write on sockets if possible */static void checkAll () {#ifndef NDEBUG static uint count = 0;#endif // NDEBUG fd_set rfds, wfds; struct timeval tv; tv.tv_sec = 0; tv.tv_usec = 500; FD_ZERO(&rfds); FD_ZERO(&wfds); int n=-1; for (uint i=0; i<global::nb_conn; i++) { int nn = global::connexions[i]->socket; switch (global::connexions[i]->state) { case CONNECTING: FD_SET(nn, &wfds); if (nn > n) { n = nn; } break; case OPEN: FD_SET(nn, &rfds); if (nn > n) { n = nn; } break; } } assert(n < __FD_SETSIZE); statePipe(1); select(n+1, &rfds, &wfds, NULL, &tv); statePipe(count++); for (uint i=0; i<global::nb_conn; i++) { Connexion *conn = global::connexions[i]; switch (conn->state) { case CONNECTING: case WRITE: if (FD_ISSET(conn->socket, &wfds)) { // trying to finish the connection pipeWrite(conn); } break; case OPEN: if (FD_ISSET(conn->socket, &rfds)) { // The socket is open, let's try to read it pipeRead(conn); } break; } }}/** The socket is finally open ! * Make sure it's all right, and write the request */static void pipeWrite (Connexion *conn) { int res; int wrtn, len; unsigned int size = sizeof(int); switch (conn->state) { case CONNECTING: // not connected yet getsockopt(conn->socket, SOL_SOCKET, SO_ERROR, &res, &size); if (res) { // Unable to connect endOfFile(conn, noConnection); return; } // Connection succesfull conn->state = WRITE; // no break case WRITE: // writing the first string len = strlen(conn->request.getString()); wrtn = write(conn->socket, conn->request.getString()+conn->pos, len - conn->pos); if (wrtn >= 0) { conn->pos += wrtn; if (conn->pos < len) { // Some chars of this string are not written yet return; } } else { if (errno == EAGAIN || errno == EINTR) { // little error, come back soon return; } else { // unrecoverable error, forget it endOfFile(conn, earlyStop); return; } } conn->pos = 0; // All the request has been written shutdown(conn->socket, 1); conn->state = OPEN; }}/** Is there something to read on this socket * (which is open) */static void pipeRead (Connexion *conn) { int cont = 1; while (cont) { char c[BUF_SIZE]; int size = read (conn->socket, c, BUF_SIZE); switch (size) { case 0: // End of file (success); cont = 0; endOfFile(conn, success); break; case -1: switch (errno) { case EAGAIN: case EINTR: // Nothing to read now, we'll try again later cont = 0; break; default: // Error : let's forget this page cont = 0; endOfFile(conn, earlyStop); break; } break; default: // Something has been read assert(size > 0); switch (conn->parser->input(c, size)) { case 0: // nothing special conn->pos += size; if (conn->pos > maxPageSize) { // We've read enough... cont = 0; endOfFile(conn, tooBig); } else { // Go on in the loop only if it is usefull cont = (size == BUF_SIZE); } break; case 1: // The parser does not want any more input (errno explains why) cont = 0; endOfFile(conn, (enum FetchError) errno); break; } break; } }}/* What are we doing when it's over with one file ? */static void endOfFile (Connexion *conn, FetchError err) { crash("End of file"); // If you modifie this function, be very careful // some synchro are done without lock, // so the order is very important conn->state = EMPTY; shutdown(conn->socket, 2); close(conn->socket); if (conn->parser->isRobots()) { // That was a robots.txt ((robots *) conn->parser)->parse(err != success); url *u = ((robots *) conn->parser)->toGet(); conn->recycle(); global::siteList[u->hostHashCode() % siteListSize].connectThisUrl(conn, u); } else { // give control back to fetchOpen for this socket url *u = ((html *)conn->parser)->getUrl(); global::siteList[u->hostHashCode() % siteListSize].putInFifo(); // Report the situation conn->pos = err; global::userConns->put(conn); // The connexion is not recycled now // else the crawler might overload user // so we recycle the connexion after the user manage the result // this way, the crawler adapt to the user speed }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -