Coverage for src/susi/reduc/pipeline/processor.py: 86%
14 statements
« prev ^ index » next coverage.py v7.5.0, created at 2025-06-13 14:15 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2025-06-13 14:15 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4Module that holds the processing part of the SUSI pipeline.
6@author: hoelken
7"""
9from ...base import Logging, IllegalArgumentException
10from ...io import FitsBatch
12from .processing_data import ProcessingData
14log = Logging.get_logger()
17class Processor:
18 """
19 ## Processor
20 The Processor is the central part of the pipeline where parallel operations will be executed.
22 The processor will execute the callbacks of all blocks and provide them with the
23 Input needed. All the actual action is to be performed in the callbacks (i.e. the blocks)
25 <pre>
26 ╭──────────────╮ ╭─────────────╮ ╭───────────────╮
27 │ PreProcessor │──>│ Processor │──>│ PostProcessor │
28 ╰──────────────╯ ╰─┬─────────┬─╯ ╰───────────────╯
29 ├ Block 1 ┤
30 ├ ....... ┤
31 ╰ Block N ╯
32 </pre>
34 After all blocks are processed the processor will return.
35 """
37 @staticmethod
38 def run(batch: FitsBatch, callbacks: list, proc_data: ProcessingData) -> FitsBatch:
39 """
40 Start the execution of the chain of blocks
42 ## Params
43 - chunks: The chunks to process should be given as {id: batch} dict,
44 where the id is a sortable unit (i.e. integers)
45 - blocks: The blocks to execute must be a list off block callbacks that
46 have the block signature "callback(batch: FitsBatch, proc_data: ProcessingData) -> FitsBatch"
47 - proc_data: The relevant processing data.
49 ## Returns
50 The result of all jobs in a dict with the same keys as the input chunk.
51 """
52 for block in callbacks:
53 if not callable(block):
54 log.critical('`callbacks` argument must be a list of callbacks. Got %s', type(block))
55 raise IllegalArgumentException('`callbacks` argument must be a list of callbacks.')
57 batch = block(batch, proc_data)
58 return batch