Coverage for src/susi/io/fits.py: 85%

167 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2025-06-13 14:15 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3""" 

4module provides Fits class, a wrapper for astropy.fits 

5 

6@author: hoelken, iglesias 

7""" 

8from __future__ import annotations 

9 

10import os 

11from datetime import datetime 

12 

13import numpy as np 

14from astropy.io import fits 

15import warnings 

16from astropy.utils.exceptions import AstropyWarning 

17 

18from .camera_decode_hk import camera_hk 

19from .fits_header import Card 

20from ..base import Logging 

21from ..base.header_keys import * 

22from ..utils import ExceptionHandler 

23from ..utils.encoding import Unicode 

24 

25logger = Logging.get_logger() 

26 

27HEADER_DATA_TYPES = { 

28 TIMESTAMP_US: int, 

29 TIMESTAMP_MS: int, 

30 ROI_X0: int, 

31 ROI_X1: int, 

32 ROI_Y0: int, 

33 ROI_Y1: int, 

34 MOD_STATE: int, 

35} 

36 

37 

38class Fits: 

39 """ 

40 ## Fits 

41 Reads and Writes the content of a single HDU from/to a given FITS file 

42 

43 Wrapper for `astropy.fits` 

44 """ 

45 

46 @staticmethod 

47 def override_header(header, key: str, value: object, comment: str = '', append: bool = False) -> fits.header: 

48 if key in header: 

49 if append: 

50 header[key] = f"{header[key]}{value}" 

51 else: 

52 header[key] = f"{value}" 

53 else: 

54 header.append(Card(key, value, comment).to_card()) 

55 return header 

56 

57 def __init__(self, path): 

58 """ 

59 ### Params 

60 - path: [String] path of the fits to read 

61 """ 

62 #: path of the fits to read/write 

63 self.path = path 

64 #: the header information obtained from file 

65 self.header = None 

66 #: The data (may be None, if only header data was read) 

67 self.data = None 

68 #: A set of custom header information 

69 self.custom_cards = [] 

70 #: Flag to tell if it has been saved 

71 self.saved = False 

72 

73 def read(self, hdu=0, slices=None, header_only=False, verify_and_fix=True): 

74 """ 

75 Read the contents of the HDU to mem 

76 

77 ### Params 

78 - hdu: Integer, optional 

79 the hdu to read (Default=PrimaryHDU=0) 

80 - slices: Tuple of slices, optional 

81 If defined only the given slice of the HDU data will be loaded. 

82 Number of slices must match array shape. 

83 - header_only: Optional ok_flag to load the header of the given HDU only 

84 - verify_and_fix: Optional flag to run .verify('silentfix') fits method on the read hdul 

85 default=True 

86 

87 ### Returns 

88 `self` 

89 """ 

90 try: 

91 if header_only: 

92 self.header = Fits.get_header(self.path, hdu) 

93 else: 

94 self.header, self.data = Fits.get_data(self.path, hdu, slices, verify_and_fix) 

95 except OSError as error: 

96 logger.error('Fits.read: %s: %s', self.path, error) 

97 self.saved = True 

98 return self 

99 

100 def read_ics(self, config, hdu=0, slices=None): 

101 """ 

102 Read the contents of a fits file written by SUSI ICS to mem. 

103 

104 -Decode camera HK data from row 2049 and append them to self.header 

105 -Crop data to 2048x2048 w/o keyword row 2049 

106 

107 ### Params 

108 - config: Instance of Config class 

109 - hdu: Integer, optional the hdu to read (Default=PrimaryHDU=0) 

110 - slices: Tuple of slices, optional 

111 If defined only the given slice of the HDU data will be loaded. 

112 Number of slices must match array shape. 

113 

114 ### Returns 

115 `self` 

116 """ 

117 self.read(hdu=hdu, verify_and_fix=True, slices=slices) 

118 self.header[NAXIS2] = (config.cam.data_shape[0].stop, '') 

119 self.append_hk_keys() 

120 self.custom_cards = [] 

121 self.saved = False 

122 return self 

123 

124 def append_hk_keys(self): 

125 """Appends image housekeeping data as humanreadable keywords to the end of the ics file header""" 

126 image_hk = camera_hk(np.flip(self.data, axis=0)) 

127 for item in image_hk: 

128 if len(image_hk[item].converted) > 0: 

129 valstr = image_hk[item].formatstr.format(image_hk[item].converted[0]) 

130 else: 

131 valstr = '{:}'.format(image_hk[item].val[0]) 

132 self.header.append(Card(item, value=Unicode.print_save(valstr), comment=image_hk[item].unit).to_card()) 

133 # cut row 0, where the HK data is, because it is no longer needed. 

134 self.data = self.data[1:, :] 

135 return self 

136 

137 def set_data(self, data): 

