Browse Source

Download testdata

The (binary) test data:
    raw sfxc output, known-good MS, known-good IDI file, support files (*.vix, *.lis)

should not live in the code repository. They are now kept in a bzip2-compressed
tar file that is downloaded from a remote server.

On test execution, a simple check is made to see whether the data has already
been downloaded; if not, it is downloaded, decompressed and extracted in the appropriate location.
haavee 8 months ago
  1. 54
  2. 628


@ -37,6 +37,7 @@ class EnvironmentBinaries(object):
return self._jplotter_fn_(self)
def get_python(self):
return self._python_fn_(self)
j2ms2 = property(get_j2ms2)
tConvert = property(get_tConvert)
jplotter = property(get_jplotter)
@ -111,11 +112,11 @@ binarys = DefaultBinaries()
parsert = argparse.ArgumentParser(add_help=True)
mutexgr = parsert.add_mutually_exclusive_group()
mutexgr.add_argument('--cmake-install-dir', help="test j2ms2/tConvert binaries from install_dir/bin",
dest='binaries', action=InstallDirAction, path='path__', default=binarys)#DefaultBinaries())
dest='binaries', action=InstallDirAction, path='path__', default=binarys)
mutexgr.add_argument('--cmake-build-dir', help="test j2ms2/tConvert binaries from build_dir/app/{j2ms2,tConvert}/",
dest='binaries', action=BuildDirAction, path='path__', default=binarys)#DefaultBinaries())
dest='binaries', action=BuildDirAction, path='path__', default=binarys)
parsert.add_argument('--jplotter', help="Path to jplotter to use (default: from $PATH)",
dest='binaries', action=jplotterAction, path='jplpath__', default=binarys)#DefaultBinaries())
dest='binaries', action=jplotterAction, path='jplpath__', default=binarys)
options, args = parsert.parse_known_args()
sys.argv[1:] = args
@ -157,6 +158,9 @@ class TTC(unittest.TestCase):
TTC.__curdir__ = os.getcwd()
# (2) where this specific file is (to find the raw and gold data files)
TTC.__rootdir__ = os.path.abspath( os.path.dirname(__file__) )
# Before creating any garbage, let's make sure the test data is available
# (possibly download it using wget)
# (3) create a temporary directory where to create new data files to compare
TTC.__tempdir__ = tempfile.mkdtemp()
#sys.stdout.write( "\n=======> Created tmp dir: {0}\n".format(self.tempDir) )
@ -166,6 +170,7 @@ class TTC(unittest.TestCase):
# And finally, we change directory to the newly created temp dir
# and populate it with the files as necessary
os.mkdir( TTC.__workdir__ )
# Now we can go on with our business
os.chdir( TTC.__workdir__ )
# symlink SFXC job 24427 into the work dir
os.symlink( os.path.join(TTC.__rootdir__, '24427'), './24427' )
@ -174,6 +179,39 @@ class TTC(unittest.TestCase):
shutil.copyfile( os.path.join(TTC.__rootdir__, f), os.path.join(TTC.__workdir__, f) )
print( " done" )
# Need to download several testfiles
def download_test_data(self):
# Only attempt to download if not already there!
# Check the first / last entries from the tar file, hoping to catch partial #fail
if os.path.isdir( os.path.join(TTC.__rootdir__, '24427') ) and \
os.path.isfile( os.path.join(TTC.__rootdir__, 'es085a_cont.lis') ):
opath = copy.deepcopy(sys.path)
# pre-compute name of downloadfile
bz2data = os.path.join(TTC.__rootdir__, "es085a_data.tar.bz2")
# Import the 'wget' python module from our own repo
sys.path.insert(0, os.path.join(TTC.__rootdir__, "python"))
import wget, tarfile
print("downloading test data...", end='')
# Download URL into workarea "",
out= bz2data )
with bz2data, "r:bz2" ) as tar:
print("inflate+extract...", end='')
tar.extractall( path = TTC.__rootdir__ )
# hope this triggers auto cleanup at some point
del sys.modules['wget']
del wget
sys.path = opath
os.path.unlink( bz2data )
def AtestFoo(self):
self.assertEqual(1+1, 2)
@ -210,7 +248,7 @@ class TTC(unittest.TestCase):
python = options.binaries.python,
compare = os.path.join(TTC.__rootdir__, "../python", ""),
selection = TTC.__es085a_comparison__,
gold_ms = os.path.join(TTC.__rootdir__, "") )
gold_ms = os.path.join(TTC.__rootdir__, "") )
# we need to prepare the environment
jplEnv = copy.deepcopy(os.environ)
# Make sure the pythonpath to jplotter is added
@ -343,8 +381,8 @@ class TTC(unittest.TestCase):
# data sets compared to the "gold" MS on Baseline McIb source 3c345 ...
# (and reporting different number of integrations 156 in stead of 155)
#gold_ms = os.path.join(TTC.__rootdir__, ""),
gold_ms = os.path.join(TTC.__rootdir__, ""),
gold_idi = os.path.join(TTC.__rootdir__, "ES085A_CONT.IDI"),
gold_ms = os.path.join(TTC.__rootdir__, ""),
gold_idi = os.path.join(TTC.__rootdir__, "ES085A.IDI"),
print("Start", cmd)
with open( os.path.join(TTC.__workdir__, 'testE.log'), 'w' ) as lf:
@ -366,8 +404,8 @@ class TTC(unittest.TestCase):
--ms --ms --idi ES085A_CONT.IDI --idi ES085A.IDI".format(
python = options.binaries.python,
verify = os.path.join(TTC.__rootdir__, "../jive-toolchain-verify/"),
gold_ms = os.path.join(TTC.__rootdir__, ""),
gold_idi = os.path.join(TTC.__rootdir__, "ES085A_CONT.IDI"),
gold_ms = os.path.join(TTC.__rootdir__, ""),
gold_idi = os.path.join(TTC.__rootdir__, "ES085A.IDI"),
print("Start", cmd)
with open( os.path.join(TTC.__workdir__, 'testF.log'), 'w' ) as lf:


