Coverage for src/susi/db/filedb.py: 39%

285 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2025-08-22 09:20 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Provides methods used to search and navigate the SUSI database file structure 

4 

5@author: iglesias, hoelken 

6""" 

7import copy 

8import glob 

9import logging 

10import os 

11import pickle 

12import subprocess 

13from datetime import datetime, timedelta 

14from typing import Iterable, Union 

15 

16import matplotlib.dates as mdates 

17import matplotlib.pyplot as plt 

18import numpy as np 

19 

20from .remote_connection import RemoteConnection 

21from ..base import Config, IllegalArgumentException, Globals, DATE_OBS, InsufficientDataException, TIMESTAMP_US 

22from ..io import Fits 

23from ..plot import matplot_backend 

24from ..utils import Collections 

25 

26log = logging.getLogger('SUSI') 

27 

28 

class FileDB:
    """
    Tools to search and navigate the SUSI file database.

    Builds level/day/hour/camera directory paths, searches frame files by time
    range (locally, or on a remote host via ssh when a RemoteConnection is
    configured), and provides helpers to parse times, timestamps and image
    numbers out of SUSI file names.
    """

33 

34 def __init__(self, config: Config = Config(), remote_db: RemoteConnection = RemoteConnection()): 

35 #: Configuration 

36 self.config = config 

37 #: Connection information for a remote (ssh) file db. Default: Do not use a remote. 

38 #: If used the file paths found are still returned relative to `self.config.data.root` 

39 #: to be directly used for local reading. 

40 self.remote_db = remote_db 

41 

42 def dir_path(self, time: datetime, depth: str = 'full', base=None) -> str: 

43 """ 

44 Returns a string with the path to the level0.1 directory where the data for input time is located. 

45 

46 NOTE: 

47 if base is set, it is used as the base directory instead of `self.remote_db.base` or `self.config.data.root` 

48 if self.remote_db['base'] is not None, then paths relative to remote base are returned 

49 

50 ## Params 

51 - time [datetime] Time of the dir to return 

52 - depth: Use it to select the deepness of the returned path. Options are: 

53 -'full' Returns: `base/level_dir/YYYY_MM_DD/YYYY_MM_DD_hh/cam` 

54 -'upto_hour' Returns: `base/level_dir/YYYY_MM_DD/YYYY_MM_DD_hh` 

55 -'upto_day' Returns: `base/level_dir/YYYY_MM_DD` 

56 -'upto_level' Returns: `base/level_dir` 

57 -'base' Returns: `base` 

58 -'day' Returns: `YYYY_MM_DD` 

59 -'hour' Returns: `YYYY_MM_DD_hh` 

60 -'level' Returns: `level_dir` 

61 -'day-hour' Returns: `YYYY_MM_DD/YYYY_MM_DD_hh` 

62 -'day-hour-cam' Returns: `YYYY_MM_DD/YYYY_MM_DD_hh/cam` 

63 - base: [string] If set, it is used as the base directory instead 

64 of `self.remote_db.base` or `self.config.data.root` 

65 """ 

66 if base is None: 

67 base_dir = self.remote_db.base if self.remote_db else self.config.data.root 

68 else: 

69 base_dir = base 

70 

71 src = os.path.realpath(os.path.join(base_dir, self.config.data.level)) 

72 

73 if depth == 'full': 

74 return os.path.join(src, self.dir_structure(time, 'day-hour-cam')) 

75 elif depth == 'upto_hour': 

76 return os.path.join(src, self.dir_structure(time, 'day-hour')) 

77 elif depth == 'upto_day': 

78 return os.path.join(src, self.dir_structure(time, 'day')) 

79 elif depth == 'upto_level': 

80 return src 

81 elif depth == 'base': 

82 return base_dir 

83 elif depth == 'level': 

84 return self.config.data.level 

85 else: 

86 return self.dir_structure(time, depth) 

87 

88 def file_short_basename(self, file: str, avg=False, cam=True) -> str: 

89 """ 

90 Returns the short basename of the file by removing year, month and extension (keeping day) from the file name. 

