# src/susi/db/filedb.py
# -*- coding: utf-8 -*-
"""
Provides methods used to search and navigate the SUSI database file structure

@author: iglesias, hoelken
"""
import glob
import logging
import os
import pickle
import subprocess
from datetime import datetime, timedelta
from typing import Iterable, Union

import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
from src.susi import InsufficientDataException

from .remote_connection import RemoteConnection
from ..base import Config, IllegalArgumentException, Globals, DATE_OBS
from ..io import Fits
from ..plot import matplot_backend
from ..utils import Collections

log = logging.getLogger('SUSI')


class FileDB:
    """
    Tools to search and navigate the SUSI file database
    """

    def __init__(self, config: Config = Config(), remote_db: RemoteConnection = RemoteConnection()):
        #: Configuration
        self.config = config
        #: Connection information for a remote (ssh) file db. Default: do not use a remote.
        #: If a remote is used, the file paths found are still returned relative to
        #: `self.config.data.root` so that they can be used directly for local reading.
        self.remote_db = remote_db

    def dir_path(self, time: datetime, depth: str = 'full') -> str:
        """
        Returns a string with the path to the level0.1 directory where the data for the input time is located.

        Note that if self.remote_db.base is not None, then paths relative to the remote base are returned.

        ## Params
        - time [datetime] Time of the dir to return
        - depth: Use it to select the depth of the returned path. Options are:
            - 'full' Returns: `base/level_dir/YYYY_MM_DD/YYYY_MM_DD_hh/cam`
            - 'upto_hour' Returns: `base/level_dir/YYYY_MM_DD/YYYY_MM_DD_hh`
            - 'upto_day' Returns: `base/level_dir/YYYY_MM_DD`
            - 'upto_level' Returns: `base/level_dir`
            - 'base' Returns: `base`
            - 'day' Returns: `YYYY_MM_DD`
            - 'hour' Returns: `YYYY_MM_DD_hh`
            - 'level' Returns: `level_dir`
            - 'day-hour' Returns: `YYYY_MM_DD/YYYY_MM_DD_hh`
        """
        base_dir = self.remote_db.base if self.remote_db else self.config.data.root
        src = os.path.realpath(os.path.join(base_dir, self.config.data.level))
        if depth == 'full':
            return os.path.join(src, self.dir_structure(time, 'day-hour-cam'))
        elif depth == 'upto_hour':
            return os.path.join(src, self.dir_structure(time, 'day-hour'))
        elif depth == 'upto_day':
            return os.path.join(src, self.dir_structure(time, 'day'))
        elif depth == 'upto_level':
            return src
        elif depth == 'base':
            return base_dir
        elif depth == 'level':
            return self.config.data.level
        else:
            return self.dir_structure(time, depth)
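
    # Usage sketch for dir_path (illustrative only: the config values shown,
    # data.root='/data', data.level='lev0.1' and cam.name='cam1', are assumptions,
    # and 'full' paths go through os.path.realpath, so symlinks would be resolved):
    #   db = FileDB()
    #   db.dir_path(datetime(2021, 11, 11, 14))               # '/data/lev0.1/2021_11_11/2021_11_11_14/cam1'
    #   db.dir_path(datetime(2021, 11, 11, 14), 'upto_day')   # '/data/lev0.1/2021_11_11'
    #   db.dir_path(datetime(2021, 11, 11, 14), 'day-hour')   # '2021_11_11/2021_11_11_14'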

    def dir_structure(self, time: datetime, depth: str) -> str:
        """
        Returns a string with the intermediate path part from the level dir to the file.

        ## Params
        - time [datetime] Time of the dir to return
        - depth: Use it to select the depth of the returned path. Options are:
            - 'day' Returns: `YYYY_MM_DD`
            - 'hour' Returns: `YYYY_MM_DD_hh`
            - 'cam' Returns: `camX`
            - 'day-hour' Returns: `YYYY_MM_DD/YYYY_MM_DD_hh`
            - 'day-hour-cam' Returns: `YYYY_MM_DD/YYYY_MM_DD_hh/camX`
        """
        if depth == 'hour':
            return time.strftime('%Y_%m_%d_%H')
        elif depth == 'day':
            return time.strftime('%Y_%m_%d')
        elif depth == 'cam':
            return self.config.cam.name
        elif depth == 'day-hour':
            return os.path.join(time.strftime('%Y_%m_%d'), time.strftime('%Y_%m_%d_%H'))
        elif depth == 'day-hour-cam':
            return os.path.join(time.strftime('%Y_%m_%d'), time.strftime('%Y_%m_%d_%H'), self.config.cam.name)
        else:
            log.critical('Value of depth "%s" not recognized.', depth)
            raise IllegalArgumentException('Value of depth not recognized.')

    def avrg_fname(self, f_start: str, f_end: str, appendix='', avg=False) -> str:
        """
        Returns the file name for a file that was composed of multiple input files.
        By convention the file name contains the times of the first and last input files.

        ### Returns
        `<f_start:YYYYMMDD_HHMMSS_XXXXXXX>-<f_end:YYYYMMDD_HHMMSS_XXXXXXX>-<cam>.<ext>`

        :param: appendix [string] to append to the end of the file name, before the extension
        :param: avg [bool] If set, the file names of f_start and f_end are assumed to already
            have the average filename format, so the very first and last parts are kept.
        """
        f1_part = FileDB.file_time(f_start, time_and_num_as_string=True, avg=avg)
        f2_part = FileDB.file_time(f_end, time_and_num_as_string=True, avg=avg, avg_last=avg)
        ext = self.config.data.ext_out
        cam = self.config.cam.name
        return f"{f1_part}-{f2_part}-{cam}{appendix}{ext}"
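
    # Sketch of the resulting name (hypothetical input names; assumes
    # ext_out='.fits' and cam.name='cam1'):
    #   db.avrg_fname('im_20211111_140000_0000001_cam1.fits',
    #                 'im_20211111_140010_0000050_cam1.fits')
    #   # -> '20211111_140000_0000001-20211111_140010_0000050-cam1.fits'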

    @staticmethod
    def file_time(file, time_as_string=False, time_and_num_as_string=False, avg=False, avg_last=False):
        """
        Returns the time from the input file name (resolution of seconds).
        NOTE that this is not the timestamp.

        The assumed file format is: '*_YYYYmmdd_HHMMSS_*'

        :param: file [string] full path or file name
        :param: time_as_string. If set, returns the string 'YYYYmmdd_HHMMSS'
            instead of a datetime object
        :param: time_and_num_as_string. If set, returns the string 'YYYYmmdd_HHMMSS_ID'
            instead of a datetime object, where ID is the 7-digit unique frame identifier
        :param: avg. If set, the file name is assumed to have the average filename format.
            Returns the STARTING or ENDING time of the average, see avg_last.
        :param: avg_last. If True, returns the ENDING file time of the average.
        """
        if avg:
            if avg_last:
                ftime = 'im_' + os.path.basename(file).split('-')[1] + '_cam'
            else:
                ftime = 'im_' + os.path.basename(file).split('-')[0] + '_cam'
        else:
            ftime = os.path.basename(file).split('.')[0]
        ftime = ftime.split('_')
        if time_as_string:
            return f'{ftime[1]}_{ftime[2]}'
        elif time_and_num_as_string:
            return f'{ftime[1]}_{ftime[2]}_{ftime[3]}'
        else:
            return datetime.strptime(f'{ftime[1]}_{ftime[2]}', '%Y%m%d_%H%M%S')
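
    # Parsing sketch with a hypothetical file name following '*_YYYYmmdd_HHMMSS_*':
    #   FileDB.file_time('im_20211111_140000_0000001_cam1.fits')
    #   # -> datetime(2021, 11, 11, 14, 0, 0)
    #   FileDB.file_time('im_20211111_140000_0000001_cam1.fits', time_and_num_as_string=True)
    #   # -> '20211111_140000_0000001'
    # With avg=True the name '<first>-<last>-cam1.fits' is split on '-' and the
    # first (or, with avg_last=True, the second) part is parsed the same way.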

    def file_path(self, base: str, fits: Fits) -> str:
        """Return the appropriate full file path based on a given base directory"""
        return os.path.join(
            base, self.dir_structure(fits.observation_time(), 'day-hour-cam'), os.path.basename(fits.path)
        )

    def base_out_path(self, module: list) -> str:
        """Return the level out path depending on the configured (default) root and the pipeline module applied."""
        level = Globals.pipeline(self.config.base.pipeline).index(module)
        if self.config.data.custom_pipeline_dir is not None:
            output_subdir = self.config.data.custom_pipeline_dir
        else:
            output_subdir = self.config.base.pipeline
        if level == 0 and module == ['C']:
            return os.path.join(self.config.out_path(), 'level_0')
        if self.config.data.custom_out_levels is not None:
            out_level = self.config.data.custom_out_levels[level]
        else:
            out_level = level
        return os.path.join(self.config.out_path(), output_subdir, f'level_{out_level}')

    def data_paths(self, start: datetime, stop: datetime = None) -> list:
        """
        Returns a sorted list of paths to folders that should contain all files acquired between times start and stop
        """
        paths = [self.dir_path(start)]
        if stop is not None:
            one_hour = timedelta(hours=1)
            interim = start + one_hour
            while interim < stop + one_hour:
                paths.append(self.dir_path(interim))
                interim += one_hour
        return paths
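
    # Sketch (illustrative times): for start=11:20 and stop=13:05 on the same day,
    # data_paths returns the hourly dirs covering 11:00, 12:00 and 13:00, each
    # built via dir_path(..., depth='full').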

    def search_files(
        self,
        utc_start: Union[str, datetime],
        utc_stop: Union[str, datetime] = None,
        avg: bool = False,
        accurate_start: Union[str, datetime] = None,
        print_info=True,
    ) -> list:
        """
        Returns a sorted list of paths to files acquired between times start
        and stop found in the SUSI database specified by self.config.

        WARNING!! Only accurate to the second. The file name is used to determine the
        time of the file. DO NOT USE FRACTIONAL SECONDS in utc_start and utc_stop.
        For an accurate start use accurate_start.

        Note that if self.remote_db.ssh is not None, then the search is done
        on a remote machine. However, the returned file paths are relative to
        self.config.data.root so that they can be used directly for local reading.

        @author: iglesias, inspired by /bin/susi_data.py by Hoelken.

        :param: utc_start. Start time in ISO8601, e.g., 2021-11-11T14:00:00
        :param: utc_stop. End time in ISO8601; if not set, only the files
            acquired at utc_start are returned
        :param: avg. If set, search for files that are the result of an average, which
            follow a different naming convention
        :param: accurate_start. If set, the first frame returned matches accurate_start
            down to config.base.time_delta_cam_sync; if that is not possible, a fatal error is raised.
            If not set, the first frame is accurate down to 1 s (as given by the file name).
        """
        start = utc_start if isinstance(utc_start, datetime) else datetime.fromisoformat(utc_start)
        stop = None
        if utc_stop is not None:
            stop = utc_stop if isinstance(utc_stop, datetime) else datetime.fromisoformat(utc_stop)

        candidates = self._collect_files(start, stop, avg=avg)
        if len(candidates) == 0:
            log.warning('NO FILES FOUND!')
            return []
        files = self._reject_non_matching(candidates, start, stop, avg=avg)
        if not files:
            log.warning('NO MATCHING FILES FOUND!')
            return []

        if accurate_start is not None:
            files = self._select_start_frame(files, accurate_start)
        if print_info:
            log.info('Found %s file(s) within %s and %s', len(files), utc_start, utc_stop)
            log.info('First file is: %s', os.path.basename(files[0]))
            log.info('Last file is: %s', os.path.basename(files[-1]))
        return files
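
    # Usage sketch (times are illustrative; config is a suitably populated Config):
    #   db = FileDB(config)
    #   files = db.search_files('2021-11-11T14:00:00', '2021-11-11T15:00:00')
    #   # -> sorted list of file paths, first/last within the requested interval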

    def _select_start_frame(self, files: list, start: datetime) -> list:
        """
        Search within the first 48 files for a frame whose DATE_OBS is within
        config.base.time_delta_cam_sync of start.
        If found, that frame becomes the first of the returned list (earlier entries are dropped).
        If not found, raise an error.
        """
        nfiles_to_search = 48
        max_tdelta = self.config.base.time_delta_cam_sync / 1e6
        log.info('Selecting start frame accurately, down to %.4f s', max_tdelta)
        for i in range(min(nfiles_to_search, len(files))):
            fits = Fits(files[i]).read(header_only=True)
            ftime = fits.header[DATE_OBS]
            ftime = datetime.strptime(ftime, '%Y-%m-%dT%H:%M:%S.%f')
            delta = abs((start - ftime).total_seconds())
            if delta < max_tdelta:
                log.info('Found frame %s at %s', os.path.basename(files[i]), ftime)
                return files[i:]
        raise InsufficientDataException(
            f'No frame found within {max_tdelta:.4f} s of {start}. Check the start time in your config file.'
        )

    def _collect_files(self, start: datetime, stop: datetime = None, avg: bool = False) -> list:
        if avg:
            file_pattern = f'*-{self.config.cam.name}{self.config.data.ext}'
        else:
            file_pattern = f'{self.config.data.file_prefix}_*_{self.config.cam.name}{self.config.data.ext}'
        log.debug('Collecting files matching: %s', file_pattern)
        file_lists = []
        for path in self.data_paths(start, stop):
            if self.remote_db:
                files = self._find_files_remotely(path, file_pattern)
            else:
                files = self._find_files_locally(path, file_pattern)
            log.debug('\t ... found %s file(s)', len(files))
            if len(files) > 0:
                file_lists.append(files)
        return file_lists

    def _find_files_remotely(self, path: str, file_pattern: str) -> list:
        cmd = f"find {os.path.join(path, '.')} -name '{file_pattern}'"
        log.debug('Executing %s on %s', cmd, self.remote_db.ssh)
        ssh = subprocess.Popen(
            ['ssh', str(self.remote_db.ssh), cmd], shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )

        files = ssh.stdout.readlines()
        files = [f.decode('UTF-8').splitlines()[0].replace(self.remote_db.base, self.config.data.root) for f in files]
        return sorted(files)
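
    # The remote search rewrites path prefixes for local use; sketch with assumed
    # values remote_db.base='/remote/susi' and config.data.root='/local/susi':
    #   '/remote/susi/lev0.1/2021_11_11/.../im_..._cam1.fits'
    #   # is returned as '/local/susi/lev0.1/2021_11_11/.../im_..._cam1.fits'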

    def _find_files_locally(self, path: str, file_pattern: str) -> list:
        log.debug('Entering %s', path)
        return sorted(glob.glob(os.path.join(path, file_pattern)))

    def _reject_non_matching(self, file_lists: list, start: datetime, stop: datetime = None, avg: bool = False) -> list:
        log.debug('rejecting files that do not match conditions...')
        file_lists = Collections.flatten_sort(file_lists)
        FileDB._reject_pre_start(file_lists, start, avg=avg)
        FileDB._reject_post_end(file_lists, stop, avg=avg)
        return file_lists

    @staticmethod
    def _reject_pre_start(files: list, start: datetime, avg: bool = False) -> None:
        """
        Removes the entries from the given files list where the timestamps taken
        from the file names (1 s resolution) are earlier than the lookup start time,
        i.e. timestamps of files in the first folder must be >= the start time.
        For average files (avg=True), the start (first) timestamp from the file name is considered.

        Note: Rejection is performed in place.
        """
        rejected = [i for i in range(len(files)) if FileDB.file_time(files[i], avg=avg) < start]

        for index in sorted(rejected, reverse=True):
            del files[index]

    @staticmethod
    def _reject_post_end(files: list, stop: datetime, avg: bool = False) -> None:
        """
        Removes the entries from the given files list where the timestamps taken from the
        file names (1 s resolution) are later than the lookup end time, i.e. timestamps of
        files in the last folder must be <= the end time.
        For average files (avg=True), the end timestamp from the file name is considered.

        Note: Rejection is performed in place.
        """
        if stop is None:
            return

        rejected = [i for i in range(len(files)) if FileDB.file_time(files[i], avg=avg) > stop]
        for index in sorted(rejected, reverse=True):
            del files[index]
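
    # In-place rejection sketch (hypothetical names): given files spanning
    # 13:59:59-15:00:01 and start=14:00:00, stop=15:00:00, _reject_pre_start drops
    # the 13:59:59 entry and _reject_post_end drops the 15:00:01 entry, mutating
    # the same flattened list that search_files later returns.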

    def check_obs_intervals(
        self,
        utc_start: str = None,
        utc_stop: str = None,
        file_list: Iterable = None,
        max_gap: int = 3,
        raise_error: bool = False,
        out_dir: str = None,
    ) -> list:
        """
        Use it to check the number of different observations recorded during
        the input time interval (or file list).

        Returns an empty list if there is no time gap larger than max_gap [s]
        between consecutive frames found in the database from utc_start to
        utc_stop. The frame timestamps are taken from the file names.

        Otherwise, returns and logs a list of dicts, each giving the starting
        and ending file names of a detected observing interval (observation).
        It also plots the intervals to a png file and saves the list, if out_dir is given.

        ## Params
        - utc_start. Start time in ISO8601, e.g., 2021-11-11T14:00:00
        - utc_stop. End time in ISO8601; if not set, only the files
          acquired at utc_start are considered
        - file_list If given, `utc_start` and `utc_stop` are ignored and the dataset
          within file_list is checked instead
        - max_gap The maximum time gap, in seconds, that may separate frames within the same
          observation; if this criterion is exceeded the dataset is assumed to span
          multiple observations
        - raise_error If `True` (default `False`) then an error is raised if more than one observation is found.
        """
        if utc_start is not None:
            files = self.search_files(utc_start, utc_stop=utc_stop)
        elif file_list is not None:
            files = file_list
        else:
            raise IllegalArgumentException('Either utc_start or file_list must be specified... ')

        times = [FileDB.file_time(os.path.basename(f)) for f in files]
        obs_times = []
        obs_files = []
        obs_id = 0
        for i in range(len(times) - 1):
            time_delta = times[i + 1] - times[i]
            if time_delta.total_seconds() > max_gap:
                if obs_id == 0:
                    # first interval
                    obs_times.append({'start': times[0], 'stop': times[i]})
                    obs_files.append({'start': files[0], 'stop': files[i]})
                else:
                    # intermediate intervals
                    obs_times.append({'start': times[obs_id + 1], 'stop': times[i]})
                    obs_files.append({'start': files[obs_id + 1], 'stop': files[i]})
                obs_id = i
        # add the last interval
        if len(obs_times) > 0:
            obs_times.append({'start': times[obs_id + 1], 'stop': times[-1]})
            obs_files.append({'start': files[obs_id + 1], 'stop': files[-1]})

        if len(obs_times) > 1:
            log.error('Found %s different observing intervals:', len(obs_times))
            for i in range(len(obs_times)):
                log.error(' %s -- %s', os.path.basename(obs_files[i]['start']), os.path.basename(obs_files[i]['stop']))
            mins = min(times).strftime('%Y%m%dT%H%M%S')
            maxs = max(times).strftime('%Y%m%dT%H%M%S')

            if out_dir is not None:
                cam = self.config.cam.name
                ofile_img = os.path.join(out_dir, f'check_obs_intervals_{mins}_{maxs}_{cam}.png')
                ofile_list = os.path.join(out_dir, f'check_obs_intervals_{mins}_{maxs}_{cam}.pkl')
                log.error('Dumping observed intervals to %s and %s ...', ofile_img, ofile_list)
                if len(obs_times) < 100:
                    FileDB._plot_obs_intervals(ofile_img, times, obs_times)
                else:
                    log.warning('Too many observations, skipping the plot but still saving the list.')
                FileDB._dump_obs_files(ofile_list, files, obs_files, obs_times)
            if raise_error:
                log.critical(f'Found {len(obs_times)} observations in the dataset. Only one was expected.')
                if out_dir is not None:
                    log.critical('Details can be found in')
                    log.critical('\t %s', ofile_img)
                    log.critical('\t %s', ofile_list)
                raise ValueError(f'Found {len(obs_times)} observations in the dataset.')

        return obs_files
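
    # Usage sketch (illustrative times and output dir):
    #   db = FileDB(config)
    #   intervals = db.check_obs_intervals(utc_start='2021-11-11T14:00:00',
    #                                      utc_stop='2021-11-11T15:00:00',
    #                                      max_gap=3, out_dir='/tmp')
    #   # -> [] if the frames are contiguous, else
    #   # [{'start': <file>, 'stop': <file>}, ...] per detected observation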

    @staticmethod
    def _plot_obs_intervals(out_file: str, times: list, obs_times: list) -> None:
        with matplot_backend('Agg'):
            fig = plt.figure(figsize=(15, 7))
            fig.suptitle('SUSI observed intervals\nIf the time gap is not visible follow the red lines')
            plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%dT%H%M%S'))
            plt.gca().xaxis.set_major_locator(mdates.MinuteLocator())
            plt.plot(times, np.ones(len(times)), '*')
            plt.gcf().autofmt_xdate()
            plt.xlabel('Frame time from file name (1[s] resolution)')

            for i in range(len(obs_times)):
                plt.axvline(x=obs_times[i]['start'], color='r', linestyle='--')
                plt.axvline(x=obs_times[i]['stop'], color='r', linestyle='--')

            plt.savefig(out_file)
            plt.close()

    @staticmethod
    def _dump_obs_files(out_file: str, files: list, obs_files: list, obs_times: list) -> None:
        obs_file_path = out_file.split('.')[0] + '_reduced.pkl'
        with open(out_file, 'wb') as handle:
            all_ofile_list = []
            for i in range(len(obs_times)):
                starti = files.index(obs_files[i]['start'])
                stopi = files.index(obs_files[i]['stop'])
                all_ofile_list.append(files[starti:stopi])
            pickle.dump(all_ofile_list, handle, protocol=pickle.HIGHEST_PROTOCOL)

        with open(obs_file_path, 'wb') as handle:
            pickle.dump(obs_files, handle, protocol=pickle.HIGHEST_PROTOCOL)

    @staticmethod
    def fname_tstmp(fname: str, tstmp_start: int, tstmp_end: int, avg: bool = False) -> Union[int, list]:
        """Cut out the timestamp part of the file name and return it as int
        (or as a list of two ints for average file names)."""
        if avg:
            delta = len('HHmmSS_XXXXXXX-YYYYMMDD_')
            return [
                int(os.path.basename(fname)[tstmp_start:tstmp_end]),
                int(os.path.basename(fname)[tstmp_start + delta : tstmp_end + delta]),
            ]
        else:
            return int(os.path.basename(fname)[tstmp_start:tstmp_end])
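
    # Slicing sketch (hypothetical names; the caller supplies the slice bounds):
    #   FileDB.fname_tstmp('im_20211111_140000_0000001_cam1.fits', 3, 11)  # -> 20211111
    # For avg names the second timestamp starts len('HHmmSS_XXXXXXX-YYYYMMDD_') = 24
    # characters after the first:
    #   FileDB.fname_tstmp('20211111_140000_0000001-20211112_090000_0000050-cam1.fits',
    #                      0, 8, avg=True)  # -> [20211111, 20211112]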

    @staticmethod
    def tstmp_path(root: str, time: datetime, cam: str) -> str:
        """Generates a file path within the given root folder based on time and cam"""
        return os.path.join(root, time.strftime('%Y_%m_%d'), time.strftime('%Y_%m_%d_%H'), cam)
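
    # Sketch (assumed root and cam values):
    #   FileDB.tstmp_path('/data/lev1', datetime(2021, 11, 11, 14), 'cam1')
    #   # -> '/data/lev1/2021_11_11/2021_11_11_14/cam1'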