Coverage for src/susi/reduc/pipeline/blocks/block_b.py: 49%
110 statements
« prev ^ index » next coverage.py v7.5.0, created at 2025-08-22 09:20 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2025-08-22 09:20 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4Module that holds the block `B` of the susi pipeline.
6@author: hoelken
7"""
8import numpy as np
9from astropy.io import fits
11from .block import Block, BlockRegistry
12from .block_d import BlockD
13from ..processing_data import ProcessingData
14from ....db import FileDB
15from ....io import FitsBatch, Fits, Card
16from ....base import Logging, Globals
17from ....utils import Collections
19from ....base.header_keys import *
# Module-level logger obtained from the project's `Logging` helper.
log = Logging.get_logger()
class BlockB(Block):
    """
    ## BLOCK B: Binning

    This block takes care of binning data
    - temporal / x-axis binning (averaging frames)
    - spatial / wl binning (averaging rows and columns of frames)
    """

    BLOCK_ID = 'B'

    @staticmethod
    def start(batch: FitsBatch, proc_data: ProcessingData) -> FitsBatch:
        """
        Callback-like entry point for the block

        Builds a `BlockB` for the given batch, runs it and returns its `result`.
        """
        return BlockB(batch, proc_data).run().result

    def _algorithm(self):
        """
        Average the frames in time first (this may replace `self.batch`),
        then bin every remaining entry and append it to the result batch.
        """
        self._average_frames()
        for entry in self.batch:
            self.result.batch.append(self._process_entry(entry))

    def _process_entry(self, fits: Fits) -> dict:
        """
        Build the result entry (`file`, `data`, `header`) for a single frame.

        NOTE: the parameter name shadows the module level `astropy.io.fits` import
        inside this method only; it is kept to leave the signature untouched.
        """
        return {
            'file': fits.path,
            'data': self._modify_data(fits),
            'header': self._modify_header(fits),
        }

    def _average_frames(self):
        """
        Dispatch the temporal binning.

        - a temporal binning of exactly `1` leaves the batch untouched
        - batches that already passed block `D` are averaged per stokes image
        - otherwise frames are averaged with or without regard to their
          modulation state, depending on `config.spol.ignore_mod_states`
        """
        if self.proc_data.config.cam.temporal_binning == 1:
            # Nothing to do
            return

        if self.batch.is_applied(BlockD.BLOCK_ID):
            self.demodulated_avrg()
        elif self.proc_data.config.spol.ignore_mod_states:
            self._avrg_no_state()
        else:
            self._avrg_per_state()

    def demodulated_avrg(self):
        """
        Average a demodulated batch into one single entry.

        Demodulated data has another shape, i.e. it has an extra (leading) dimension
        with an entry for each stokes image. Each of the four images (I, Q, U, V)
        is averaged over all entries of the batch separately.
        """
        entries = self.batch.batch
        # One averaged image per stokes index 0..3 (I, Q, U, V).
        stokes = [np.average([e['data'][idx] for e in entries], axis=0) for idx in range(4)]

        averaged = FitsBatch()
        averaged.batch.append(
            {
                'file': self._avrg_fname(fnames=[entries[0]['file'], entries[-1]['file']], avg=True),
                'data': np.array(stokes),
                'header': self._merged_header(entries[0]['header']),
            }
        )
        self.batch = averaged

    @staticmethod
    def _frame_blocks(n_frames: int, temporal_binning: int) -> list:
        """
        Split the frame indices `0..n_frames-1` into the groups that are averaged together.

        - no frames: no groups at all
        - `temporal_binning > 0`: `n_frames // temporal_binning` equally sized groups, but
          never less than one. A batch that is shorter than the binning factor is thus
          averaged as a whole instead of failing with a `ZeroDivisionError` in `np.split`.
        - otherwise: one group holding all frames

        `np.split` still raises a `ValueError` if the frames cannot be divided equally.
        """
        if n_frames == 0:
            return []

        indices = np.arange(n_frames)
        if temporal_binning > 0:
            return np.split(indices, max(1, n_frames // temporal_binning))
        return [indices]

    def _avrg_no_state(self) -> None:
        """
        Average the frames block wise, ignoring the modulation state.

        Replaces `self.batch` by a batch with one averaged entry per block of frames.
        The header of the first frame of each block is used as template.
        """
        entries = self.batch.batch
        blocks = BlockB._frame_blocks(len(self.batch), self.proc_data.config.cam.temporal_binning)

        averaged = FitsBatch()
        for block in blocks:
            averaged.batch.append(
                {
                    'file': self._avrg_fname(block),
                    'header': self._merged_header(entries[block[0]]['header']),
                    'data': np.average([entries[i]['data'] for i in block], axis=0),
                }
            )
        self.batch = averaged

    def _avrg_per_state(self) -> None:
        """
        Average the frames of each modulation state separately.

        The batch that was prepared by the `Chunker` has already the right size.
        We just have to take care for mod states if necessary.
        """
        averaged = FitsBatch()
        for frames in BlockB._state_split(self.batch):
            # The names must go in as `fnames`: the first positional argument of
            # `_avrg_fname` is interpreted as a list of batch indices.
            averaged.batch.append(
                {
                    'file': self._avrg_fname(fnames=[frames[0]['file'], frames[-1]['file']]),
                    'data': np.average([frame['data'] for frame in frames], axis=0),
                    'header': self._merged_header(frames[0]['header']),
                }
            )
        self.batch = averaged

    @staticmethod
    def _state_split(batch: FitsBatch) -> list:
        """
        Group the entries of the batch by their modulation state.

        :return: A list with `Globals.MOD_CYCLE_FRAMES` lists. The list at index `i`
            holds, in batch order, all entries whose `MOD_STATE` header value is `i`.
        """
        state_data = [[] for _ in range(Globals.MOD_CYCLE_FRAMES)]
        for entry in batch.batch:
            state_data[int(entry['header'][MOD_STATE])].append(entry)
        return state_data

    def _merged_header(self, header):
        """
        Create the header for an averaged frame.

        Copies every key listed in `config.base.header_copy_fields` that is present in
        the given header into a fresh primary HDU header. If modulation states are
        ignored the `MOD_STATE` card is written with the value `'N/A'` instead.
        """
        new_header = fits.PrimaryHDU().header
        for key in self.proc_data.config.base.header_copy_fields:
            if key not in header:
                continue

            if self.proc_data.config.spol.ignore_mod_states and key == MOD_STATE:
                card = Card(MOD_STATE, 'N/A')
            else:
                card = Card.from_orig(header, key)
            new_header.append(card.to_card())
        return new_header

    @staticmethod
    def _state_avrg(data: tuple) -> tuple:
        """
        Average the second element of the tuple along its first axis.

        :return: `(data[0], average of data[1])`
        """
        return data[0], np.average(data[1], axis=0)

    def _avrg_fname(self, block=None, fnames=None, avg=False):
        """
        Ask the file database for the name of an averaged frame.

        :param block: Indices into the current batch; first and last are used.
        :param fnames: File names; first and last are used. Only regarded if `block` is `None`.
        :param avg: Passed on to `self.db.avrg_fname`.

        If neither `block` nor `fnames` is given the first and last entry of the
        current batch are used.
        """
        if block is not None:
            f_start = self.batch[block[0]]['file']
            f_end = self.batch[block[-1]]['file']
        elif fnames is not None:
            f_start = fnames[0]
            f_end = fnames[-1]
        else:
            f_start = self.batch[0]['file']
            f_end = self.batch[-1]['file']
        return self.db.avrg_fname(f_start, f_end, avg=avg)

    def _modify_data(self, f: Fits):
        """
        Apply the configured binning (`config.cam.spacial_binning`) to the frame data.

        Two dimensional data is binned directly. Everything else is binned
        slice by slice along the first axis and stacked again.
        """
        binning = self.proc_data.config.cam.spacial_binning
        if len(f.data.shape) == 2:
            return Collections.bin(f.data, binning)

        return np.array([Collections.bin(plane, binning) for plane in f.data])

    def _modify_header(self, frame: Fits):
        """
        Update the header of the frame in place and return it.

        Marks block `B` as applied, refreshes the rms/snr/mean information
        and records the spatial and temporal binning that was used.
        """
        Fits.override_header(frame.header, Block.BLOCKS_APPLIED, BlockB.BLOCK_ID, append=True)
        self._update_rms_snr_mean(frame)

        cam = self.proc_data.config.cam
        frame.header.append(
            Card(
                SPATIAL_BIN,
                value=','.join([str(i) for i in cam.spacial_binning]),
                comment='y,x binning applied',
            ).to_card()
        )
        frame.header.append(
            Card(TEMPORAL_BIN, value=cam.temporal_binning, comment='No. of averaged frames').to_card()
        )
        return frame.header

    @staticmethod
    def predict_output(batch: FitsBatch, proc_data: ProcessingData):
        """
        Predict the entries this block would produce for the given batch without touching data.

        :return: A new `FitsBatch` whose entries carry the predicted `file` name,
            the (unmodified) header of the first contributing frame and `data=None`.
        """
        new_batch = FitsBatch()
        db = FileDB(proc_data.config)
        if batch.is_applied(BlockD.BLOCK_ID):
            # NOTE(review): `demodulated_avrg` requests the name with `avg=True`,
            # this prediction does not. Confirm both yield the same file name.
            new_batch.batch.append(
                {
                    'file': db.avrg_fname(batch.batch[0]['file'], batch.batch[-1]['file']),
                    'header': batch.batch[0]['header'],
                    'data': None,
                }
            )
        elif proc_data.config.spol.ignore_mod_states:
            # Same grouping as used by `_avrg_no_state`.
            for block in BlockB._frame_blocks(len(batch), proc_data.config.cam.temporal_binning):
                first, last = batch.batch[block[0]], batch.batch[block[-1]]
                new_batch.batch.append(
                    {
                        'file': db.avrg_fname(first['file'], last['file']),
                        'header': first['header'],
                        'data': None,
                    }
                )
        else:
            for frames in BlockB._state_split(batch):
                new_batch.batch.append(
                    {
                        'file': db.avrg_fname(frames[0]['file'], frames[-1]['file']),
                        'header': frames[0]['header'],
                        'data': None,
                    }
                )
        return new_batch
# Make the block available to the pipeline under its ID on import of this module.
BlockRegistry().register(BlockB.BLOCK_ID, BlockB)