91 

92 Assumed file format is: `*_YYYYmmdd_HHMMSS_XXXXXXX_cam.ext` 

93 the output is `dd_HHMMSS_XXXXXXX_cam` 

94 

95 if avg, the assumed format is: YYYYMMDD_HHMMSS_XXXXXXX-YYYYMMDD_HHMMSS_XXXXXXX>-<cam>.<ext> 

96 the output is `DD_HHMMSS_XXXXXXX-DD_HHMMSS_XXXXXXX-cam` 

97 

98 :param: file [string] full path or file name 

99 :param: avg [bool] If set, the file name is assumed to have the average filename format 

100 :param: cam [bool] If set, the camera name is included in the output 

101 """ 

102 if not avg: 

103 fname = os.path.basename(file).split('_')[1:4] 

104 fname = '_'.join(fname) 

105 fname = fname[6:] 

106 else: 

107 fname = os.path.basename(file).split('-')[0:2] 

108 fname1 = self.file_short_basename('bla_' + fname[0], cam=False) 

109 fname2 = self.file_short_basename('bla_' + fname[1], cam=False) 

110 fname = f'{fname1}-{fname2}' 

111 

112 if cam and self.config.cam.name: 

113 fname += f'-{self.config.cam.name}' 

114 return fname 

115 

116 def dir_structure(self, time: datetime, depth: str) -> str: 

117 """ 

118 Returns a string with the intermediate path part from the level to the file 

119 ## Params 

120 - time [datetime] Time of the dir to return 

121 - depth: Use it to select the deepness of the returned path. Options are: 

122 -'day' Returns: `YYYY_MM_DD` 

123 -'hour' Returns: `YYYY_MM_DD_hh` 

124 -'cam' Returns: `camX` 

125 -'day-hour' Returns: `YYYY_MM_DD/YYYY_MM_DD_hh` 

126 -'day-hour-cam' Returns: `YYYY_MM_DD/YYYY_MM_DD_hh/camX` 

127 """ 

128 if depth == 'hour': 

129 return time.strftime('%Y_%m_%d_%H') 

130 elif depth == 'day': 

131 return time.strftime('%Y_%m_%d') 

132 elif depth == 'cam': 

133 return self.config.cam.name 

134 elif depth == 'day-hour': 

135 return os.path.join(time.strftime('%Y_%m_%d'), time.strftime('%Y_%m_%d_%H')) 

136 elif depth == 'day-hour-cam': 

137 return os.path.join(time.strftime('%Y_%m_%d'), time.strftime('%Y_%m_%d_%H'), self.config.cam.name) 

138 else: 

139 log.critical('Value of depth "%s" not recognized.', depth) 

140 raise IllegalArgumentException('Value of depth not recognized.') 

141 

142 def avrg_fname(self, f_start: str, f_end: str, appendix='', avg=False) -> str: 

143 """ 

144 Returns the file name for a file that was composed of multiple input files. 

145 Per convention the file name should contain the time of the first and last files. 

146 

147 ### Returns 

148 `<f_start:YYYYMMDD_HHMMSS_XXXXXXX>-<f_end:YYYYMMDD_HHMMSS_XXXXXXX>-<cam>.<ext>` 

149 

150 :param: appendix [string] to append to the end of the file name before extension 

151 :param: avg [bool] If set, the file name of f_start and f_stop are assumed 

152 to have already the average filename format. So we keep the very first and last parts 

153 """ 

154 f1_part = FileDB.file_time(f_start, time_and_num_as_string=True, avg=avg) 

155 f2_part = FileDB.file_time(f_end, time_and_num_as_string=True, avg=avg, avg_last=avg) 

156 ext = self.config.data.ext_out 

157 cam = self.config.cam.name 

158 return f"{f1_part}-{f2_part}-{cam}{appendix}{ext}" 

159 

160 @staticmethod 

161 def file_time(file, time_as_string=False, time_and_num_as_string=False, avg=False, avg_last=False): 

162 """ 

163 Returns the time from the input file name (resolution of seconds). 

