#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import glob
import re
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
from matplotlib.backends.backend_pdf import PdfPages
import json
import os
import logging
import collections

# from susi.base.globals import Globals
from ..base import Logging
from ..base.globals import Globals

ENCODING = 'ISO8859-1'

log = Logging.get_logger()

class HK:

    def __init__(self, key: list, ts=None, base_path='/data/sunrise/2024/SUSI/hk'):
        """param key: key or list of keys (type: string or list of strings)
        param ts: list of timestamps or tuple (start, end, step)
        param base_path: directory containing the hk files and hk_log.json
        """
        self.ts = ts
        if type(key) is str:
            self.key = [key]
        else:
            self.key = key
        self._key_input = key  # kept because self.key is modified later

        self.base_path = base_path
        try:
            self.hk_log = os.path.join(self.base_path, 'hk_log.json')
            self.file_list = list(self.get_files_from_hklog().keys())
        except FileNotFoundError:
            log.error(
                "file hk_log.json not found.\n"
                "Create a json file with hk key locations using hk_script.create_hklog"
            )

        if ts is not None:
            if isinstance(ts, (list, np.ndarray)):
                self.ts = pd.to_datetime(ts, format="%Y-%m-%d %H:%M:%S.%f")
            elif isinstance(ts, pd.DatetimeIndex):
                self.ts = ts
            elif isinstance(ts, tuple) and len(ts) == 3:
                start, end, step = ts
                if isinstance(step, (int, float)):
                    self.ts = HK.gen_dt(start, end, step)
                else:
                    msg = "The step must be a numeric value representing seconds."
                    log.error(msg)
                    raise ValueError(msg)
            else:
                msg = ("ts must be either a list of datetime objects/strings or "
                       "a tuple: (start_datetime, end_datetime, step).")
                log.error(msg)
                raise ValueError(msg)
        elif ts is None:
            if type(key) is str:
                self.key_metadata, self.key_data = self.load_hk(
                    self.file_list[0])
            elif type(key) is list:
                log.warning(
                    "A list of keys given, returning data for the first key")
                self.key_metadata, self.key_data = self.load_hk(
                    self.file_list[0])

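    # Illustrative usage (a sketch, not part of the module API; the key names,
    # time range and base path below are assumptions):
    #
    #   hk = HK(key=['FwPos', 'AO_runmodenr'],
    #           ts=((2024, 7, 13, 10, 0, 0), (2024, 7, 13, 12, 0, 0), 1),
    #           base_path='/data/sunrise/2024/SUSI/hk')
    #   metadata, data = hk.combine_interpolate(method='linear')
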
    def get_files_from_hklog(self):
        # self.hk_log already holds the full path to hk_log.json
        with open(self.hk_log, 'r') as f:
            data_key = json.load(f)
        files_to_load = {}
        for col in self._key_input:
            for file, columns in data_key.items():
                if col in columns:
                    files_to_load[os.path.basename(file)] = list(
                        filter(lambda x: x in self._key_input, columns))
        return files_to_load

    @staticmethod
    def gen_dt(start, end, step=1, unit='s'):
        """
        Creates a datetime array from start, end and step.
        start: starting datetime. String in format 'yyyy-mm-dd HH:MM:SS.ffff' or
               tuple, (yyyy,mm,dd) or (yyyy,mm,dd,HH,MM,SS,ffff)
        end: ending datetime. String in format 'yyyy-mm-dd HH:MM:SS.ffff' or
             tuple, (yyyy,mm,dd) or (yyyy,mm,dd,HH,MM,SS,ffff)
        step: step size in seconds, datatype: int or float
        """
        if type(start) is tuple:
            start = HK.tuple_to_dt_str(start)
            end = HK.tuple_to_dt_str(end)
            start = pd.to_datetime(start)
            end = pd.to_datetime(end)
            datetime_arr = pd.date_range(
                start=start, end=end, freq=pd.to_timedelta(step, unit=unit))
        elif type(start) is str:
            datetime_arr = pd.date_range(
                start=start, end=end, freq=pd.to_timedelta(step, unit=unit))
        else:
            msg = ("start/end must be strings in format 'yyyy-mm-dd HH:MM:SS.ffff' or "
                   "tuples, (yyyy,mm,dd) or (yyyy,mm,dd,HH,MM,SS,ffff)")
            log.error(msg)
            raise ValueError(msg)

        return datetime_arr

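    # Illustrative calls (a sketch; the dates are arbitrary examples):
    #
    #   ts = HK.gen_dt('2024-07-13 10:00:00', '2024-07-13 11:00:00', step=0.5)
    #   ts = HK.gen_dt((2024, 7, 13, 10, 0, 0), (2024, 7, 13, 11, 0, 0), step=1)
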
    @staticmethod
    def tuple_to_dt_str(tupl):
        x = '-'.join(str(x) for x in tupl[:3])
        y = ':'.join(str(y) for y in tupl[3:6])
        if len(tupl) > 6:
            return x + " " + y + f'.{tupl[6]}'
        else:
            return x + " " + y

    @staticmethod
    def F2_translator(data):
        return Globals.F2_MECH_POSITION_MAP.get(data, 'unknown')

    @staticmethod
    def FwPos_translator(data):
        data = int(data / (10**6))

        result = min(Globals.F2_MECH_POSITION_MAP.keys(),
                     key=lambda x: abs(x - data))
        return Globals.F2_MECH_POSITION_MAP.get(result, 'unknown')

    @staticmethod
    def AO_runmodenr_translator(data):
        result = Globals.A0_RUNMODENR_MAPPING.get(data, str(data))
        return result

    @staticmethod
    def pointingstate_translator(state):
        state = int(state)
        binary_representation = format(state, '08b')

        # Create a dictionary to store each state bit value
        state_bits = {}

        # Loop through each bit in the binary representation and map it to the state name
        for i in range(8):
            state_bits[Globals.POINTING_STATE_MAPPING[i]] = int(
                binary_representation[7 - i])

        return state_bits

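    # For example, a raw state value of 5 (binary 00000101) sets bits 0 and 2, so the
    # names POINTING_STATE_MAPPING[0] and POINTING_STATE_MAPPING[2] map to 1 and all
    # other bits map to 0 (the bit names themselves come from Globals).
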
    @staticmethod
    def translate_multiple_columns(metadata, data):
        translation_functions = {
            'F2_Mech_Position': HK.F2_translator,
            'AO_runmodenr': HK.AO_runmodenr_translator,
            'FwPos': HK.FwPos_translator,
        }

        for column in data.columns:
            if column in translation_functions:
                data[column] = data[column].apply(
                    translation_functions[column])
            if column in Globals.UNITMAP:
                metadata.loc['unit', column] = Globals.UNITMAP[column]
                metadata.loc['datatype', column] = Globals.Data_Type_Map[column]
            if column == 'PointingState':
                state_columns = data['PointingState'].apply(
                    HK.pointingstate_translator).apply(pd.Series)
                data = pd.concat([data, state_columns], axis=1)
                data = data.drop(columns=['PointingState'])

        return metadata, data

    def get_back(self):
        # metadata_comb = pd.DataFrame()
        # interpolated_comb = pd.DataFrame()
        file_list = self.file_list
        hk_log_dict = self.get_files_from_hklog()
        ########################################################
        # gets the subset of keys from hk_log that are present in self.key
        # (i.e. duplicated keys get repeated)
        keys_list_hk_log = hk_log_dict.values()
        keys_list_hk_log = [
            item for sublist in keys_list_hk_log for item in sublist]
        list_dup_keys = [
            # list of duplicate keys
            item for item, count in collections.Counter(keys_list_hk_log).items() if count > 1
        ]

        globals_dup_key_list = Globals.TRUSTED_FILE_DUPLI_KEYS.values()
        globals_dup_key_list = [
            item for sublist in globals_dup_key_list for item in sublist]
        if any(item not in globals_dup_key_list for item in list_dup_keys):
            log.warning(
                'Following duplicate key(s) not found in globals TRUSTED_FILE_DUPLI_KEYS dictionary '
                'and are thus being removed.\nAdd them to the globals dict to analyse them:'
                f'\n {[item for item in list_dup_keys if item not in globals_dup_key_list]}'
            )
        dup_list = list(
            filter(lambda item: item in globals_dup_key_list, list_dup_keys))

        if len(list_dup_keys) > 0:
            # remove duplicated keys from self.key
            self.key = list(filter(lambda x: x not in list_dup_keys, self.key))
            dupli_check = True
        ##################################################
        gondola_path_check = False
        if any('gondola' in file1 or 'SR3' in file1 for file1 in file_list):
            gondola_path_check = True
            print('gondola keys present')
        file_list = [
            file1
            for file1 in file_list
            if 'SR3' not in file1 and not file1.endswith('/') and file1.endswith('.csv')
        ]
        current_duplicated = []
        # concat=[]
        all_dataframes = []
        concat_metadata = []
        for file in file_list:
            if len(list_dup_keys) > 0:
                if file in Globals.TRUSTED_FILE_DUPLI_KEYS.keys():
                    keys_in_trusted = list(
                        Globals.TRUSTED_FILE_DUPLI_KEYS[file])

                    # returns duplicated keys from input that are present in this dict
                    current_duplicated = list(
                        filter(lambda x: x in keys_in_trusted, list_dup_keys))
                    self.key = self.key + current_duplicated

                    list_dup_keys = list(
                        filter(lambda item: item not in current_duplicated, list_dup_keys))
            metadata, df = self.load_hk(os.path.join(self.base_path, file))
            if df is None:
                print('None found')
                continue
            metadata, df = HK.translate_multiple_columns(metadata, df)
            all_dataframes.append(df)
            concat_metadata.append(metadata)
            self.key = list(
                filter(lambda item: item not in current_duplicated, self.key))

        if gondola_path_check:
            metadata_g, df_g = self.load_gondola_hk()

            concat_metadata.append(metadata_g)
            all_dataframes.append(df_g)
        all_dataframes = pd.concat(all_dataframes)
        concat_metadata = pd.concat(concat_metadata)
        return concat_metadata, all_dataframes

    def combine_interpolate(self, method='linear', order=1, specific=None, translate_map=True):
        metadata_comb = pd.DataFrame()
        interpolated_comb = pd.DataFrame()
        file_list = self.file_list
        hk_log_dict = self.get_files_from_hklog()
        ########################################################
        # gets the subset of keys from hk_log that are present in self.key
        # (i.e. duplicated keys get repeated)
        keys_list_hk_log = hk_log_dict.values()

        keys_list_hk_log = [
            item for sublist in keys_list_hk_log for item in sublist]
        for key in self._key_input:
            if key not in set(keys_list_hk_log):
                log.warning(f'key: {key} not found. Processing for remaining keys')

        list_dup_keys = [
            # list of duplicate keys
            item for item, count in collections.Counter(keys_list_hk_log).items() if count > 1
        ]
        globals_dup_key_list = Globals.TRUSTED_FILE_DUPLI_KEYS.values()
        globals_dup_key_list = [
            item for sublist in globals_dup_key_list for item in sublist]
        # checks if an input key is duplicated and whether it is registered in the trusted dict
        if any(item not in globals_dup_key_list for item in list_dup_keys):
            files_dup_key = [file for file in file_list if any(
                item in hk_log_dict[file] for item in list_dup_keys)]
            log.warning(
                'Following duplicate key(s) not found in globals TRUSTED_FILE_DUPLI_KEYS dictionary '
                'and are thus being removed.\nAdd them to the globals dict to analyse them:'
                f'\n {[item for item in list_dup_keys if item not in globals_dup_key_list]}'
                f'\n These keys are present in the following files: {files_dup_key}'
            )
        dup_list = list(
            filter(lambda item: item in globals_dup_key_list, list_dup_keys))

        if len(list_dup_keys) > 0:
            # remove duplicated keys from self.key
            self.key = list(filter(lambda x: x not in list_dup_keys, self.key))
            dupli_check = True
        ##################################################
        gondola_path_check = False
        if any('SR3' in file1 for file1 in file_list):
            gondola_path_check = True
            print('gondola keys present')

        file_list = [
            file1
            for file1 in file_list
            if 'SR3' not in file1 and not file1.endswith('/') and file1.endswith('.csv')
        ]
        current_duplicated = []
        for file in file_list:
            if len(list_dup_keys) > 0:
                if file in Globals.TRUSTED_FILE_DUPLI_KEYS.keys():
                    keys_in_trusted = list(
                        Globals.TRUSTED_FILE_DUPLI_KEYS[file])

                    # returns duplicated keys from input that are present in this dict
                    current_duplicated = list(
                        filter(lambda x: x in keys_in_trusted, list_dup_keys))
                    self.key = self.key + current_duplicated

                    list_dup_keys = list(
                        filter(lambda item: item not in current_duplicated, list_dup_keys))
            metadata, df = self.load_hk(os.path.join(self.base_path, file))
            if df is None:
                print('None found')
                continue
            interpolated_data = self._interpolate(
                df=df, method=method, order=order, specific=specific)
            if interpolated_comb.empty:
                metadata_comb = metadata
                interpolated_comb = interpolated_data
            else:
                metadata_comb = pd.concat([metadata_comb, metadata], axis=1)
                interpolated_comb = pd.concat(
                    [interpolated_comb, interpolated_data], axis=1)
            self.key = list(
                filter(lambda item: item not in current_duplicated, self.key))

        if gondola_path_check:
            print('in gondola file check')
            metadata_g, df_g = self.load_gondola_hk()

            interpolated_data_g = self._interpolate(
                df=df_g, method=method, order=order, specific=specific)
            if interpolated_comb.empty:
                metadata_comb = metadata_g
                interpolated_comb = interpolated_data_g
            else:
                metadata_comb = pd.concat([metadata_comb, metadata_g], axis=1)
                # concat the other hk data with the gondola hk data
                interpolated_comb = pd.concat(
                    [interpolated_comb, interpolated_data_g], axis=1)
        interpolated_comb = interpolated_comb.loc[:, ~interpolated_comb.columns.duplicated()]
        metadata_comb = metadata_comb.loc[:, ~metadata_comb.columns.duplicated()]
        if len(interpolated_comb) == 0:
            log.error("Input keys don't match. Please check the input keys again")
        if translate_map is True:
            metadata_comb, interpolated_comb = HK.translate_multiple_columns(
                metadata_comb, interpolated_comb)
        for key in self._key_input:
            if key not in set(metadata_comb.columns):
                log.warning(
                    f'key: {key} was given as an input but does not exist in the output data. '
                    'Reasons can be:\n'
                    '1) Key not found.\n'
                    '2) Key removed during processing.\n'
                    '3) The wrong file was entered in the TRUSTED_FILE_DUPLI_KEYS dict in globals'
                )
        return metadata_comb, interpolated_comb

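    # Illustrative call (a sketch; the key names and time range are assumptions):
    #
    #   hk = HK(['FwPos', 'PointingState'],
    #           ts=('2024-07-13 10:00:00', '2024-07-13 11:00:00', 1))
    #   metadata, data = hk.combine_interpolate(
    #       method='linear', specific={'last': ['PointingState']})
    #   # `data` is indexed on hk.ts with one interpolated column per resolved key.
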
    def load_gondola_hk(self):
        key = self.key
        ts = self.ts
        start_time = ts.min()
        end_time = ts.max()

        file_pattern = 'SR3_Flight_2024_07_{}.txt'
        df_l = []
        # loop to iterate over the files day-wise
        print('cleaning gondola files')

        for date in pd.date_range(start_time, end_time):
            file_name = file_pattern.format(date.day)
            file_path = os.path.join(self.base_path, 'gondola', file_name)

            data = pd.read_csv(file_path, index_col=False, low_memory=False)
            data.columns = [c.strip() for c in data.columns]

            data['datetime'] = pd.to_datetime(
                data['Date'] + ' ' + data['hh:mm:ss'], errors='coerce')
            data = data.applymap(lambda x: np.nan if isinstance(
                x, str) and x.strip() == '' else x)
            # data = data[data['datetime'].dt.date == date.date()]
            # print(data['datetime'].dt.date, date.date())
            data['datetime'] = data['datetime'].interpolate()
            data = data.sort_values('datetime')
            data.set_index(data['datetime'], inplace=True)
            data = data[~data.index.duplicated(keep='first')].copy()
            # if start_time in data.index:
            #     start_index = data.index.get_loc(start_time)
            # else:
            #     start_index = data.index.get_indexer(
            #         [start_time], method='pad')[0]

            # if end_time in data.index:
            #     end_index = data.index.get_loc(end_time)
            # else:
            #     end_index = data.index.get_indexer(
            #         [end_time], method='backfill')[0]

            # subset_indices = data.index[start_index:end_index]
            # data = data.loc[subset_indices]

            df_l.append(data)

        df_1 = pd.concat(df_l, axis=0)
        df_1.sort_index(inplace=True)  # ensure chronological order after concatenation
        df_1 = df_1[~df_1.index.duplicated(keep='first')].copy()
        if start_time in df_1.index:
            start_index = df_1.index.get_loc(start_time)
        else:
            start_index = df_1.index.get_indexer(
                [start_time], method='pad')[0]

        if end_time in df_1.index:
            end_index = df_1.index.get_loc(end_time)
        else:
            end_index = df_1.index.get_indexer(
                [end_time], method='backfill')[0]
        subset_indices_c = df_1.index[start_index:end_index + 1]
        df_1 = df_1.loc[subset_indices_c]
        columns_to_keep, metadata = self._key_matcher_gondola(df_1, key)

        if columns_to_keep is None:
            return None, None
        elif columns_to_keep is not None:
            df_filtered = df_1[list(columns_to_keep.keys())]

            df_filtered = df_filtered.rename(columns=columns_to_keep)
            df_filtered.index = pd.to_datetime(
                df_filtered.index, format='mixed')
            df_filtered.index = df_filtered.index.rename('Time')
            df_filtered = df_filtered[~df_filtered.index.duplicated(
                keep='first')].copy()
            df_filtered = df_filtered.apply(self.convert_numeric)

        metadata = pd.DataFrame(metadata)

        return metadata, df_filtered

    def convert_numeric(self, col):
        # Convert entries to numeric; non-convertible entries become NaN
        return pd.to_numeric(col, errors='coerce')

    def load_hk(self, file_path: str):
        key = self.key
        ts = self.ts

        data = pd.read_csv(file_path, encoding=ENCODING,
                           index_col=False, low_memory=False)

        data.set_index(data.columns[0], inplace=True)

        columns_to_keep, metadata = self._key_matcher(data, key)

        if columns_to_keep is None:
            return None, None
        elif columns_to_keep is not None:
            df = data[list(columns_to_keep.keys())]
            df = df.rename(columns=columns_to_keep)
            df.index = pd.to_datetime(df.index, format='mixed')
            df.index = df.index.rename('Time')
            df = df[~df.index.duplicated(keep='first')].copy()

        if ts is not None:
            ts = pd.to_datetime(ts)
            start_time = ts.min()
            end_time = ts.max()

            if start_time in df.index:
                start_index = df.index.get_loc(start_time)
            else:
                start_index = df.index.get_indexer(
                    [start_time], method='pad')[0]

            if end_time in df.index:
                end_index = df.index.get_loc(end_time)
            else:
                end_index = df.index.get_indexer(
                    [end_time], method='backfill')[0]

            subset_indices = df.index[start_index:end_index]

            # subset the DataFrame to the requested time range
            df = df.loc[subset_indices]
        metadata = pd.DataFrame(metadata)
        return metadata, df

    def _key_matcher(self, data, key):
        header = data.columns
        columns_to_keep = {}
        metadata = {}

        # Regex patterns for extracting units and data types
        unit_pattern = re.compile(r'\[(.*?)\]')
        datatype_pattern = re.compile(r'\{(.*?)\}')
        for col in header:
            base_name_match = re.match(
                r'^\s*([^\[\s]+(?:\s[^\[\s]+)*)?(?:\s*\[[^\]]+\]\s*\{[^}]+\})?(?:\.(\d+))?', col)

            # Extract the base name with an optional decimal part
            if base_name_match:
                base_name = base_name_match.group(1).strip()  # capture base name
                # Check for the optional decimal part
                if base_name_match.group(2):
                    base_name += f".{base_name_match.group(2)}"
            else:
                base_name = col.strip()
            # Extract the unit and datatype if they exist
            unit_match = unit_pattern.search(col)
            datatype_match = datatype_pattern.search(col)

            unit = unit_match.group(1) if unit_match else ''
            datatype = datatype_match.group(1) if datatype_match else ''
            if base_name in ['Longitude', 'Latitude']:
                print(f'found key {base_name}')
                base_name = str(base_name) + '_' + str(unit)
                print(f'key renamed to {base_name}')

            if base_name in key:
                columns_to_keep[col] = base_name
                metadata[base_name] = {'unit': unit, 'datatype': datatype}

        return columns_to_keep, metadata

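    # Illustrative header parsing (hypothetical column names): a column named
    # 'DetectorTemp [C] {float}' yields base_name='DetectorTemp', unit='C',
    # datatype='float'; a 'Latitude [deg] {float}' column is renamed to 'Latitude_deg'.
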
    def _key_matcher_gondola(self, data, key):
        header = data.columns
        columns_to_keep = {}
        metadata = {}

        for col in header:
            # Use regex to find the unit within parentheses
            match = re.search(r'\((.*?)\)', col)
            if match:
                # Extract the found unit
                unit_name = match.group(1).strip()
                # Remove the parentheses and trim whitespace
                base_name = re.sub(r'\s*\(.*?\)', '', col).strip()
            else:
                # If no parentheses, use an empty placeholder for the unit
                base_name = col.strip()
                unit_name = ''
            if base_name in ['Latitude', 'Longitude', 'Brake', 'Track State']:
                print(f'found {base_name} as key')
                base_name = str(base_name) + '_gondola_' + str(unit_name)
                print(f'key renamed to {base_name}')
            if base_name in key:
                metadata[base_name] = {'unit': unit_name, 'datatype': ''}
                columns_to_keep[col] = base_name
        return columns_to_keep, metadata

    def _interpolate(self, df, method, order, specific=None):
        """Interpolate the hk data onto self.ts.

        The `specific` parameter is a dictionary that maps interpolation methods to
        lists of keys. For example, `{'spline': ['key1'], 'last': ['key3']}` applies
        the `spline` method to `key1` and the `last` method to `key3`.

        Args:
            specific (dict): Dictionary of specific methods to apply.
        """
        union_index = df.index.union(self.ts)
        df_reindexed = df.reindex(union_index)
        df_reindexed = pd.DataFrame(df_reindexed)
        methods = {key: method for key in self.key}
        if specific:
            for spec_method, keys in specific.items():
                for k in keys:
                    if k in methods:
                        methods[k] = spec_method
        self.methods = methods
        interpolated_data = pd.DataFrame(index=self.ts)
        custom_default_methods = Globals.CUSTOM_DEFAULT_MODES
        for col in df.columns:
            col_method = methods.get(col, 'linear')
            if col_method == 'default':
                if col in custom_default_methods['linear']:
                    col_method = 'linear'
                elif col in custom_default_methods['nn']:
                    col_method = 'nn'
                elif col in custom_default_methods['last']:
                    col_method = 'last'
                else:
                    col_method = 'last'  # default fallback

            if col_method in ['linear', 'spline', 'polynomial']:
                try:
                    df_reindexed[col] = df_reindexed[col].apply(
                        self.convert_numeric)
                    interpolated_data[col] = df_reindexed[col].interpolate(
                        method=col_method, order=order)
                except Exception:
                    log.error(
                        f"column method: {col_method} unsuccessful for col: {col}")
            elif col_method == 'nn':
                try:
                    df_reindexed[col] = df_reindexed[col].apply(
                        self.convert_numeric)
                    interpolated_data[col] = df[col].reindex(
                        union_index, method='nearest')
                except Exception:
                    log.error(
                        f"column method: {col_method} unsuccessful for col: {col}")
            elif col_method == 'last':
                try:
                    df_reindexed[col] = df_reindexed[col].apply(
                        self.convert_numeric)
                    interpolated_data[col] = df_reindexed[col].ffill()
                except Exception:
                    log.error(
                        f"column method: {col_method} unsuccessful for col: {col}")
            else:
                msg = f"Unsupported interpolation method: {col_method}"
                log.error(msg)
                raise ValueError(msg)

        interpolated_data = interpolated_data.reindex(self.ts)
        interpolated_data = interpolated_data.ffill().bfill()

        return interpolated_data
        # return df_reindexed

    @staticmethod
    def plot_genpdf(metadata, df1, time_marker=None, name='hk_plot.pdf'):
        with PdfPages(name) as pdf:
            for col in df1.columns:
                plt.figure(figsize=(12, 6))
                plt.plot(df1.index, df1[col], '--x', label=col)
                unit = metadata.get(col, {}).get('unit', '')
                ylabel = f"{col} [{unit}]" if unit else col
                plt.ylabel(ylabel)
                if time_marker is not None:
                    for tm in time_marker:
                        plt.axvline(x=tm, color='r', linestyle='--')
                plt.xlabel('Time')
                plt.title('Plot of Interpolated Data')
                plt.legend()
                plt.grid(True)
                pdf.savefig()
                plt.close()

    @staticmethod
    def get_keys(inp, path='/data/sunrise/2024/SUSI/hk'):
        if 'gondola' in inp:
            inp = os.path.join(path, 'gondola/SR3_Flight_2024_07_13.txt')
            metadata = {}

            data = pd.read_csv(inp, index_col=False, low_memory=False, nrows=0)
            data.columns = [c.strip() for c in data.columns]
            header = data.columns
            for col in header:
                # Use regex to find the unit within parentheses
                match = re.search(r'\((.*?)\)', col)
                if match:
                    # Extract the found unit
                    unit_name = match.group(1).strip()
                    base_name = re.sub(r'\s*\(.*?\)', '', col).strip()
                else:
                    # If no parentheses, use an empty placeholder for the unit
                    base_name = col.strip()
                    unit_name = ''
                if base_name in ['Latitude', 'Longitude', 'Brake', 'Track State']:
                    base_name = str(base_name) + '_gondola_' + str(unit_name)
                metadata[base_name] = {'unit': unit_name, 'datatype': ''}
            metadata = pd.DataFrame(metadata)

            return metadata
        elif 'gondola' not in inp:
            if not os.path.isabs(inp):  # check if inp is not an absolute path
                # inp = path + inp + '.csv'
                inp = os.path.join(path, inp + '.csv')
            metadata = {}
            header = pd.read_csv(inp, encoding=ENCODING, nrows=0).columns

            # Regex patterns for extracting units and data types
            unit_pattern = re.compile(r'\[(.*?)\]')
            datatype_pattern = re.compile(r'\{(.*?)\}')

            for col in header:
                # Extract everything before the first '['
                base_name_match = re.match(
                    r'^\s*([^\[\s]+(?:\s[^\[\s]+)*)?(?:\s*\[[^\]]+\]\s*\{[^}]+\})?(?:\.(\d+))?', col
                )

                # Extract the base name with an optional decimal part
                if base_name_match:
                    base_name = base_name_match.group(1).strip()  # capture base name
                    # Check for the optional decimal part
                    if base_name_match.group(2):
                        base_name += f".{base_name_match.group(2)}"
                else:
                    base_name = col.strip()
                # Extract the unit and datatype if they exist
                unit_match = unit_pattern.search(col)
                datatype_match = datatype_pattern.search(col)

                unit = unit_match.group(1) if unit_match else ''
                datatype = datatype_match.group(1) if datatype_match else ''
                if base_name in ['Longitude', 'Latitude']:
                    base_name = str(base_name) + '_' + str(unit)
                metadata[base_name] = {'unit': unit, 'datatype': datatype}
            metadata = pd.DataFrame(metadata)
            return metadata

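    # Illustrative call (a sketch; the file name is an assumption and may not
    # exist in every dataset):
    #
    #   available = HK.get_keys('susi_scu_reduced', path='/data/sunrise/2024/SUSI/hk')
    #   print(available.columns.tolist())
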
    @staticmethod
    def plot_gen(metadata, df1):
        figures = []
        axes = []
        for col in df1.columns:
            fig, ax = plt.subplots(figsize=(12, 6))
            ax.plot(df1.index, df1[col], label=col)
            unit = metadata.get(col, {}).get('unit', '')
            ylabel = f"{col} [{unit}]" if unit else col
            ax.set_ylabel(ylabel)
            ax.set_xlabel('Time')
            ax.set_title('Plot of Interpolated Data')
            ax.legend()
            ax.grid(True)
            figures.append(fig)
            axes.append(ax)
        return figures, axes

    @staticmethod
    def plot_gen_get_back(metadata, df1):
        figures = []
        axes = []
        for col in df1.columns:
            fig, ax = plt.subplots(figsize=(12, 6))
            ax.plot(df1[col].dropna(), label=col)
            unit = metadata[col].dropna().get('unit', '')
            ylabel = f"{col} [{unit}]" if unit else col
            ax.set_ylabel(ylabel)
            ax.set_xlabel('Time')
            ax.set_title('Plot of Interpolated Data')
            ax.legend()
            ax.grid(True)
            figures.append(fig)
            axes.append(ax)
        return figures, axes

def create_hklog(directory_path, output_metadata_file='hk_log.json', encoding='ISO8859-1'):
    metadata = {}
    files = glob.glob(os.path.join(directory_path, '**/*'), recursive=True)
    combined_files = []
    dump = []  # remaining gondola files (not used further)
    one_time_check = 0
    for file in files:
        if not file.endswith('/') and 'gondola' in file and file.endswith('.txt'):
            # record only the first gondola file; the rest are collected in `dump`
            one_time_check += 1
            if one_time_check == 1:
                combined_files.append(file)
                # print(file)
            else:
                dump.append(file)
        elif not file.endswith('/') and file.endswith('.csv') and 'gondola' not in file:
            combined_files.append(file)
    # print(combined_files)
    for filename in combined_files:
        try:
            columns = HK.get_keys(
                filename, path=directory_path).columns.tolist()
            metadata[filename] = columns
            # print(filename)
        except PermissionError:
            log.error(f"Permission denied while accessing: {filename}")
            continue

    # Save metadata to a JSON file for future use
    with open(os.path.join(directory_path, output_metadata_file), 'w') as f:
        json.dump(metadata, f)

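# Illustrative one-time setup (a sketch; the directory is the module's default hk path):
#
#   create_hklog('/data/sunrise/2024/SUSI/hk')
#   # writes hk_log.json into that directory, mapping each hk file to its key names
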
def generate_plotpdf(
    key,
    ts,
    base_path='/data/sunrise/2024/SUSI/hk',
    method='linear',
    order=1,
    specific=None,
    time_marker=None,
    opath=None,
    translate_map=True,
):
    metadata, df1 = HK(key, ts, base_path).combine_interpolate(
        method=method, order=order, specific=specific, translate_map=translate_map)
    if opath is not None:
        os.makedirs(opath, exist_ok=True)
        HK.plot_genpdf(metadata, df1, time_marker=time_marker,
                       name=os.path.join(opath, 'hk_plot.pdf'))
        metadata.to_csv(os.path.join(opath, 'hk_metadata.csv'))
        df1.to_csv(os.path.join(opath, 'hk_data.csv'))

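# Illustrative end-to-end call (a sketch; key names, time range and output path are assumptions):
#
#   generate_plotpdf(key=['FwPos', 'AO_runmodenr'],
#                    ts=('2024-07-13 10:00:00', '2024-07-13 12:00:00', 1),
#                    opath='/tmp/hk_out')
#   # produces hk_plot.pdf, hk_metadata.csv and hk_data.csv in opath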