A set of tools to verify the operation of the j2ms2 and tConvert programs
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

555 lines
29 KiB

  1. #!/usr/bin/env python3
  2. #######################/opt/anaconda3/bin/python3
  3. # compare-ms-idi.py
  4. #
  5. # Analyse the antenna-, source- and frequency tables to see
  6. # if they represent the same information
  7. from __future__ import print_function
  8. from six import with_metaclass # Grrrrr http://python-future.org/compatible_idioms.html#metaclasses
  9. from functools import partial, reduce, total_ordering
  10. import sys, re, collections, glob, os, operator, itertools, astropy.io.fits, numpy, argparse
  11. import pyrap.tables as pt
# everybody should love themselves some function composition. really.
# compose(f, g, h)(x) == f(g(h(x))): the rightmost function is applied first
compose = lambda *fns : lambda x: reduce(lambda acc, f: f(acc), reversed(fns), x)
# pam: the dual of map - apply every function to the one value
pam = lambda *fns : lambda x: tuple(map(lambda fn: fn(x), fns)) # fns= [f0, f1, ...] => map(x, fns) => (f0(x), f1(x), ...)
Apply = lambda *args : args[0](*args[1:]) # args= (f0, arg0, arg1,...) => f0(arg0, arg1, ...)
identity = lambda x : x
# choice(p, t, f)(x): t(x) if predicate p holds for x, else f(x)
choice = lambda p, t, f : lambda x: t(x) if p(x) else f(x)
const = lambda c : lambda *_: c
# wrap a plain callable so it can be stored as a class attribute and used as a method
method = lambda f : lambda *args, **kwargs: f(*args, **kwargs)
wrap = lambda f, n=1 : lambda *args, **kwargs: f(*args[0:n]) # transform f to allow many parameters but pass only first n
is_none = partial(operator.is_, None)
# curried versions of builtins/itertools, handy inside compose() chains
Map = lambda fn : partial(map, fn)
Group = lambda n : operator.methodcaller('group', n) # extract group n from a regex match object
GroupBy = lambda keyfn : partial(itertools.groupby, key=keyfn)
Sort = lambda keyfn : partial(sorted, key=keyfn)
Filter = lambda pred : partial(filter, pred)
Reduce = lambda f, i=None : lambda l: reduce(f, l, i) if i is not None else reduce(f, l)
ZipStar = lambda x : zip(*x)
Star = lambda f : lambda args: f(*args)
StarMap = lambda f : partial(itertools.starmap, f)
# debug helpers: print (part of) the value, then pass the value on unchanged
D = lambda x : print(x) or x
DD = lambda pfx : lambda x: print(pfx,":",x) or x
D_i = lambda item : lambda x: print(x[item]) or x
D_a = lambda attr : lambda x: print(getattr(x, attr)) or x
DD_i = lambda pfx, item : lambda x: print(pfx,":",x[item]) or x
DD_a = lambda pfx, attr : lambda x: print(pfx,":",getattr(x, attr)) or x
# ad-hoc (anonymous) type/object construction from keyword arguments
Type = lambda **kwargs : type('', (), kwargs)
Derived = lambda b, **kw : type('', (b,), kw) # Create easy derived type so attributes can be set/added
Obj = lambda **kwargs : Type(**kwargs)()
# mutate-and-return helpers (the mutating calls themselves return None, hence the "or")
Append = lambda l, v : l.append(v) or l
SetAttr = lambda o, a, v : setattr(o, a, v) or o
# XFormAttr(attr, f)(obj, ...) replaces obj.attr with f(obj.attr, ...) and returns obj
XFormAttr= lambda attr, f, missing=None: lambda obj, *args, **kwargs: SetAttr(obj, attr, f(getattr(obj, attr, missing), *args, **kwargs))
# Thanks to Py3 one must sometimes drain an iterable for its side effects. Thanks guys!
# From https://docs.python.org/2/library/itertools.html#recipes
# consume(), all_equal()
consume = partial(collections.deque, maxlen=0)
all_equal= compose(lambda g: next(g, True) and not next(g, False), itertools.groupby)
# shorthands
GetN = operator.itemgetter
GetA = operator.attrgetter
Repeat = itertools.repeat
Call = operator.methodcaller
# other generic stuff (own invention)
# GetA_xform(attr=f, ...)(o) yields f(o.attr) for each keyword given
GetA_xform = lambda **kwargs: lambda o: ( kwargs[k](getattr(o,k)) for k in kwargs.keys() )
# GetN_xform(key=f, ...)(o) yields f(o[key], o) for each keyword given
GetN_xform = lambda **kwargs: lambda o: ( kwargs[k](o[k], o) for k in kwargs.keys() )
  56. # Python3 comparison is much more strict; None cannot be compared anymore.
  57. # So if you need to Py2/Py3 compatible sort a list that can have None's in you must come up with something else.
  58. # Based on https://stackoverflow.com/a/26348624
  59. @total_ordering
  60. class NietsNut(object):
  61. __lt__ = method(const(True)) # compares less than to everything else
  62. __eq__ = operator.is_ # only compares equal to singleton instance
  63. __repr__ = method("Niets".format)
  64. Niets = NietsNut()
