Coverage for src/susi/db/filedb.py: 39%
285 statements
« prev ^ index » next coverage.py v7.5.0, created at 2025-08-22 09:20 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2025-08-22 09:20 +0000
1# -*- coding: utf-8 -*-
2"""
3Provides methods used to search and navigate the SUSI database file structure
5@author: iglesias, hoelken
6"""
7import copy
8import glob
9import logging
10import os
11import pickle
12import subprocess
13from datetime import datetime, timedelta
14from typing import Iterable, Union
16import matplotlib.dates as mdates
17import matplotlib.pyplot as plt
18import numpy as np
20from .remote_connection import RemoteConnection
21from ..base import Config, IllegalArgumentException, Globals, DATE_OBS, InsufficientDataException, TIMESTAMP_US
22from ..io import Fits
23from ..plot import matplot_backend
24from ..utils import Collections
26log = logging.getLogger('SUSI')
class FileDB:
    """
    Tools to search and navigate SUSI file database
    """

    def __init__(self, config: Config = None, remote_db: RemoteConnection = None):
        """
        :param config: pipeline configuration; a fresh ``Config()`` is created when omitted.
        :param remote_db: connection information for a remote (ssh) file db;
            a fresh ``RemoteConnection()`` (i.e. do not use a remote) is created when omitted.
        """
        # BUGFIX: the defaults used to be `Config()` / `RemoteConnection()` in the
        # signature. Default argument values are evaluated once at import time, so a
        # single shared instance would be reused (and possibly mutated) by every
        # FileDB created without arguments. Create them per instance instead.
        #: Configuration
        self.config = Config() if config is None else config
        #: Connection information for a remote (ssh) file db. Default: Do not use a remote.
        #: If used the file paths found are still returned relative to `self.config.data.root`
        #: to be directly used for local reading.
        self.remote_db = RemoteConnection() if remote_db is None else remote_db
