Coverage for src/susi/db/filedb.py: 42%

251 statements  

coverage.py v7.5.0, created at 2025-06-13 14:15 +0000

# -*- coding: utf-8 -*-
"""
Provides methods used to search and navigate the SUSI database file structure

@author: iglesias, hoelken
"""
import copy
import glob
import logging
import os
import pickle
import subprocess
from datetime import datetime, timedelta
from typing import Iterable, Union

import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
from src.susi import InsufficientDataException

from .remote_connection import RemoteConnection
from ..base import Config, IllegalArgumentException, Globals, DATE_OBS
from ..io import Fits
from ..plot import matplot_backend
from ..utils import Collections

log = logging.getLogger('SUSI')


class FileDB:
    """
    Tools to search and navigate the SUSI file database
    """

    def __init__(self, config: Config = Config(), remote_db: RemoteConnection = RemoteConnection()):
        #: Configuration
        self.config = config
        #: Connection information for a remote (ssh) file db. Default: do not use a remote.
        #: If used, the file paths found are still returned relative to `self.config.data.root`
        #: so they can be used directly for local reading.
        self.remote_db = remote_db

    def dir_path(self, time: datetime, depth: str = 'full') -> str:
        """
        Returns a string with the path to the level0.1 directory where the data for the input time is located.

        Note that if self.remote_db.base is not None, paths relative to the remote base are returned.

        ## Params
        - time [datetime] Time of the dir to return
        - depth: Use it to select the depth of the returned path. Options are:
            - 'full' Returns: `base/level_dir/YYYY_MM_DD/YYYY_MM_DD_hh/cam`
            - 'upto_hour' Returns: `base/level_dir/YYYY_MM_DD/YYYY_MM_DD_hh`
            - 'upto_day' Returns: `base/level_dir/YYYY_MM_DD`
            - 'upto_level' Returns: `base/level_dir`
            - 'base' Returns: `base`
            - 'day' Returns: `YYYY_MM_DD`
            - 'hour' Returns: `YYYY_MM_DD_hh`
            - 'level' Returns: `level_dir`
            - 'day-hour' Returns: `YYYY_MM_DD/YYYY_MM_DD_hh`
        """
        base_dir = self.remote_db.base if self.remote_db else self.config.data.root
        src = os.path.realpath(os.path.join(base_dir, self.config.data.level))

        if depth == 'full':
            return os.path.join(src, self.dir_structure(time, 'day-hour-cam'))
        elif depth == 'upto_hour':
            return os.path.join(src, self.dir_structure(time, 'day-hour'))
        elif depth == 'upto_day':
            return os.path.join(src, self.dir_structure(time, 'day'))
        elif depth == 'upto_level':
            return src
        elif depth == 'base':
            return base_dir
        elif depth == 'level':
            return self.config.data.level
        else:
            return self.dir_structure(time, depth)
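
    # Illustrative sketch (not part of the original module): assuming
    # config.data.root = '/data', config.data.level = 'level_0.1' and
    # config.cam.name = 'cam1' (all hypothetical values), `dir_path` resolves to:
    #
    #   db = FileDB()
    #   db.dir_path(datetime(2021, 11, 11, 14), depth='full')
    #   # -> '/data/level_0.1/2021_11_11/2021_11_11_14/cam1'
    #   db.dir_path(datetime(2021, 11, 11, 14), depth='day-hour')
    #   # -> '2021_11_11/2021_11_11_14'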

    def dir_structure(self, time: datetime, depth: str) -> str:
        """
        Returns a string with the intermediate path part from the level to the file

        ## Params
        - time [datetime] Time of the dir to return
        - depth: Use it to select the depth of the returned path. Options are:
            - 'day' Returns: `YYYY_MM_DD`
            - 'hour' Returns: `YYYY_MM_DD_hh`
            - 'cam' Returns: `camX`
            - 'day-hour' Returns: `YYYY_MM_DD/YYYY_MM_DD_hh`
            - 'day-hour-cam' Returns: `YYYY_MM_DD/YYYY_MM_DD_hh/camX`
        """
        if depth == 'hour':
            return time.strftime('%Y_%m_%d_%H')
        elif depth == 'day':
            return time.strftime('%Y_%m_%d')
        elif depth == 'cam':
            return self.config.cam.name
        elif depth == 'day-hour':
            return os.path.join(time.strftime('%Y_%m_%d'), time.strftime('%Y_%m_%d_%H'))
        elif depth == 'day-hour-cam':
            return os.path.join(time.strftime('%Y_%m_%d'), time.strftime('%Y_%m_%d_%H'), self.config.cam.name)
        else:
            log.critical('Value of depth "%s" not recognized.', depth)
            raise IllegalArgumentException('Value of depth not recognized.')

    def avrg_fname(self, f_start: str, f_end: str, appendix='', avg=False) -> str:
        """
        Returns the file name for a file that was composed of multiple input files.
        Per convention the file name should contain the times of the first and last files.

        ### Returns
        `<f_start:YYYYMMDD_HHMMSS_XXXXXXX>-<f_end:YYYYMMDD_HHMMSS_XXXXXXX>-<cam>.<ext>`

        :param: appendix [string] to append to the end of the file name before the extension
        :param: avg [bool] If set, the file names of f_start and f_end are assumed
            to already have the average filename format, so the very first and last parts are kept
        """
        f1_part = FileDB.file_time(f_start, time_and_num_as_string=True, avg=avg)
        f2_part = FileDB.file_time(f_end, time_and_num_as_string=True, avg=avg, avg_last=avg)
        ext = self.config.data.ext_out
        cam = self.config.cam.name
        return f"{f1_part}-{f2_part}-{cam}{appendix}{ext}"
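
    # Illustrative sketch (hypothetical file names; assumes config.cam.name = 'cam1'
    # and config.data.ext_out = '.fits'):
    #
    #   db.avrg_fname('im_20211111_140000_0000001_cam1.fits',
    #                 'im_20211111_140500_0000900_cam1.fits')
    #   # -> '20211111_140000_0000001-20211111_140500_0000900-cam1.fits'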

    @staticmethod
    def file_time(file, time_as_string=False, time_and_num_as_string=False, avg=False, avg_last=False):
        """
        Returns the time from the input file name (resolution of seconds).
        NOTE that this is not the timestamp.

        Assumed file format is: '*_YYYYmmdd_HHMMSS_*'

        :param: file [string] full path or file name
        :param: time_as_string. If set, returns the string 'YYYYmmdd_HHMMSS'
            instead of a datetime object
        :param: time_and_num_as_string. If set, returns the string
            'YYYYmmdd_HHMMSS_ID' instead of a datetime object,
            where ID is the 7-digit unique frame identifier
        :param: avg. If set, the file name is assumed to have the average filename format.
            It returns the STARTING or ENDING time of the average, see avg_last
        :param: avg_last. If True it returns the ENDING file time of the average
        """
        if avg:
            if avg_last:
                ftime = 'im_' + os.path.basename(file).split('-')[1] + '_cam'
            else:
                ftime = 'im_' + os.path.basename(file).split('-')[0] + '_cam'
        else:
            ftime = os.path.basename(file).split('.')[0]
        ftime = ftime.split('_')
        if time_as_string:
            return f'{ftime[1]}_{ftime[2]}'
        elif time_and_num_as_string:
            return f'{ftime[1]}_{ftime[2]}_{ftime[3]}'
        else:
            return datetime.strptime(f'{ftime[1]}_{ftime[2]}', '%Y%m%d_%H%M%S')
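
    # Illustrative sketch (hypothetical file name):
    #
    #   FileDB.file_time('im_20211111_140000_0000001_cam1.fits')
    #   # -> datetime(2021, 11, 11, 14, 0)
    #   FileDB.file_time('im_20211111_140000_0000001_cam1.fits', time_and_num_as_string=True)
    #   # -> '20211111_140000_0000001'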

    def file_path(self, base: str, fits: Fits) -> str:
        """Return the appropriate full file path based on a given base directory"""
        return os.path.join(
            base, self.dir_structure(fits.observation_time(), 'day-hour-cam'), os.path.basename(fits.path)
        )

    def base_out_path(self, module: list) -> str:
        """Return the level out path depending on the configured (default) root and the pipeline module applied."""
        level = Globals.pipeline(self.config.base.pipeline).index(module)
        if self.config.data.custom_pipeline_dir is not None:
            output_subdir = self.config.data.custom_pipeline_dir
        else:
            output_subdir = self.config.base.pipeline
        if level == 0 and module == ['C']:
            return os.path.join(self.config.out_path(), 'level_0')
        if self.config.data.custom_out_levels is not None:
            out_level = self.config.data.custom_out_levels[level]
        else:
            out_level = level
        return os.path.join(self.config.out_path(), output_subdir, f'level_{out_level}')

    def data_paths(self, start: datetime, stop: datetime = None) -> list:
        """
        Returns a sorted list of paths to folders that should contain all files acquired between times start and stop
        """
        paths = [self.dir_path(start)]
        if stop is not None:
            one_hour = timedelta(hours=1)
            interim = start + one_hour
            while interim < stop + one_hour:
                paths.append(self.dir_path(interim))
                interim += one_hour
        return paths
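
    # Illustrative sketch: with hypothetical start/stop times spanning the hours
    # 12, 13 and 14, one hourly directory per hour is returned:
    #
    #   db.data_paths(datetime(2021, 11, 11, 12, 10), datetime(2021, 11, 11, 14, 5))
    #   # -> [<...>/2021_11_11/2021_11_11_12/cam1,
    #   #     <...>/2021_11_11/2021_11_11_13/cam1,
    #   #     <...>/2021_11_11/2021_11_11_14/cam1]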

    def search_files(
        self,
        utc_start: Union[str, datetime],
        utc_stop: Union[str, datetime] = None,
        avg: bool = False,
        accurate_start: Union[str, datetime] = None,
        print_info=True,
    ) -> list:
        """
        Returns a sorted list of paths to files acquired between times start
        and stop found in the SUSI data base specified by self.config

        WARNING!! Only accurate to the second. The file name is used to determine the
        time of the file. DO NOT USE FRACTIONAL SECONDS in utc_start and utc_stop.
        For an accurate start use accurate_start.

        Note that if self.remote_db.ssh is not None, the search is done
        on a remote machine. However, the returned file paths are relative to
        self.config.data.root so they can be used directly for local reading.

        @author: iglesias, inspired by /bin/susi_data.py by Hoelken.

        :param: utc_start. Start time in ISO8601, e.g., 2021-11-11T14:00:00
        :param: utc_stop. End time in ISO8601; if not set, only all files
            acquired at utc_start are returned
        :param: avg. If set, search for files that are the result of an average, which
            follow a different naming convention
        :param: accurate_start. If set, the first frame returned matches accurate_start
            down to config.base.time_delta_cam_sync. If this is not possible, a fatal error is raised.
            If not set, the first frame is accurate down to 1 s (given by the filename)
        """
        start = utc_start if isinstance(utc_start, datetime) else datetime.fromisoformat(utc_start)
        stop = None
        if utc_stop is not None:
            stop = utc_stop if isinstance(utc_stop, datetime) else datetime.fromisoformat(utc_stop)

        candidates = self._collect_files(start, stop, avg=avg)
        if len(candidates) == 0:
            log.warning('NO FILES FOUND!')
            return []
        files = self._reject_non_matching(candidates, start, stop, avg=avg)
        if not files:
            log.warning('NO MATCHING FILES FOUND!')
            return []

        if accurate_start is not None:
            files = self._select_start_frame(files, accurate_start)
        if print_info:
            log.info('Found %s file(s) within %s and %s', len(files), utc_start, utc_stop)
            log.info('First file is: %s', os.path.basename(files[0]))
            log.info('Last file is: %s', os.path.basename(files[-1]))
        return files
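
    # Illustrative usage sketch (hypothetical times): collect all frames within a
    # ten-minute window; with accurate_start the first frame is matched against
    # the FITS header instead of the 1 s file-name resolution.
    #
    #   db = FileDB()
    #   files = db.search_files('2021-11-11T14:00:00', '2021-11-11T14:10:00')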

    def _select_start_frame(self, files: list, start: datetime) -> list:
        """
        Search within the first 48 files for a frame whose DATE_OBS is within
        config.base.time_delta_cam_sync of start.
        If found, that becomes the first frame of the returned list (previous entries are dropped).
        If not found, raise an error.
        """
        nfiles_to_search = 48
        max_tdelta = self.config.base.time_delta_cam_sync / 1e6
        log.info(f'Selecting start frame accurately down to {max_tdelta:.4f} s')
        for i in range(len(files[0:nfiles_to_search])):
            fits = Fits(files[i]).read(header_only=True)
            ftime = fits.header[DATE_OBS]
            ftime = datetime.strptime(ftime, '%Y-%m-%dT%H:%M:%S.%f')
            delta = abs((start - ftime).total_seconds())
            if delta < max_tdelta:
                log.info('Found frame %s at %s', os.path.basename(files[i]), ftime)
                return files[i:]
        raise InsufficientDataException(
            f'No frame found within {max_tdelta:.4f} s of {start}. Check the start time in your config file.'
        )

    def _collect_files(self, start: datetime, stop: datetime = None, avg: bool = False) -> list:
        if avg:
            file_pattern = f'*-{self.config.cam.name}{self.config.data.ext}'
        else:
            file_pattern = f'{self.config.data.file_prefix}_*_{self.config.cam.name}{self.config.data.ext}'
        log.debug('Collecting files matching: %s', file_pattern)
        file_lists = []
        for path in self.data_paths(start, stop):
            if self.remote_db:
                files = self._find_files_remotely(path, file_pattern)
            else:
                files = self._find_files_locally(path, file_pattern)
            log.debug('\t ... found %s file(s)', len(files))
            if len(files) > 0:
                file_lists.append(files)
        return file_lists

    def _find_files_remotely(self, path: str, file_pattern: str) -> list:
        cmd = f"find {os.path.join(path, '.')} -name '{file_pattern}'"
        log.debug('Executing %s on %s', cmd, self.remote_db.ssh)
        ssh = subprocess.Popen(
            ["ssh", self.remote_db.ssh, cmd], shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )

        files = ssh.stdout.readlines()
        files = [f.decode('UTF-8').splitlines()[0].replace(self.remote_db.base, self.config.data.root) for f in files]
        return sorted(files)
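
    # Illustrative sketch: for a hypothetical path
    # '/remote/level_0.1/2021_11_11/2021_11_11_14/cam1' and pattern
    # 'im_*_cam1.fits', the command executed on self.remote_db.ssh is roughly
    #
    #   find /remote/level_0.1/2021_11_11/2021_11_11_14/cam1/. -name 'im_*_cam1.fits'
    #
    # and each resulting path has self.remote_db.base replaced by
    # self.config.data.root so it can be read locally.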

    def _find_files_locally(self, path: str, file_pattern: str) -> list:
        log.debug('Entering %s', path)
        return sorted(glob.glob(os.path.join(path, file_pattern)))

    def _reject_non_matching(self, file_lists: list, start: datetime, stop: datetime = None, avg: bool = False) -> list:
        log.debug('rejecting files that do not match conditions...')
        file_lists = Collections.flatten_sort(file_lists)
        FileDB._reject_pre_start(file_lists, start, avg=avg)
        FileDB._reject_post_end(file_lists, stop, avg=avg)
        return file_lists

    @staticmethod
    def _reject_pre_start(files: list, start: datetime, avg: bool = False) -> None:
        """
        Removes the entries from the given files list whose timestamps taken
        from the file names (1 s resolution) are earlier than the lookup start time,
        i.e. the timestamps of files in the first folder must be >= the start time.
        For average files (avg=True), it considers the start (first) timestamp from the file name.

        Note: Rejection is performed in place.
        """
        rejected = [i for i in range(len(files)) if FileDB.file_time(files[i], avg=avg) < start]

        for index in sorted(rejected, reverse=True):
            del files[index]

    @staticmethod
    def _reject_post_end(files: list, stop: datetime, avg: bool = False) -> None:
        """
        Removes the entries from the given files list whose timestamps taken from the file names (1 s resolution)
        are later than the lookup end time, i.e. the timestamps of files in the last folder must be <= the end time.
        For average files (avg=True), it considers the end timestamp from the file name.

        Note: Rejection is performed in place.
        """
        if stop is None:
            return

        rejected = [i for i in range(len(files)) if FileDB.file_time(files[i], avg=avg) > stop]
        for index in sorted(rejected, reverse=True):
            del files[index]
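
    # Illustrative sketch of the in-place rejection (hypothetical names): with
    # start = datetime(2021, 11, 11, 14), the 13:59:59 frame is dropped in place.
    #
    #   files = ['im_20211111_135959_0000001_cam1.fits',
    #            'im_20211111_140001_0000002_cam1.fits']
    #   FileDB._reject_pre_start(files, datetime(2021, 11, 11, 14))
    #   # files -> ['im_20211111_140001_0000002_cam1.fits']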

    def check_obs_intervals(
        self,
        utc_start: str = None,
        utc_stop: str = None,
        file_list: Iterable = None,
        max_gap: int = 3,
        raise_error: bool = False,
        out_dir: str = None,
    ) -> list:
        """
        Use it to check the number of different observations recorded during
        the input time interval (or file list)

        Returns an empty list if there is no time gap
        larger than max_gap [s] between consecutive frames found in
        the database from utc_start to utc_stop. The frame timestamps
        are taken from the file names.

        Otherwise, returns and logs a list of dicts, each giving
        the starting and ending file names of the detected observing intervals
        (observations). It also plots to a png file and saves the list in out_dir, if given.

        ## Params
        - utc_start. Start time in ISO8601, e.g., 2021-11-11T14:00:00
        - utc_stop. End time in ISO8601; if not set, only all files
          acquired at utc_start are returned
        - file_list If given, `utc_start` and `utc_stop` are ignored and the dataset within the file_list is checked
        - max_gap The maximum time gap in seconds that separates frames from one another within the same observation;
          if this criterion is exceeded the dataset is expected to span multiple observations
        - raise_error If `True` (default `False`) then an error is raised if more than one observation is found.
        - out_dir If given, the plot and the interval list are saved there.
        """
        if utc_start is not None:
            files = self.search_files(utc_start, utc_stop=utc_stop)
        elif file_list is not None:
            files = file_list
        else:
            raise IllegalArgumentException('Either utc_start or file_list must be specified... ')

        times = [FileDB.file_time(os.path.basename(f)) for f in files]
        obs_times = []
        obs_files = []
        obs_id = 0
        for i in range(len(times) - 1):
            time_delta = times[i + 1] - times[i]
            if time_delta.total_seconds() > max_gap:
                if obs_id == 0:
                    # first interval
                    obs_times.append({'start': times[0], 'stop': times[i]})
                    obs_files.append({'start': files[0], 'stop': files[i]})
                else:
                    # intermediate intervals
                    obs_times.append({'start': times[obs_id + 1], 'stop': times[i]})
                    obs_files.append({'start': files[obs_id + 1], 'stop': files[i]})
                obs_id = copy.copy(i)
        # adds last interval
        if len(obs_times) > 0:
            obs_times.append({'start': times[obs_id + 1], 'stop': times[-1]})
            obs_files.append({'start': files[obs_id + 1], 'stop': files[-1]})

        if len(obs_times) > 1:
            log.error('Found %s different observing intervals:', len(obs_times))
            for i in range(len(obs_times)):
                log.error(' %s -- %s', os.path.basename(obs_files[i]['start']), os.path.basename(obs_files[i]['stop']))
            mins = min(times).strftime('%Y%m%dT%H%M%S')
            maxs = max(times).strftime('%Y%m%dT%H%M%S')

            if out_dir is not None:
                cam = self.config.cam.name
                ofile_img = os.path.join(out_dir, f'check_obs_intervals_{mins}_{maxs}_{cam}.png')
                ofile_list = os.path.join(out_dir, f'check_obs_intervals_{mins}_{maxs}_{cam}.pkl')
                log.error('Dumping observed intervals to %s and %s ...', ofile_img, ofile_list)
                if len(obs_times) < 100:
                    FileDB._plot_obs_intervals(ofile_img, times, obs_times)
                else:
                    log.warning('Too many observations, I will skip the plot but still save the list.')
                FileDB._dump_obs_files(ofile_list, files, obs_files, obs_times)
            if raise_error:
                log.critical(f'Found {len(obs_times)} observations in the dataset. Only one was expected.')
                if out_dir is not None:
                    log.critical('Details can be found in')
                    log.critical('\t %s', ofile_img)
                    log.critical('\t %s', ofile_list)
                raise ValueError(f'Found {len(obs_times)} observations in the dataset.')

        return obs_files
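
    # Illustrative usage sketch (hypothetical times): an empty result means all
    # frames in the window belong to a single uninterrupted observation.
    #
    #   gaps = db.check_obs_intervals(utc_start='2021-11-11T14:00:00',
    #                                 utc_stop='2021-11-11T15:00:00', max_gap=3)
    #   # gaps == []           -> one observation
    #   # gaps == [{...}, ...] -> one dict per detected interval with its
    #   #                         'start' and 'stop' file names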

    @staticmethod
    def _plot_obs_intervals(out_file: str, times: list, obs_times: list) -> None:
        with matplot_backend('Agg'):
            fig = plt.figure(figsize=(15, 7))
            fig.suptitle('SUSI observed intervals\nIf the time gap is not visible follow the red lines')
            plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%dT%H%M%S'))
            plt.gca().xaxis.set_major_locator(mdates.MinuteLocator())
            plt.plot(times, np.ones(len(times)), '*')
            plt.gcf().autofmt_xdate()
            plt.xlabel('Frame time from file name (1[s] resolution)')

            for i in range(len(obs_times)):
                plt.axvline(x=obs_times[i]['start'], color='r', linestyle='--')
                plt.axvline(x=obs_times[i]['stop'], color='r', linestyle='--')

            plt.savefig(out_file)
            plt.close()

    @staticmethod
    def _dump_obs_files(out_file: str, files: list, obs_files: list, obs_times: list) -> None:
        obs_file_path = os.path.splitext(out_file)[0] + '_reduced.pkl'
        with open(out_file, 'wb', 0o775) as handle:
            all_ofile_list = []
            for i in range(len(obs_times)):
                starti = files.index(obs_files[i]['start'])
                stopi = files.index(obs_files[i]['stop'])
                all_ofile_list.append(files[starti:stopi])
            pickle.dump(all_ofile_list, handle, protocol=pickle.HIGHEST_PROTOCOL)

        with open(obs_file_path, 'wb', 0o775) as handle:
            pickle.dump(obs_files, handle, protocol=pickle.HIGHEST_PROTOCOL)

    @staticmethod
    def fname_tstmp(fname: str, tstmp_start: int, tstmp_end: int, avg: bool = False) -> Union[int, list]:
        """Cut out the timestamp part of the file name and return it as int (a list of two ints for avg files)"""
        if avg:
            delta = len('HHmmSS_XXXXXXX-YYYYMMDD_')
            return [
                int(os.path.basename(fname)[tstmp_start:tstmp_end]),
                int(os.path.basename(fname)[tstmp_start + delta : tstmp_end + delta]),
            ]
        else:
            return int(os.path.basename(fname)[tstmp_start:tstmp_end])
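
    # Illustrative sketch: tstmp_start/tstmp_end are character offsets into the
    # base file name. For the hypothetical name 'im_20211111_140000_0000001_cam1.fits',
    # offsets 19 and 26 select the 7-digit frame ID:
    #
    #   FileDB.fname_tstmp('im_20211111_140000_0000001_cam1.fits', 19, 26)
    #   # -> 1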

    @staticmethod
    def tstmp_path(root: str, time: datetime, cam: str) -> str:
        """Generates a file path within the given root folder based on time and cam"""
        return os.path.join(root, time.strftime('%Y_%m_%d'), time.strftime('%Y_%m_%d_%H'), cam)
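
    # Illustrative sketch (hypothetical values):
    #
    #   FileDB.tstmp_path('/data/level_1', datetime(2021, 11, 11, 14), 'cam1')
    #   # -> '/data/level_1/2021_11_11/2021_11_11_14/cam1'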