Coverage for src/susi/reduc/pipeline/pre_processor.py: 78%

46 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2025-06-13 14:15 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3""" 

4Module that holds the pre-processing part of the SUSI pipeline. 

5Is responsible to provide the input for the following blocks 

6 

7@author: hoelken 

8""" 

9from __future__ import annotations 

10 

11import os.path 

12from datetime import datetime 

13 

14from ...db import FileDB 

15from ...io import FitsBatch, Fits 

16from ...base import Logging 

17 

18from .blocks import BlockRegistry 

19from .processing_data import ProcessingData 

20 

21from ...base.header_keys import * 

22 

23log = Logging.get_logger() 

24 

25 

class PreProcessor:
    """
    ## PreProcessor
    Preparation stage of the pipeline.

    Loads the given list of input files into a `FitsBatch` and assembles the
    callback chain for the requested processing blocks that the following
    pipeline stages will execute.

    <pre>
    ╭──────────────╮   ╭─────────────╮   ╭───────────────╮
    │ PreProcessor │──>│  Processor  │──>│ PostProcessor │
    ╰──────────────╯   ╰─┬─────────┬─╯   ╰───────────────╯
                         ├ Block 1 ┤
                         ├ ....... ┤
                         ╰ Block N ╯
    </pre>

    # TODO: If all TARGET files exist and `overwrite=False` skip this block.
    """

    def __init__(self, batch: list, proc_data: ProcessingData, blocks: list):
        # Paths of the input files to load.
        self.files = batch
        # Loaded FITS data; filled by `_load_files`.
        self.batch = FitsBatch()
        # Names of the processing blocks to apply, in order.
        self.blocks = blocks
        # Shared processing configuration / state.
        self.proc_data = proc_data
        # Set to the predicted output batch when all targets already exist.
        self.existing_result = None
        # `start` callables of the blocks; filled by `_build_callbacks`.
        self.callbacks = []

    def run(self) -> PreProcessor:
        """Perform all preparation steps and return self (fluent API)."""
        self._build_callbacks()
        self._load_files()
        self._check_skippable()
        return self

    def _build_callbacks(self):
        """Collect the `start` callable of every requested block, in order."""
        callbacks = []
        for name in self.blocks:
            callbacks.append(BlockRegistry()[name].start)
        self.callbacks = callbacks

    def _load_files(self):
        """Load all input files into the batch, sorted by the timestamp field."""
        base_cfg = self.proc_data.config.base
        self.batch.load(self.files,
                        workers=base_cfg.io_speed.value,
                        sort_by=base_cfg.timestamp_field)

    def _check_skippable(self):
        """
        Check if all files that would be generated by this module already exist.
        If this is the case the existing_result will be set and the module can be skipped.
        """
        if self.proc_data.config.data.force_reprocessing:
            # Reprocessing is forced — we must not be skipped.
            return

        db = FileDB(self.proc_data.config)
        target_dir = db.base_out_path(self.blocks)
        if not os.path.exists(target_dir):
            # Not even the target directory exists, so we have to compute.
            return

        # Simulate the pipeline on header copies to predict the output files.
        prediction = self.batch.header_copy()
        for name in self.blocks:
            prediction = BlockRegistry()[name].predict_output(prediction, self.proc_data)
            for entry in prediction.batch:
                entry['file'] = self._predict_file_name(target_dir, db, entry)
                Fits.override_header(entry['header'], BLOCKS_APPLIED, name, append=True)

        missing = [e for e in prediction.batch if not os.path.exists(e['file'])]
        if not missing:
            self.existing_result = prediction

    @staticmethod
    def _predict_file_name(base: str, db: FileDB, entry: dict):
        """Build the expected output path for a single predicted batch entry."""
        obs_time = datetime.fromisoformat(entry['header'][DATE_OBS])
        sub_dir = db.dir_structure(obs_time, 'day-hour-cam')
        return os.path.join(base, sub_dir, os.path.basename(entry['file']))