164 NOTE that this is not the timestamp. 

165 

166 Assumed file format is: '*_YYYYmmdd_HHMMSS_*' 

167 

168 :param: file [string] full path or file name 

169 :param: time_as_string. If set, returns the string 'YYYYmmdd_HHMMSS' 

170 instead of a datetime object 

171 :param: time_and_num_as_string. If set, returns the string 

172 'YYYYmmdd_HHMMSS_ID' instead of a datetime object 

173 Where ID is the 7 digit unique frame identifier 

174 :param: avg. If set, the file name is assumed to have the average filename format 

175 It returns the STARTING or ENDING time of the average, see avg_last 

176 :param: avg_last. If True it returns the ENDING file time of the average 

177 """ 

178 if avg: 

179 if avg_last: 

180 ftime = 'im_' + os.path.basename(file).split('-')[1] + '_cam' 

181 else: 

182 ftime = 'im_' + os.path.basename(file).split('-')[0] + '_cam' 

183 else: 

184 ftime = os.path.basename(file).split('.')[0] 

185 ftime = ftime.split('_') 

186 if time_as_string: 

187 return f'{ftime[1]}_{ftime[2]}' 

188 elif time_and_num_as_string: 

189 return f'{ftime[1]}_{ftime[2]}_{ftime[3]}' 

190 else: 

191 return datetime.strptime(f'{ftime[1]}_{ftime[2]}', '%Y%m%d_%H%M%S') 

192 

193 def file_path(self, base: str, fits: Fits) -> str: 

194 """Return the appropriate full file path based on a given base directory""" 

195 return os.path.join( 

196 base, self.dir_structure(fits.observation_time(), 'day-hour-cam'), os.path.basename(fits.path) 

197 ) 

198 

199 def base_out_path(self, module: list) -> str: 

200 """Return the level out path depending on the configured (default) root and the pipeline module applied.""" 

201 level = Globals.pipeline(self.config.base.pipeline).index(module) 

202 if self.config.data.custom_pipeline_dir is not None: 

203 output_subdir = self.config.data.custom_pipeline_dir 

204 else: 

205 output_subdir = self.config.base.pipeline 

206 if level == 0 and module == ['C']: 

207 return os.path.join(self.config.out_path(), 'level_0') 

208 if self.config.data.custom_out_levels is not None: 

209 out_level = self.config.data.custom_out_levels[level] 

210 else: 

211 out_level = level 

212 return os.path.join(self.config.out_path(), output_subdir, f'level_{out_level}') 

213 

214 def data_paths(self, start: datetime, stop: datetime = None) -> list: 

215 """ 

216 Returns a sorted list of paths to folders that should contain all files acquired between times start and stop 

217 """ 

218 paths = [self.dir_path(start)] 

219 if stop is not None: 

220 one_hour = timedelta(hours=1) 

221 interim = start + one_hour 

222 while interim < stop + one_hour: 

223 paths.append(self.dir_path(interim)) 

224 interim += one_hour 

225 return paths 

226 

227 def search_files( 

228 self, 

229 utc_start: Union[str, datetime], 

230 utc_stop: Union[str, datetime] = None, 

231 avg: bool = False, 

232 accurate_start: Union[str, datetime] = None, 

233 print_info=True, 

234 ascending_imgnumber: bool = True, 

235 ) -> list: 

236 """ 

237 Returns a sorted list of paths to files acquired between times start 

238 and stop found in the SUSI data based specified by self.config 

239 

240 WARNING!! Only accurate to the second. The file name is used to determine the 

241 time of the file. DO NOT USE FRACTIONAL SECONDS in utc_start and utc_stop. 

242 For accurate start use accurate_start 

243 

244 Note that if self.remote_db['ssh'] is ot None, then the search is done 

245 in a remote machine. However, the returned file paths are relative to 

246 self.config.data.root to be directly used for local reading. 

247 

248 @author: iglesias, Inspired by /bin/susi_data.py by Hoelken. 

249 

