Coverage for src/susi/reduc/pipeline/blocks/block_b.py: 49%

110 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2025-08-22 09:20 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3""" 

4Module that holds the block `B` of the susi pipeline. 

5 

6@author: hoelken 

7""" 

8import numpy as np 

9from astropy.io import fits 

10 

11from .block import Block, BlockRegistry 

12from .block_d import BlockD 

13from ..processing_data import ProcessingData 

14from ....db import FileDB 

15from ....io import FitsBatch, Fits, Card 

16from ....base import Logging, Globals 

17from ....utils import Collections 

18 

19from ....base.header_keys import * 

20 

21log = Logging.get_logger() 

22 

23 

class BlockB(Block):
    """
    ## BLOCK B: Binning

    Reduces the data volume by averaging:
    - temporal / x-axis binning: consecutive frames are averaged together
    - spatial / wl binning: rows and columns within a frame are averaged
    """

    # Identifier recorded in headers once this block has been applied.
    BLOCK_ID = 'B'

34 

@staticmethod
def start(batch: FitsBatch, proc_data: ProcessingData) -> FitsBatch:
    """Callback-like entry point: run block B on *batch* and return the resulting batch."""
    block = BlockB(batch, proc_data)
    return block.run().result

41 

def _algorithm(self):
    """Run the block: temporal averaging first, then per-frame spatial binning."""
    self._average_frames()
    processed = [self._process_entry(entry) for entry in self.batch]
    self.result.batch.extend(processed)

46 

def _process_entry(self, frame: Fits) -> dict:
    """
    Bin a single frame and build its result-batch entry.

    :param frame: the frame to process (was named ``fits``, which shadowed the
        module-level ``from astropy.io import fits`` import; renamed — the
        method is private and called positionally, so callers are unaffected)
    :return: dict with the frame's path, spatially binned data, and updated header
    """
    return {
        'file': frame.path,
        'data': self._modify_data(frame),
        'header': self._modify_header(frame),
    }

53 

def _average_frames(self):
    """Dispatch temporal averaging to the strategy matching the input data."""
    if self.proc_data.config.cam.temporal_binning == 1:
        # A binning factor of 1 means no temporal averaging at all.
        return

    if self.batch.is_applied(BlockD.BLOCK_ID):
        # Demodulated data has an extra Stokes dimension and its own averaging.
        self.demodulated_avrg()
        return
    if self.proc_data.config.spol.ignore_mod_states:
        self._avrg_no_state()
        return
    self._avrg_per_state()

65 

def demodulated_avrg(self):
    """
    Average demodulated frames over the whole batch.

    Demodulated data has an extra leading dimension holding one image per
    Stokes component (I, Q, U, V); each component is averaged separately
    across all batch entries and restacked.
    """
    stokes = [
        np.average([entry['data'][component] for entry in self.batch.batch], axis=0)
        for component in range(4)
    ]
    first = self.batch.batch[0]
    last = self.batch.batch[-1]
    merged = FitsBatch()
    merged.batch.append(
        {
            'file': self._avrg_fname(fnames=[first['file'], last['file']], avg=True),
            'data': np.array(stokes),
            'header': self._merged_header(first['header']),
        }
    )
    self.batch = merged

83 

