Coverage for src/susi/reduc/pipeline/blocks/block_m.py: 84%
76 statements
« prev ^ index » next coverage.py v7.5.0, created at 2025-06-13 14:15 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2025-06-13 14:15 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4Module that holds the block `M` of the susi pipeline.
6@author: hoelken
7"""
8import os
10import numpy as np
11from spectroflat.smile import SmileInterpolator
13from .block import Block, BlockRegistry
14from ..processing_data import ProcessingData
15from ....base import Logging, IllegalStateException, Globals
16from ....base.header_keys import *
17from ....io import FitsBatch, Fits, Card
18from ....utils import Collections
19from ....utils.cropping import adapt_shapes
20from ....utils.header_checks import check_same_binning
22log = Logging.get_logger()
25class BlockM(Block):
26 """
27 ## BLOCK M: Morphological Operations
29 This block takes care of morphological calibration steps:
30 - Smile Correction
31 - TODO Support data binned in spatial and/or spectral dimension
32 - TODO Image Registration
33 - TODO Wavelength calibration (using amended offset map)
34 """
36 BLOCK_ID = 'M'
38 @staticmethod
39 def start(batch: FitsBatch, proc_data: ProcessingData) -> FitsBatch:
40 """
41 Callback-like entry point for the block
42 """
43 return BlockM(batch, proc_data).run().result
45 def _algorithm(self):
46 if self.proc_data.config.is_cam3():
47 log.debug('CAM3 run: Morphological operations skipped...')
48 self.result = self.batch
49 return
51 self._get_roi()
52 self._adapt_shapes()
53 for entry in self.batch:
54 self.result.batch.append(self._process_entry(entry))
56 def _process_entry(self, fits: Fits) -> dict:
57 return {
58 'file': fits.path,
59 'data': self._modify_data(fits),
60 'header': self._modify_header(fits),
61 }
63 def _modify_data(self, fits: Fits):
64 if self.proc_data.offset_map.map.shape[0] == 1:
65 self.proc_data.offset_map.map = np.repeat(self.proc_data.offset_map.map, Globals.MOD_CYCLE_FRAMES, axis=0)
67 sc = SmileInterpolator(self.proc_data.offset_map, fits.data[0], mod_state=int(fits.header[MOD_STATE])).run()
68 return np.array([sc.result])
70 def _modify_header(self, fits: Fits):
71 Fits.override_header(fits.header, Block.BLOCKS_APPLIED, BlockM.BLOCK_ID, append=True)
72 Fits.override_header(
73 fits.header, OFFSET_MAP, value=os.path.basename(self.proc_data.offset_map.path).split('.')[0]
74 )
75 self._update_roi_header(fits.header)
77 # check if wl-caelibrated offset map, ie. if DISPERSION is present
78 if DISPERSION in self.proc_data.offset_map.header:
79 Fits.override_header(fits.header, WL_CALIBRATED, value='TRUE')
80 fits.header.append(Card(WL_CALIBRATED, value='TRUE').to_card())
81 fits.header.append(Card(DISPERSION, self.proc_data.offset_map.header[DISPERSION]).to_card())
82 fits.header.append(Card(MIN_WL_NM, self.proc_data.offset_map.header[MIN_WL_NM]).to_card())
83 fits.header.append(Card(MIN_WL_PX, self.proc_data.offset_map.header[MIN_WL_PX]).to_card())
84 fits.header.append(Card(MAX_WL_NM, self.proc_data.offset_map.header[MAX_WL_NM]).to_card())
85 fits.header.append(Card(MAX_WL_PX, self.proc_data.offset_map.header[MAX_WL_PX]).to_card())
86 else:
87 Fits.override_header(fits.header, WL_CALIBRATED, value='FALSE')
88 return fits.header
90 def _get_roi(self) -> None:
91 h = self.proc_data.offset_map.header
92 self._target_roi({'y0': int(h[ROI_Y0]), 'y1': int(h[ROI_Y1]), 'x0': int(h[ROI_X0]), 'x1': int(h[ROI_X1])})
94 def _adapt_shapes(self):
95 self.batch = adapt_shapes(self.batch, self.roi)
96 h = self.proc_data.offset_map.header
97 dx = int(h[ROI_X0]) if ROI_X0 in h else 0
98 dy = int(h[ROI_Y0]) if ROI_Y0 in h else 0
99 roi = (
100 slice(None, None),
101 slice(self.roi['y0'] - dy, self.roi['y1'] - dy),
102 slice(self.roi['x0'] - dx, self.roi['x1'] - dx),
103 )
104 self.proc_data.offset_map.map = self.proc_data.offset_map.map[roi]
106 def _apply_binning_factor(self):
107 check_same_binning(self.batch)
108 bins = self.batch[0]['header'][SPATIAL_BIN] if SPATIAL_BIN in self.batch[0]['header'] else None
109 if bins is None:
110 return
111 bins = bins.split(',')
112 # Do not bin mod state dimension
113 bins.insert(0, 1)
114 self.proc_data.offset_map.map = Collections.bin(self.proc_data.offset_map.map, Collections.as_int_array(bins))
116 @staticmethod
117 def prepare(proc_data: ProcessingData) -> None:
118 if not proc_data.config.is_cam3() and proc_data.offset_map is None:
119 raise IllegalStateException('No OFFSET MAP given')
121 @staticmethod
122 def input_needed(cam3: bool = False) -> list:
123 if cam3:
124 return []
125 else:
126 return ['offset_map']
129BlockRegistry().register(BlockM.BLOCK_ID, BlockM)