250 :param: utc_start. Start time in ISO8601, e.g., 2021-11-11T40:00:00 

251 :param: utc_stop. End time in ISO8601, if not set, only all files 

252 :param: avg. If set, search for files that are the result of an average, which 

253 follow a different nameing convetion 

254 :param: accurate_start. If set, the first frame returned matches accurate_start 

255 down to config.base.time_delta_cam_sync. If is not possible fatal error is raised. 

256 If not set, the first frame is accurate down to 1s (given by filename) 

257 :param: print_info. If set, prints the number of files found and the first and last file names 

258 :param: ascending_imgnumber. If set, rise an error if files do not have ascending image numbers. 

259 """ 

260 start = utc_start if isinstance(utc_start, datetime) else datetime.fromisoformat(utc_start) 

261 stop = None 

262 if utc_stop is not None: 

263 stop = utc_stop if isinstance(utc_stop, datetime) else datetime.fromisoformat(utc_stop) 

264 

265 candidates = self._collect_files(start, stop, avg=avg) 

266 if len(candidates) == 0: 

267 log.warning('NO FILES FOUND!') 

268 return [] 

269 files = self._reject_non_matching(candidates, start, stop, avg=avg) 

270 if not files: 

271 log.warning('NO MATCHING FILES FOUND!') 

272 return [] 

273 

274 if accurate_start is not None: 

275 files = self._select_start_frame(files, accurate_start) 

276 

277 if ascending_imgnumber: 

278 img_numbers = [FileDB.fname_img_number(f, avg=avg) for f in files] 

279 asc_num = [img_numbers[i] <= img_numbers[i + 1] for i in range(len(img_numbers) - 1)] 

280 if not all(asc_num): 

281 wrong_pairs = [ 

282 f'{files[i]} ({img_numbers[i]}) - {files[i + 1]} ({img_numbers[i + 1]})' 

283 for i in range(len(asc_num)) 

284 if not asc_num[i] 

285 ] 

286 log.warning(f'Files sorted by file name do not have ascending file number') 

287 log.warning(f'Resorting by DATE_OBS header keword, this may take a while...') 

288 files = self.sort_by_date_obs(files) 

289 

290 if print_info: 

291 log.info('Found %s file(s) within %s and %s', len(files), utc_start, utc_stop) 

292 log.info('First file is: %s', os.path.basename(files[0])) 

293 log.info('Last file is: %s', os.path.basename(files[-1])) 

294 return files 

295 

296 def sort_by_date_obs(self, files: list) -> list: 

297 """ 

298 Sorts the input list of files by the DATE_OBS header keyword. 

299 This is more accurate than sorting by file name, by requires reading the header of each file. 

300 :param: files [list] List of file names to sort 

301 :return: [list] Sorted list of file names 

302 """ 

303 from ..io import FitsBatch # to avoid circular ref ? 

304 

305 try: 

306 batch = FitsBatch().load(files, header_only=True, sort_by=DATE_OBS) 

307 except Exception as e: 

308 log.warning(f'Error sorting by {DATE_OBS}, sorting by {TIMESTAMP_US} instead: {e}') 

309 batch = FitsBatch().load(files, header_only=True, sort_by=TIMESTAMP_US) 

310 

311 return batch.file_list() 

312 

313 def _select_start_frame(self, files: list, start: datetime) -> list: 

314 """ 

315 Search within the first 48 files for a frame that has OBS_TIME within 0.5 exposure of start. 

316 If found that becomes the first frame of flist (previous are deleted). 

317 If not found, raise an error. 

