Coverage for src/susi/raw2fits/read_raw_save_to_fits.py: 78%

82 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2025-06-13 14:15 +0000

1""" 

2Convert raw files to FITS format 

3Extract frame header info from frame row 2049 and convert to FITS header entries 

4 

5Authors: K. Heerlein, F. Iglesias, A. Feller 

6""" 

7 

8import os 

9import sys 

10from datetime import datetime 

11 

12from ..utils import Git 

13 

14from . import fits_handling 

15from ..io import camera_decode_hk 

16from ..base import Logging 

17 

18logger = Logging.get_logger() 

19 

20current_path = os.path.abspath('') 

21parent_path = os.path.dirname(current_path) 

22sys.path.append(parent_path) 

23mainDir = sys.path[0] 

24 

25#: Switch to enable / disable deletion of raw files. 

26#: Default is `True` = delete files 

27deleterawfiles = True 

28firmware_id = 17 

29 

30# build a table mapping all non-printable characters to None 

31NOPRINT_TRANS_TABLE = { 

32 i: None for i in range(0, sys.maxunicode + 1) if not chr(i).isprintable() 

33} 

34 

35 

36def make_printable(s): 

37 """Replace non-printable characters in a string.""" 

38 

39 # the translate method on str removes characters 

40 # that map to None from the string 

41 return s.translate(NOPRINT_TRANS_TABLE) 

42 

43 

44def prepareHDR(hdulist, image_hk, user_hk): 

45 try: 

46 empty = ' ' 

47 # Main HeaderInfo ---------------------------------------------------------------------------------------- 

48 pos = 8 

49 now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") 

50 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('', empty)) 

51 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('HIERARCH Processor Name', 'raw2fits')) 

52 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('HIERARCH Processor Version', Git.current_sha())) 

53 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('HIERARCH Processing Time', now)) 

54 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('', empty)) 

55 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('', 'Internal Image HK')) 

56 for item in image_hk: 

57 pos += 1 

58 if len(image_hk[item].converted) > 0: 

59 valstr = make_printable(image_hk[item].formatstr.format(image_hk[item].converted[0])) 

60 else: 

61 valstr = make_printable('{:}'.format(image_hk[item].val[0])) 

62 unitstr = image_hk[item].unit 

63 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('HIERARCH {:}'.format(item), valstr, unitstr)) 

64 pos += 1 

65 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('', empty)) 

66 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('', 'Internal User HK')) 

67 

68 for item in user_hk: 

69 valstr = make_printable('{:}'.format(user_hk[item])) 

70 comment = '' 

71 if item == 'path' and len(valstr) > 41: 

72 valstr = valstr[0:40] # changed to avoid too long paths. franciscoaiglesias@gmail.com 20210623 

73 pos = fits_handling.add_header_entry(hdulist[0].header, pos, ('HIERARCH {:}'.format(item), valstr, comment)) 

74 

75 except Exception as e: 

76 logger.error('preparing Fits Header - %s', e) 

77 return hdulist 

78 

79 

80def save_image_as_fits(np_array, file_name, image_hk, user_hk): 

81 try: 

82 hdulist = fits_handling.image2fits(np_array) 

83 hdulist = prepareHDR(hdulist, image_hk, user_hk) 

84 fits_handling.save_fits(hdulist, file_name, overwrite=True) 

85 logger.info('File saved: %s', os.path.basename(file_name)) 

86 except Exception as e: 

87 logger.error('Saving Fits File: %s', e) 

88 

89 

90def SUSI_readrawsavefitsListofFiles(all_raw_files): 

91 logger.debug('Thread started with list of files') 

92 for i in range(len(all_raw_files)): 

93 try: 

94 raw_name = all_raw_files[i] 

95 if os.path.isfile(raw_name): 

96 image_hk, user_hk, image_array_withhk = camera_decode_hk.full_hk(raw_name) 

97 fits_name = os.path.splitext(raw_name)[0] + '.fits.gz' 

98 save_image_as_fits(image_array_withhk, fits_name, image_hk, user_hk) 

99 if deleterawfiles: 

100 os.remove(raw_name) 

101 logger.debug('deleted file %s', os.path.basename(raw_name)) 

102 else: 

103 logger.debug('raw file not deleted! %s', os.path.basename(raw_name)) 

104 logger.debug('%5d / %5d files done !', i, len(all_raw_files)) 

105 except Exception as e: 

106 logger.error('SUSI_readrawsavefitsListofFiles - %s', e) 

107 logger.debug('Thread done - list of Files') 

108 

109 

110def getListofRawFiles(dir_path): 

111 dirs = [x[0] for x in os.walk(dir_path)] 

112 raw_files = [] 

113 for mypath in dirs: 

114 logger.debug('looking for .raw files in %s', mypath) 

115 all_raw_files = [fileName for fileName in os.listdir(mypath) if fileName.endswith(".raw")] 

116 logger.debug('found %s .raw file(s)', len(all_raw_files)) 

117 for raw_file in all_raw_files: 

118 raw_files.append(os.path.join(mypath, raw_file)) 

119 return raw_files