Coverage for src/susi/base/api.py: 74%
73 statements
« prev ^ index » next coverage.py v7.5.0, created at 2025-06-13 14:15 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2025-06-13 14:15 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4Provides methods to query the observation log API
6@author: hoelken
7"""
8import json
9import os
10from urllib import request, error, parse
11from contextlib import contextmanager
12from datetime import timezone
14from dateutil import parser
16from .config import Config
17from .logging import Logging
19log = Logging.get_logger()
class Api:
    """
    ### Api
    Class to interface the observation log API.

    `get_times` reads from the API without sending credentials.
    `mark_raw_available` and `add_pipeline_run` send the credentials as
    request headers and are only attempted when `can_publish` returns True.

    Failures raised while talking to the API are logged by `errorhandler`
    and are not re-raised.
    """
    # Names of the environment variables the credentials are read from.
    SUSI_API_USER = 'SUSI_API_USER'
    SUSI_API_TOKEN = 'SUSI_API_TOKEN'
    # Names of the request headers the credentials are sent in.
    USER_KEY = 'user'
    TOKEN_KEY = 'token'

    def __init__(self, config: "Config"):
        """
        Store the configuration and read the credentials from the environment.

        :param config: configuration object; `config.base.obslog_url` is used
            as the base URL of every request.
        """
        self.url = config.base.obslog_url
        self._conf = config
        # Either value is None when the environment variable is not set.
        self._usr = os.environ.get(self.SUSI_API_USER)
        self._tkn = os.environ.get(self.SUSI_API_TOKEN)

    def get_times(self, idx: int) -> tuple:
        """
        Returns the start and end time as datetime objects
        of an observation as of the observation log.

        The 'start' and 'stop' entries of the response are parsed and then
        labelled as UTC via `replace(tzinfo=...)`; no conversion is applied.

        :param idx: id of the observation to look up.
        :return: `(t0, t1)`, or None when the request or the parsing failed
            (the failure is logged by `errorhandler`).
        """
        with self.errorhandler(idx):
            # Close the HTTP response as soon as the payload has been read.
            with request.urlopen(self._url_observation(idx)) as res:
                payload = json.loads(res.read())
            t0 = parser.parse(payload['start']).replace(tzinfo=timezone.utc)
            t1 = parser.parse(payload['stop']).replace(tzinfo=timezone.utc)
            return t0, t1
        # Only reached when errorhandler swallowed an exception.
        return None

    def mark_raw_available(self, oid: int) -> None:
        """
        POST to the raw_available endpoint of observation `oid` and log the
        'msg' entry of the response. Does nothing if `can_publish` is False.

        :param oid: id of the observation.
        """
        if not self.can_publish():
            return

        self._post(oid, self._url_raw_avail(oid))

    def add_pipeline_run(self, oid: int, level: int, product: str = '') -> None:
        """
        POST a pipeline run for observation `oid` and log the 'msg' entry of
        the response. Does nothing if `can_publish` is False.

        The form fields sent are 'level', 'cam' (from `config.cam.name`),
        'pipeline' (from `config.base.pipeline`) and 'product'.

        :param oid: id of the observation.
        :param level: value of the 'level' field.
        :param product: value of the 'product' field.
        """
        if not self.can_publish():
            return

        data = {
            'level': level,
            'cam': self._conf.cam.name,
            'pipeline': self._conf.base.pipeline,
            'product': product
        }
        self._post(oid, self._url_pipeline_run(oid), data)

    def _post(self, oid: int, url: str, fields: dict = None) -> None:
        """Send an authenticated POST to `url` and log the response's 'msg'."""
        with self.errorhandler(oid):
            body = None if fields is None else parse.urlencode(fields).encode()
            req = request.Request(url, data=body, method="POST")
            req = self._add_api_header_to(req)
            # The context manager guarantees the response is closed, also
            # when decoding the payload fails.
            with request.urlopen(req) as res:
                log.info(json.loads(res.read())['msg'])

    def _add_api_header_to(self, req: request.Request) -> request.Request:
        """Add the user and token headers to `req` and return the same object."""
        req.add_header(self.USER_KEY, self._usr)
        req.add_header(self.TOKEN_KEY, self._tkn)
        return req

    @staticmethod
    def encode(data: dict) -> bytes:
        """
        Encode `data` as comma separated `key:"value"` pairs in UTF-8.

        Values are inserted as-is; quotes inside a value are not escaped.
        """
        body = ",".join(f'{k}:"{v}"' for k, v in data.items())
        return body.encode('utf-8')

    def can_publish(self) -> bool:
        """
        Tell whether update requests may be sent.

        :return: True only if `config.base.publish_results` is truthy and
            both credentials are set. Missing credentials are logged.
        """
        if not self._conf.base.publish_results:
            return False

        if self._usr and self._tkn:
            return True

        log.error("API credentials not set. Skip update request.")
        log.info("Ensure %s and %s are set before you access the API", self.SUSI_API_TOKEN, self.SUSI_API_USER)
        return False

    def _url_raw_avail(self, oid: int) -> str:
        """URL of the raw_available endpoint of observation `oid`."""
        return f"{self.url}/api/observation/{oid}/raw_available"

    def _url_observation(self, oid: int) -> str:
        """URL of observation `oid`."""
        return f"{self.url}/api/observation/{oid}"

    def _url_pipeline_run(self, oid: int) -> str:
        """URL of the pipeline endpoint of observation `oid`."""
        return f"{self.url}/api/pipeline/{oid}"

    @contextmanager
    def errorhandler(self, oid: int):
        """
        Context manager that logs any exception raised in its body instead of
        propagating it. HTTP errors are logged together with the response
        body; every other exception is logged as a connection problem.

        :param oid: id of the observation, used in the log message only.
        """
        try:
            yield
        except error.HTTPError as e:
            log.error('API Error for id=%s: %s \n%s', oid, e, e.read().decode())
        except Exception as e:
            log.error('Could not connect to API on %s for id=%s: %s', self.url, oid, e)