📄 boost_wide_report.py
字号:
# Copyright (c) MetaCommunications, Inc. 2003-2007
#
# Distributed under the Boost Software License, Version 1.0.
# (See accompanying file LICENSE_1_0.txt or copy at
# http://www.boost.org/LICENSE_1_0.txt)

import shutil
import codecs
import xml.sax.handler
import xml.sax.saxutils
import glob
import re
import os.path
import os
import string
import time
import sys
import ftplib

import utils
import runner


report_types = [ 'us', 'ds', 'ud', 'dd', 'l', 'p', 'i', 'n', 'ddr', 'dsr', 'udr', 'usr' ]

# Directory this script lives in; relative resource paths are resolved against it.
if __name__ == '__main__':
    run_dir = os.path.abspath( os.path.dirname( sys.argv[ 0 ] ) )
else:
    run_dir = os.path.abspath( os.path.dirname( sys.modules[ __name__ ].__file__ ) )


def map_path( path ):
    """Return `path` joined onto the directory containing this script."""
    return os.path.join( run_dir, path )


def xsl_path( xsl_file_name ):
    """Return the path of `xsl_file_name` inside the script's 'xsl/v2' directory."""
    return map_path( os.path.join( 'xsl/v2', xsl_file_name ) )


class file_info:
    """Name, size and modification date of one listed file.

    `date` is stored as a 9-item time tuple (see get_date / list_dir);
    both producers in this file pass None for `size`.
    """

    def __init__( self, file_name, file_size, file_date ):
        self.name = file_name
        self.size = file_size
        self.date = file_date

    def __repr__( self ):
        return "name: %s, size: %s, date %s" % ( self.name, self.size, self.date )


#
# Find the mod time from unix format directory listing line
#
def get_date( words ):
    """Build a 9-item time tuple from the split words of a listing line.

    `words` is the line split into fields with the file name last; the
    three fields before the name are read as month abbreviation, day and
    either 'HH:MM' or a four-digit year.

    Raises ValueError when the month abbreviation or a number is malformed.
    """
    date = words[ 5: -1 ]
    month_names = [ "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" ]

    year = time.localtime()[0] # If the year is not specified, start from the current year
    month = month_names.index( date[0] ) + 1
    day = int( date[1] )
    hours = 0
    minutes = 0

    if ":" in date[2]:
        ( hours, minutes ) = [ int( x ) for x in date[2].split( ":" ) ]
        # A time-stamped entry carries no year.  If assuming the current
        # year puts it clearly in the future (one day of slack for
        # server/client clock and time zone differences), it belongs to the
        # previous year, e.g. a "Dec 30 12:00" entry read in January.
        candidate = ( year, month, day, hours, minutes, 0, 0, 0, 0 )
        if time.mktime( candidate ) > time.time() + 24 * 60 * 60:
            year -= 1
    else:
        # there is no way to get hours/minutes for dates listed with a year
        year = int( date[2] )

    return ( year, month, day, hours, minutes, 0, 0, 0, 0 )


def list_ftp( f ):
    """Return a file_info for every non-directory entry in the FTP current directory.

    `f` is an ftp object; its `dir` listing is parsed as unix-format lines.
    """
    utils.log( "listing source content" )
    lines = []

    # 1. get all lines
    f.dir( lines.append )

    # 2. split lines into words (at most 9 fields, so names may contain spaces)
    word_lines = [ x.split( None, 8 ) for x in lines ]

    # we don't need directories; lines with fewer than 9 fields (blank
    # lines, "total N" summaries) are not file entries and are skipped
    result = [ file_info( l[-1], None, get_date( l ) )
               for l in word_lines
               if len( l ) == 9 and l[0][0] != "d" ]

    for fi in result:
        utils.log( " %s" % fi )
    return result


