Coverage for src/susi/db/hk_script.py: 10%
510 statements
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import glob
import re
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
from matplotlib.backends.backend_pdf import PdfPages
import json
import os
import logging
import collections

# from susi.base.globals import Globals
from ..base import Logging
from ..base.globals import Globals

ENCODING = 'ISO8859-1'

log = Logging.get_logger()


class HK:
    def __init__(self, key: list, ts=None, base_path='/data/sunrise/2024/SUSI/hk'):
        """
        To retrieve values use .combine_interpolate(); to plot them directly use generate_plotpdf().
        A usage sketch follows the docstring.

        param key: list of keys (type: list of strings)
        param ts: list of timestamps or tuple (start, end, step)
        param base_path: path to the hk data

        The HK object has the following attributes:
        - ts: list of timestamps
        - key: list of keys
        - base_path: path to the hk data
        - hk_log: path to the hk log file
        - file_list: list of files to load
        """
        self.ts = ts
        if type(key) is str:
            self.key = [key]
        else:
            self.key = key
        self._key_input = key  # kept separately because self.key is modified later

        self.base_path = base_path
        try:
            self.hk_log = os.path.join(self.base_path, 'hk_log.json')
            self.file_list = list(self.get_files_from_hklog().keys())
        except FileNotFoundError:
            log.error(
                "file hk_log.json not found.\nCreate a json file with hk key locations using hk_script.create_hklog"
            )
        if ts is not None:
            if isinstance(ts, (list, np.ndarray)):
                try:
                    self.ts = pd.to_datetime(ts, format="%Y-%m-%d %H:%M:%S.%f")
                except ValueError:
                    self.ts = pd.to_datetime(ts, format="%Y-%m-%dT%H:%M:%S.%f")
            elif isinstance(ts, pd.DatetimeIndex):
                self.ts = ts
            elif isinstance(ts, tuple) and len(ts) == 3:
                start, end, step = ts

                if isinstance(step, (int, float)):
                    self.ts = HK.gen_dt(start, end, step)
                else:
                    msg = "The step must be a numeric value representing seconds."
                    log.error(msg)
                    raise ValueError(msg)
            else:
                msg = (
                    "ts must be either a list of datetime objects/strings or "
                    "a tuple: (start_datetime, end_datetime, step)."
                )
                log.error(msg)
                raise TypeError(msg)
        elif ts is None:
            if type(key) is str:
                self.key_metadata, self.key_data = self.load_hk(self.file_list[0])
            elif type(key) is list:
                log.warning("A list of keys was given; returning data for the first key")
                self.key_metadata, self.key_data = self.load_hk(self.file_list[0])

    def get_files_from_hklog(self):
        with open(os.path.join(self.base_path, self.hk_log), 'r') as f:
            data_key = json.load(f)
        files_to_load = {}
        for col in self._key_input:
            for file, columns in data_key.items():
                if col in columns:
                    files_to_load[os.path.basename(file)] = list(filter(lambda x: x in self._key_input, columns))
        return files_to_load

    @staticmethod
    def gen_dt(start, end, step=1, unit='s'):
        """
        Create a datetime array from start, end and step.

        start: starting datetime. String in format 'yyyy-mm-dd HH:mm:ss:ffff' or
               tuple, (yyyy,mm,dd) or (yyyy,mm,dd,HH,mm,ss,ffff)
        end: ending datetime. String in format 'yyyy-mm-dd HH:mm:ss:ffff' or
             tuple, (yyyy,mm,dd) or (yyyy,mm,dd,HH,mm,ss,ffff)
        step: step size in seconds, datatype: int or float
        """

        if type(start) is tuple:
            start = HK.tuple_to_dt_str(start)
            end = HK.tuple_to_dt_str(end)
            start = pd.to_datetime(start)
            end = pd.to_datetime(end)
            datetime_arr = pd.date_range(start=start, end=end, freq=pd.to_timedelta(step, unit=unit))
        elif type(start) is str:
            datetime_arr = pd.date_range(start=start, end=end, freq=pd.to_timedelta(step, unit=unit))
        else:
            msg = (
                "start must be a string in format 'yyyy-mm-dd HH:mm:ss:ffff' or "
                "a tuple, (yyyy,mm,dd) or (yyyy,mm,dd,HH,mm,ss,ffff)"
            )
            log.error(msg)
            raise TypeError(msg)

        return datetime_arr
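
    # Usage sketch (dates are illustrative assumptions):
    #   HK.gen_dt((2024, 7, 10), (2024, 7, 11), step=60)   # one timestamp per minute
    #   HK.gen_dt('2024-07-10 10:00:00', '2024-07-10 11:00:00', step=1)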

    @staticmethod
    def tuple_to_dt_str(tupl):
        x = '-'.join(str(x) for x in tupl[:3])
        y = ':'.join(str(y) for y in tupl[3:6])
        if len(tupl) > 6:
            return x + " " + y + f'.{tupl[6]}'
        else:
            return x + " " + y
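
    # e.g. (2024, 7, 10, 12, 30, 5, 123) -> '2024-7-10 12:30:5.123'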

    @staticmethod
    def F2_translator(data):
        return Globals.F2_MECH_POSITION_MAP.get(data, 'unknown')

    @staticmethod
    def FwPos_translator(data):
        data = int(data / (10**6))

        result = min(Globals.F2_MECH_POSITION_MAP.keys(), key=lambda x: abs(x - data))
        return Globals.F2_MECH_POSITION_MAP.get(result, 'unknown')

    @staticmethod
    def AO_runmodenr_translator(data):
        result = Globals.A0_RUNMODENR_MAPPING.get(data, str(data))
        return result

    @staticmethod
    def pointingstate_translator(state):
        state = int(state)
        binary_representation = format(state, '08b')

        # Create a dictionary to store each state bit value
        state_bits = {}

        # Loop through each bit in the binary representation and map it to the state name
        for i in range(8):
            state_bits[Globals.POINTING_STATE_MAPPING[i]] = int(binary_representation[7 - i])

        return state_bits
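
    # e.g. state=5 -> '00000101': bits 0 and 2 are set, so the names mapped to indices
    # 0 and 2 in Globals.POINTING_STATE_MAPPING get value 1 and all other names get 0.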

    @staticmethod
    def translate_multiple_columns(metadata, data):
        translation_functions = {
            'F2_Mech_Position': HK.F2_translator,
            'AO_runmodenr': HK.AO_runmodenr_translator,
            'FwPos': HK.FwPos_translator,
        }

        for column in data.columns:
            if column in translation_functions:
                data[column] = data[column].apply(translation_functions[column])
            if column in Globals.UNITMAP:
                metadata.loc['unit', column] = Globals.UNITMAP[column]
                metadata.loc['datatype', column] = Globals.Data_Type_Map[column]
            if column == 'PointingState':
                state_columns = data['PointingState'].apply(HK.pointingstate_translator).apply(pd.Series)
                data = pd.concat([data, state_columns], axis=1)
                data = data.drop(columns=['PointingState'])

        return metadata, data

    def get_back(self):
        # metadata_comb = pd.DataFrame()
        # interpolated_comb = pd.DataFrame()
        file_list = self.file_list
        hk_log_dict = self.get_files_from_hklog()
        ########################################################
        # get the subset of keys from hk_log that are present in self.key (duplicated keys are repeated)
        keys_list_hk_log = hk_log_dict.values()
        keys_list_hk_log = [item for sublist in keys_list_hk_log for item in sublist]
        list_dup_keys = [
            # list of duplicate keys
            item
            for item, count in collections.Counter(keys_list_hk_log).items()
            if count > 1
        ]

        globals_dup_key_list = Globals.TRUSTED_FILE_DUPLI_KEYS.values()
        globals_dup_key_list = [item for sublist in globals_dup_key_list for item in sublist]
        if any(item not in globals_dup_key_list for item in list_dup_keys):
            log.warning(
                'Following duplicate key(s) not found in globals TRUSTED_FILE_DUPLI_KEYS dictionary '
                'and are thus being removed.\nAdd them to the globals dict to analyse them:'
                f'\n {[item for item in list_dup_keys if item not in globals_dup_key_list]}'
            )
        dup_list = list(filter(lambda item: item in globals_dup_key_list, list_dup_keys))

        if len(list_dup_keys) > 0:
            # remove duplicated keys from self.key
            self.key = list(filter(lambda x: x not in list_dup_keys, self.key))
            dupli_check = True
        ##################################################
        gondola_path_check = False
        if any('gondola' in file1 or 'SR3' in file1 for file1 in file_list):
            gondola_path_check = True
            print('gondola keys present')
        file_list = [
            file1
            for file1 in file_list
            if 'SR3' not in file1 and not file1.endswith('/') and file1.endswith('.csv')
        ]
        current_duplicated = []
        # concat=[]
        all_dataframes = []
        concat_metadata = []
        for file in file_list:
            if len(list_dup_keys) > 0:
                if file in Globals.TRUSTED_FILE_DUPLI_KEYS.keys():
                    keys_in_trusted = list(Globals.TRUSTED_FILE_DUPLI_KEYS[file])

                    # duplicated keys from the input that are present in this dict
                    current_duplicated = list(filter(lambda x: x in keys_in_trusted, list_dup_keys))
                    self.key = self.key + current_duplicated

                    list_dup_keys = list(filter(lambda item: item not in current_duplicated, list_dup_keys))

            metadata, df = self.load_hk(os.path.join(self.base_path, file))
            metadata, df = HK.translate_multiple_columns(metadata, df)
            all_dataframes.append(df)
            concat_metadata.append(metadata)
            if df is None:
                print('None found')
                continue
            self.key = list(filter(lambda item: item not in current_duplicated, self.key))

        if gondola_path_check:
            metadata_g, df_g = self.load_gondola_hk()

            concat_metadata.append(metadata_g)
            all_dataframes.append(df_g)
        all_dataframes = pd.concat(all_dataframes)
        concat_metadata = pd.concat(concat_metadata)
        return concat_metadata, all_dataframes

    def combine_interpolate(self, method='linear', order=1, specific=None, translate_map=True):
        metadata_comb = pd.DataFrame()
        interpolated_comb = pd.DataFrame()
        file_list = self.file_list
        hk_log_dict = self.get_files_from_hklog()
        ########################################################
        # get the subset of keys from hk_log that are present in self.key (duplicated keys are repeated)
        keys_list_hk_log = hk_log_dict.values()
        keys_list_hk_log = [item for sublist in keys_list_hk_log for item in sublist]
        [
            log.warning(f'key: {key} not found. Processing for remaining keys')
            for key in self._key_input
            if key not in set(keys_list_hk_log)
        ]

        list_dup_keys = [
            # list of duplicate keys
            item
            for item, count in collections.Counter(keys_list_hk_log).items()
            if count > 1
        ]
        globals_dup_key_list = Globals.TRUSTED_FILE_DUPLI_KEYS.values()
        globals_dup_key_list = [item for sublist in globals_dup_key_list for item in sublist]
        # check whether an input key is duplicated and registered in the TRUSTED_FILE_DUPLI_KEYS dict
        if any(item not in globals_dup_key_list for item in list_dup_keys):
            files_dup_key = [file for file in file_list if any(item in hk_log_dict[file] for item in list_dup_keys)]
            log.warning(
                'Following duplicate key(s) not found in globals TRUSTED_FILE_DUPLI_KEYS dictionary '
                'and are thus being removed.\nAdd them to the globals dict to analyse them:'
                f'\n {[item for item in list_dup_keys if item not in globals_dup_key_list]}'
                f'\n These keys are present in the following files: {files_dup_key}'
            )
        dup_list = list(filter(lambda item: item in globals_dup_key_list, list_dup_keys))

        if len(list_dup_keys) > 0:
            # remove duplicated keys from self.key
            self.key = list(filter(lambda x: x not in list_dup_keys, self.key))
            dupli_check = True
        ##################################################
        gondola_path_check = False
        if any('SR3' in file1 for file1 in file_list):
            gondola_path_check = True
            print('gondola keys present')

        file_list = [
            file1
            for file1 in file_list
            if 'SR3' not in file1 and not file1.endswith('/') and file1.endswith('.csv')
        ]
        current_duplicated = []
        for file in file_list:
            if len(list_dup_keys) > 0:
                if file in Globals.TRUSTED_FILE_DUPLI_KEYS.keys():
                    keys_in_trusted = list(Globals.TRUSTED_FILE_DUPLI_KEYS[file])

                    # duplicated keys from the input that are present in this dict
                    current_duplicated = list(filter(lambda x: x in keys_in_trusted, list_dup_keys))
                    self.key = self.key + current_duplicated

                    list_dup_keys = list(filter(lambda item: item not in current_duplicated, list_dup_keys))

            metadata, df = self.load_hk(os.path.join(self.base_path, file))
            if df is None:
                print('None found')
                continue
            interpolated_data = self._interpolate(df=df, method=method, order=order, specific=specific)
            if interpolated_comb.empty:
                metadata_comb = metadata
                interpolated_comb = interpolated_data
            else:
                metadata_comb = pd.concat([metadata_comb, metadata], axis=1)
                interpolated_comb = pd.concat([interpolated_comb, interpolated_data], axis=1)
            self.key = list(filter(lambda item: item not in current_duplicated, self.key))

        if gondola_path_check:
            print('in gondola file check')
            metadata_g, df_g = self.load_gondola_hk()

            interpolated_data_g = self._interpolate(df=df_g, method=method, order=order, specific=specific)
            if interpolated_comb.empty:
                metadata_comb = metadata_g
                interpolated_comb = interpolated_data_g
            else:
                metadata_comb = pd.concat([metadata_comb, metadata_g], axis=1)
                # concatenate the other hk data with the gondola hk data
                interpolated_comb = pd.concat([interpolated_comb, interpolated_data_g], axis=1)
        interpolated_comb = interpolated_comb.loc[:, ~interpolated_comb.columns.duplicated()]
        metadata_comb = metadata_comb.loc[:, ~metadata_comb.columns.duplicated()]
        if len(interpolated_comb) == 0:
            log.error("Input keys don't match. Please check the input keys again")
        if translate_map is True:
            metadata_comb, interpolated_comb = HK.translate_multiple_columns(metadata_comb, interpolated_comb)
        [
            log.warning(
                f'key: {key} was given as an input but does not exist in the output data. '
                'Reasons can be:\n'
                '1) Key not found.\n'
                '2) Key removed during processing.\n'
                '3) Wrong file was input in the TRUSTED_FILE_DUPLI_KEYS dict in globals'
            )
            for key in self._key_input
            if key not in set(metadata_comb.columns)
        ]
        return metadata_comb, interpolated_comb

    def load_gondola_hk(self):
        key = self.key
        ts = self.ts
        start_time = ts.min()
        end_time = ts.max()

        file_pattern = 'SR3_Flight_2024_07_{}.txt'
        df_l = []
        # loop over the files day by day
        print('cleaning gondola files')

        for date in pd.date_range(start_time, end_time):
            file_name = file_pattern.format(date.day)
            file_path = os.path.join(self.base_path, 'gondola', file_name)

            data = pd.read_csv(file_path, index_col=False, low_memory=False)
            data.columns = [c.strip() for c in data.columns]

            data['datetime'] = pd.to_datetime(data['Date'] + ' ' + data['hh:mm:ss'], errors='coerce')
            data = data.applymap(lambda x: np.nan if isinstance(x, str) and x.strip() == '' else x)
            # data = data[data['datetime'].dt.date == date.date()]
            # print(data['datetime'].dt.date, date.date())
            data['datetime'] = data['datetime'].interpolate()
            data = data.sort_values('datetime')
            data.set_index(data['datetime'], inplace=True)
            data = data[~data.index.duplicated(keep='first')].copy()
            # if start_time in data.index:
            #     print('printing start index:' + str(start_index))
            #     start_index = data.index.get_loc(start_time)
            # else:
            #     start_index = data.index.get_indexer(
            #         [start_time], method='pad')[0]

            # if end_time in data.index:
            #     end_index = data.index.get_loc(end_time)
            # else:
            #     end_index = data.index.get_indexer(
            #         [end_time], method='backfill')[0]

            # subset_indices = data.index[start_index:end_index]
            # data = data.loc[subset_indices]

            df_l.append(data)

        df_1 = pd.concat(df_l, axis=0)
        df_1.sort_index(inplace=True)  # ensure chronological order after concatenation
        df_1 = df_1[~df_1.index.duplicated(keep='first')].copy()
        if start_time in df_1.index:
            start_index = df_1.index.get_loc(start_time)
        else:
            start_index = df_1.index.get_indexer([start_time], method='pad')[0]

        if end_time in df_1.index:
            end_index = df_1.index.get_loc(end_time)
        else:
            end_index = df_1.index.get_indexer([end_time], method='backfill')[0]
        subset_indices_c = df_1.index[start_index : end_index + 1]
        df_1 = df_1.loc[subset_indices_c]
        columns_to_keep, metadata = self._key_matcher_gondola(df_1, key)

        if columns_to_keep is None:
            return None, None
        elif columns_to_keep is not None:
            df_filtered = df_1[list(columns_to_keep.keys())]

            df_filtered = df_filtered.rename(columns=columns_to_keep)
            df_filtered.index = pd.to_datetime(df_filtered.index, format='mixed')
            df_filtered.index = df_filtered.index.rename('Time')
            df_filtered = df_filtered[~df_filtered.index.duplicated(keep='first')].copy()
            df_filtered = df_filtered.apply(self.convert_numeric)

            metadata = pd.DataFrame(metadata)

            return metadata, df_filtered

    def convert_numeric(self, col):
        # Convert convertible entries to numeric values; non-convertible entries become NaN (errors='coerce')
        return pd.to_numeric(col, errors='coerce')

    def load_hk(self, file_path: str):
        key = self.key
        ts = self.ts

        data = pd.read_csv(file_path, encoding=ENCODING, index_col=False, low_memory=False)

        data.set_index(data.columns[0], inplace=True)

        columns_to_keep, metadata = self._key_matcher(data, key)

        if columns_to_keep is None:
            return None, None
        elif columns_to_keep is not None:
            df = data[list(columns_to_keep.keys())]

            df = df.rename(columns=columns_to_keep)

            df.index = pd.to_datetime(df.index, format='mixed')
            df.index = df.index.rename('Time')
            df = df[~df.index.duplicated(keep='first')].copy()

            if ts is not None:
                ts = pd.to_datetime(ts)
                start_time = ts.min()
                end_time = ts.max()

                if start_time in df.index:
                    start_index = df.index.get_loc(start_time)
                else:
                    start_index = df.index.get_indexer([start_time], method='pad')[0]

                if end_time in df.index:
                    end_index = df.index.get_loc(end_time)
                else:
                    end_index = df.index.get_indexer([end_time], method='backfill')[0]

                subset_indices = df.index[start_index:end_index]

                # Subset the DataFrame
                df = df.loc[subset_indices]
            metadata = pd.DataFrame(metadata)
            return metadata, df

    def _key_matcher(self, data, key):
        data = data
        key = key
        header = data.columns
        columns_to_keep = {}
        metadata = {}

        # Regex patterns for extracting units and data types
        unit_pattern = re.compile(r'\[(.*?)\]')
        datatype_pattern = re.compile(r'\{(.*?)\}')
        for col in header:
            base_name_match = re.match(r'^\s*([^\[\s]+(?:\s[^\[\s]+)*)?(?:\s*\[[^\]]+\]\s*\{[^}]+\})?(?:\.(\d+))?', col)

            # Extract the base name with an optional decimal part
            if base_name_match:
                base_name = base_name_match.group(1).strip()  # capture the base name
                # Check for the optional decimal part
                if base_name_match.group(2):
                    base_name += f".{base_name_match.group(2)}"
            else:
                base_name = col.strip()
            # Extract the unit and datatype if they exist
            unit_match = unit_pattern.search(col)
            datatype_match = datatype_pattern.search(col)

            unit = unit_match.group(1) if unit_match else ''
            datatype = datatype_match.group(1) if datatype_match else ''
            if base_name in ['Longitude', 'Latitude']:
                print(f'found key {base_name}')
                base_name = str(base_name) + '_' + str(unit)
                print(f'key renamed to {base_name}')

            if base_name in key:
                columns_to_keep[col] = base_name
                metadata[base_name] = {'unit': unit, 'datatype': datatype}

        return columns_to_keep, metadata

    def _key_matcher_gondola(self, data, key):
        data = data
        key = key
        header = data.columns
        columns_to_keep = {}
        metadata = {}

        for col in header:
            # Use regex to find the unit within parentheses
            match = re.search(r'\((.*?)\)', col)
            if match:
                # Extract the unit name
                unit_name = match.group(1).strip()
                # Remove the parentheses and trim whitespace
                base_name = re.sub(r'\s*\(.*?\)', '', col).strip()
            else:
                # No parentheses: keep the column name and use an empty unit
                base_name = col.strip()
                unit_name = ''
            if base_name in ['Latitude', 'Longitude', 'Brake', 'Track State']:
                print(f'found {base_name} as key')
                base_name = str(base_name) + '_gondola_' + str(unit_name)
                print(f'key renamed to {base_name}')
            if base_name in key:
                metadata[base_name] = {'unit': unit_name, 'datatype': ''}
                columns_to_keep[col] = base_name
        return columns_to_keep, metadata

    def _interpolate(self, df, method, order, specific=None):
        """Interpolate the dataframe onto self.ts, with optional per-key methods.

        The `specific` parameter is a dictionary that maps methods to lists of keys.
        For example, `{'spline': ['key1'], 'last': ['key3']}` would apply the
        `spline` method to `key1` and the `last` method to `key3`.

        Args:
            specific (dict): Dictionary of specific methods to apply.
        """
        df = df
        method = method
        order = order
        specific = specific

        union_index = df.index.union(self.ts)
        df_reindexed = df.reindex(union_index)
        df_reindexed = pd.DataFrame(df_reindexed)
        methods = {key: method for key in self.key}
        if specific:
            for spec_method, keys in specific.items():
                for k in keys:
                    if k in methods:
                        methods[k] = spec_method
        self.methods = methods
        interpolated_data = pd.DataFrame(index=self.ts)
        custom_default_methods = Globals.CUSTOM_DEFAULT_MODES
        for col in df.columns:
            col_method = methods.get(col, 'linear')
            if col_method == 'default':
                if col in custom_default_methods['linear']:
                    col_method = 'linear'
                elif col in custom_default_methods['nn']:
                    col_method = 'nn'
                elif col in custom_default_methods['last']:
                    col_method = 'last'
                else:
                    col_method = 'last'  # default fallback

            if col_method in ['linear', 'spline', 'polynomial']:
                try:
                    df_reindexed[col] = df_reindexed[col].apply(self.convert_numeric)
                    interpolated_data[col] = df_reindexed[col].interpolate(method=col_method, order=order)
                except Exception:
                    log.error(f"column method: {col_method} unsuccessful for col: {col}")

            elif col_method == 'nn':
                try:
                    df_reindexed[col] = df_reindexed[col].apply(self.convert_numeric)
                    interpolated_data[col] = df[col].reindex(union_index, method='nearest')
                except Exception:
                    log.error(f"column method: {col_method} unsuccessful for col: {col}")

            elif col_method == 'last':
                try:
                    df_reindexed[col] = df_reindexed[col].apply(self.convert_numeric)
                    interpolated_data[col] = df_reindexed[col].ffill()
                except Exception:
                    log.error(f"column method: {col_method} unsuccessful for col: {col}")

            else:
                msg = f"Unsupported interpolation method: {col_method}"
                log.error(msg)
                raise ValueError(msg)

        interpolated_data = interpolated_data.reindex(self.ts)
        interpolated_data = interpolated_data.ffill().bfill()

        return interpolated_data
        # return df_reindexed

    @staticmethod
    def plot_genpdf(metadata, df1, time_marker=None, name='hk_plot.pdf'):
        with PdfPages(name) as pdf:
            for col in df1.columns:
                plt.figure(figsize=(12, 6))
                plt.plot(df1.index, df1[col], '--x', label=col)
                unit = metadata.get(col, {}).get('unit', '')
                ylabel = f"{col} [{unit}]" if unit else col
                plt.ylabel(ylabel)
                if time_marker is not None:
                    for tm in time_marker:
                        plt.axvline(x=tm, color='r', linestyle='--')
                plt.xlabel('Time')
                plt.title('Plot of Interpolated Data')
                plt.legend()
                plt.grid(True)
                pdf.savefig()
                plt.close()

    @staticmethod
    def get_keys(inp, path='/data/sunrise/2024/SUSI/hk'):
        if 'gondola' in inp:
            inp = os.path.join(path, 'gondola/SR3_Flight_2024_07_13.txt')
            metadata = {}

            data = pd.read_csv(inp, index_col=False, low_memory=False, nrows=0)
            data.columns = [c.strip() for c in data.columns]
            header = data.columns
            for col in header:
                # Use regex to find the unit within parentheses
                match = re.search(r'\((.*?)\)', col)
                if match:
                    # Extract the unit name
                    unit_name = match.group(1).strip()
                    base_name = re.sub(r'\s*\(.*?\)', '', col).strip()
                else:
                    # No parentheses: keep the column name and use an empty unit
                    base_name = col.strip()
                    unit_name = ''
                if base_name in ['Latitude', 'Longitude', 'Brake', 'Track State']:
                    base_name = str(base_name) + '_gondola_' + str(unit_name)
                metadata[base_name] = {'unit': unit_name, 'datatype': ''}
            metadata = pd.DataFrame(metadata)

            return metadata
        elif 'gondola' not in inp:
            if not os.path.isabs(inp):  # check whether inp is not an absolute path
                # inp = path + inp + '.csv'
                inp = os.path.join(path, inp + '.csv')
            metadata = {}
            header = pd.read_csv(inp, encoding=ENCODING, nrows=0).columns

            # Regex patterns for extracting units and data types
            unit_pattern = re.compile(r'\[(.*?)\]')
            datatype_pattern = re.compile(r'\{(.*?)\}')

            for col in header:
                # Extract everything before the first '['
                base_name_match = re.match(
                    r'^\s*([^\[\s]+(?:\s[^\[\s]+)*)?(?:\s*\[[^\]]+\]\s*\{[^}]+\})?(?:\.(\d+))?', col
                )

                # Extract the base name with an optional decimal part
                if base_name_match:
                    base_name = base_name_match.group(1).strip()  # capture the base name
                    # Check for the optional decimal part
                    if base_name_match.group(2):
                        base_name += f".{base_name_match.group(2)}"
                else:
                    base_name = col.strip()
                # Extract the unit and datatype if they exist
                unit_match = unit_pattern.search(col)
                datatype_match = datatype_pattern.search(col)

                unit = unit_match.group(1) if unit_match else ''
                datatype = datatype_match.group(1) if datatype_match else ''
                if base_name in ['Longitude', 'Latitude']:
                    base_name = str(base_name) + '_' + str(unit)
                metadata[base_name] = {'unit': unit, 'datatype': datatype}
            metadata = pd.DataFrame(metadata)
            return metadata
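
    # Usage sketch (the file name is an assumption taken from the paths used in this module):
    #   HK.get_keys('susi_scu_reduced', path='/data/sunrise/2024/SUSI/hk')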

    @staticmethod
    def plot_gen(metadata, df1):
        figures = []
        axes = []
        for col in df1.columns:
            fig, ax = plt.subplots(figsize=(12, 6))
            ax.plot(df1.index, df1[col], label=col)
            unit = metadata.get(col, {}).get('unit', '')
            ylabel = f"{col} [{unit}]" if unit else col
            ax.set_ylabel(ylabel)
            ax.set_xlabel('Time')
            ax.set_title('Plot of Interpolated Data')
            ax.legend()
            ax.grid(True)
            figures.append(fig)
            axes.append(ax)
        return figures, axes

    @staticmethod
    def plot_gen_get_back(metadata, df1):
        figures = []
        axes = []
        for col in df1.columns:
            fig, ax = plt.subplots(figsize=(12, 6))
            ax.plot(df1[col].dropna(), label=col)
            unit = metadata[col].dropna().get('unit', '')
            ylabel = f"{col} [{unit}]" if unit else col
            ax.set_ylabel(ylabel)
            ax.set_xlabel('Time')
            ax.set_title('Plot of Interpolated Data')
            ax.legend()
            ax.grid(True)
            figures.append(fig)
            axes.append(ax)
        return figures, axes


def create_hklog(directory_path, output_metadata_file='hk_log.json', encoding='ISO8859-1'):
    import json

    metadata = {}
    files = glob.glob(os.path.join(directory_path, '**/*'), recursive=True)
    combined_files = []
    dump = []  # list of gondola files skipped after the first one
    one_time_check = 0
    for file in files:
        if not file.endswith('/') and 'gondola' in file and file.endswith('.txt'):
            # print('making gondola')
            one_time_check += 1
            if one_time_check == 1:
                combined_files.append(file)
                # print(file)
            else:
                dump.append(file)  # add to the skipped list
        elif not file.endswith('/') and file.endswith('.csv') and 'gondola' not in file:
            combined_files.append(file)
    # print(combined_files)
    for filename in combined_files:
        try:
            columns = HK.get_keys(filename, path=directory_path).columns.tolist()
            metadata[filename] = columns
            # print(filename)
        except PermissionError:
            log.error(f"Permission denied while accessing: {filename}")
            continue

    # Save metadata to a JSON file for future use
    with open(os.path.join(directory_path, output_metadata_file), 'w') as f:
        json.dump(metadata, f)
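
# Usage sketch (the directory is the module's assumed default hk location):
#   create_hklog('/data/sunrise/2024/SUSI/hk')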


def generate_plotpdf(
    key,
    ts,
    base_path='/data/sunrise/2024/SUSI/hk',
    method='linear',
    order=1,
    specific=None,
    time_marker=None,
    opath=None,
    translate_map=True,
):
    metadata, df1 = HK(key, ts, base_path).combine_interpolate(
        method=method, order=order, specific=specific, translate_map=translate_map
    )
    if opath is not None:
        os.makedirs(opath, exist_ok=True)
        HK.plot_genpdf(metadata, df1, time_marker=time_marker, name=os.path.join(opath, 'hk_plot.pdf'))
        metadata.to_csv(os.path.join(opath, 'hk_metadata.csv'))
        df1.to_csv(os.path.join(opath, 'hk_data.csv'))
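

# Usage sketch (key name, time range and output path are assumptions, not defaults):
#   generate_plotpdf(['FwPos'], ('2024-07-10 10:00:00', '2024-07-10 11:00:00', 1), opath='plots')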