Coverage for src/susi/analyse/slit_mask_border.py: 47%
229 statements
« prev ^ index » next coverage.py v7.5.0, created at 2025-08-11 10:03 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2025-08-11 10:03 +0000
1import numpy as np
2from scipy.interpolate import interp1d
3from scipy.optimize import least_squares
4import matplotlib.pyplot as plt
5from scipy.signal import savgol_filter
6from scipy.optimize import curve_fit
7from scipy import stats
8from skimage.feature.tests.test_orb import img
9from ... import susi
10from ..utils import MP
13log = susi.Logging.get_logger()
16class GetSlitMaskBorder_v2:
17 """
18 Fits a line to the border of the slit mask seen in SP images.
19 Returns the line coefficients (runs in parallel for images and optionally for columns)
21 # author: iglesias@mps.mpg.de, lagg@mps.mpg.de
23 The method is based on the following steps:
24 - Step 1: coarse determination of the edge based on the derivative of the column-averaged,
25 heavily smoothed step function.
26 - Step 2: subpixel accurate measurement of the edge in each column by fitting a Gaussian
27 to the derivative (take only +-Nc pixels around te step)
28 - Step 3: Fit a linear function to the location of the Gaussian.
29 To avoid outliers, only the points +-0.5*sigma are taken into account.
30 If not enough points are fulfilling this criterion, the sigma level is
31 successively increased in steps of 0.25
32 """
34 N = 40 # [px] max number of pixels to consider around the column-avg slit mask edge
35 NYOFF = 20 # [px] offset in y direction to remove edge effects
36 STEP = 300 # number of columns processed by each worker
37 COL_FILTER = 30 # Columns that are not at least COL_FILTER [DN] brigther than the mean slit mask level are dropped
38 GAUSS_WIDTH = 20 # [px] max number of pixels to consider around the slit mask edge of each column must be < N
40 def __init__(self, roi=None, debug=False, xbin=1):
41 #: ROI to analyze
42 self.roi = roi
43 #: debug flag
44 self.debug = debug
45 #: result dictionary
46 self.result = None
47 # set to compute columns chunks in paralell
48 self.col_paralell: bool = False
49 #: number of workers to use for columns, if none ncol/STEP is used
50 self.col_workers = 10
51 #: number of workers to use for images
52 self.img_workers = 20
53 #: number of columns to bin before the analysis. 1 means no binning
54 self.bin = xbin
56 def run_img(self, img, parallel=True):
57 """
58 for single 2d array, runs col in parallel
59 """
60 self.col_paralell = parallel
61 img = self._bin_data(img)
62 img_args = self.__gen_img_arguments(img[None, :, :])
63 _, self.result = GetSlitMaskBorder_v2.process_image(img_args[0])
64 return self.result
66 def run_cube(self, cube, clip=None, norm=True):
67 """
68 for 3d array [n,x,y], runs img in parallel
70 """
71 log.info('Starting slit mask border analysis for cube of shape {}'.format(cube.shape))
72 if norm:
73 log.info('Normalizing cube images to its mean...')
74 global_mean = np.nanmean(cube)
75 cube /= np.nanmean(cube, axis=(1, 2), keepdims=True)
76 if clip is not None:
77 log.info('Clipping cube to {}...'.format(clip))
78 cube = np.clip(cube, clip[0], clip[1])
79 log.info('Binning cube data by {}...'.format(self.bin))
80 cube = self._bin_data(cube) * global_mean
81 img_args = self.__gen_img_arguments(cube)
82 log.info(f'Analysing with {self.img_workers} workers...')
83 result = MP.simultaneous(GetSlitMaskBorder_v2.process_image, img_args, workers=self.img_workers)
84 log.info('Analysis done, collecting results...')
85 self.result = []
86 result = sorted(result, key=lambda x: x[0])
87 for idx, coeffs in result:
88 self.result.append(coeffs)
89 return self.result
91 def _bin_data(self, data):
92 if data.ndim == 2:
93 return susi.utils.collections.Collections.bin(data, [1, self.bin])
94 else:
95 return susi.utils.collections.Collections.bin(data, [1, 1, self.bin])
97 def __gen_img_arguments(self, cube):
98 img_args = [
99 {
100 'img_idx': i,
101 'img': cube[i],
102 'roi': self.roi,
103 'debug': self.debug,
104 'col_workers': self.col_workers,
105 'col_paralell': self.col_paralell,
106 }
107 for i in range(cube.shape[0])
108 ]
109 return img_args
111 @staticmethod
112 def process_image(args):
113 img, roi, debug, col_workers, col_paralell = (
114 args['img'],
115 args['roi'],
116 args['debug'],
117 args['col_workers'],
118 args['col_paralell'],
119 )
120 if roi is None:
121 roi = [slice(0, img.shape[0]), slice(0, img.shape[1])]
123 nx, ny = img.shape[1], img.shape[0]
124 # detect the "step" from dark area to bright area by summing up all columns along x direction:
125 img_col = np.sum(img, axis=1)
126 img_col_sg = savgol_filter(img_col, 15, 1)
127 img_coldy = np.diff(img_col_sg)
128 dy_max = np.float64(np.argmax(img_coldy))
129 if debug:
130 plt.clf()
131 plt.plot(img_col)
132 plt.plot(img_col_sg)
133 plt.axvline(x=dy_max)
134 plt.show()
135 pass
137 # store indices +-N pixels around edge:
138 iloc = [
139 int(max([dy_max - GetSlitMaskBorder_v2.N, GetSlitMaskBorder_v2.NYOFF])),
140 int(min([dy_max + GetSlitMaskBorder_v2.N, ny])),
141 ]
142 loc_max = dy_max
143 # go through every column in parallel and find the maximum of gaussian fit
144 dy_max = GetSlitMaskBorder_v2.process_all_columns(img, iloc, loc_max, col_paralell, debug, col_workers)
145 if len(dy_max) == 0:
146 log.warning(f"Error during slit mask border detection of image {args['img_idx']}, returning nan")
147 return args['img_idx'], [np.nan, np.nan]
148 dy_max += iloc[0] + roi[0].start
149 # fit a line to dy_max, remove +-x sigma outliers
150 sigma = 0.5
151 ok = False
152 while not ok:
153 dy_stats = stats.describe(dy_max)
154 outlier = np.abs(dy_stats.mean - dy_max) > sigma * dy_stats.variance**0.5
155 # there need to be a sufficient number of valid pixels left, middle and right:
156 nxmin = nx / 3 * 0.80 # 20% of the section shall be valid
157 left = np.sum(outlier[: int(nx / 3)])
158 middle = np.sum(outlier[int(nx / 3) : int(2 * nx / 3)])
159 right = np.sum(outlier[int(2 * nx / 3) :])
160 if left <= nxmin and middle <= nxmin and right <= nxmin:
161 ok = True
162 continue
163 else:
164 sigma += 0.25
165 if sigma >= 3:
166 log.info("Boundary not found during slit mask border analysis")
167 continue
168 dy_max[outlier] = np.nan
169 # fit a linear function to dy_max:
170 xvec = np.arange(len(dy_max)) + roi[1].start
171 idx = np.isfinite(dy_max)
172 coeffs = np.polyfit(xvec[idx], dy_max[idx], 1)
173 ffit = np.polyval(coeffs, xvec)
174 if debug:
175 plt.clf()
176 plt.imshow(img, extent=(roi[1].start, roi[1].stop, roi[0].stop, roi[0].start))
177 plt.plot(xvec, ffit, c='orange', lw=2.0)
178 plt.plot(xvec, dy_max, c='r', lw=0.5, alpha=0.5)
179 plt.title(
180 "coeffs={:.3e},{:.2e}, angle={:.3e} deg".format(
181 coeffs[0], coeffs[1], np.arctan(coeffs[0]) * 180 / np.pi
182 )
183 )
184 plt.show()
185 plt.pause(0.1)
186 return args['img_idx'], coeffs
188 def process_all_columns(img, iloc, loc_max, col_paralell=True, debug=False, col_workers=None):
189 # Drop all columns that are not at least COL_FILTER% brigther than the mean slit mask level
190 slit_mask_level = np.mean(img[0 : int(loc_max - GetSlitMaskBorder_v2.N), :])
191 col_mean = np.mean(img[int(loc_max + GetSlitMaskBorder_v2.N) :, :], axis=0)
192 col_mask = col_mean > (slit_mask_level + GetSlitMaskBorder_v2.COL_FILTER)
193 filt_img = img[:, col_mask]
194 if filt_img.shape[1] < 50:
195 log.warning("Not enough columns for slit mask border detection, image too dark ?")
196 return []
197 dy_max = np.zeros(filt_img.shape[1])
198 if col_paralell:
199 args = GetSlitMaskBorder_v2.gen_col_arguments(filt_img, iloc, loc_max, debug)
201 if col_workers is None:
202 col_workers = filt_img.shape[1] // GetSlitMaskBorder_v2.STEP
204 result = MP.simultaneous(GetSlitMaskBorder_v2.columns_shift, args, workers=col_workers)
205 result = [item for sublist in result for item in sublist]
206 for idx, dy in result:
207 dy_max[idx] = dy
209 else:
210 for j in range(filt_img.shape[1])[:: GetSlitMaskBorder_v2.STEP]:
211 args = {
212 'idx': j,
213 'idx1': j + GetSlitMaskBorder_v2.STEP,
214 'img_colj': filt_img[:, j : j + GetSlitMaskBorder_v2.STEP],
215 'iloc': iloc,
216 'loc_max': loc_max,
217 'debug': debug,
218 }
219 result = GetSlitMaskBorder_v2.columns_shift(args)
220 result = [item[1] for item in result]
221 dy_max[j : j + GetSlitMaskBorder_v2.STEP] = result
222 return dy_max
224 def gen_col_arguments(img, iloc, loc_max, debug) -> list:
225 args = []
226 for j in range(img.shape[1])[:: GetSlitMaskBorder_v2.STEP]:
227 args.append(
228 {
229 'idx': j,
230 'idx1': j + GetSlitMaskBorder_v2.STEP,
231 'img_colj': img[:, j : j + GetSlitMaskBorder_v2.STEP],
232 'iloc': iloc,
233 'loc_max': loc_max,
234 'debug': debug,
235 }
236 )
237 return args
239 def columns_shift(args: dict):
240 """
241 Compute the shift of the slit mask edge in a set of columns
242 """
243 results = []
244 for j in range(0, args['img_colj'].shape[1]):
245 # profile func to fit
246 def gauss(x, a, x0, sigma):
247 return a * np.exp(-((x - x0) ** 2) / (2 * sigma**2))
249 # compute derivative from slightly smoothed img column:
250 col_sg = savgol_filter(args['img_colj'][:, j], 9, 1)
251 col_dy = np.diff(col_sg)[args['iloc'][0] : args['iloc'][1]]
252 # locate the maximum of the derivative fit to col_dy +-Nc pixels around args['loc_max']:
253 Nc = GetSlitMaskBorder_v2.GAUSS_WIDTH
254 x = np.int32(np.arange(-Nc, Nc + 1) + args['loc_max']) - args['iloc'][0]
255 col_cut = col_dy[x[0] : x[-1] + 1]
256 try:
257 popt, pcov = curve_fit(gauss, x, col_cut, p0=[max(col_cut), args['loc_max'] - args['iloc'][0], 5.0])
258 dy_max = popt[1]
259 results.append([args['idx'] + j, dy_max])
260 except Exception as e:
261 # log.debug("Gauss fit did not converge for column %d" % (args['idx'] + j))
262 # if args['debug']:
263 # plt.clf()
264 # plt.plot(x, col_cut)
265 # plt.show()
266 # plt.pause(0.2)
267 # plt.close()
268 results.append([args['idx'] + j, np.nan])
269 return results
272class GetSlitMaskBorder:
273 """
274 Fits a line to the border of the slit mask (level based) seen in SP images.
275 Returns the line angle and its intercept wrt the reference profile.
276 0 deg is considered horizontal (row) direction to the right
277 90 deg is vertical (column) direction to the top
279 # author: iglesias@mps.mpg.de
280 """
282 # numebr of x positions to sample the edge rotation
283 NXLOC = 100
284 # Oversample factor for the edge detection
285 OVERSAMPLE = 30
286 # Max shit [px] allowed (which defines the max angle you can find)
287 MAX_YSHIFT = 30
289 def __init__(self, img: np.array, ref_profile=None, clipv=None):
290 #: input image. Must be cropped to show mostly the slit mask border
291 self.orig = img
292 #: 1D reference profile (same size as img.shape[0]) used to compute column shifts.
293 #: The row coordinate of the half max of the reference profile
294 #: defines the y=0 of the returned line intercept
295 #: If None then is obtained from the column with largest mean value
296 #: Not useful to get an absolute shift but OK to get the angle.
297 self.ref_profile = ref_profile
298 #: The detected rotation angle in degree
299 self.angle: float = 0.0
300 #: the detected offset of the slit edge wrt to half max of the reference
301 self.intercept: float = 0
302 #: if not none, then the normalized reference profile and the image is saturated at [clipv[0],clipv[1]]
303 self.clipv = clipv
305 def run(self, return_fits=False, return_slope=False):
306 """
307 Fits a line to the border of the slit mask (level based), reurns the line angle and its intercept.
308 :param return_fits: if True, returns the fit x,y location in addition of the line parameters
309 :param return_slope: if True, returns the slope instead of the angle (in degrees)
310 """
311 xcuts = self._get_columns_to_analyze()
312 yloc, xcuts = self._get_edge_yloc(xcuts)
313 self.angle, self.intercept = self._get_line(xcuts, yloc, return_slope=return_slope)
314 if return_fits:
315 return self, yloc, xcuts
316 else:
317 return self
319 def _get_line(self, xcuts, yloc, return_slope=False):
320 # fit a line to the edge points and return slope (or angle) and intercept
321 fit = np.polyfit(xcuts, yloc, 1)
322 if return_slope:
323 return fit[0], fit[1]
324 else:
325 return np.degrees(np.arctan(fit[0])), fit[1]
327 def _get_edge_yloc(self, xcuts):
328 # get the location of the slit mask edge using as reference
329 yloc = []
330 cut_len = self.orig.shape[0]
331 xcol = np.arange(cut_len)
332 xinterp = np.linspace(0, cut_len - 1, cut_len * self.OVERSAMPLE)
333 if self.ref_profile is None:
334 ref_xloc = np.mean(self.orig[:, xcuts], axis=0).argmax()
335 ref_prof = self.orig[:, ref_xloc]
336 log.debug(f"Reference profile column: {ref_xloc}")
337 else:
338 ref_prof = self.ref_profile
339 ref_prof = interp1d(xcol, ref_prof, kind="linear")(xinterp)
340 ref_prof /= np.mean(ref_prof)
341 if self.clipv is not None:
342 ref_prof = np.clip(ref_prof, self.clipv[0], self.clipv[1])
343 for xcut in xcuts:
344 col = self.orig[:, xcut]
345 col = interp1d(xcol, col, kind="linear")(xinterp)
346 col /= np.mean(col)
347 if self.clipv is not None:
348 col = np.clip(col, self.clipv[0], self.clipv[1])
349 loc = self._compute_shift(ref_prof, col)
350 yloc.append(loc)
351 # filter outliers
352 yloc = np.array(yloc)
353 ind = np.where(np.abs(yloc - np.median(yloc)) < 3 * np.std(yloc))
354 # line passes for the half max of the ref profile
355 half_max_level = (np.percentile(ref_prof, 90) - np.percentile(ref_prof, 10)) / 2
356 half_max_loc = np.min(np.where(ref_prof > half_max_level))
357 return (yloc[ind] + half_max_loc) / self.OVERSAMPLE, xcuts[ind]
359 def _get_columns_to_analyze(self):
360 # Divide image dim 1 in NXLOC and get for each cut the column with the highest mean value
361 xcuts = []
362 for i in range(self.NXLOC):
363 xcut = int(i * self.orig.shape[1] / self.NXLOC)
364 col = np.mean(self.orig[:, xcut : xcut + self.orig.shape[1] // self.NXLOC], axis=0)
365 col = np.argmax(col)
366 xcuts.append(col + xcut)
367 return np.array(xcuts)
369 def _apply_subpixel_shift(self, signal, shift):
370 original_indices = np.arange(len(signal))
371 shifted_indices = original_indices - shift
372 interpolator = interp1d(original_indices, signal, kind="cubic", fill_value="extrapolate")
373 shifted_signal = interpolator(shifted_indices)
375 return shifted_signal
377 def _diff_err(self, x, signal1, signal2):
378 diff = signal1 - self._apply_subpixel_shift(signal2, x[0])
379 int_x0 = int(np.abs(np.floor(x[0])))
380 err = np.sum(diff[int_x0 + 1 : -int_x0 - 1] ** 2)
381 return err
383 def _compute_shift(self, signal1, signal2, min_disp=-MAX_YSHIFT * OVERSAMPLE, max_disp=MAX_YSHIFT * OVERSAMPLE):
384 return self._compute_shift_fit(signal1, signal2, min_disp=min_disp, max_disp=max_disp)
386 def _compute_shift_fit(self, signal1, signal2, min_disp=0, max_disp=0):
387 res = least_squares(self._diff_err, [0], args=(signal1, signal2), bounds=([min_disp], [max_disp]))
388 return res.x[0]