318 """ 

319 nfiles_to_search = 48 

320 max_tdelta = self.config.base.time_delta_cam_sync / 1e6 

321 log.info(f'Selecting start frame closest to {start}') 

322 for i in range(len(files[0:nfiles_to_search])): 

323 fits = Fits(files[i]).read(header_only=True) 

324 ftime = fits.header[DATE_OBS] 

325 ftime = datetime.strptime(ftime, '%Y-%m-%dT%H:%M:%S.%f') 

326 delta = abs((start - ftime).total_seconds()) 

327 if delta < max_tdelta: 

328 log.info('Found frame %s at %s', os.path.basename(files[i]), ftime) 

329 return files[i:] 

330 raise InsufficientDataException( 

331 f'No frame found within {format(max_tdelta, ".4f")} s of {start}. Check the start time in your config file.' 

332 ) 

333 

334 def _collect_files(self, start: datetime, stop: datetime = None, avg: bool = False) -> list: 

335 if avg: 

336 file_pattern = f'*-{self.config.cam.name}{self.config.data.ext}' 

337 else: 

338 file_pattern = f'{self.config.data.file_prefix}_*_{self.config.cam.name}{self.config.data.ext}' 

339 log.debug('Collecting files matching: %s', file_pattern) 

340 file_lists = [] 

341 for path in self.data_paths(start, stop): 

342 if self.remote_db: 

343 files = self._find_files_remotely(path, file_pattern) 

344 else: 

345 files = self._find_files_locally(path, file_pattern) 

346 log.debug('\t ... found %s file(s)', len(files)) 

347 if len(files) > 0: 

348 file_lists.append(files) 

349 return file_lists 

350 

    def _find_files_remotely(self, path: str, file_pattern: str) -> list:
        """
        List files matching `file_pattern` below `path` on the remote host.

        Runs `find` via ssh on `self.remote_db.ssh` and rewrites the remote base
        directory to the local `self.config.data.root`, so the returned paths can
        be used directly for local reading.
        """
        # `find <path>/. -name '<pattern>'` lists matches recursively below path.
        cmd = f"find {os.path.join(path, '.')} -name \'{file_pattern}\'"
        log.debug('Executing %s on %s', cmd, self.remote_db.ssh)
        ssh = subprocess.Popen(
            ["ssh", "%s" % self.remote_db.ssh, cmd], shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )

        # NOTE(review): stdout is read to EOF but the process is never waited on
        # and stderr is never drained; a very large stderr could block the child.
        files = ssh.stdout.readlines()
        # Decode each output line and rebase the remote path onto the local data root.
        files = [f.decode('UTF-8').splitlines()[0].replace(self.remote_db.base, self.config.data.root) for f in files]
        return sorted(files)

361 

362 def _find_files_locally(self, path: str, file_pattern: str) -> list: 

363 log.debug('Entering %s', path) 

364 return sorted(glob.glob(os.path.join(path, file_pattern))) 

365 

366 def _reject_non_matching(self, file_lists: list, start: datetime, stop: datetime = None, avg: bool = False) -> list: 

367 log.debug('rejecting files that do not match conditions...') 

368 file_lists = Collections.flatten_sort(file_lists) 

369 FileDB._reject_pre_start(file_lists, start, avg=avg) 

370 FileDB._reject_post_end(file_lists, stop, avg=avg) 

371 return file_lists 

372 

373 @staticmethod 

374 def _reject_pre_start(files: list, start: datetime, avg: bool = False) -> None: 

375 """ 

376 removes the entries from the given files list, where the timestamps taken 

377 from the file names (s resolution) are earlier than the lookup start time. 

378 I.e. timestamps of files in first folder must be > start time. 

379 For average files (avg=True), it considers the start (first) timestamp from the file name. 

380 

381 Note: Rejection will be performed inplace. 

382 """ 

383 rejected = [i for i in range(len(files)) if FileDB.file_time(files[i], avg=avg) < start] 

384 

385 for index in sorted(rejected, reverse=True): 

386 del files[index] 

387 

388 @staticmethod 

389 def _reject_post_end(files: list, stop: datetime, avg: bool = False) -> None: 

390 """ 

391 removes the entries from the given files list, where the timestamps taken from the file names (s resolution) are 

392 later than the lookup end time. I.e. timestamps of files in last folder must be < end time. 

393 For average files (avg=True), it considers the end timestamp from the file name. 

394 

395 Note: Rejection will be performed inplace. 