# Use this as keyfn in "sorted(key=...)" to transparently replace a None key
# with Niets, but leaving the sorted objects intact - i.e. leaving the None where it was
# (None itself: Py3 refuses to order None against other types, Niets orders first)
sorteable_none = choice(is_none, const(Niets), identity)
  68. ##################################################################################
  69. #
  70. # this is the information we accumulate per primary key (whatever that may turn
  71. # out to be)
  72. #
  73. ##################################################################################
  74. # If you want to figure out if an element in a tuple/list happens to be None
  75. # and there is a risk that one of the elements is a numpy.array() you're up
  76. # Sh*t Creek - without a paddle:
  77. #
  78. # >>> import numpy
  79. # >>> a1 = numpy.arange(3)
  80. # >>> None in (a1, a1)
  81. # Traceback (most recent call last):
  82. # File "<stdin>", line 1, in <module>
  83. # ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()
  84. #
  85. # This behaviour is numpy version dependent - older versions will work, newer will give this
  86. # exception FFS.
  87. #
  88. # So we just have to roll our own "is the value None present in this list/tuple?"
  89. # But that is not enough - if all operands to the operation are None they should compare equal as well
  90. # but without having to call the operator
  91. # index into the comparison table is (any, all).
  92. # we only need to check for conditions where
  93. # any == True and discriminate between all/some.
  94. # If any == False this implies all == False (all == True is impossible in this case)
  95. comp_table_ = {(True, True) : const(True), # all = True -> all operands None -> .EQ.
  96. (True, False): const(False)} # all = False -> some operands None -> .NE.
  97. any_all = compose(pam(any, all), Map(is_none))
  98. SafeCompare = lambda f: lambda *args: comp_table_.get( any_all(args), f)(*args)
  99. SumAttr = lambda a: lambda o0, o1: setattr(o0, a, getattr(o0, a)+getattr(o1, a))
  100. AppendAttr = lambda a: lambda o0, o1: setattr(o0, a, getattr(o0, a).append(getattr(o1, a)))
  101. FloatEq = SafeCompare(lambda l, r: abs(l-r)<1e-3)
  102. ArrayEq = SafeCompare(lambda l, r: numpy.allclose(l, r, atol=1e-3))
  103. StringEq = SafeCompare(lambda l, r: l.lower() == r.lower())
  104. ###################################3
  105. # For meta-data comparison we need:
  106. # - antenna info
  107. # position, diameter, mount, offset
  108. # - source info
  109. # name, ra, dec
  110. # - frequency info
  111. # start, end freq, #-of-channels (spectral channel width?)
  112. #
  113. # Creating keys for the antenna, sources can be simple:
  114. # ('antenna', <name>), ('source', <name>)
  115. # for the spectral windows it's going to be difficult to have unique key
  116. # ('frequency', "<lowest channel frequency>") with 6 digits behind the decimal point?
  117. # input: attribute1=function1, attribute2=function2
  118. # Will return function that replaces the value of attributeN with functionN( current value of attributeN )
# input: attribute1=function1, attribute2=function2
# Will return function that replaces the value of attributeN with functionN( current value of attributeN )
# (only works on namedtuple-like objects exposing _replace())
Replace = lambda **kwargs: lambda o: o._replace( **{k:f(getattr(o, k)) for k,f in kwargs.items()} )
# A named tuple holding the concatenated field differences.
# NOTE(review): diffbool evaluates "not diff", i.e. bool(Diff) is True when the
# diff string is EMPTY - the opposite of the original comment ("a diff is True
# if it's not empty"). bool() of a Diff is not used anywhere in this file;
# confirm the intended polarity before relying on it.
diffbool = method(compose(operator.not_, GetA('diff')))
# NOTE(review): __eq__ compares case-insensitively (StringEq) but __hash__
# hashes the raw string - equal Diffs with differing case would hash apart.
Diff = Derived(collections.namedtuple('Diff', ['diff']), __repr__=method("DIFF: {0.diff}".format),
               __bool__ = diffbool, __nonzero__ = diffbool,
               __eq__ = lambda l, r: StringEq(l.diff, r.diff),
               __ne__ = lambda l, r: not (l==r),
               __hash__ = method(compose(Call('__hash__'), GetA('diff'))))
