Coverage for src/susi/reduc/pipeline/post_processor.py: 100%
35 statements
« prev ^ index » next coverage.py v7.5.0, created at 2025-06-13 14:15 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2025-06-13 14:15 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4Module that holds the post-processing part of the SUSI pipeline.
5Is responsible for creating the output headers and writing the results of the preceding blocks to disk.
7@author: hoelken
8"""
9from __future__ import annotations
10from datetime import datetime, timezone
12from . import Block
13from ...db import FileDB
14from ...io import FitsBatch, Fits
15from ...base import Logging, Config
16from ...utils import Git
18from ...base.header_keys import *
20log = Logging.get_logger()
class PostProcessor:
    """
    ## PostProcessor
    Final stage of the pipeline: this is where the parallel operations of the
    processor are merged again.

    <pre>
    ╭──────────────╮   ╭───────────────╮   ╭───────────────╮
    │ PreProcessor │──>│   Processor   │──>│ PostProcessor │
    ╰──────────────╯   ╰───┬───────┬───╯   ╰───────────────╯
                           ├ Job 1 ┤
                           ├ ..... ┤
                           ╰ Job N ╯
    </pre>

    The post-processor is handed the output of the last block. For every entry
    it completes the header with the generic processing keys and then writes
    the file(s) to the configured output location.
    """

    def __init__(self, batch: FitsBatch, config: Config, blocks: list):
        """
        Store the inputs and prepare an empty result batch.

        - `batch`: the entries to post-process (iterated in `run`).
        - `config`: the configuration; handed to `FileDB` and read for the
          pipeline name when the header is written.
        - `blocks`: the block list from which the base output path is derived.
        """
        self.config = config
        self.blocks = blocks
        self.batch = batch
        # Kept for interface compatibility; not used within this class.
        self.callbacks = []
        # Filled by `run` and reported back to the caller.
        self.success = False
        self.base_out = ''
        self.result = FitsBatch()
        self.db = FileDB(config)

    def run(self) -> PostProcessor:
        """
        Post-process every entry of the batch and write the result to disk.

        The base output path is resolved first because `_process_entry`
        reads it for every entry. `success` holds whatever
        `write_to_disk` returned. Returns `self` to allow call chaining.
        """
        self.base_out = self.db.base_out_path(self.blocks)
        for fits in self.batch:
            record = self._process_entry(fits)
            self.result.batch.append(record)
        written = self.result.write_to_disk(overwrite=True)
        self.success = written
        return self

    def _process_entry(self, fits: Fits):
        """
        Build the result record (`file`, `data`, `header`) for a single entry.
        """
        # Evaluation order matters: path, then data, then the header update.
        target = self.db.file_path(self.base_out, fits)
        payload = fits.data
        header = self._generic_header(fits)
        return dict(file=target, data=payload, header=header)

    def _generic_header(self, fits: Fits):
        """
        Write the generic processing keys to the header of the given entry.

        The header is modified in place and returned.
        """
        header = fits.header
        # Appends the separator to the list of applied blocks.
        Fits.override_header(header, Block.BLOCKS_APPLIED, '|', append=True)
        Fits.override_header(header, PROCESSOR_NAME, 'SUSI.Pipeline')
        revision = Git.current_sha()
        Fits.override_header(header, PROCESSOR_VERS, revision)
        timestamp = datetime.now(timezone.utc).isoformat()
        Fits.override_header(header, PROCESSING_TIME, timestamp)
        pipeline = self.config.base.pipeline
        Fits.override_header(header, PROCESSING_PIPELINE, pipeline)
        return header