def list_dir( dir ):
    """Return a file_info for every '*.zip' regular file directly inside `dir`."""
    utils.log( "listing destination content %s" % dir )
    result = []
    for file_path in glob.glob( os.path.join( dir, "*.zip" ) ):
        if os.path.isfile( file_path ):
            mod_time = time.localtime( os.path.getmtime( file_path ) )
            # keep date, time and the DST flag; zero the weekday / yearday fields
            mod_time = ( mod_time[0], mod_time[1], mod_time[2], mod_time[3], mod_time[4], mod_time[5], 0, 0, mod_time[8] )
            # no size (for now)
            result.append( file_info( os.path.basename( file_path ), None, mod_time ) )
    for fi in result:
        utils.log( " %s" % fi )
    return result


def find_by_name( d, name ):
    """Return the first item of `d` whose `name` attribute equals `name`, or None."""
    for dd in d:
        if dd.name == name:
            return dd
    return None


def diff( source_dir_content, destination_dir_content ):
    """Compare two file_info lists by name and modification time.

    Returns a tuple ( changed_files, obsolete_files ) of file names:
    changed files exist in the source and are missing from the destination
    or carry a different date there; obsolete files exist only in the
    destination.
    """
    utils.log( "Finding updated files" )
    result = ( [], [] ) # ( changed_files, obsolete_files )

    for source_file in source_dir_content:
        found = find_by_name( destination_dir_content, source_file.name )
        if found is None or time.mktime( found.date ) != time.mktime( source_file.date ):
            result[0].append( source_file.name )

    for destination_file in destination_dir_content:
        if find_by_name( source_dir_content, destination_file.name ) is None:
            result[1].append( destination_file.name )

    utils.log( " Updated files:" )
    for f in result[0]:
        utils.log( " %s" % f )
    utils.log( " Obsolete files:" )
    for f in result[1]:
        utils.log( " %s" % f )
    return result


def _modtime_timestamp( file ):
    """Return the modification time of `file` as reported by os.stat."""
    return os.stat( file ).st_mtime


# Path prefixes that shorten() strips from logged paths.
root_paths = []


def shorten( file_path ):
    """Return `file_path` with the longest matching root prefix removed.

    Prefix matching is case-insensitive and backslashes are converted to
    forward slashes in the result.  Sorts the module-level `root_paths`
    in place, longest first.
    """
    # stable, longest-first ordering (same result as the old cmp-based sort)
    root_paths.sort( key = len, reverse = True )
    for root in root_paths:
        if file_path.lower().startswith( root.lower() ):
            return file_path[ len( root ): ].replace( "\\", "/" )
    return file_path.replace( "\\", "/" )


class action:
    """A make-like build step: one target file plus the files it depends on.

    This class does not define `update()`; run() calls it, so subclasses
    must provide it to (re)build the target.
    """

    def __init__( self, file_path ):
        self.file_path_ = file_path
        self.relevant_paths_ = [ self.file_path_ ]
        self.boost_paths_ = []
        self.dependencies_ = []
        self.other_results_ = []

    def run( self ):
        """Clean, build or skip the target depending on its dependencies.

        - any dependency missing: the target is removed via clean();
        - target missing: update() is called;
        - any dependency newer than the target: update() is called;
        - otherwise nothing is done.
        """
        utils.log( "%s: run" % shorten( self.file_path_ ) )
        # NOTE(review): `__log__` is never read in this file; presumably
        # utils.log inspects it to indent nested output -- confirm.
        __log__ = 2

        for dependency in self.dependencies_:
            if not os.path.exists( dependency ):
                utils.log( "%s doesn't exists, removing target" % shorten( dependency ) )
                self.clean()
                return

        if not os.path.exists( self.file_path_ ):
            utils.log( "target doesn't exists, building" )
            self.update()
            return

        dst_timestamp = _modtime_timestamp( self.file_path_ )
        utils.log( " target: %s [%s]" % ( shorten( self.file_path_ ), dst_timestamp ) )
        needs_updating = 0
        utils.log( " dependencies:" )
        for dependency in self.dependencies_:
            dm = _modtime_timestamp( dependency )
            # NOTE(review): the marker is always empty, so newer
            # dependencies are not flagged in the log -- confirm intent.
            update_mark = ""
            if dm > dst_timestamp:
                needs_updating = 1
            utils.log( ' %s [%s] %s' % ( shorten( dependency ), dm, update_mark ) )

        if needs_updating:
            utils.log( "target needs updating, rebuilding" )
            self.update()
            return
        else:
            utils.log( "target is up-to-date" )

    def clean( self ):
        """Delete the target and every path in `other_results_` that exists."""
        to_unlink = self.other_results_ + [ self.file_path_ ]
        for result in to_unlink:
            utils.log( ' Deleting obsolete "%s"' % shorten( result ) )
            if os.path.exists( result ):
                os.unlink( result )


