common.py

来自「Harvestman-最新版本」· Python 代码 · 共 604 行 · 第 1/2 页

PY
604
字号
    a0=chr(operator.xor(ord(a1), ord(e1)))    a0 = "".join((a0, out))    out = a0    return outdef send_url(data, host, port):        cfg = objects.config    if cfg.urlserver_protocol == 'tcp':        return send_url_tcp(data, host, port)    elif cfg.urlserver_protocol == 'udp':        return send_url_udp(data, host, port)    def send_url_tcp(data, host, port):    """ Send url to url server """    # Return's server response if connection    # succeeded and null string if failed.    try:        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)        sock.connect((host,port))        sock.sendall(data)        response = sock.recv(8192)        sock.close()        return response    except socket.error, e:        # print 'url server error:',e        pass    return ''def send_url_udp(data, host, port):    """ Send url to url server """    # Return's server response if connection    # succeeded and null string if failed.    try:        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)        sock.sendto(data,0,(host, port))        response, addr = sock.recvfrom(8192, 0)        sock.close()        return response    except socket.error:        pass    return ''def ping_urlserver(host, port):        cfg = objects.config        if cfg.urlserver_protocol == 'tcp':        return ping_urlserver_tcp(host, port)    elif cfg.urlserver_protocol == 'udp':        return ping_urlserver_udp(host, port)        def ping_urlserver_tcp(host, port):    """ Ping url server to see if it is alive """    # Returns server's response if server is    # alive & null string if server is not alive.    try:        debug('Pinging server at (%s:%d)' % (host, port))        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)        sock.connect((host,port))        # Send a small packet        sock.sendall("ping")        response = sock.recv(8192)        if response:            debug('Url server is alive')        sock.close()        return response    except socket.error:        debug('Could not connect to (%s:%d)' % (host, port))        return ''def ping_urlserver_udp(host, port):    """ Ping url server to see if it is alive """    # Returns server's response if server is    # alive & null string if server is not alive.    try:        debug('Pinging server at (%s:%d)' % (host, port))        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)        # Send a small packet        sock.sendto("ping", 0, (host,port))        response, addr = sock.recvfrom(8192,0)        if response:            debug('Url server is alive')        sock.close()        return response    except socket.error:        debug('Could not connect to (%s:%d)' % (host, port))        return ''    def GetTempDir():    """ Return the temporary directory """    # Currently used by hget    tmpdir = max(map(lambda x: os.environ.get(x, ''), ['TEMP','TMP','TEMPDIR','TMPDIR']))    if tmpdir=='':        # No temp dir env variable        if os.name == 'posix':            if os.path.isdir('/tmp'):                return '/tmp'            elif os.path.isdir('/usr/tmp'):                return '/usr/tmp'        elif os.name == 'nt':            profiledir = os.environ.get('USERPROFILE','')            if profiledir:                return os.path.join(profiledir,'Local Settings','Temp')    else:        return os.path.abspath(tmpdir)def GetMyTempDir():    """ Return temporary directory for HarvestMan. Also creates    it if the directory is not there """    # This is tempdir/HarvestMan    tmpdir = os.path.join(GetTempDir(), 'harvestman')    if not os.path.isdir(tmpdir):        try:            os.makedirs(tmpdir)        except OSError, e:            return ''    return tmpdirdef debug(arg, *args):    """ Log information, will log if verbosity is equal to DEBUG level """    objects.logger.debug(arg, *args)    def info(arg, *args):    """ Log information, will log if verbosity is <= INFO level """    objects.logger.info(arg, *args)def extrainfo(arg, *args):    """ Log information, will log if verbosity is <= EXTRAINFO level """    objects.logger.extrainfo(arg, *args)    def warning(arg, *args):    """ Log information, will log if verbosity is <= WARNING level """    objects.logger.warning(arg, *args)        def error(arg, *args):    """ Log information, will log if verbosity is <= ERROR level """    objects.logger.error(arg, *args)        def critical(arg, *args):    """ Log information, will log if verbosity is <= CRITICAL level """    objects.logger.critical(arg, *args)        def logconsole(arg, *args):    """ Log directly to sys.stdout using print """    # Setting verbosity to 5 will print maximum information    # plus maximum debugging information.    objects.logger.logconsole(arg, *args)        def logtraceback(console=False):    """ Log the most recent exception traceback. By default    the trace goes only to the log file """    s = cStringIO.StringIO()    traceback.print_tb(sys.exc_info()[-1], None, s)    if not console:        objects.logger.disableConsoleLogging()    # Log to logger    objects.logger.debug(s.getvalue())    # Enable console logging again    objects.logger.enableConsoleLogging()    def hexit(arg):    """ Exit wrapper for HarvestMan """    print_traceback()    sys.exit(arg)    def print_traceback():    print 'Printing error traceback for debugging...'    traceback.print_tb(sys.exc_info()[-1], None, sys.stdout)# Effbot's simple_eval function which is a safe replacement# for Python's eval for tuples...def atom(next, token):    if token[1] == "(":        out = []        token = next()        while token[1] != ")":            out.append(atom(next, token))            token = next()            if token[1] == ",":                token = next()        return tuple(out)    elif token[0] is tokenize.STRING:        return token[1][1:-1].decode("string-escape")    elif token[0] is tokenize.NUMBER:        try:            return int(token[1], 0)        except ValueError:            return float(token[1])    raise SyntaxError("malformed expression (%s)" % token[1])def simple_eval(source):    src = cStringIO.StringIO(source).readline    src = tokenize.generate_tokens(src)    res = atom(src.next, src.next())    if src.next()[0] is not tokenize.ENDMARKER:        raise SyntaxError("bogus data after expression")    return resdef set_aliases(path=None):    if path != None:        sys.path.append(path)            import config    SetAlias(config.HarvestManStateObject())    import datamgr    import rules    import connector    import urlqueue    import logger    import event    SetAlias(logger.HarvestManLogger())        # Data manager object    dmgr = datamgr.HarvestManDataManager()    dmgr.initialize()    SetAlias(dmgr)        # Rules checker object    ruleschecker = rules.HarvestManRulesChecker()    SetAlias(ruleschecker)        # Connector manager object    connmgr = connector.HarvestManNetworkConnector()    SetAlias(connmgr)        # Connector factory    conn_factory = connector.HarvestManUrlConnectorFactory(objects.config.connections)    SetAlias(conn_factory)        queuemgr = urlqueue.HarvestManCrawlerQueue()    SetAlias(queuemgr)        SetAlias(event.HarvestManEvent())        def test_sgmlop():    """ Test whether sgmlop is available and working """    html="""\    <html><    title>Test sgmlop</title>    <body>    <p>This is a pargraph</p>    <img src="img.jpg"/>    <a href="http://www.python.org'>Python</a>    </body>    </html>    """        # Return True for working and False for not-working    # or not-present...    try:        import sgmlop                class DummyHandler(object):            links = []            def finish_starttag(self, tag, attrs):                self.links.append(tag)                pass                    parser = sgmlop.SGMLParser()        parser.register(DummyHandler())        parser.feed(html)        # Check if we got all the links...        if len(DummyHandler.links)==4:            return True        else:            return False            except ImportError, e:        return Falseif __name__=="__main__":    pass    

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?