@ -0,0 +1,628 @@
#!/usr/bin/env python
Download utility as an easy way to get file from the net
python -m wget <URL>
python <URL>
Development: is not option compatible with Unix wget utility,
to make command line interface intuitive for new people.
Public domain by anatoly techtonik <>
Also available under the terms of MIT license
Copyright (c) 2010-2015 anatoly techtonik
__version__ = "3.2"
import sys, shutil, os
import tempfile
import math
PY3K = sys.version_info >= (3, 0)
if PY3K:
import urllib.request as ulib
import urllib.parse as urlparse
import urllib as ulib
import urlparse
# --- workarounds for Python misbehavior ---
# enable passing unicode arguments from command line in Python 2.x
def win32_utf8_argv():
"""Uses shell32.GetCommandLineArgvW to get sys.argv as a list of Unicode
Versions 2.x of Python don't support Unicode in sys.argv on
Windows, with the underlying Windows API instead replacing multi-byte
characters with '?'.
from ctypes import POINTER, byref, cdll, c_int, windll
from ctypes.wintypes import LPCWSTR, LPWSTR
GetCommandLineW = cdll.kernel32.GetCommandLineW
GetCommandLineW.argtypes = []
GetCommandLineW.restype = LPCWSTR
CommandLineToArgvW = windll.shell32.CommandLineToArgvW
CommandLineToArgvW.argtypes = [LPCWSTR, POINTER(c_int)]
CommandLineToArgvW.restype = POINTER(LPWSTR)
cmd = GetCommandLineW()
argc = c_int(0)
argv = CommandLineToArgvW(cmd, byref(argc))
argnum = argc.value
sysnum = len(sys.argv)
result = []
if argnum > 0:
# Remove Python executable and commands if present
start = argnum - sysnum
for i in range(start, argnum):
return result
# enable unicode output to windows console
def win32_unicode_console():
import codecs
from ctypes import WINFUNCTYPE, windll, POINTER, byref, c_int
from ctypes.wintypes import BOOL, HANDLE, DWORD, LPWSTR, LPCWSTR, LPVOID
original_stderr = sys.stderr
# Output exceptions in this code to original_stderr, so that we can at least see them
def _complain(message):
original_stderr.write(message if isinstance(message, str) else repr(message))
codecs.register(lambda name: codecs.lookup('utf-8') if name == 'cp65001' else None)
GetStdHandle = WINFUNCTYPE(HANDLE, DWORD)(("GetStdHandle", windll.kernel32))
GetFileType = WINFUNCTYPE(DWORD, DWORD)(("GetFileType", windll.kernel32))
GetConsoleMode = WINFUNCTYPE(BOOL, HANDLE, POINTER(DWORD))(("GetConsoleMode", windll.kernel32))
def not_a_console(handle):
if handle == INVALID_HANDLE_VALUE or handle is None:
return True
return ((GetFileType(handle) & ~FILE_TYPE_REMOTE) != FILE_TYPE_CHAR
or GetConsoleMode(handle, byref(DWORD())) == 0)
old_stdout_fileno = None
old_stderr_fileno = None
if hasattr(sys.stdout, 'fileno'):
old_stdout_fileno = sys.stdout.fileno()
if hasattr(sys.stderr, 'fileno'):
old_stderr_fileno = sys.stderr.fileno()
real_stdout = (old_stdout_fileno == STDOUT_FILENO)
real_stderr = (old_stderr_fileno == STDERR_FILENO)
if real_stdout:
hStdout = GetStdHandle(STD_OUTPUT_HANDLE)
if not_a_console(hStdout):
real_stdout = False
if real_stderr:
hStderr = GetStdHandle(STD_ERROR_HANDLE)
if not_a_console(hStderr):
real_stderr = False
if real_stdout or real_stderr:
WriteConsoleW = WINFUNCTYPE(BOOL, HANDLE, LPWSTR, DWORD, POINTER(DWORD), LPVOID)(("WriteConsoleW", windll.kernel32))
class UnicodeOutput:
def __init__(self, hConsole, stream, fileno, name):
self._hConsole = hConsole
self._stream = stream
self._fileno = fileno
self.closed = False
self.softspace = False
self.mode = 'w'
self.encoding = 'utf-8' = name
def isatty(self):
return False
def close(self):
# don't really close the handle, that would only cause problems
self.closed = True
def fileno(self):
return self._fileno
def flush(self):
if self._hConsole is None:
except Exception as e:
_complain("%s.flush: %r from %r" % (, e, self._stream))
def write(self, text):
if self._hConsole is None:
if not PY3K and isinstance(text, unicode):
text = text.encode('utf-8')
elif PY3K and isinstance(text, str):
text = text.encode('utf-8')
if not PY3K and not isinstance(text, unicode):
text = str(text).decode('utf-8')
elif PY3K and not isinstance(text, str):
text = text.decode('utf-8')
remaining = len(text)
while remaining:
n = DWORD(0)
# There is a shorter-than-documented limitation on the
# length of the string passed to WriteConsoleW (see
# <>.
retval = WriteConsoleW(self._hConsole, text, min(remaining, 10000), byref(n), None)
if retval == 0 or n.value == 0:
raise IOError("WriteConsoleW returned %r, n.value = %r" % (retval, n.value))
remaining -= n.value
if not remaining:
text = text[n.value:]
except Exception as e:
_complain("%s.write: %r" % (, e))
def writelines(self, lines):
for line in lines:
except Exception as e:
_complain("%s.writelines: %r" % (, e))
if real_stdout:
sys.stdout = UnicodeOutput(hStdout, None, STDOUT_FILENO, '<Unicode console stdout>')
sys.stdout = UnicodeOutput(None, sys.stdout, old_stdout_fileno, '<Unicode redirected stdout>')
if real_stderr:
sys.stderr = UnicodeOutput(hStderr, None, STDERR_FILENO, '<Unicode console stderr>')
sys.stderr = UnicodeOutput(None, sys.stderr, old_stderr_fileno, '<Unicode redirected stderr>')
except Exception as e:
_complain("exception %r while fixing up sys.stdout and sys.stderr" % (e,))
# --- helpers ---
def to_unicode(filename):
    """:return: filename as a unicode string (decoded from utf-8 if needed)"""
    # Python 3 strings are passed through untouched.
    # [ ] test this on Python 3 + (Windows, Linux)
    # [ ] port filename_from_headers once this works
    # [ ] add test to repository / Travis
    #
    # `unicode` only exists on Python 2; the short-circuit below keeps it
    # from being looked up on Python 3.
    if PY3K or isinstance(filename, unicode):
        return filename
    return unicode(filename, 'utf-8')
def filename_from_url(url):
    """:return: last path component of `url` as unicode, or None when the
    URL path does not end in a usable name"""
    # [ ] test urlparse behavior with unicode url
    url_path = urlparse.urlparse(url).path
    candidate = os.path.basename(url_path)
    # names made only of dots / whitespace (e.g. "", ".", "..") are rejected
    if not candidate.strip(" \n\t."):
        return None
    return to_unicode(candidate)
def filename_from_headers(headers):
    """Detect filename from Content-Disposition headers if present.

    :param headers: response headers as a dict-like object (anything with
        a ``get`` method), a list of ``"Name: value"`` lines, or a single
        string holding such lines
    :return: base name taken from the ``filename=`` parameter of the
        Content-Disposition header, or None if there is no such header,
        the disposition type is not ``inline``/``attachment``, or the
        header does not carry exactly one ``filename=`` parameter
    """
    if isinstance(headers, str):
        headers = headers.splitlines()
    if isinstance(headers, list):
        # Lines without a colon (e.g. an HTTP status line) are not headers;
        # skipping them avoids a ValueError while building the dict.
        pairs = (line.split(':', 1) for line in headers if ':' in line)
        headers = dict((key.strip(), value.strip()) for key, value in pairs)

    cdisp = headers.get("Content-Disposition")
    if not cdisp:
        return None

    cdtype = cdisp.split(';')
    if len(cdtype) == 1:
        # disposition type only, no parameters at all
        return None
    if cdtype[0].strip().lower() not in ('inline', 'attachment'):
        return None

    # Exactly one filename parameter is required: several are illegal, and
    # none (e.g. "attachment; size=12") means there is nothing to detect.
    fnames = [x for x in cdtype[1:] if x.strip().startswith('filename=')]
    if len(fnames) != 1:
        return None

    # Split on the first '=' only, so a name that itself contains '=' is
    # kept intact.
    name = fnames[0].split('=', 1)[1].strip(' \t"')
    # Drop any directory part the server may have sent.
    name = os.path.basename(name)
    if not name:
        return None
    return name
def filename_fix_existing(filename):
    """Expands name portion of filename with numeric ' (x)' suffix to
    return filename that doesn't exist already.

    The directory that is scanned for already-used suffixes is the
    directory part of `filename` (the current directory when there is
    none), and that directory part is kept in the result.

    :param filename: path of a file that already exists
    :return: `filename` with ' (N)' inserted before the extension, where
        N is one more than the highest such index already present
    """
    dirname, basename = os.path.split(filename)
    name, dot, ext = basename.rpartition('.')
    if not dot:
        # No extension: rpartition leaves the whole name in `ext`.
        name, ext = basename, ''

    digits = set('0123456789')
    indexes = []
    for entry in os.listdir(dirname or u'.'):
        if not entry.startswith(name):
            continue
        # strip the entry's extension, then whatever follows `name`
        suffix = entry.rsplit('.', 1)[0][len(name):]
        # keep only suffixes that match the ' (x)' pattern
        if not (suffix.startswith(' (') and suffix.endswith(')')):
            continue
        number = suffix[2:-1]
        # an empty ' ()' suffix is not an index (int('') would raise)
        if number and set(number) <= digits:
            indexes.append(int(number))

    idx = 1
    if indexes:
        idx += max(indexes)

    if dot:
        fixed = '%s (%d).%s' % (name, idx, ext)
    else:
        fixed = '%s (%d)' % (name, idx)
    if dirname:
        return os.path.join(dirname, fixed)
    return fixed
# --- terminal/console output helpers ---
def get_console_width():
"""Return width of available window area. Autodetection works for
Windows and POSIX platforms. Returns 80 for others
Code from
if == 'nt':
# get console handle
from ctypes import windll, Structure, byref
from ctypes.wintypes import SHORT, WORD, DWORD
except ImportError:
# workaround for missing types in Python 2.5
from ctypes import (
c_short as SHORT, c_ushort as WORD, c_ulong as DWORD)
console_handle = windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
class COORD(Structure):
_fields_ = [("X", SHORT), ("Y", SHORT)]
class SMALL_RECT(Structure):
_fields_ = [("Left", SHORT), ("Top", SHORT),
("Right", SHORT), ("Bottom", SHORT)]
_fields_ = [("dwSize", COORD),
("dwCursorPosition", COORD),
("wAttributes", WORD),
("srWindow", SMALL_RECT),
("dwMaximumWindowSize", DWORD)]
ret = windll.kernel32.GetConsoleScreenBufferInfo(
console_handle, byref(sbi))
if ret == 0:
return 0
return sbi.srWindow.Right+1
elif == 'posix':
from fcntl import ioctl
from termios import TIOCGWINSZ
from array import array
winsize = array("H", [0] * 4)
ioctl(sys.stdout.fileno(), TIOCGWINSZ, winsize)
except IOError:
return (winsize[1], winsize[0])[0]
return 80
def bar_thermometer(current, total, width=80):
    """Return thermometer style progress bar string, e.g. ``[.....     ]``.

    `total` must not be zero. Two characters of `width` are used by the
    brackets; the rest is split between dots (done) and spaces (to do).
    Control and trailing symbols (carriage return and spaces) are not
    included. See `bar_adaptive` for more information.
    """
    # room between the two brackets
    inner = width - 2
    # portion of that room proportional to current/total, rounded down
    filled = int(math.floor(float(current) / total * inner))
    return '[{0}{1}]'.format('.' * filled, ' ' * (inner - filled))
