A set of tools to verify the operation of the j2ms2 and tConvert programs
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

80 lines
4.9 KiB

  1. #!/usr/bin/env python3
  2. #######################/opt/anaconda3/bin/python3
  3. # check-multipart-fits.py '.../*/fits/*.IDI*' (escape the '*' to prevent shell expansion!)
  4. #
  5. # checks if there is any data missing between successive chunks of the same FITS file
  6. # this could happen with old tConvert (pre Jun 2018) writing mixed-bandwidth correlation data.
  7. # the problem is:
  8. # - we keep FITS files to ~2GB per piece
  9. # - old tConvert estimates how many MS rows will fit in such a piece and
  10. # breaks up the MS into chunks of n_row sub-MSs
  11. # - the estimate is only valid if all baselines have the same number of rows, or,
  12. # in MS speak, spectral windows (IFs in AIPS/FITS-IDI speak)
  13. # - with mixed-bandwidth correlation the previous assumption is not true anymore
  14. # - there could be more visibilities in a sub-MS than tConvert estimates
  15. # but since there is no possibility of growing the FITS-IDI file, the rows
  16. # not written to the FITS-IDI file will be silently lost
  17. # This code does a simple check to compare end time of one chunk and start time of the next.
  18. # If more than a full integration's worth of data is lost, there will be a gap in the time
  19. # between the chunks.
  20. import sys, re, collections, glob, os, functools, operator, itertools, astropy.io.fits, numpy
  21. # everybody should love themselves some function composition. really.
  22. compose = lambda *fns : lambda x: functools.reduce(lambda acc, f: f(acc), reversed(fns), x)
  23. ylppa = lambda *fns : lambda x: tuple(map(lambda fn: fn(x), fns)) # map(x, [f0, f1, f2, ...]) => [f0(x), f1(x), f2(x), ...]
  24. identity = lambda x : x
  25. choice = lambda p, t, f : lambda x: t(x) if p(x) else f(x)
  26. const = lambda c : lambda x: c
  27. method = lambda f : lambda *args: f(*args)
  28. Map = lambda fn : functools.partial(map, fn)
  29. GetN = lambda n : operator.itemgetter(n)
  30. GetA = lambda a : operator.attrgetter(a)
  31. Group = lambda n : operator.methodcaller('group', n)
  32. GroupBy = lambda keyfn : functools.partial(itertools.groupby, key=keyfn)
  33. Sort = lambda keyfn : functools.partial(sorted, key=keyfn)
  34. Filter = lambda pred : functools.partial(filter, pred)
  35. Reduce = lambda accfn : functools.partial(functools.reduce, accfn)
  36. Drain = lambda iterable : functools.reduce(lambda acc, x: acc, iterable, None) # drain an iterable for its side effects
  37. D = lambda x : print(x) or x
  38. Obj = lambda **kwargs : type('', (), kwargs)()
  39. # Group all fitsfiles by their stem and sort by their chunk number
  40. # ['eg098a_0_1'] => ['/path/to/eg098a_0_1.IDI1', '/path/to/eg098a_0_1.IDI2(.*)']
  41. # transform file name into (<stem>, <full path>, <sequence number>) (sometimes the name is "...IDI<digits>.POLCONVERT" ...)
  42. getStemAndSeqNo = compose( ylppa(compose(Reduce(operator.add), ylppa(Group(1), compose(choice(operator.truth, identity, const("")), Group(4)))),
  43. operator.attrgetter('string'),
  44. compose(int, Group(2))),
  45. re.compile(r"([^/]+)\.IDI([0-9]+)(\.(.+))?$").search )
  46. # transform fitsfilename into numpy array of time stamps
  47. getTime = compose(numpy.array, sum, ylppa(GetA('DATE'), GetA('TIME')), GetA('data'), GetN('UV_DATA'), functools.partial(astropy.io.fits.open, memmap=True, mode='readonly'))
  48. def analFITS(fn, n):
  49. timeStamps = getTime(fn)
  50. # do analysis. Note that sometimes we get 1.xxxE-153 in stead of pure zero
  51. nonZero = timeStamps[numpy.where( timeStamps>1e-6 )]
  52. return Obj(startTime=min(nonZero), endTime=max(nonZero), nZero=len(timeStamps)-len(nonZero), fileName=fn+" [of {0}]".format(n), __bool__=lambda self: bool(self.nZero>0), __repr__=method("{0.fileName}: start={0.startTime} end={0.endTime} nZero={0.nZero}".format))
  53. def accuStats(acc, nextFF):
  54. if acc.prevEnd is not None:
  55. dt = (nextFF.startTime - acc.prevEnd) * 86400
  56. (acc.loss, acc.gain) = (acc.loss+dt, acc.gain) if dt>0 else (acc.loss, acc.gain-dt)
  57. acc.nZero += nextFF.nZero
  58. acc.prevEnd = nextFF.endTime
  59. return acc
  60. anal = ylppa(GetN(0),
  61. compose(lambda l: Reduce(accuStats)(l, Obj(loss=0, gain=0, nZero=0, prevEnd=None, __bool__=lambda self: bool(self.loss>0 or self.gain>0 or self.nZero>0), __repr__=method("loss={0.loss}s gain={0.gain}s nZero={0.nZero}".format))),
  62. compose(Map(choice(operator.truth, D, identity)), functools.partial(itertools.starmap, analFITS), lambda t: zip(*t), ylppa(GetN(1), compose(itertools.repeat, len, GetN(1)))) ))
  63. if __name__ == '__main__':
  64. compose(Drain, Map(compose(D, " ".join, Map(str))), Filter(compose(operator.truth, GetN(1))),
  65. Map(compose(anal, ylppa(GetN(0), compose(list, compose(Map(GetN(1)), GetN(1)))))),
  66. GroupBy(GetN(0)),
  67. Map(ylppa(GetN(0), GetN(1))),
  68. Sort(ylppa(GetN(0), GetN(2))),
  69. Map(getStemAndSeqNo))(
  70. compose(set, Reduce(operator.add), Map(glob.glob))(sys.argv[1:])
  71. )