138 """ 

139 Set new data. 

140 Note: Will override the header as well. (custom header is preserved) 

141 

142 ### Params 

143 - data: the data array to set 

144 """ 

145 hdu = fits.PrimaryHDU(data, self.header) 

146 self.data = hdu.data 

147 self.header = hdu.header 

148 self.saved = False 

149 

150 def value_of(self, key): 

151 """ 

152 Method to look up header values. 

153 Tries custom_cards first and checks header only if not found 

154 

155 ### Params 

156 - key: [String] the name of the card 

157 

158 ### Returns 

159 The value of the card 

160 """ 

161 card = next((card for card in self.custom_cards if card.name == key), None) 

162 if card: 

163 return card.value 

164 return self.header[key] if key in self.header else None 

165 

166 def mod_state(self): 

167 return self.value_of(MOD_STATE) 

168 

169 def observation_time(self): 

170 """Convert the timestamp to a creation `datetime` object""" 

171 return datetime.fromisoformat(self.value_of(DATE_OBS)) 

172 

173 def custom_header(self, card: Card, overwrite_existing=True) -> Fits: 

174 """ 

175 Appends the given card to the current header 

176 ### Params 

177 - card: [Card] the card to append 

178 

179 ### Returns 

180 `self` 

181 """ 

182 if overwrite_existing: 

183 existing = next((c for c in self.custom_cards if c.name == card.name), None) 

184 if existing: 

185 existing.value = card.value 

186 existing.comment = card.comment 

187 return self 

188 

189 self.custom_cards.append(card) 

190 return self 

191 

192 def mean(self, roi: tuple = None) -> float: 

193 """ 

194 Computes the MEAN 

195 """ 

196 if self.data is None: 

197 raise ValueError('Data is not loaded...') 

198 data = self.data if roi is None else self.data[roi] 

199 return data.mean() 

200 

201 def rms(self, roi: tuple = None) -> float: 

202 """ 

203 Computes the RMS (Root Mean Square) of the data 

204 

205 :param roi: An optional region of interest as tuple of slices 

206 """ 

207 if self.data is None: 

208 raise ValueError('Data is not loaded...') 

209 data = self.data if roi is None else self.data[roi] 

210 return data.std() 

211 

212 def contrast(self, roi: tuple = None) -> float: 

213 """ 

214 Computes the Contrast of the image as STD/MEAN 

215 

216 :param roi: An optional region of interest as tuple of slices 

217 """ 

218 return self.rms(roi) / self.mean(roi) 

219 

220 def snr(self, roi: tuple = None) -> float: 

221 """ 

222 Computes the SNR as MEAN/RMS 

223 """ 

224 return self.mean(roi) / self.rms(roi) 

225 

226 def write_to_disk( 

227 self, overwrite=False, astype='float32', minimal_header=None, scale_back=False, silent_warning=True 

228 ): 

229 """ 

230 Writes the data to the configured path 

231 

232 ### Params 

233 - overwrite: bool, optional 

234 If `True`, overwrite the output file if it exists. Raises an 

235 `OSError` if `False` and the output file exists. (Default=`False`) 

236 - astype: string, optional 

237 Define the datatype of the HDU data array. Default: float32 

238 Use 'same' to avoid changing the dtype of the input 

239 - minimal_header: [header] When set, is used as minimal fits header 

240 - scale_back: [Bool] If True, when saving changes to a file that contained scaled image data, 

241 restore the data to the original type and reapply the original BSCALE/BZERO values. 

242 This could lead to loss of accuracy if scaling back to integer values after performing floating point 

243 operations on the data. 

244 Pseudo-unsigned integers are automatically rescaled if set to None. 

245 Default: False 

246 - silent_warning: bool, optional 

247 If `True`, suppress warnings from `astropy.io.fits` (Default=`True`) 

248 ### Returns 

249 The HDUList written to file 

250 """ 

251 if silent_warning: 

252 warnings.filterwarnings('ignore', category=AstropyWarning) 

253 hdul = fits.HDUList(self._gen_hdu(astype, minimal_header=minimal_header, scale_back=scale_back)) 

254 if os.path.dirname(self.path): 

255 os.makedirs(os.path.dirname(self.path), exist_ok=True) 

256 try: 

257 Fits.__write_fits(self.path, hdul, overwrite, 'silentfix') 

258 self.saved = True 

259 except Exception as e: 

260 ExceptionHandler.recover_from_write_error(e, self.path, hdul, Fits.__write_fits, overwrite, 'silentfix') 

261 return hdul 

262 

263 @staticmethod 

264 def __write_fits(file, hdul, overwrite, verify=None): 

265 if verify: 

266 hdul.writeto(file, overwrite=overwrite, output_verify=verify) 

267 else: 

268 hdul.writeto(file, overwrite=overwrite) 

269 

