Coverage for src/susi/reduc/pipeline/blocks/block_m.py: 84%

76 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2025-06-13 14:15 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3""" 

4Module that holds the block `M` of the susi pipeline. 

5 

6@author: hoelken 

7""" 

8import os 

9 

10import numpy as np 

11from spectroflat.smile import SmileInterpolator 

12 

13from .block import Block, BlockRegistry 

14from ..processing_data import ProcessingData 

15from ....base import Logging, IllegalStateException, Globals 

16from ....base.header_keys import * 

17from ....io import FitsBatch, Fits, Card 

18from ....utils import Collections 

19from ....utils.cropping import adapt_shapes 

20from ....utils.header_checks import check_same_binning 

21 

22log = Logging.get_logger() 

23 

24 

25class BlockM(Block): 

26 """ 

27 ## BLOCK M: Morphological Operations 

28 

29 This block takes care of morphological calibration steps: 

30 - Smile Correction 

31 - TODO Support data binned in spatial and/or spectral dimension 

32 - TODO Image Registration 

33 - TODO Wavelength calibration (using amended offset map) 

34 """ 

35 

36 BLOCK_ID = 'M' 

37 

38 @staticmethod 

39 def start(batch: FitsBatch, proc_data: ProcessingData) -> FitsBatch: 

40 """ 

41 Callback-like entry point for the block 

42 """ 

43 return BlockM(batch, proc_data).run().result 

44 

45 def _algorithm(self): 

46 if self.proc_data.config.is_cam3(): 

47 log.debug('CAM3 run: Morphological operations skipped...') 

48 self.result = self.batch 

49 return 

50 

51 self._get_roi() 

52 self._adapt_shapes() 

53 for entry in self.batch: 

54 self.result.batch.append(self._process_entry(entry)) 

55 

56 def _process_entry(self, fits: Fits) -> dict: 

57 return { 

58 'file': fits.path, 

59 'data': self._modify_data(fits), 

60 'header': self._modify_header(fits), 

61 } 

62 

63 def _modify_data(self, fits: Fits): 

64 if self.proc_data.offset_map.map.shape[0] == 1: 

65 self.proc_data.offset_map.map = np.repeat(self.proc_data.offset_map.map, Globals.MOD_CYCLE_FRAMES, axis=0) 

66 

67 sc = SmileInterpolator(self.proc_data.offset_map, fits.data[0], mod_state=int(fits.header[MOD_STATE])).run() 

68 return np.array([sc.result]) 

69 

70 def _modify_header(self, fits: Fits): 

71 Fits.override_header(fits.header, Block.BLOCKS_APPLIED, BlockM.BLOCK_ID, append=True) 

72 Fits.override_header( 

73 fits.header, OFFSET_MAP, value=os.path.basename(self.proc_data.offset_map.path).split('.')[0] 

74 ) 

75 self._update_roi_header(fits.header) 

76 

77 # check if wl-caelibrated offset map, ie. if DISPERSION is present 

78 if DISPERSION in self.proc_data.offset_map.header: 

79 Fits.override_header(fits.header, WL_CALIBRATED, value='TRUE') 

80 fits.header.append(Card(WL_CALIBRATED, value='TRUE').to_card()) 

81 fits.header.append(Card(DISPERSION, self.proc_data.offset_map.header[DISPERSION]).to_card()) 

82 fits.header.append(Card(MIN_WL_NM, self.proc_data.offset_map.header[MIN_WL_NM]).to_card()) 

83 fits.header.append(Card(MIN_WL_PX, self.proc_data.offset_map.header[MIN_WL_PX]).to_card()) 

84 fits.header.append(Card(MAX_WL_NM, self.proc_data.offset_map.header[MAX_WL_NM]).to_card()) 

85 fits.header.append(Card(MAX_WL_PX, self.proc_data.offset_map.header[MAX_WL_PX]).to_card()) 

86 else: 

87 Fits.override_header(fits.header, WL_CALIBRATED, value='FALSE') 

88 return fits.header 

89 

90 def _get_roi(self) -> None: 

91 h = self.proc_data.offset_map.header 

92 self._target_roi({'y0': int(h[ROI_Y0]), 'y1': int(h[ROI_Y1]), 'x0': int(h[ROI_X0]), 'x1': int(h[ROI_X1])}) 

93 

94 def _adapt_shapes(self): 

95 self.batch = adapt_shapes(self.batch, self.roi) 

96 h = self.proc_data.offset_map.header 

97 dx = int(h[ROI_X0]) if ROI_X0 in h else 0 

98 dy = int(h[ROI_Y0]) if ROI_Y0 in h else 0 

99 roi = ( 

100 slice(None, None), 

101 slice(self.roi['y0'] - dy, self.roi['y1'] - dy), 

102 slice(self.roi['x0'] - dx, self.roi['x1'] - dx), 

103 ) 

104 self.proc_data.offset_map.map = self.proc_data.offset_map.map[roi] 

105 

106 def _apply_binning_factor(self): 

107 check_same_binning(self.batch) 

108 bins = self.batch[0]['header'][SPATIAL_BIN] if SPATIAL_BIN in self.batch[0]['header'] else None 

109 if bins is None: 

110 return 

111 bins = bins.split(',') 

112 # Do not bin mod state dimension 

113 bins.insert(0, 1) 

114 self.proc_data.offset_map.map = Collections.bin(self.proc_data.offset_map.map, Collections.as_int_array(bins)) 

115 

116 @staticmethod 

117 def prepare(proc_data: ProcessingData) -> None: 

118 if not proc_data.config.is_cam3() and proc_data.offset_map is None: 

119 raise IllegalStateException('No OFFSET MAP given') 

120 

121 @staticmethod 

122 def input_needed(cam3: bool = False) -> list: 

123 if cam3: 

124 return [] 

125 else: 

126 return ['offset_map'] 

127 

128 

129BlockRegistry().register(BlockM.BLOCK_ID, BlockM)