Coverage for src/susi/base/api.py: 74%

73 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2025-06-13 14:15 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3""" 

4Provides methods to query the observation log API 

5 

6@author: hoelken 

7""" 

8import json 

9import os 

10from urllib import request, error, parse 

11from contextlib import contextmanager 

12from datetime import timezone 

13 

14from dateutil import parser 

15 

16from .config import Config 

17from .logging import Logging 

18 

19log = Logging.get_logger() 

20 

21 

class Api:
    """
    ### Api
    Class to interface the observation log API.

    Read access (`get_times`) needs no credentials. Write access
    (`mark_raw_available`, `add_pipeline_run`) is only attempted when
    publishing is enabled in the configuration and both credential
    environment variables are set, see `can_publish`.

    All requests are wrapped in `errorhandler`, which logs failures instead
    of raising them.
    """
    # Names of the environment variables holding the API credentials.
    SUSI_API_USER = 'SUSI_API_USER'
    SUSI_API_TOKEN = 'SUSI_API_TOKEN'
    # Names of the HTTP headers used to transmit the credentials.
    USER_KEY = 'user'
    TOKEN_KEY = 'token'

    def __init__(self, config: Config):
        """
        :param config: Configuration providing `base.obslog_url`,
            `base.publish_results`, `base.pipeline` and `cam.name`.
        """
        self.url = config.base.obslog_url
        self._conf = config
        # Credentials are read once at construction; either may be None.
        self._usr = os.environ.get(self.SUSI_API_USER)
        self._tkn = os.environ.get(self.SUSI_API_TOKEN)

    def get_times(self, idx: int) -> tuple:
        """
        Returns the start and end time as datetime objects
        of an observation as of the observation log.

        :param idx: ID of the observation to look up.
        :return: `(start, stop)` as datetimes tagged with UTC, or `None`
            if the request or the parsing failed (the error is logged).
        """
        with self.errorhandler(idx):
            res = self._read_json(self._url_observation(idx))
            # NOTE(review): replace() only attaches the UTC tzinfo, it does
            # not convert. This presumes the API delivers UTC timestamps.
            t0 = parser.parse(res['start']).replace(tzinfo=timezone.utc)
            t1 = parser.parse(res['stop']).replace(tzinfo=timezone.utc)
            return t0, t1
        # Only reached if errorhandler swallowed (and logged) an exception.
        return None

    def mark_raw_available(self, oid: int) -> None:
        """
        Sends a POST request to the `raw_available` endpoint of the given
        observation and logs the `msg` field of the JSON response.

        Does nothing if `can_publish` returns False.

        :param oid: ID of the observation.
        """
        if not self.can_publish():
            return

        with self.errorhandler(oid):
            req = request.Request(self._url_raw_avail(oid), method="POST")
            req = self._add_api_header_to(req)
            log.info(self._read_json(req)['msg'])

    def add_pipeline_run(self, oid: int, level: int, product: str = '') -> None:
        """
        Sends level, camera name, pipeline and product as url-encoded form
        data to the pipeline endpoint of the given observation and logs the
        `msg` field of the JSON response.

        Does nothing if `can_publish` returns False.

        :param oid: ID of the observation.
        :param level: Value sent as `level`.
        :param product: Value sent as `product`, empty by default.
        """
        if not self.can_publish():
            return

        data = {
            'level': level,
            'cam': self._conf.cam.name,
            'pipeline': self._conf.base.pipeline,
            'product': product
        }
        with self.errorhandler(oid):
            # Supplying `data` makes urllib issue a POST request.
            req = request.Request(self._url_pipeline_run(oid), data=parse.urlencode(data).encode())
            req = self._add_api_header_to(req)
            log.info(self._read_json(req)['msg'])

    @staticmethod
    def _read_json(target):
        """
        Opens `target` (an URL string or a prepared `request.Request`),
        reads the complete response body and returns it parsed as JSON.

        The response is always closed, also if reading fails, so the
        underlying connection is not leaked.
        """
        res = request.urlopen(target)
        try:
            payload = res.read()
        finally:
            res.close()
        return json.loads(payload)

    def _add_api_header_to(self, req: request.Request) -> request.Request:
        """Adds the user and token headers to the request and returns it."""
        req.add_header(self.USER_KEY, self._usr)
        req.add_header(self.TOKEN_KEY, self._tkn)
        return req

    @staticmethod
    def encode(data: dict) -> bytes:
        """
        Serializes the dict as comma separated `key:"value"` pairs and
        returns the result UTF-8 encoded.

        Values are inserted verbatim (no escaping), the result is not JSON.
        """
        body = ",".join([f'{k}:"{v}"' for k, v in data.items()])
        return body.encode('utf-8')

    def can_publish(self) -> bool:
        """
        Checks if write requests may be sent to the API.

        :return: True if publishing is enabled in the config and both
            credentials are set. If publishing is enabled but credentials
            are missing, the problem is logged and False is returned.
        """
        if not self._conf.base.publish_results:
            return False

        if self._usr and self._tkn:
            return True

        log.error("API credentials not set. Skip update request.")
        log.info("Ensure %s and %s are set before you access the API", self.SUSI_API_TOKEN, self.SUSI_API_USER)
        return False

    def _url_raw_avail(self, oid: int) -> str:
        """URL of the `raw_available` endpoint for the given observation."""
        return f"{self.url}/api/observation/{oid}/raw_available"

    def _url_observation(self, oid: int) -> str:
        """URL of the observation endpoint for the given observation."""
        return f"{self.url}/api/observation/{oid}"

    def _url_pipeline_run(self, oid: int) -> str:
        """URL of the pipeline endpoint for the given observation."""
        return f"{self.url}/api/pipeline/{oid}"

    @contextmanager
    def errorhandler(self, oid: int):
        """
        Context manager that logs any exception raised in its body instead
        of propagating it. API access is thus best effort: after a failure
        execution continues behind the `with` block.

        :param oid: ID of the observation, only used for the log message.
        """
        try:
            yield
        except error.HTTPError as e:
            # The server answered with an error status; include its body.
            # Decode leniently so an unexpected encoding of the error page
            # cannot raise from within this handler.
            log.error('API Error for id=%s: %s \n%s', oid, e, e.read().decode(errors='replace'))
        except Exception as e:
            # Deliberately broad: any other failure is logged, not raised.
            log.error('Could not connect to API on %s for id=%s: %s', self.url, oid, e)