Coverage for src/susi/reduc/pipeline/processor.py: 86%

14 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2025-06-13 14:15 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3""" 

4Module that holds the processing part of the SUSI pipeline. 

5 

6@author: hoelken 

7""" 

8 

9from ...base import Logging, IllegalArgumentException 

10from ...io import FitsBatch 

11 

12from .processing_data import ProcessingData 

13 

14log = Logging.get_logger() 

15 

16 

class Processor:
    """
    ## Processor
    The Processor is the central part of the pipeline where parallel operations will be executed.

    The processor will execute the callbacks of all blocks and provide them with the
    input needed. All the actual action is to be performed in the callbacks (i.e. the blocks).

    <pre>
    ╭──────────────╮   ╭─────────────╮   ╭───────────────╮
    │ PreProcessor │──>│  Processor  │──>│ PostProcessor │
    ╰──────────────╯   ╰─┬─────────┬─╯   ╰───────────────╯
                         ├ Block 1 ┤
                         ├ ....... ┤
                         ╰ Block N ╯
    </pre>

    After all blocks are processed the processor will return.
    """

    @staticmethod
    def run(batch: FitsBatch, callbacks: list, proc_data: ProcessingData) -> FitsBatch:
        """
        Start the execution of the chain of blocks.

        ## Params
        - batch: The batch to process. It is handed to the first block; every
          following block receives the return value of its predecessor.
        - callbacks: The blocks to execute. Must be a list of block callbacks that
          have the block signature "callback(batch: FitsBatch, proc_data: ProcessingData) -> FitsBatch"
        - proc_data: The relevant processing data. The same object is passed to every block.

        ## Returns
        The value returned by the last block, or the unmodified input batch
        if `callbacks` is empty.

        ## Raises
        - IllegalArgumentException: if any entry of `callbacks` is not callable.
          All entries are checked before the first block is executed, so no
          block runs when the chain is invalid.
        """
        # Materialize once so the chain can be walked twice (validate, then run)
        # even if the caller handed in a one-shot iterable instead of a list.
        blocks = list(callbacks)

        # Fail fast: reject a broken chain before any block has been executed,
        # instead of discovering the bad entry after earlier blocks already ran.
        for block in blocks:
            if not callable(block):
                log.critical('`callbacks` argument must be a list of callbacks. Got %s', type(block))
                raise IllegalArgumentException('`callbacks` argument must be a list of callbacks.')

        # Each block consumes the result of the previous one.
        for block in blocks:
            batch = block(batch, proc_data)
        return batch