class merge_xml_action( action ):
    """Builds `destination` from `source` with the add_expected_results.xsl stylesheet.

    The target depends on the source XML, the expected-results file and the
    failures-markup file.
    """

    def __init__( self, source, destination, expected_results_file, failures_markup_file, tag ):
        action.__init__( self, destination )
        self.source_ = source
        self.destination_ = destination
        self.tag_ = tag

        self.expected_results_file_ = expected_results_file
        self.failures_markup_file_ = failures_markup_file

        self.dependencies_.extend( [
            self.source_
            , self.expected_results_file_
            , self.failures_markup_file_
            ] )

        self.relevant_paths_.extend( [ self.source_ ] )
        self.boost_paths_.extend( [ self.expected_results_file_, self.failures_markup_file_ ] )

    def update( self ):
        """Trim over-long text in the source XML, then run the XSL transform.

        Errors are logged and the (possibly partial) target is removed; the
        temporary trimmed copy of the source is removed in every case.
        """
        def filter_xml( src, dest ):
            """Copy XML `src` to `dest`, truncating long runs of character data."""

            class xmlgen( xml.sax.saxutils.XMLGenerator ):
                def __init__( self, writer ):
                    xml.sax.saxutils.XMLGenerator.__init__( self, writer )
                    self.trimmed = 0
                    self.character_content = ""

                def startElement( self, name, attrs ):
                    self.flush()
                    xml.sax.saxutils.XMLGenerator.startElement( self, name, attrs )

                def endElement( self, name ):
                    self.flush()
                    xml.sax.saxutils.XMLGenerator.endElement( self, name )

                def flush( self ):
                    # emit the text buffered since the previous tag and reset
                    content = self.character_content
                    self.character_content = ""
                    self.trimmed = 0
                    xml.sax.saxutils.XMLGenerator.characters( self, content )

                def characters( self, content ):
                    # buffer text until the next tag; once the buffer exceeds
                    # max_size it is cut and further text is dropped
                    if not self.trimmed:
                        max_size = pow( 2, 16 )
                        self.character_content += content
                        if len( self.character_content ) > max_size:
                            self.character_content = self.character_content[ : max_size ] + "...\n\n[The content has been trimmed by the report system because it exceeds %d bytes]" % max_size
                            self.trimmed = 1

            o = open( dest, "w" )
            try:
                gen = xmlgen( o )
                xml.sax.parse( src, gen )
            finally:
                o.close()

            return dest

        utils.log( 'Merging "%s" with expected results...' % shorten( self.source_ ) )

        trimmed_source = '%s-trimmed.xml' % os.path.splitext( self.source_ )[0]
        try:
            filter_xml( self.source_, trimmed_source )
            utils.libxslt(
                  utils.log
                , trimmed_source
                , xsl_path( 'add_expected_results.xsl' )
                , self.file_path_
                , {
                    "expected_results_file" : self.expected_results_file_
                  , "failures_markup_file": self.failures_markup_file_
                  , "source" : self.tag_
                  }
                )
        except Exception as msg:
            utils.log( ' Skipping "%s" due to errors (%s)' % ( self.source_, msg ) )
            if os.path.exists( self.file_path_ ):
                os.unlink( self.file_path_ )
        finally:
            # Always drop the temporary copy: it sits next to the source, so a
            # leftover '*-trimmed.xml' would match the '*.xml' globs used by
            # the tasks below.
            if os.path.exists( trimmed_source ):
                os.unlink( trimmed_source )


