Coverage for src/susi/db/hk_script.py: 10%

510 statements  

coverage.py v7.5.0, created at 2025-08-11 10:03 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3import time 

4import glob 

5import re 

6import pandas as pd 

7import numpy as np 

8import matplotlib.pyplot as plt 

9from scipy.interpolate import interp1d 

10from matplotlib.backends.backend_pdf import PdfPages 

11import json 

12import os 

13import logging 

14import collections 

15 

16# from susi.base.globals import Globals 

17from ..base import Logging 

18 

19from ..base.globals import Globals 

20 

21ENCODING = 'ISO8859-1' 

22 

23log = Logging.get_logger() 

24 

25 

26class HK: 

27 def __init__(self, key: list, ts=None, base_path='/data/sunrise/2024/SUSI/hk'): 

28 """ 

29 To retrieve values use .combine_interpolate(); to plot them directly, use generate_plotpdf().

30 

31 param key: list of keys (type list of strings) 

32 param base_path: path to the directory containing the hk data (type: string)

33 param ts: list of timestamps or tuple, (start, end, step) 

34 

35 HK object has the following attributes: 

36 - ts: list of timestamps 

37 - key: list of keys 

38 - base_path: path to the hk data 

39 - hk_log: path to the hk log file 

40 - file_list: list of files to load 

41 

42 """ 

43 

44 self.ts = ts 

45 if type(key) is str: 

46 self.key = [key] 

47 else: 

48 self.key = key 

49 self._key_input = key  # keep the original input; self.key may be modified later

50 

51 self.base_path = base_path 

52 try: 

53 self.hk_log = os.path.join(self.base_path, 'hk_log.json') 

54 self.file_list = list(self.get_files_from_hklog().keys()) 

55 

56 except FileNotFoundError: 

57 log.error( 

58 "file hk_log.json not found. \nCreate a json fie with hk key location using hk_script.create_hklog" 

59 ) 

60 

61 if ts is not None: 

62 

63 if isinstance(ts, (list, np.ndarray)): 

64 try: 

65 self.ts = pd.to_datetime(ts, format="%Y-%m-%d %H:%M:%S.%f") 

66 except ValueError: 

67 self.ts = pd.to_datetime(ts, format="%Y-%m-%dT%H:%M:%S.%f") 

68 

69 elif isinstance(ts, pd.DatetimeIndex): 

70 self.ts = ts 

71 elif isinstance(ts, tuple) and len(ts) == 3: 

72 start, end, step = ts 

73 

74 if isinstance(step, (int, float)): 

75 self.ts = HK.gen_dt(start, end, step) 

76 else: 

77 raise TypeError("The step must be a numeric value representing seconds.")

78 else: 

79 raise TypeError(

80 "ts must be either a list of datetime objects/strings or "

81 "a tuple: (start_datetime, end_datetime, step)."

82 )

83 elif ts is None: 

84 if type(key) is str: 

85 self.key_metadata, self.key_data = self.load_hk(self.file_list[0]) 

86 

87 elif type(key) is list: 

88 log.warning("A list of keys given, returning data for the first key") 

89 

90 self.key_metadata, self.key_data = self.load_hk(self.file_list[0]) 

91 

92 def get_files_from_hklog(self): 

93 with open(os.path.join(self.base_path, self.hk_log), 'r') as f: 

94 data_key = json.load(f) 

95 files_to_load = {} 

96 for col in self._key_input: 

97 for file, columns in data_key.items(): 

98 if col in columns: 

99 files_to_load[os.path.basename(file)] = list(filter(lambda x: x in self._key_input, columns)) 

100 return files_to_load 

101 

102 @staticmethod 

103 def gen_dt(start, end, step=1, unit='s'): 

104 """ 

105 function creates a datetime array with params: start,end,step 

106 start: starting datetime. String in format 'yyyy-mm-dd HH:mm:ss.ffff' or

107 tuple, (yyyy,mm,dd) or (yyyy,mm,dd,HH,mm,ss,ffff)

108 end: ending datetime. String in format 'yyyy-mm-dd HH:mm:ss.ffff' or

109 tuple, (yyyy,mm,dd) or (yyyy,mm,dd,HH,mm,ss,ffff)

110 step: step size in seconds datatype: int or float 

111 """ 

112 

113 if type(start) is tuple: 

114 start = HK.tuple_to_dt_str(start) 

115 end = HK.tuple_to_dt_str(end) 

116 start = pd.to_datetime(start) 

117 end = pd.to_datetime(end) 

118 datetime_arr = pd.date_range(start=start, end=end, freq=pd.to_timedelta(step, unit=unit)) 

119 elif type(start) is str: 

120 

121 datetime_arr = pd.date_range(start=start, end=end, freq=pd.to_timedelta(step, unit=unit)) 

122 

123 else: 

124 raise TypeError(

125 "start/end must be a string in format 'yyyy-mm-dd HH:mm:ss.ffff' or a tuple (yyyy,mm,dd) / (yyyy,mm,dd,HH,mm,ss,ffff)"

126 )

127 

128 return datetime_arr 
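# Example (illustrative): gen_dt accepts datetime strings or tuples and returns a
# pandas DatetimeIndex with the requested step (seconds by default). Dates are hypothetical.
#
#   HK.gen_dt('2024-07-13 10:00:00', '2024-07-13 10:00:10', step=5)
#   # -> DatetimeIndex(['2024-07-13 10:00:00', '2024-07-13 10:00:05', '2024-07-13 10:00:10'])
#   HK.gen_dt((2024, 7, 13, 10, 0, 0), (2024, 7, 13, 10, 1, 0), step=30)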

129 

130 @staticmethod 

131 def tuple_to_dt_str(tupl): 

132 x = '-'.join(str(x) for x in tupl[:3]) 

133 y = ':'.join(str(y) for y in tupl[3:6]) 

134 if len(tupl) > 6: 

135 return x + " " + y + f'.{tupl[6]}' 

136 else: 

137 return x + " " + y 
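# Example (illustrative): the tuple is joined into a date string that pandas.to_datetime
# can parse; a 7th element becomes the fractional-second part.
#
#   HK.tuple_to_dt_str((2024, 7, 13, 10, 30, 0))       # -> '2024-7-13 10:30:0'
#   HK.tuple_to_dt_str((2024, 7, 13, 10, 30, 0, 250))  # -> '2024-7-13 10:30:0.250'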

138 

139 @staticmethod 

140 def F2_translator(data): 

141 return Globals.F2_MECH_POSITION_MAP.get(data, 'unknown') 

142 

143 @staticmethod 

144 def FwPos_translator(data): 

145 data = int(data / (10**6)) 

146 

147 result = min(Globals.F2_MECH_POSITION_MAP.keys(), key=lambda x: abs(x - data)) 

148 return Globals.F2_MECH_POSITION_MAP.get(result, 'unknown') 

149 

150 @staticmethod 

151 def AO_runmodenr_translator(data): 

152 result = Globals.A0_RUNMODENR_MAPPING.get(data, str(data)) 

153 return result 

154 

155 @staticmethod 

156 def pointingstate_translator(state): 

157 state = int(state) 

158 binary_representation = format(state, '08b') 

159 

160 # Create a dictionary to store each state bit value 

161 state_bits = {} 

162 

163 # Loop through each bit in the binary representation and map to the state name 

164 for i in range(8): 

165 state_bits[Globals.POINTING_STATE_MAPPING[i]] = int(binary_representation[7 - i]) 

166 

167 return state_bits 
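# Example (illustrative): the integer state is unpacked bit by bit, least-significant bit
# first, into the names defined in Globals.POINTING_STATE_MAPPING (the actual names come
# from Globals and are not shown here).
#
#   HK.pointingstate_translator(5)   # binary '00000101'
#   # -> {POINTING_STATE_MAPPING[0]: 1, POINTING_STATE_MAPPING[1]: 0,
#   #     POINTING_STATE_MAPPING[2]: 1, POINTING_STATE_MAPPING[3]: 0, ...}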

168 

169 @staticmethod 

170 def translate_multiple_columns(metadata, data): 

171 translation_functions = { 

172 'F2_Mech_Position': HK.F2_translator, 

173 'AO_runmodenr': HK.AO_runmodenr_translator, 

174 'FwPos': HK.FwPos_translator, 

175 } 

176 

177 for column in data.columns: 

178 if column in translation_functions: 

179 data[column] = data[column].apply(translation_functions[column]) 

180 if column in Globals.UNITMAP: 

181 metadata.loc['unit', column] = Globals.UNITMAP[column] 

182 metadata.loc['datatype', column] = Globals.Data_Type_Map[column] 

183 if column == 'PointingState': 

184 state_columns = data['PointingState'].apply(HK.pointingstate_translator).apply(pd.Series) 

185 data = pd.concat([data, state_columns], axis=1) 

186 data = data.drop(columns=['PointingState']) 

187 

188 return metadata, data 

189 

190 def get_back(self): 

191 

192 # metadata_comb = pd.DataFrame() 

193 # interpolated_comb = pd.DataFrame() 

194 file_list = self.file_list 

195 hk_log_dict = self.get_files_from_hklog() 

196 ######################################################## 

197 # gets the subset of keys from hk_log that are present in self.key (i.e. duplicated keys appear more than once)

198 keys_list_hk_log = hk_log_dict.values() 

199 keys_list_hk_log = [item for sublist in keys_list_hk_log for item in sublist] 

200 list_dup_keys = [ 

201 # list of duplicate keys 

202 item 

203 for item, count in collections.Counter(keys_list_hk_log).items() 

204 if count > 1 

205 ] 

206 

207 globals_dup_key_list = Globals.TRUSTED_FILE_DUPLI_KEYS.values() 

208 globals_dup_key_list = [item for sublist in globals_dup_key_list for item in sublist] 

209 if any(item not in globals_dup_key_list for item in list_dup_keys): 

210 log.warning(

211 'Following duplicate key(s) not found in globals TRUSTED_FILE_DUPLI_KEYS dictionary '

212 'and are thus being removed.\nAdd them to the globals dict to analyse them:'

213 f'\n {[item for item in list_dup_keys if item not in globals_dup_key_list]}'

214 )

215 dup_list = list(filter(lambda item: item in globals_dup_key_list, list_dup_keys)) 

216 

217 if len(list_dup_keys) > 0: 

218 

219 # remove duplicated keys from self.key 

220 self.key = list(filter(lambda x: x not in list_dup_keys, self.key)) 

221 dupli_check = True 

222 

223 ################################################## 

224 gondola_path_check = False 

225 if any('gondola' in file1 or 'SR3' in file1 for file1 in file_list):

226 gondola_path_check = True 

227 print('gondola keys present') 

228 file_list = [ 

229 file1 

230 for file1 in file_list 

231 if 'SR3' not in file1 and not file1.endswith('/') and file1.endswith('.csv') 

232 ] 

233 current_duplicated = [] 

234 # concat=[] 

235 all_dataframes = [] 

236 concat_metadata = [] 

237 for file in file_list: 

238 

239 if len(list_dup_keys) > 0: 

240 if file in Globals.TRUSTED_FILE_DUPLI_KEYS.keys(): 

241 keys_in_trusted = list(Globals.TRUSTED_FILE_DUPLI_KEYS[file]) 

242 

243 # returns duplicated keys from input that are present in this dict 

244 current_duplicated = list(filter(lambda x: x in keys_in_trusted, list_dup_keys)) 

245 self.key = self.key + current_duplicated 

246 

247 list_dup_keys = list(filter(lambda item: item not in current_duplicated, list_dup_keys)) 

248 

249 metadata, df = self.load_hk(os.path.join(self.base_path, file)) 

250 if df is None:

251 print('None found')

252 continue

253 metadata, df = HK.translate_multiple_columns(metadata, df)

254 all_dataframes.append(df)

255 concat_metadata.append(metadata)

257 

258 self.key = list(filter(lambda item: item not in current_duplicated, self.key)) 

259 

260 if gondola_path_check: 

261 metadata_g, df_g = self.load_gondola_hk() 

262 

263 concat_metadata.append(metadata_g) 

264 all_dataframes.append((df_g)) 

265 all_dataframes = pd.concat(all_dataframes) 

266 concat_metadata = pd.concat(concat_metadata) 

267 return concat_metadata, all_dataframes 
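# Usage sketch (illustrative): get_back() returns the raw, non-interpolated values of the
# requested keys between ts.min() and ts.max(), concatenated over all matching files.
# The key name and times are hypothetical placeholders.
#
#   metadata, raw = HK(['F2_Mech_Position'],
#                      ts=('2024-07-13 10:00:00', '2024-07-13 11:00:00', 1)).get_back()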

268 

269 def combine_interpolate(self, method='linear', order=1, specific=None, translate_map=True): 

270 

271 metadata_comb = pd.DataFrame() 

272 interpolated_comb = pd.DataFrame() 

273 file_list = self.file_list 

274 hk_log_dict = self.get_files_from_hklog() 

275 ######################################################## 

276 # gets the subset of keys from hk_log that are present in self.key (i.e. duplicated keys appear more than once)

277 keys_list_hk_log = hk_log_dict.values() 

278 

279 keys_list_hk_log = [item for sublist in keys_list_hk_log for item in sublist] 

280 [ 

281 log.warning(f'key: {key} not found. Processing for remaining keys')

282 for key in self._key_input 

283 if key not in set(keys_list_hk_log) 

284 ] 

285 

286 list_dup_keys = [ 

287 # list of duplicate keys 

288 item 

289 for item, count in collections.Counter(keys_list_hk_log).items() 

290 if count > 1 

291 ] 

292 globals_dup_key_list = Globals.TRUSTED_FILE_DUPLI_KEYS.values() 

293 globals_dup_key_list = [item for sublist in globals_dup_key_list for item in sublist] 

294 # checks if input key is duplicated and whether registered in trusted_tupli_dict 

295 if any(item not in globals_dup_key_list for item in list_dup_keys): 

296 files_dup_key = [file for file in file_list if any(item in hk_log_dict[file] for item in list_dup_keys)] 

297 log.warning(

298 'Following duplicate key(s) not found in globals TRUSTED_FILE_DUPLI_KEYS dictionary '

299 'and are thus being removed.\nAdd them to the globals dict to analyse them:'

300 f'\n {[item for item in list_dup_keys if item not in globals_dup_key_list]}'

301 f'\n These keys are present in the following files: {files_dup_key}'

302 )

303 dup_list = list(filter(lambda item: item in globals_dup_key_list, list_dup_keys)) 

304 

305 if len(list_dup_keys) > 0: 

306 

307 # remove duplicated keys from self.key 

308 self.key = list(filter(lambda x: x not in list_dup_keys, self.key)) 

309 dupli_check = True 

310 

311 ################################################## 

312 gondola_path_check = False 

313 if any('SR3' in file1 for file1 in file_list): 

314 

315 gondola_path_check = True 

316 print('gondola keys present') 

317 

318 file_list = [ 

319 file1 

320 for file1 in file_list 

321 if 'SR3' not in file1 and not file1.endswith('/') and file1.endswith('.csv') 

322 ] 

323 current_duplicated = [] 

324 for file in file_list: 

325 if len(list_dup_keys) > 0: 

326 if file in Globals.TRUSTED_FILE_DUPLI_KEYS.keys(): 

327 keys_in_trusted = list(Globals.TRUSTED_FILE_DUPLI_KEYS[file]) 

328 

329 # returns duplicated keys from input that are present in this dict 

330 current_duplicated = list(filter(lambda x: x in keys_in_trusted, list_dup_keys)) 

331 self.key = self.key + current_duplicated 

332 

333 list_dup_keys = list(filter(lambda item: item not in current_duplicated, list_dup_keys)) 

334 

335 metadata, df = self.load_hk(os.path.join(self.base_path, file)) 

336 if df is None: 

337 

338 print('None found') 

339 continue 

340 interpolated_data = self._interpolate(df=df, method=method, order=order, specific=specific) 

341 if interpolated_comb.empty: 

342 metadata_comb = metadata 

343 interpolated_comb = interpolated_data 

344 

345 else: 

346 metadata_comb = pd.concat([metadata_comb, metadata], axis=1) 

347 

348 interpolated_comb = pd.concat([interpolated_comb, interpolated_data], axis=1) 

349 self.key = list(filter(lambda item: item not in current_duplicated, self.key)) 

350 

351 if gondola_path_check: 

352 print('in gondola file check') 

353 metadata_g, df_g = self.load_gondola_hk() 

354 

355 interpolated_data_g = self._interpolate(df=df_g, method=method, order=order, specific=specific) 

356 if interpolated_comb.empty: 

357 metadata_comb = metadata_g 

358 interpolated_comb = interpolated_data_g 

359 

360 else: 

361 metadata_comb = pd.concat([metadata_comb, metadata_g], axis=1) 

362 # concat other with gondola hk data 

363 interpolated_comb = pd.concat([interpolated_comb, interpolated_data_g], axis=1) 

364 interpolated_comb = interpolated_comb.loc[:, ~interpolated_comb.columns.duplicated()] 

365 metadata_comb = metadata_comb.loc[:, ~metadata_comb.columns.duplicated()] 

366 if len(interpolated_comb) == 0: 

367 log.error("Input Keys don't match. Please check the input keys again") 

368 if translate_map is True: 

369 

370 metadata_comb, interpolated_comb = HK.translate_multiple_columns(metadata_comb, interpolated_comb) 

371 [ 

372 log.warning(

373 f'key: {key} was given as an input but does not exist in the output data. '

374 'Reasons can be:\n'

375 '1) Key not found.\n'

376 '2) Key removed during processing.\n'

377 '3) Wrong file was entered in the TRUSTED_FILE_DUPLI_KEYS dict in globals.'

378 ) 

379 for key in self._key_input 

380 if key not in set(metadata_comb.columns) 

381 ] 

382 return metadata_comb, interpolated_comb 
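# Usage sketch (illustrative): combine_interpolate() resamples every requested key onto
# self.ts; the `specific` dict overrides the default method per key ('linear', 'spline',
# 'polynomial', 'nn' for nearest neighbour, 'last' for forward fill). Keys are hypothetical.
#
#   hk = HK(['FwPos', 'AO_runmodenr'],
#           ts=('2024-07-13 10:00:00', '2024-07-13 11:00:00', 10))
#   metadata, data = hk.combine_interpolate(method='linear', specific={'last': ['AO_runmodenr']})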

383 

384 def load_gondola_hk(self): 

385 key = self.key 

386 ts = self.ts 

387 start_time = ts.min() 

388 end_time = ts.max() 

389 

390 file_pattern = 'SR3_Flight_2024_07_{}.txt' 

391 df_l = [] 

392 # for loop to iterate files day wise 

393 print('cleaning gondola files') 

394 

395 for date in pd.date_range(start_time, end_time): 

396 

397 file_name = file_pattern.format(date.day) 

398 file_path = os.path.join(self.base_path, 'gondola', file_name) 

399 

400 data = pd.read_csv(file_path, index_col=False, low_memory=False) 

401 data.columns = [c.strip() for c in data.columns] 

402 

403 data['datetime'] = pd.to_datetime(data['Date'] + ' ' + data['hh:mm:ss'], errors='coerce') 

404 data = data.applymap(lambda x: np.nan if isinstance(x, str) and x.strip() == '' else x) 

405 # data = data[data['datetime'].dt.date == date.date()] 

406 # print(data['datetime'].dt.date,date.date()) 

407 data['datetime'] = data['datetime'].interpolate() 

408 data = data.sort_values('datetime') 

409 data.set_index(data['datetime'], inplace=True) 

410 data = data[~data.index.duplicated(keep='first')].copy() 

411 # if start_time in data.index: 

412 # print('printing start index:' +str(start_index)) 

413 # start_index = data.index.get_loc(start_time) 

414 # else: 

415 # start_index = data.index.get_indexer( 

416 # [start_time], method='pad')[0] 

417 

418 # if end_time in data.index: 

419 # end_index = data.index.get_loc(end_time) 

420 # else: 

421 # end_index = data.index.get_indexer( 

422 # [end_time], method='backfill')[0] 

423 

424 # subset_indices = data.index[start_index:end_index] 

425 # data = data.loc[subset_indices] 

426 

427 df_l.append(data) 

428 

429 df_1 = pd.concat(df_l, axis=0) 

430 df_1.sort_index(inplace=True) # Ensure chronological order after concatenation 

431 df_1 = df_1[~df_1.index.duplicated(keep='first')].copy() 

432 if start_time in df_1.index: 

433 start_index = df_1.index.get_loc(start_time) 

434 else: 

435 start_index = df_1.index.get_indexer([start_time], method='pad')[0] 

436 

437 if end_time in df_1.index: 

438 end_index = df_1.index.get_loc(end_time) 

439 else: 

440 end_index = df_1.index.get_indexer([end_time], method='backfill')[0] 

441 subset_indices_c = df_1.index[start_index : end_index + 1] 

442 df_1 = df_1.loc[subset_indices_c] 

443 columns_to_keep, metadata = self._key_matcher_gondola(df_1, key) 

444 

445 if columns_to_keep is None: 

446 return None, None 

447 elif columns_to_keep is not None: 

448 

449 df_filtered = df_1[list(columns_to_keep.keys())] 

450 

451 df_filtered = df_filtered.rename(columns=columns_to_keep) 

452 df_filtered.index = pd.to_datetime(df_filtered.index, format='mixed') 

453 df_filtered.index = df_filtered.index.rename('Time') 

454 df_filtered = df_filtered[~df_filtered.index.duplicated(keep='first')].copy() 

455 df_filtered = df_filtered.apply(self.convert_numeric) 

456 

457 metadata = pd.DataFrame(metadata) 

458 

459 return metadata, df_filtered 

460 

461 def convert_numeric(self, col): 

462 # Convert convertible entries to numeric; non-convertible entries become NaN

463 return pd.to_numeric(col, errors='coerce') 

464 

465 def load_hk(self, file_path: str): 

466 key = self.key 

467 ts = self.ts 

468 

469 data = pd.read_csv(file_path, encoding=ENCODING, index_col=False, low_memory=False) 

470 

471 data.set_index(data.columns[0], inplace=True) 

472 

473 columns_to_keep, metadata = self._key_matcher(data, key) 

474 

475 if columns_to_keep is None: 

476 return None, None 

477 elif columns_to_keep is not None: 

478 

479 df = data[list(columns_to_keep.keys())] 

480 

481 df = df.rename(columns=columns_to_keep) 

482 

483 df.index = pd.to_datetime(df.index, format='mixed') 

484 df.index = df.index.rename('Time') 

485 df = df[~df.index.duplicated(keep='first')].copy() 

486 

487 if ts is not None: 

488 ts = pd.to_datetime(ts) 

489 start_time = ts.min() 

490 end_time = ts.max() 

491 

492 if start_time in df.index: 

493 start_index = df.index.get_loc(start_time) 

494 else: 

495 start_index = df.index.get_indexer([start_time], method='pad')[0] 

496 

497 if end_time in df.index: 

498 end_index = df.index.get_loc(end_time) 

499 else: 

500 end_index = df.index.get_indexer([end_time], method='backfill')[0] 

501 

502 subset_indices = df.index[start_index:end_index] 

503 

504 # 4. Subset the DataFrame 

505 df = df.loc[subset_indices] 

506 metadata = pd.DataFrame(metadata) 

507 return metadata, df 

508 

509 def _key_matcher(self, data, key): 

510 data = data 

511 key = key 

512 header = data.columns 

513 columns_to_keep = {} 

514 metadata = {} 

515 

516 # Regex patterns for extracting units and data types 

517 unit_pattern = re.compile(r'\[(.*?)\]') 

518 datatype_pattern = re.compile(r'\{(.*?)\}') 

519 for col in header: 

520 base_name_match = re.match(r'^\s*([^\[\s]+(?:\s[^\[\s]+)*)?(?:\s*\[[^\]]+\]\s*\{[^}]+\})?(?:\.(\d+))?', col) 

521 

522 # Extracting the base name with an optional decimal part 

523 if base_name_match: 

524 base_name = base_name_match.group(1).strip() # Capture base name 

525 # Check for the optional decimal part 

526 if base_name_match.group(2): 

527 base_name += f".{base_name_match.group(2)}" 

528 else: 

529 base_name = col.strip() 

530 # Extract the unit and datatype if they exist 

531 unit_match = unit_pattern.search(col) 

532 datatype_match = datatype_pattern.search(col) 

533 

534 unit = unit_match.group(1) if unit_match else '' 

535 datatype = datatype_match.group(1) if datatype_match else '' 

536 if base_name in ['Longitude', 'Latitude']: 

537 print(f'found key {base_name}') 

538 base_name = str(base_name) + '_' + str(unit) 

539 print(f'key renamed to {base_name}') 

540 

541 if base_name in key: 

542 columns_to_keep[col] = base_name 

543 metadata[base_name] = {'unit': unit, 'datatype': datatype} 

544 

545 return columns_to_keep, metadata 
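# Example (illustrative): raw hk headers carry the unit in [] and the datatype in {};
# _key_matcher strips them to a base name and records them as metadata. The first column
# name below is a hypothetical example of that pattern.
#
#   'TankPressure [mbar] {float}'  ->  base name 'TankPressure', unit 'mbar', datatype 'float'
#   'Longitude [deg] {float}'      ->  base name 'Longitude_deg' (Latitude/Longitude get the unit appended)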

546 

547 def _key_matcher_gondola(self, data, key): 

548 data = data 

549 key = key 

550 header = data.columns 

551 columns_to_keep = {} 

552 metadata = {} 

553 

554 for col in header: 

555 # Use regex to find the unit within brackets 

556 match = re.search(r'\((.*?)\)', col) 

557 if match: 

558 # Append the found unit to the list 

559 unit_name = match.group(1).strip() 

560 # Remove brackets and trim whitespace 

561 base_name = re.sub(r'\s*\(.*?\)', '', col).strip() 

562 

563 else: 

564 # If no brackets, append None or any placeholder 

565 base_name = col.strip() 

566 unit_name = '' 

567 if base_name in ['Latitude', 'Longitude', 'Brake', 'Track State']: 

568 print(f'found {base_name} as key ') 

569 base_name = str(base_name) + '_gondola_' + str(unit_name) 

570 print(f'key renamed to {base_name}') 

571 if base_name in key: 

572 metadata[base_name] = {'unit': unit_name, 'datatype': ''} 

573 columns_to_keep[col] = base_name 

574 return columns_to_keep, metadata 

575 

576 def _interpolate(self, df, method, order, specific=None): 

577 """Specific methods to apply to certain keys. 

578 

579 The `specific` parameter is a dictionary that maps specific keys to methods. 

580 For example, `{'spline': ['key1'], 'last': ['key3']}` would apply the 

581 `spline` method to `key1` and the `last` method to `key3`. 

582 

583 Args: 

584 specific (dict): Dictionary of specific methods to apply. 

585 """ 

586 

587 df = df 

588 method = method 

589 order = order 

590 specific = specific 

591 

592 union_index = df.index.union(self.ts) 

593 df_reindexed = df.reindex(union_index) 

594 df_reindexed = pd.DataFrame(df_reindexed) 

595 methods = {key: method for key in self.key} 

596 if specific: 

597 for spec_method, keys in specific.items(): 

598 for k in keys: 

599 if k in methods: 

600 methods[k] = spec_method 

601 self.methods = methods 

602 interpolated_data = pd.DataFrame(index=self.ts) 

603 custom_default_methods = Globals.CUSTOM_DEFAULT_MODES 

604 for col in df.columns: 

605 col_method = methods.get(col, 'linear') 

606 if col_method == 'default': 

607 if col in custom_default_methods['linear']: 

608 col_method = 'linear' 

609 elif col in custom_default_methods['nn']: 

610 col_method = 'nn' 

611 elif col in custom_default_methods['last']: 

612 col_method = 'last' 

613 else: 

614 col_method = 'last' # Default fallback 

615 

616 if col_method in ['linear', 'spline', 'polynomial']: 

617 

618 try: 

619 

620 df_reindexed[col] = df_reindexed[col].apply(self.convert_numeric) 

621 interpolated_data[col] = df_reindexed[col].interpolate(method=col_method, order=order) 

622 except Exception: 

623 log.error(f"column method: {col_method} unsuccesful for col: {col}") 

624 

625 elif col_method == 'nn': 

626 try: 

627 df_reindexed[col] = df_reindexed[col].apply(self.convert_numeric) 

628 interpolated_data[col] = df[col].reindex(union_index, method='nearest') 

629 except Exception: 

630 log.error(f"column method: {col_method} unsuccesful for col: {col}") 

631 

632 elif col_method == 'last': 

633 try: 

634 df_reindexed[col] = df_reindexed[col].apply(self.convert_numeric) 

635 interpolated_data[col] = df_reindexed[col].ffill() 

636 except Exception: 

637 log.error(f"column method: {col_method} unsuccesful for col: {col}") 

638 

639 else: 

640 raise ValueError(f"Unsupported interpolation method: {col_method}")

641 

642 interpolated_data = interpolated_data.reindex(self.ts) 

643 interpolated_data = interpolated_data.ffill().bfill() 

644 

645 return interpolated_data 

646 # return df_reindexed 

647 

648 @staticmethod 

649 def plot_genpdf(metadata, df1, time_marker=None, name='hk_plot.pdf'): 

650 with PdfPages(name) as pdf: 

651 

652 for col in df1.columns: 

653 plt.figure(figsize=(12, 6)) 

654 plt.plot(df1.index, df1[col], '--x', label=col) 

655 unit = metadata.get(col, {}).get('unit', '') 

656 ylabel = f"{col} [{unit}]" if unit else col 

657 plt.ylabel(ylabel) 

658 if time_marker is not None: 

659 for tm in time_marker: 

660 plt.axvline(x=tm, color='r', linestyle='--') 

661 plt.xlabel('Time') 

662 plt.title('Plot of Interpolated Data') 

663 plt.legend() 

664 plt.grid(True) 

665 pdf.savefig() 

666 plt.close() 
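# Usage sketch (illustrative): write one page per key into a PDF, marking optional
# timestamps with vertical lines. `metadata` and `df1` are the outputs of
# combine_interpolate(); the marker and file name below are hypothetical.
#
#   HK.plot_genpdf(metadata, data,
#                  time_marker=[pd.Timestamp('2024-07-13 10:30:00')],
#                  name='hk_plot.pdf')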

667 

668 @staticmethod 

669 def get_keys(inp, path='/data/sunrise/2024/SUSI/hk'): 

670 if 'gondola' in inp: 

671 inp = os.path.join(path, 'gondola/SR3_Flight_2024_07_13.txt') 

672 metadata = {} 

673 

674 data = pd.read_csv(inp, index_col=False, low_memory=False, nrows=0) 

675 data.columns = [c.strip() for c in data.columns] 

676 header = data.columns 

677 for col in header: 

678 # Use regex to find the unit within brackets 

679 match = re.search(r'\((.*?)\)', col) 

680 if match: 

681 # Append the found unit to the list 

682 unit_name = match.group(1).strip() 

683 base_name = re.sub(r'\s*\(.*?\)', '', col).strip() 

684 

685 else: 

686 # If no brackets, append None or any placeholder 

687 base_name = col.strip() 

688 unit_name = '' 

689 if base_name in ['Latitude', 'Longitude', 'Brake', 'Track State']: 

690 base_name = str(base_name) + '_gondola_' + str(unit_name) 

691 metadata[base_name] = {'unit': unit_name, 'datatype': ''} 

692 metadata = pd.DataFrame(metadata) 

693 

694 return metadata 

695 elif 'gondola' not in inp: 

696 if not os.path.isabs(inp): # Check if inp is not an absolute path 

697 # inp = path+inp+'.csv' 

698 inp = os.path.join(path, inp + '.csv') 

699 metadata = {} 

700 header = pd.read_csv(inp, encoding=ENCODING, nrows=0).columns 

701 

702 # Regex patterns for extracting units and data types 

703 unit_pattern = re.compile(r'\[(.*?)\]') 

704 datatype_pattern = re.compile(r'\{(.*?)\}') 

705 

706 for col in header: 

707 

708 # Extract everything before the first '[' 

709 base_name_match = re.match( 

710 r'^\s*([^\[\s]+(?:\s[^\[\s]+)*)?(?:\s*\[[^\]]+\]\s*\{[^}]+\})?(?:\.(\d+))?', col 

711 ) 

712 

713 # Extracting the base name with an optional decimal part 

714 if base_name_match: 

715 base_name = base_name_match.group(1).strip() # Capture base name 

716 # Check for the optional decimal part 

717 if base_name_match.group(2): 

718 base_name += f".{base_name_match.group(2)}" 

719 else: 

720 base_name = col.strip() 

721 # Extract the unit and datatype if they exist 

722 unit_match = unit_pattern.search(col) 

723 datatype_match = datatype_pattern.search(col) 

724 

725 unit = unit_match.group(1) if unit_match else '' 

726 datatype = datatype_match.group(1) if datatype_match else '' 

727 if base_name in ['Longitude', 'Latitude']: 

728 base_name = str(base_name) + '_' + str(unit) 

729 metadata[base_name] = {'unit': unit, 'datatype': datatype} 

730 metadata = pd.DataFrame(metadata) 

731 return metadata 
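# Usage sketch (illustrative): list the keys available in a single hk file. The file name
# 'susi_scu_reduced' is a hypothetical placeholder; pass 'gondola' to list the gondola keys.
#
#   HK.get_keys('susi_scu_reduced', path='/data/sunrise/2024/SUSI/hk').columns
#   HK.get_keys('gondola')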

732 

733 @staticmethod 

734 def plot_gen(metadata, df1): 

735 figures = [] 

736 axes = [] 

737 for col in df1.columns: 

738 fig, ax = plt.subplots(figsize=(12, 6)) 

739 ax.plot(df1.index, df1[col], label=col) 

740 unit = metadata.get(col, {}).get('unit', '') 

741 ylabel = f"{col} [{unit}]" if unit else col 

742 ax.set_ylabel(ylabel) 

743 ax.set_xlabel('Time') 

744 ax.set_title('Plot of Interpolated Data') 

745 ax.legend() 

746 ax.grid(True) 

747 figures.append(fig) 

748 axes.append(ax) 

749 return figures, axes 

750 

751 @staticmethod 

752 def plot_gen_get_back(metadata, df1): 

753 figures = [] 

754 axes = [] 

755 for col in df1.columns: 

756 fig, ax = plt.subplots(figsize=(12, 6)) 

757 ax.plot(df1[col].dropna(), label=col) 

758 unit = metadata[col].dropna().get('unit', '') 

759 ylabel = f"{col} [{unit}]" if unit else col 

760 ax.set_ylabel(ylabel) 

761 ax.set_xlabel('Time') 

762 ax.set_title('Plot of Interpolated Data') 

763 ax.legend() 

764 ax.grid(True) 

765 figures.append(fig) 

766 axes.append(ax) 

767 return figures, axes 

768 

769 

770def create_hklog(directory_path, output_metadata_file='hk_log.json', encoding='ISO8859-1'): 

772 

773 metadata = {} 

774 files = glob.glob(os.path.join(directory_path, '**/*'), recursive=True) 

775 combined_files = [] 

776 dump = [] # Initialize a list to store the results 

777 one_time_check = 0 

778 for file in files: 

779 if not file.endswith('/') and 'gondola' in file and file.endswith('.txt'): 

780 # print('making gondola') 

781 one_time_check += 1 

782 if one_time_check == 1: 

783 combined_files.append(file) 

784 # print(file) 

785 else: 

786 dump.append(file) # Add to combined list 

787 elif not file.endswith('/') and file.endswith('.csv') and 'gondola' not in file: 

788 

789 combined_files.append(file) 

790 # print(combined_files) 

791 for filename in combined_files: 

792 try: 

793 columns = HK.get_keys(filename, path=directory_path).columns.tolist() 

794 metadata[filename] = columns 

795 # print(filename) 

796 except PermissionError: 

797 log.error(f"Permission denied while accesing: {filename}") 

798 continue 

799 

800 # Save metadata to a JSON file for future use 

801 with open(os.path.join(directory_path, output_metadata_file), 'w') as f: 

802 json.dump(metadata, f) 
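# Usage sketch (illustrative): scan the hk directory once and write hk_log.json, which maps
# every csv (and one gondola txt) file to the keys it contains; HK() needs this file to
# locate keys.
#
#   create_hklog('/data/sunrise/2024/SUSI/hk')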

803 

804 

805def generate_plotpdf( 

806 key, 

807 ts, 

808 base_path='/data/sunrise/2024/SUSI/hk', 

809 method='linear', 

810 order=1, 

811 specific=None, 

812 time_marker=None, 

813 opath=None, 

814 translate_map=True, 

815): 

816 

817 metadata, df1 = HK(key, ts, base_path).combine_interpolate( 

818 method=method, order=order, specific=specific, translate_map=translate_map 

819 ) 

820 if opath is not None: 

821 os.makedirs(opath, exist_ok=True) 

822 HK.plot_genpdf(metadata, df1, time_marker=time_marker, name=os.path.join(opath, 'hk_plot.pdf')) 

823 metadata.to_csv(os.path.join(opath, 'hk_metadata.csv')) 

824 df1.to_csv(os.path.join(opath, 'hk_data.csv'))
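# End-to-end sketch (illustrative): interpolate a few keys over a time range and write the
# plots plus csv files into an output directory. Keys and paths are hypothetical placeholders.
#
#   generate_plotpdf(key=['F2_Mech_Position', 'AO_runmodenr'],
#                    ts=('2024-07-13 10:00:00', '2024-07-13 11:00:00', 10),
#                    opath='/tmp/hk_out')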