396 """ 

397 if stop is None: 

398 return 

399 

400 rejected = [i for i in range(len(files)) if FileDB.file_time(files[i], avg=avg) > stop] 

401 for index in sorted(rejected, reverse=True): 

402 del files[index] 

403 

404 def check_obs_intervals( 

405 self, 

406 utc_start: str = None, 

407 utc_stop: str = None, 

408 file_list: Iterable = None, 

409 max_gap: int = 3, 

410 raise_error: bool = False, 

411 out_dir: str = None, 

412 ) -> list: 

413 """ 

414 Use it to check the number of different observations recorded during 

415 the input time interval (or files list) 

416 

417 Returns an empty list if there is no time gap 

418 larger than criterion [s] between consecutive frames found in 

419 the database from utc_start to utc_stop. The frames timestamps 

420 are taken from the file names. 

421 

422 Otherwise, returns and logs a list of dict, each giving 

423 the starting and ending file names of the detected observing intervals 

424 (observations). It also plots to a png file and saves the list in self.config.log_dir 

425 

426 ## Params 

427 - utc_start. Start time in ISO8601, e.g., 2021-11-11T40:00:00 

428 - utc_stop. End time in ISO8601, if not set, only all files 

429 acquired at utc_start are returned 

430 - file_list If given `utc_start` and `utc_stop` is ignored. The dataset within the file_list is 

431 - max_gap The maximum time gap that separates frames from another within the same observation in seconds 

432 if this criterion is exceeded the dataset is expected to span multiple observations 

433 - raise_error If `True` (default `False`) then an error is raised if more than one observation is found. 

