Browse Source

Import the multi-part IDI file checker

This tool reports data loss between FITS-IDI file chunks, a signature of
erroneous handling of mixed-bandwidth MS->IDI conversion
haavee 2 years ago
1 changed file with 79 additions and 0 deletions
  1. +79

+ 79
- 0 View File

@@ -0,0 +1,79 @@
#!/usr/bin/env python3
# '.../*/fits/*.IDI*' (escape the '*' to prevent shell expansion!)
# checks if there is any data missing between successive chunks of the same FITS file
# this could happen with old tConvert (pre Jun 2018) writing mixed-bandwidth correlation data.
# the problem is:
# - we keep FITS files to ~2GB per piece
# - old tConvert estimates how many MS rows will fit in such a piece and
# breaks up the MS into chunks of n_row sub-MSs
# - the estimate is only valid if all baselines have the same number of rows, or,
# in MS speak, spectral windows (IFs in AIPS/FITS-IDI speak)
# - with mixed-bandwidth correlation the previous assumption is not true anymore
# - there could be more visibilities in a sub-MS than tConvert estimates
# but since there is no possibility of growing the FITS-IDI file, the rows
# not written to the FITS-IDI file will be silently lost
# This code does a simple check to compare end time of one chunk and start time of the next.
# If more than a full integration's worth of data is lost, there will be a gap in the time
# between the chunks.
import collections
import functools
import glob
import itertools
import operator
import os
import re
import sys

import numpy

# everybody should love themselves some function composition. really.
compose = lambda *fns : lambda x: functools.reduce(lambda acc, f: f(acc), reversed(fns), x)
ylppa = lambda *fns : lambda x: tuple(map(lambda fn: fn(x), fns)) # map(x, [f0, f1, f2, ...]) => [f0(x), f1(x), f2(x), ...]
identity = lambda x : x
choice = lambda p, t, f : lambda x: t(x) if p(x) else f(x)
const = lambda c : lambda x: c
method = lambda f : lambda *args: f(*args)
Map = lambda fn : functools.partial(map, fn)
GetN = lambda n : operator.itemgetter(n)
GetA = lambda a : operator.attrgetter(a)
Group = lambda n : operator.methodcaller('group', n)
GroupBy = lambda keyfn : functools.partial(itertools.groupby, key=keyfn)
Sort = lambda keyfn : functools.partial(sorted, key=keyfn)
Filter = lambda pred : functools.partial(filter, pred)
Reduce = lambda accfn : functools.partial(functools.reduce, accfn)
Drain = lambda iterable : functools.reduce(lambda acc, x: acc, iterable, None) # drain an iterable for its side effects
D = lambda x : print(x) or x
Obj = lambda **kwargs : type('', (), kwargs)()

# Group all fitsfiles by their stem and sort by their chunk number
# ['eg098a_0_1'] => ['/path/to/eg098a_0_1.IDI1', '/path/to/eg098a_0_1.IDI2(.*)']

# transform file name into (<stem>, <full path>, <sequence number>) (sometimes the name is "...IDI<digits>.POLCONVERT" ...)
getStemAndSeqNo = compose( ylppa(compose(Reduce(operator.add), ylppa(Group(1), compose(choice(operator.truth, identity, const("")), Group(4)))),
compose(int, Group(2))),
re.compile(r"([^/]+)\.IDI([0-9]+)(\.(.+))?$").search )

# transform fitsfilename into numpy array of time stamps
getTime = compose(numpy.array, sum, ylppa(GetA('DATE'), GetA('TIME')), GetA('data'), GetN('UV_DATA'), functools.partial(, memmap=True, mode='readonly'))

def analFITS(fn, n):
timeStamps = getTime(fn)
# do analysis. Note that sometimes we get 1.xxxE-153 in stead of pure zero
nonZero = timeStamps[numpy.where( timeStamps>1e-6 )]
return Obj(startTime=min(nonZero), endTime=max(nonZero), nZero=len(timeStamps)-len(nonZero), fileName=fn+" [of {0}]".format(n), __bool__=lambda self: bool(self.nZero>0), __repr__=method("{0.fileName}: start={0.startTime} end={0.endTime} nZero={0.nZero}".format))

def accuStats(acc, nextFF):
if acc.prevEnd is not None:
dt = (nextFF.startTime - acc.prevEnd) * 86400
(acc.loss, acc.gain) = (acc.loss+dt, acc.gain) if dt>0 else (acc.loss, acc.gain-dt)
acc.nZero += nextFF.nZero
acc.prevEnd = nextFF.endTime
return acc

anal = ylppa(GetN(0),
compose(lambda l: Reduce(accuStats)(l, Obj(loss=0, gain=0, nZero=0, prevEnd=None, __bool__=lambda self: bool(self.loss>0 or self.gain>0 or self.nZero>0), __repr__=method("loss={0.loss}s gain={0.gain}s nZero={0.nZero}".format))),
compose(Map(choice(operator.truth, D, identity)), functools.partial(itertools.starmap, analFITS), lambda t: zip(*t), ylppa(GetN(1), compose(itertools.repeat, len, GetN(1)))) ))

if __name__ == '__main__':
compose(Drain, Map(compose(D, " ".join, Map(str))), Filter(compose(operator.truth, GetN(1))),
Map(compose(anal, ylppa(GetN(0), compose(list, compose(Map(GetN(1)), GetN(1)))))),
Map(ylppa(GetN(0), GetN(1))),
Sort(ylppa(GetN(0), GetN(2))),
compose(set, Reduce(operator.add), Map(glob.glob))(sys.argv[1:])