Coverage for src/susi/raw2fits/read_raw_save_to_fits.py: 78%
82 statements
« prev ^ index » next coverage.py v7.5.0, created at 2025-06-13 14:15 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2025-06-13 14:15 +0000
1"""
2Convert raw files to FITS format
3Extract frame header info from frame row 2049 and convert to FITS header entries
5Authors: K. Heerlein, F. Iglesias, A. Feller
6"""
8import os
9import sys
10from datetime import datetime
12from ..utils import Git
14from . import fits_handling
15from ..io import camera_decode_hk
16from ..base import Logging
18logger = Logging.get_logger()
20current_path = os.path.abspath('')
21parent_path = os.path.dirname(current_path)
22sys.path.append(parent_path)
23mainDir = sys.path[0]
25#: Switch to enable / disable deletion of raw files.
26#: Default is `True` = delete files
27deleterawfiles = True
28firmware_id = 17
30# build a table mapping all non-printable characters to None
31NOPRINT_TRANS_TABLE = {
32 i: None for i in range(0, sys.maxunicode + 1) if not chr(i).isprintable()
33}
36def make_printable(s):
37 """Replace non-printable characters in a string."""
39 # the translate method on str removes characters
40 # that map to None from the string
41 return s.translate(NOPRINT_TRANS_TABLE)
44def prepareHDR(hdulist, image_hk, user_hk):
45 try:
46 empty = ' '
47 # Main HeaderInfo ----------------------------------------------------------------------------------------
48 pos = 8
49 now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
50 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('', empty))
51 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('HIERARCH Processor Name', 'raw2fits'))
52 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('HIERARCH Processor Version', Git.current_sha()))
53 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('HIERARCH Processing Time', now))
54 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('', empty))
55 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('', 'Internal Image HK'))
56 for item in image_hk:
57 pos += 1
58 if len(image_hk[item].converted) > 0:
59 valstr = make_printable(image_hk[item].formatstr.format(image_hk[item].converted[0]))
60 else:
61 valstr = make_printable('{:}'.format(image_hk[item].val[0]))
62 unitstr = image_hk[item].unit
63 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('HIERARCH {:}'.format(item), valstr, unitstr))
64 pos += 1
65 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('', empty))
66 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('', 'Internal User HK'))
68 for item in user_hk:
69 valstr = make_printable('{:}'.format(user_hk[item]))
70 comment = ''
71 if item == 'path' and len(valstr) > 41:
72 valstr = valstr[0:40] # changed to avoid too long paths. franciscoaiglesias@gmail.com 20210623
73 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('HIERARCH {:}'.format(item), valstr, comment))
75 except Exception as e:
76 logger.error('preparing Fits Header - %s', e)
77 return hdulist
80def save_image_as_fits(np_array, file_name, image_hk, user_hk):
81 try:
82 hdulist = fits_handling.image2fits(np_array)
83 hdulist = prepareHDR(hdulist, image_hk, user_hk)
84 fits_handling.save_fits(hdulist, file_name, overwrite=True)
85 logger.info('File saved: %s', os.path.basename(file_name))
86 except Exception as e:
87 logger.error('Saving Fits File: %s', e)
90def SUSI_readrawsavefitsListofFiles(all_raw_files):
91 logger.debug('Thread started with list of files')
92 for i in range(len(all_raw_files)):
93 try:
94 raw_name = all_raw_files[i]
95 if os.path.isfile(raw_name):
96 image_hk, user_hk, image_array_withhk = camera_decode_hk.full_hk(raw_name)
97 fits_name = os.path.splitext(raw_name)[0] + '.fits.gz'
98 save_image_as_fits(image_array_withhk, fits_name, image_hk, user_hk)
99 if deleterawfiles:
100 os.remove(raw_name)
101 logger.debug('deleted file %s', os.path.basename(raw_name))
102 else:
103 logger.debug('raw file not deleted! %s', os.path.basename(raw_name))
104 logger.debug('%5d / %5d files done !', i, len(all_raw_files))
105 except Exception as e:
106 logger.error('SUSI_readrawsavefitsListofFiles - %s', e)
107 logger.debug('Thread done - list of Files')
110def getListofRawFiles(dir_path):
111 dirs = [x[0] for x in os.walk(dir_path)]
112 raw_files = []
113 for mypath in dirs:
114 logger.debug('looking for .raw files in %s', mypath)
115 all_raw_files = [fileName for fileName in os.listdir(mypath) if fileName.endswith(".raw")]
116 logger.debug('found %s .raw file(s)', len(all_raw_files))
117 for raw_file in all_raw_files:
118 raw_files.append(os.path.join(mypath, raw_file))
119 return raw_files