The vbs tools - vbs_ls, vbs_rm, vbs_fs - for listing, removing and mounting vbs and Mark6 format scattered VLBI recordings on FlexBuff and Mark6 systems
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

282 lines
17 KiB

  1. #!/usr/bin/env python
  2. # simple script to tally up/sort vbs recordings
  3. import os, sys, argparse, re, glob, collections, itertools, operator, functools, pwd, grp, time, stat, struct, fnmatch
version = "$Id$"
# FlexBuff: recordings are directories-of-chunks directly under these mountpoints
flexbuff_pattern = '/mnt/disk*'
# Mark6: recordings are single files under the per-disk 'data' directories
mark6_pattern = '/mnt/disks/*/*/data'
  7. # everyone loves function composition
  8. compose = lambda *funcs: lambda x: reduce(lambda v, f: f(v), reversed(funcs), x)
  9. choice = lambda pred, t, f: lambda x: t(x) if pred(x) else f(x)
  10. vbs_chunk = lambda recording: re.compile(r"^"+re.escape(recording)+r"\.(?P<seqno>[0-9]{8})$")
# --help banner; %(prog)s is substituted with the program name by argparse
description = """List FlexBuff/Mark6 recording details, much like ls(1), allowing for filtering and/or sorting by size and/or time. In addition %(prog)s can accumulate totals (sizes) per pattern, e.g. to compute total size per experiment. By default vbs_ls searches the FlexBuff mountpoints for recordings. Using the '-6' flag and/or the '-R' argument (see below) the search path may be altered. Shell-style wildcards on the -R arguments are supported.
"""
  13. # SimonC: "KeyError: 'getpwuid(): uid not found: 5000' If the user id doesn't exist"
  14. def lookup_or_bust(resolver):
  15. def lookup(num):
  16. try: return resolver(num)[0]
  17. except: return num
  18. return lookup
  19. getpwuid = lookup_or_bust(pwd.getpwuid)
  20. getgrgid = lookup_or_bust(grp.getgrgid)
