The vbs tools - vbs_ls, vbs_rm and vbs_fs - for listing, removing and mounting VBS- and Mark6-format scattered VLBI recordings on FlexBuff and Mark6 systems

// FUSE file system for vlbi_streamer style recordings
#define FUSE_USE_VERSION 26
#include <fuse.h>
// Project includes
#include <auto_array.h>
#include <evlbidebug.h>
#include <regular_expression.h>
#include <dosyscall.h>
#include <directory_helper_templates.h>
// Standardized C++ headers
#include <iostream>
#include <sstream>
#include <map>
#include <set>
#include <list>
#include <queue>
#include <vector>
#include <string>
#include <exception>
#include <stdexcept>
#include <iterator>
#include <limits>
#include <algorithm>
#include <cstddef>
#include <cerrno>
#include <cstring>
#include <csignal>
#include <cstdlib>
#include <climits>
#include <ctime>
// Old-style *NIX headers
// [<fcntl.h>, <pthread.h>, <stdint.h> and <sys/stat.h> are included explicitly:
//  open(2), the pthread types, the fixed-width integer types and struct stat/S_IS*
//  are used directly in this file and may not be pulled in by the project headers]
#include <glob.h>
#include <fnmatch.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdint.h>
#include <unistd.h>
#include <dirent.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/resource.h>
  36. using namespace std;
  37. DECLARE_EZEXCEPT(vbs_except)
  38. DEFINE_EZEXCEPT(vbs_except)
  39. typedef std::vector<std::string> patternlist_type;
  40. typedef std::vector<Regular_Expression> matchlist_type;
  41. string escape(string const& s);
  42. // Get a handle for invalid file descriptor that isn't -1.
  43. const int invalidFileDescriptor = std::numeric_limits<int>::min();
  44. const size_t pgSize( ::getpagesize() );
  45. // prototype now, impl follows below
  46. int vbs_pthread_create(pthread_t*, void*(*)(void*), void *arg);
  47. struct mk6_file_header;
  48. /////////////////////////////////////////////////////////////////////////////////////////////
  49. //
  50. //
  51. // This is what it's all about: mapping recording name to a Mk6/VBS
  52. // scattered recording!
  53. //
  54. //
  55. //////////////////////////////////////////////////////////////////////////////////////////////
  56. typedef set<string> paths_type;
  57. typedef list<pthread_t> threadlist_type;
  58. ///////////////////////////////////////////////////////////////
  59. //
  60. // Update metadata from src => dst
  61. // in a specific manner
  62. //
  63. ///////////////////////////////////////////////////////////////
  64. static const mode_t permissionBits = (S_IRWXU | S_IRWXG | S_IRWXO | S_ISUID | S_ISGID | S_ISVTX);
  65. ostream& operator<<(ostream& os, struct stat const& st) {
  66. return os << "[st_mode=" << st.st_mode << " uid=" << st.st_uid << " gid=" << st.st_gid <<
  67. " sz=" << st.st_size << " st_mtime=" << st.st_mtime << "]";
  68. }
  69. void update(struct stat& dst, struct stat const& src) {
  70. // dst's st_mtime should be maximum of src/dst's st_mtime
  71. dst.st_mtime = std::max(dst.st_mtime, src.st_mtime);
  72. // If mode already filled in don't do that again
  73. if( dst.st_mode )
  74. return;
  75. // Set mode to file + current permissions, owner and group
  76. dst.st_mode = S_IFREG | (src.st_mode & permissionBits);
  77. dst.st_uid = src.st_uid;
  78. dst.st_gid = src.st_gid;
  79. }
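// Illustrative example of update() with hypothetical values: merging a freshly zeroed dst
// with a source stat of {st_mode=S_IFDIR|0755, uid=1000, gid=1000, st_mtime=1486640000}
// yields dst = {st_mode=S_IFREG|0755, uid=1000, gid=1000, st_mtime=1486640000};
// a later update() with an older st_mtime leaves everything as-is because dst.st_mode
// is already filled in and st_mtime keeps the maximum seen so far.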
  80. ///////////////////////////////////////////////////////////////
  81. //
  82. // Simple utility: scoped mutex lock
  83. // Always comes in handy
  84. //
  85. ///////////////////////////////////////////////////////////////
  86. struct scoped_lock_type {
  87. scoped_lock_type(pthread_mutex_t* mtx):
  88. __m_slt_mutex_ptr( mtx )
  89. { ::pthread_mutex_lock(__m_slt_mutex_ptr); }
  90. ~scoped_lock_type() {
  91. ::pthread_mutex_unlock(__m_slt_mutex_ptr);
  92. }
  93. pthread_mutex_t* __m_slt_mutex_ptr;
  94. };
  95. struct scoped_readlock_type {
  96. scoped_readlock_type(pthread_rwlock_t* rw):
  97. __m_rwlock_ptr( rw )
  98. { ::pthread_rwlock_rdlock( __m_rwlock_ptr ); }
  99. ~scoped_readlock_type() {
  100. ::pthread_rwlock_unlock( __m_rwlock_ptr );
  101. }
  102. pthread_rwlock_t* __m_rwlock_ptr;
  103. };
  104. struct scoped_writelock_type {
  105. scoped_writelock_type(pthread_rwlock_t* rw):
  106. __m_rwlock_ptr( rw )
  107. { ::pthread_rwlock_wrlock( __m_rwlock_ptr ); }
  108. ~scoped_writelock_type() {
  109. ::pthread_rwlock_unlock( __m_rwlock_ptr );
  110. }
  111. pthread_rwlock_t* __m_rwlock_ptr;
  112. };
  113. ///////////////////////////////////////////////////////////////////////////////
  114. // Below code copied from Jan Wagner's mark6sg FUSE file system
  115. // which is copied from Roger Cappallo's d-plane software ...
  116. //
  117. // [HV: Why are block numbers & sizes ints and not unsigned ints?
  118. // We will have to follow the convention or else we may construct invalid
  119. // files. But why????
  120. // Using unsigned:
  121. // - gives you another factor of two in block numbers ....
  122. // - how can block numbers go *negative* in the first place?!?!!
  123. // - likewise for packet sizes: how could they ever be
  124. // _negative_???
  125. //
  126. // Also made the ints of actual 32-bit size; you can't really
  127. // leave it up to the compiler to decide how long your int really
  128. // is if you want portable, 64-bit clean code]
  129. // Internal structs
  130. //
  131. // There is apparently no documentation on the Mark6 non-RAID recording format(?).
  132. // From Mark6 source code one can see that the several files associated with a
  133. // single scan all look like this:
  134. //
  135. // [file header]
  136. // [block a header] [block a data (~10MB)]
  137. // [block b header] [block b data (~10MB)]
  138. // [block c header] [block c data (~10MB)]
  139. // ...
  140. //
// Block numbers are increasing. They will have gaps (e.g., a=0, b=16, c=35, ...).
  142. // The 'missing' blocks (1,2,3..,15,17,18,..,34,...) of one file are found in one of
  143. // the other files. Blocks can have different data sizes.
  144. //
  145. // Because 10GbE recording in Mark6 software is not done Round-Robin across files,
  146. // the order of blocks between files is somewhat random.
  147. //
  148. // The below structures are adopted from the source code of the
  149. // Mark6 program 'd-plane' version 1.12:
  150. #define MARK6_SG_SYNC_WORD 0xfeed6666
  151. struct mk6_file_header { // file header - one per file
  152. // HV: Stick the enum in the mk6_file_header struct such that
  153. // the global namespace doesn't get clobbered.
  154. enum packet_formats {
  155. VDIF,
  156. MK5B,
  157. UNKNOWN_FORMAT
  158. };
  159. uint32_t sync_word; // MARK6_SG_SYNC_WORD
  160. int32_t version; // defines format of file
  161. int32_t block_size; // length of blocks including header (bytes)
  162. int32_t packet_format; // format of data packets, enumerated below
  163. int32_t packet_size; // length of packets (bytes)
  164. };
  165. struct mk6_wb_header_v2 { // write block header - version 2
  166. int32_t blocknum; // block number, starting at 0
  167. int32_t wb_size; // same as block_size, or occasionally shorter
  168. };
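// Illustrative layout (hypothetical block numbers and sizes): a scan scattered over two
// files could look like
//   file0: [mk6_file_header][blocknum=0, ~10MB][blocknum=3, ~10MB][blocknum=4, ~10MB]
//   file1: [mk6_file_header][blocknum=1, ~10MB][blocknum=2, ~10MB][blocknum=5, ~10MB]
// i.e. block numbers increase within each file but only the union of all files covers
// 0..N, and an occasional block (e.g. the final one) can have a wb_size shorter than
// block_size.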
  169. ///////////////////////////////////////////////////
  170. //
  171. // A filedescriptor cache
  172. //
  173. // Mark6 scatters chunks over a number of files
  174. // so many chunks share the same file descriptor.
// After a recording has been opened, the path opener
// is called for each path of the recording.
  177. //
  178. // The Mk6 opener pre-opens all files for reading
  179. // and puts them in the cache. The vbs opener doesn't.
  180. //
  181. // Then, if a chunk needs to be read from disk,
  182. // the reader checks the cache for the fd. If found,
  183. // it uses that file descriptor. Otherwise it temporarily
// opens the chunk's file path, reads data and closes
  185. // it again.
  186. //
  187. ///////////////////////////////////////////////////
  188. typedef map<string, int> fdcache_type;
  189. typedef int(*path_opener_fn)(string const& p);
  190. fdcache_type fdcache;
  191. pthread_rwlock_t fdcache_lock = PTHREAD_RWLOCK_INITIALIZER;
  192. // Pointer to a path opener function.
  193. typedef int (*openpath_fn)(string const& p, time_t& tmHint, off_t& szHint);
  194. typedef int (*closepath_fn)(string const& p);
  195. // Mk6 path opener puts opened file in the fdcache
  196. int mk6_path_opener(string const& p, time_t&, off_t&) {
  197. int fd = ::open(p.c_str(), O_RDONLY );
  198. if( fd<0 ) {
  199. DEBUG(-1, "mk6_path_opener: failed to open '" << p << "' - " << ::strerror(errno) << endl);
  200. return -1;
  201. }
  202. // And put it in the cache
  203. scoped_writelock_type swl( &fdcache_lock );
  204. return fdcache.insert(make_pair(p, fd)).second==true ? 0 : -1;
  205. }
  206. int mk6_path_closer(string const& p) {
  207. scoped_writelock_type swl( &fdcache_lock );
  208. fdcache_type::iterator fdptr = fdcache.find(p);
  209. if( fdptr==fdcache.end() ) {
  210. DEBUG(-1, "mk6_path_closer: path '" << p << "' not in cache?!" << endl);
  211. return -1;
  212. }
  213. ::close( fdptr->second );
  214. fdcache.erase( fdptr );
  215. return 0;
  216. }
  217. // vbs path opener doesn't do anything
  218. int vbs_path_opener(string const&, time_t&, off_t& ) {
  219. return 0;
  220. }
  221. int vbs_path_closer(string const& ) {
  222. return 0;
  223. }
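// Illustrative call sequence (sketch, with hypothetical paths): opening a Mk6 recording
// whose paths are {"/mnt/disks/1/0/data/scan", "/mnt/disks/2/0/data/scan"} calls
// mk6_path_opener() on each path, pre-filling fdcache; read_chunk()/mmap_chunk() later
// find the fd in the cache and leave closing to mk6_path_closer(). For a VBS recording
// the opener/closer are no-ops, so the readers open and close each chunk file themselves.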
  224. ///////////////////////////////////////////////////
  225. //
  226. // A mountpoint monitor's private parts
  227. //
  228. ///////////////////////////////////////////////////
  229. struct filechunk_type;
  230. typedef set<int> observers_type;
  231. typedef queue<filechunk_type*> workqueue_type;
  232. struct mphandler_data {
  233. bool stop;
  234. bool indexed;
  235. pthread_t threadId;
  236. const string mountPoint;
  237. paths_type matchList;
  238. workqueue_type workqueue;
  239. pthread_cond_t* condition;
  240. pthread_mutex_t* mutex;
  241. mphandler_data(string const& mp, paths_type const& indexPatterns):
  242. stop( false ), indexed( false ), mountPoint( mp ), matchList( indexPatterns ),
  243. condition( new pthread_cond_t ), mutex( new pthread_mutex_t )
  244. {
  245. // Initialize the mutex and condition variable
  246. ::pthread_cond_init(condition, 0);
  247. ::pthread_mutex_init(mutex, 0);
  248. }
  249. // Implement operator()(string const&) => we can act as a filter!
  250. bool operator()(string const& nm) const {
// the default is determined by whether we need to filter or not:
  252. // matchList empty? No filtering => let everything pass
  253. // matchList not empty? Start with false value and if something
  254. // matches let the thing pass
  255. bool rv( matchList.empty() );
  256. for( paths_type::const_iterator curRX=matchList.begin(); curRX!=matchList.end(); curRX++) {
  257. if( (rv=(::fnmatch(curRX->c_str(), nm.c_str(), FNM_PATHNAME)==0))==true )
  258. break;
  259. }
  260. return rv;
  261. }
  262. };
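// Illustrative filter behaviour (hypothetical patterns, assuming the filter is handed
// bare directory-entry names): with matchList={"ed045*", "*_cal"},
// operator()("ed045_ef_no0012") and operator()("fringe_cal") return true,
// operator()("somescan") returns false; with an empty matchList everything passes.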
  263. // Keep a global mapping of recording => filechunks
  264. // note that we let the filechunks be an automatically sorted container
  265. mphandler_data& get_mpdata( string const& mp );
  266. struct filechunk_type {
  267. // Note: no default c'tor
  268. // VBS chunk:
  269. // create from mountpoint, <recname/recname>.<sequencenr>, <sequencenr>, <size>
  270. //
  271. // Mk6 chunk:
  272. // create from mountpoint, <recname>, <sequencenr>, <size>, <fileOffset>
  273. // construct from mountpoint AND full path name
  274. // the mountpoint will be used as lookup key for the thread to ask for the chunk's data
  275. filechunk_type(string const& mp, string const& fnm, unsigned int seqno, off_t sz, off_t fpos = 0) :
  276. mountPoint( mp ), pathToChunk( fnm ), chunkSize( sz ), chunkPos( fpos ), chunkOffset( 0 ), chunkNumber( seqno ),
  277. __m_saved_errno( ESRCH ), __m_thread_active( false ), __m_priority( 0 ),
  278. __m_bytes( 0 ), __m_cond( new pthread_cond_t ), __m_mutex( new pthread_mutex_t ), __m_rwlock( new pthread_rwlock_t )
  279. {
  280. // Initialize the mutex and condition variable
  281. ::pthread_cond_init(__m_cond, 0);
  282. ::pthread_mutex_init(__m_mutex, 0);
  283. ::pthread_rwlock_init(__m_rwlock, 0);
  284. }
  285. // Note: declare chunkOffset as mutable such that we can later, after
  286. // the chunks have been sorted and put into a set, update the
  287. // chunkOffset value to what it should be. [Elements in a set are const
  288. // in order to guarantee that their sorting order is not compromised
  289. // as you alter the element - in this case WE know that the sorting only
  290. // depends on the value of 'chunkNumber' so we can safely edit
  291. // chunkOffset w/o worrying about compromising the set]
  292. string mountPoint;
  293. string pathToChunk;
  294. off_t chunkSize;
  295. off_t chunkPos; // chunk's startposition in file
  296. mutable off_t chunkOffset; // offset within recording
  297. unsigned int chunkNumber;
  298. //struct stat chunkStat;
// Given a file pointer which must lie at or beyond the start
// of this chunk, return how many bytes of this chunk remain.
// If it points beyond the far end, return 0
// [this is typically true if someone seeks outside the file
// size's range or if we've reached EOF but wait for further
// incoming data].
  304. size_t remainingBytes(off_t fp) const {
  305. const off_t limit = chunkOffset + chunkSize;
  306. return (size_t)((fp>=chunkOffset && fp<=limit) ? (limit - fp) : 0);
  307. }
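// Worked example with hypothetical numbers: for a chunk with chunkOffset=256MB and
// chunkSize=128MB, remainingBytes(300MB) = (256MB+128MB)-300MB = 84MB,
// remainingBytes(384MB) = 0 and remainingBytes(500MB) = 0 (pointer past this chunk).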
  308. // (pre-)reading API of the chunk.
  309. // It is possible to initiate reading a chunk's data and then later wait
  310. // for it to become available
  311. // This initiates a thread, reading the chunk's data, if it's not
  312. // already in memory. This method may safely be called multiple times,
  313. // although NOT from DIFFERENT THREADS at the same time on the same
  314. // chunk - make sure you hold the chunk lock.
  315. void initiateRead( int observer, unsigned int priority ) const {
// Can use scoped lock here
scoped_lock_type slt( __m_mutex );
DEBUG(2, "initiateRead[" << mountPoint << "/" << pathToChunk << ", prio=" << priority << "]" << endl);
  319. // No observers => data isn't read yet, so must issue a request
  320. if( __m_observers.empty() || priority>__m_priority ) {
  321. scoped_writelock_type swl( __m_rwlock );
  322. // if observers not empty, requested priority > current prio,
  323. // i.e. we must replace current mmapped bytes with block read
  324. // from disk
  325. if( !__m_observers.empty() ) {
  326. // Note: mmap() can only map on page-size boundaries
  327. // so the /actual/ pointer returned from mmap
  328. // can be computed by shifting it back by the
  329. // amount the /actual/ "chunk" started within
  330. // the page
  331. const size_t pgOffset( chunkPos % pgSize );
  332. ::munmap( __m_bytes - pgOffset, chunkSize + pgOffset );
  333. __m_bytes = 0;
  334. }
  335. // Can already switch to new priority
  336. __m_priority = priority;
  337. // Depending on prio, do the right thing
  338. if( __m_priority==0 ) {
  339. // Handle the mmapping inside this call w/o giving up locks
  340. filechunk_type::mmap_chunk(const_cast<filechunk_type*>(this));
  341. } else {
  342. // Append ourselves to the appropriate workqueue - the queue
  343. // for the mountpoint we're located on!
  344. mphandler_data& mpdata( get_mpdata(mountPoint) );
  345. scoped_lock_type wqlock( mpdata.mutex );
  346. // Adding to the workqueue is not enough (guess how I found this out!)
  347. // we must also signal the condition - how else is the worker thread
  348. // to know that something happened?!
  349. if( !mpdata.stop ) {
  350. mpdata.workqueue.push( const_cast<filechunk_type*>(this) );
  351. ::pthread_cond_broadcast(mpdata.condition);
  352. }
  353. // Also indicate that we're being read
  354. __m_thread_active = true;
  355. }
  356. }
  357. // Insert observer into set
  358. __m_observers.insert( observer );
  359. }
  360. // At some time after init you call this to wait indefinitely for the data to be
  361. // in memory. If it's already there it returns quickly, obviously.
  362. //
  363. // If it returns NULL, it sets errno as follows:
// * ESRCH 'No such process' if no call to initiateRead() was made
  365. // first
  366. // * ENOMEM if the memory allocation for this file chunk failed
  367. // * any of the errno's returned by open(2) or read(2), if a failure
  368. // occurred during any of these operations, trying to read the
  369. // chunk's data into memory
  370. //
  371. unsigned char* readLockForData( void ) const {
  372. scoped_lock_type slt( __m_mutex );
  373. // Wait whilst no data & thread active.
  374. // As soon as either is false, we can do stuff
  375. while( __m_bytes==0 && __m_thread_active )
  376. ::pthread_cond_wait(__m_cond, __m_mutex);
  377. if( __m_bytes==0 )
  378. errno = __m_saved_errno;
  379. else
  380. // grab readlock on the data
  381. ::pthread_rwlock_rdlock( __m_rwlock );
  382. return __m_bytes;
  383. }
  384. void releaseReadLock( void ) const {
  385. ::pthread_rwlock_unlock( __m_rwlock );
  386. }
// When the caller has no use for this chunk's data anymore, please
// release it!
  389. void releaseData( int observer ) const {
  390. scoped_lock_type slt( __m_mutex );
  391. const size_t nobs = __m_observers.size();
  392. observers_type::iterator obsptr = __m_observers.find( observer );
  393. // Check if the observer was observing this block
  394. if( obsptr==__m_observers.end() )
  395. return;
  396. // Remove it from the set. If the set of observers is empty, then we can safely
  397. // deallocate the block.
  398. __m_observers.erase( obsptr );
  399. DEBUG(2, "filechunk[" << pathToChunk << "]::releaseData - data:" << (__m_bytes!=0)
  400. << " nObservers:" << nobs << " -> " << __m_observers.size() << endl);
  401. // still observers left - we're done
  402. if( __m_observers.size() )
  403. return;
  404. // No more referrers?
  405. // Then we must at least wait for a possible thread to finish
  406. // [E.g. file closed before read-ahead chunk was actually needed]
  407. while( __m_thread_active )
  408. ::pthread_cond_wait(__m_cond, __m_mutex);
  409. // Good, now that the thread is gone, we can delete the data
  410. // Do that depending on how it was loaded
  411. if( __m_bytes ) {
  412. if( __m_priority==0 ) {
  413. // Note: mmap() can only map on page-size boundaries
  414. // so the /actual/ pointer returned from mmap
  415. // can be computed by shifting it back by the
  416. // amount the /actual/ "chunk" started within
  417. // the page
  418. const size_t pgOffset( chunkPos % pgSize );
  419. ::munmap( __m_bytes - pgOffset, chunkSize + pgOffset );
  420. } else
  421. ::free( __m_bytes );
  422. }
  423. __m_bytes = 0;
  424. __m_saved_errno = ESRCH;
  425. }
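// Typical observer life cycle of the methods above (sketch): a reader calls
// initiateRead(observer, priority) to mmap or schedule the chunk, readLockForData()
// to block until the bytes are available, releaseReadLock() when done copying, and
// finally releaseData(observer) so the last observer unmaps/frees the chunk's memory.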
  426. private:
  427. // the mountpoint monitor thread function has
  428. // access to our internals
  429. friend void* mountpoint_monitor(void*);
  430. // no default c'tor!
  431. filechunk_type();
// the data + mutex etc. protecting the object
  433. mutable int __m_saved_errno;
  434. mutable bool __m_thread_active;
  435. mutable unsigned int __m_priority; // 0 = mmap(), >0 = read_chunk()/malloc thus ::free()
  436. mutable observers_type __m_observers;
  437. mutable unsigned char* __m_bytes;
  438. mutable pthread_cond_t* __m_cond;
  439. mutable pthread_mutex_t* __m_mutex;
  440. mutable pthread_rwlock_t* __m_rwlock;
// mmap() counterpart of read_chunk below - maps the chunk's bytes instead of
// reading them; note it is called directly from initiateRead() rather than from a worker thread.
  443. static void* mmap_chunk(filechunk_type* fcptr) {
  444. int fd = invalidFileDescriptor, e;
  445. bool closeFd( true );
  446. const off_t pgAligned( (fcptr->chunkPos/pgSize)*pgSize );
  447. const size_t pgOffset( fcptr->chunkPos % pgSize );
  448. unsigned char* bytes = 0;
  449. DEBUG(3, "mmap_chunk[" << ::pthread_self() << "] mapping " << fcptr->pathToChunk << ":" << fcptr->chunkPos
  450. << " + " << fcptr->chunkSize << "bytes [pgAligned: " << pgAligned << " + " << fcptr->chunkSize+pgOffset << "bytes]" << endl);
// Check if file descriptor already open
  452. {
  453. scoped_readlock_type srl( &fdcache_lock );
  454. fdcache_type::iterator fdptr = fdcache.find( fcptr->pathToChunk );
  455. if( fdptr!=fdcache.end() ) {
  456. fd = fdptr->second;
  457. closeFd = false;
  458. }
  459. }
  460. // If fd still invalid, then we have to open the file ourselves :-(
  461. if( fd==invalidFileDescriptor ) {
  462. // If we cannot open the file ...
  463. if( (fd=::open(fcptr->pathToChunk.c_str(), O_RDONLY))<0 ) {
  464. e = errno;
  465. DEBUG(-1, "mmap_chunk[" << fcptr->pathToChunk << "] - failed to open: " << ::strerror(errno) << endl);
  466. fcptr->__m_saved_errno = e;
  467. fcptr->__m_thread_active = false;
  468. fcptr->__m_bytes = 0;
  469. return (void*)0;
  470. }
  471. closeFd = true;
  472. }
  473. // HV: Hmmm. We can only mmap page-aligned offsets from within the file
  474. // So the amount of bytes to mmap() has to grow by the
  475. // amount of bytes we have shifted the actual pgAligned
  476. // address downward
  477. bytes = (unsigned char*)::mmap((void*)0, (size_t)fcptr->chunkSize + pgOffset, PROT_READ, MAP_PRIVATE, fd, pgAligned);
  478. // Save errno for possible later use
  479. e = errno;
  480. if( bytes==MAP_FAILED ) {
  481. DEBUG(-1, "mmap_chunk[" << ::pthread_self() << "] mmap failed on " <<
  482. fcptr->pathToChunk << ":" << fcptr->chunkPos << " + " << fcptr->chunkSize <<
  483. " [pgAligned: " << pgAligned << " + " << (fcptr->chunkSize+pgOffset) << "]" <<
  484. " - " << ::strerror(e) << endl);
  485. }
  486. fcptr->__m_saved_errno = (bytes==MAP_FAILED) ? e : 0;
  487. fcptr->__m_thread_active = false;
  488. fcptr->__m_bytes = (bytes==MAP_FAILED) ? 0 : (bytes + pgOffset);
  489. // If we need to close the file descriptor, do it now
  490. (void)(closeFd && ::close( fd ));
  491. return (void*)0;
  492. }
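// Example of the page-alignment arithmetic above, assuming pgSize=4096:
// chunkPos=10000 gives pgAligned=8192 and pgOffset=1808, so mmap() maps
// chunkSize+1808 bytes starting at file offset 8192 and the chunk's data starts at
// (returned pointer)+1808 - which is also why munmap() later subtracts pgOffset again.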
// chunk-reading function. preferably to be executed
  494. // inna different thread of execution
  495. static void* read_chunk(void* args) {
  496. int fd = invalidFileDescriptor, e = 0;
  497. bool closeFd( true );
  498. unsigned char* bytes = 0;
  499. filechunk_type* fcptr = (filechunk_type*)args;
  500. DEBUG(2, "read_chunk[" << ::pthread_self() << "] reading " << fcptr->pathToChunk << ", " <<
  501. fcptr->chunkSize << " bytes" << endl);
// Check if file descriptor already open
  503. {
  504. scoped_readlock_type srl( &fdcache_lock );
  505. fdcache_type::iterator fdptr = fdcache.find( fcptr->pathToChunk );
  506. if( fdptr!=fdcache.end() ) {
  507. fd = fdptr->second;
  508. closeFd = false;
  509. }
  510. }
  511. // If fd still invalid, then we have to open the file ourselves :-(
  512. if( fd==invalidFileDescriptor ) {
  513. // If we cannot open the file ...
  514. if( (fd=::open(fcptr->pathToChunk.c_str(), O_RDONLY))<0 ) {
  515. e = errno;
  516. DEBUG(-1, "read_chunk[" << fcptr->pathToChunk << "] - failed to open: " << ::strerror(errno) << endl);
  517. ::pthread_mutex_lock(fcptr->__m_mutex);
  518. fcptr->__m_saved_errno = e;
  519. fcptr->__m_thread_active = false;
  520. fcptr->__m_bytes = 0;
  521. ::pthread_cond_broadcast(fcptr->__m_cond);
  522. ::pthread_mutex_unlock(fcptr->__m_mutex);
  523. return (void*)0;
  524. }
  525. closeFd = true;
  526. }
  527. // use ::malloc() because it doesn't throw - we cannot throw
  528. // exceptions across threads and this executes inna different
  529. // thread
  530. if( (bytes=(unsigned char*)::malloc(fcptr->chunkSize))==0 ) {
  531. (void)(closeFd && ::close( fd ));
  532. DEBUG(-1, "read_chunk[" << fcptr->pathToChunk << "] - insufficient memory for " << fcptr->chunkSize << " bytes" << endl);
  533. ::pthread_mutex_lock(fcptr->__m_mutex);
  534. fcptr->__m_saved_errno = ENOMEM;
  535. fcptr->__m_thread_active = false;
  536. fcptr->__m_bytes = 0;
  537. ::pthread_cond_broadcast(fcptr->__m_cond);
  538. ::pthread_mutex_unlock(fcptr->__m_mutex);
  539. return (void*)0;
  540. }
  541. // seek to the chunk's position in the file
  542. ::lseek(fd, fcptr->chunkPos, SEEK_SET);
// Nothing left but to read the data
  544. size_t n = (size_t)fcptr->chunkSize;
  545. while( n ) {
  546. ssize_t nr = ::read(fd, bytes+(fcptr->chunkSize-n), n);
  547. if( nr<=0 ) {
  548. if( nr<0 ) {
  549. e = errno;
  550. DEBUG(-1, "read_chunk[" << fcptr->pathToChunk << "] - failed to read() " << ::strerror(errno) << endl);
  551. }
  552. break;
  553. }
  554. n -= (size_t)nr;
  555. }
  556. // If we need to close the file descriptor, do it now
  557. (void)(closeFd && ::close( fd ));
  558. // Ok, indicate we're done!
  559. ::pthread_mutex_lock(fcptr->__m_mutex);
  560. fcptr->__m_saved_errno = e;
  561. fcptr->__m_thread_active = false;
  562. fcptr->__m_bytes = bytes;
  563. ::pthread_cond_broadcast(fcptr->__m_cond);
  564. ::pthread_mutex_unlock(fcptr->__m_mutex);
  565. DEBUG(2, "read_chunk[" << ::pthread_self() << "] DONE reading " << fcptr->pathToChunk << endl);
  566. return (void*)0;
  567. }
  568. };
  569. typedef set<filechunk_type> filechunks_type;
  570. // Comparison operator for filechunk_type - sort by chunkNumber exclusively!
  571. bool operator<(filechunk_type const& l, filechunk_type const& r) {
  572. return l.chunkNumber < r.chunkNumber;
  573. }
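// Illustrative consequence (hypothetical chunks): chunk 0 of 10MB, chunk 16 of 8MB and
// chunk 35 of 10MB end up ordered 0,16,35 in the set; doIndex_unlocked() below then
// assigns chunkOffset 0, 10MB and 18MB respectively, for a recordingSize of 28MB.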
  574. // And we keep metadata per recording separately
  575. struct metadata_type {
  576. off_t recordingSize;
  577. struct stat st; // used for permissions and st_mtime
  578. metadata_type():
  579. recordingSize( 0 )
  580. { ::memset(&st, 0x0, sizeof(struct stat)); }
  581. };
  582. ////////////////////////////////////////////////////////////////
  583. //
  584. // Prototypes so we can use the calls; implementation is at the
  585. // bottom of this module
  586. //
  587. ////////////////////////////////////////////////////////////////
  588. // These look for VBS recordings
  589. void scanVBSRecordingDirectory(string const& recname, string const& dir, filechunks_type& fcs, struct stat& ts);
  590. void scanMk6RecordingFile(string const& recname, string const& file, filechunks_type& fcs, struct stat& ts);
  591. typedef void (*scanfunction_ptr)(string const& recname, string const& path, filechunks_type& fc, struct stat& tsptr);
  592. // When can we switch to c++11? This sucks, having to do pthread stuff!
  593. struct scanfunction_args {
  594. string path;
  595. string recordingName;
  596. struct stat* stPtr;
  597. pthread_mutex_t* mtx;
  598. scanfunction_ptr sfPtr;
  599. filechunks_type* fcPtr;
  600. scanfunction_args(string const& rec, string const& p, scanfunction_ptr sf,
  601. filechunks_type* fcptr, struct stat* stptr, pthread_mutex_t* m):
  602. path( p ), recordingName( rec ), stPtr( stptr ), mtx( m ), sfPtr( sf ), fcPtr( fcptr )
  603. {}
  604. private:
  605. scanfunction_args();
  606. scanfunction_args(const scanfunction_args&);
  607. const scanfunction_args& operator=(const scanfunction_args&);
  608. };
  609. void* doScan(void* ptr) {
  610. scanfunction_args* sfargs( (scanfunction_args*)ptr );
  611. try {
  612. // Accumulate file chunks in local variable
  613. struct stat tmpST;
  614. filechunks_type tmpFC;
  615. ::memset(&tmpST, 0x0, sizeof(struct stat));
  616. sfargs->sfPtr(sfargs->recordingName, sfargs->path, tmpFC, tmpST);
  617. // Now lock the result and transfer our results into that
  618. scoped_lock_type sml( sfargs->mtx );
  619. // Manually copy the entries because we must check for #FAIL
  620. // (std::copy to an output/insert iterator doesn't do that)
  621. for(filechunks_type::const_iterator curFC = tmpFC.begin(); curFC!=tmpFC.end(); curFC++)
  622. if( sfargs->fcPtr->insert(*curFC).second==false )
  623. throw std::runtime_error(std::string("Duplicate insert of chunk ")+curFC->pathToChunk);
  624. update(*sfargs->stPtr, tmpST);
  625. }
  626. catch( int e ) {
DEBUG(-1, "doScan(" << sfargs->path << ") - errno=" << e << " [" << ::strerror(e) << "]" << endl);
  628. }
  629. catch( std::exception& e ) {
  630. DEBUG(-1, "doScan(" << sfargs->path << ") - exception " << e.what() << endl);
  631. }
  632. catch( ... ) {
  633. DEBUG(-1, "doScan(" << sfargs->path << ") - unknown exception" << endl);
  634. }
  635. delete sfargs;
  636. return (void*)0;
  637. }
  638. // Describes a recording
  639. struct recording_type {
  640. const string recordingName; // Const because this will be the primary key!
  641. mutable paths_type paths; // Mk6 => files, VBS => directories
  642. mutable metadata_type metadata;
  643. mutable observers_type observers;
  644. pthread_mutex_t mutex;
  645. mutable filechunks_type fileChunks;
  646. scanfunction_ptr scanFunction;
  647. openpath_fn pathOpener;
  648. closepath_fn pathCloser;
  649. recording_type(recording_type const& other):
  650. recordingName( other.recordingName ), scanFunction( other.scanFunction ),
  651. pathOpener( other.pathOpener ), pathCloser( other.pathCloser )
  652. {
  653. ::pthread_mutex_init(&mutex, 0);
  654. }
  655. recording_type(string const& recname, scanfunction_ptr sf, openpath_fn opener, closepath_fn closer):
  656. recordingName( recname ), scanFunction( sf ), pathOpener( opener ), pathCloser( closer )
  657. {
  658. ::pthread_mutex_init(&mutex, 0);
  659. }
  660. void add_path(string const& p) const {
  661. scoped_lock_type sml( const_cast<pthread_mutex_t*>(&mutex) );
  662. if( paths.insert(p).second==false )
  663. throw runtime_error(string("Duplicate insert for path '")+p+"'");
  664. // Can have quick look at path - get modification time estimate?
  665. // in case it's a file we then might also get an estimated file size
  666. // and user/group id?
  667. }
  668. int open(int observer) const {
  669. scoped_lock_type sml( const_cast<pthread_mutex_t*>(&mutex) );
  670. // Add observer (do not allow duplicates)
  671. if( observers.insert(observer).second==false ) {
  672. DEBUG(-1, "recording/open: duplicate observer " << observer << endl);
  673. return -1;
  674. }
  675. // Must be indexed
  676. int e;
  677. if( (e=doIndex_unlocked())!=0 )
  678. return e;
  679. // And open necessary files
  680. time_t t;
  681. off_t o;
  682. for(paths_type::iterator pptr=paths.begin(); pptr!=paths.end(); pptr++)
  683. if( (e=pathOpener(*pptr, t, o))!=0 )
  684. break;
  685. return e;
  686. }
  687. int index( void ) const {
  688. scoped_lock_type sml( const_cast<pthread_mutex_t*>(&mutex) );
  689. return doIndex_unlocked();
  690. }
  691. int close(int observer) const {
  692. scoped_lock_type sml( const_cast<pthread_mutex_t*>(&mutex) );
  693. observers_type::iterator optr = observers.find(observer);
  694. if( optr==observers.end() ) {
  695. DEBUG(-1, "recording/close: observer " << observer << " not in list!" << endl);
  696. return -1;
  697. }
  698. observers.erase( optr );
  699. // No one left? Then we need to close all paths
  700. if( !observers.empty() )
  701. return 0;
  702. // Note that we do not break on error - we must attempt to close
  703. // all paths
  704. int e = 0;
  705. for(paths_type::iterator pptr=paths.begin(); pptr!=paths.end(); pptr++) {
  706. int tmpe = pathCloser( *pptr );
  707. // remember first failure
  708. if( tmpe && !e )
  709. e = tmpe;
  710. }
  711. return e;
  712. }
  713. private:
  714. int doIndex_unlocked( void ) const {
  715. if( fileChunks.size() )
  716. return 0;
  717. // Oh bugger
  718. // Issue a multithreaded parallel scan
  719. int eno = 0;
  720. pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
  721. threadlist_type threads;
  722. // Make sure metadata is reset to empty
  723. metadata = metadata_type();
  724. for(paths_type::iterator pptr=paths.begin(); pptr!=paths.end(); pptr++) {
  725. pthread_t threadId;
  726. scanfunction_args* sfaptr(new scanfunction_args(recordingName, *pptr, scanFunction,&fileChunks,
  727. &metadata.st, &mtx));
  728. if( (eno=vbs_pthread_create(&threadId, doScan, sfaptr))!=0 )
  729. break;
  730. threads.push_back( threadId );
  731. }
  732. // Join all threads
  733. for(threadlist_type::iterator tidptr=threads.begin(); tidptr!=threads.end(); tidptr++)
  734. ::pthread_join( *tidptr, 0 );
  735. // Go through all file chunks, updating their position and also
  736. // keep track of the recording size
  737. for(filechunks_type::iterator p=fileChunks.begin(); p!=fileChunks.end(); p++) {
  738. p->chunkOffset = metadata.recordingSize;
  739. metadata.recordingSize += p->chunkSize;
  740. }
  741. return eno;
  742. }
  743. // None of these should be allowed
  744. recording_type();
  745. recording_type const& operator=(recording_type const&);
  746. };
  747. // Define strict weak ordering for recording_type:
  748. // compare by name only
  749. bool operator<(recording_type const& l, recording_type const& r) {
  750. return l.recordingName < r.recordingName;
  751. }
  752. typedef set<recording_type> recordings_type;
  753. // Open file information
  754. struct openfile_type {
  755. // remember last pointer to filechunk
  756. off_t previousEnd;
  757. uint64_t nConsecutiveReads;
  758. pthread_rwlock_t fileRWLock;
  759. recordings_type::iterator recordingPtr;
  760. filechunks_type::iterator curChunk;
  761. openfile_type(openfile_type const& other):
  762. nConsecutiveReads( 0 ), recordingPtr( other.recordingPtr ), curChunk( other.curChunk )
  763. {
  764. int r;
  765. // default rwlock is good enough for us!
  766. if( (r=::pthread_rwlock_init(&fileRWLock, 0))!=0 ) {
  767. DEBUG(-1, "openfile_type(openfile_type const&)/pthread_rwlock_init fails - " << ::strerror(r) << endl);
  768. }
  769. // Put previous end past end-of-file such that it should, normally,
  770. // not compare equal to the first read() action at any file offset
  771. previousEnd = recordingPtr->metadata.recordingSize + 1;
  772. }
  773. openfile_type(recordings_type::iterator recptr):
  774. nConsecutiveReads( 0 ), recordingPtr( recptr ), curChunk( recordingPtr->fileChunks.begin() )
  775. {
  776. int r;
  777. // default rwlock is good enough for us!
  778. if( (r=::pthread_rwlock_init(&fileRWLock, 0))!=0 ) {
  779. DEBUG(-1, "openfile_type()/pthread_rwlock_init fails - " << ::strerror(r) << endl);
  780. }
  781. // Put previous end past end-of-file such that it should, normally,
  782. // not compare equal to the first read() action at any file offset
  783. previousEnd = recordingPtr->metadata.recordingSize + 1;
  784. }
  785. private:
  786. openfile_type();
  787. openfile_type const& operator=(openfile_type const&);
  788. };
  789. ///////////////////////////////////////////////////
  790. //
  791. // typedefs
  792. //
  793. ///////////////////////////////////////////////////
  794. typedef map<string, mphandler_data> mpmonitormap_type;
  795. typedef map<int, openfile_type> openfilecache_type;
  796. ///////////////////////////////////////////////
  797. // Keep state of the FUSE file system in
  798. // a struct of this type. It's a global
  799. // variable.
  800. ///////////////////////////////////////////////
  801. static const string flexbuffPattern( "/mnt/disk*" );
  802. static const string mk6Pattern( "/mnt/disks/*/*/data" );
  803. struct vbs_state_type {
  804. int readAhead;
  805. bool fullIndex;
  806. paths_type mountPoints;
  807. paths_type indexPatterns;
  808. recordings_type recordings;
  809. pthread_mutex_t chunkMutex;
  810. mpmonitormap_type mpmonitorMap;
  811. openfilecache_type openfileCache;
  812. vbs_state_type():
  813. readAhead( 0 ), fullIndex( true )
  814. {
  815. ::pthread_mutex_init(&chunkMutex, 0);
  816. }
  817. ~vbs_state_type() {
  818. DEBUG(2, "~vbs_state_type: stopping all monitor threads" << endl);
  819. // Stop all mountpoint monitors
  820. for( mpmonitormap_type::iterator monptr=mpmonitorMap.begin(); monptr!=mpmonitorMap.end(); monptr++) {
  821. mphandler_data& mpdata = monptr->second;
  822. DEBUG(2, "~vbs_state_type: stopping " << monptr->first << endl);
  823. // Signal thread that it's time to go
  824. ::pthread_mutex_lock(mpdata.mutex);
  825. mpdata.stop = true;
  826. ::pthread_cond_signal(mpdata.condition);
  827. ::pthread_mutex_unlock(mpdata.mutex);
  828. }
  829. for( mpmonitormap_type::iterator monptr=mpmonitorMap.begin(); monptr!=mpmonitorMap.end(); monptr++) {
  830. mphandler_data& mpdata = monptr->second;
  831. // Now join it
  832. int j;
  833. void* dummy;
  834. DEBUG(2, "~vbs_state_type: joining " << monptr->first << endl);
  835. j = ::pthread_join(mpdata.threadId, &dummy);
  836. if( j!=0 ) {
DEBUG(-1, "~vbs_state_type: failed to join [" << monptr->first << "]: " << ::strerror(j) << endl);
  838. }
  839. DEBUG(2, "~vbs_state_type: " << monptr->first << " " << ((j==0)?("OK"):("FAIL")) << endl);
  840. }
  841. DEBUG(2, "~vbs_state_type: done" << endl);
  842. }
  843. };
  844. vbs_state_type vbs_state;
  845. ///////////////////////////////////////////////
  846. //
  847. // Given a mountpoint, return a reference
  848. // to the mountpointhandler data
  849. //
  850. ///////////////////////////////////////////////
  851. mphandler_data& get_mpdata( string const& mp ) {
  852. mpmonitormap_type::iterator p = vbs_state.mpmonitorMap.find( mp );
  853. EZASSERT2( p!=vbs_state.mpmonitorMap.end(), vbs_except, EZINFO("Mountpoint " << mp << " not found in monitormap" << endl) );
  854. return p->second;
  855. }
  856. ///////////////////////////////////////////////
  857. // Wrapper around ::pthread_create(3) -
  858. // creates a joinable thread with
  859. // ALL signals blocked.
  860. // Takes same arguments as ::pthread_create(3)
  861. // apart from the pthread_attr_t struct
  862. ///////////////////////////////////////////////
  863. int vbs_pthread_create(pthread_t* thread, void *(*start_routine)(void*), void *arg) {
  864. int pr, createrv;
  865. sigset_t oldSig, newSig;
  866. pthread_attr_t attr;
  867. // Make sure we have a joinable thread
  868. if( (pr=::pthread_attr_init(&attr))!=0 ) {
  869. DEBUG(-1, "vbs_pthread_create: pthread_attr_init fails - " << ::strerror(pr) << endl);
  870. return pr;
  871. }
  872. if( (pr=::pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE))!=0 ) {
  873. DEBUG(-1, "vbs_pthread_create: pthread_attr_setdetachstate fails - " << ::strerror(pr) << endl);
  874. return pr;
  875. }
  876. // Install a fully filled signal set (i.e. block all of 'm) and save the
  877. // current one
  878. if( sigfillset(&newSig)!=0 ) {
  879. DEBUG(-1, "vbs_pthread_create: sigfillset fails - " << ::strerror(errno) << endl);
  880. return errno;
  881. }
  882. if( (pr=::pthread_sigmask(SIG_SETMASK, &newSig, &oldSig))!=0 ) {
  883. DEBUG(-1, "vbs_pthread_create: pthread_sigmask (setting new mask) fails - " << ::strerror(pr) << endl);
  884. return pr;
  885. }
  886. // Now we're in a determined state and we can safely create the
  887. // thread. We save the return value of this one specifically because
  888. // that will be the eventual return value. We do the cleanup calls
// and if they fail we inform the user of that, but do not return *those*
  890. // error value(s)
  891. createrv = ::pthread_create(thread, &attr, start_routine, arg);
  892. if( createrv!=0 )
  893. DEBUG(-1, "vbs_pthread_create: pthread_create fails - " << ::strerror(createrv) << endl);
  894. // Cleanup phase: put back old signal mask & destroy pthread attributes
  895. if( (pr=::pthread_sigmask(SIG_SETMASK, &oldSig, 0))!=0 )
  896. DEBUG(-1, "vbs_pthread_create: pthread_sigmask (put back old mask) fails - " << ::strerror(pr) << endl);
  897. if( (pr=::pthread_attr_destroy(&attr))!=0 )
  898. DEBUG(-1, "vbs_pthread_create: pthread_attr_destroy fails - " << ::strerror(pr) << endl);
  899. // Phew. Finally done.
  900. return createrv;
  901. }
  902. ///////////////////////////////////////////////
  903. // The mountpoint monitor function.
  904. // It is a thread function
  905. ///////////////////////////////////////////////
  906. struct mapresult_type {
  907. //time_t mtime;
  908. struct stat st;
  909. scanfunction_ptr scanFunction;
  910. openpath_fn openFunction;
  911. closepath_fn closeFunction;
  912. mapresult_type():
  913. scanFunction( 0 ), openFunction( 0 ), closeFunction( 0 )
  914. { ::memset(&st, 0x0, sizeof(struct stat)); }
  915. mapresult_type(struct stat s, scanfunction_ptr sf, openpath_fn opener, closepath_fn closer):
  916. st( s ), scanFunction( sf ), openFunction( opener ), closeFunction( closer )
  917. {}
  918. operator bool() const {
  919. return !(scanFunction==0 || openFunction==0 || closeFunction==0);
  920. }
  921. };
  922. mapresult_type mk6(struct stat t) {
  923. return mapresult_type(t, scanMk6RecordingFile, mk6_path_opener, mk6_path_closer);
  924. }
  925. mapresult_type vbs(struct stat t) {
  926. return mapresult_type(t, scanVBSRecordingDirectory, vbs_path_opener, vbs_path_closer);
  927. }
  928. // predicate which checks if the given entry is a Mark6 data file
  929. struct isMark6Recording {
  930. bool operator()(string const& entry) const {
  931. // attempt to read mk6 header
  932. int fd = ::open(entry.c_str(), O_RDONLY);
  933. ssize_t n;
  934. mk6_file_header fhdr;
  935. DEBUG(4, "isMark6Recording(" << entry << "): fd=" << fd << endl);
  936. if( fd<0 )
  937. return false;
  938. // attempt to read sizeof(header)
  939. n = ::read(fd, &fhdr, sizeof(mk6_file_header));
  940. ::close( fd );
// Check if we could read the file header and whether it has the magicz
  942. return (n==sizeof(mk6_file_header) && fhdr.sync_word==MARK6_SG_SYNC_WORD && fhdr.version==2) ? true : false;
  943. }
  944. };
  945. // Complaint by users: vbs_ls, vbs_rm and vbs_fs don't seem to pick up
  946. // FlexBuff recordings with regex majik characters (".", "+" et.al.) in
  947. // them. Now, creating recordings with those characters in their names might
  948. // be capital offence in the first place ... but since no-one's stopping
  949. // them it might be considered polite to suck it up and make the s/w operate
  950. // correctly just the same.
  951. // In vbs_ls/vbs_rm the issue was fixed by escaping the recording name
  952. // before making a regex pattern out of it for the VBS shrapnel:
  953. // https://docs.python.org/2/library/re.html#re.escape
  954. //
  955. // I've looked up the implementation of re.escape:
  956. // https://github.com/python/cpython/blob/master/Lib/re.py#L249
  957. // (status as on 09 Feb 2017)
  958. //
  959. // and I think it's easy enough to emulate that here in C++
  960. static const string alphanum_str("_abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890");
  961. string escape(string const& s) {
  962. string rv;
  963. back_insert_iterator<string> output(rv);
  964. for(string::const_iterator p=s.begin(); p!=s.end(); *output++ = *p++)
  965. if( alphanum_str.find(*p)==string::npos )
  966. *output++ = '\\';
  967. return rv;
  968. }
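// Example: escape("ed045_wb.0012+cal") yields "ed045_wb\.0012\+cal" - only characters
// outside [A-Za-z0-9_] get a backslash prepended, mirroring Python's re.escape().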
  969. // predicate which tests if the given entry is a VBS style recording:
  970. // * entry represents a directory
  971. // * that directory contains at least one file <entry>.XXXXXXXX
  972. // "entry/entry.XXXXXXXX is the pattern for a VBS recording chunk)
  973. // predicate that returns wether or not a file name matches a recording chunk
  974. struct isRecordingChunk {
  975. isRecordingChunk(string const& recname):
  976. __m_regex( string("(^|/)")+escape(recname)+"\\.[0-9]{8}$" )
  977. {}
  978. bool operator()(string const& entry) const {
  979. return __m_regex.matches(entry);
  980. }
  981. Regular_Expression __m_regex;
  982. private:
  983. isRecordingChunk();
  984. };
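// Illustrative match (hypothetical recording name): for recname "ed045_wb_no0001" the
// pattern becomes "(^|/)ed045_wb_no0001\.[0-9]{8}$", so the entry
// "ed045_wb_no0001/ed045_wb_no0001.00000007" matches and "ed045_wb_no0001.log" does not.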
  985. struct isVBSRecording {
  986. bool operator()(string const& entry) const {
  987. DEBUG(4, "isVBSRecording(" << entry << ")" << endl);
  988. // Test if any of the entries in this directory conform to the VBS
  989. // recording naming convention. As this is a quick scan we quit as
  990. // soon as we find one (1) matching entry. The full scan will be
  991. // done later
  992. // NOTE: 'entry' is the full path to a directory:
  993. // /path/to/some/potential/vbs_recording_no0001
  994. // so we have to extract the last part of the path!
  995. try {
  996. return dir_exists(entry, isRecordingChunk(entry.substr(entry.rfind('/')+1)));
  997. }
  998. catch( std::exception& e ) {
  999. DEBUG(4, "isVBSRecording(" << entry << ")/caught exception: " << e.what() << endl);
  1000. return false;
  1001. }
  1002. catch(...) {
  1003. DEBUG(4, "isVBSRecording(" << entry << ")/caught unknown exception" << endl);
  1004. return false;
  1005. }
  1006. }
  1007. };
  1008. // Define a directory mapper which we can map over a mountpoint.
  1009. // Each entry will be translated into a mapresult_type.
  1010. // Any entry with a non-empty mapresult will have to be added to the global
  1011. // recording cache ...
  1012. struct isRecording {
  1013. // for each entry we return a value of this type
  1014. typedef mapresult_type value_type;
  1015. // The actual implementation
  1016. value_type operator()(string const& entry) const {
  1017. // Ok it's a thing we recognize. Need to get the st_mtime
  1018. // for the metadata
  1019. struct stat s;
  1020. value_type (*maker)(struct stat) = 0;
  1021. if( ::lstat(entry.c_str(), &s)==0 ) {
// Check if it's anything we recognize:
// directories could be a VBS recording,
// regular files could be a Mark6 recording
// [use S_ISDIR()/S_ISREG() - masking with S_IFDIR/S_IFREG would also
//  let other file types (e.g. sockets) slip through]
if( S_ISDIR(s.st_mode) && isVBSRecording_f(entry) )
maker = vbs;
else if( S_ISREG(s.st_mode) && isMark6Recording_f(entry) )
  1028. maker = mk6;
  1029. } else {
  1030. DEBUG(1, "WARN: Failed to stat(" << entry << ") - " << ::strerror(errno) << endl);
  1031. }
  1032. return (maker == 0) ? mapresult_type() : maker( s );
  1033. }
  1034. isMark6Recording isMark6Recording_f;
  1035. isVBSRecording isVBSRecording_f;
  1036. };
  1037. struct name_matcher_type {
  1038. name_matcher_type(string const& nm):
  1039. __name_to_match( nm )
  1040. {}
  1041. bool operator()( const recording_type& rec ) const {
  1042. return rec.recordingName==__name_to_match;
  1043. }
  1044. const string __name_to_match;
  1045. };
  1046. // note that we only ever ADD files to the chunk cache. If someone deletes
// the backing storage out from under us, that's too bad ...
  1048. void* mountpoint_monitor(void* args) {
  1049. // For quick scanning the mount point
  1050. typedef dir_mapper<isRecording> recordingmapper_type;
  1051. typedef recordingmapper_type::value_type found_recordings_type;
  1052. // We get passed a pointer to string
  1053. bool except = false;
  1054. ostringstream oss;
  1055. mphandler_data* mpdata = (mphandler_data*)args;
  1056. if( mpdata==0 ) {
  1057. DEBUG(-1, "mountpoint_monitor/null-pointer passed as handler data!" << endl);
  1058. return (void*)0;
  1059. }
  1060. if( mpdata->matchList.size() )
  1061. oss << " [filtering " << mpdata->matchList.size() << " pattern" << ((mpdata->matchList.size()==1) ? "" : "s") << "]";
  1062. DEBUG(2, "mountpoint_monitor[" << mpdata->mountPoint << "] starting" << oss.str() << endl);
  1063. try {
  1064. // Let's check that mother!
  1065. // Do the scan unlocked
  1066. found_recordings_type found = recordingmapper_type()(mpdata->mountPoint, *mpdata);
  1067. // Filter out the entries that were diagnosed to be recordings.
  1068. // We must hold the lock on the recordings.
  1069. ::pthread_mutex_lock( &vbs_state.chunkMutex );
  1070. recordings_type& recordings( vbs_state.recordings );
  1071. for(found_recordings_type::const_iterator ptr=found.begin(); ptr!=found.end(); ptr++) {
  1072. mapresult_type const& mr( ptr->second );
  1073. // If it was not a recognized recording ..
  1074. if( !mr )
  1075. continue;
  1076. // If recording does not exist yet, add it
  1077. recordings_type::iterator recptr = std::find_if(recordings.begin(), recordings.end(), name_matcher_type(ptr->first));
  1078. if( recptr==recordings.end() )
  1079. recptr = recordings.insert(
  1080. recording_type(ptr->first, mr.scanFunction, mr.openFunction, mr.closeFunction)
  1081. ).first;
  1082. // Ok, recording is certain to exist and recptr points to it
  1083. // Now add path and update meta data
  1084. recording_type const& recording( *recptr );
  1085. recording.add_path( mpdata->mountPoint+"/"+ptr->first );
  1086. update(recording.metadata.st, ptr->second.st);
  1087. }
  1088. // Done updating the global recording map
  1089. ::pthread_mutex_unlock( &vbs_state.chunkMutex );
  1090. // Signal that we're done indexing
  1091. ::pthread_mutex_lock(mpdata->mutex);
  1092. mpdata->indexed = true;
  1093. ::pthread_cond_signal(mpdata->condition);
  1094. ::pthread_mutex_unlock(mpdata->mutex);
  1095. // And drop into our inner loop!
  1096. while( true ) {
  1097. bool stop = false; // copies of shared state variables
  1098. filechunk_type* workptr = 0;
  1099. // lock self & condition wait for something to happen
  1100. ::pthread_mutex_lock(mpdata->mutex);
  1101. while( mpdata->stop==false && mpdata->workqueue.size()==0 )
  1102. ::pthread_cond_wait(mpdata->condition, mpdata->mutex);
  1103. stop = mpdata->stop;
  1104. if( mpdata->workqueue.size() ) {
  1105. workptr = mpdata->workqueue.front();
  1106. mpdata->workqueue.pop();
  1107. }
  1108. ::pthread_mutex_unlock(mpdata->mutex);
  1109. // Check what the message was
  1110. if( stop==true )
  1111. break;
  1112. if( workptr==0 ) {
  1113. DEBUG(-1, "mountpoint_monitor[" << mpdata->mountPoint << "] got nullptr in workqueue?!" << endl);
  1114. continue;
  1115. }
  1116. // Crap. There's no denying anymore - we have to Do Work(tm)
  1117. filechunk_type::read_chunk( workptr );
  1118. }
// Ok, we're not reading anymore. Inform all queued blocks
  1120. // that it ain't gonna happen no more
  1121. ::pthread_mutex_lock(mpdata->mutex);
  1122. while( mpdata->workqueue.size() ) {
  1123. filechunk_type* fcptr = mpdata->workqueue.front();
  1124. ::pthread_mutex_lock(fcptr->__m_mutex);
  1125. fcptr->__m_thread_active = false;
  1126. ::pthread_cond_broadcast(fcptr->__m_cond);
  1127. ::pthread_mutex_unlock(fcptr->__m_mutex);
  1128. mpdata->workqueue.pop();
  1129. }
  1130. ::pthread_mutex_unlock(mpdata->mutex);
  1131. }
  1132. catch( int eno ) {
  1133. except = true;
  1134. DEBUG(-1, "mountpoint_monitor[" << mpdata->mountPoint << "] caught errno " << eno << " - " << ::strerror(eno) << endl);
  1135. }
  1136. catch( std::exception& e ) {
  1137. except = true;
  1138. DEBUG(-1, "mountpoint_monitor[" << mpdata->mountPoint << "] caught exception " << e.what() << endl);
  1139. }
  1140. catch( ... ) {
  1141. except = true;
  1142. DEBUG(-1, "mountpoint_monitor[" << mpdata->mountPoint << "] caught unknown exception" << endl);
  1143. }
  1144. if( except ) {
  1145. // Signal to main loop that (1) we're done initializing and (2)
  1146. // that we don't accept any requests
  1147. ::pthread_mutex_lock(mpdata->mutex);
  1148. mpdata->indexed = true;
  1149. mpdata->stop = true;
  1150. ::pthread_cond_signal(mpdata->condition);
  1151. ::pthread_mutex_unlock(mpdata->mutex);
  1152. }
  1153. DEBUG(2, "mountpoint_monitor[" << mpdata->mountPoint << "] stopping" << endl);
  1154. return (void*)0;
  1155. }
  1156. // Predicate for a directory entry to be a mountpoint
  1157. // 1. name must be "disk[0-9]+"
  1158. // 2. must refer to a directory
  1159. struct isMountpoint {
  1160. bool operator()(string const& entry) const {
  1161. Regular_Expression rxDisk("^disk[0-9]{1,}$");
  1162. struct stat status;
  1163. string::size_type slash = entry.find_last_of("/");
  1164. // IF there is a slash, we skip it, if there isn't, we
  1165. // use the whole string
  1166. if( slash==string::npos )
  1167. slash = 0;
  1168. else
  1169. slash += 1;
  1170. DEBUG(3, "isMountpoint: checking name " << entry.substr(slash) << endl);
  1171. if( !rxDisk.matches(entry.substr(slash)) )
  1172. return false;
  1173. if( ::lstat(entry.c_str(), &status)<0 ) {
DEBUG(2, "isMountpoint: ::lstat fails on " << entry << " - " << ::strerror(errno) << endl);
  1175. return false;
  1176. }
  1177. // We must have r,x access to the directory [in order to descend into it]
  1178. return S_ISDIR(status.st_mode) && (status.st_mode & S_IRUSR) && (status.st_mode & S_IXUSR);
  1179. }
  1180. };
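// Illustrative outcomes (hypothetical paths): "/mnt/disk0" and "/mnt/disk23" pass
// (their names match ^disk[0-9]{1,}$ and they are user-readable+executable directories),
// whereas "/mnt/disk0a", "/mnt/cdrom" or a regular file named "disk1" are rejected.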
  1181. ///////////////////////////////////////////////
  1182. //
  1183. // Here follow the implementations
  1184. // of the file system functions
  1185. // that we support
  1186. //
  1187. ///////////////////////////////////////////////
  1188. void* vbs_init(struct fuse_conn_info* /*conn*/) {
  1189. DEBUG(-1, "vbs_init: starting" << endl);
  1190. try {
  1191. mpmonitormap_type& mpmon( vbs_state.mpmonitorMap );
  1192. EZASSERT2( vbs_state.mountPoints.size()>0, vbs_except, EZINFO("No mountpoints found") );
  1193. for( paths_type::const_iterator mp=vbs_state.mountPoints.begin(); mp!=vbs_state.mountPoints.end(); mp++) {
  1194. DEBUG(2, "vbs_init: Found mountpoint " << *mp << endl);
  1195. // Excellent, now attempt to start a monitor thread for this mountpoint
  1196. typedef pair<mpmonitormap_type::iterator, bool> mpinsres_type;
  1197. int r;
  1198. mpinsres_type insres = mpmon.insert( make_pair(*mp, mphandler_data(*mp, vbs_state.indexPatterns)) );
  1199. // Start the monitor thread
  1200. if( (r=vbs_pthread_create(&insres.first->second.threadId, mountpoint_monitor, &insres.first->second))!=0 ) {
  1201. DEBUG(-1, "vbs_init: Failed to create thread for mountpoint " << *mp << endl);
// remove from mpmonitor map or else we'll be waiting for a
  1203. // thread that was never started
  1204. // Again: actual killing of vbs_init() will happen later,
  1205. // after we've waited for all other threads
  1206. mpmon.erase( insres.first );
  1207. }
  1208. }
  1209. // For each thread that was started, wait for it to have signalled it's done initializing
  1210. DEBUG(-1, "vbs_init: please wait whilst indexing " << mpmon.size() << " mountpoints" << endl);
  1211. for( mpmonitormap_type::iterator monptr=mpmon.begin(); monptr!=mpmon.end(); monptr++) {
  1212. bool stop;
  1213. mphandler_data& mpdata = monptr->second;
  1214. ::pthread_mutex_lock(mpdata.mutex);
  1215. while( mpdata.stop==false && mpdata.indexed==false )
  1216. ::pthread_cond_wait(mpdata.condition, mpdata.mutex);
  1217. stop = mpdata.stop;
  1218. ::pthread_mutex_unlock(mpdata.mutex);
  1219. DEBUG(1, "vbs_init: " << monptr->first << " " << ((stop)?("EXIT"):("OK")) << endl);
  1220. }
  1221. DEBUG(-1, "vbs_init: indexing " << mpmon.size() << " mountpoints finished" << endl);
  1222. // Only do the full index scan if requested
  1223. recordings_type& recordings( vbs_state.recordings );
  1224. if( vbs_state.fullIndex ) {
  1225. DEBUG(-1, "vbs_init: start indexing metadata of " << recordings.size() << " recordings ..." << endl);
  1226. for(recordings_type::iterator recptr=recordings.begin(); recptr!=recordings.end(); recptr++)
  1227. recptr->index();
  1228. }
  1229. DEBUG(-1, "vbs_init: indexing metadata finished" << endl);
  1230. }
  1231. catch( int erno ) {
  1232. DEBUG(-1, "vbs_init/caught deadly errno - " << ::strerror(erno) << endl);
  1233. }
  1234. catch( const exception& e ) {
  1235. DEBUG(-1, "vbs_init/caught deadly exception - " << e.what() << endl);
  1236. }
  1237. catch( ... ) {
  1238. DEBUG(-1, "vbs_init/caught deadly unknown exception" << endl);
  1239. }
  1240. return (void*)&vbs_state;
  1241. }
  1242. int vbs_readdir(char const* path, void* buf, fuse_fill_dir_t filler, off_t /*offset*/, struct fuse_file_info* /*fi*/) {
  1243. DEBUG(1, "vbs_readdir[" << path << "]" << endl);
// We will *never* allow subdirectories, so for any request for anything
// other than "/" (the root of this file system) we're going to return
// an error!
  1247. if( string(path)!="/" )
  1248. return -ENOENT;
  1249. // Now fill the directory listing
  1250. filler(buf, ".", 0, 0);
  1251. filler(buf, "..", 0, 0);
  1252. // Grab the lock on the recordings and fetch all the keys - they are the
  1253. // recording names!
  1254. scoped_lock_type sml( &vbs_state.chunkMutex );
  1255. const recordings_type& recordings = vbs_state.recordings;
  1256. for(recordings_type::iterator recptr=recordings.begin(); recptr!=recordings.end(); recptr++)
  1257. filler(buf, recptr->recordingName.c_str(), 0, 0);
  1258. return 0;
  1259. }
  1260. int vbs_getattr(char const* path, struct stat* statbuf) {
  1261. const time_t now = ::time(0);
  1262. DEBUG(1, "vbs_getattr[" << path << "]" << endl);
  1263. // before we create the recording name out of the path,
  1264. // we have to strip leading slashes!
  1265. while( *path=='/' ) path++;
  1266. const string recname( path );
  1267. // If someone requests the attrs of "/" it's the props of
  1268. // the FUSE mountpoint itself, so we list it as a directory
  1269. // [note, after stripping leading slashes, the root path
  1270. // becomes the empty string!]
  1271. if( recname=="" ) {
  1272. statbuf->st_mode = S_IFDIR | permissionBits;
  1273. statbuf->st_uid = ::getuid();
  1274. statbuf->st_gid = ::getgid();
  1275. statbuf->st_mtime = now;
  1276. statbuf->st_ctime = now;
  1277. statbuf->st_atime = now;
  1278. return 0;
  1279. }
  1280. scoped_lock_type sml( &vbs_state.chunkMutex );
  1281. recordings_type& recordings( vbs_state.recordings );
  1282. recordings_type::const_iterator recptr = std::find_if(recordings.begin(), recordings.end(), name_matcher_type(recname));
  1283. if( recptr==vbs_state.recordings.end() ) {
  1284. DEBUG(-1, "vbs_getattr[" << path << "] - not found in cache?!!!" << endl);
  1285. return -ENOENT;
  1286. }
  1287. // It's an existing recording!
  1288. // should get mode/user/group from the actual file(s)-on-disk
  1289. struct stat const& recStat(recptr->metadata.st);
  1290. statbuf->st_mode = recStat.st_mode;
  1291. statbuf->st_uid = recStat.st_uid;
  1292. statbuf->st_gid = recStat.st_gid;
  1293. statbuf->st_size = recptr->metadata.recordingSize;
  1294. statbuf->st_mtime = statbuf->st_ctime = statbuf->st_atime = recStat.st_mtime;
  1295. DEBUG(3, "vbs_getattr[" << path << "]: " << *statbuf << endl);
  1296. return 0;
  1297. }
  1298. void vbs_destroy(void* /*ptr*/) { }
  1299. int vbs_open(char const* path, struct fuse_file_info* fi) {
  1300. // Skip leading slash(es) in path
  1301. while( path && *path && *path=='/' )
  1302. path++;
  1303. int res = 0;
  1304. const string recname( path );
  1305. ::pthread_mutex_lock(&vbs_state.chunkMutex);
  1306. recordings_type& recordings = vbs_state.recordings;
  1307. openfilecache_type& openfiles = vbs_state.openfileCache;
  1308. recordings_type::iterator recptr = std::find_if(recordings.begin(), recordings.end(), name_matcher_type(recname));
  1309. if( recptr==recordings.end() ) {
  1310. res = -ENOENT;
  1311. } else {
1312. // Find the first unused (synthetic) file handle, counting down from the top of the integer range
  1313. struct rlimit rlim;
1314. res = -EMFILE; // will be reset to success if indeed we find a free fd
  1315. fi->fh = std::numeric_limits<int>::max();
  1316. if( ::getrlimit(RLIMIT_NOFILE, &rlim)!=0 ) {
  1317. DEBUG(-1, "::getrlimit(RLIMIT_NOFILE, ...) fails - " << ::strerror(errno) << endl);
  1318. rlim.rlim_max = rlim.rlim_cur = std::numeric_limits<int>::max() - 1024;
  1319. }
  1320. while( (fi->fh>rlim.rlim_max) && openfiles.find(fi->fh)!=openfiles.end() )
  1321. fi->fh--;
1322. if( fi->fh>rlim.rlim_max ) {  // strictly '>' so -EMFILE is reachable when all synthetic handles are in use
  1323. // do not forget to open the recording
  1324. recptr->open( (int)fi->fh );
  1325. openfiles.insert( make_pair(fi->fh, openfile_type(recptr)) );
  1326. res = 0; // reset to success
  1327. }
  1328. }
  1329. ::pthread_mutex_unlock(&vbs_state.chunkMutex);
  1330. DEBUG(1, "vbs_open[" << path << "] res=" << res << " fh=" << fi->fh << endl);
  1331. return res;
  1332. }
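// The handle allocation in vbs_open() deserves a note: the values stored in
// fi->fh are not real file descriptors but synthetic handles, handed out from
// the top of the integer range downwards and kept strictly above the
// RLIMIT_NOFILE hard limit so they can never collide with a genuine fd. A
// stand-alone sketch of that scheme (illustrative only, kept out of the build;
// "used" plays the role of the openfileCache):
#if 0
#include <set>
#include <limits>
#include <sys/resource.h>

long allocate_handle(std::set<long> const& used) {
    struct rlimit rlim;
    if( ::getrlimit(RLIMIT_NOFILE, &rlim)!=0 )
        rlim.rlim_max = rlim.rlim_cur = std::numeric_limits<int>::max() - 1024;
    long fh = std::numeric_limits<int>::max();
    // count down past handles that are already in use
    while( fh>(long)rlim.rlim_max && used.find(fh)!=used.end() )
        fh--;
    // only a handle strictly above the real fd limit is acceptable
    return (fh>(long)rlim.rlim_max) ? fh : -1;  // -1 ~ "out of handles" (EMFILE)
}
#endif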
  1333. int vbs_read(char const* path, char* buff, size_t n, off_t offset, struct fuse_file_info* fi) {
  1334. static unsigned int nRead = 0;
  1335. unsigned int nr = nRead++;
  1336. openfilecache_type::iterator ofp;
  1337. {
  1338. // make use of the fact that once we have an iterator
  1339. // into a std::map or std::set, other inserts/deletes won't
  1340. // invalidate this iterator. Only someone else doing a removal
  1341. // of this entry will invalidate the iterator we got (if any)
  1342. scoped_lock_type sml( &vbs_state.chunkMutex );
  1343. if( (ofp=vbs_state.openfileCache.find(fi->fh))==vbs_state.openfileCache.end() )
  1344. return -ENOENT;
  1345. }
  1346. // Lock the open file - we must have exclusive access to of->curChunk;
1347. // we may need to modify it, and we must prevent others from
  1348. // modifying it behind our backs
  1349. openfile_type& of = ofp->second;
  1350. scoped_writelock_type swl( &ofp->second.fileRWLock );
  1351. int eno = 0, racnt;
  1352. unsigned int subn = 0, priority;
  1353. size_t navail;
  1354. const size_t norg = n;
  1355. filechunks_type& chunks = of.recordingPtr->fileChunks;
  1356. filechunks_type::iterator ptr = of.curChunk, raptr;
  1357. // Count consecutive reads and set priority depending on that
  1358. if( offset==of.previousEnd )
  1359. of.nConsecutiveReads++;
  1360. else
  1361. of.nConsecutiveReads = 0;
  1362. // Threshold may be subject to change - maybe fuse does some readahead
  1363. // by itself, even if the user only asks for a tiny bit once
  1364. priority = (of.nConsecutiveReads>3);
  1365. // If data is not from the current block [someone changed the file
1366. // pointer behind our backs through e.g. lseek(2)] we must release the
1367. // blocks we no longer use [the ones for which we issued
  1368. // read-aheads]
  1369. if( ptr==chunks.end() || (ptr!=chunks.end() && ptr->remainingBytes(offset)==0) ) {
  1370. filechunks_type::iterator rmptr = chunks.begin();
  1371. // release anything up to the block we *need* to have
  1372. while( rmptr!=chunks.end() && rmptr->remainingBytes(offset)==0 ) {
  1373. rmptr->releaseData(ofp->first);
  1374. rmptr++;
  1375. }
1376. // Save pointer to the chunk we need to read from
  1377. of.curChunk = ptr = rmptr;
  1378. // Attempt to advance "readahead" nr of blocks - those
  1379. // we'll potentially need so we leave their in-memory state
  1380. // as-is. If prio==0 this means no read-ahead actually.
  1381. for(racnt=0; priority>0 && racnt<=vbs_state.readAhead && rmptr!=chunks.end(); racnt++, rmptr++);
1382. // Release anything up to end of file, even though we may need
1383. // some of it again later on - such is life in cache-land; our
1384. // recordings are so big that we cannot possibly keep everything
1385. // we want in memory. Also: it is the application's responsibility
  1386. // to not seek to and fro throughout the recording!
  1387. while( rmptr!=chunks.end() ) {
  1388. rmptr->releaseData(ofp->first);
  1389. rmptr++;
  1390. }
  1391. }
  1392. while( n ) {
  1393. DEBUG(3, "vbs_read#" << nr << "." << subn++ << "[" << path << ", " << ::pthread_self() << "] " <<
  1394. "offset=" << offset << " n=" << n << " prio=" << priority << " prevEnd=" << of.previousEnd << " nConsec=" << of.nConsecutiveReads << endl);
  1395. // if no more bytes available, move to next chunk
  1396. for( ;ptr!=chunks.end() && (navail=ptr->remainingBytes(offset))==0; ptr++)
  1397. ptr->releaseData(ofp->first);
  1398. // either end of file or point at chunk that we need to read from
  1399. // attempt to read past end of file is not an error
  1400. if( ptr==chunks.end() )
  1401. break;
1402. // We now point at the block we need data from, so issue a read for
1403. // this block, plus up to readAhead following blocks if the priority
1404. // is non-zero [no read-ahead otherwise]
  1405. for(raptr=ptr, racnt=0; racnt<=(priority>0 ? vbs_state.readAhead : 0) && raptr!=chunks.end(); racnt++, raptr++)
  1406. raptr->initiateRead( ofp->first, priority );
  1407. // We know that navail>0 at this point
  1408. const size_t n2r = std::min(n, navail);
  1409. unsigned char* data = ptr->readLockForData();
  1410. if( data==0 ) {
  1411. eno = errno;
  1412. DEBUG(-1, "vbs_read: " << ptr->pathToChunk << " FAILS: " << ::strerror(errno) << endl);
  1413. break;
  1414. }
  1415. // Ok data available!
  1416. ::memcpy(buff, data+(offset - ptr->chunkOffset), n2r);
  1417. buff += n2r;
  1418. n -= n2r;
  1419. offset += n2r;
  1420. // Indicate that we're done with this chunk for now
  1421. ptr->releaseReadLock();
  1422. }
  1423. of.curChunk = ptr;
  1424. of.previousEnd = offset;
  1425. DEBUG(3, "vbs_read#" << nr << "." << subn << "[" << path << ", " << ::pthread_self() << "] "
  1426. "offset=" << offset << " n=" << n << " errno=" << eno << " FINISHED; rv=" << (eno ? -eno : (norg - n)) << endl);
  1427. return eno ? -eno : (norg - n);
  1428. }
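// To make the offset arithmetic in vbs_read() explicit: a recording is a
// sequence of chunks, each covering the byte range [chunkOffset,
// chunkOffset+size), and a read at "offset" copies min(n, bytes left in the
// current chunk) before moving on to the next chunk. A reduced sketch of just
// that arithmetic (illustrative only, kept out of the build; toy_chunk and
// toy_read are hypothetical and ignore the read-ahead/locking machinery):
#if 0
#include <vector>
#include <cstring>
#include <algorithm>
#include <sys/types.h>

struct toy_chunk { off_t chunkOffset; size_t size; unsigned char const* data; };

// assumes the chunks are sorted by chunkOffset and contiguous,
// as they are for a healthy recording
size_t toy_read(std::vector<toy_chunk> const& chunks, char* buf, size_t n, off_t offset) {
    size_t total = 0;
    for(std::vector<toy_chunk>::const_iterator p=chunks.begin(); n && p!=chunks.end(); p++) {
        if( offset >= (off_t)(p->chunkOffset + p->size) )
            continue;                                   // chunk ends before "offset"
        if( offset < p->chunkOffset )
            break;                                      // hole in the recording
        const size_t inChunk = (size_t)(offset - p->chunkOffset);
        const size_t n2r     = std::min(n, p->size - inChunk);
        ::memcpy(buf, p->data + inChunk, n2r);
        buf += n2r; n -= n2r; offset += n2r; total += n2r;
    }
    return total;                                       // short count at end-of-file
}
#endif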
  1429. int vbs_release(char const* path, struct fuse_file_info* fi) {
  1430. scoped_lock_type slt( &vbs_state.chunkMutex );
  1431. openfilecache_type::iterator ofp = vbs_state.openfileCache.find( fi->fh );
  1432. DEBUG(1, "vbs_release[" << path << "] fh=" << fi->fh << endl);
  1433. if( ofp==vbs_state.openfileCache.end() )
  1434. return -ENOENT;
  1435. filechunks_type& chunks = ofp->second.recordingPtr->fileChunks;
  1436. // Wait for issued chunk-read-requests to finish and properly clean them up
  1437. for(filechunks_type::iterator fcptr = chunks.begin(); fcptr!=chunks.end(); fcptr++) {
  1438. if( fcptr->readLockForData() )
  1439. fcptr->releaseReadLock();
  1440. fcptr->releaseData( ofp->first );
  1441. }
  1442. // Do not forget to close the recording for this observer
  1443. ofp->second.recordingPtr->close( ofp->first );
  1444. DEBUG(1, "vbs_release[" << path << "] done" << endl);
  1445. vbs_state.openfileCache.erase( ofp );
  1446. return 0;
  1447. }
  1448. ///////////////////////////////////////////////
  1449. // A filesystem must define a set
  1450. // of operations with callback functions
  1451. // on the file system objects it
  1452. // represents. All callbacks are
  1453. // registered in the fuse_operations struct
  1454. ///////////////////////////////////////////////
  1455. struct vbs_fuse_operations :
  1456. public fuse_operations {
  1457. vbs_fuse_operations() {
  1458. init = &vbs_init;
  1459. destroy = &vbs_destroy;
  1460. getdir = NULL;
  1461. readdir = &vbs_readdir;
  1462. getattr = &vbs_getattr;
  1463. open = &vbs_open;
  1464. read = &vbs_read;
  1465. release = &vbs_release; // called on close(2)
  1466. }
  1467. };
  1468. vbs_fuse_operations vbs_oper;
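// Note: vbs_oper is a global object, so its fuse_operations base subobject is
// zero-initialized before the constructor above runs; every callback that is
// not assigned explicitly therefore stays NULL, as FUSE expects. If the same
// struct were ever instantiated on the stack, the base would have to be
// value-initialized explicitly. A sketch of that variant (illustrative only,
// kept out of the build):
#if 0
struct vbs_fuse_operations_safe : public fuse_operations {
    vbs_fuse_operations_safe() : fuse_operations() {   // zero the whole base first
        init    = &vbs_init;
        destroy = &vbs_destroy;
        readdir = &vbs_readdir;
        getattr = &vbs_getattr;
        open    = &vbs_open;
        read    = &vbs_read;
        release = &vbs_release;
    }
};
#endif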
  1469. #if 0
  1470. struct fuse_operations vbs_oper = {
  1471. //.getattr = vbs_getattr,
  1472. //.readdir = vbs_readdir,
  1473. //.open = vbs_open,
  1474. //.read = vbs_read,
  1475. //.readlink = vbs_readlink,
  1476. // no .getdir -- that's deprecated
  1477. //.getdir = NULL,
  1478. //.mknod = vbs_mknod,
  1479. //.mkdir = vbs_mkdir,
  1480. //.unlink = vbs_unlink,
  1481. //.rmdir = vbs_rmdir,
  1482. //.symlink = vbs_symlink,
  1483. //.rename = vbs_rename,
  1484. //.link = vbs_link,
  1485. //.chmod = vbs_chmod,
  1486. //.chown = vbs_chown,
  1487. //.truncate = vbs_truncate,
  1488. //.utime = vbs_utime,
  1489. //.write = vbs_write,
  1490. /** Just a placeholder, don't set */ // huh???
  1491. //.statfs = vbs_statfs,
  1492. //.flush = vbs_flush,
  1493. //.release = vbs_release,
  1494. //.fsync = vbs_fsync,
  1495. //.setxattr = vbs_setxattr,
  1496. //.getxattr = vbs_getxattr,
  1497. //.listxattr = vbs_listxattr,
  1498. //.removexattr = vbs_removexattr,
  1499. //.opendir = vbs_opendir,
  1500. //.releasedir = vbs_releasedir,
  1501. //.fsyncdir = vbs_fsyncdir,
  1502. .init = vbs_init,
  1503. .destroy = vbs_destroy,
  1504. //.access = vbs_access,
  1505. //.create = vbs_create,
  1506. //.ftruncate = vbs_ftruncate,
  1507. };
  1508. #endif
  1509. /////////////////////////////////////////////////////////////////
  1510. //
  1511. // Transform a list of globbety glob shell patterns
  1512. // into a set of directories such that we don't have
  1513. // duplicates
  1514. //
  1515. ////////////////////////////////////////////////////////////////
  1516. paths_type globbety_glob(patternlist_type const& patterns) {
  1517. int flags = GLOB_NOSORT;
  1518. paths_type retval;
  1519. #ifdef GLOB_BRACE
  1520. flags |= GLOB_BRACE;
  1521. #endif
  1522. for(patternlist_type::const_iterator p=patterns.begin(); p!=patterns.end(); p++) {
  1523. // globbety glob
  1524. glob_t globs;
  1525. struct stat st;
  1526. if( ::glob(p->c_str(), flags, 0, &globs)==0 ) {
  1527. // ok, we found matches, only add the ones that are directories
  1528. for(size_t i=0; i<globs.gl_pathc; i++)
  1529. if( ::lstat(globs.gl_pathv[i], &st)==0 && (st.st_mode&S_IFDIR)==S_IFDIR )
  1530. retval.insert( globs.gl_pathv[i] );
  1531. } else {
  1532. DEBUG(-1, "glob(" << *p << ") fails - " << ::strerror(errno) << endl);
  1533. }
  1534. ::globfree(&globs);
  1535. }
  1536. return retval;
  1537. }
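// A usage sketch (illustrative only, kept out of the build; the literal
// patterns are examples): expanding a couple of shell patterns into a
// duplicate-free set of directories with globbety_glob().
#if 0
void example_glob() {
    patternlist_type pats;
    pats.push_back( "/mnt/disk*" );            // FlexBuff-style mount points
    pats.push_back( "/mnt/disks/*/*/data" );   // Mark6-style mount points
    const paths_type dirs = globbety_glob( pats );
    // "dirs" now contains every matching directory exactly once,
    // because paths_type is a std::set<std::string>
}
#endif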
  1538. /////////////////////////////////////////////////////////////////
  1539. // Command line handling
1540. // There are only a few options
  1541. // that we support
  1542. // -h print help and exit
  1543. // --quick do not do a full index
  1544. // --version print version and exit
  1545. // -m <int> setting debug level [higher = more output]
  1546. // -I <pattern> add pattern to list of recordings to index
  1547. // -R <pattern> add pattern to list of mount points to index
  1548. // -v add flexbuff pattern to list of paths
  1549. // -6 add Mark6 pattern to list of paths
  1550. // -f FUSE 'foreground' flag
  1551. //
  1552. ////////////////////////////////////////////////////////////////
  1553. struct vbs_settings {
  1554. int read_ahead;
  1555. int debug_level;
  1556. bool help;
  1557. bool version;
  1558. bool fullScan;
  1559. patternlist_type indexPatterns;
  1560. patternlist_type patterns;
  1561. // Initialize with defaults
  1562. vbs_settings():
  1563. read_ahead( 0 ), debug_level( 0 ), help( false ), version( false ), fullScan( true )
  1564. {}
  1565. };
  1566. ostream& operator<<(ostream& os, const vbs_settings& vbo) {
  1567. os << "-m " << vbo.debug_level << " ";
  1568. if( vbo.version )
  1569. os << "--version ";
  1570. if( !vbo.fullScan )
  1571. os << "--quick ";
  1572. for(patternlist_type::const_iterator pattern=vbo.patterns.begin(); pattern!=vbo.patterns.end(); pattern++)
  1573. os << "-R " << *pattern << " ";
  1574. for(patternlist_type::const_iterator pattern=vbo.indexPatterns.begin(); pattern!=vbo.indexPatterns.end(); pattern++)
  1575. os << "-I " << *pattern << " ";
  1576. if( vbo.read_ahead>0 )
  1577. os << "-n " << vbo.read_ahead << " ";
  1578. if( vbo.help )
  1579. os << "-h ";
  1580. return os;
  1581. }
  1582. #define OFFSETOF(tp, fld) \
  1583. (size_t)((unsigned char*)(&(((tp*)1)->fld)) - (unsigned char*)1)
  1584. #define VBSFS_OPT(t, p, v) { t, OFFSETOF(struct vbs_settings, p), v }
  1585. struct fuse_opt vbs_options[] = {
  1586. VBSFS_OPT("-m %i", debug_level, 0),
  1587. VBSFS_OPT("-n %i", read_ahead, 0),
  1588. // For these we require specific processing [neither are just int or
  1589. // char* options]
  1590. {"-I %s" , -1U, 0x4A59}, // 'JY' - see help on the options ;-)
  1591. {"-R %s" , -1U, 42}, //
  1592. {"-6" , -1U, 666}, //
  1593. {"-v" , -1U, 112}, // All of these numbers
  1594. {"--help" , -1U, 911}, // were randomly chosen, obviously!
  1595. {"--version", -1U, 31415926},//
  1596. {"--quick" , -1U, 27182818},//
  1597. // Sentinel
  1598. {NULL, 0, 0}
  1599. };
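// The OFFSETOF macro above computes the byte offset of a member inside
// struct vbs_settings; fuse_opt_parse() uses that offset for the "%i"-style
// templates to know where to store the parsed value, while the entries with
// offset -1U are handed to vbs_option_proc() below together with their key.
// A sketch of the two integer options written with the standard offsetof from
// <cstddef>, which is equivalent for this purpose (illustrative only, kept
// out of the build):
#if 0
#include <cstddef>
#define VBSFS_OPT_STD(t, p, v) { t, offsetof(struct vbs_settings, p), v }

struct fuse_opt vbs_options_std[] = {
    VBSFS_OPT_STD("-m %i", debug_level, 0),   // parsed straight into settings.debug_level
    VBSFS_OPT_STD("-n %i", read_ahead,  0),   // parsed straight into settings.read_ahead
    FUSE_OPT_END                              // equivalent to the {NULL, 0, 0} sentinel
};
#endif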
  1600. int vbs_option_proc(void* data, const char* arg, int key, struct fuse_args* /*outargs*/) {
  1601. int rv = 0; // discard by default
  1602. const string arg_s( arg );
  1603. vbs_settings* settingsptr = (vbs_settings*)data;
  1604. DEBUG(2, "vbs_option_proc[key=" << key << " (non_opt=" << FUSE_OPT_KEY_NONOPT << ")]: processing " << arg_s << endl);
  1605. switch( key ) {
  1606. case 31415926:
  1607. settingsptr->version = true;
  1608. break;
  1609. case 27182818:
  1610. settingsptr->fullScan = false;
  1611. break;
  1612. case 0x4A59:
  1613. // we know that the arg looks like "-I<pattern>"
  1614. settingsptr->indexPatterns.push_back( arg_s.substr(2) );
  1615. break;
  1616. case 911:
  1617. settingsptr->help = true;
  1618. break;
  1619. case 112:
  1620. settingsptr->patterns.push_back( flexbuffPattern );
  1621. break;
  1622. case 666:
  1623. settingsptr->patterns.push_back( mk6Pattern );
  1624. break;
  1625. case 42:
  1626. // we know that the arg looks like "-R<pattern>"
  1627. settingsptr->patterns.push_back( arg_s.substr(2) );
  1628. break;
  1629. default:
  1630. // we didn't handle the argument so let's keep it!
  1631. rv = 1;
  1632. break;
  1633. }
  1634. return rv;
  1635. }
  1636. void Usage(string const& prog ) {
  1637. cout << "Usage: " << prog << " [options] /path/to/mountpoint" << endl << endl;
  1638. cout << "VBS specific command line arguments:" << endl << endl
  1639. << " -f run in foreground/do not background" << endl
  1640. << " --help print this message and exit" << endl
  1641. << " --version print version and exit" << endl
  1642. << " --quick do not do a full index at startup" << endl
  1643. << " this will perform a quick scan of recordings" << endl
  1644. << " but full meta data (modification time, size, etc)" << endl
  1645. << " for a recording will only be gathered (and cached)" << endl
  1646. << " if is opened" << endl
  1647. << " -m <int> set VBS_FS debug print level to <n>" << endl
  1648. << " higher <n> => more output, default 0 (no output)" << endl
  1649. << " -n <int> issue <int> chunks read-ahead. Only useful if files" << endl
  1650. << " are read in strict linear fashion, otherwise it will" << endl
  1651. << " sigificantly HURT performance (default '1')" << endl
  1652. << " -I <pattern> only index recordings matching <pattern>" << endl
  1653. << " This option can be present multiple times." << endl
  1654. << " Shell-style wildcard characters are supported;" << endl
  1655. << " the implementation uses fnmatch(3) for matching." << endl
  1656. << " [credits to Jun Yang(Onsala) for requesting this]" << endl
  1657. << " -R <pattern> add pattern to list of directories to index" << endl
  1658. << " if nothing specified, defaults to /mnt/disk*" << endl
  1659. << " Note: the -R option can be specified multiple times" << endl
  1660. #ifdef GLOB_BRACE
  1661. << " Note: \"{pat1, pat2}\" syntax is supported in the pattern" << endl
  1662. #endif
  1663. << " -6 shorthand for adding the Mark6 directories to the" << endl
  1664. << " search path: /mtn/disks/*/*/data" << endl
  1665. << " -v explicitly add the FlexBuff disk pattern to the list" << endl
  1666. << " of paths to search" << endl;
  1667. }
  1668. ///////////////////////////////////////////////
  1669. //
  1670. // The main function
  1671. //
  1672. ///////////////////////////////////////////////
  1673. int main(int argc, char** argv) {
  1674. // Before doing _anything_ check if we're not root/suid root
  1675. if( ::getuid()==0 || ::geteuid()==0 ) {
  1676. DEBUG(-1, "Running vbs_fuse as root opens unnacceptable security holes" << endl);
  1677. return -11;
  1678. }
  1679. // Now we can safely start parsing
  1680. int fuse_stat;
  1681. vbs_settings settings;
  1682. struct fuse_args args = FUSE_ARGS_INIT(argc, argv);
  1683. // Do command line parsing
  1684. ::fuse_opt_parse(&args, (void*)&settings, &vbs_options[0], vbs_option_proc);
  1685. dbglev_fn( settings.debug_level );
  1686. DEBUG(1, "fuse_opt_arg done [" << settings << "]" << endl);
  1687. if( settings.version ) {
  1688. cout << "$Id$" << endl;
  1689. exit( 0 );
  1690. }
  1691. if( settings.help ) {
  1692. string prog( argv[0] );
  1693. string::size_type slash = prog.rfind('/');
  1694. Usage( (slash==string::npos) ? prog : prog.substr(slash+1) );
  1695. exit( 0 );
  1696. }
1697. // Validate the read-ahead value - it must not be negative
  1698. if( settings.read_ahead<0 ) {
  1699. DEBUG(-1, "invalid value " << settings.read_ahead << " for read ahead!" << endl);
  1700. return -EINVAL;
  1701. }
  1702. // If no root pattern given, default to flexbuff pattern
  1703. if( settings.patterns.empty() ) {
  1704. DEBUG(-1, "WARN: No root dir specified for disks, defaulting to " << flexbuffPattern << endl);
  1705. settings.patterns.push_back( flexbuffPattern );
  1706. }
  1707. // use glob.glob to expand all of the patterns given on the cmd line
  1708. vbs_state.mountPoints = globbety_glob( settings.patterns );
  1709. if( settings.read_ahead!=vbs_state.readAhead )
  1710. vbs_state.readAhead = settings.read_ahead;
  1711. // Do not forget to copy the --quick option 8-/
  1712. vbs_state.fullIndex = settings.fullScan;
  1713. // And transform the list-of-patterns into set-of-patterns to avoid duplicates :-)
  1714. std::copy(settings.indexPatterns.begin(), settings.indexPatterns.end(),
  1715. std::insert_iterator<paths_type>(vbs_state.indexPatterns, vbs_state.indexPatterns.end()));
  1716. // Now it's about time to drop into FUSE's main loop
  1717. DEBUG(3, "dropping into fuse_main" << endl);
  1718. fuse_stat = fuse_main(args.argc, args.argv, &vbs_oper, &vbs_state);
  1719. DEBUG(3, "fuse_stat returned " << fuse_stat << endl);
  1720. return fuse_stat;
  1721. }
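// Example invocations (illustrative only; the mount point and the -I pattern
// are placeholders):
//
//   vbs_fs /mnt/vbs_fs                          # FlexBuff default: search /mnt/disk*
//   vbs_fs -6 -n 2 /mnt/vbs_fs                  # index Mark6 disks, 2 chunks read-ahead
//   vbs_fs --quick -f -I "ft123*" /mnt/vbs_fs   # quick scan, stay in foreground, only
//                                               # index recordings matching "ft123*"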
  1722. void scanVBSRecordingDirectory(string const& recname, string const& dir, filechunks_type& rv, struct stat& ts) {
  1723. DIR* dirp;
  1724. struct stat st;
  1725. const string mp(dir.substr(0, dir.rfind('/')));
  1726. direntries_type chunks;
  1727. isRecordingChunk predicate( recname );
  1728. if( (dirp=::opendir(dir.c_str()))==0 ) {
  1729. DEBUG(4, "scanVBSRecordingDirectory(" << recname << ")/ ::opendir(" << dir << ") fails - " << ::strerror(errno) << endl);
  1730. return;
  1731. }
  1732. chunks = dir_filter(dirp, predicate);
  1733. ::closedir(dirp);
  1734. // If we find duplicates, now *that* is a reason to throw up
  1735. for(direntries_type::const_iterator p=chunks.begin(); p!=chunks.end(); p++) {
  1736. const string fnam( dir + "/" + *p );
  1737. if( ::lstat(fnam.c_str(), &st)!=0 ) {
  1738. DEBUG(4, "scanVBSRecordingDirectory(" << recname << ")/ ::lstat(" << fnam << ") fails - " << ::strerror(errno) << endl);
  1739. continue;
  1740. }
  1741. update(ts, st);
  1742. // parse the chunk sequence number. Note that "*p" matched the
  1743. // isRecordingChunk predicate, i.e. looks like "<recname>.XXXXXXXX"
  1744. string::size_type dot = p->rfind('.');
  1745. const unsigned long seqno = ::strtoul(p->substr(dot+1).c_str(), 0, 10);
  1746. EZASSERT2((rv.insert(filechunk_type(mp, fnam, (unsigned int)seqno, st.st_size))).second,
  1747. vbs_except, EZINFO(" duplicate insert for chunk " << *p));
  1748. }
  1749. }
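// For illustration: a FlexBuff recording "ft123_ef_no0001" (hypothetical name)
// scattered over a few mount points would typically be picked up here as
//   /mnt/disk0/ft123_ef_no0001/ft123_ef_no0001.00000000
//   /mnt/disk1/ft123_ef_no0001/ft123_ef_no0001.00000001
//   /mnt/disk0/ft123_ef_no0001/ft123_ef_no0001.00000002
//   ...
// with each chunk contributing its (sequence number, size) to the recording's
// filechunks_type set.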
  1750. void scanMk6RecordingFile(string const& /*recname*/, string const& file, filechunks_type& rv, struct stat& ts) {
  1751. int fd;
  1752. off_t fpos;
  1753. struct stat st;
  1754. const string mp(file.substr(0, file.rfind('/')));
  1755. const size_t fh_size = sizeof(mk6_file_header);
  1756. const size_t wb_size = sizeof(mk6_wb_header_v2);
  1757. unsigned char buf[ fh_size+wb_size ];
  1758. mk6_file_header* fh6 = (mk6_file_header*)&buf[0];
  1759. mk6_wb_header_v2* wbh = (mk6_wb_header_v2*)&buf[0];
  1760. // File existence has been checked before so now we MUST be able to open it
  1761. ASSERT2_POS( fd=::open(file.c_str(), O_RDONLY), SCINFO(" failed to open file " << file) );
  1762. // It may well not be a Mk6 recording, for all we know
  1763. if( ::read(fd, fh6, sizeof(mk6_file_header))!=sizeof(mk6_file_header) ) {
  1764. DEBUG(4, "scanMk6RecordingFile[" << file << "]: fail to read mk6 header - " << ::strerror(errno) << endl);
  1765. ::close(fd);
  1766. return;
  1767. }
  1768. if( ::fstat(fd, &st)!=0 ) {
  1769. DEBUG(4, "scanMk6RecordingFile[" << file << "]: fail to ::fstat - " << ::strerror(errno) << endl);
  1770. } else {
  1771. update(ts, st);
  1772. }
  1773. DEBUG(4, "scanMk6RecordingFile[" << file << "]: starting" << endl);
  1774. // Ok. Now we should just read all the blocks in this file!
  1775. fpos = fh_size;
  1776. while( ::read(fd, wbh, wb_size)==(ssize_t)wb_size ) {
  1777. // Don't forget that the block sizes written in the Mark6 files are
  1778. // including the write-block-header size! (Guess how I found out
  1779. // that I'd forgotten just that ...)
  1780. // Make sure there's sense in the block number and size, otherwise better give up
  1781. EZASSERT2(wbh->blocknum>=0 && wbh->wb_size>0, vbs_except,
  1782. EZINFO(" found bogus stuff in write block header @" << fpos << " in " << file <<
  1783. ", block# " << wbh->blocknum << ", sz=" << wbh->wb_size);
  1784. ::close(fd));
  1785. // Ok, found another block!
  1786. fpos += wb_size;
  1787. // We cannot tolerate duplicate inserts
  1788. EZASSERT2(rv.insert(filechunk_type(mp, file, (unsigned int)wbh->blocknum, wbh->wb_size-wb_size, fpos)).second, vbs_except,
  1789. EZINFO(" duplicate insert for chunk " << wbh->blocknum); ::close(fd) );
  1790. // Advance file pointer
  1791. fpos += (wbh->wb_size - wb_size);
  1792. if( ::lseek(fd, fpos, SEEK_SET)==(off_t)-1 ) {
  1793. DEBUG(4, "scanMk6RecordingFile[" << file << "]: failed to seek to next block @" <<
  1794. fpos << " - " << ::strerror(errno) << endl);
  1795. break;
  1796. }
  1797. }
  1798. // Actually, why don't I just close the file when done?
1799. // Thanks to AlexanderN and ChristianP for reporting the issue ...
  1800. ::close(fd);
  1801. DEBUG(4, "scanMk6RecordingFile[" << file << "]: done" << endl);
  1802. }
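// For reference, the on-disk layout walked above is
//   [mk6_file_header][mk6_wb_header_v2][payload][mk6_wb_header_v2][payload]...
// where each write-block header carries its block number and a size field
// (wbh->wb_size) that counts the header itself as well, so for a header found
// at file position fpos:
//   payload offset = fpos + sizeof(mk6_wb_header_v2)
//   payload size   = wbh->wb_size - sizeof(mk6_wb_header_v2)
//   next header    = fpos + wbh->wb_size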