def _xml_timestamp( xml_path ):
    """Return the 'timestamp' attribute of the first 'test-run' element.

    Parsing stops as soon as the element is seen.  Raises ValueError when
    the document contains no 'test-run' element.
    """

    class timestamp_found( Exception ):
        """Carries the timestamp out of the SAX parser to stop parsing early."""
        def __init__( self, timestamp ):
            Exception.__init__( self, timestamp )
            self.timestamp = timestamp

    class timestamp_reader( xml.sax.handler.ContentHandler ):
        def startElement( self, name, attrs ):
            if name == 'test-run':
                raise timestamp_found( attrs.getValue( 'timestamp' ) )

    try:
        xml.sax.parse( xml_path, timestamp_reader() )
    except timestamp_found as found:
        return found.timestamp

    raise ValueError( 'Cannot extract timestamp from "%s". Invalid XML file format?' % xml_path )


class make_links_action( action ):
    """Runs links_page.xsl over `source`; `destination` is the target file tracked for it."""

    def __init__( self, source, destination, output_dir, tag, run_date, comment_file, failures_markup_file ):
        action.__init__( self, destination )
        self.dependencies_.append( source )
        self.source_ = source
        self.output_dir_ = output_dir
        self.tag_ = tag
        self.run_date_ = run_date
        self.comment_file_ = comment_file
        self.failures_markup_file_ = failures_markup_file
        self.links_file_path_ = os.path.join( output_dir, 'links.html' )

    def update( self ):
        """Create the output directories, run the transform and (re)create the target file."""
        links_dir = os.path.dirname( self.links_file_path_ )
        utils.makedirs( os.path.join( links_dir, "output" ) )
        utils.makedirs( os.path.join( links_dir, "developer", "output" ) )
        utils.makedirs( os.path.join( links_dir, "user", "output" ) )

        utils.log( ' Making test output files...' )
        try:
            utils.libxslt(
                  utils.log
                , self.source_
                , xsl_path( 'links_page.xsl' )
                , self.links_file_path_
                , {
                    'source': self.tag_
                  , 'run_date': self.run_date_
                  , 'comment_file': self.comment_file_
                  , 'explicit_markup_file': self.failures_markup_file_
                  }
                )
        except Exception as msg:
            utils.log( ' Skipping "%s" due to errors (%s)' % ( self.source_, msg ) )

        # NOTE(review): placed after the try/except on the assumption that the
        # empty target file is (re)created whether or not the transform
        # succeeded -- confirm against the original layout.
        open( self.file_path_, "w" ).close()


class unzip_action( action ):
    """Unpacks the archive `source` into the directory containing `destination`."""

    def __init__( self, source, destination, unzip_func ):
        action.__init__( self, destination )
        self.dependencies_.append( source )
        self.source_ = source
        self.unzip_func_ = unzip_func

    def update( self ):
        """Call `unzip_func( source, target_directory )`; errors are logged, not raised."""
        try:
            utils.log( ' Unzipping "%s" ... into "%s"' % ( shorten( self.source_ ), os.path.dirname( self.file_path_ ) ) )
            self.unzip_func_( self.source_, os.path.dirname( self.file_path_ ) )
        except Exception as msg:
            utils.log( ' Skipping "%s" due to errors (%s)' % ( self.source_, msg ) )