# Keep per vbs scattered recording totals in this object
class recording_data(object):
    """Accumulator for one recording (or an aggregate of recordings):
    newest mtime, summed size, owner/group/permissions, FlexBuff-vs-Mark6
    mode flag and, per recording name, a chunk sequence-number histogram."""
    __slots__ = ['st_mtime', 'st_size', 'st_uid', 'st_gid', 'st_mode', 'rec_name', 'chunks']
    def __init__(self):
        # st_* fields mirror os.stat() result attributes
        self.st_mtime = self.st_size = self.st_mode = 0
        self.st_uid = self.st_gid = self.rec_name = None
        # chunks: recording name -> { sequence number -> occurrence count }
        self.chunks = collections.defaultdict(lambda: collections.defaultdict(int))
    def update(self, statres, recname, seqno):
        """Fold one chunk into the totals.

        statres - os.stat() result (or stat-alike) of the chunk, or None when
                  details were not requested
        recname - name of the recording the chunk belongs to
        seqno   - integer chunk sequence number (FlexBuff) or None (Mark6)
        """
        if isinstance(seqno, int):
            # int seqno comes from flexbuff thus must set stat.S_IFDIR
            self.chunks[recname][seqno] += 1
            self.st_mode |= stat.S_IFDIR
        else:
            # must be Mark6, i.e. file
            self.st_mode |= stat.S_IFREG
        # only update time/size if there's something to update from
        if statres is None:
            return
        self.st_mtime = max(self.st_mtime, statres.st_mtime)
        self.st_size += statres.st_size
        # decode uid, gid and permissions only once - the first stat'ed chunk wins
        if self.st_uid is not None:
            return
        self.st_uid = getpwuid(statres.st_uid)
        self.st_gid = getgrgid(statres.st_gid)
        # Only copy the permission bits &cet; we keep track of file/dir ourselves
        self.st_mode = self.st_mode | (statres.st_mode & ~(stat.S_IFDIR|stat.S_IFREG))
    # update name and return self - allows for convenient "map(set_name, ...)"
    def set_name(self, nm):
        self.rec_name = nm
        return self
    # analyze 'problems' with the collected chunks:
    #   #-of-chunks indexed != max seqnr + 1 (sequence numbers assumed to start at 0)
    #   this means either missing or repeated data
    # Note: nChunk can never be <=0 because this object only
    #       exists if we found at least one chunk!
    _types = {stat.S_IFDIR: '/', stat.S_IFREG: '', stat.S_IFDIR|stat.S_IFREG: '+'}
    def suffix(self, showtype=None):
        """Return the decoration appended after the recording name: an optional
        type character ('/' dir, '' file, '+' both, '?' unknown) plus
        duplicate-chunk and percentage-recovered diagnostics where applicable."""
        s = recording_data._types.get(stat.S_IFMT(self.st_mode), '?') if showtype else ''
        # size and mtime == 0 => still default, i.e. not updated, i.e. must have been
        # an aggregate i.e. loss/repeat are meaningless
        if self.st_mtime and self.st_size and self.chunks:
            # per recording: nExpect = highest seqno + 1, nFound = #distinct
            # seqnos, nRepeat = summed occurrence counts of duplicated seqnos;
            # then sum each over all recordings
            (nExpect, nFound, nRepeat) = map(sum, zip(*map(lambda c: (max(c.keys())+1, len(c), sum(filter(lambda n: n>1, c.values()))),
                                                           self.chunks.values())))
            s += " {0} duplicate chunks".format(nRepeat) if nRepeat else ''
            if nExpect and nFound!=nExpect:
                s += " {0:4.2f}% recovered [{1}/{2}]".format(float(nFound)/nExpect*100, nFound, nExpect)
        return s
  70. is_dir = lambda x: os.path.isdir(x) and os.access(x, os.X_OK|os.R_OK)
  71. is_file = lambda x: os.path.isfile(x) and os.access(x, os.R_OK)
  72. # Mk6 file header layout
  73. # uint32_t sync_word; // MARK6_SG_SYNC_WORD = 0xfeed6666
  74. # int32_t version; // defines format of file
  75. # int32_t block_size; // length of blocks including header (bytes)
  76. # int32_t packet_format; // format of data packets, enumerated below
  77. # int32_t packet_size; // length of packets (bytes)
  78. #
  79. # files are possible Mark6 recordings, if they're longer than the header size
  80. # and they have the magic Mk6 header [0xfeed6666] and we only do version 2 with
  81. # a block size > 8: in version 2 the block header is 8 bytes long.
  82. mk_obj = lambda **kwargs: type('', (), kwargs)()
  83. mk6_hdr_f = '<5I'
  84. mk6_hdr_sz = struct.calcsize(mk6_hdr_f)
  85. mk6_hdr = compose(choice(lambda x: len(x)==mk6_hdr_sz,
  86. compose(lambda y: y if y[0]==0xfeed6666 and y[1]==2 and y[2]>8 else None,
  87. functools.partial(struct.unpack, mk6_hdr_f)),
  88. lambda z: None),
  89. operator.methodcaller('read', mk6_hdr_sz), open)