# kwargs = { attribute:compare-function, attribute:compare-function, ...}
# returns a function which returns an object that returns True if it's "empty" and not True if it has elements
# the contents are the fields that are different
diff_fmt = "{0}: {1} vs {2}".format
  132. def diff_reductor(l, r):
  133. def do_it(acc, item):
  134. diff = item(l, r)
  135. return (acc + ", " +diff if acc else diff) if diff else acc
  136. return do_it
  137. def mk_comp(attr, f):
  138. def do_comp(l, r):
  139. al, ar = getattr(l, attr), getattr(r, attr)
  140. return diff_fmt(attr, al, ar) if not f(al, ar) else ""
  141. return do_comp
  142. def mk_differ(**kwargs):
  143. to_compare = [mk_comp(attr, f) for attr, f in kwargs.items()]
  144. def do_it(l, r):
  145. return Diff( reduce(diff_reductor(l, r), to_compare, "") )
  146. return do_it
# Per-antenna record; 'source' (filled in later via SetSource) records which
# data set the row came from. Equality is None-tolerant and approximate
# (FloatEq/ArrayEq 1e-3, StringEq case-insensitive); __diff__ reports the
# attributes that differ as a Diff object.
AntennaStats = Derived(collections.namedtuple('AntennaStats', ['name', 'position', 'diameter', 'mount', 'offset']), source=None,
                       __repr__ = method("ANT {0.name}: xyz={0.position} d={0.diameter} offset={0.offset} mount={0.mount}".format),
                       __eq__ = lambda l,r: FloatEq(l.diameter, r.diameter) and ArrayEq(l.offset, r.offset)
                                            and ArrayEq(l.position, r.position) and StringEq(l.mount, r.mount),
                       __ne__ = lambda l,r: not (l==r),
                       __diff__ = mk_differ(diameter=FloatEq, offset=ArrayEq, position=ArrayEq, mount=StringEq))
# Per-source record: name plus RA/DEC position (compared approximately via ArrayEq)
SourceStats = Derived(collections.namedtuple('SourceStats', ['name', 'position']), source=None,
                      __repr__ = method("SRC {0.name}: RA/DEC {0.position}".format),
                      __eq__ = lambda l,r: ArrayEq(l.position, r.position),
                      __ne__ = lambda l,r: not (l==r),
                      __diff__ = mk_differ(position=ArrayEq))
  158. FrequencyStats = Derived(collections.namedtuple('FrequencyStats', ['key', 'low', 'high', 'nchan']), source=None,
  159. __repr__ = method("SPW {0.key}: {0.low}-{0.high} #{0.nchan}".format),
  160. __eq__ = lambda l,r: FloatEq(l.low, r.low) and FloatEq(l.high, r.high) and l.nchan and r.nchan,
  161. __ne__ = lambda l,r: not (l==r),
  162. __diff__ = mk_differ(low=FloatEq, high=FloatEq, nchan=operator.__eq__))
# defaultdict subclass so extra attributes (e.g. 'source') can be set on instances
DefaultDict = Derived(collections.defaultdict)
# Provenance record: which file(s)/format a piece of meta data came from.
# NOTE(review): the __repr__ uppercases 'format' in place (via XFormAttr) on
# every repr() call - idempotent, but a side effect of printing.
Source = lambda **kwargs: Obj(format=kwargs.get('format', 'Missing format='),
                              location=kwargs.get('location', 'Missing location='),
                              __repr__=method(compose("{0.format:>4s}: {0.location}".format, XFormAttr('format', str.upper))),
                              __hash__=method(compose(Call('__hash__'), Call('__repr__'))))
# SetSource(obj, src) overwrites obj.source with src and returns obj
SetSource = XFormAttr('source', lambda _, tp: tp) # just overwrite the attribute's value
  169. #############################################################################################################################
  170. #
  171. # MeasurementSet specific handling
  172. #
  173. #############################################################################################################################
# Transform row of MS Antenna table into AntennaStats.
# note: order of fields in GetN() should match those to AntennaStats c'tor above)
frequency_key = "{0:.3f}".format # canonical (string) dict key for a frequency value
ms_antenna_props = compose(Replace(name=str.capitalize), Star(AntennaStats), GetN('NAME', 'POSITION', 'DISH_DIAMETER', 'MOUNT', 'OFFSET'))
# FIELD row -> SourceStats: take first REFERENCE_DIR entry, convert rad -> deg
ms_source_props = compose(Replace(position=compose(numpy.rad2deg, GetN(0))), Replace(name=str.upper), Star(SourceStats), GetN('NAME', 'REFERENCE_DIR'))
# SPECTRAL_WINDOW row -> FrequencyStats: key/low/high all derived from CHAN_FREQ
ms_freq_props = compose(Replace(key=compose(frequency_key, numpy.min), low=numpy.min, high=numpy.max),
                        Star(FrequencyStats), GetN('CHAN_FREQ', 'CHAN_FREQ', 'CHAN_FREQ', 'NUM_CHAN'))
open_ms = partial(pt.table, readonly=True, ack=True)
# open the subtable referenced by keyword t of an already-opened MS
mssubtable = lambda t: compose(partial(pt.table, readonly=True, ack=False), Call('getkeyword', t))
ms_antenna, ms_field, ms_spectralwindow = list(map(mssubtable, ['ANTENNA', 'FIELD', 'SPECTRAL_WINDOW']))
  184. def process_ms(path_to_ms):
  185. set_source = lambda s: SetSource(s, Source(format='ms', location=path_to_ms))
  186. # The accumulator functions, taking a row of the appropriate table as input
  187. def accumulate_ant(acc, antenna):
  188. antenna = ms_antenna_props(antenna)
  189. return acc[ ('antenna', antenna.name) ].append( set_source(antenna) ) or acc
  190. def accumulate_src(acc, source):
  191. source = ms_source_props(source)
  192. return acc[ ('source', source.name) ].append( set_source(source) ) or acc
  193. def accumulate_spw(acc, spw):
  194. spw = ms_freq_props(spw)
  195. return acc[ ('frequency', spw.key) ].append( set_source(spw) ) or acc
  196. rv = DefaultDict(list)
  197. with open_ms(path_to_ms) as ms:
  198. with ms_antenna(ms) as at:
  199. rv = reduce(accumulate_ant, at, rv)
  200. with ms_field(ms) as ft:
  201. rv = reduce(accumulate_src, ft, rv)
  202. with ms_spectralwindow(ms) as spwt:
  203. rv = reduce(accumulate_spw, spwt, rv)
  204. return set_source(rv)
  205. #############################################################################################################################
  206. #
  207. # FITS-IDI specific handling
  208. #
  209. #############################################################################################################################
# input: list of (absolute) path names, output: string of grouped-by-containing-directory of the file names
# pipeline (right to left): split paths into (dir, file) pairs, sort + group by dir,
# then render each directory as "dir: <common file-name prefix>*", joined by "; "
summarize = compose("; ".join, Map("{0[0]}: {0[1]}*".format), Map(pam(GetN(0),
                    compose(os.path.commonprefix, list, Map(GetN(1)), GetN(1)))),
                    GroupBy(GetN(0)), Sort(GetN(0)), Map(pam(os.path.dirname, os.path.basename)))
# look for paths that have "<stem>[0-9]+" and group by <stem> and compile list of suffixes
idi_chunk = re.compile(r"^(?P<stem>.*\.IDI)(?P<suffix>[0-9]+)$").match
# from a single groupby result:
# (key, [(i, x), (i, x), ...])
# extract all x'es, and return a tuple:
# (x[0], x[-1])
# i.e. the first + last values of the sequence
# handle the case where the groupby returns an empty list (allegedly this can happen)
ranger = compose(choice(operator.truth, pam(GetN(0),GetN(-1)), const(None)), list, Map(GetN(1)), GetN(1))
## Finds consecutive ranges in the sequence.
## Returns a list of (start, end) values.
## Single, non-consecutive, numbers are returned as (start, end) with end==start
##
## Based on
## http://stackoverflow.com/questions/2361945/detecting-consecutive-integers-in-a-list
## (consecutive numbers share a constant index-minus-value, which is the groupby key)
consecutive_ranges = compose(Filter(operator.truth), Map(ranger), GroupBy(lambda ix: ix[0]-ix[1]), enumerate, sorted)
# input: list of consecutive ranges, output:
# "[start-end, start-end, start, start-end, ...]"
# (a single "start" means a non-consecutive, single, number)
n_fmt_ = "{0[0]}".format
r_fmt_ = "{0[0]}-{0[1]}".format
compressed_suffixes= compose("[{0}]".format, ",".join, Map(lambda se: (n_fmt_ if se[0]==se[1] else r_fmt_)(se)))
# Take a list of items, item = (Source, stem, 'suffix')
# return two things: Source and a nicely formatted compressed
# string of consecutive ranges: "[start-end, start, start-end, ...]" found in the suffixes
# (All Source and stem values should be equal in this one since they were grouped by 'stem', i.e.
# the (unique) file name.)
# pipeline: convert suffixes to int, transpose into columns, then take
# (first Source, compressed range string of the suffix column)
decompose_idilist = compose(pam(compose(GetN(0), GetN(0)), # Source[0]
                                compose(compressed_suffixes, consecutive_ranges, GetN(2))), # "[start-end, start, ...]"
                            list, ZipStar, Map(pam(GetN(0), GetN(1), compose(int, GetN(2)))))
  244. # item = (key, list-of-items-sharing-key)
  245. # The items sharing key are tuples: (Source, stem, 'suffix')
  246. # and the key by which the items are grouped happes to be stem
  247. def xform_gb(acc, item):
  248. # item = key, [list of items]
  249. key, l = item
  250. l = list(l)
  251. if key is None: #in [None, Niets]:
  252. # If key is None, the file names did not have a suffix so should be good enough
  253. # to pass on the original Source objects
  254. # just append the list of sources to acc
  255. acc.extend( map(GetN(0), l) )
  256. else:
  257. # ok we have list of suffixes for this stem
  258. # transform to int, sort, and compress.
  259. # retain the original Source objects because we need the type of the format
  260. # We already have the stem - it's the key
  261. source, suffix = decompose_idilist( l )
  262. # Now that we have a source, the stem and the consecutive ranges as suffix
  263. # we can append a new Source object, formatted as:
  264. # <format>:<stem>[1-3,6,12-14, ...]
  265. acc.append( Source(format=source.format, location=key+suffix) )
  266. return acc
sortkey = compose(sorteable_none, GetN(1)) # defines the key to sort/groupby below (None -> Niets)
# (Source, match-or-None) -> (Source, stem, suffix); stem/suffix are None when
# the location did not match the "<stem>.IDI<n>" pattern
xform_item = choice(compose(partial(operator.is_, None), GetN(1)), pam(GetN(0), const(None), const(None)),
                    pam(GetN(0), compose(Group('stem'), GetN(1)), compose(Group('suffix'), GetN(1))))
# pair every Source with the idi_chunk match on its location, split into
# (Source, stem, suffix) triples, then sort + group them by stem
summarize3_= compose(GroupBy(GetN(1)), Sort(sortkey), Map(xform_item), Map(pam(identity, compose(idi_chunk, GetA('location')))))
summarize3 = lambda l: reduce(xform_gb, summarize3_(l), list())
  272. # Helpers for dealing with FITS-IDI files
  273. # - opening an IDI file in a specific mode, adding Debug output prints the
  274. # actual file name, which is nice feedback for the usert :-)
  275. # - building antenna, source lookup tables
  276. # - Given an opened FITS-IDI object, returning the accumulated statistics from the UV_DATA table
  277. open_idi = compose(partial(astropy.io.fits.open, memmap=True, mode='readonly'), D)
  278. idi_tbl = lambda *tbl: pam(*map(lambda c: compose(GetA('data'), GetN(c)), tbl))
  279. anname_key = compose(str.upper, str)
  280. anname_key_l = compose(list, Map(anname_key))
  281. # From AIPSMEM114r, integer codes matched to MeasurementSet strings
  282. idi_mntsta_ = { 1:"ALT-AZ", 2:"EQUATORIAL", 2:"ORBITING", 3:"X-Y", 4:"ALT-AZ-NASMYTH-RH", 5:"ALT-AZ-NASMYTH-LH" }
  283. idi_mntsta = lambda mntsta: idi_mntsta_.get(mntsta, "UnknownMNTSTA#{0}".format(mntsta))
