Coverage for src/susi/reduc/pipeline/post_processor.py: 100%

35 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2025-06-13 14:15 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3""" 

4Module that holds the post-processing part of the SUSI pipeline. 

5It receives the output of the last block, creates the applicable header and writes the file(s) to disk 

6 

7@author: hoelken 

8""" 

9from __future__ import annotations 

10from datetime import datetime, timezone 

11 

12from . import Block 

13from ...db import FileDB 

14from ...io import FitsBatch, Fits 

15from ...base import Logging, Config 

16from ...utils import Git 

17 

18from ...base.header_keys import * 

19 

20log = Logging.get_logger() 

21 

22 

class PostProcessor:
    """
    ## PostProcessor
    Final stage of the pipeline: the point where the parallel operations are merged again.

    <pre>
    ╭──────────────╮   ╭───────────────╮   ╭───────────────╮
    │ PreProcessor │──>│   Processor   │──>│ PostProcessor │
    ╰──────────────╯   ╰───┬───────┬───╯   ╰───────────────╯
                           ├ Job 1 ┤
                           ├ ..... ┤
                           ╰ Job N ╯
    </pre>

    The post-processor is handed the output of the last block. For every entry
    it stamps the processing keywords onto the header and then writes the
    file(s) to the configured output location.
    """

    def __init__(self, batch: FitsBatch, config: Config, blocks: list):
        """
        :param batch: The entries produced by the last block.
        :param config: The pipeline configuration; also used to set up the `FileDB`.
        :param blocks: The blocks of the pipeline run; used to resolve the output base path.
        """
        # Inputs handed over by the caller
        self.batch, self.blocks, self.config = batch, blocks, config
        self.callbacks = []
        # Collects the entries that are written to disk by `run`
        self.result = FitsBatch()
        self.db = FileDB(config)
        # Both are populated by `run`
        self.success = False
        self.base_out = ''

    def run(self) -> PostProcessor:
        """
        Resolve the output base path, convert every entry of the input batch into
        an output record and write the collected result batch to disk
        (overwriting is enabled).

        The return value of the write call is stored in `self.success`.

        :return: This instance, to allow call chaining.
        """
        self.base_out = self.db.base_out_path(self.blocks)
        for fits in self.batch:
            record = self._process_entry(fits)
            self.result.batch.append(record)
        self.success = self.result.write_to_disk(overwrite=True)
        return self

    def _process_entry(self, fits: Fits):
        """
        Build the output record (target file, data, header) for a single entry.

        The target path is resolved before the header is stamped.
        """
        target = self.db.file_path(self.base_out, fits)
        payload = fits.data
        header = self._generic_header(fits)
        return dict(file=target, data=payload, header=header)

    def _generic_header(self, fits: Fits):
        """
        Write the generic processing keywords (processor name and version,
        processing time in UTC, pipeline name) to the header of the given
        entry and return that header.
        """
        stamp = Fits.override_header
        # NOTE(review): presumably terminates the list of applied blocks with a separator - confirm
        stamp(fits.header, Block.BLOCKS_APPLIED, '|', append=True)
        stamp(fits.header, PROCESSOR_NAME, 'SUSI.Pipeline')
        version = Git.current_sha()
        stamp(fits.header, PROCESSOR_VERS, version)
        now_utc = datetime.now(timezone.utc)
        stamp(fits.header, PROCESSING_TIME, now_utc.isoformat())
        pipeline = self.config.base.pipeline
        stamp(fits.header, PROCESSING_PIPELINE, pipeline)
        return fits.header