# boost_wide_report.py
# Copyright (c) MetaCommunications, Inc. 2003-2005
#
# Distributed under the Boost Software License, Version 1.0.
# (See accompanying file LICENSE_1_0.txt or copy at
# http://www.boost.org/LICENSE_1_0.txt)
import shutil
import codecs
import xml.sax.handler
import xml.sax.saxutils
import glob
import os.path
import os
import string
import time
import sys
import ftplib
import utils
import runner
report_types = [ 'us', 'ds', 'ud', 'dd', 'l', 'p', 'i', 'n', 'ddr', 'dsr', 'udr', 'usr' ]
if __name__ == '__main__':
    run_dir = os.path.abspath( os.path.dirname( sys.argv[ 0 ] ) )
else:
    run_dir = os.path.abspath( os.path.dirname( sys.modules[ __name__ ].__file__ ) )
def map_path( path ):
    return os.path.join( run_dir, path )

def xsl_path( xsl_file_name ):
    return map_path( os.path.join( 'xsl/v2', xsl_file_name ) )
class file_info:
    def __init__( self, file_name, file_size, file_date ):
        self.name = file_name
        self.size = file_size
        self.date = file_date

    def __repr__( self ):
        return "name: %s, size: %s, date: %s" % ( self.name, self.size, self.date )
#
# Find the mod time from a unix-format directory listing line
#
def get_date( words ):
    date = words[ 5: -1 ]
    month_names = [ "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" ]

    year = time.localtime()[0] # if the year is not specified, assume the current year
    month = month_names.index( date[0] ) + 1
    day = int( date[1] )
    hours = 0
    minutes = 0

    if date[2].find( ":" ) != -1:
        ( hours, minutes ) = [ int( x ) for x in date[2].split( ":" ) ]
    else:
        # the listing shows a year instead of a time of day for dates outside
        # the current year, so hours and minutes are not recoverable
        year = int( date[2] )

    return ( year, month, day, hours, minutes, 0, 0, 0, 0 )
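# For illustration (hypothetical listing line): splitting
#   '-rw-r--r--    1 ftp  ftp  1234 Mar  5 12:30 SomeRunner.zip'
# with split( None, 8 ) makes words[ 5: -1 ] equal to [ 'Mar', '5', '12:30' ],
# so get_date() returns ( <current year>, 3, 5, 12, 30, 0, 0, 0, 0 ); for an
# older entry ending in [ 'Mar', '5', '2004' ] it returns ( 2004, 3, 5, 0, 0, 0, 0, 0, 0 ).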
def list_ftp( f ):
    # f is an ftp object
    utils.log( "listing source content" )
    lines = []

    # 1. get all lines
    f.dir( lambda x: lines.append( x ) )
    # 2. split the lines into words
    word_lines = [ x.split( None, 8 ) for x in lines ]
    # we don't need directories
    result = [ file_info( l[-1], None, get_date( l ) ) for l in word_lines if l[0][0] != "d" ]
    for fi in result:
        utils.log( " %s" % fi )
    return result
def list_dir( dir ):
    utils.log( "listing destination content %s" % dir )
    result = []
    for file_path in glob.glob( os.path.join( dir, "*.zip" ) ):
        if os.path.isfile( file_path ):
            mod_time = time.localtime( os.path.getmtime( file_path ) )
            mod_time = ( mod_time[0], mod_time[1], mod_time[2], mod_time[3], mod_time[4], mod_time[5], 0, 0, mod_time[8] )
            # no size (for now)
            result.append( file_info( os.path.basename( file_path ), None, mod_time ) )
    for fi in result:
        utils.log( " %s" % fi )
    return result
def find_by_name( d, name ):
    for dd in d:
        if dd.name == name:
            return dd
    return None
def diff( source_dir_content, destination_dir_content ):
    utils.log( "Finding updated files" )
    result = ( [], [] ) # ( changed_files, obsolete_files )
    for source_file in source_dir_content:
        found = find_by_name( destination_dir_content, source_file.name )
        if found is None:
            result[0].append( source_file.name )
        elif time.mktime( found.date ) != time.mktime( source_file.date ):
            result[0].append( source_file.name )
    for destination_file in destination_dir_content:
        found = find_by_name( source_dir_content, destination_file.name )
        if found is None:
            result[1].append( destination_file.name )
    utils.log( " Updated files:" )
    for f in result[0]:
        utils.log( " %s" % f )
    utils.log( " Obsolete files:" )
    for f in result[1]:
        utils.log( " %s" % f )
    return result
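# For illustration (hypothetical content): if the FTP site lists a.zip and
# b.zip, and the local directory holds b.zip (same date) plus a leftover
# c.zip, diff() returns ( [ 'a.zip' ], [ 'c.zip' ] ).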
def _modtime_timestamp( file ):
    return os.stat( file ).st_mtime
root_paths = []

def shorten( file_path ):
    # try the longest roots first so the most specific prefix wins
    root_paths.sort( lambda x, y: cmp( len( y ), len( x ) ) )
    for root in root_paths:
        if file_path.lower().startswith( root.lower() ):
            return file_path[ len( root ): ].replace( "\\", "/" )
    return file_path.replace( "\\", "/" )
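# For illustration (hypothetical paths): with root_paths = [ 'c:\\results\\' ],
# shorten( 'c:\\results\\incoming\\SomeRunner.zip' ) gives 'incoming/SomeRunner.zip';
# a path under no known root only has its backslashes normalized to slashes.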
class action:
    def __init__( self, file_path ):
        self.file_path_ = file_path
        self.relevant_paths_ = [ self.file_path_ ]
        self.boost_paths_ = []
        self.dependencies_ = []
        self.other_results_ = []

    def run( self ):
        utils.log( "%s: run" % shorten( self.file_path_ ) )
        __log__ = 2

        for dependency in self.dependencies_:
            if not os.path.exists( dependency ):
                utils.log( "%s doesn't exist, removing target" % shorten( dependency ) )
                self.clean()
                return

        if not os.path.exists( self.file_path_ ):
            utils.log( "target doesn't exist, building" )
            self.update()
            return

        dst_timestamp = _modtime_timestamp( self.file_path_ )
        utils.log( " target: %s [%s]" % ( shorten( self.file_path_ ), dst_timestamp ) )
        needs_updating = 0
        utils.log( " dependencies:" )
        for dependency in self.dependencies_:
            dm = _modtime_timestamp( dependency )
            update_mark = ""
            if dm > dst_timestamp:
                needs_updating = 1
                update_mark = "*" # mark dependencies newer than the target
            utils.log( ' %s [%s] %s' % ( shorten( dependency ), dm, update_mark ) )

        if needs_updating:
            utils.log( "target needs updating, rebuilding" )
            self.update()
        else:
            utils.log( "target is up-to-date" )

    def clean( self ):
        to_unlink = self.other_results_ + [ self.file_path_ ]
        for result in to_unlink:
            utils.log( ' Deleting obsolete "%s"' % shorten( result ) )
            if os.path.exists( result ):
                os.unlink( result )
class merge_xml_action( action ):
    def __init__( self, source, destination, expected_results_file, failures_markup_file ):
        action.__init__( self, destination )
        self.source_ = source
        self.destination_ = destination
        self.expected_results_file_ = expected_results_file
        self.failures_markup_file_ = failures_markup_file

        self.dependencies_.extend( [
              self.source_
            , self.expected_results_file_
            , self.failures_markup_file_
            ] )

        self.relevant_paths_.extend( [ self.source_ ] )
        self.boost_paths_.extend( [ self.expected_results_file_, self.failures_markup_file_ ] )

    def update( self ):
        utils.log( 'Merging "%s" with expected results...' % shorten( self.source_ ) )
        utils.libxslt(
              utils.log
            , self.source_
            , xsl_path( 'add_expected_results.xsl' )
            , self.file_path_
            , {
                  "expected_results_file" : self.expected_results_file_
                , "failures_markup_file" : self.failures_markup_file_
              }
            )
def _xml_timestamp( xml_path ):

    class timestamp_reader( xml.sax.handler.ContentHandler ):
        def startElement( self, name, attrs ):
            if name == 'test-run':
                self.timestamp = attrs.getValue( 'timestamp' )
                # abort the parse as soon as the timestamp is found by
                # raising the handler instance itself
                raise self

    try:
        xml.sax.parse( xml_path, timestamp_reader() )
        raise Exception( 'Cannot extract timestamp from "%s". Invalid XML file format?' % xml_path )
    except timestamp_reader, x:
        return x.timestamp
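# For illustration (hypothetical data): for a results file whose root element
# is <test-run timestamp="2005-02-03T12:34:56Z" ...>, _xml_timestamp() returns
# the string '2005-02-03T12:34:56Z'.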
class make_links_action( action ):
    def __init__( self, source, destination, output_dir, tag, run_date, comment_file, failures_markup_file ):
        action.__init__( self, destination )
        self.dependencies_.append( source )
        self.source_ = source
        self.output_dir_ = output_dir
        self.tag_ = tag
        self.run_date_ = run_date
        self.comment_file_ = comment_file
        self.failures_markup_file_ = failures_markup_file
        self.links_file_path_ = os.path.join( output_dir, 'links.html' )

    def update( self ):
        utils.makedirs( os.path.join( os.path.dirname( self.links_file_path_ ), "output" ) )
        utils.makedirs( os.path.join( os.path.dirname( self.links_file_path_ ), "developer", "output" ) )
        utils.makedirs( os.path.join( os.path.dirname( self.links_file_path_ ), "user", "output" ) )
        utils.log( ' Making test output files...' )
        utils.libxslt(
              utils.log
            , self.source_
            , xsl_path( 'links_page.xsl' )
            , self.links_file_path_
            , {
                  'source' : self.tag_
                , 'run_date' : self.run_date_
                , 'comment_file' : self.comment_file_
                , 'explicit_markup_file' : self.failures_markup_file_
              }
            )

        # touch the target so its timestamp records when the links were made
        open( self.file_path_, "w" ).close()
class unzip_action( action ):
    def __init__( self, source, destination, unzip_func ):
        action.__init__( self, destination )
        self.dependencies_.append( source )
        self.source_ = source
        self.unzip_func_ = unzip_func

    def update( self ):
        try:
            utils.log( ' Unzipping "%s" ... into "%s"' % ( shorten( self.source_ ), os.path.dirname( self.file_path_ ) ) )
            self.unzip_func_( self.source_, os.path.dirname( self.file_path_ ) )
        except Exception, msg:
            utils.log( ' Skipping "%s" due to errors (%s)' % ( self.source_, msg ) )
def ftp_task( site, site_path, destination ):
    __log__ = 1
    utils.log( '' )
    utils.log( 'ftp_task: "ftp://%s/%s" -> %s' % ( site, site_path, destination ) )

    utils.log( ' logging on ftp site %s' % site )
    f = ftplib.FTP( site )
    f.login()

    utils.log( ' cwd to "%s"' % site_path )
    f.cwd( site_path )

    source_content = list_ftp( f )
    destination_content = list_dir( destination )
    d = diff( source_content, destination_content )

    def synchronize():
        for source in d[0]:
            utils.log( 'Copying "%s"' % source )
            result = open( os.path.join( destination, source ), 'wb' )
            f.retrbinary( 'RETR %s' % source, result.write )
            result.close()
            # stamp the local copy with the remote modification time so the
            # next diff() sees it as up-to-date
            mod_date = find_by_name( source_content, source ).date
            m = time.mktime( mod_date )
            os.utime( os.path.join( destination, source ), ( m, m ) )

        for obsolete in d[1]:
            utils.log( 'Deleting "%s"' % obsolete )
            os.unlink( os.path.join( destination, obsolete ) )

    utils.log( " Synchronizing..." )
    __log__ = 2
    synchronize()

    f.quit()
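# For illustration (hypothetical site and paths):
#   ftp_task( 'ftp.example.org', '/regression-logs/incoming', '/tmp/incoming' )
# mirrors the remote directory into /tmp/incoming, downloading new or changed
# archives and deleting local files that are no longer present remotely.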
def unzip_archives_task( source_dir, processed_dir, unzip_func ):
    utils.log( '' )
    utils.log( 'unzip_archives_task: unpacking updated archives in "%s" into "%s"...' % ( source_dir, processed_dir ) )
    __log__ = 1

    # the targets are the .xml files corresponding to each incoming .zip, plus
    # any already-unpacked .xml files, so that targets whose archive has
    # disappeared are cleaned up by action.run()
    target_files = [ os.path.join( processed_dir, os.path.basename( x.replace( ".zip", ".xml" ) ) )
                     for x in glob.glob( os.path.join( source_dir, "*.zip" ) ) ] \
                   + glob.glob( os.path.join( processed_dir, "*.xml" ) )
    actions = [ unzip_action( os.path.join( source_dir, os.path.basename( x.replace( ".xml", ".zip" ) ) )
                            , x
                            , unzip_func ) for x in target_files ]
    for a in actions:
        a.run()
def merge_xmls_task( source_dir, processed_dir, merged_dir, expected_results_file, failures_markup_file ):
    utils.log( '' )
    utils.log( 'merge_xmls_task: merging updated XMLs in "%s"...' % source_dir )
    __log__ = 1

    utils.makedirs( merged_dir )
    target_files = [ os.path.join( merged_dir, os.path.basename( x ) )
                     for x in glob.glob( os.path.join( processed_dir, "*.xml" ) ) ] \
                   + glob.glob( os.path.join( merged_dir, "*.xml" ) )
    actions = [ merge_xml_action( os.path.join( processed_dir, os.path.basename( x ) )
                                , x
                                , expected_results_file
                                , failures_markup_file ) for x in target_files ]
    for a in actions:
        a.run()
def make_links_task( input_dir, output_dir, tag, run_date, comment_file, extended_test_results, failures_markup_file ):
    utils.log( '' )
    utils.log( 'make_links_task: make output files for test results in "%s"...' % input_dir )
    __log__ = 1

    target_files = [ x + ".links" for x in glob.glob( os.path.join( input_dir, "*.xml" ) ) ] \
                   + glob.glob( os.path.join( input_dir, "*.links" ) )
    actions = [ make_links_action( x.replace( ".links", "" )
                                 , x
                                 , output_dir
                                 , tag
                                 , run_date
                                 , comment_file
                                 , failures_markup_file
                                 ) for x in target_files ]
    for a in actions:
        a.run()
class xmlgen( xml.sax.saxutils.XMLGenerator ):
    document_started = 0

    def startDocument( self ):
        if not self.document_started:
            xml.sax.saxutils.XMLGenerator.startDocument( self )
            self.document_started = 1
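# xmlgen emits the XML declaration only on the first startDocument() call, so
# several SAX event streams can be funneled into one writer and still produce
# a single well-formed document.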
def merge_processed_test_runs( test_runs_dir, tag, writer ):
    utils.log( '' )
    utils.log( 'merge_processed_test_runs: merging processed test runs into a single XML... %s' % test_runs_dir )
    __log__ = 1

    all_runs_xml = xmlgen( writer )