common.py
来自「Harvestman-最新版本」· Python 代码 · 共 604 行 · 第 1/2 页
PY
604 行
a0=chr(operator.xor(ord(a1), ord(e1))) a0 = "".join((a0, out)) out = a0 return outdef send_url(data, host, port): cfg = objects.config if cfg.urlserver_protocol == 'tcp': return send_url_tcp(data, host, port) elif cfg.urlserver_protocol == 'udp': return send_url_udp(data, host, port) def send_url_tcp(data, host, port): """ Send url to url server """ # Return's server response if connection # succeeded and null string if failed. try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((host,port)) sock.sendall(data) response = sock.recv(8192) sock.close() return response except socket.error, e: # print 'url server error:',e pass return ''def send_url_udp(data, host, port): """ Send url to url server """ # Return's server response if connection # succeeded and null string if failed. try: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.sendto(data,0,(host, port)) response, addr = sock.recvfrom(8192, 0) sock.close() return response except socket.error: pass return ''def ping_urlserver(host, port): cfg = objects.config if cfg.urlserver_protocol == 'tcp': return ping_urlserver_tcp(host, port) elif cfg.urlserver_protocol == 'udp': return ping_urlserver_udp(host, port) def ping_urlserver_tcp(host, port): """ Ping url server to see if it is alive """ # Returns server's response if server is # alive & null string if server is not alive. try: debug('Pinging server at (%s:%d)' % (host, port)) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((host,port)) # Send a small packet sock.sendall("ping") response = sock.recv(8192) if response: debug('Url server is alive') sock.close() return response except socket.error: debug('Could not connect to (%s:%d)' % (host, port)) return ''def ping_urlserver_udp(host, port): """ Ping url server to see if it is alive """ # Returns server's response if server is # alive & null string if server is not alive. try: debug('Pinging server at (%s:%d)' % (host, port)) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Send a small packet sock.sendto("ping", 0, (host,port)) response, addr = sock.recvfrom(8192,0) if response: debug('Url server is alive') sock.close() return response except socket.error: debug('Could not connect to (%s:%d)' % (host, port)) return '' def GetTempDir(): """ Return the temporary directory """ # Currently used by hget tmpdir = max(map(lambda x: os.environ.get(x, ''), ['TEMP','TMP','TEMPDIR','TMPDIR'])) if tmpdir=='': # No temp dir env variable if os.name == 'posix': if os.path.isdir('/tmp'): return '/tmp' elif os.path.isdir('/usr/tmp'): return '/usr/tmp' elif os.name == 'nt': profiledir = os.environ.get('USERPROFILE','') if profiledir: return os.path.join(profiledir,'Local Settings','Temp') else: return os.path.abspath(tmpdir)def GetMyTempDir(): """ Return temporary directory for HarvestMan. Also creates it if the directory is not there """ # This is tempdir/HarvestMan tmpdir = os.path.join(GetTempDir(), 'harvestman') if not os.path.isdir(tmpdir): try: os.makedirs(tmpdir) except OSError, e: return '' return tmpdirdef debug(arg, *args): """ Log information, will log if verbosity is equal to DEBUG level """ objects.logger.debug(arg, *args) def info(arg, *args): """ Log information, will log if verbosity is <= INFO level """ objects.logger.info(arg, *args)def extrainfo(arg, *args): """ Log information, will log if verbosity is <= EXTRAINFO level """ objects.logger.extrainfo(arg, *args) def warning(arg, *args): """ Log information, will log if verbosity is <= WARNING level """ objects.logger.warning(arg, *args) def error(arg, *args): """ Log information, will log if verbosity is <= ERROR level """ objects.logger.error(arg, *args) def critical(arg, *args): """ Log information, will log if verbosity is <= CRITICAL level """ objects.logger.critical(arg, *args) def logconsole(arg, *args): """ Log directly to sys.stdout using print """ # Setting verbosity to 5 will print maximum information # plus maximum debugging information. objects.logger.logconsole(arg, *args) def logtraceback(console=False): """ Log the most recent exception traceback. By default the trace goes only to the log file """ s = cStringIO.StringIO() traceback.print_tb(sys.exc_info()[-1], None, s) if not console: objects.logger.disableConsoleLogging() # Log to logger objects.logger.debug(s.getvalue()) # Enable console logging again objects.logger.enableConsoleLogging() def hexit(arg): """ Exit wrapper for HarvestMan """ print_traceback() sys.exit(arg) def print_traceback(): print 'Printing error traceback for debugging...' traceback.print_tb(sys.exc_info()[-1], None, sys.stdout)# Effbot's simple_eval function which is a safe replacement# for Python's eval for tuples...def atom(next, token): if token[1] == "(": out = [] token = next() while token[1] != ")": out.append(atom(next, token)) token = next() if token[1] == ",": token = next() return tuple(out) elif token[0] is tokenize.STRING: return token[1][1:-1].decode("string-escape") elif token[0] is tokenize.NUMBER: try: return int(token[1], 0) except ValueError: return float(token[1]) raise SyntaxError("malformed expression (%s)" % token[1])def simple_eval(source): src = cStringIO.StringIO(source).readline src = tokenize.generate_tokens(src) res = atom(src.next, src.next()) if src.next()[0] is not tokenize.ENDMARKER: raise SyntaxError("bogus data after expression") return resdef set_aliases(path=None): if path != None: sys.path.append(path) import config SetAlias(config.HarvestManStateObject()) import datamgr import rules import connector import urlqueue import logger import event SetAlias(logger.HarvestManLogger()) # Data manager object dmgr = datamgr.HarvestManDataManager() dmgr.initialize() SetAlias(dmgr) # Rules checker object ruleschecker = rules.HarvestManRulesChecker() SetAlias(ruleschecker) # Connector manager object connmgr = connector.HarvestManNetworkConnector() SetAlias(connmgr) # Connector factory conn_factory = connector.HarvestManUrlConnectorFactory(objects.config.connections) SetAlias(conn_factory) queuemgr = urlqueue.HarvestManCrawlerQueue() SetAlias(queuemgr) SetAlias(event.HarvestManEvent()) def test_sgmlop(): """ Test whether sgmlop is available and working """ html="""\ <html>< title>Test sgmlop</title> <body> <p>This is a pargraph</p> <img src="img.jpg"/> <a href="http://www.python.org'>Python</a> </body> </html> """ # Return True for working and False for not-working # or not-present... try: import sgmlop class DummyHandler(object): links = [] def finish_starttag(self, tag, attrs): self.links.append(tag) pass parser = sgmlop.SGMLParser() parser.register(DummyHandler()) parser.feed(html) # Check if we got all the links... if len(DummyHandler.links)==4: return True else: return False except ImportError, e: return Falseif __name__=="__main__": pass
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?