def ftp_task( site, site_path , destination ):
    """Mirror the result archives in ftp://site/site_path into `destination`.

    Files on the server that are new or have a different date are
    downloaded (and given the server's modification time); '*.zip' files
    present only in `destination` are deleted.  Only '.zip' names that do
    not end in 'log.zip' are considered, and 'boostbook.zip' is ignored.
    """
    # NOTE(review): see action.run -- `__log__` is presumably read by utils.log.
    __log__ = 1
    utils.log( '' )
    utils.log( 'ftp_task: "ftp://%s/%s" -> %s' % ( site, site_path, destination ) )

    utils.log( ' logging on ftp site %s' % site )
    f = ftplib.FTP( site )
    try:
        f.login()
        utils.log( ' cwd to "%s"' % site_path )
        f.cwd( site_path )

        source_content = list_ftp( f )
        source_content = [ x for x in source_content if re.match( r'.+[.](?<!log[.])zip', x.name ) and x.name.lower() != 'boostbook.zip' ]
        destination_content = list_dir( destination )
        d = diff( source_content, destination_content )

        def synchronize():
            for source in d[0]:
                utils.log( 'Copying "%s"' % source )
                local_path = os.path.join( destination, source )
                result = open( local_path, 'wb' )
                try:
                    f.retrbinary( 'RETR %s' % source, result.write )
                finally:
                    # close the local file even when the transfer fails
                    result.close()
                # stamp the copy with the server's date so the next diff()
                # sees it as unchanged
                mod_date = find_by_name( source_content, source ).date
                m = time.mktime( mod_date )
                os.utime( local_path, ( m, m ) )

            for obsolete in d[1]:
                utils.log( 'Deleting "%s"' % obsolete )
                os.unlink( os.path.join( destination, obsolete ) )

        utils.log( " Synchronizing..." )
        __log__ = 2
        synchronize()
    finally:
        # Say goodbye politely; if the connection is already broken just
        # release the socket so the original error is the one reported.
        try:
            f.quit()
        except ftplib.all_errors:
            f.close()


def unzip_archives_task( source_dir, processed_dir, unzip_func ):
    """Run an unzip_action for every archive in `source_dir` and every XML in `processed_dir`.

    Including the XMLs already in `processed_dir` lets action.run() remove
    those whose archive no longer exists.
    """
    utils.log( '' )
    utils.log( 'unzip_archives_task: unpacking updated archives in "%s" into "%s"...' % ( source_dir, processed_dir ) )
    __log__ = 1

    target_files = [ os.path.join( processed_dir, os.path.basename( x.replace( ".zip", ".xml" ) ) ) for x in glob.glob( os.path.join( source_dir, "*.zip" ) ) ] + glob.glob( os.path.join( processed_dir, "*.xml" ) )
    actions = [ unzip_action( os.path.join( source_dir, os.path.basename( x.replace( ".xml", ".zip" ) ) ), x, unzip_func ) for x in target_files ]
    for a in actions:
        a.run()


def merge_xmls_task( source_dir, processed_dir, merged_dir, expected_results_file, failures_markup_file, tag ):
    """Run a merge_xml_action for every XML in `processed_dir` and every XML in `merged_dir`.

    `merged_dir` is created if needed.  `source_dir` is only used in the
    log message.
    """
    utils.log( '' )
    utils.log( 'merge_xmls_task: merging updated XMLs in "%s"...' % source_dir )
    __log__ = 1

    utils.makedirs( merged_dir )
    target_files = [ os.path.join( merged_dir, os.path.basename( x ) ) for x in glob.glob( os.path.join( processed_dir, "*.xml" ) ) ] + glob.glob( os.path.join( merged_dir, "*.xml" ) )
    actions = [ merge_xml_action( os.path.join( processed_dir, os.path.basename( x ) )
                                  , x
                                  , expected_results_file
                                  , failures_markup_file
                                  , tag ) for x in target_files ]

    for a in actions:
        a.run()


def make_links_task( input_dir, output_dir, tag, run_date, comment_file, extended_test_results, failures_markup_file ):
    """Prepare a make_links_action for every XML and '.links' file in `input_dir`."""
    utils.log( '' )
    utils.log( 'make_links_task: make output files for test results in "%s"...' % input_dir )
    __log__ = 1

    target_files = [ x + ".links" for x in glob.glob( os.path.join( input_dir, "*.xml" ) ) ] + glob.glob( os.path.join( input_dir, "*.links" ) )
    actions = [ make_links_action( x.replace( ".links", "" )
                                   , x
                                   , output_dir
                                   , tag
                                   , run_date
                                   , comment_file
                                   , failures_markup_file
                                   ) for x in target_files ]
    # NOTE(review): the visible source ends here.  The sibling tasks above go
    # on to call a.run() on each action -- confirm the rest of this function
    # against the complete file.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -