Compare commits


4 Commits
master ... sfxc

Author SHA1 Message Date
haavee 8e831008e6 More sharing between IDI and UVF 3 years ago
haavee 26dcb8e22b Merge branch 'uvfits' into sfxc 3 years ago
haavee 0a78ebc1e2 Add UVFITS support 3 years ago
haavee 1d58073739 Started to work on parsing sfxc cor file(s) 3 years ago
compare-ms-idi.py (145)
@@ -2,26 +2,28 @@
#!/opt/anaconda3/bin/python3
# compare-ms-idi.py
#
# in order to gain confidence that everything from
# a MeasurementSet ended up the FITS-IDI file this
# tool accumulates statistics (currently: exposure time)
# per baseline per source.
# Deviations between source and destination numbers
# might indicate problems.
# in order to gain confidence that everything from a MeasurementSet ended up
# the FITS-IDI file this tool accumulates statistics (currently: exposure time
# and weight) per baseline per source. Deviations between source and
# destination findings might indicate problems.
from __future__ import print_function
from six import with_metaclass # Grrrrr http://python-future.org/compatible_idioms.html#metaclasses
from functools import partial, reduce
import sys, re, collections, glob, os, operator, itertools, astropy.io.fits, numpy, argparse
import sys, re, collections, glob, os, operator, struct, itertools, astropy.io.fits, numpy, argparse
import pyrap.tables as pt
# everybody should love themselves some function composition. really.
compose = lambda *fns : lambda x: reduce(lambda acc, f: f(acc), reversed(fns), x)
pam = lambda *fns : lambda x: tuple(map(lambda fn: fn(x), fns)) # fns= [f0, f1, ...] => map(x, fns) => (f0(x), f1(x), ...)
# compose: fns = [f0, f1, ...] -> f' such that f'(x) = f0(f1(..(x)))
compose = lambda *fns : lambda x: reduce(lambda acc, f: f(acc), reversed(fns), x)
# map: map(f, [x0, x1, ...]) -> [f(x0), f(x1), ...]) (map one function over many values)
# pam: pam(fns= [f0, f1, ...]) -> f' such that f'(x) = (f0(x), f1(x), ...) (map many functions over one value)
pam = lambda *fns : lambda x: tuple(map(lambda fn: fn(x), fns))
Apply = lambda *args : args[0](*args[1:]) # args= (f0, arg0, arg1,...) => f0(arg0, arg1, ...)
identity = lambda x : x
choice = lambda p, t, f : lambda x: t(x) if p(x) else f(x)
const = lambda c : lambda *_: c
method = lambda f : lambda *args, **kwargs: f(*args, **kwargs)
# about the slowest partitioning but it fits on one line
partition= lambda seq, pred : reduce(lambda acc, value: acc[0 if pred(value) else 1].append(value) or acc, seq, (list(), list()))
Map = lambda fn : partial(map, fn)
#Group = lambda n : operator.methodcaller('group', n)
GroupBy = lambda keyfn : partial(itertools.groupby, key=keyfn)
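# Example (illustrative, not part of the commit) of how these combinators behave:
compose(str.strip, str.upper)(" hi ")        # -> 'HI' (functions apply right-to-left)
pam(min, max)([3, 1, 2])                     # -> (1, 3)
partition([1, 2, 3, 4], lambda n: n % 2)     # -> ([1, 3], [2, 4]) (matches first)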
@@ -52,6 +54,17 @@ Call = operator.methodcaller
# other generic stuff (own invention)
n_times = choice(partial(operator.ne, 1), "{0:6d} times".format, const("once")) # polite integer formatting w/ special case for 1
# these have to be done with defs :-(
def Raise(exception, *args, **kwargs):
def do_it(*_, **__):
raise exception(*args, **kwargs)
return do_it
def Try(f, exception_fn):
def do_it(x):
try: return f(x)
except: return exception_fn(x)
return do_it
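# Example (illustrative, not part of the commit): Raise and Try make exception
# handling composable with choice() above:
safe_int = Try(int, const(-1))               # safe_int('42') -> 42, safe_int('x') -> -1
positive = choice(partial(operator.lt, 0), identity, Raise(ValueError, "not positive"))
positive(3)                                  # -> 3; positive(-3) raises ValueError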
##################################################################################
#
# this is the information we accumulate per primary key (whatever that may turn
@@ -238,6 +251,13 @@ mk_idi_antab = mk_idi_lut('ANTENNA', ['ANTENNA_NO', 'ANNAME'], "Antenna#{0}".format)
mk_idi_srctab = mk_idi_lut('SOURCE' , ['SOURCE_ID' , 'SOURCE'], "Source#{0}".format)
get_idi_stats = compose(Star(analyze_uvdata_table), pam(*list(map(GetN, ['BASELINE', 'SOURCE_ID', 'INTTIM', 'FLUX']))), GetA('data'), GetN('UV_DATA'))
def aggregate(antab, srctab):
def do_aggregate(a, item):
a1, a2 = divmod(item[0][0], 256)
a[ (antab[a1]+antab[a2], srctab[item[0][1]]) ] += item[1]
return a
return do_aggregate
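# Worked example (not part of the commit): divmod undoes the standard
# FITS-IDI/AIPS baseline packing, baseline = 256*ant1 + ant2 (1-based antennas):
divmod(258, 256)                             # -> (1, 2): antennas 1 and 2
divmod(3*256 + 5, 256)                       # -> (3, 5)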
# Actually process a series of FITS-IDI file(s)
def process_idi(list_of_idis):
# reduction of a single FITS-IDI file
@@ -248,14 +268,104 @@ def process_idi(list_of_idis):
antab, srctab = pam(mk_idi_antab, mk_idi_srctab)(idi)
# aggregate items we found in this IDI into the global accumulator
# in the process we unmap (baseline, source_id) to ('Anname1Anname2', 'Sourcename')
def aggregate(a, item):
a1, a2 = divmod(item[0][0], 256)
a[ (antab[a1]+antab[a2], srctab[item[0][1]]) ] += item[1]
return a
return reduce(aggregate, drain_dict(stats), acc)
return reduce(aggregate(antab, srctab), drain_dict(stats), acc)
# reduce all IDI files and set the source attribute on the returned object to tell whence these came
return SetSource(reduce(process_one_idi, list_of_idis, DefaultDict(Statistics)), Source(format='idi', location=summarize(list_of_idis)))
#############################################################################################################################
#
# UVFITS specific handling
#
#############################################################################################################################
# Create a function which creates a LUT based on enumeration instead of taking the index from a column
mk_enum_lut = lambda tbl, col, unk: compose(partial(WrapLookup, unknown=unk), dict, ZipStar,
pam(lambda _: itertools.count(1),
compose(Map(compose(str.capitalize, str)), GetN(col))),
GetA('data'), GetN(tbl))
mk_uvf_antab = mk_enum_lut('AIPS AN', 'ANNAME', "Antenna#{0}".format)
mk_uvf_srctab = mk_idi_lut('AIPS SU' , ['ID. NO.' , 'SOURCE'], "Source#{0}".format)
uvf_param_int = lambda p: (lambda obj: obj.par(p).astype(numpy.int))
get_uvf_stats = compose(Star(analyze_uvdata_table),
pam(*list(map(uvf_param_int, ['BASELINE', 'SOURCE', 'INTTIM']))+[GetA('data')]),
GetA('data'), GetN(0))
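# Sketch (illustrative, hypothetical file name): UVFITS keeps per-visibility
# metadata as random parameters on the primary group HDU, hence .par() above;
# mk_uvf_antab enumerates the AIPS AN rows from 1 to match the 1-based baselines.
uvf = astropy.io.fits.open('/path/to/file.uvfits')
uvf[0].data.par('BASELINE')                  # one value per visibility group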
def process_uvf(path_to_one_uvf):
uvf = open_idi(path_to_one_uvf)
# construct antenna, source unmappers
antab, srctab = pam(mk_uvf_antab, mk_uvf_srctab)(uvf)
print(antab.LUT)
# We post-process the statistics - unmap (baseline, source_id) to ('Anname1Anname2', 'Sourcename')
return SetSource(reduce(aggregate(antab, srctab), drain_dict(get_uvf_stats(uvf)), DefaultDict(Statistics)),
Source(format='uvf', location=path_to_one_uvf))
#############################################################################################################################
#
# SFXC .cor file support
#
#############################################################################################################################
########### 'Parsing' a VEX file
# SFXC output labels stations and sources numerically, 0..n-1.
# The numbers refer to the sorted "def <label>;" values from the VEX file,
# because the "def <label>" values are the unique keys identifying what is to
# be correlated for each scan. (The "site_ID=" and "source_name=" keys inside
# the "def" blocks give the /real/ values, but those are not guaranteed to be
# unique, nor are they the labels the $SCHED; block uses to identify which
# stations and sources are observed/correlated in a scan.)
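# A minimal VEX fragment of the kind targeted here (illustrative; real files
# carry many more blocks and refs):
#   $STATION;
#     def Ef; enddef;
#     def Wb; enddef;
#   $SOURCE;
#     def 3C84;
#       source_name = 3C84;
#     enddef;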
# Return None or upper-case block name
block_start = compose(choice(operator.truth, compose(str.upper, Call('group', 1)), const(None)), re.compile(r'^\s*\$([^;]+);').search)
# Return None or <label> from "def <label>;"
getDefName = compose(choice(operator.truth, Call('group', 1), const(None)), re.compile(r'^\s*def\s+([^; \t\v]+);', re.I).search)
# True/False whether "enddef;" seen
isEndDef = compose(operator.truth, re.compile(r'^\s*enddef\s*;', re.I).search)
# "source_name = <...>;" value or None
getSourceName = compose(choice(operator.truth, Call('group', 1), const(None)), re.compile(r'^\s*source_name\s*=\s*([^ ;]+)', re.I).search)
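# What the line classifiers return (illustrative, not part of the commit):
block_start('$STATION;')                     # -> 'STATION'
getDefName('  def Ef;')                      # -> 'Ef'
isEndDef('enddef;')                          # -> True
getSourceName('source_name = 3C84;')         # -> '3C84'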
# Keep state (which accumulation function and list of sources, stations) in here:
vex_state_t = collections.namedtuple('vex_state', ('function', 'stations', 'sources'))
def collect_stations(acc, line):
s_label = getDefName(line)
return acc._replace(stations=acc.stations+[s_label]) if s_label else skip_or_start_block(acc, line)
def wait_for_source_name(acc, line):
# a source_name (recorded) or an enddef returns control to collect_sources; otherwise keep waiting
source_name = getSourceName(line)
if source_name:
return acc._replace(sources=acc.sources+[source_name], function=collect_sources)
return acc._replace(function=collect_sources) if isEndDef(line) else acc
def collect_sources(acc, line):
start_source = getDefName(line)
return acc._replace(function=wait_for_source_name) if start_source else skip_or_start_block(acc, line)
block_parsers = {'STATION': collect_stations, 'SOURCE': collect_sources}
skip_or_start_block = lambda acc, line: acc._replace(function=block_parsers.get(block_start(line), skip_or_start_block)) if block_start(line) else acc
parse_vex = compose(lambda f: reduce(lambda a, l: a.function(a, l), f, vex_state_t(skip_or_start_block, list(), list())), open)
mk_vex_lut = lambda which, unk: compose(partial(WrapLookup, unknown=unk), dict, enumerate, Map(str.capitalize), sorted, GetA(which))
# take list of vex files (should be length==1, code checks that), get first element, strip leading 'vex:', parse it and build tuple of LUTs
mk_vex_luts = compose(pam(mk_vex_lut('stations', "Antenna#{0}".format), mk_vex_lut('sources', "Source#{0}".format)),
parse_vex, partial(re.sub, r'^vex:', ''), GetN(0))
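# Sketch (illustrative, hypothetical path) of what the parser and LUT builders
# yield for the VEX fragment above:
state = parse_vex('/path/to/exp.vex')
state.stations, state.sources                # -> (['Ef', 'Wb'], ['3C84'])
antab, srctab = mk_vex_luts(['vex:/path/to/exp.vex'])
antab[0]                                     # -> 'Ef' (sorted, capitalized, 0-based)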
def process_cor(list_of_cors):
# split into lists of vex:<...> and /path/to/corfile
(vex, cors) = partition(list_of_cors, Call('startswith', 'vex:'))
# not exactly one vex file is an error
if len(vex)!=1:
raise RuntimeError("No vex:/path/to/vexfile found" if not vex else "More than one vex:/path/to/vex entries found")
antab, srctab = mk_vex_luts( vex )
print("COR: antab=",antab,"\n srctab=",srctab)
def process_one_cor(acc, path_to_cor):
cor = open(path_to_cor, "rb")
return acc
# reduce all .cor files and set the source attribute on the returned object to tell whence these came
return SetSource(reduce(process_one_cor, cors, DefaultDict(Statistics)), Source(format='cor', location=summarize(cors)))
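# Note: process_one_cor is still a stub (the commit only starts .cor parsing).
# A generic sketch of where the newly imported struct module likely comes in,
# with a made-up record layout since the real SFXC format is not shown here:
#   with open(path_to_cor, "rb") as cor:
#       nchan, nbytes = struct.unpack('<ii', cor.read(8))   # hypothetical fields
#       payload = cor.read(nbytes)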
# take a list of dicts with dict(key=>statistics) and compare them
def report(list_of_stats):
@@ -289,9 +399,10 @@ def report(list_of_stats):
choice(operator.truth, compose("and {0[1]} non-common keys in {0[0]} formats".format, pam(len, sum)), const(""))(nExtra))
return nProb
print_stats = compose(const(0), compose(consume, Map(lambda kv: print(kv[0],":\t", kv[1])), Call('items'), GetN(0)))
# The main function: execute all statistics-gathering functions and,
# if there is more than one result, report the differences/errors
main = compose(sys.exit, choice(compose(partial(operator.lt, 1), len), report, compose(const(0), print)), list, Map(Apply))
main = compose(sys.exit, choice(compose(partial(operator.lt, 1), len), report, print_stats), list, Map(Apply))
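# Illustrative (names hypothetical): main expects zero-argument closures, here
# assumed to be the partial(process_fn, values) items stored by the Process
# argparse action below; one result is printed, several are compared via report():
main([partial(process_ms, 'a.ms'), partial(process_idi, ['a.idi'])])  # -> report, then exit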
############################################################################
#
@@ -333,6 +444,8 @@ if __name__ == '__main__':
###################################################################################################################
parsert = argparse.ArgumentParser(description="Compare contents of one MeasurementSet and one or more FITS-IDI files")
parsert.add_argument('--ms', action=Process, dest='path', default=list(), required=False, help="Specify the source MeasurementSet", process_fn=process_ms)
parsert.add_argument('--cor', action=Process, dest='path', default=list(), nargs='+', help="SFXC .cor file(s), add one vex:/path/to/vex entry", process_fn=process_cor)
parsert.add_argument('--uvf', action=Process, dest='path', default=None, type=str, required=False, help="The UVFITS file produced from 'ms='", process_fn=process_uvf)
parsert.add_argument('--idi', action=Process, dest='path', default=list(), nargs='*', help="The FITS-IDI file(s) produced from 'ms='", process_fn=process_idi)
parsert.add_argument('--lis', action=Process, dest='path', default=list(), required=False, help="The .lis file that was used to construct 'ms='",
process_fn=compose(lambda lis: SetSource(DefaultDict(Statistics), Source(format='lis', location=lis)), DD("lis-file processing not yet implemented")))
