⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 collect_and_upload_logs.py

📁 Boost provides free peer-reviewed portable C++ source libraries. We emphasize libraries that work
💻 PY
📖 第 1 页 / 共 2 页
字号:
# Copyright (c) MetaCommunications, Inc. 2003-2007## Distributed under the Boost Software License, Version 1.0. # (See accompanying file LICENSE_1_0.txt or copy at # http://www.boost.org/LICENSE_1_0.txt)import xml.sax.saxutilsimport zipfileimport ftplibimport timeimport statimport xml.dom.minidomimport xmlrpclibimport httplibimport os.pathimport stringimport sysdef process_xml_file( input_file, output_file ):    utils.log( 'Processing test log "%s"' % input_file )        f = open( input_file, 'r' )    xml = f.readlines()    f.close()        for i in range( 0, len(xml)):        xml[i] = string.translate( xml[i], utils.char_translation_table )    output_file.writelines( xml )def process_test_log_files( output_file, dir, names ):    for file in names:        if os.path.basename( file ) == 'test_log.xml':            process_xml_file( os.path.join( dir, file ), output_file )def collect_test_logs( input_dirs, test_results_writer ):    __log__ = 1    utils.log( 'Collecting test logs ...' )    for input_dir in input_dirs:        utils.log( 'Walking directory "%s" ...' % input_dir )        os.path.walk( input_dir, process_test_log_files, test_results_writer )dart_status_from_result = {    'succeed': 'passed',    'fail': 'failed',    'note': 'passed',    '': 'notrun'    }dart_project = {    'trunk': 'Boost_HEAD',    '': 'Boost_HEAD'    }dart_track = {    'full': 'Nightly',    'incremental': 'Continuous',    '': 'Experimental'    }ascii_only_table = ""for i in range(0,256):    if chr(i) == '\n' or chr(i) == '\r':        ascii_only_table += chr(i)    elif i < 32 or i >= 0x80:        ascii_only_table += '?'    else:        ascii_only_table += chr(i)class xmlrpcProxyTransport(xmlrpclib.Transport):    def __init__(self, proxy):        self.proxy = proxy    def make_connection(self, host):        self.realhost = host        return httplib.HTTP(self.proxy)    def send_request(self, connection, handler, request_body):        connection.putrequest('POST','http://%s%s' % (self.realhost,handler))    def send_host(self, connection, host):        connection.putheader('Host',self.realhost)    def publish_test_logs(    input_dirs,    runner_id, tag, platform, comment_file, timestamp, user, source, run_type,    dart_server = None,    http_proxy = None,    **unused    ):    __log__ = 1    utils.log( 'Publishing test logs ...' )    dart_rpc = None    dart_dom = {}        def _publish_test_log_files_ ( unused, dir, names ):        for file in names:            if os.path.basename( file ) == 'test_log.xml':                utils.log( 'Publishing test log "%s"' % os.path.join(dir,file) )                if dart_server:                    log_xml = open(os.path.join(dir,file)).read().translate(ascii_only_table)                    #~ utils.log( '--- XML:\n%s' % log_xml)                    #~ It seems possible to get an empty XML result file :-(                    if log_xml == "": continue                    log_dom = xml.dom.minidom.parseString(log_xml)                    test = {                        'library': log_dom.documentElement.getAttribute('library'),                        'test-name': log_dom.documentElement.getAttribute('test-name'),                        'toolset': log_dom.documentElement.getAttribute('toolset')                        }                    if not test['test-name'] or test['test-name'] == '':                        test['test-name'] = 'unknown'                    if not test['toolset'] or test['toolset'] == '':                        test['toolset'] = 'unknown'                    if not dart_dom.has_key(test['toolset']):                        dart_dom[test['toolset']] = xml.dom.minidom.parseString('''<?xml version="1.0" encoding="UTF-8"?><DartSubmission version="2.0" createdby="collect_and_upload_logs.py">    <Site>%(site)s</Site>    <BuildName>%(buildname)s</BuildName>    <Track>%(track)s</Track>    <DateTimeStamp>%(datetimestamp)s</DateTimeStamp></DartSubmission>'''                         % {                                'site': runner_id,                                'buildname': "%s -- %s (%s)" % (platform,test['toolset'],run_type),                                'track': dart_track[run_type],                                'datetimestamp' : timestamp                            } )                    submission_dom = dart_dom[test['toolset']]                    for node in log_dom.documentElement.childNodes:                        if node.nodeType == xml.dom.Node.ELEMENT_NODE:                            if node.firstChild:                                log_data = xml.sax.saxutils.escape(node.firstChild.data)                            else:                                log_data = ''                            test_dom = xml.dom.minidom.parseString('''<?xml version="1.0" encoding="UTF-8"?><Test>    <Name>.Test.Boost.%(tag)s.%(library)s.%(test-name)s.%(type)s</Name>    <Status>%(result)s</Status>    <Measurement name="Toolset" type="text/string">%(toolset)s</Measurement>    <Measurement name="Timestamp" type="text/string">%(timestamp)s</Measurement>    <Measurement name="Log" type="text/text">%(log)s</Measurement></Test>    '''                         % {                                    'tag': tag,                                    'library': test['library'],                                    'test-name': test['test-name'],                                    'toolset': test['toolset'],                                    'type': node.nodeName,                                    'result': dart_status_from_result[node.getAttribute('result')],                                    'timestamp': node.getAttribute('timestamp'),                                    'log': log_data                                })                            submission_dom.documentElement.appendChild(                                test_dom.documentElement.cloneNode(1) )        for input_dir in input_dirs:        utils.log( 'Walking directory "%s" ...' % input_dir )        os.path.walk( input_dir, _publish_test_log_files_, None )    if dart_server:        try:            rpc_transport = None            if http_proxy:                rpc_transport = xmlrpcProxyTransport(http_proxy)            dart_rpc = xmlrpclib.ServerProxy(                'http://%s/%s/Command/' % (dart_server,dart_project[tag]),                rpc_transport )            for dom in dart_dom.values():                #~ utils.log('Dart XML: %s' % dom.toxml('utf-8'))                dart_rpc.Submit.put(xmlrpclib.Binary(dom.toxml('utf-8')))        except Exception, e:            utils.log('Dart server error: %s' % e)def upload_to_ftp( tag, results_file, ftp_proxy, debug_level ):    ftp_site = 'fx.meta-comm.com'    site_path = '/boost-regression'    utils.log( 'Uploading log archive "%s" to ftp://%s%s/%s' % ( results_file, ftp_site, site_path, tag ) )        if not ftp_proxy:        ftp = ftplib.FTP( ftp_site )        ftp.set_debuglevel( debug_level )        ftp.login()    else:        utils.log( '    Connecting through FTP proxy server "%s"' % ftp_proxy )        ftp = ftplib.FTP( ftp_proxy )        ftp.set_debuglevel( debug_level )        ftp.set_pasv (0) # turn off PASV mode        ftp.login( 'anonymous@%s' % ftp_site, 'anonymous@' )    ftp.cwd( site_path )    try:        ftp.cwd( tag )    except ftplib.error_perm:        for dir in tag.split( '/' ):            ftp.mkd( dir )            ftp.cwd( dir )    f = open( results_file, 'rb' )    ftp.storbinary( 'STOR %s' % os.path.basename( results_file ), f )    ftp.quit()def copy_comments( results_xml, comment_file ):    results_xml.startElement( 'comment', {} )    if os.path.exists( comment_file ):        utils.log( 'Reading comments file "%s"...' % comment_file )        f = open( comment_file, 'r' )        try:            results_xml.characters( f.read() )        finally:            f.close()        else:        utils.log( 'Warning: comment file "%s" is not found.' % comment_file )     results_xml.endElement( 'comment' )def compress_file( file_path, archive_path ):    utils.log( 'Compressing "%s"...' % file_path )    try:        z = zipfile.ZipFile( archive_path, 'w', zipfile.ZIP_DEFLATED )        z.write( file_path, os.path.basename( file_path ) )        z.close()        utils.log( 'Done writing "%s".'% archive_path )    except Exception, msg:        utils.log( 'Warning: Compressing falied (%s)' % msg )        utils.log( '         Trying to compress using a platform-specific tool...' )        try: import zip_cmd        except ImportError:            script_dir = os.path.dirname( os.path.abspath( sys.argv[0] ) )            utils.log( 'Could not find \'zip_cmd\' module in the script directory (%s).' % script_dir )            raise Exception( 'Compressing failed!' )        else:            if os.path.exists( archive_path ):                os.unlink( archive_path )                utils.log( 'Removing stale "%s".' % archive_path )                            zip_cmd.main( file_path, archive_path )            utils.log( 'Done compressing "%s".' % archive_path )def read_timestamp( file ):    if not os.path.exists( file ):        result = time.gmtime()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -