Coverage for src/susi/utils/slurm.py: 20%

41 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2025-06-13 14:15 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3""" 

4Utilities for slurm processing 

5 

6@author: iglesias 

7""" 

8 

9import os 

10from ..base import Logging 

11 

12log = Logging.get_logger() 

13 

14 

class Slurm:
    """
    Class to handle slurm job submission

    Usage: call save_sbatch_file() to write the batch script, then
    submit_job() to hand it to ``sbatch``.
    """

    def __init__(self, yaml):
        """
        :param yaml: the susi config; its 'slurm' section provides the
            ``#SBATCH`` options (one ``--key=value`` line per entry)
        """
        self.yaml = yaml['slurm']  # the 'slurm' section of the susi config yaml file
        self.sbatch_file = None  # the sbatch file to be created
        self.job_id = None  # the job id of the submitted job
        self.logdir = None  # directory holding the sbatch file and slurm output
        self.out_file = None  # file slurm writes the job output to

    def save_sbatch_file(self, cmd, logdir):
        """
        Creates a sbatch file in the logdir with the slurm configuration given in the yaml file

        The file is written to ``<logdir>/slurm.sbatch`` (overwriting any
        existing one) and the job output is directed to ``<logdir>/slurm.out``.

        :param cmd: command line to run inside the job, written verbatim
        :param logdir: directory for the sbatch file; created if missing
        """
        import shlex  # local import: only needed to quote the repo path

        self.logdir = logdir
        # The repository root is four directory levels above this file.
        repo_path = os.path.abspath(__file__)
        for _ in range(4):
            repo_path = os.path.dirname(repo_path)

        # Create the sbatch file
        os.makedirs(self.logdir, exist_ok=True)
        self.sbatch_file = os.path.join(self.logdir, 'slurm.sbatch')
        self.out_file = f'{self.logdir}/slurm.out'
        if os.path.exists(self.sbatch_file):
            log.warning(f'{self.sbatch_file} already exists, overwriting it')

        lines = ['#!/bin/bash']
        # every key of the slurm yaml section becomes one #SBATCH line
        for key, value in self.yaml.items():
            if isinstance(value, list):
                # str() so that lists of numbers do not break the join
                value = ' '.join(str(item) for item in value)
            lines.append(f'#SBATCH --{key}={value}')
        lines.append(f'#SBATCH --output={self.out_file}')
        # cd to the repo and activate the pip environment; the path is
        # quoted so that spaces or shell metacharacters cannot break the script
        lines.append(f'cd {shlex.quote(repo_path)}')
        lines.append('source venv/bin/activate')
        # Add the command to run the script
        lines.append(f'{cmd}')

        with open(self.sbatch_file, 'w') as f:
            f.write('\n'.join(lines) + '\n')
        log.info(f'Slurm sbatch file created: {self.sbatch_file}')

    def submit_job(self, do_exit=False):
        """
        Submits the sbatch file to slurm

        :param do_exit: if True, exit the interpreter (status 0) after
            submitting the job, else return the job id
        :return: job id (last whitespace-separated token of the sbatch output)
        :raises ValueError: if save_sbatch_file() has not been run
        :raises RuntimeError: if sbatch cannot be started, exits with a
            non-zero status, or prints nothing
        """
        import subprocess  # local import: only needed for the submission

        if self.sbatch_file is None:
            raise ValueError('Sbatch file not created, please run save_sbatch_file() first')

        # Submit the sbatch file to slurm. An argument list (no shell) keeps
        # paths with spaces or metacharacters from being re-interpreted.
        log.info(f'Submitting slurm job: {self.sbatch_file}')
        try:
            result = subprocess.run(
                ['sbatch', self.sbatch_file],
                capture_output=True,
                text=True,
                check=False,
            )
        except OSError as err:
            raise RuntimeError(f'Could not run sbatch: {err}') from err

        output = result.stdout.strip()
        if result.returncode != 0 or not output:
            details = result.stderr.strip() or output or 'no output'
            raise RuntimeError(
                f'sbatch failed with exit code {result.returncode}: {details}'
            )

        # Extract the job id from the output: sbatch normally prints
        # "Submitted batch job <id>", so the id is the last token.
        self.job_id = output.split()[-1]
        log.info(f'Slurm job submitted with ID: {self.job_id}. Check status with sacct')
        log.info(f'The slurm output goes to: tail -f {self.out_file}')

        if do_exit:
            log.info('Exiting')
            # SystemExit directly: the exit() builtin is only present when
            # the site module is loaded.
            raise SystemExit(0)
        return self.job_id