def bar_adaptive(current, total, width=80):
    """Return progress bar string for given values in one of three
    styles depending on available width:

        [..  ] downloaded / total
        downloaded / total
        [.. ]

    If total value is unknown or <= 0, show bytes counter using two
    adaptive styles:

        %s / unknown
        %s

    If there is not enough space on the screen, do not display anything
    (an empty string is returned).

    The returned string doesn't include control characters such as the
    carriage return used to place the cursor at the beginning of the line.
    One character at the end of the line is left free to avoid an
    automatic linefeed on Windows.

    :param current: amount done so far
    :param total: expected total, or a false/negative value when unknown
    :param width: number of columns available
    :return: the rendered progress string, possibly empty
    """
    # process special case when total size is unknown and return immediately
    if not total or total < 0:
        msg = "%s / unknown" % current
        if len(msg) < width:    # leaves one character to avoid linefeed
            return msg
        if len("%s" % current) < width:
            return "%s" % current
        # Nothing fits. Stop here instead of falling through: the
        # percentage computed below would divide by the unknown total.
        return ''

    # --- adaptive layout algorithm ---
    #
    # [x] describe the format of the progress bar
    # [x] describe min width for each data field
    # [x] set priorities for each element
    # [x] select elements to be shown
    #   [x] choose top priority element min_width < avail_width
    #   [x] lessen avail_width by value if min_width
    #   [x] exclude element from priority list and repeat
    #
    #  10% [.. ]  10/100
    # pppp bbbbb sssssss
    min_width = {
        'percent': 4,                       # 100%
        'bar': 3,                           # [.]
        'size': len("%s" % total)*2 + 3,    # 'xxxx / yyyy'
    }
    priority = ['percent', 'bar', 'size']

    # select elements to show
    selected = []
    avail = width
    for field in priority:
        if min_width[field] < avail:
            selected.append(field)
            # +1 is for separator or for reserved space at
            # the end of line to avoid linefeed on Windows
            avail -= min_width[field] + 1

    # render each selected field, then join them with the separator
    parts = []
    for field in selected:
        if field == 'percent':
            # fixed size width for percentage
            parts.append(('%s%%' % (100 * current // total)).rjust(min_width['percent']))
        elif field == 'bar':  # [. ]
            # bar takes its min width + all available space
            parts.append(bar_thermometer(current, total, min_width['bar'] + avail))
        elif field == 'size':
            # size field has a constant width (min == max)
            parts.append(("%s / %s" % (current, total)).rjust(min_width['size']))
    return ' '.join(parts)
# --/ console helpers
__current_size = 0 # global state variable, which exists solely as a
# workaround against Python 3.3.0 regression
# fixed in Python 3.3.1
def callback_progress(blocks, block_size, total_size, bar_function):
    """callback function for urlretrieve that is called when connection is
    created and then once for each block

    draws adaptive progress bar in terminal/console

    use sys.stdout.write() instead of "print,", because it allows one more
    symbol at the line end without linefeed on Windows

    :param blocks: number of blocks transferred so far
    :param block_size: in bytes
    :param total_size: in bytes, can be -1 if server doesn't return it
    :param bar_function: another callback function to visualize progress
    """
    global __current_size

    width = min(100, get_console_width())

    if sys.version_info[:3] == (3, 3, 0):  # regression workaround
        if blocks == 0:  # first call
            __current_size = 0
        else:
            __current_size += block_size
        current_size = __current_size
    else:
        current_size = blocks * block_size
        # The last block is usually partial, so cap at the real size, but
        # only when the size is known: capping against -1 would report
        # "-1" bytes downloaded.
        if total_size > 0:
            current_size = min(current_size, total_size)
    progress = bar_function(current_size, total_size, width)
    if progress:
        sys.stdout.write("\r" + progress)
def detect_filename(url=None, out=None, headers=None, default="download.wget"):
    """Pick the name under which a download is saved.

    Candidates are tried in this order: explicit `out`, name from the
    Content-Disposition `headers`, name from `url`; when none of them
    yields a name, `default` (download.wget) is returned.
    """
    # All candidates are computed up front (url first, then headers),
    # even when an earlier one already wins.
    from_out = out if out else ''
    from_url = (filename_from_url(url) or '') if url else ''
    from_headers = (filename_from_headers(headers) or '') if headers else ''
    return from_out or from_headers or from_url or default
def download(url, out=None, bar=bar_adaptive):
    """High level function, which downloads URL into tmp file in current
    directory and then renames it to filename autodetected from either URL
    or HTTP headers.

    :param url: address to fetch
    :param bar: function to track download progress (visualize etc.);
        pass a false value to disable progress reporting
    :param out: output filename or directory
    :return: filename where URL is downloaded to
    """
    # detect if out is a directory
    outdir = None
    if out and os.path.isdir(out):
        outdir = out
        out = None

    # get filename for temp file in current directory
    prefix = detect_filename(url, out)
    (fd, tmpfile) = tempfile.mkstemp(".tmp", prefix=prefix, dir=".")
    # mkstemp returns an open descriptor; urlretrieve opens the file by
    # name, so close ours now instead of leaking it.
    os.close(fd)

    # set progress monitoring callback
    def callback_charged(blocks, block_size, total_size):
        # 'closure' to set bar drawing function in callback
        callback_progress(blocks, block_size, total_size, bar_function=bar)
    if bar:
        callback = callback_charged
    else:
        callback = None

    if PY3K:
        # Python 3 can not quote URL as needed
        binurl = list(urlparse.urlsplit(url))
        binurl[2] = urlparse.quote(binurl[2])
        binurl = urlparse.urlunsplit(binurl)
    else:
        binurl = url

    try:
        (tmpfile, headers) = ulib.urlretrieve(binurl, tmpfile, callback)
    except Exception:
        # do not leave a partial temp file behind after a failed transfer
        if os.path.exists(tmpfile):
            os.unlink(tmpfile)
        raise

    filename = detect_filename(url, out, headers)
    if outdir:
        filename = os.path.join(outdir, filename)

    # add numeric ' (x)' suffix if filename already exists
    if os.path.exists(filename):
        filename = filename_fix_existing(filename)
    shutil.move(tmpfile, filename)

    #print headers
    return filename
usage = """\
usage: [options] URL
-o --output FILE|DIR output filename or directory
-h --help
if __name__ == "__main__":
if len(sys.argv) < 2 or "-h" in sys.argv or "--help" in sys.argv:
if "--version" in sys.argv:
sys.exit(" " + __version__)
# patch Python 2.x to read unicode from command line
if not PY3K and sys.platform == "win32":
sys.argv = win32_utf8_argv()
# patch Python to write unicode characters to console
if sys.platform == "win32":
from optparse import OptionParser
parser = OptionParser()
parser.add_option("-o", "--output", dest="output")
(options, args) = parser.parse_args()
url = sys.argv[1]
filename = download(args[0], out=options.output)
print("Saved under %s" % filename)
features that require more tuits for urlretrieve API
[x] autodetect filename from URL
[x] autodetect filename from headers - Content-Disposition
[ ] make HEAD request to detect temp filename from Content-Disposition
[ ] process HTTP status codes (i.e. 404 error)
[ ] catch KeyboardInterrupt
[ ] optionally preserve incomplete file
[x] create temp file in current directory
[ ] resume download (broken connection)
[ ] resume download (incomplete file)
[x] show progress indicator
[x] do not overwrite downloaded file
[x] rename file automatically if exists
[x] optionally specify path for downloaded file
[ ] options plan
[x] -h, --help, --version (CHAOS speccy)
[ ] clpbar progress bar style
_ 30.0Mb at 3.0 Mbps eta: 0:00:20 30% [===== ]
[ ] test "bar \r" print with \r at the end of line on Windows
[ ] process Python 2.x urllib.ContentTooShortError exception gracefully
(ideally retry and continue download)
(tmpfile, headers) = urllib.urlretrieve(url, tmpfile, callback_progress)
File "C:\Python27\lib\", line 93, in urlretrieve
return _urlopener.retrieve(url, filename, reporthook, data)
File "C:\Python27\lib\", line 283, in retrieve
"of %i bytes" % (read, size), result)
urllib.ContentTooShortError: retrieval incomplete: got only 15239952 out of 24807571 bytes
[ ] find out if urlretrieve may return unicode headers
[ ] write files with unicode characters
[x] Python 2, Windows
[x] Python 3, Windows
[ ] Linux
[ ] add automatic tests
[ ] specify unicode URL from command line
[ ] specify unicode output file from command line
[ ] test suite for unsafe filenames from url and from headers
[ ] security checks
[ ] filename_from_url
[ ] filename_from_headers
[ ] MITM redirect from https URL
[ ] https certificate check
[ ] size+hash check helpers
[ ] fail if size is known and mismatch
[ ] fail if hash mismatch