270 def convert_ics(self, config, hdu=0, slices=None, overwrite=False): 

271 """ 

272 Reads and decodes ics fits file using read_ics, then writes it as a Fits compatible file 

273 to self.path, except that Config.collection is replaced by Config.out_path. 

274 The full output dir structure is created. All entries of the original file header are kept. 

275 

276 ### Params 

277 - config: Instance of Config class 

278 - hdu: Integer, optional 

279 the hdu to read (Default=PrimaryHDU=0) 

280 - slices: Tuple of slices, optional 

281 If defined only the given slice of the HDU data will be loaded. 

282 Number of slices must match array shape. 

283 - overwrite: bool, optional 

284 If `True`, overwrite the output file if it exists. Raises an 

285 `OSError` if `False` and the output file exists. (Default=`False`) 

286 

287 ### Returns 

288 `self` 

289 """ 

290 self.read_ics(config, hdu=hdu, slices=slices) 

291 self.path = self.path.replace(config.data.level, config.data.out_path) 

292 self.path = self.path.replace(config.data.ext, config.data.ext_out) 

293 os.makedirs(os.path.dirname(self.path), exist_ok=True) 

294 self.write_to_disk(overwrite=overwrite, astype='same', minimal_header=self.header) 

295 return self 

296 

297 def compare_header(self, ref): 

298 """ 

299 Prints the entries that are different between sel.header and ref.header to 'SUSI' logger at info level 

300 

301 ### Params 

302 - ref: [Fits file class instance] Reference frame to compare headers 

303 

304 ### Returns 

305 If no difference is found `True`, otherwise returns `False`. 

306 """ 

307 same_header = True 

308 for k in self.header: 

309 if k in ref.header and ref.header[k] != self.header[k]: 

310 if k.find('SensorReg') != -1: 

311 logger.info( 

312 '%s ; %s (%s) ; %s (%s)', 

313 k, 

314 self.header[k], 

315 hex(int(self.header[k])), 

316 ref.header[k], 

317 hex(int(ref.header[k])), 

318 ) 

319 else: 

320 logger.info('%s ; %s ; %s', k, self.header[k], ref.header[k]) 

321 same_header = False 

322 

323 if same_header: 

324 logger.info( 

325 'The header of files %s and %s are the same', os.path.basename(self.path), os.path.basename(ref.path) 

326 ) 

327 else: 

328 logger.warning( 

329 'The headers of files %s and %s are different (entry ; this frame ; reference):', 

330 os.path.basename(self.path), 

331 os.path.basename(ref.path), 

332 ) 

333 return same_header 

334 

335 def _gen_hdu(self, astype, minimal_header=None, scale_back=False): 

336 """ 

337 ### Params 

338 - astype: string, optional 

339 Define the datatype of the HDU data array. 

340 (use 'same' to preserve the dtype of the input) 

341 - minimal_header: [header] When set, is used as minimal fits header 

342 """ 

343 data = self.data if astype == 'same' else self.data.astype(astype) 

344 header = self.header if minimal_header is None else minimal_header 

345 hdu = fits.PrimaryHDU(data, header, scale_back=scale_back) 

346 for card in self.custom_cards: 

347 hdu.header.append(card.to_card(), end=True) 

348 return hdu 

349 

350 def __repr__(self): 

351 txt = '{}: {}\n'.format(self.__class__.__name__, self.path) 

352 shape = 'NO DATA' if self.data is None else self.data.shape 

353 txt += '{:<21} = {}\n{:<21} = {}\n'.format('Saved', self.saved, 'Data Shape', shape) 

354 txt += '==== Header ====\n' 

355 txt += '\n'.join( 

356 ['{:<21} = {} | {}'.format(k, self.header[k], self.header.comments[k]) for k in self.header.keys()] 

357 ) 

358 txt += '\n----------' 

359 txt += '\n'.join(['{:<21} = {} | {}'.format(c.name, c.value, c.comment) for c in self.custom_cards]) 

360 return txt 

361 

362 @staticmethod 

363 def get_header(file, hdu: int = 0): 

364 header = fits.getheader(file, hdu) 

365 header = Fits.set_data_type(header) 

366 return header 

367 

368 @staticmethod 

369 def get_data(file, hdu, slices, verify_and_fix): 

370 with fits.open(file) as hdul: 

371 if verify_and_fix: 

372 hdul[hdu].verify('silentfix') 

373 header = hdul[hdu].header 

374 header = Fits.set_data_type(header) 

375 data = hdul[hdu].data 

376 if slices: 

377 data = data[tuple(slices)] 

378 return header, data 

379 

380 @staticmethod 

381 def set_data_type(header): 

382 for key in HEADER_DATA_TYPES.keys(): 

383 if key in header.keys(): 

384 header[key] = HEADER_DATA_TYPES[key](header[key]) 

385 return header