Coverage for src/susi/io/fits.py: 85%
167 statements
« prev ^ index » next coverage.py v7.5.0, created at 2025-06-13 14:15 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2025-06-13 14:15 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4module provides Fits class, a wrapper for astropy.fits
6@author: hoelken, iglesias
7"""
8from __future__ import annotations
10import os
11from datetime import datetime
13import numpy as np
14from astropy.io import fits
15import warnings
16from astropy.utils.exceptions import AstropyWarning
18from .camera_decode_hk import camera_hk
19from .fits_header import Card
20from ..base import Logging
21from ..base.header_keys import *
22from ..utils import ExceptionHandler
23from ..utils.encoding import Unicode
25logger = Logging.get_logger()
27HEADER_DATA_TYPES = {
28 TIMESTAMP_US: int,
29 TIMESTAMP_MS: int,
30 ROI_X0: int,
31 ROI_X1: int,
32 ROI_Y0: int,
33 ROI_Y1: int,
34 MOD_STATE: int,
35}
38class Fits:
39 """
40 ## Fits
41 Reads and Writes the content of a single HDU from/to a given FITS file
43 Wrapper for `astropy.fits`
44 """
46 @staticmethod
47 def override_header(header, key: str, value: object, comment: str = '', append: bool = False) -> fits.header:
48 if key in header:
49 if append:
50 header[key] = f"{header[key]}{value}"
51 else:
52 header[key] = f"{value}"
53 else:
54 header.append(Card(key, value, comment).to_card())
55 return header
57 def __init__(self, path):
58 """
59 ### Params
60 - path: [String] path of the fits to read
61 """
62 #: path of the fits to read/write
63 self.path = path
64 #: the header information obtained from file
65 self.header = None
66 #: The data (may be None, if only header data was read)
67 self.data = None
68 #: A set of custom header information
69 self.custom_cards = []
70 #: Flag to tell if it has been saved
71 self.saved = False
73 def read(self, hdu=0, slices=None, header_only=False, verify_and_fix=True):
74 """
75 Read the contents of the HDU to mem
77 ### Params
78 - hdu: Integer, optional
79 the hdu to read (Default=PrimaryHDU=0)
80 - slices: Tuple of slices, optional
81 If defined only the given slice of the HDU data will be loaded.
82 Number of slices must match array shape.
83 - header_only: Optional ok_flag to load the header of the given HDU only
84 - verify_and_fix: Optional flag to run .verify('silentfix') fits method on the read hdul
85 default=True
87 ### Returns
88 `self`
89 """
90 try:
91 if header_only:
92 self.header = Fits.get_header(self.path, hdu)
93 else:
94 self.header, self.data = Fits.get_data(self.path, hdu, slices, verify_and_fix)
95 except OSError as error:
96 logger.error('Fits.read: %s: %s', self.path, error)
97 self.saved = True
98 return self
100 def read_ics(self, config, hdu=0, slices=None):
101 """
102 Read the contents of a fits file written by SUSI ICS to mem.
104 -Decode camera HK data from row 2049 and append them to self.header
105 -Crop data to 2048x2048 w/o keyword row 2049
107 ### Params
108 - config: Instance of Config class
109 - hdu: Integer, optional the hdu to read (Default=PrimaryHDU=0)
110 - slices: Tuple of slices, optional
111 If defined only the given slice of the HDU data will be loaded.
112 Number of slices must match array shape.
114 ### Returns
115 `self`
116 """
117 self.read(hdu=hdu, verify_and_fix=True, slices=slices)
118 self.header[NAXIS2] = (config.cam.data_shape[0].stop, '')
119 self.append_hk_keys()
120 self.custom_cards = []
121 self.saved = False
122 return self
124 def append_hk_keys(self):
125 """Appends image housekeeping data as humanreadable keywords to the end of the ics file header"""
126 image_hk = camera_hk(np.flip(self.data, axis=0))
127 for item in image_hk:
128 if len(image_hk[item].converted) > 0:
129 valstr = image_hk[item].formatstr.format(image_hk[item].converted[0])
130 else:
131 valstr = '{:}'.format(image_hk[item].val[0])
132 self.header.append(Card(item, value=Unicode.print_save(valstr), comment=image_hk[item].unit).to_card())
133 # cut row 0, where the HK data is, because it is no longer needed.
134 self.data = self.data[1:, :]
135 return self
137 def set_data(self, data):
138 """
139 Set new data.
140 Note: Will override the header as well. (custom header is preserved)
142 ### Params
143 - data: the data array to set
144 """
145 hdu = fits.PrimaryHDU(data, self.header)
146 self.data = hdu.data
147 self.header = hdu.header
148 self.saved = False
150 def value_of(self, key):
151 """
152 Method to look up header values.
153 Tries custom_cards first and checks header only if not found
155 ### Params
156 - key: [String] the name of the card
158 ### Returns
159 The value of the card
160 """
161 card = next((card for card in self.custom_cards if card.name == key), None)
162 if card:
163 return card.value
164 return self.header[key] if key in self.header else None
166 def mod_state(self):
167 return self.value_of(MOD_STATE)
169 def observation_time(self):
170 """Convert the timestamp to a creation `datetime` object"""
171 return datetime.fromisoformat(self.value_of(DATE_OBS))
173 def custom_header(self, card: Card, overwrite_existing=True) -> Fits:
174 """
175 Appends the given card to the current header
176 ### Params
177 - card: [Card] the card to append
179 ### Returns
180 `self`
181 """
182 if overwrite_existing:
183 existing = next((c for c in self.custom_cards if c.name == card.name), None)
184 if existing:
185 existing.value = card.value
186 existing.comment = card.comment
187 return self
189 self.custom_cards.append(card)
190 return self
192 def mean(self, roi: tuple = None) -> float:
193 """
194 Computes the MEAN
195 """
196 if self.data is None:
197 raise ValueError('Data is not loaded...')
198 data = self.data if roi is None else self.data[roi]
199 return data.mean()
201 def rms(self, roi: tuple = None) -> float:
202 """
203 Computes the RMS (Root Mean Square) of the data
205 :param roi: An optional region of interest as tuple of slices
206 """
207 if self.data is None:
208 raise ValueError('Data is not loaded...')
209 data = self.data if roi is None else self.data[roi]
210 return data.std()
212 def contrast(self, roi: tuple = None) -> float:
213 """
214 Computes the Contrast of the image as STD/MEAN
216 :param roi: An optional region of interest as tuple of slices
217 """
218 return self.rms(roi) / self.mean(roi)
220 def snr(self, roi: tuple = None) -> float:
221 """
222 Computes the SNR as MEAN/RMS
223 """
224 return self.mean(roi) / self.rms(roi)
226 def write_to_disk(
227 self, overwrite=False, astype='float32', minimal_header=None, scale_back=False, silent_warning=True
228 ):
229 """
230 Writes the data to the configured path
232 ### Params
233 - overwrite: bool, optional
234 If `True`, overwrite the output file if it exists. Raises an
235 `OSError` if `False` and the output file exists. (Default=`False`)
236 - astype: string, optional
237 Define the datatype of the HDU data array. Default: float32
238 Use 'same' to avoid changing the dtype of the input
239 - minimal_header: [header] When set, is used as minimal fits header
240 - scale_back: [Bool] If True, when saving changes to a file that contained scaled image data,
241 restore the data to the original type and reapply the original BSCALE/BZERO values.
242 This could lead to loss of accuracy if scaling back to integer values after performing floating point
243 operations on the data.
244 Pseudo-unsigned integers are automatically rescaled if set to None.
245 Default: False
246 - silent_warning: bool, optional
247 If `True`, suppress warnings from `astropy.io.fits` (Default=`True`)
248 ### Returns
249 The HDUList written to file
250 """
251 if silent_warning:
252 warnings.filterwarnings('ignore', category=AstropyWarning)
253 hdul = fits.HDUList(self._gen_hdu(astype, minimal_header=minimal_header, scale_back=scale_back))
254 if os.path.dirname(self.path):
255 os.makedirs(os.path.dirname(self.path), exist_ok=True)
256 try:
257 Fits.__write_fits(self.path, hdul, overwrite, 'silentfix')
258 self.saved = True
259 except Exception as e:
260 ExceptionHandler.recover_from_write_error(e, self.path, hdul, Fits.__write_fits, overwrite, 'silentfix')
261 return hdul
263 @staticmethod
264 def __write_fits(file, hdul, overwrite, verify=None):
265 if verify:
266 hdul.writeto(file, overwrite=overwrite, output_verify=verify)
267 else:
268 hdul.writeto(file, overwrite=overwrite)
270 def convert_ics(self, config, hdu=0, slices=None, overwrite=False):
271 """
272 Reads and decodes ics fits file using read_ics, then writes it as a Fits compatible file
273 to self.path, except that Config.collection is replaced by Config.out_path.
274 The full output dir structure is created. All entries of the original file header are kept.
276 ### Params
277 - config: Instance of Config class
278 - hdu: Integer, optional
279 the hdu to read (Default=PrimaryHDU=0)
280 - slices: Tuple of slices, optional
281 If defined only the given slice of the HDU data will be loaded.
282 Number of slices must match array shape.
283 - overwrite: bool, optional
284 If `True`, overwrite the output file if it exists. Raises an
285 `OSError` if `False` and the output file exists. (Default=`False`)
287 ### Returns
288 `self`
289 """
290 self.read_ics(config, hdu=hdu, slices=slices)
291 self.path = self.path.replace(config.data.level, config.data.out_path)
292 self.path = self.path.replace(config.data.ext, config.data.ext_out)
293 os.makedirs(os.path.dirname(self.path), exist_ok=True)
294 self.write_to_disk(overwrite=overwrite, astype='same', minimal_header=self.header)
295 return self
297 def compare_header(self, ref):
298 """
299 Prints the entries that are different between sel.header and ref.header to 'SUSI' logger at info level
301 ### Params
302 - ref: [Fits file class instance] Reference frame to compare headers
304 ### Returns
305 If no difference is found `True`, otherwise returns `False`.
306 """
307 same_header = True
308 for k in self.header:
309 if k in ref.header and ref.header[k] != self.header[k]:
310 if k.find('SensorReg') != -1:
311 logger.info(
312 '%s ; %s (%s) ; %s (%s)',
313 k,
314 self.header[k],
315 hex(int(self.header[k])),
316 ref.header[k],
317 hex(int(ref.header[k])),
318 )
319 else:
320 logger.info('%s ; %s ; %s', k, self.header[k], ref.header[k])
321 same_header = False
323 if same_header:
324 logger.info(
325 'The header of files %s and %s are the same', os.path.basename(self.path), os.path.basename(ref.path)
326 )
327 else:
328 logger.warning(
329 'The headers of files %s and %s are different (entry ; this frame ; reference):',
330 os.path.basename(self.path),
331 os.path.basename(ref.path),
332 )
333 return same_header
335 def _gen_hdu(self, astype, minimal_header=None, scale_back=False):
336 """
337 ### Params
338 - astype: string, optional
339 Define the datatype of the HDU data array.
340 (use 'same' to preserve the dtype of the input)
341 - minimal_header: [header] When set, is used as minimal fits header
342 """
343 data = self.data if astype == 'same' else self.data.astype(astype)
344 header = self.header if minimal_header is None else minimal_header
345 hdu = fits.PrimaryHDU(data, header, scale_back=scale_back)
346 for card in self.custom_cards:
347 hdu.header.append(card.to_card(), end=True)
348 return hdu
350 def __repr__(self):
351 txt = '{}: {}\n'.format(self.__class__.__name__, self.path)
352 shape = 'NO DATA' if self.data is None else self.data.shape
353 txt += '{:<21} = {}\n{:<21} = {}\n'.format('Saved', self.saved, 'Data Shape', shape)
354 txt += '==== Header ====\n'
355 txt += '\n'.join(
356 ['{:<21} = {} | {}'.format(k, self.header[k], self.header.comments[k]) for k in self.header.keys()]
357 )
358 txt += '\n----------'
359 txt += '\n'.join(['{:<21} = {} | {}'.format(c.name, c.value, c.comment) for c in self.custom_cards])
360 return txt
362 @staticmethod
363 def get_header(file, hdu: int = 0):
364 header = fits.getheader(file, hdu)
365 header = Fits.set_data_type(header)
366 return header
368 @staticmethod
369 def get_data(file, hdu, slices, verify_and_fix):
370 with fits.open(file) as hdul:
371 if verify_and_fix:
372 hdul[hdu].verify('silentfix')
373 header = hdul[hdu].header
374 header = Fits.set_data_type(header)
375 data = hdul[hdu].data
376 if slices:
377 data = data[tuple(slices)]
378 return header, data
380 @staticmethod
381 def set_data_type(header):
382 for key in HEADER_DATA_TYPES.keys():
383 if key in header.keys():
384 header[key] = HEADER_DATA_TYPES[key](header[key])
385 return header