# Only index recordings matching the pattern
def mk_filter_idx(pattern, accukey_fn, getstat_fn, do_details):
    """Return a reducer f(acc, mountpoint) -> acc indexing the recordings
    found under 'mountpoint' into acc (dict-like: accumulation key -> recording_data).

    pattern    - filter applied to directory entry names; only truthy results indexed
    accukey_fn - maps a recording name to its accumulation key (identity, or
                 pattern/experiment when totals were requested)
    getstat_fn - os.stat, or a stub returning None when details are not needed
    do_details - truthy: stat and count every chunk; falsy: stop as soon as a
                 recording is known in a given format
    """
    def index_mountpoint(acc, mp):
        # Split the contents into files/directories that match a pattern we're looking for
        (files, dirs) = ([], [])
        for (nm, p) in map(lambda p: (p, os.path.join(mp, p)), filter(pattern, os.listdir(mp))):
            files.append((nm, accukey_fn(nm), p, os.stat(p))) if is_file(p) else \
                (dirs.append((nm, accukey_fn(nm), p)) if is_dir(p) else None)
        # Directories under mountpoints are possible vbs recordings,
        for (maybe_rec, key, path) in dirs:
            # figure out if there are entries of the form "maybe_rec/maybe_rec.XXXXXXXX"
            # inspect all chunks of this recording only if necessary
            for (mo, fpath) in filter(compose(is_file, operator.itemgetter(1)),
                                      map(lambda mo: (mo, os.path.join(path, mo.string)),
                                          filter(operator.truth, map(vbs_chunk(maybe_rec).match, os.listdir(path))))):
                # found a flexbuff recording chunk. Add if details or it was not already found as flexbuff format
                # (without details one chunk proves existence, so this 'break'
                #  only stops scanning the remaining chunks of THIS recording)
                if not (do_details or key not in acc or (acc[key].st_mode & stat.S_IFDIR)==0):
                    break
                acc[ key ].update( getstat_fn(fpath), maybe_rec, int(mo.group(1)) )
        # Filter out the real Mark6 recordings [hdr==None implies file is not a dplane v.2 recording]
        for (rec, key, path, st, hdr) in filter(operator.itemgetter(4), map(lambda tup: tup+(mk6_hdr(tup[2]),), files)):
            # found a mark6 recording chunk. Add if details or it was not already found as mark6 format
            # NOTE(review): unlike the vbs loop above, this 'break' abandons the
            # WHOLE files loop, skipping any other (possibly new) Mark6 recordings
            # on this mountpoint - looks like it was meant to be 'continue';
            # confirm against upstream before changing
            if not (do_details or key not in acc or (acc[key].st_mode & stat.S_IFREG)==0):
                break
            # Ok it's Mk6 file! Adjust the size by solving this:
            #   file_size = hdr_size + N * hdr.blocksize [with hdr.blocksize = block hdr size + payload size]
            #   need: N*payload_size , estimate N from: N = (file_size - hdr_size) / hdr.blocksize
            # create a new 'stat'-like object; the attributes of the os.stat() returned obj are read-only
            # NOTE(review): '/' is Python 2 integer division here (truncates);
            # under Python 3 it would yield a float - presumably truncation is
            # intended, TODO confirm before porting
            st = mk_obj(st_size=((st.st_size - mk6_hdr_sz) / hdr[2]) * (hdr[2] - 8), \
                        st_mtime=st.st_mtime, st_mode=st.st_mode, st_uid=st.st_uid, st_gid=st.st_gid)
            acc[ key ].update( st , rec, None )
        return acc
    return index_mountpoint
  123. def mk_filter_fn(patterns):
  124. def maybe_pass(recording_name):
  125. for p in patterns:
  126. if fnmatch.fnmatch(recording_name, p):
  127. return p
  128. return None
  129. return maybe_pass
  130. ########################################################################
  131. #
  132. # Output formatting details
  133. #
  134. ########################################################################
  135. sort_by_ls_order = lambda attr: compose(operator.neg, operator.attrgetter(attr))
  136. jeez_its_old_fmt = functools.partial(time.strftime, "%b %d %Y")
  137. recent_time_fmt = functools.partial(time.strftime, "%b %d %H:%M")
  138. # how to represent the size of the file
  139. size_in_bytes = functools.partial(str.format, "{0:14d}")
  140. # return it in kMGTPE, whichever is appropriate
  141. def size_human_readable(sz):
  142. vu = list(itertools.dropwhile(lambda vu: vu[0]>=1024,
  143. itertools.imap(lambda x, c=(1024**x for x in itertools.count(1)): (float(sz)/next(c), x), 'kMGTPE')))
  144. return "{0:> 7.2f}{1}".format(*vu[0]) if vu else "{0: 7d}".format(sz)
  145. class Mode:
  146. # types: '-' for file, 'd' for directory, '+' for both
  147. _types = {stat.S_IFREG: '-', stat.S_IFDIR: 'd', stat.S_IFREG|stat.S_IFDIR: '+'}
  148. # pair the permission bits with the permission character
  149. _permbits = zip( [ stat.S_IRUSR, stat.S_IWUSR, stat.S_IXUSR,
  150. stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP,
  151. stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH],
  152. itertools.cycle(['r', 'w', 'x']) )
  153. # inspect all the permission bits and return the character string describing them
  154. @staticmethod
  155. def fmt(m):
  156. return Mode._types.get(stat.S_IFMT(m), '?') + ''.join(map(lambda x: x[1] if (m&x[0])==x[0] else '-', Mode._permbits))
  157. mk_short_fmt = lambda tp: lambda obj: obj.rec_name + obj.suffix(tp)
  158. # according to ls(1): if time more than half year off (future OR past) then print year in stead of time
  159. max_age = 356 * 86400 / 2
  160. def long_fmt(obj):
  161. time_fmt = jeez_its_old_fmt if abs(time.time() - obj.st_mtime)>max_age else recent_time_fmt
  162. return "{mode} {uid:<9} {gid:<9} {sz} {mtime} {name}{suffix}".format( \
  163. uid=obj.st_uid, gid=obj.st_gid, sz=size_fn(obj.st_size), mtime=time_fmt(time.gmtime(obj.st_mtime)), \
  164. name=obj.rec_name, mode=Mode.fmt(obj.st_mode), suffix=obj.suffix() \
  165. )
  166. ###########################################################################
  167. #
  168. # Command line parsing
  169. #
  170. ###########################################################################
  171. # 'append_list' action: a helper to append a list of items to a variable
  172. # i.e. to support multiple '-R pattern1 -R pattern2 ...' options
  173. # Note that we only accept 'nargs' values that will actually result inna list:
  174. # nargs = '+' or '*', int >= 0, argparse.REMAINDER
  175. class AppendList(argparse.Action):
  176. def __init__(self, option_strings, dest, nargs=None, **kwargs):
  177. super(AppendList, self).__init__(option_strings, dest, nargs=nargs, **kwargs)
  178. if not ((isinstance(nargs, str) and len(nargs)==1 and nargs[0] in '+*') or
  179. (isinstance(nargs, int) and nargs>=0) or
  180. (nargs == argparse.REMAINDER) ):
  181. raise RuntimeError, "nargs must be '+', '*', an integer >= 0 or argparse.REMAINDER for this action"
  182. def __call__(self, p, ns, values, option_string):
  183. setattr(ns, self.dest, values) if not hasattr(ns, self.dest) else setattr(ns, self.dest, getattr(ns, self.dest)+values)
  184. class PrintHelp(argparse.Action):
  185. def __call__(self, the_parsert, *args):
  186. the_parsert.print_help() or sys.exit( 0 )