# in FITS-IDI antenna info is spread over two tables: ANTENNA and ARRAY_GEOMETRY
def acc_idi_antenna_table(acc, idi):
    """Accumulate AntennaStats from one FITS-IDI file into acc, keyed ('antenna', Name)."""
    an, ag = idi_tbl('ANTENNA', 'ARRAY_GEOMETRY')(idi)
    # array geometry table has anname column to link to antenna table
    agidx = anname_key_l(ag['ANNAME'])
    # DIAMETER is an optional column in ARRAY_GEOMETRY; use None when absent
    diameter = lambda i: None if 'DIAMETER' not in ag.columns.names else ag['DIAMETER'][i]
    stabxyz, mntsta, staxof = GetN('STABXYZ', 'MNTSTA', 'STAXOF')(ag)
    def mk_idi_antenna(acc, station):
        name = station.capitalize()
        key = ('antenna', name)
        # find row in array_geometry describing this station
        idx = agidx.index( station )
        acc[ key ].append( SetSource(AntennaStats(name, stabxyz[idx], diameter(idx), idi_mntsta(mntsta[idx]), staxof[idx]),
                                     Source(format='idi', location=idi.filename())) )
        return acc
    # iterate over the ANTENNA table's names; each must appear in ARRAY_GEOMETRY
    return reduce(mk_idi_antenna, anname_key_l(an['ANNAME']), acc)
  300. def acc_idi_source_table(acc, idi):
  301. def mk_idi_source(acc, source):
  302. source, raepo, decepo = source
  303. source = anname_key(source)
  304. key = ('source', source)
  305. acc[ key ].append( SetSource(SourceStats(source, numpy.array([float(raepo), float(decepo)])),
  306. Source(format='idi', location=idi.filename())) )
  307. return acc
  308. return reduce(mk_idi_source, zip(*GetN('SOURCE', 'RAEPO', 'DECEPO')(idi['SOURCE'].data)), acc)
# From AIPSMEM114r:
# "Band frequency offsets. The BANDFREQ column shall contain a one-dimensional
# array of band-specific frequency offsets. There shall be one element for each
# band in the file. The offset for the first band in the frequency setup with
# FREQID value 1 should be 0 Hz. Frequency offsets may be of either sign."
#
# The offset is measured w.r.t. the value of the "REF_FREQ" mandatory header keyword.
# Number of spectral channels is encoded in the value of the "NO_CHAN" mandatory header keyword.
#
# extract (REF_FREQ, NO_CHAN) from the FREQUENCY table header as (float, int)
idi_freq_hdr_values = compose(GetN_xform(REF_FREQ=wrap(float), NO_CHAN=wrap(int)), GetA('header'))
# iterate FREQUENCY table rows as (BANDFREQ, TOTAL_BANDWIDTH, CH_WIDTH) triples
idi_freq_rows = compose(ZipStar, GetN('BANDFREQ', 'TOTAL_BANDWIDTH', 'CH_WIDTH'), GetA('data'))
  320. def acc_idi_frequency_table(acc, idi):
  321. frequency = idi['FREQUENCY']
  322. ref_freq, no_chan = idi_freq_hdr_values(frequency)
  323. def mk_idi_spw(acc, spw):
  324. (bandfreq, tot_bw, ch_width) = spw
  325. bandfreq = ref_freq + bandfreq
  326. freqkey = frequency_key(bandfreq)
  327. key = ('frequency', freqkey)
  328. acc[ key ].append( SetSource(FrequencyStats(freqkey, bandfreq, bandfreq+(no_chan-1)*ch_width, no_chan),
  329. Source(format='idi', location=idi.filename())) )
  330. return acc
  331. def process_one_setup(acc, setup):
  332. # One row in the frequency table describes a number of spectral windows, a setup
  333. return reduce(mk_idi_spw, zip(*setup), acc)
  334. return reduce(process_one_setup, idi_freq_rows(frequency), acc)
  335. # Actually process a series of FITS-IDI file(s)
  336. def process_idi(list_of_idis):
  337. # reduction of a single FITS-IDI file
  338. def process_one_idi(acc, path_to_idi):
  339. idi = open_idi(path_to_idi)
  340. acc_idi_antenna_table(acc, idi)
  341. acc_idi_source_table(acc, idi)
  342. acc_idi_frequency_table(acc, idi)
  343. return acc
  344. return SetSource(reduce(process_one_idi, list_of_idis, DefaultDict(list)), Source(format='idi', location=summarize(list_of_idis)))
  345. ##########################################################################################################################################
  346. #
  347. #
  348. # The analysis part starts here
  349. #
  350. #
  351. ##########################################################################################################################################
# extract the source attribute from a list of items, as a list
get_src = compose(list, Map(GetA('source')))
# expected input: (set(keys), Stats)
# output: the sorted keys in set(keys)
sorted_keys = compose(sorted, GetN(0))
  357. def split_equal(values):
  358. rv = []
  359. for (v, i) in itertools.groupby(values):
  360. rv.append( (v, get_src(i)) )
  361. return rv
# input: entry from split_equal: (value, [source, source, ...])
# output: formatted string
# \t<value> found in:
# \t\tsrc#0
# \t\tsrc#1
# ...
# join the repr's of the sources (after summarize3 compresses .IDI<n> suffixes), one per line
source_fmt = compose("\n\t\t".join, Map(Call('__repr__')))
# print (via D) the value and the compressed source list it was found in
print_group = compose(D, "\t{0[0]} found in:\n\t\t{0[1]}".format, pam(GetN(0), compose(source_fmt, summarize3, GetN(1))))
  370. ## helper functions to group lists-of-datasources by identical diff
  371. ## in order to make the output as compact as possible
  372. ## (so if there are values with identical diffs are found across different
  373. ## file formats the actual diff will only be reported once)
  374. def reduce_diff(acc, item):
  375. ref_val, group = acc
  376. value, sources = item
  377. if ref_val is None:
  378. print_group(item)
  379. return (value, group)
  380. # now collect the data sources by actual diff
  381. group[ value.__diff__(ref_val) ].update( sources )
  382. return (ref_val, group)
# values is [(value, [source, source, ...]), (value, [source, source, ...]), ... ]
# where value is a value that was found equal in all accompanying sources
# these are assumed to be values associated with the same key (source, antenna or spectral window)
# so we want to highlight the differences.
# Note: we know values is at least a list of length 2 because if length == 1 there are
# no differing values and so the code doesn't bother to report those
def print_diffs(key, values):
    print(key,":")
    # fold values through reduce_diff (first value printed as the reference,
    # the rest grouped by their diff), then print each (diff, sources) group
    compose(consume, Map(print_group), Call('items'), GetN(1), Reduce(reduce_diff, (None, collections.defaultdict(set))))(values)
    return None
# take a list of dicts with dict(key=>statistics) and compare them
def report(list_of_stats):
    """Compare the accumulated statistics of all data sets; returns the number of problem keys."""
    # transform [Stats, Stats, ...] into [(set(keys), Stats), (set(keys), Stats), ...]
    # Yes. I know. The keys of a dict ARE unique by definition so the "set()" doesn't seem to
    # add anything. However, when finding out common or extra or missing keys across multiple data sets
    # the set() operations like intersect and difference are exactly what we need.
    # So by creating them as set() objects once makes them reusable.
    list_of_stats = compose(list, Map(lambda s: (set(s.keys()), s)))(list_of_stats)
    # can only report on common keys
    common_keys = reduce(set.intersection, map(GetN(0), list_of_stats))
    # warn about non-matching keys
    def count_extra_keys(acc, item): # item = (set(keys), Statistics())
        print("="*4, "Problem report", "="*4+"\n", item[1].source, "\n", "Extra keys:\n",
              "\n".join(map(compose("\t{0[0]} found {0[1]} times".format, pam(identity, lambda k: len(item[1][k]))), sorted_keys(item))), "\n"+"="*25)
        return acc.append( len(item[0]) ) or acc
    # common_keys.__rsub__(keys) == keys - common_keys, i.e. the keys unique to one data set;
    # nExtra collects the per-data-set counts of such extra keys
    nExtra = reduce(count_extra_keys,
                    filter(compose(operator.truth, GetN(0)),
                           Map(pam(compose(common_keys.__rsub__, GetN(0)), GetN(1)))(list_of_stats)),
                    list())
    # For each common key check all collected values are the same by counting the number of keys
    # that don't fulfil this predicate. Also print all values+source whence they came in case of such a mismatch
    get_all = lambda k: compose(split_equal, Reduce(operator.__add__), Map(compose(GetN(k), GetN(1))))
    def check_key(acc, key):
        values = get_all(key)(list_of_stats)
        # values is [(value, [source, source, ...]), (value, [source, source, ...]), ... ]
        # i.e. the unique values for the current key and the sources in which those values were found
        # if that list is of length 1 all values are equal and thus no problems
        return acc if len(values)==1 else print_diffs(key, values) or acc+1
    nProb = reduce(check_key, sorted(common_keys), 0)
    # And one line feedback about what we found in total
    print("Checked", len(list_of_stats), "data sets,", len(common_keys), "common keys",
          choice(partial(operator.eq, 0), const(""), "with {0} problems identified".format)(nProb),
          choice(operator.truth, compose("and {0[1]} non-common keys in {0[0]} formats".format, pam(len, sum)), const(""))(nExtra))
    return nProb