434 """ 

435 

436 if utc_start is not None: 

437 files = self.search_files(utc_start, utc_stop=utc_stop) 

438 elif file_list is not None: 

439 files = file_list 

440 else: 

441 raise IllegalArgumentException('Either utc_start or file_list must be specified... ') 

442 

443 times = [FileDB.file_time(os.path.basename(f)) for f in files] 

444 obs_times = [] 

445 obs_files = [] 

446 obs_id = 0 

447 for i in range(len(times) - 1): 

448 time_delta = times[i + 1] - times[i] 

449 if time_delta.seconds > max_gap: 

450 if obs_id == 0: 

451 # first interval 

452 obs_times.append({'start': times[0], 'stop': times[i]}) 

453 obs_files.append({'start': files[0], 'stop': files[i]}) 

454 else: 

455 # intermideate intervals 

456 obs_times.append({'start': times[obs_id + 1], 'stop': times[i]}) 

457 obs_files.append({'start': files[obs_id + 1], 'stop': files[i]}) 

458 obs_id = copy.copy(i) 

459 # adds last interval 

460 if len(obs_times) > 0: 

461 obs_times.append({'start': times[obs_id + 1], 'stop': times[-1]}) 

462 obs_files.append({'start': files[obs_id + 1], 'stop': files[-1]}) 

463 

464 if len(obs_times) > 1: 

465 log.error('Found %s different observing intervals:', len(obs_times)) 

466 for i in range(len(obs_times)): 

467 log.error(' %s -- %s', os.path.basename(obs_files[i]['start']), os.path.basename(obs_files[i]['stop'])) 

468 mins = min(times).strftime('%Y%m%dT%H%M%S') 

469 maxs = max(times).strftime('%Y%m%dT%H%M%S') 

470 

471 if out_dir is not None: 

472 cam = self.config.cam.name 

473 ofile_img = os.path.join(out_dir, f'check_obs_intervals_{mins}_{maxs}_{cam}.png') 

474 ofile_list = os.path.join(out_dir, f'check_obs_intervals_{mins}_{maxs}_{cam}.pkl') 

475 log.error('Dumping observed intervals to %s and %s ...', ofile_img, ofile_list) 

476 if len(obs_times) < 100: 

477 FileDB._plot_obs_intervals(ofile_img, times, obs_times) 

478 else: 

479 log.warning('Too many observations, I will skip the plot but still save the list.') 

480 FileDB._dump_obs_files(ofile_list, files, obs_files, obs_times) 

481 if raise_error: 

482 log.critical(f'Found {len(obs_times)} observations in the dataset. Only one was expected.') 

483 if out_dir is not None: 

484 log.critical('Details can be found in') 

485 log.critical('\t %s', ofile_img) 

486 log.critical('\t %s', ofile_list) 

487 raise ValueError(f'Found {len(obs_times)} observations in the dataset.') 

488 

489 return obs_files 

490 

    @staticmethod
    def _plot_obs_intervals(out_file: str, times: list, obs_times: list) -> None:
        """
        Plot all frame times plus the start/stop boundaries of each detected
        observing interval (red dashed lines) and save the figure to `out_file`.

        :param out_file: path of the png file to write
        :param times: list of frame datetimes (1 s resolution, from file names)
        :param obs_times: list of {'start': datetime, 'stop': datetime} dicts
        """
        # Switch to the non-interactive Agg backend so this also works headless.
        with matplot_backend('Agg'):
            fig = plt.figure(figsize=(15, 7))
            fig.suptitle('SUSI observed intervals\nIf the time gap is not visible follow the red lines')
            plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%dT%H%M%S'))
            plt.gca().xaxis.set_major_locator(mdates.MinuteLocator())
            # All frame times on a constant y=1 line; gaps show up as holes.
            plt.plot(times, np.ones(len(times)), '*')
            plt.gcf().autofmt_xdate()
            plt.xlabel('Frame time from file name (1[s] resolution)')

            # Mark the boundaries of every observing interval.
            for i in range(len(obs_times)):
                plt.axvline(x=obs_times[i]['start'], color='r', linestyle='--')
                plt.axvline(x=obs_times[i]['stop'], color='r', linestyle='--')

            plt.savefig(out_file)
            plt.close()

508 

509 @staticmethod 

510 def _dump_obs_files(out_file: str, files: list, obs_files: list, obs_times: list) -> None: 

511 obs_file_path = out_file.split('.')[0] + '_reduced.pkl' 

512 with open(out_file, 'wb', 0o775) as handle: 

513 all_ofile_list = [] 

514 for i in range(len(obs_times)): 

515 starti = files.index(obs_files[i]['start']) 

516 stopi = files.index(obs_files[i]['stop']) 

517 all_ofile_list.append(files[starti:stopi]) 

518 pickle.dump(all_ofile_list, handle, protocol=pickle.HIGHEST_PROTOCOL) 

519 handle.close() 

520 

521 with open(obs_file_path, 'wb', 0o775) as handle: 

522 pickle.dump(obs_files, handle, protocol=pickle.HIGHEST_PROTOCOL) 

523 handle.close() 

524 

525 @staticmethod 

526 def fname_tstmp(fname: str, tstmp_start: int, tstmp_end: int, avg: bool = False) -> int: 

527 """Cutout the timestamp part of the file name and return it as int""" 

528 if avg: 

529 delta = len('HHmmSS_XXXXXXX-YYYYMMDD_') 

530 return [ 

531 int(os.path.basename(fname)[tstmp_start:tstmp_end]), 

532 int(os.path.basename(fname)[tstmp_start + delta : tstmp_end + delta]), 

533 ] 

534 else: 

535 return int(os.path.basename(fname)[tstmp_start:tstmp_end]) 

536 

537 @staticmethod 

538 def tstmp_path(root: str, time: datetime, cam: str) -> str: 

539 """Generates a file path within the given root folder based on time and cam""" 

540 return os.path.join(root, time.strftime('%Y_%m_%d'), time.strftime('%Y_%m_%d_%H'), cam) 

541 

542 @staticmethod 

543 def fname_img_number(fname: str, avg: bool = False) -> int: 

544 """ 

545 Returns the image number from the file name. 

546 If avg is True, it returns the both image numbers in the average file name. 

547 """ 

548 if avg: 

549 return [ 

550 int(os.path.basename(fname).split('-')[0].split('_')[2]), 

551 int(os.path.basename(fname).split('-')[1].split('_')[2]), 

552 ] 

553 else: 

554 return int(os.path.basename(fname).split('_')[3])