# Build the command line parser. add_help=False: like ls(1), '-h' means
# "human readable sizes" here, so help lives on '--help' (PrintHelp action).
# NOTE(review): "succesfully" in the help texts below is misspelled; the help
# strings are runtime output and therefore left untouched in this doc pass.
parsert = argparse.ArgumentParser(description=description, add_help=False)
parsert.add_argument('--help', nargs=0, action=PrintHelp, help="show this help message and exit succesfully")
# '-6' and '-v' share dest 'rootDirs' so both search paths may be combined
parsert.add_argument('-6', dest='rootDirs', action='append_const', default=[], const=mark6_pattern,
                     help="Look for recordings in Mark6 mountpoints")
parsert.add_argument('-v', dest='rootDirs', action='append_const', default=[], const=flexbuff_pattern,
                     help="Look for recordings in FlexBuff mountpoints (default)")
parsert.add_argument('-F', dest='show_type', action="store_true", default=None,
                     help="Show type of recording in short format using a trailing character: '/' = directory = FlexBuff recording, '' (nothing) = file = Mark6 recording, '+' if the recording was found in both formats and '?' for unknown type")
parsert.add_argument('-h', dest='human_readable_fmt', action='store_true', default=False,
                     help="Display file sizes in human readable format using base-1024 unit prefixes kMGTPE")
# '-i' swaps the name sort key for a lower-cased version
parsert.add_argument('-i', dest='name_sort', action='store_const', default=operator.attrgetter('rec_name'),
                     const=compose(str.lower, operator.attrgetter('rec_name')), help="Ignore case when sorting by name")
parsert.add_argument('-l', dest='long_format', action='store_true', default=False,
                     help="(The lowercase letter ``ell''. List in long format")
