Coverage for src/susi/db/hk_script.py: 10%

507 statements  

coverage.py v7.5.0, created at 2025-06-13 14:15 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3import time 

4import glob 

5import re 

6import pandas as pd 

7import numpy as np 

8import matplotlib.pyplot as plt 

9from scipy.interpolate import interp1d 

10from matplotlib.backends.backend_pdf import PdfPages 

11import json 

12import os 

13import logging 

14import collections 

15 

16# from susi.base.globals import Globals 

17from ..base import Logging 

18 

19from ..base.globals import Globals 

20 

21ENCODING = 'ISO8859-1' 

22 

23log = Logging.get_logger() 

24 

25 

26class HK: 

27 def __init__(self, key: list, ts=None, base_path='/data/sunrise/2024/SUSI/hk'): 

28 """param key: list of keys (type: list of strings) or a single key string 

29 param ts: list of timestamps or a tuple (start, end, step) 

30 param base_path: base directory containing the HK csv files and hk_log.json 

31 """ 

32 

33 self.ts = ts 

34 if type(key) is str: 

35 self.key = [key] 

36 else: 

37 self.key = key 

38 self._key_input = key # added because modifying self.key later 

39 

40 self.base_path = base_path 

41 try: 

42 self.hk_log = os.path.join(self.base_path, 'hk_log.json') 

43 self.file_list = list(self.get_files_from_hklog().keys()) 

44 

45 except FileNotFoundError: 

46 log.error( 

47 "file hk_log.json not found. \nCreate a json file with hk key locations using hk_script.create_hklog" 

48 ) 

49 

50 if ts is not None: 

51 

52 if isinstance(ts, (list, np.ndarray)): 

53 self.ts = pd.to_datetime(ts, format="%Y-%m-%d %H:%M:%S.%f") 

54 elif isinstance(ts, pd.DatetimeIndex): 

55 self.ts = ts 

56 elif isinstance(ts, tuple) and len(ts) == 3: 

57 start, end, step = ts 

58 

59 if isinstance(step, (int, float)): 

60 self.ts = HK.gen_dt(start, end, step) 

61 else: 

62 raise TypeError( 

63 "The step must be a numeric value representing seconds.") 

64 else: 

65 raise TypeError( 

66 "ts must be either a list of datetime objects/strings or " 

67 "a tuple: (start_datetime, end_datetime, step)." 

68 ) 

69 elif ts is None: 

70 if type(key) is str: 

71 self.key_metadata, self.key_data = self.load_hk( 

72 self.file_list[0]) 

73 

74 elif type(key) is list: 

75 log.warning( 

76 "A list of keys given, returning data for the first key") 

77 

78 self.key_metadata, self.key_data = self.load_hk( 

79 self.file_list[0]) 

80 
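# Usage sketch (hypothetical keys; assumes the module is importable as
# susi.db.hk_script and that base_path already contains an hk_log.json
# built by create_hklog):
#
# from susi.db.hk_script import HK
# hk = HK(key=['AO_runmodenr', 'FwPos'],
#         ts=((2024, 7, 13, 6, 0, 0), (2024, 7, 13, 7, 0, 0), 10),
#         base_path='/data/sunrise/2024/SUSI/hk')
# metadata, data = hk.combine_interpolate(method='linear')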

81 def get_files_from_hklog(self): 

82 with open(self.hk_log, 'r') as f: 

83 data_key = json.load(f) 

84 files_to_load = {} 

85 for col in self._key_input: 

86 for file, columns in data_key.items(): 

87 if col in columns: 

88 files_to_load[os.path.basename(file)] = list( 

89 filter(lambda x: x in self._key_input, columns)) 

90 return files_to_load 

91 

92 @staticmethod 

93 def gen_dt(start, end, step=1, unit='s'): 

94 """ 

95 Creates a datetime array from params: start, end, step. 

96 start: starting datetime. String in format 'yyyy-mm-dd HH:mm:ss.ffff' or 

97 tuple, (yyyy,mm,dd) or (yyyy,mm,dd,HH,mm,ss,ffff) 

98 end: ending datetime. String in format 'yyyy-mm-dd HH:mm:ss.ffff' or 

99 tuple, (yyyy,mm,dd) or (yyyy,mm,dd,HH,mm,ss,ffff) 

100 step: step size in seconds. datatype: int or float 

101 """ 

102 

103 if type(start) is tuple: 

104 start = HK.tuple_to_dt_str(start) 

105 end = HK.tuple_to_dt_str(end) 

106 start = pd.to_datetime(start) 

107 end = pd.to_datetime(end) 

108 datetime_arr = pd.date_range( 

109 start=start, end=end, freq=pd.to_timedelta(step, unit=unit)) 

110 elif type(start) is str: 

111 

112 datetime_arr = pd.date_range( 

113 start=start, end=end, freq=pd.to_timedelta(step, unit=unit)) 

114 

115 else: 

116 raise TypeError( 

117 "start must be a string in format 'yyyy-mm-dd HH:mm:ss.ffff' or " "a tuple, (yyyy,mm,dd) or (yyyy,mm,dd,HH,mm,ss,ffff)" 

118 ) 

119 

120 return datetime_arr 

121 
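# gen_dt example (illustrative values): both calls return a pandas
# DatetimeIndex running from start to end with a 0.5 s step.
#
# HK.gen_dt('2024-07-13 06:00:00.000', '2024-07-13 06:01:00.000', step=0.5)
# HK.gen_dt((2024, 7, 13, 6, 0, 0), (2024, 7, 13, 6, 1, 0), step=0.5)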

122 @staticmethod 

123 def tuple_to_dt_str(tupl): 

124 x = '-'.join(str(x) for x in tupl[:3]) 

125 y = ':'.join(str(y) for y in tupl[3:6]) 

126 if len(tupl) > 6: 

127 return x + " " + y + f'.{tupl[6]}' 

128 else: 

129 return x + " " + y 

130 

131 @staticmethod 

132 def F2_translator(data): 

133 return Globals.F2_MECH_POSITION_MAP.get(data, 'unknown') 

134 

135 @staticmethod 

136 def FwPos_translator(data): 

137 data = int(data / (10**6)) 

138 

139 result = min(Globals.F2_MECH_POSITION_MAP.keys(), 

140 key=lambda x: abs(x - data)) 

141 return Globals.F2_MECH_POSITION_MAP.get(result, 'unknown') 

142 

143 @staticmethod 

144 def AO_runmodenr_translator(data): 

145 result = Globals.A0_RUNMODENR_MAPPING.get(data, str(data)) 

146 return result 

147 

148 @staticmethod 

149 def pointingstate_translator(state): 

150 state = int(state) 

151 binary_representation = format(state, '08b') 

152 

153 # Create a dictionary to store each state bit value 

154 state_bits = {} 

155 

156 # Loop through each bit in the binary representation and map to the state name 

157 for i in range(8): 

158 state_bits[Globals.POINTING_STATE_MAPPING[i]] = int( 

159 binary_representation[7 - i]) 

160 

161 return state_bits 

162 
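# pointingstate_translator example (illustrative; the actual bit names come
# from Globals.POINTING_STATE_MAPPING): a state of 5 is '00000101' in binary,
# so the returned dict maps the names for bits 0 and 2 to 1 and all other
# bit names to 0.
#
# HK.pointingstate_translator(5)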

163 @staticmethod 

164 def translate_multiple_columns(metadata, data): 

165 translation_functions = { 

166 'F2_Mech_Position': HK.F2_translator, 

167 'AO_runmodenr': HK.AO_runmodenr_translator, 

168 'FwPos': HK.FwPos_translator, 

169 } 

170 

171 for column in data.columns: 

172 if column in translation_functions: 

173 data[column] = data[column].apply( 

174 translation_functions[column]) 

175 if column in Globals.UNITMAP: 

176 metadata.loc['unit', column] = Globals.UNITMAP[column] 

177 metadata.loc['datatype', 

178 column] = Globals.Data_Type_Map[column] 

179 if column == 'PointingState': 

180 state_columns = data['PointingState'].apply( 

181 HK.pointingstate_translator).apply(pd.Series) 

182 data = pd.concat([data, state_columns], axis=1) 

183 data = data.drop(columns=['PointingState']) 

184 

185 return metadata, data 

186 

187 def get_back(self): 

188 

189 # metadata_comb = pd.DataFrame() 

190 # interpolated_comb = pd.DataFrame() 

191 file_list = self.file_list 

192 hk_log_dict = self.get_files_from_hklog() 

193 ######################################################## 

194 # gets the subset of keys from hk_log that are present in self.key (i.e. duplicated keys get repeated) 

195 keys_list_hk_log = hk_log_dict.values() 

196 keys_list_hk_log = [ 

197 item for sublist in keys_list_hk_log for item in sublist] 

198 list_dup_keys = [ 

199 # list of duplicate keys 

200 item for item, count in collections.Counter(keys_list_hk_log).items() if count > 1 

201 ] 

202 

203 globals_dup_key_list = Globals.TRUSTED_FILE_DUPLI_KEYS.values() 

204 globals_dup_key_list = [ 

205 item for sublist in globals_dup_key_list for item in sublist] 

206 if any(item not in globals_dup_key_list for item in list_dup_keys): 

207 log.warning( 

208 'Following duplicate key(s) not found in globals TRUSTED_FILE_DUPLI_KEYS dictionary ' 

209 'and are thus being removed.\nAdd them to globals dict to analyse them:' 

210 f'\n {[item for item in list_dup_keys if item not in globals_dup_key_list]}' 

211 ) 

212 dup_list = list( 

213 filter(lambda item: item in globals_dup_key_list, list_dup_keys)) 

214 

215 if len(list_dup_keys) > 0: 

216 

217 # remove duplicated keys from self.key 

218 self.key = list(filter(lambda x: x not in list_dup_keys, self.key)) 

219 dupli_check = True 

220 

221 ################################################## 

222 gondola_path_check = False 

223 if any('gondola' in file1 or 'SR3' in file1 for file1 in file_list): 

224 gondola_path_check = True 

225 print('gondola keys present') 

226 file_list = [ 

227 file1 

228 for file1 in file_list 

229 if 'SR3' not in file1 and not file1.endswith('/') and file1.endswith('.csv') 

230 ] 

231 current_duplicated = [] 

232 # concat=[] 

233 all_dataframes = [] 

234 concat_metadata = [] 

235 for file in file_list: 

236 

237 if len(list_dup_keys) > 0: 

238 if file in Globals.TRUSTED_FILE_DUPLI_KEYS.keys(): 

239 keys_in_trusted = list( 

240 Globals.TRUSTED_FILE_DUPLI_KEYS[file]) 

241 

242 # returns duplicated keys from input that are present in this dict 

243 current_duplicated = list( 

244 filter(lambda x: x in keys_in_trusted, list_dup_keys)) 

245 self.key = self.key + current_duplicated 

246 

247 list_dup_keys = list( 

248 filter(lambda item: item not in current_duplicated, list_dup_keys)) 

249 

250 metadata, df = self.load_hk(os.path.join(self.base_path, file)) 

251 if df is None: 

252 print('None found') 

253 continue 

254 

255 metadata, df = HK.translate_multiple_columns(metadata, df) 

256 all_dataframes.append(df) 

257 concat_metadata.append(metadata) 

258 

259 self.key = list( 

260 filter(lambda item: item not in current_duplicated, self.key)) 

261 

262 if gondola_path_check: 

263 metadata_g, df_g = self.load_gondola_hk() 

264 

265 concat_metadata.append(metadata_g) 

266 all_dataframes.append((df_g)) 

267 all_dataframes = pd.concat(all_dataframes) 

268 concat_metadata = pd.concat(concat_metadata) 

269 return concat_metadata, all_dataframes 

270 
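# get_back sketch (hypothetical keys): unlike combine_interpolate, this
# returns the raw, un-interpolated housekeeping columns stacked from all
# matching files, together with the combined metadata.
#
# hk = HK(['FwPos', 'AO_runmodenr'], ts=((2024, 7, 13), (2024, 7, 14), 60))
# metadata_raw, data_raw = hk.get_back()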

271 def combine_interpolate(self, method='linear', order=1, specific=None, translate_map=True): 

272 

273 metadata_comb = pd.DataFrame() 

274 interpolated_comb = pd.DataFrame() 

275 file_list = self.file_list 

276 hk_log_dict = self.get_files_from_hklog() 

277 ######################################################## 

278 # gets the subset of keys from hk_log that are present in self.key (i.e. duplicated keys get repeated) 

279 keys_list_hk_log = hk_log_dict.values() 

280 

281 keys_list_hk_log = [ 

282 item for sublist in keys_list_hk_log for item in sublist] 

283 [ 

284 log.warning(f'key: {key} not found. Processing remaining keys') 

285 for key in self._key_input 

286 if key not in set(keys_list_hk_log) 

287 ] 

288 

289 list_dup_keys = [ 

290 # list of duplicate keys 

291 item for item, count in collections.Counter(keys_list_hk_log).items() if count > 1 

292 ] 

293 globals_dup_key_list = Globals.TRUSTED_FILE_DUPLI_KEYS.values() 

294 globals_dup_key_list = [ 

295 item for sublist in globals_dup_key_list for item in sublist] 

296 # checks if input key is duplicated and whether registered in trusted_tupli_dict 

297 if any(item not in globals_dup_key_list for item in list_dup_keys): 

298 files_dup_key = [file for file in file_list if any( 

299 item in hk_log_dict[file] for item in list_dup_keys)] 

300 log.warning( 

301 'Following duplicate key(s) not found in globals TRUSTED_FILE_DUPLI_KEYS dictionary ' 

302 'and are thus being removed.\nAdd them to globals dict to analyse them:' 

303 f'\n {[item for item in list_dup_keys if item not in globals_dup_key_list]}' 

304 f'\n These keys are present in the following files: {files_dup_key}' 

305 ) 

306 dup_list = list( 

307 filter(lambda item: item in globals_dup_key_list, list_dup_keys)) 

308 

309 if len(list_dup_keys) > 0: 

310 

311 # remove duplicated keys from self.key 

312 self.key = list(filter(lambda x: x not in list_dup_keys, self.key)) 

313 dupli_check = True 

314 

315 ################################################## 

316 gondola_path_check = False 

317 if any('SR3' in file1 for file1 in file_list): 

318 

319 gondola_path_check = True 

320 print('gondola keys present') 

321 

322 file_list = [ 

323 file1 

324 for file1 in file_list 

325 if 'SR3' not in file1 and not file1.endswith('/') and file1.endswith('.csv') 

326 ] 

327 current_duplicated = [] 

328 for file in file_list: 

329 if len(list_dup_keys) > 0: 

330 if file in Globals.TRUSTED_FILE_DUPLI_KEYS.keys(): 

331 keys_in_trusted = list( 

332 Globals.TRUSTED_FILE_DUPLI_KEYS[file]) 

333 

334 # returns duplicated keys from input that are present in this dict 

335 current_duplicated = list( 

336 filter(lambda x: x in keys_in_trusted, list_dup_keys)) 

337 self.key = self.key + current_duplicated 

338 

339 list_dup_keys = list( 

340 filter(lambda item: item not in current_duplicated, list_dup_keys)) 

341 

342 metadata, df = self.load_hk(os.path.join(self.base_path, file)) 

343 if df is None: 

344 

345 print('None found') 

346 continue 

347 interpolated_data = self._interpolate( 

348 df=df, method=method, order=order, specific=specific) 

349 if interpolated_comb.empty: 

350 metadata_comb = metadata 

351 interpolated_comb = interpolated_data 

352 

353 else: 

354 metadata_comb = pd.concat([metadata_comb, metadata], axis=1) 

355 

356 interpolated_comb = pd.concat( 

357 [interpolated_comb, interpolated_data], axis=1) 

358 self.key = list( 

359 filter(lambda item: item not in current_duplicated, self.key)) 

360 

361 if gondola_path_check: 

362 print('in gondola file check') 

363 metadata_g, df_g = self.load_gondola_hk() 

364 

365 interpolated_data_g = self._interpolate( 

366 df=df_g, method=method, order=order, specific=specific) 

367 if interpolated_comb.empty: 

368 metadata_comb = metadata_g 

369 interpolated_comb = interpolated_data_g 

370 

371 else: 

372 metadata_comb = pd.concat([metadata_comb, metadata_g], axis=1) 

373 # concat other with gondola hk data 

374 interpolated_comb = pd.concat( 

375 [interpolated_comb, interpolated_data_g], axis=1) 

376 interpolated_comb = interpolated_comb.loc[:, 

377 ~interpolated_comb.columns.duplicated()] 

378 metadata_comb = metadata_comb.loc[:, 

379 ~metadata_comb.columns.duplicated()] 

380 if len(interpolated_comb) == 0: 

381 log.error("Input Keys don't match. Please check the input keys again") 

382 if translate_map is True: 

383 

384 metadata_comb, interpolated_comb = HK.translate_multiple_columns( 

385 metadata_comb, interpolated_comb) 

386 [ 

387 log.warning( 

388 f'key: {key} was given as an input but does not exist in the output data. ' 

389 'Reasons can be:\n' 

390 '1) Key not found. \n' 

391 '2) Key removed during processing.\n' 

392 '3) Wrong file was entered in the TRUSTED_FILE_DUPLI_KEYS dict in globals' 

393 ) 

394 for key in self._key_input 

395 if key not in set(metadata_comb.columns) 

396 ] 

397 return metadata_comb, interpolated_comb 

398 
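# combine_interpolate sketch with per-key methods (hypothetical keys):
# 'specific' maps an interpolation method to the keys it should be applied
# to, overriding the default 'method' for those keys only.
#
# metadata, data = hk.combine_interpolate(
#     method='linear',
#     specific={'nn': ['AO_runmodenr'], 'last': ['FwPos']})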

399 def load_gondola_hk(self): 

400 key = self.key 

401 ts = self.ts 

402 start_time = ts.min() 

403 end_time = ts.max() 

404 

405 file_pattern = 'SR3_Flight_2024_07_{:02d}.txt'  # zero-padded day, assuming file names like SR3_Flight_2024_07_05.txt 

406 df_l = [] 

407 # for loop to iterate files day wise 

408 print('cleaning gondola files') 

409 

410 for date in pd.date_range(start_time, end_time): 

411 

412 file_name = file_pattern.format(date.day) 

413 file_path = os.path.join(self.base_path, 'gondola', file_name) 

414 

415 data = pd.read_csv(file_path, index_col=False, low_memory=False) 

416 data.columns = [c.strip() for c in data.columns] 

417 

418 data['datetime'] = pd.to_datetime( 

419 data['Date'] + ' ' + data['hh:mm:ss'], errors='coerce') 

420 data = data.applymap(lambda x: np.nan if isinstance( 

421 x, str) and x.strip() == '' else x) 

422 # data = data[data['datetime'].dt.date == date.date()] 

423 # print(data['datetime'].dt.date,date.date()) 

424 data['datetime'] = data['datetime'].interpolate() 

425 data = data.sort_values('datetime') 

426 data.set_index(data['datetime'], inplace=True) 

427 data = data[~data.index.duplicated(keep='first')].copy() 

428 # if start_time in data.index: 

429 # print('printing start index:' +str(start_index)) 

430 # start_index = data.index.get_loc(start_time) 

431 # else: 

432 # start_index = data.index.get_indexer( 

433 # [start_time], method='pad')[0] 

434 

435 # if end_time in data.index: 

436 # end_index = data.index.get_loc(end_time) 

437 # else: 

438 # end_index = data.index.get_indexer( 

439 # [end_time], method='backfill')[0] 

440 

441 # subset_indices = data.index[start_index:end_index] 

442 # data = data.loc[subset_indices] 

443 

444 df_l.append(data) 

445 

446 df_1 = pd.concat(df_l, axis=0) 

447 df_1.sort_index(inplace=True) # Ensure chronological order after concatenation 

448 df_1 = df_1[~df_1.index.duplicated(keep='first')].copy() 

449 if start_time in df_1.index: 

450 start_index = df_1.index.get_loc(start_time) 

451 else: 

452 start_index = df_1.index.get_indexer( 

453 [start_time], method='pad')[0] 

454 

455 if end_time in df_1.index: 

456 end_index = df_1.index.get_loc(end_time) 

457 else: 

458 end_index = df_1.index.get_indexer( 

459 [end_time], method='backfill')[0] 

460 subset_indices_c = df_1.index[start_index:end_index+1] 

461 df_1 = df_1.loc[subset_indices_c] 

462 columns_to_keep, metadata = self._key_matcher_gondola(df_1, key) 

463 

464 if columns_to_keep is None: 

465 return None, None 

466 elif columns_to_keep is not None: 

467 

468 df_filtered = df_1[list(columns_to_keep.keys())] 

469 

470 df_filtered = df_filtered.rename(columns=columns_to_keep) 

471 df_filtered.index = pd.to_datetime( 

472 df_filtered.index, format='mixed') 

473 df_filtered.index = df_filtered.index.rename('Time') 

474 df_filtered = df_filtered[~df_filtered.index.duplicated( 

475 keep='first')].copy() 

476 df_filtered = df_filtered.apply(self.convert_numeric) 

477 

478 metadata = pd.DataFrame(metadata) 

479 

480 return metadata, df_filtered 

481 

482 def convert_numeric(self, col): 

483 # Convert entries to numeric where possible; non-convertible entries become NaN (errors='coerce') 

484 return pd.to_numeric(col, errors='coerce') 

485 

486 def load_hk(self, file_path: str): 

487 key = self.key 

488 ts = self.ts 

489 

490 data = pd.read_csv(file_path, encoding=ENCODING, 

491 index_col=False, low_memory=False) 

492 

493 data.set_index(data.columns[0], inplace=True) 

494 

495 columns_to_keep, metadata = self._key_matcher(data, key) 

496 

497 if columns_to_keep is None: 

498 return None, None 

499 elif columns_to_keep is not None: 

500 

501 df = data[list(columns_to_keep.keys())] 

502 

503 df = df.rename(columns=columns_to_keep) 

504 

505 df.index = pd.to_datetime(df.index, format='mixed') 

506 df.index = df.index.rename('Time') 

507 df = df[~df.index.duplicated(keep='first')].copy() 

508 

509 if ts is not None: 

510 ts = pd.to_datetime(ts) 

511 start_time = ts.min() 

512 end_time = ts.max() 

513 

514 if start_time in df.index: 

515 start_index = df.index.get_loc(start_time) 

516 else: 

517 start_index = df.index.get_indexer( 

518 [start_time], method='pad')[0] 

519 

520 if end_time in df.index: 

521 end_index = df.index.get_loc(end_time) 

522 else: 

523 end_index = df.index.get_indexer( 

524 [end_time], method='backfill')[0] 

525 

526 subset_indices = df.index[start_index:end_index + 1]  # include the end timestamp, matching load_gondola_hk 

527 

528 # 4. Subset the DataFrame 

529 df = df.loc[subset_indices] 

530 metadata = pd.DataFrame(metadata) 

531 return metadata, df 

532 

533 def _key_matcher(self, data, key): 

534 data = data 

535 key = key 

536 header = data.columns 

537 columns_to_keep = {} 

538 metadata = {} 

539 

540 # Regex patterns for extracting units and data types 

541 unit_pattern = re.compile(r'\[(.*?)\]') 

542 datatype_pattern = re.compile(r'\{(.*?)\}') 

543 for col in header: 

544 base_name_match = re.match( 

545 r'^\s*([^\[\s]+(?:\s[^\[\s]+)*)?(?:\s*\[[^\]]+\]\s*\{[^}]+\})?(?:\.(\d+))?', col) 

546 

547 # Extracting the base name with an optional decimal part 

548 if base_name_match: 

549 base_name = base_name_match.group( 

550 1).strip() # Capture base name 

551 # Check for the optional decimal part 

552 if base_name_match.group(2): 

553 base_name += f".{base_name_match.group(2)}" 

554 else: 

555 base_name = col.strip() 

556 # Extract the unit and datatype if they exist 

557 unit_match = unit_pattern.search(col) 

558 datatype_match = datatype_pattern.search(col) 

559 

560 unit = unit_match.group(1) if unit_match else '' 

561 datatype = datatype_match.group(1) if datatype_match else '' 

562 if base_name in ['Longitude', 'Latitude']: 

563 print(f'found key {base_name}') 

564 base_name = str(base_name) + '_' + str(unit) 

565 print(f'key renamed to {base_name}') 

566 

567 if base_name in key: 

568 columns_to_keep[col] = base_name 

569 metadata[base_name] = {'unit': unit, 'datatype': datatype} 

570 

571 return columns_to_keep, metadata 

572 
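# Header convention illustration (hypothetical column name): a csv header
# such as 'DetTemp [C] {float}' is split into base name 'DetTemp', unit 'C'
# and datatype 'float'; 'Longitude'/'Latitude' get the unit appended to the
# base name (e.g. 'Longitude_deg' if the unit is 'deg') so keys from
# different files do not collide.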

573 def _key_matcher_gondola(self, data, key): 

574 data = data 

575 key = key 

576 header = data.columns 

577 columns_to_keep = {} 

578 metadata = {} 

579 

580 for col in header: 

581 # Use regex to find the unit within brackets 

582 match = re.search(r'\((.*?)\)', col) 

583 if match: 

584 # Append the found unit to the list 

585 unit_name = match.group(1).strip() 

586 # Remove brackets and trim whitespace 

587 base_name = re.sub(r'\s*\(.*?\)', '', col).strip() 

588 

589 else: 

590 # If no brackets, append None or any placeholder 

591 base_name = col.strip() 

592 unit_name = '' 

593 if base_name in ['Latitude', 'Longitude', 'Brake', 'Track State']: 

594 print(f'found {base_name} as key ') 

595 base_name = str(base_name) + '_gondola_' + str(unit_name) 

596 print(f'key renamed to {base_name}') 

597 if base_name in key: 

598 metadata[base_name] = {'unit': unit_name, 'datatype': ''} 

599 columns_to_keep[col] = base_name 

600 return columns_to_keep, metadata 

601 

602 def _interpolate(self, df, method, order, specific=None): 

603 """Specific methods to apply to certain keys. 

604 

605 The `specific` parameter is a dictionary that maps specific keys to methods. 

606 For example, `{'spline': ['key1'], 'last': ['key3']}` would apply the 

607 `spline` method to `key1` and the `last` method to `key3`. 

608 

609 Args: 

610 specific (dict): Dictionary of specific methods to apply. 

611 """ 

612 

613 df = df 

614 method = method 

615 order = order 

616 specific = specific 

617 

618 union_index = df.index.union(self.ts) 

619 df_reindexed = df.reindex(union_index) 

620 df_reindexed = pd.DataFrame(df_reindexed) 

621 methods = {key: method for key in self.key} 

622 if specific: 

623 for spec_method, keys in specific.items(): 

624 for k in keys: 

625 if k in methods: 

626 methods[k] = spec_method 

627 self.methods = methods 

628 interpolated_data = pd.DataFrame(index=self.ts) 

629 custom_default_methods = Globals.CUSTOM_DEFAULT_MODES 

630 for col in df.columns: 

631 col_method = methods.get(col, 'linear') 

632 if col_method == 'default': 

633 if col in custom_default_methods['linear']: 

634 col_method = 'linear' 

635 elif col in custom_default_methods['nn']: 

636 col_method = 'nn' 

637 elif col in custom_default_methods['last']: 

638 col_method = 'last' 

639 else: 

640 col_method = 'last' # Default fallback 

641 

642 if col_method in ['linear', 'spline', 'polynomial']: 

643 

644 try: 

645 

646 df_reindexed[col] = df_reindexed[col].apply( 

647 self.convert_numeric) 

648 interpolated_data[col] = df_reindexed[col].interpolate( 

649 method=col_method, order=order) 

650 except Exception: 

651 log.error( 

652 f"column method: {col_method} unsuccessful for col: {col}") 

653 

654 elif col_method == 'nn': 

655 try: 

656 df_reindexed[col] = df_reindexed[col].apply( 

657 self.convert_numeric) 

658 interpolated_data[col] = df[col].reindex( 

659 union_index, method='nearest') 

660 except Exception: 

661 log.error( 

662 f"column method: {col_method} unsuccessful for col: {col}") 

663 

664 elif col_method == 'last': 

665 try: 

666 df_reindexed[col] = df_reindexed[col].apply( 

667 self.convert_numeric) 

668 interpolated_data[col] = df_reindexed[col].ffill() 

669 except Exception: 

670 log.error( 

671 f"column method: {col_method} unsuccessful for col: {col}") 

672 

673 else: 

674 raise ValueError( 

675 f"Unsupported interpolation method: {col_method}") 

676 

677 interpolated_data = interpolated_data.reindex(self.ts) 

678 interpolated_data = interpolated_data.ffill().bfill() 

679 

680 return interpolated_data 

681 # return df_reindexed 

682 
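# _interpolate sketch (hypothetical column): the data index and self.ts are
# merged with Index.union, values are interpolated on the union, and the
# result is re-indexed onto self.ts, roughly equivalent per column to:
#
# union = df.index.union(self.ts)
# series = df['FwPos'].reindex(union).interpolate(method='linear')
# series = series.reindex(self.ts).ffill().bfill()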

683 @staticmethod 

684 def plot_genpdf(metadata, df1, time_marker=None, name='hk_plot.pdf'): 

685 with PdfPages(name) as pdf: 

686 

687 for col in df1.columns: 

688 plt.figure(figsize=(12, 6)) 

689 plt.plot(df1.index, df1[col], '--x', label=col) 

690 unit = metadata.get(col, {}).get('unit', '') 

691 ylabel = f"{col} [{unit}]" if unit else col 

692 plt.ylabel(ylabel) 

693 if time_marker is not None: 

694 for tm in time_marker: 

695 plt.axvline(x=tm, color='r', linestyle='--') 

696 plt.xlabel('Time') 

697 plt.title('Plot of Interpolated Data') 

698 plt.legend() 

699 plt.grid(True) 

700 pdf.savefig() 

701 plt.close() 

702 

703 @staticmethod 

704 def get_keys(inp, path='/data/sunrise/2024/SUSI/hk'): 

705 if 'gondola' in inp: 

706 inp = os.path.join(path, 'gondola/SR3_Flight_2024_07_13.txt') 

707 metadata = {} 

708 

709 data = pd.read_csv(inp, index_col=False, low_memory=False, nrows=0) 

710 data.columns = [c.strip() for c in data.columns] 

711 header = data.columns 

712 for col in header: 

713 # Use regex to find the unit within brackets 

714 match = re.search(r'\((.*?)\)', col) 

715 if match: 

716 # Append the found unit to the list 

717 unit_name = match.group(1).strip() 

718 base_name = re.sub(r'\s*\(.*?\)', '', col).strip() 

719 

720 else: 

721 # If no brackets, append None or any placeholder 

722 base_name = col.strip() 

723 unit_name = '' 

724 if base_name in ['Latitude', 'Longitude', 'Brake', 'Track State']: 

725 base_name = str(base_name) + '_gondola_' + str(unit_name) 

726 metadata[base_name] = {'unit': unit_name, 'datatype': ''} 

727 metadata = pd.DataFrame(metadata) 

728 

729 return metadata 

730 elif 'gondola' not in inp: 

731 if not os.path.isabs(inp): # Check if inp is not an absolute path 

732 # inp = path+inp+'.csv' 

733 inp = os.path.join(path, inp + '.csv') 

734 metadata = {} 

735 header = pd.read_csv(inp, encoding=ENCODING, nrows=0).columns 

736 

737 # Regex patterns for extracting units and data types 

738 unit_pattern = re.compile(r'\[(.*?)\]') 

739 datatype_pattern = re.compile(r'\{(.*?)\}') 

740 

741 for col in header: 

742 

743 # Extract everything before the first '[' 

744 base_name_match = re.match( 

745 r'^\s*([^\[\s]+(?:\s[^\[\s]+)*)?(?:\s*\[[^\]]+\]\s*\{[^}]+\})?(?:\.(\d+))?', col 

746 ) 

747 

748 # Extracting the base name with an optional decimal part 

749 if base_name_match: 

750 base_name = base_name_match.group( 

751 1).strip() # Capture base name 

752 # Check for the optional decimal part 

753 if base_name_match.group(2): 

754 base_name += f".{base_name_match.group(2)}" 

755 else: 

756 base_name = col.strip() 

757 # Extract the unit and datatype if they exist 

758 unit_match = unit_pattern.search(col) 

759 datatype_match = datatype_pattern.search(col) 

760 

761 unit = unit_match.group(1) if unit_match else '' 

762 datatype = datatype_match.group(1) if datatype_match else '' 

763 if base_name in ['Longitude', 'Latitude']: 

764 base_name = str(base_name) + '_' + str(unit) 

765 metadata[base_name] = {'unit': unit, 'datatype': datatype} 

766 metadata = pd.DataFrame(metadata) 

767 return metadata 

768 
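# get_keys example (file name assumed from the hk directory layout): list
# the key names available in one housekeeping csv or in the gondola files.
#
# HK.get_keys('susi_scu_reduced', path='/data/sunrise/2024/SUSI/hk').columns
# HK.get_keys('gondola', path='/data/sunrise/2024/SUSI/hk').columns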

769 @staticmethod 

770 def plot_gen(metadata, df1): 

771 figures = [] 

772 axes = [] 

773 for col in df1.columns: 

774 fig, ax = plt.subplots(figsize=(12, 6)) 

775 ax.plot(df1.index, df1[col], label=col) 

776 unit = metadata.get(col, {}).get('unit', '') 

777 ylabel = f"{col} [{unit}]" if unit else col 

778 ax.set_ylabel(ylabel) 

779 ax.set_xlabel('Time') 

780 ax.set_title('Plot of Interpolated Data') 

781 ax.legend() 

782 ax.grid(True) 

783 figures.append(fig) 

784 axes.append(ax) 

785 return figures, axes 

786 

787 @staticmethod 

788 def plot_gen_get_back(metadata, df1): 

789 figures = [] 

790 axes = [] 

791 for col in df1.columns: 

792 fig, ax = plt.subplots(figsize=(12, 6)) 

793 ax.plot(df1[col].dropna(), label=col) 

794 unit = metadata[col].dropna().get('unit', '') 

795 ylabel = f"{col} [{unit}]" if unit else col 

796 ax.set_ylabel(ylabel) 

797 ax.set_xlabel('Time') 

798 ax.set_title('Plot of Interpolated Data') 

799 ax.legend() 

800 ax.grid(True) 

801 figures.append(fig) 

802 axes.append(ax) 

803 return figures, axes 

804 

805 

806def create_hklog(directory_path, output_metadata_file='hk_log.json', encoding='ISO8859-1'): 

807 import json 

808 

809 metadata = {} 

810 files = glob.glob(os.path.join(directory_path, '**/*'), recursive=True) 

811 combined_files = [] 

812 dump = [] # Initialize a list to store the results 

813 one_time_check = 0 

814 for file in files: 

815 if not file.endswith('/') and 'gondola' in file and file.endswith('.txt'): 

816 # print('making gondola') 

817 one_time_check += 1 

818 if one_time_check == 1: 

819 combined_files.append(file) 

820 # print(file) 

821 else: 

822 dump.append(file) # Add to combined list 

823 elif not file.endswith('/') and file.endswith('.csv') and 'gondola' not in file: 

824 

825 combined_files.append(file) 

826 # print(combined_files) 

827 for filename in combined_files: 

828 try: 

829 columns = HK.get_keys( 

830 filename, path=directory_path).columns.tolist() 

831 metadata[filename] = columns 

832 # print(filename) 

833 except PermissionError: 

834 log.error(f"Permission denied while accessing: {filename}") 

835 continue 

836 

837 # Save metadata to a JSON file for future use 

838 with open(os.path.join(directory_path, output_metadata_file), 'w') as f: 

839 json.dump(metadata, f) 

840 

841 
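# create_hklog usage sketch: scan the hk directory once and write hk_log.json
# mapping each csv (and one gondola file) to its key names; HK.__init__ later
# reads this file through get_files_from_hklog.
#
# create_hklog('/data/sunrise/2024/SUSI/hk')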

842def generate_plotpdf( 

843 key, 

844 ts, 

845 base_path='/data/sunrise/2024/SUSI/hk', 

846 method='linear', 

847 order=1, 

848 specific=None, 

849 time_marker=None, 

850 opath=None, translate_map=True 

851): 

852 

853 metadata, df1 = HK(key, ts, base_path).combine_interpolate(method=method, order=order, specific=specific, 

854 translate_map=translate_map) 

855 if opath is not None: 

856 os.makedirs(opath, exist_ok=True) 

857 HK.plot_genpdf(metadata, df1, time_marker=time_marker, 

858 name=os.path.join(opath, 'hk_plot.pdf')) 

859 metadata.to_csv(os.path.join(opath, 'hk_metadata.csv')) 

860 df1.to_csv(os.path.join(opath, 'hk_data.csv'))
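# End-to-end sketch for generate_plotpdf (hypothetical keys and output path):
# interpolate the requested keys onto a common time grid and write
# hk_plot.pdf, hk_metadata.csv and hk_data.csv into opath.
#
# generate_plotpdf(
#     key=['FwPos', 'AO_runmodenr'],
#     ts=((2024, 7, 13, 6, 0, 0), (2024, 7, 13, 12, 0, 0), 60),
#     opath='/tmp/hk_plots',
#     method='linear')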