Coverage for src/susi/reduc/pipeline/blocks/block_x.py: 97%

77 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2025-06-13 14:15 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3""" 

4Module that holds the block `C` of the susi pipeline. 

5 

6@author: hoelken 

7""" 

8from __future__ import annotations 

9 

10from typing import Union 

11 

12from ....io import FitsBatch, Fits 

13from ....base import Logging 

14 

15from ..processing_data import ProcessingData 

16from .block import Block, BlockRegistry 

17 

18from ....base.header_keys import * 

19 

20log = Logging.get_logger() 

21 

22 

23class BlockX(Block): 

24 """ 

25 ## BLOCK X: Cropping to science ROI 

26 

27 Crop data to defined `config.data_shape` 

28 """ 

29 

30 BLOCK_ID = 'X' 

31 

32 @staticmethod 

33 def start(batch: FitsBatch, proc_data: ProcessingData) -> FitsBatch: 

34 """ 

35 Callback-like entry point for the block 

36 """ 

37 return BlockX(batch, proc_data).run().result 

38 

39 def _algorithm(self): 

40 for entry in self.batch: 

41 self.result.batch.append(self._process_entry(entry)) 

42 

43 def _process_entry(self, fits: Fits) -> dict: 

44 science_roi = self.get_roi(fits) 

45 return { 

46 'file': fits.path, 

47 'data': fits.data[science_roi], 

48 'header': self._modify_header(fits, science_roi), 

49 } 

50 

51 def get_roi(self, fits: Fits) -> tuple: 

52 idx = self.proc_data.config.cam.shape_keys.index('x' if self.proc_data.config.cam.name == 'cam3' else 'lambda') 

53 ry0, ry1 = self._slice(fits, ROI_Y0, ROI_Y1, self.proc_data.config.cam.shape_keys.index('y')) 

54 rx0, rx1 = self._slice(fits, ROI_X0, ROI_X1, idx) 

55 if len(fits.data.shape) == 2: 

56 return slice(ry0, ry1), slice(rx0, rx1) 

57 else: 

58 return slice(None, None), slice(ry0, ry1), slice(rx0, rx1) 

59 

60 def _slice(self, fits: Fits, start_key: str, stop_key: str, pos: int) -> tuple: 

61 start = self._deltav(self._fits_value_of(fits, start_key), self.proc_data.config.cam.data_shape[pos].start) 

62 stop = None 

63 if start is not None: 

64 diff = self.proc_data.config.cam.data_shape[pos].stop - self.proc_data.config.cam.data_shape[pos].start 

65 stop = start + diff 

66 stop = self._minv(self._fits_value_of(fits, stop_key), stop) 

67 return start, stop 

68 

69 @staticmethod 

70 def _minv(orig: int, new: int) -> Union[int, None]: 

71 if orig is None: 

72 return new 

73 if new is None: 

74 return orig 

75 return min(orig, new) 

76 

77 @staticmethod 

78 def _deltav(orig: int, new: int) -> Union[int, None]: 

79 if orig is None: 

80 return new 

81 if new is None: 

82 return None 

83 if new < orig: 

84 return 0 

85 return new - orig 

86 

87 def _modify_header(self, fits: Fits, science_roi: tuple): 

88 Fits.override_header(fits.header, BLOCKS_APPLIED, BlockX.BLOCK_ID, append=True) 

89 self._update_rms_snr_mean(fits) 

90 

91 if len(science_roi) == 3: 

92 science_roi = science_roi[2], science_roi[1] 

93 

94 idx = self.proc_data.config.cam.shape_keys.index('x' if self.proc_data.config.cam.name == 'cam3' else 'lambda') 

95 idy = self.proc_data.config.cam.shape_keys.index('y') 

96 

97 Fits.override_header( 

98 fits.header, ROI_X0, value=self._sumifnotnone(self._fits_value_of(fits, ROI_X0), science_roi[1].start) 

99 ) 

100 Fits.override_header( 

101 fits.header, 

102 ROI_X1, 

103 value=self._minv(self._fits_value_of(fits, ROI_X1), self.proc_data.config.cam.data_shape[idx].stop), 

104 ) 

105 Fits.override_header( 

106 fits.header, ROI_Y0, value=self._sumifnotnone(self._fits_value_of(fits, ROI_Y0), science_roi[0].start) 

107 ) 

108 Fits.override_header( 

109 fits.header, 

110 ROI_Y1, 

111 value=self._minv(self._fits_value_of(fits, ROI_Y1), self.proc_data.config.cam.data_shape[idy].stop), 

112 ) 

113 return fits.header 

114 

115 @staticmethod 

116 def _fits_value_of(fits: Fits, key: str): 

117 value = fits.value_of(key) 

118 if value is None: 

119 return None 

120 return int(value) 

121 

122 @staticmethod 

123 def _sumifnotnone(a: int, b: int) -> Union[int, None]: 

124 if a is None: 

125 return b 

126 if b is None: 

127 return a 

128 return int(a) + int(b) 

129 

130 

131BlockRegistry().register(BlockX.BLOCK_ID, BlockX)