Coverage for src/susi/reduc/pipeline/blocks/block_c.py: 90%
81 statements
« prev ^ index » next coverage.py v7.5.0, created at 2025-08-22 09:20 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2025-08-22 09:20 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4Module that holds the block `C` of the susi pipeline.
6@author: hoelken
7"""
8from __future__ import annotations
10import os
12import numpy as np
14from ....io import FitsBatch, Fits, Card
15from ....base import Logging
16from ....utils.timestamps import Timestamps
18from ...fields import DFCorrector, SHIPXCorrector
19from ...hot_pixels import HOTPXCorrector
20from ...shear_distortion import ShearAndRotCorrector
21from ..processing_data import ProcessingData
22from .block import Block, BlockRegistry
23from ....analyse.slit_mask_border import GetSlitMaskBorder_v2
24from ....base.header_keys import *
26log = Logging.get_logger()
29class BlockC(Block):
30 """
31 ## BLOCK C: Camera Calibration
33 This block takes care of the camera calibration steps (in order):
34 - Camera HK Header decoding
35 - dark correction (optional)
36 - shielded pixel correction (optional if mode N/A)
37 - Hot pixel correction (optional)
38 - TODO: Non-Linearity
39 - Cropping to lid area
40 - Shear distortion correction (optional)
41 - Estimation of the slit flat shift. Currently by locating the slit mask border (optional, only for cams 1 and 2)
42 - TODO: Amend header by system HK
43 """
45 BLOCK_ID = "C"
47 @staticmethod
48 def input_needed(cam3: bool = False) -> list:
49 return ["dark_image"]
51 @staticmethod
52 def start(batch: FitsBatch, proc_data: ProcessingData) -> FitsBatch:
53 """
54 Callback-like entry point for the block
55 """
56 return BlockC(batch, proc_data).run().result
58 def _algorithm(self):
59 for entry in self.batch:
60 self.result.batch.append(self._process_entry(entry))
62 def _process_entry(self, fits: Fits) -> dict:
63 if HK_PMU_ANG not in fits.header:
64 # Decode the housekeeping data before everything else.
65 fits.append_hk_keys()
66 return {
67 "file": fits.path,
68 "data": self._modify_data(fits),
69 "header": self._modify_header(fits),
70 }
72 def _modify_data(self, fits: Fits):
73 self.roi = None # reset roi for each fits file
74 if self.proc_data.config.base.no_dark_correction:
75 fits.set_data([fits.data])
76 else:
77 fits.set_data(DFCorrector(self.proc_data.dark_image, fits).run(self.proc_data.config.base.check_header))
78 fits.set_data(SHIPXCorrector(self.proc_data.config, fits).run())
79 fits.set_data(self._crop2lid(fits))
80 if self.proc_data.config.cam.hot_px_mode != "N/A":
81 fits.set_data(HOTPXCorrector(self.proc_data.config, fits).run())
82 if self.proc_data.config.base.shear_and_rot_correction and self.proc_data.config.cam.name in ["cam1", "cam2"]:
83 fits.set_data(self._shear_corr(fits))
84 return fits.data
86 def _shear_corr(self, fits: Fits) -> np.array:
87 corrector = ShearAndRotCorrector(fits, self.proc_data.shear_factor, self.proc_data.rot_angle)
88 modif_data = corrector.run()
89 self._update_roi(other_slice=corrector.out_roi)
90 return modif_data
92 def _crop2lid(self, fits: Fits) -> np.array:
93 if self.proc_data.config.base.no_auto_cropping:
94 return fits.data
95 # check frames are larger than lid area
96 lid_area_size = [self.proc_data.config.cam.lid_area[0].stop - self.proc_data.config.cam.lid_area[0].start,
97 self.proc_data.config.cam.lid_area[1].stop - self.proc_data.config.cam.lid_area[1].start]
98 if fits.data.shape[1] < lid_area_size[0] or fits.data.shape[2] < lid_area_size[1]:
99 raise ValueError(f"Frame dimensions {fits.data.shape} smaller than lid area {lid_area_size}")
100 # During dark correction we ensured that the shape is (1, y, x), thus we need to add a dimension here.
101 x_shape = self.proc_data.config.cam.lid_area[1]
102 y_shape = self.proc_data.config.cam.lid_area[0]
103 self._update_roi(other_slice=self.proc_data.config.cam.lid_area)
104 return fits.data[(slice(None, None), y_shape, x_shape)]
106 def _modify_header(self, fits: Fits):
107 Fits.override_header(fits.header, BLOCKS_APPLIED, BlockC.BLOCK_ID, append=True)
108 if fits.data.shape[1] > 1000:
109 rms_roi = (
110 slice(0, 1),
111 slice(100, fits.data.shape[1] - 50),
112 slice(None, None),
113 )
114 else:
115 rms_roi = None
117 self._update_rms_snr_mean(fits, rms_roi)
119 if self.proc_data.config.base.no_dark_correction:
120 fits.header.append(Card(DARK_IMAGE, value="None applied").to_card())
121 else:
122 # cut extension to fit the file name into the header key.
123 dark_file = os.path.basename(self.proc_data.dark_image.path).split(".")[0]
124 fits.header.append(Card(DARK_IMAGE, value=dark_file).to_card())
126 if DATE_OBS not in fits.header:
127 fits.header.append(Card(DATE_OBS, Timestamps.tstmp2datetime(fits.header[TIMESTAMP_US], "us")).to_card())
129 fits.header.append(Card(SHPX_MODE, value=self.proc_data.config.cam.shielded_px_mode).to_card())
130 fits.header.append(Card(HOTPX_MODE, value=self.proc_data.config.cam.hot_px_mode).to_card())
132 self._update_roi_header(fits.header)
134 if self.proc_data.config.base.shear_and_rot_correction:
135 fits.header.append(Card(SHEAR_CORR, value=self.proc_data.shear_factor).to_card())
136 fits.header.append(Card(ROTATION_ANG, value=self.proc_data.rot_angle, comment="deg").to_card())
138 if self.proc_data.config.base.slit_flat_shift_analysis and self.proc_data.config.cam.name in ["cam1", "cam2"]:
139 slit_mask_border = GetSlitMaskBorder_v2().run_img(fits.data[0])
140 fits.header.append(
141 Card(
142 SLIT_FLAT_OFFSET, value=slit_mask_border[1] + self.roi["y0"], comment="[px] rel. to (0,0)"
143 ).to_card()
144 )
145 fits.header.append(Card(SLIT_FLAT_SLOPE, value=slit_mask_border[0]).to_card())
147 return fits.header
150BlockRegistry().register(BlockC.BLOCK_ID, BlockC)