Coverage for src/susi/reduc/pipeline/blocks/block_c.py: 90%

81 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2025-08-22 09:20 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3""" 

4Module that holds the block `C` of the susi pipeline. 

5 

6@author: hoelken 

7""" 

8from __future__ import annotations 

9 

10import os 

11 

12import numpy as np 

13 

14from ....io import FitsBatch, Fits, Card 

15from ....base import Logging 

16from ....utils.timestamps import Timestamps 

17 

18from ...fields import DFCorrector, SHIPXCorrector 

19from ...hot_pixels import HOTPXCorrector 

20from ...shear_distortion import ShearAndRotCorrector 

21from ..processing_data import ProcessingData 

22from .block import Block, BlockRegistry 

23from ....analyse.slit_mask_border import GetSlitMaskBorder_v2 

24from ....base.header_keys import * 

25 

26log = Logging.get_logger() 

27 

28 

29class BlockC(Block): 

30 """ 

31 ## BLOCK C: Camera Calibration 

32 

33 This block takes care of the camera calibration steps (in order): 

34 - Camera HK Header decoding 

35 - dark correction (optional) 

36 - shielded pixel correction (optional if mode N/A) 

37 - Hot pixel correction (optional) 

38 - TODO: Non-Linearity 

39 - Cropping to lid area 

40 - Shear distortion correction (optional) 

41 - Estimation of the slit flat shift. Currently by locating the slit mask border (optional, only for cams 1 and 2) 

42 - TODO: Amend header by system HK 

43 """ 

44 

45 BLOCK_ID = "C" 

46 

47 @staticmethod 

48 def input_needed(cam3: bool = False) -> list: 

49 return ["dark_image"] 

50 

51 @staticmethod 

52 def start(batch: FitsBatch, proc_data: ProcessingData) -> FitsBatch: 

53 """ 

54 Callback-like entry point for the block 

55 """ 

56 return BlockC(batch, proc_data).run().result 

57 

58 def _algorithm(self): 

59 for entry in self.batch: 

60 self.result.batch.append(self._process_entry(entry)) 

61 

62 def _process_entry(self, fits: Fits) -> dict: 

63 if HK_PMU_ANG not in fits.header: 

64 # Decode the housekeeping data before everything else. 

65 fits.append_hk_keys() 

66 return { 

67 "file": fits.path, 

68 "data": self._modify_data(fits), 

69 "header": self._modify_header(fits), 

70 } 

71 

72 def _modify_data(self, fits: Fits): 

73 self.roi = None # reset roi for each fits file 

74 if self.proc_data.config.base.no_dark_correction: 

75 fits.set_data([fits.data]) 

76 else: 

77 fits.set_data(DFCorrector(self.proc_data.dark_image, fits).run(self.proc_data.config.base.check_header)) 

78 fits.set_data(SHIPXCorrector(self.proc_data.config, fits).run()) 

79 fits.set_data(self._crop2lid(fits)) 

80 if self.proc_data.config.cam.hot_px_mode != "N/A": 

81 fits.set_data(HOTPXCorrector(self.proc_data.config, fits).run()) 

82 if self.proc_data.config.base.shear_and_rot_correction and self.proc_data.config.cam.name in ["cam1", "cam2"]: 

83 fits.set_data(self._shear_corr(fits)) 

84 return fits.data 

85 

86 def _shear_corr(self, fits: Fits) -> np.array: 

87 corrector = ShearAndRotCorrector(fits, self.proc_data.shear_factor, self.proc_data.rot_angle) 

88 modif_data = corrector.run() 

89 self._update_roi(other_slice=corrector.out_roi) 

90 return modif_data 

91 

92 def _crop2lid(self, fits: Fits) -> np.array: 

93 if self.proc_data.config.base.no_auto_cropping: 

94 return fits.data 

95 # check frames are larger than lid area 

96 lid_area_size = [self.proc_data.config.cam.lid_area[0].stop - self.proc_data.config.cam.lid_area[0].start, 

97 self.proc_data.config.cam.lid_area[1].stop - self.proc_data.config.cam.lid_area[1].start] 

98 if fits.data.shape[1] < lid_area_size[0] or fits.data.shape[2] < lid_area_size[1]: 

99 raise ValueError(f"Frame dimensions {fits.data.shape} smaller than lid area {lid_area_size}") 

100 # During dark correction we ensured that the shape is (1, y, x), thus we need to add a dimension here. 

101 x_shape = self.proc_data.config.cam.lid_area[1] 

102 y_shape = self.proc_data.config.cam.lid_area[0] 

103 self._update_roi(other_slice=self.proc_data.config.cam.lid_area) 

104 return fits.data[(slice(None, None), y_shape, x_shape)] 

105 

106 def _modify_header(self, fits: Fits): 

107 Fits.override_header(fits.header, BLOCKS_APPLIED, BlockC.BLOCK_ID, append=True) 

108 if fits.data.shape[1] > 1000: 

109 rms_roi = ( 

110 slice(0, 1), 

111 slice(100, fits.data.shape[1] - 50), 

112 slice(None, None), 

113 ) 

114 else: 

115 rms_roi = None 

116 

117 self._update_rms_snr_mean(fits, rms_roi) 

118 

119 if self.proc_data.config.base.no_dark_correction: 

120 fits.header.append(Card(DARK_IMAGE, value="None applied").to_card()) 

121 else: 

122 # cut extension to fit the file name into the header key. 

123 dark_file = os.path.basename(self.proc_data.dark_image.path).split(".")[0] 

124 fits.header.append(Card(DARK_IMAGE, value=dark_file).to_card()) 

125 

126 if DATE_OBS not in fits.header: 

127 fits.header.append(Card(DATE_OBS, Timestamps.tstmp2datetime(fits.header[TIMESTAMP_US], "us")).to_card()) 

128 

129 fits.header.append(Card(SHPX_MODE, value=self.proc_data.config.cam.shielded_px_mode).to_card()) 

130 fits.header.append(Card(HOTPX_MODE, value=self.proc_data.config.cam.hot_px_mode).to_card()) 

131 

132 self._update_roi_header(fits.header) 

133 

134 if self.proc_data.config.base.shear_and_rot_correction: 

135 fits.header.append(Card(SHEAR_CORR, value=self.proc_data.shear_factor).to_card()) 

136 fits.header.append(Card(ROTATION_ANG, value=self.proc_data.rot_angle, comment="deg").to_card()) 

137 

138 if self.proc_data.config.base.slit_flat_shift_analysis and self.proc_data.config.cam.name in ["cam1", "cam2"]: 

139 slit_mask_border = GetSlitMaskBorder_v2().run_img(fits.data[0]) 

140 fits.header.append( 

141 Card( 

142 SLIT_FLAT_OFFSET, value=slit_mask_border[1] + self.roi["y0"], comment="[px] rel. to (0,0)" 

143 ).to_card() 

144 ) 

145 fits.header.append(Card(SLIT_FLAT_SLOPE, value=slit_mask_border[0]).to_card()) 

146 

147 return fits.header 

148 

149 

150BlockRegistry().register(BlockC.BLOCK_ID, BlockC)