Coverage for src/susi/reduc/pipeline/blocks/block_x.py: 97%
77 statements
« prev ^ index » next coverage.py v7.5.0, created at 2025-06-13 14:15 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2025-06-13 14:15 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4Module that holds the block `X` of the susi pipeline.
6@author: hoelken
7"""
8from __future__ import annotations
10from typing import Union
12from ....io import FitsBatch, Fits
13from ....base import Logging
15from ..processing_data import ProcessingData
16from .block import Block, BlockRegistry
18from ....base.header_keys import *
# Module-level logger obtained from the project's `Logging` helper.
20log = Logging.get_logger()
class BlockX(Block):
    """
    ## BLOCK X: Cropping to science ROI

    Crop data to defined `config.data_shape`

    For every entry of the batch the data is sliced to the configured region and
    the ROI keys of the header (`ROI_X0`, `ROI_X1`, `ROI_Y0`, `ROI_Y1`) are updated.
    """

    BLOCK_ID = 'X'

    @staticmethod
    def start(batch: FitsBatch, proc_data: ProcessingData) -> FitsBatch:
        """
        Callback-like entry point for the block

        :param batch: the batch of fits entries to crop
        :param proc_data: processing data; its `config.cam` settings are used for cropping
        :return: the `result` of the executed block
        """
        return BlockX(batch, proc_data).run().result

    def _algorithm(self):
        """Crop each entry of `self.batch` and append the outcome to `self.result.batch`."""
        for entry in self.batch:
            self.result.batch.append(self._process_entry(entry))

    def _process_entry(self, fits: Fits) -> dict:
        """
        Crop a single entry.

        :param fits: the entry to crop
        :return: dict with the keys `file`, `data` (cropped) and `header` (updated)
        """
        science_roi = self.get_roi(fits)
        return {
            'file': fits.path,
            'data': fits.data[science_roi],
            'header': self._modify_header(fits, science_roi),
        }

    def get_roi(self, fits: Fits) -> tuple:
        """
        Build the index tuple used to crop `fits.data`.

        :param fits: the entry to compute the region for
        :return: `(y, x)` slices for 2-D data, otherwise `(all, y, x)` slices
        """
        idy, idx = self._axis_indices()
        ry0, ry1 = self._slice(fits, ROI_Y0, ROI_Y1, idy)
        rx0, rx1 = self._slice(fits, ROI_X0, ROI_X1, idx)
        if len(fits.data.shape) == 2:
            return slice(ry0, ry1), slice(rx0, rx1)
        return slice(None, None), slice(ry0, ry1), slice(rx0, rx1)

    def _axis_indices(self) -> tuple:
        """
        Look up the positions of the y and x axes in `config.cam.shape_keys`.

        The x axis is keyed `'x'` for the camera named `'cam3'` and `'lambda'` otherwise.

        :return: tuple `(idy, idx)`
        """
        cam = self.proc_data.config.cam
        x_key = 'x' if cam.name == 'cam3' else 'lambda'
        return cam.shape_keys.index('y'), cam.shape_keys.index(x_key)

    def _slice(self, fits: Fits, start_key: str, stop_key: str, pos: int) -> tuple:
        """
        Compute start and stop of the crop along one axis.

        :param fits: the entry whose header holds the current ROI values
        :param start_key: header key of the current ROI start
        :param stop_key: header key of the current ROI stop
        :param pos: position of the axis in `config.cam.data_shape`
        :return: tuple `(start, stop)`; either value may be `None`
        """
        target = self.proc_data.config.cam.data_shape[pos]
        start = self._deltav(self._fits_value_of(fits, start_key), target.start)
        stop = None
        # An open-ended configured slice (stop is None) has no width to add,
        # so the stop stays unbounded instead of failing on `None - int`.
        # NOTE(review): if the configured start lies before the header start, `start` is
        # clipped to 0 while the configured width is kept -- confirm this is intended.
        if start is not None and target.stop is not None:
            stop = start + (target.stop - target.start)
        stop = self._minv(self._fits_value_of(fits, stop_key), stop)
        return start, stop

    @staticmethod
    def _minv(orig: Union[int, None], new: Union[int, None]) -> Union[int, None]:
        """Return the smaller of both values; if one is `None` the other one is returned."""
        if orig is None:
            return new
        if new is None:
            return orig
        return min(orig, new)

    @staticmethod
    def _deltav(orig: Union[int, None], new: Union[int, None]) -> Union[int, None]:
        """
        Return `new - orig`, clipped at `0`.

        If `orig` is `None`, `new` is returned as is. If only `new` is `None`, `None` is returned.
        """
        if orig is None:
            return new
        if new is None:
            return None
        if new < orig:
            return 0
        return new - orig

    def _modify_header(self, fits: Fits, science_roi: tuple):
        """
        Update the header of the entry to reflect the applied crop.

        Marks the block as applied, calls `self._update_rms_snr_mean` and then rewrites
        the ROI keys: the start keys are increased by the start of the applied slice,
        the stop keys are limited to the stop of the configured data shape.

        :param fits: the entry whose header is modified in place
        :param science_roi: the index tuple as returned by `get_roi`
        :return: the modified header
        """
        Fits.override_header(fits.header, BLOCKS_APPLIED, BlockX.BLOCK_ID, append=True)
        self._update_rms_snr_mean(fits)

        # `get_roi` always places the y slice second to last and the x slice last,
        # for 2-D as well as for 3-D data.
        roi_y, roi_x = science_roi[-2], science_roi[-1]
        idy, idx = self._axis_indices()
        target = self.proc_data.config.cam.data_shape

        axes = ((ROI_X0, ROI_X1, roi_x, idx), (ROI_Y0, ROI_Y1, roi_y, idy))
        for start_key, stop_key, roi, pos in axes:
            Fits.override_header(
                fits.header, start_key, value=self._sumifnotnone(self._fits_value_of(fits, start_key), roi.start)
            )
            Fits.override_header(
                fits.header, stop_key, value=self._minv(self._fits_value_of(fits, stop_key), target[pos].stop)
            )
        return fits.header

    @staticmethod
    def _fits_value_of(fits: Fits, key: str) -> Union[int, None]:
        """Return `fits.value_of(key)` converted to `int`, or `None` if that value is `None`."""
        value = fits.value_of(key)
        if value is None:
            return None
        return int(value)

    @staticmethod
    def _sumifnotnone(a: Union[int, None], b: Union[int, None]) -> Union[int, None]:
        """Return the integer sum of both values; if one is `None` the other one is returned."""
        if a is None:
            return b
        if b is None:
            return a
        return int(a) + int(b)
# Register the block class under its BLOCK_ID in the BlockRegistry at import time.
131BlockRegistry().register(BlockX.BLOCK_ID, BlockX)