42 def dir_path(self, time: datetime, depth: str = 'full', base=None) -> str:
43 """
44 Returns a string with the path to the level0.1 directory where the data for input time is located.
46 NOTE:
47 if base is set, it is used as the base directory instead of `self.remote_db.base` or `self.config.data.root`
48 if self.remote_db['base'] is not None, then paths relative to remote base are returned
50 ## Params
51 - time [datetime] Time of the dir to return
52 - depth: Use it to select the deepness of the returned path. Options are:
53 -'full' Returns: `base/level_dir/YYYY_MM_DD/YYYY_MM_DD_hh/cam`
54 -'upto_hour' Returns: `base/level_dir/YYYY_MM_DD/YYYY_MM_DD_hh`
55 -'upto_day' Returns: `base/level_dir/YYYY_MM_DD`
56 -'upto_level' Returns: `base/level_dir`
57 -'base' Returns: `base`
58 -'day' Returns: `YYYY_MM_DD`
59 -'hour' Returns: `YYYY_MM_DD_hh`
60 -'level' Returns: `level_dir`
61 -'day-hour' Returns: `YYYY_MM_DD/YYYY_MM_DD_hh`
62 -'day-hour-cam' Returns: `YYYY_MM_DD/YYYY_MM_DD_hh/cam`
63 - base: [string] If set, it is used as the base directory instead
64 of `self.remote_db.base` or `self.config.data.root`
65 """
66 if base is None:
67 base_dir = self.remote_db.base if self.remote_db else self.config.data.root
68 else:
69 base_dir = base
71 src = os.path.realpath(os.path.join(base_dir, self.config.data.level))
73 if depth == 'full':
74 return os.path.join(src, self.dir_structure(time, 'day-hour-cam'))
75 elif depth == 'upto_hour':
76 return os.path.join(src, self.dir_structure(time, 'day-hour'))
77 elif depth == 'upto_day':
78 return os.path.join(src, self.dir_structure(time, 'day'))
79 elif depth == 'upto_level':
80 return src
81 elif depth == 'base':
82 return base_dir
83 elif depth == 'level':
84 return self.config.data.level
85 else:
86 return self.dir_structure(time, depth)
88 def file_short_basename(self, file: str, avg=False, cam=True) -> str:
89 """
90 Returns the short basename of the file by removing year, month and extension (keeping day) from the file name.
92 Assumed file format is: `*_YYYYmmdd_HHMMSS_XXXXXXX_cam.ext`
93 the output is `dd_HHMMSS_XXXXXXX_cam`
95 if avg, the assumed format is: YYYYMMDD_HHMMSS_XXXXXXX-YYYYMMDD_HHMMSS_XXXXXXX>-<cam>.<ext>
96 the output is `DD_HHMMSS_XXXXXXX-DD_HHMMSS_XXXXXXX-cam`
98 :param: file [string] full path or file name
99 :param: avg [bool] If set, the file name is assumed to have the average filename format
100 :param: cam [bool] If set, the camera name is included in the output
101 """
102 if not avg:
103 fname = os.path.basename(file).split('_')[1:4]
104 fname = '_'.join(fname)
105 fname = fname[6:]
106 else:
107 fname = os.path.basename(file).split('-')[0:2]
108 fname1 = self.file_short_basename('bla_' + fname[0], cam=False)
109 fname2 = self.file_short_basename('bla_' + fname[1], cam=False)
110 fname = f'{fname1}-{fname2}'
112 if cam and self.config.cam.name:
113 fname += f'-{self.config.cam.name}'
114 return fname
116 def dir_structure(self, time: datetime, depth: str) -> str:
117 """
118 Returns a string with the intermediate path part from the level to the file
119 ## Params
120 - time [datetime] Time of the dir to return
121 - depth: Use it to select the deepness of the returned path. Options are:
122 -'day' Returns: `YYYY_MM_DD`
123 -'hour' Returns: `YYYY_MM_DD_hh`
124 -'cam' Returns: `camX`
125 -'day-hour' Returns: `YYYY_MM_DD/YYYY_MM_DD_hh`
126 -'day-hour-cam' Returns: `YYYY_MM_DD/YYYY_MM_DD_hh/camX`
127 """
128 if depth == 'hour':
129 return time.strftime('%Y_%m_%d_%H')
130 elif depth == 'day':
131 return time.strftime('%Y_%m_%d')
132 elif depth == 'cam':
133 return self.config.cam.name
134 elif depth == 'day-hour':
135 return os.path.join(time.strftime('%Y_%m_%d'), time.strftime('%Y_%m_%d_%H'))
136 elif depth == 'day-hour-cam':
137 return os.path.join(time.strftime('%Y_%m_%d'), time.strftime('%Y_%m_%d_%H'), self.config.cam.name)
138 else:
139 log.critical('Value of depth "%s" not recognized.', depth)
140 raise IllegalArgumentException('Value of depth not recognized.')
142 def avrg_fname(self, f_start: str, f_end: str, appendix='', avg=False) -> str:
143 """
144 Returns the file name for a file that was composed of multiple input files.
145 Per convention the file name should contain the time of the first and last files.
147 ### Returns
148 `<f_start:YYYYMMDD_HHMMSS_XXXXXXX>-<f_end:YYYYMMDD_HHMMSS_XXXXXXX>-<cam>.<ext>`
150 :param: appendix [string] to append to the end of the file name before extension
151 :param: avg [bool] If set, the file name of f_start and f_stop are assumed
152 to have already the average filename format. So we keep the very first and last parts
153 """
154 f1_part = FileDB.file_time(f_start, time_and_num_as_string=True, avg=avg)
155 f2_part = FileDB.file_time(f_end, time_and_num_as_string=True, avg=avg, avg_last=avg)
156 ext = self.config.data.ext_out
157 cam = self.config.cam.name
158 return f"{f1_part}-{f2_part}-{cam}{appendix}{ext}"
160 @staticmethod
161 def file_time(file, time_as_string=False, time_and_num_as_string=False, avg=False, avg_last=False):
162 """
163 Returns the time from the input file name (resolution of seconds).
164 NOTE that this is not the timestamp.
166 Assumed file format is: '*_YYYYmmdd_HHMMSS_*'
168 :param: file [string] full path or file name
169 :param: time_as_string. If set, returns the string 'YYYYmmdd_HHMMSS'
170 instead of a datetime object
171 :param: time_and_num_as_string. If set, returns the string
172 'YYYYmmdd_HHMMSS_ID' instead of a datetime object
173 Where ID is the 7 digit unique frame identifier
174 :param: avg. If set, the file name is assumed to have the average filename format
175 It returns the STARTING or ENDING time of the average, see avg_last
176 :param: avg_last. If True it returns the ENDING file time of the average
177 """
178 if avg:
179 if avg_last:
180 ftime = 'im_' + os.path.basename(file).split('-')[1] + '_cam'
181 else:
182 ftime = 'im_' + os.path.basename(file).split('-')[0] + '_cam'
183 else:
184 ftime = os.path.basename(file).split('.')[0]
185 ftime = ftime.split('_')
186 if time_as_string:
187 return f'{ftime[1]}_{ftime[2]}'
188 elif time_and_num_as_string:
189 return f'{ftime[1]}_{ftime[2]}_{ftime[3]}'
190 else:
191 return datetime.strptime(f'{ftime[1]}_{ftime[2]}', '%Y%m%d_%H%M%S')
193 def file_path(self, base: str, fits: Fits) -> str:
194 """Return the appropriate full file path based on a given base directory"""
195 return os.path.join(
196 base, self.dir_structure(fits.observation_time(), 'day-hour-cam'), os.path.basename(fits.path)
197 )
199 def base_out_path(self, module: list) -> str:
200 """Return the level out path depending on the configured (default) root and the pipeline module applied."""
201 level = Globals.pipeline(self.config.base.pipeline).index(module)
202 if self.config.data.custom_pipeline_dir is not None:
203 output_subdir = self.config.data.custom_pipeline_dir
204 else:
205 output_subdir = self.config.base.pipeline
206 if level == 0 and module == ['C']:
207 return os.path.join(self.config.out_path(), 'level_0')
208 if self.config.data.custom_out_levels is not None:
209 out_level = self.config.data.custom_out_levels[level]
210 else:
211 out_level = level
212 return os.path.join(self.config.out_path(), output_subdir, f'level_{out_level}')
214 def data_paths(self, start: datetime, stop: datetime = None) -> list:
215 """
216 Returns a sorted list of paths to folders that should contain all files acquired between times start and stop
217 """
218 paths = [self.dir_path(start)]
219 if stop is not None:
220 one_hour = timedelta(hours=1)
221 interim = start + one_hour
222 while interim < stop + one_hour:
223 paths.append(self.dir_path(interim))
224 interim += one_hour
225 return paths
227 def search_files(
228 self,
229 utc_start: Union[str, datetime],
230 utc_stop: Union[str, datetime] = None,
231 avg: bool = False,
232 accurate_start: Union[str, datetime] = None,
233 print_info=True,
234 ascending_imgnumber: bool = True,
235 ) -> list:
236 """
237 Returns a sorted list of paths to files acquired between times start
238 and stop found in the SUSI data based specified by self.config
240 WARNING!! Only accurate to the second. The file name is used to determine the
241 time of the file. DO NOT USE FRACTIONAL SECONDS in utc_start and utc_stop.
242 For accurate start use accurate_start
244 Note that if self.remote_db['ssh'] is ot None, then the search is done
245 in a remote machine. However, the returned file paths are relative to
246 self.config.data.root to be directly used for local reading.
248 @author: iglesias, Inspired by /bin/susi_data.py by Hoelken.
250 :param: utc_start. Start time in ISO8601, e.g., 2021-11-11T40:00:00
251 :param: utc_stop. End time in ISO8601, if not set, only all files
252 :param: avg. If set, search for files that are the result of an average, which
253 follow a different nameing convetion
254 :param: accurate_start. If set, the first frame returned matches accurate_start
255 down to config.base.time_delta_cam_sync. If is not possible fatal error is raised.
256 If not set, the first frame is accurate down to 1s (given by filename)
257 :param: print_info. If set, prints the number of files found and the first and last file names
258 :param: ascending_imgnumber. If set, rise an error if files do not have ascending image numbers.
259 """
260 start = utc_start if isinstance(utc_start, datetime) else datetime.fromisoformat(utc_start)
261 stop = None
262 if utc_stop is not None:
263 stop = utc_stop if isinstance(utc_stop, datetime) else datetime.fromisoformat(utc_stop)
265 candidates = self._collect_files(start, stop, avg=avg)
266 if len(candidates) == 0:
267 log.warning('NO FILES FOUND!')
268 return []
269 files = self._reject_non_matching(candidates, start, stop, avg=avg)
270 if not files:
271 log.warning('NO MATCHING FILES FOUND!')
272 return []
274 if accurate_start is not None:
275 files = self._select_start_frame(files, accurate_start)
277 if ascending_imgnumber:
278 img_numbers = [FileDB.fname_img_number(f, avg=avg) for f in files]
279 asc_num = [img_numbers[i] <= img_numbers[i + 1] for i in range(len(img_numbers) - 1)]
280 if not all(asc_num):
281 wrong_pairs = [
282 f'{files[i]} ({img_numbers[i]}) - {files[i + 1]} ({img_numbers[i + 1]})'
283 for i in range(len(asc_num))
284 if not asc_num[i]
285 ]
286 log.warning(f'Files sorted by file name do not have ascending file number')
287 log.warning(f'Resorting by DATE_OBS header keword, this may take a while...')
288 files = self.sort_by_date_obs(files)
290 if print_info:
291 log.info('Found %s file(s) within %s and %s', len(files), utc_start, utc_stop)
292 log.info('First file is: %s', os.path.basename(files[0]))
293 log.info('Last file is: %s', os.path.basename(files[-1]))
294 return files
296 def sort_by_date_obs(self, files: list) -> list:
297 """
298 Sorts the input list of files by the DATE_OBS header keyword.
299 This is more accurate than sorting by file name, by requires reading the header of each file.
300 :param: files [list] List of file names to sort
301 :return: [list] Sorted list of file names
302 """
303 from ..io import FitsBatch # to avoid circular ref ?
305 try:
306 batch = FitsBatch().load(files, header_only=True, sort_by=DATE_OBS)
307 except Exception as e:
308 log.warning(f'Error sorting by {DATE_OBS}, sorting by {TIMESTAMP_US} instead: {e}')
309 batch = FitsBatch().load(files, header_only=True, sort_by=TIMESTAMP_US)
311 return batch.file_list()
313 def _select_start_frame(self, files: list, start: datetime) -> list:
314 """
315 Search within the first 48 files for a frame that has OBS_TIME within 0.5 exposure of start.
316 If found that becomes the first frame of flist (previous are deleted).
317 If not found, raise an error.
318 """
319 nfiles_to_search = 48
320 max_tdelta = self.config.base.time_delta_cam_sync / 1e6
321 log.info(f'Selecting start frame closest to {start}')
322 for i in range(len(files[0:nfiles_to_search])):
323 fits = Fits(files[i]).read(header_only=True)
324 ftime = fits.header[DATE_OBS]
325 ftime = datetime.strptime(ftime, '%Y-%m-%dT%H:%M:%S.%f')
326 delta = abs((start - ftime).total_seconds())
327 if delta < max_tdelta:
328 log.info('Found frame %s at %s', os.path.basename(files[i]), ftime)
329 return files[i:]
330 raise InsufficientDataException(
331 f'No frame found within {format(max_tdelta, ".4f")} s of {start}. Check the start time in your config file.'
332 )
334 def _collect_files(self, start: datetime, stop: datetime = None, avg: bool = False) -> list:
335 if avg:
336 file_pattern = f'*-{self.config.cam.name}{self.config.data.ext}'
337 else:
338 file_pattern = f'{self.config.data.file_prefix}_*_{self.config.cam.name}{self.config.data.ext}'
339 log.debug('Collecting files matching: %s', file_pattern)
340 file_lists = []
341 for path in self.data_paths(start, stop):
342 if self.remote_db:
343 files = self._find_files_remotely(path, file_pattern)
344 else:
345 files = self._find_files_locally(path, file_pattern)
346 log.debug('\t ... found %s file(s)', len(files))
347 if len(files) > 0:
348 file_lists.append(files)
349 return file_lists
351 def _find_files_remotely(self, path: str, file_pattern: str) -> list:
352 cmd = f"find {os.path.join(path, '.')} -name \'{file_pattern}\'"
353 log.debug('Executing %s on %s', cmd, self.remote_db.ssh)
354 ssh = subprocess.Popen(
355 ["ssh", "%s" % self.remote_db.ssh, cmd], shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE
356 )
358 files = ssh.stdout.readlines()
359 files = [f.decode('UTF-8').splitlines()[0].replace(self.remote_db.base, self.config.data.root) for f in files]
360 return sorted(files)
362 def _find_files_locally(self, path: str, file_pattern: str) -> list:
363 log.debug('Entering %s', path)
364 return sorted(glob.glob(os.path.join(path, file_pattern)))
366 def _reject_non_matching(self, file_lists: list, start: datetime, stop: datetime = None, avg: bool = False) -> list:
367 log.debug('rejecting files that do not match conditions...')
368 file_lists = Collections.flatten_sort(file_lists)
369 FileDB._reject_pre_start(file_lists, start, avg=avg)
370 FileDB._reject_post_end(file_lists, stop, avg=avg)
371 return file_lists
373 @staticmethod
374 def _reject_pre_start(files: list, start: datetime, avg: bool = False) -> None:
375 """
376 removes the entries from the given files list, where the timestamps taken
377 from the file names (s resolution) are earlier than the lookup start time.
378 I.e. timestamps of files in first folder must be > start time.
379 For average files (avg=True), it considers the start (first) timestamp from the file name.
381 Note: Rejection will be performed inplace.
382 """
383 rejected = [i for i in range(len(files)) if FileDB.file_time(files[i], avg=avg) < start]
385 for index in sorted(rejected, reverse=True):
386 del files[index]
388 @staticmethod
389 def _reject_post_end(files: list, stop: datetime, avg: bool = False) -> None:
390 """
391 removes the entries from the given files list, where the timestamps taken from the file names (s resolution) are
392 later than the lookup end time. I.e. timestamps of files in last folder must be < end time.
393 For average files (avg=True), it considers the end timestamp from the file name.
395 Note: Rejection will be performed inplace.
396 """
397 if stop is None:
398 return
400 rejected = [i for i in range(len(files)) if FileDB.file_time(files[i], avg=avg) > stop]
401 for index in sorted(rejected, reverse=True):
402 del files[index]
404 def check_obs_intervals(
405 self,
406 utc_start: str = None,
407 utc_stop: str = None,
408 file_list: Iterable = None,
409 max_gap: int = 3,
410 raise_error: bool = False,
411 out_dir: str = None,
412 ) -> list:
413 """
414 Use it to check the number of different observations recorded during
415 the input time interval (or files list)
417 Returns an empty list if there is no time gap
418 larger than criterion [s] between consecutive frames found in
419 the database from utc_start to utc_stop. The frames timestamps
420 are taken from the file names.
422 Otherwise, returns and logs a list of dict, each giving
423 the starting and ending file names of the detected observing intervals
424 (observations). It also plots to a png file and saves the list in self.config.log_dir
426 ## Params
427 - utc_start. Start time in ISO8601, e.g., 2021-11-11T40:00:00
428 - utc_stop. End time in ISO8601, if not set, only all files
429 acquired at utc_start are returned
430 - file_list If given `utc_start` and `utc_stop` is ignored. The dataset within the file_list is
431 - max_gap The maximum time gap that separates frames from another within the same observation in seconds
432 if this criterion is exceeded the dataset is expected to span multiple observations
433 - raise_error If `True` (default `False`) then an error is raised if more than one observation is found.
434 """
436 if utc_start is not None:
437 files = self.search_files(utc_start, utc_stop=utc_stop)
438 elif file_list is not None:
439 files = file_list
440 else:
441 raise IllegalArgumentException('Either utc_start or file_list must be specified... ')
443 times = [FileDB.file_time(os.path.basename(f)) for f in files]
444 obs_times = []
445 obs_files = []
446 obs_id = 0
447 for i in range(len(times) - 1):
448 time_delta = times[i + 1] - times[i]
449 if time_delta.seconds > max_gap:
450 if obs_id == 0:
451 # first interval
452 obs_times.append({'start': times[0], 'stop': times[i]})
453 obs_files.append({'start': files[0], 'stop': files[i]})
454 else:
455 # intermideate intervals
456 obs_times.append({'start': times[obs_id + 1], 'stop': times[i]})
457 obs_files.append({'start': files[obs_id + 1], 'stop': files[i]})
458 obs_id = copy.copy(i)
459 # adds last interval
460 if len(obs_times) > 0:
461 obs_times.append({'start': times[obs_id + 1], 'stop': times[-1]})
462 obs_files.append({'start': files[obs_id + 1], 'stop': files[-1]})
464 if len(obs_times) > 1:
465 log.error('Found %s different observing intervals:', len(obs_times))
466 for i in range(len(obs_times)):
467 log.error(' %s -- %s', os.path.basename(obs_files[i]['start']), os.path.basename(obs_files[i]['stop']))
468 mins = min(times).strftime('%Y%m%dT%H%M%S')
469 maxs = max(times).strftime('%Y%m%dT%H%M%S')
471 if out_dir is not None:
472 cam = self.config.cam.name
473 ofile_img = os.path.join(out_dir, f'check_obs_intervals_{mins}_{maxs}_{cam}.png')
474 ofile_list = os.path.join(out_dir, f'check_obs_intervals_{mins}_{maxs}_{cam}.pkl')
475 log.error('Dumping observed intervals to %s and %s ...', ofile_img, ofile_list)
476 if len(obs_times) < 100:
477 FileDB._plot_obs_intervals(ofile_img, times, obs_times)
478 else:
479 log.warning('Too many observations, I will skip the plot but still save the list.')
480 FileDB._dump_obs_files(ofile_list, files, obs_files, obs_times)
481 if raise_error:
482 log.critical(f'Found {len(obs_times)} observations in the dataset. Only one was expected.')
483 if out_dir is not None:
484 log.critical('Details can be found in')
485 log.critical('\t %s', ofile_img)
486 log.critical('\t %s', ofile_list)
487 raise ValueError(f'Found {len(obs_times)} observations in the dataset.')
489 return obs_files
491 @staticmethod
492 def _plot_obs_intervals(out_file: str, times: list, obs_times: list) -> None:
493 with matplot_backend('Agg'):
494 fig = plt.figure(figsize=(15, 7))
495 fig.suptitle('SUSI observed intervals\nIf the time gap is not visible follow the red lines')
496 plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%dT%H%M%S'))
497 plt.gca().xaxis.set_major_locator(mdates.MinuteLocator())
498 plt.plot(times, np.ones(len(times)), '*')
499 plt.gcf().autofmt_xdate()
500 plt.xlabel('Frame time from file name (1[s] resolution)')
502 for i in range(len(obs_times)):
503 plt.axvline(x=obs_times[i]['start'], color='r', linestyle='--')
504 plt.axvline(x=obs_times[i]['stop'], color='r', linestyle='--')
506 plt.savefig(out_file)
507 plt.close()
509 @staticmethod
510 def _dump_obs_files(out_file: str, files: list, obs_files: list, obs_times: list) -> None:
511 obs_file_path = out_file.split('.')[0] + '_reduced.pkl'
512 with open(out_file, 'wb', 0o775) as handle:
513 all_ofile_list = []
514 for i in range(len(obs_times)):
515 starti = files.index(obs_files[i]['start'])
516 stopi = files.index(obs_files[i]['stop'])
517 all_ofile_list.append(files[starti:stopi])
518 pickle.dump(all_ofile_list, handle, protocol=pickle.HIGHEST_PROTOCOL)
519 handle.close()
521 with open(obs_file_path, 'wb', 0o775) as handle:
522 pickle.dump(obs_files, handle, protocol=pickle.HIGHEST_PROTOCOL)
523 handle.close()
525 @staticmethod
526 def fname_tstmp(fname: str, tstmp_start: int, tstmp_end: int, avg: bool = False) -> int:
527 """Cutout the timestamp part of the file name and return it as int"""
528 if avg:
529 delta = len('HHmmSS_XXXXXXX-YYYYMMDD_')
530 return [
531 int(os.path.basename(fname)[tstmp_start:tstmp_end]),
532 int(os.path.basename(fname)[tstmp_start + delta : tstmp_end + delta]),
533 ]
534 else:
535 return int(os.path.basename(fname)[tstmp_start:tstmp_end])
537 @staticmethod
538 def tstmp_path(root: str, time: datetime, cam: str) -> str:
539 """Generates a file path within the given root folder based on time and cam"""
540 return os.path.join(root, time.strftime('%Y_%m_%d'), time.strftime('%Y_%m_%d_%H'), cam)
542 @staticmethod
543 def fname_img_number(fname: str, avg: bool = False) -> int:
544 """
545 Returns the image number from the file name.
546 If avg is True, it returns the both image numbers in the average file name.
547 """
548 if avg:
549 return [
550 int(os.path.basename(fname).split('-')[0].split('_')[2]),
551 int(os.path.basename(fname).split('-')[1].split('_')[2]),
552 ]
553 else:
554 return int(os.path.basename(fname).split('_')[3])