def _avrg_no_state(self) -> None:
    """Average consecutive frames in groups of ``temporal_binning``, ignoring mod states."""
    binning = self.proc_data.config.cam.temporal_binning
    indices = np.arange(len(self.batch))
    if binning > 0:
        # NOTE(review): np.split raises unless len(batch) is an exact multiple
        # of the binning factor — presumably the upstream `Chunker` guarantees
        # batch sizes; confirm against the caller.
        groups = np.split(indices, len(self.batch) // binning)
    else:
        groups = [indices]

    averaged = FitsBatch()
    for group in groups:
        first_header = self.batch.batch[group[0]]['header']
        averaged.batch.append(
            {
                'file': self._avrg_fname(group),
                'header': self._merged_header(first_header),
                'data': np.average([self.batch.batch[i]['data'] for i in group], axis=0),
            }
        )
    self.batch = averaged

99 

def _avrg_per_state(self) -> None:
    """
    Average frames separately per modulation state.

    The batch prepared by the `Chunker` already has the right size; we only
    need to keep modulation states apart while averaging.

    Fix: the first/last file names were passed as the *block* positional
    argument of ``_avrg_fname``, which interprets them as batch indices
    (``self.batch[block[0]]``). They are passed via the ``fnames=`` keyword
    now, consistent with ``demodulated_avrg``.
    """
    new_batch = FitsBatch()
    for state in BlockB._state_split(self.batch):
        new_batch.batch.append(
            {
                'file': self._avrg_fname(fnames=[state[0]['file'], state[-1]['file']]),
                'data': np.average([s['data'] for s in state], axis=0),
                'header': self._merged_header(state[0]['header']),
            }
        )
    self.batch = new_batch

113 

@staticmethod
def _state_split(batch: FitsBatch) -> list:
    """Group batch entries into buckets by their modulation state (``MOD_STATE`` header)."""
    buckets = [[] for _ in range(Globals.MOD_CYCLE_FRAMES)]
    for entry in batch.batch:
        state = int(entry['header'][MOD_STATE])
        buckets[state].append(entry)
    return buckets

120 

def _merged_header(self, header):
    """Build a fresh primary header containing only the configured copy-fields from *header*."""
    merged = fits.PrimaryHDU().header
    for key in self.proc_data.config.base.header_copy_fields:
        if key not in header:
            continue
        if key == MOD_STATE and self.proc_data.config.spol.ignore_mod_states:
            # States were averaged together, so a single state value is meaningless.
            merged.append(Card(MOD_STATE, 'N/A').to_card())
        else:
            merged.append(Card.from_orig(header, key).to_card())
    return merged

131 

@staticmethod
def _state_avrg(data: tuple) -> tuple:
    """
    Return the tuple's first element unchanged and its second averaged along axis 0.

    NOTE(review): no caller is visible in this file — possibly used externally
    or left over; verify before removing.
    """
    label, frames = data[0], data[1]
    return label, np.average(frames, axis=0)

135 

def _avrg_fname(self, block=None, fnames=None, avg=False):
    """
    Derive the output file name for an averaged frame range.

    :param block: batch indices; the first/last entries delimit the range
    :param fnames: explicit ``[first, last]`` file names (used when *block* is None)
    :param avg: forwarded to the file DB's name builder
    :return: file name spanning first..last (the whole batch if neither is given)
    """
    if block is not None:
        first, last = self.batch[block[0]]['file'], self.batch[block[-1]]['file']
    elif fnames is not None:
        first, last = fnames[0], fnames[-1]
    else:
        first, last = self.batch[0]['file'], self.batch[-1]['file']
    return self.db.avrg_fname(first, last, avg=avg)

147 

def _modify_data(self, f: Fits):
    """Apply spatial binning to a frame; handles both 2D frames and stacks of 2D planes."""
    binning = self.proc_data.config.cam.spacial_binning
    if f.data.ndim == 2:
        return Collections.bin(f.data, binning)
    # Demodulated data: bin each (Stokes) plane separately, then restack.
    return np.array([Collections.bin(plane, binning) for plane in f.data])

156 

def _modify_header(self, frame: Fits):
    """Record block B in the frame header and document the binning that was applied."""
    Fits.override_header(frame.header, Block.BLOCKS_APPLIED, BlockB.BLOCK_ID, append=True)
    self._update_rms_snr_mean(frame)
    spatial = ','.join(str(i) for i in self.proc_data.config.cam.spacial_binning)
    spatial_card = Card(SPATIAL_BIN, value=spatial, comment='y,x binning applied')
    temporal_card = Card(
        TEMPORAL_BIN,
        value=self.proc_data.config.cam.temporal_binning,
        comment='No. of averaged frames',
    )
    frame.header.append(spatial_card.to_card())
    frame.header.append(temporal_card.to_card())
    return frame.header

173 

@staticmethod
def predict_output(batch: FitsBatch, proc_data: ProcessingData):
    """
    Predict the batch this block will produce, without touching pixel data.

    Mirrors the dispatch in `_average_frames`: a single entry for demodulated
    input, one entry per temporal-binning group when mod states are ignored,
    otherwise one entry per modulation state. 'data' is always None.
    """
    predicted = FitsBatch()
    db = FileDB(proc_data.config)

    def entry(first, last):
        # One predicted output entry spanning first..last; header taken from first.
        return {
            'file': db.avrg_fname(first['file'], last['file']),
            'header': first['header'],
            'data': None,
        }

    if batch.is_applied(BlockD.BLOCK_ID):
        predicted.batch.append(entry(batch.batch[0], batch.batch[-1]))
    elif proc_data.config.spol.ignore_mod_states:
        binning = proc_data.config.cam.temporal_binning
        indices = np.arange(len(batch))
        groups = np.split(indices, len(batch) // binning) if binning > 0 else [indices]
        for group in groups:
            predicted.batch.append(entry(batch.batch[group[0]], batch.batch[group[-1]]))
    else:
        for frames in BlockB._state_split(batch):
            predicted.batch.append(entry(frames[0], frames[-1]))
    return predicted

209 

210 

# Register block B under its ID so the pipeline can look it up by name.
BlockRegistry().register(BlockB.BLOCK_ID, BlockB)