# If *all* keys have exactly one value that means there is absolutely nothing to check/compare
# e.g. if multipart IDI fits with N IDI files, each IDI file has an antenna-, source- and frequency table.
# I.e.: one data set but still multiple values!
# nothing_to_compare() analyzes the list of accumulated Stats and returns True if every key has only one value associated.
# (a Counter tallies how many values each key has across all data sets; the outer
# reduce then verifies every count is 1, latching on the first non-True result)
nothing_to_compare = compose(Reduce(lambda tf, kv: kv[1]==1 if tf is True else tf, True), Call('items'),
                             Reduce(lambda a, s: reduce(lambda b, kv: b.update([kv[0]]*len(kv[1])) or b, s.items(), a),
                                    collections.Counter()))
# pretty_print() will only be called if all keys found across all datasets have exactly one value
# - i.e. there is nothing to compare; print the data set's source, then every
# single value, sorted by key
pretty_print = compose(consume, Map(compose(D, GetN(0), GetN(1))), Sort(GetN(0)), Call('items'), D_a('source'))
# This is the main function what to do: execute all statistics-gathering functions
# (each entry in the input list is a deferred zero-argument callable; Map(Apply) runs them)
# and if there are things to compare report the differences; the problem count
# (or 0 when only pretty-printing) becomes the process exit code
main = compose(sys.exit, choice(nothing_to_compare, compose(const(0), consume, Map(pretty_print)), report), list, Map(Apply))
  440. ############################################################################
  441. #
  442. # Let argparse do some work - such that we don't have to repeat it
  443. #
  444. ############################################################################
# This metaclass extracts the 'process_fn' from a Process()'s __init__'s
# keyword arguments and turns it into an argparse.Action __call__() function
# which appends a lambda() to the Action's destination, which, when called,
# executes the processing function
class ProcessMeta(type):
    def __call__(cls, *args, **kwargs):
        # Note: pop the 'process_fn=' keyword from kwargs such that
        # when Process.__init__() calls argparse.Action.__init__(), the
        # latter won't barf; it doesn't like unrecognized keyword arguments ...
        process_fn = kwargs.pop('process_fn', None)
        if process_fn is None:
            raise RuntimeError("Missing process_fn= keyword argument in __init__() for ", kwargs.get('help', "Unknown Process action"))
        # Need to do two things: add the __call__() special method to the class
        # which looks up the actual method-to-call in the instance
        # (special methods are looked up on the type, so this class-level
        # __call__ forwards to the per-instance '__call__' attribute set below)
        # See: https://stackoverflow.com/a/33824320/26083
        setattr(cls, '__call__', lambda *args, **kwargs: args[0].__call__(*args, **kwargs))
        # and now decorate the instance with a call method consistent with
        # https://docs.python.org/3/library/argparse.html#action-classes
        # "Action instances should be callable, so subclasses must override
        # the __call__ method, which should accept four parameters: ..."
        # The stored lambda appends "lambda: process_fn(values)" to
        # namespace.<dest>, deferring the actual processing until main()
        return SetAttr(type.__call__(cls, *args, **kwargs), '__call__',
                       lambda self, _parser, namespace, values, _opt: XFormAttr(self.dest, Append)(namespace, lambda: process_fn(values)))
# Create a custom argparse.Action to add statistics gathering functions
Process = Derived(with_metaclass(ProcessMeta, argparse.Action))
if __name__ == '__main__':
    ###################################################################################################################
    #
    # Here's where the only interesting stuff happens: parsing the command line & executing stuff!
    #
    ###################################################################################################################
    parsert = argparse.ArgumentParser(description="Compare meta data contents between MeasurementSet(s) and/or FITS-IDI files")
    # every option shares dest='path': each occurrence appends a deferred
    # statistics-gathering callable, which main() later executes and compares
    parsert.add_argument('--ms', action=Process, dest='path', default=list(), required=False, help="Specify the source MeasurementSet", process_fn=process_ms)
    parsert.add_argument('--idi', action=Process, dest='path', default=list(), nargs='*', help="The FITS-IDI file(s) produced from 'ms='", process_fn=process_idi)
    # .lis support is a stub: it prints a message (DD) and yields an empty stats dict
    parsert.add_argument('--lis', action=Process, dest='path', default=list(), required=False, help="The .lis file that was used to construct 'ms='",
                         process_fn=compose(lambda lis: SetSource(DefaultDict(list), Source(format='lis', location=lis)), DD("lis-file processing not yet implemented")))
    main(parsert.parse_args().path)