parsert.add_argument('-r', dest='reverse', action='store_true', default=False,
                     help="Reverse the order of the sort to get reverse lexicographical order or the oldest entries first (or largest files last, if combined with sort by size)")
parsert.add_argument("-R", nargs=1, action=AppendList, dest='rootDirs',
                     help="Append directories matching the pattern to the vbs_ls search path. Shell-style wildcards ('*?') are supported. This option may be present multiple times to add multiple patterns" )
# '-S'/'-t' append sort keys to the same list; both may be given
parsert.add_argument('-S', dest='sort_fields', action='append_const', default=[], const=sort_by_ls_order('st_size'),
                     help="Sort recordings by size")
parsert.add_argument('-t', dest='sort_fields', action='append_const', default=[], const=sort_by_ls_order('st_mtime'),
                     help="Sort by time modified (most recently modified first) before sorting the operands by lexicographical order.")
parsert.add_argument('-T', dest='totals', action='store_true', default=False,
                     help="Compute size total(s) per pattern. Mostly useful if used with ``-l''. If no patterns are specified accumulates by experiment, assuming scans are named <exp>_<station>_<scan>.")
parsert.add_argument('--version', action='version', version=version, help="Print current version and exit succesfully")
parsert.add_argument("patterns", nargs='*',
                     help="Only show recordings matching these pattern(s). Shell-style wildcards ('*?') are supported" )
# deal with command line
userinput = parsert.parse_args()
#print userinput
####################################################################################
# Analyze what the user actually wanted
####################################################################################
# NB: this script is Python 2 (print statements, dict.iteritems, builtin reduce)
# default to short format
print_fn = long_fmt if userinput.long_format else mk_short_fmt(userinput.show_type)
size_fn = size_human_readable if userinput.human_readable_fmt else size_in_bytes
# Need to get full status of recordings if long_output or sort by time/size
details = userinput.long_format or userinput.sort_fields
stat_fn = os.stat if details else lambda x: None
# Create the sorting function; lexicographical key always last
sort_fn = compose(tuple, lambda obj: [f(obj) for f in userinput.sort_fields+[userinput.name_sort]])
# Create the filter+accumulation functions. If 'totals' are requested we accumulate per pattern
# or by experiment (if no patterns given; the regex rewrites <exp>_<station>_<scan>
# to just <exp>, and names that don't fit that shape pass through unchanged),
# otherwise we 'accumulate' by individual recording
filter_fn = mk_filter_fn(userinput.patterns) if userinput.patterns else functools.partial(re.compile(r'^([^_]+)_[^_]+_[^_]+$').sub, r'\1')
accu_fn = filter_fn if userinput.totals else lambda x: x
# 1.) Get the list of mountpoints: expand each glob, keep accessible directories
mountpoints = reduce(lambda a, pattern: a.update(filter(lambda p: os.path.isdir(p) and os.access(p, os.X_OK|os.R_OK),
                                                        glob.glob(pattern))) or a,
                     userinput.rootDirs if userinput.rootDirs else [flexbuff_pattern], set())
# 2.) [Accumulate] the [filtered] list of recordings
recordings = reduce(mk_filter_idx(filter_fn, accu_fn, stat_fn, details), mountpoints, collections.defaultdict(recording_data))
# 3.) Transform dict of rec_name => accumulations into List [ recording, ... ], filtering out recording with name 'None'
recordings = map(lambda kv: kv[1].set_name(kv[0]), filter(lambda kv: kv[0] is not None, recordings.iteritems()))
# 4a.) If long format, print one-line summary, much like ls(1)
if recordings and userinput.long_format:
    # total chunk count = sum of all per-seqno occurrence counts over all recordings
    (nChunk, totSz) = map(sum, zip(*map(lambda r: (sum(map(lambda c: sum(c.values()), r.chunks.values())), r.st_size), recordings)))
    print "Found {0} recordings in {1} chunks, {2}".format(len(recordings), nChunk, size_human_readable(totSz))
# 4b.) Print output
print "\n".join(map(print_fn, sorted(recordings, key=sort_fn, reverse=userinput.reverse)))