Coverage for src/susi/reduc/pipeline/pre_processor.py: 78%
46 statements
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Module that holds the pre-processing part of the SUSI pipeline.
It is responsible for providing the input for the subsequent processing blocks.

@author: hoelken
"""
from __future__ import annotations

import os.path
from datetime import datetime

from ...db import FileDB
from ...io import FitsBatch, Fits
from ...base import Logging

from .blocks import BlockRegistry
from .processing_data import ProcessingData

from ...base.header_keys import *

log = Logging.get_logger()

class PreProcessor:
    """
    ## PreProcessor
    The Pre-Processor is the preparation part of the pipeline.
    It takes care of loading the given file list and setting up the callbacks for the processing blocks.

    <pre>
    ╭──────────────╮    ╭─────────────╮    ╭───────────────╮
    │ PreProcessor │───>│  Processor  │───>│ PostProcessor │
    ╰──────────────╯    ╰─┬─────────┬─╯    ╰───────────────╯
                          ├ Block 1 ┤
                          ├ ....... ┤
                          ╰ Block N ╯
    </pre>

    # TODO: If all TARGET files exist and `overwrite=False` skip this block.
    """

    def __init__(self, batch: list, proc_data: ProcessingData, blocks: list):
        self.files = batch
        self.batch = FitsBatch()
        self.blocks = blocks
        self.proc_data = proc_data
        self.existing_result = None  # set by _check_skippable() if all predicted output files already exist
        self.callbacks = []          # processing-block entry points, filled by _build_callbacks()

    def run(self) -> PreProcessor:
        self._build_callbacks()
        self._load_files()
        self._check_skippable()
        return self

    def _build_callbacks(self):
        # Look up each configured block in the BlockRegistry and collect its `start` entry point.
        self.callbacks = [BlockRegistry()[b].start for b in self.blocks]

    def _load_files(self):
        # Load the input FITS files; worker count (`io_speed`) and sort key come from the configuration.
        self.batch.load(self.files,
                        workers=self.proc_data.config.base.io_speed.value,
                        sort_by=self.proc_data.config.base.timestamp_field)

    def _check_skippable(self):
        """
        Check if all files that would be generated by this module already exist.
        If this is the case, `existing_result` is set and the module can be skipped.
        """
        if self.proc_data.config.data.force_reprocessing:
            # We must not be skipped!
            return

        db = FileDB(self.proc_data.config)
        base_out = db.base_out_path(self.blocks)
        if not os.path.exists(base_out):
            # Not even the target dir exists, so we have to compute.
            return

        prediction = self.batch.header_copy()
        for b in self.blocks:
            prediction = BlockRegistry()[b].predict_output(prediction, self.proc_data)
            for e in prediction.batch:
                e['file'] = self._predict_file_name(base_out, db, e)
                Fits.override_header(e['header'], BLOCKS_APPLIED, b, append=True)
        if all(os.path.exists(e['file']) for e in prediction.batch):
            self.existing_result = prediction
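
    # Illustration of the prediction above (hypothetical values): for an entry whose header
    # holds DATE_OBS = '2025-06-13T14:15:00' the predicted path has the shape
    #   <base_out>/<FileDB.dir_structure(..., 'day-hour-cam')>/<original basename>
    # e.g. something like '<base_out>/2025-06-13/14/cam1/frame_0001.fits'; the exact
    # sub-folder layout is determined by FileDB and is only assumed here.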

    @staticmethod
    def _predict_file_name(base: str, db: FileDB, entry: dict):
        return os.path.join(
            base,
            db.dir_structure(datetime.fromisoformat(entry['header'][DATE_OBS]), 'day-hour-cam'),
            os.path.basename(entry['file'])
        )
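

# Sketch of the bookkeeping performed by _check_skippable() (illustrative only; the exact header
# formatting depends on Fits.override_header(..., append=True)): after the block loop every
# predicted entry carries the applied block names under BLOCKS_APPLIED, e.g.
#
#   e['header'][BLOCKS_APPLIED]  ->  'dark, flat'
#
# and a predicted 'file' path, so a single all(os.path.exists(...)) over the batch decides
# whether this module is skippable.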