Source code for xarpes.mdcs

# Copyright (C) 2025 xARPES Developers
# This program is free software under the terms of the GNU GPLv3 license.

# get_ax_fig_plt and add_fig_kwargs originate from pymatgen/util/plotting.py.
# Copyright (C) 2011-2024 Shyue Ping Ong and the pymatgen Development Team
# Pymatgen is released under the MIT License.

# See also abipy/tools/plotting.py.
# Copyright (C) 2021 Matteo Giantomassi and the AbiPy Group
# AbiPy is free software under the terms of the GNU GPLv2 license.

"""File containing the MDCs class."""

import numpy as np
from .plotting import get_ax_fig_plt, add_fig_kwargs
from .functions import extend_function
from .constants import KILO

[docs] class MDCs: r""" Container for momentum distribution curves (MDCs) and their fits. This class stores the MDC intensity maps, angular and energy grids, and the aggregated fit results produced by :meth:`fit_selection`. Parameters ---------- intensities : ndarray MDC intensity data. Typically a 2D array with shape ``(n_energy, n_angle)`` or a 1D array for a single curve. angles : ndarray Angular grid corresponding to the MDCs [degrees]. angle_resolution : float Angular step size or effective angular resolution [degrees]. energy_resolution : float Energy resolution associated with the MDCs [eV]. temperature: float Temperature associated with the band map [K]. enel : ndarray or float Electron binding energies of the MDC slices [eV]. Can be a scalar for a single MDC. hnuminPhi : float Photon energy minus work function, used to convert ``enel`` to kinetic energy [eV]. Attributes ---------- intensities : ndarray MDC intensity data (same object as passed to the constructor). angles : ndarray Angular grid [degrees]. angle_resolution : float Angular step size or resolution [degrees]. enel : ndarray or float Electron binding energies [eV], as given at construction. ekin : ndarray or float Kinetic energies [eV], computed as ``enel + hnuminPhi``. hnuminPhi : float Photon energy minus work function [eV]. ekin_range : ndarray Kinetic-energy values of the slices that were actually fitted. Set by :meth:`fit_selection`. individual_properties : dict Nested mapping of fitted parameters and their uncertainties for each component and each energy slice. Populated by :meth:`fit_selection`. Notes ----- After calling :meth:`fit_selection`, :attr:`individual_properties` has the structure:: { label: { class_name: { 'label': label, '_class': class_name, param: [values per energy slice], param_sigma: [1σ per slice or None], ... } } } where ``param`` is typically one of ``'offset'``, ``'slope'``, ``'amplitude'``, ``'peak'``, ``'broadening'``, and ``param_sigma`` stores the corresponding uncertainty for each slice. """ def __init__(self, intensities, angles, angle_resolution, energy_resolution, temperature, enel, hnuminPhi): self._intensities = intensities self._angles = angles self._angle_resolution = angle_resolution self._energy_resolution = energy_resolution self._temperature = temperature self._enel = enel self._hnuminPhi = hnuminPhi # Derived attributes (populated by fit_selection) self._ekin_range = None self._individual_properties = None # combined values + sigmas # -------------------- Immutable physics inputs -------------------- @property def angles(self): """Angular axis for the MDCs.""" return self._angles @property def angle_resolution(self): """Angular resolution (float).""" return self._angle_resolution @angle_resolution.setter def angle_resolution(self, _): """Setter for the angle resolution. This raises an attribute error as the angle resolution needs to be derived from the band map.""" raise AttributeError("`angle_resolution` is read-only; set it via the " "constructor.") @property def energy_resolution(self): """Energy resolution (float).""" return self._energy_resolution @energy_resolution.setter def energy_resolution(self, _): """Setter for the energy resolution. This raises an attribute error as the energy resolution needs to be derived from the band map.""" raise AttributeError("`energy_resolution` is read-only; set it via the " "constructor.") @property def temperature(self): """Temperature (float).""" return self._temperature @temperature.setter def temperature(self, _): """Setter for the temperature. This raises an attribute error as the temperature needs to be derived from the band map.""" raise AttributeError("`temperature` is read-only; set it via the " "constructor.") @property def enel(self): """Photoelectron binding energies (array-like). Read-only.""" return self._enel @enel.setter def enel(self, _): raise AttributeError("`enel` is read-only; set it via the " \ "constructor.") @property def hnuminPhi(self): """Work-function/photon-energy offset. Read-only.""" return self._hnuminPhi @hnuminPhi.setter def hnuminPhi(self, _): raise AttributeError("`hnuminPhi` is read-only; set it via the constructor.") @property def ekin(self): """Kinetic energy array: enel + hnuminPhi (computed on the fly).""" return self._enel + self._hnuminPhi @ekin.setter def ekin(self, _): raise AttributeError("`ekin` is derived and read-only.") # -------------------- Data arrays -------------------- @property def intensities(self): """2D or 3D intensity map (energy × angle).""" return self._intensities def _validate_angle_resolution(self, caller_name): """Require a provided, non-negative angular resolution.""" if self.angle_resolution is None or self.angle_resolution < 0: raise ValueError( 'angle_resolution must be provided and >= 0 for ' f'{caller_name}.' ) @intensities.setter def intensities(self, x): self._intensities = x # -------------------- Results populated by fit_selection -------------------- @property def ekin_range(self): """Kinetic-energy slices that were fitted.""" if self._ekin_range is None: raise AttributeError("`ekin_range` not yet set. Run `.fit_selection()` first.") return self._ekin_range @property def individual_properties(self): """ Aggregated fitted parameter values and uncertainties per component. Returns ------- dict Nested mapping:: { label: { class_name: { 'label': label, '_class': class_name, <param>: [values per slice], <param>_sigma: [1σ per slice or None], ... } } } """ if self._individual_properties is None: raise AttributeError( "`individual_properties` not yet set. Run `.fit_selection()` first." ) return self._individual_properties
[docs] def energy_check(self, energy_value): r""" """ if np.isscalar(self.ekin): if energy_value is not None: raise ValueError("This dataset contains only one " \ "momentum-distribution curve; do not provide energy_value.") else: kinergy = self.ekin counts = self.intensities else: if energy_value is None: raise ValueError("This dataset contains multiple " \ "momentum-distribution curves. Please provide an energy_value " "for which to plot the MDCs.") else: energy_index = np.abs(self.enel - energy_value).argmin() kinergy = self.ekin[energy_index] counts = self.intensities[energy_index, :] if not (self.enel.min() <= energy_value <= self.enel.max()): raise ValueError( f"Selected energy_value={energy_value:.3f} " f"is outside the available energy range " f"({self.enel.min():.3f}{self.enel.max():.3f}) " "of the MDC collection." ) return counts, kinergy
[docs] def plot(self, energy_value=None, energy_range=None, ax=None, **kwargs): """ Interactive or static plot with optional slider and full wrapper support. Behavior consistent with Jupyter and CLI based on show / fig_close. """ import matplotlib.pyplot as plt from matplotlib.widgets import Slider import string import sys import warnings # Wrapper kwargs title = kwargs.pop("title", None) savefig = kwargs.pop("savefig", None) show = kwargs.pop("show", True) fig_close = kwargs.pop("fig_close", False) tight_layout = kwargs.pop("tight_layout", False) ax_grid = kwargs.pop("ax_grid", None) ax_annotate = kwargs.pop("ax_annotate", False) size_kwargs = kwargs.pop("size_kwargs", None) if energy_value is not None and energy_range is not None: raise ValueError( "Provide at most energy_value or energy_range, not both.") ax, fig, plt = get_ax_fig_plt(ax=ax) angles = self.angles energies = self.enel if np.isscalar(energies): if energy_value is not None or energy_range is not None: raise ValueError( "This dataset contains only one momentum-distribution " "curve; do not provide energy_value or energy_range." ) intensities = self.intensities ax.scatter(angles, intensities, label="Data") ax.set_title(f"Energy slice: {energies * KILO:.3f} meV") # --- y-only autoscale, preserve x --- x0, x1 = ax.get_xlim() # keep current x-range ax.relim(visible_only=True) # recompute data limits ax.autoscale_view(scalex=False, scaley=True) ax.set_xlim(x0, x1) # restore x (belt-and-suspenders) else: if (energy_value is not None) and (energy_range is not None): raise ValueError("Provide either energy_value or energy_range, not both.") emin, emax = energies.min(), energies.max() # ---- Single-slice path (no slider) ---- if energy_value is not None: if energy_value < emin or energy_value > emax: raise ValueError( f"Requested energy_value {energy_value:.3f} eV is " f"outside the available energy range " f"[{emin:.3f}, {emax:.3f}] eV." ) idx = int(np.abs(energies - energy_value).argmin()) intensities = self.intensities[idx] ax.scatter(angles, intensities, label="Data") ax.set_title(f"Energy slice: {energies[idx] * KILO:.3f} meV") # --- y-only autoscale, preserve x --- x0, x1 = ax.get_xlim() # keep current x-range ax.relim(visible_only=True) # recompute data limits ax.autoscale_view(scalex=False, scaley=True) ax.set_xlim(x0, x1) # restore x (belt-and-suspenders) # ---- Multi-slice path (slider) ---- else: if energy_range is not None: e_min, e_max = energy_range mask = (energies >= e_min) & (energies <= e_max) else: mask = np.ones_like(energies, dtype=bool) indices = np.where(mask)[0] if len(indices) == 0: raise ValueError("No energies found in the specified selection.") intensities = self.intensities[indices] fig.subplots_adjust(bottom=0.25) idx = 0 scatter = ax.scatter(angles, intensities[idx], label="Data") ax.set_title(f"Energy slice: " f"{energies[indices[idx]] * KILO:.3f} meV") # Suppress single-point slider warning (when len(indices) == 1) warnings.filterwarnings( "ignore", message="Attempting to set identical left == right", category=UserWarning ) slider_ax = fig.add_axes([0.2, 0.08, 0.6, 0.04]) slider = Slider( slider_ax, "Index", 0, len(indices) - 1, valinit=idx, valstep=1 ) def update(val): i = int(slider.val) yi = intensities[i] scatter.set_offsets(np.c_[angles, yi]) x0, x1 = ax.get_xlim() yv = np.asarray(yi, dtype=float).ravel() mask = np.isfinite(yv) if mask.any(): y_min = float(yv[mask].min()) y_max = float(yv[mask].max()) span = y_max - y_min frac = plt.rcParams['axes.ymargin'] if span <= 0 or not np.isfinite(span): scale = max(abs(y_max), 1.0) pad = frac * scale else: pad = frac * span ax.set_ylim(y_min - pad, y_max + pad) # Keep x unchanged ax.set_xlim(x0, x1) # Update title and redraw ax.set_title(f"Energy slice: " f"{energies[indices[i]] * KILO:.3f} meV") fig.canvas.draw_idle() slider.on_changed(update) self._slider = slider self._line = scatter ax.set_xlabel("Angle (°)") ax.set_ylabel("Counts (-)") ax.legend() self._fig = fig if size_kwargs: fig.set_size_inches(size_kwargs.pop("w"), size_kwargs.pop("h"), **size_kwargs) if title: fig.suptitle(title) if tight_layout: fig.tight_layout() if savefig: fig.savefig(savefig) if ax_grid is not None: for axis in fig.axes: axis.grid(bool(ax_grid)) if ax_annotate: tags = string.ascii_lowercase for i, axis in enumerate(fig.axes): axis.annotate(f"({tags[i]})", xy=(0.05, 0.95), xycoords="axes fraction") is_interactive = hasattr(sys, 'ps1') or 'ipykernel' in sys.modules is_cli = not is_interactive if show: if is_cli: plt.show() if fig_close: plt.close(fig) if not show and (fig_close or is_cli): return None return fig
[docs] @add_fig_kwargs def visualize_guess(self, distributions, energy_value=None, matrix_element=None, matrix_args=None, ax=None): r""" """ self._validate_angle_resolution('visualize_guess') counts, kinergy = self.energy_check(energy_value) ax, fig, plt = get_ax_fig_plt(ax=ax) ax.set_xlabel('Angle ($\\degree$)') ax.set_ylabel('Counts (-)') ax.set_title(f"Energy slice: " f"{(kinergy - self.hnuminPhi) * KILO:.3f} meV") ax.scatter(self.angles, counts, label='Data') final_result = self._merge_and_plot(ax=ax, distributions=distributions, kinetic_energy=kinergy, matrix_element=matrix_element, matrix_args=dict(matrix_args) if matrix_args else None, plot_individual=True, ) residual = counts - final_result ax.scatter(self.angles, residual, label='Residual') ax.legend() return fig
[docs] def fit_selection(self, distributions, energy_value=None, energy_range=None, matrix_element=None, matrix_args=None, ax=None, **kwargs): r""" """ import matplotlib.pyplot as plt from matplotlib.widgets import Slider from copy import deepcopy import string import sys import warnings from lmfit import Minimizer from scipy.ndimage import gaussian_filter from .functions import construct_parameters, build_distributions, \ residual, resolve_param_name # Wrapper kwargs title = kwargs.pop("title", None) savefig = kwargs.pop("savefig", None) show = kwargs.pop("show", True) fig_close = kwargs.pop("fig_close", False) tight_layout = kwargs.pop("tight_layout", False) ax_grid = kwargs.pop("ax_grid", None) ax_annotate = kwargs.pop("ax_annotate", False) size_kwargs = kwargs.pop("size_kwargs", None) self._validate_angle_resolution('fit_selection') ax, fig, plt = get_ax_fig_plt(ax=ax) energies = self.enel new_distributions = deepcopy(distributions) if energy_value is not None and energy_range is not None: raise ValueError( "Provide at most energy_value or energy_range, not both.") if np.isscalar(energies): if energy_value is not None or energy_range is not None: raise ValueError( "This dataset contains only one momentum-distribution " "curve; do not provide energy_value or energy_range." ) kinergies = np.atleast_1d(self.ekin) intensities = np.atleast_2d(self.intensities) else: if energy_value is not None: if (energy_value < energies.min() or energy_value > energies.max()): raise ValueError( f"Requested energy_value {energy_value:.3f} eV is " f"outside the available energy range " f"[{energies.min():.3f}, {energies.max():.3f}] eV." ) idx = np.abs(energies - energy_value).argmin() indices = np.atleast_1d(idx) kinergies = self.ekin[indices] intensities = self.intensities[indices, :] elif energy_range is not None: e_min, e_max = energy_range indices = np.where((energies >= e_min) & (energies <= e_max))[0] if len(indices) == 0: raise ValueError("No energies found in the specified energy_range.") kinergies = self.ekin[indices] intensities = self.intensities[indices, :] else: # Without specifying a range, all MDCs are plotted kinergies = self.ekin intensities = self.intensities # Final shape guard kinergies = np.atleast_1d(kinergies) intensities = np.atleast_2d(intensities) all_final_results = [] all_residuals = [] all_individual_results = [] # List of (n_individuals, n_angles) aggregated_properties = {} # map class_name -> parameter names to extract param_spec = { 'Constant': ('offset',), 'Linear': ('offset', 'slope'), 'SpectralLinear': ('amplitude', 'peak', 'broadening'), 'SpectralQuadratic': ('amplitude', 'peak', 'broadening'), } order = np.argsort(kinergies)[::-1] for idx in order: kinergy = kinergies[idx] intensity = intensities[idx] if matrix_element is not None: parameters, element_names = construct_parameters( new_distributions, matrix_args) new_distributions = build_distributions(new_distributions, parameters) mini = Minimizer( residual, parameters, fcn_args=(self.angles, intensity, self.angle_resolution, new_distributions, kinergy, self.hnuminPhi, matrix_element, element_names) ) else: parameters = construct_parameters(new_distributions) new_distributions = build_distributions(new_distributions, parameters) mini = Minimizer( residual, parameters, fcn_args=(self.angles, intensity, self.angle_resolution, new_distributions, kinergy, self.hnuminPhi) ) outcome = mini.minimize('least_squares') pcov = outcome.covar var_names = getattr(outcome, 'var_names', None) if not var_names: var_names = [n for n, p in outcome.params.items() if p.vary] var_idx = {n: i for i, n in enumerate(var_names)} param_sigma_full = {} for name, par in outcome.params.items(): sigma = None if pcov is not None and name in var_idx: d = pcov[var_idx[name], var_idx[name]] if np.isfinite(d) and d >= 0: sigma = float(np.sqrt(d)) if sigma is None: s = getattr(par, 'stderr', None) sigma = float(s) if s is not None else None param_sigma_full[name] = sigma # Rebuild the *fitted* distributions from optimized params fitted_distributions = build_distributions(new_distributions, outcome.params) # If using a matrix element, extract slice-specific args from the fit if matrix_element is not None: new_matrix_args = {key: outcome.params[key].value for key in matrix_args} else: new_matrix_args = None # individual curves (smoothed, cropped) and final sum (no plotting here) extend, step, numb = extend_function(self.angles, self.angle_resolution) total_result_ext = np.zeros_like(extend) indiv_rows = [] # (n_individuals, n_angles) individual_labels = [] for dist in fitted_distributions: # evaluate each component on the extended grid if getattr(dist, 'class_name', None) == 'SpectralQuadratic': if (getattr(dist, 'center_angle', None) is not None) and ( kinergy is None or self.hnuminPhi is None ): raise ValueError( 'Spectral quadratic function is defined in terms ' 'of a center angle. Please provide a kinetic energy ' 'and hnuminPhi.' ) extended_result = dist.evaluate(extend, kinergy, self.hnuminPhi) else: extended_result = dist.evaluate(extend) if matrix_element is not None and hasattr(dist, 'index'): args = new_matrix_args or {} extended_result *= matrix_element(extend, **args) total_result_ext += extended_result # smoothed & cropped individual individual_curve = gaussian_filter(extended_result, sigma=step)[ numb:-numb if numb else None ] indiv_rows.append(np.asarray(individual_curve)) # label label = getattr(dist, 'label', str(dist)) individual_labels.append(label) # ---- collect parameters for this distribution # (Aggregated over slices) cls = getattr(dist, 'class_name', None) wanted = param_spec.get(cls, ()) # ensure dicts exist label_bucket = aggregated_properties.setdefault(label, {}) class_bucket = label_bucket.setdefault( cls, {'label': label, '_class': cls} ) # store center_wavevector (scalar) for SpectralQuadratic if ( cls == 'SpectralQuadratic' and hasattr(dist, 'center_wavevector') ): class_bucket.setdefault( 'center_wavevector', dist.center_wavevector ) # ensure keys for both values and sigmas for pname in wanted: class_bucket.setdefault(pname, []) class_bucket.setdefault(f"{pname}_sigma", []) # append values and sigmas in the order of slices for pname in wanted: param_key = resolve_param_name(outcome.params, label, pname) if param_key is not None and param_key in outcome.params: class_bucket[pname].append(outcome.params[param_key].value) class_bucket[f"{pname}_sigma"].append(param_sigma_full.get(param_key, None)) else: # Not fitted in this slice → keep the value if present on the dist, sigma=None class_bucket[pname].append(getattr(dist, pname, None)) class_bucket[f"{pname}_sigma"].append(None) # final (sum) curve, smoothed & cropped final_result_i = gaussian_filter(total_result_ext, sigma=step)[ numb:-numb if numb else None] final_result_i = np.asarray(final_result_i) # Residual for this slice residual_i = np.asarray(intensity) - final_result_i # Store per-slice results all_final_results.append(final_result_i) all_residuals.append(residual_i) all_individual_results.append(np.vstack(indiv_rows)) # --- after the reversed-order loop, restore original (ascending) order --- inverse_order = np.argsort(np.argsort(kinergies)[::-1]) # Reorder per-slice arrays/lists computed in the loop all_final_results[:] = [all_final_results[i] for i in inverse_order] all_residuals[:] = [all_residuals[i] for i in inverse_order] all_individual_results[:] = [all_individual_results[i] for i in inverse_order] # Reorder all per-slice lists in aggregated_properties for label_dict in aggregated_properties.values(): for cls_dict in label_dict.values(): for key, val in cls_dict.items(): if isinstance(val, list) and len(val) == len(kinergies): cls_dict[key] = [val[i] for i in inverse_order] self._ekin_range = kinergies self._individual_properties = aggregated_properties if np.isscalar(energies): # One slice only: plot MDC, Fit, Residual, and Individuals ydata = np.asarray(intensities).squeeze() yfit = np.asarray(all_final_results[0]).squeeze() yres = np.asarray(all_residuals[0]).squeeze() yind = np.asarray(all_individual_results[0]) ax.scatter(self.angles, ydata, label="Data") # plot individuals with their labels for j, lab in enumerate(individual_labels or []): ax.plot(self.angles, yind[j], label=str(lab)) ax.plot(self.angles, yfit, label="Fit") ax.scatter(self.angles, yres, label="Residual") ax.set_title(f"Energy slice: {energies * KILO:.3f} meV") ax.relim() # recompute data limits from all artists ax.autoscale_view() # apply autoscaling + axes.ymargin padding else: if energy_value is not None: _idx = int(np.abs(energies - energy_value).argmin()) energies_sel = np.atleast_1d(energies[_idx]) elif energy_range is not None: e_min, e_max = energy_range energies_sel = energies[(energies >= e_min) & (energies <= e_max)] else: energies_sel = energies # Number of slices must match n_slices = len(all_final_results) assert intensities.shape[0] == n_slices == len(all_residuals) \ == len(all_individual_results), (f"Mismatch: data \ {intensities.shape[0]}, fits {len(all_final_results)}, " f"residuals {len(all_residuals)}, \ individuals {len(all_individual_results)}." ) n_individuals = all_individual_results[0].shape[0] \ if n_slices else 0 fig.subplots_adjust(bottom=0.25) idx = 0 # Initial draw (MDC + Individuals + Fit + Residual) at slice 0 scatter = ax.scatter(self.angles, intensities[idx], label="Data") individual_lines = [] if n_individuals: for j in range(n_individuals): if individual_labels and j < len(individual_labels): label = str(individual_labels[j]) else: label = f"Comp {j}" yvals = all_individual_results[idx][j] line, = ax.plot(self.angles, yvals, label=label) individual_lines.append(line) result_line, = ax.plot(self.angles, all_final_results[idx], label="Fit") resid_scatter = ax.scatter(self.angles, all_residuals[idx], label="Residual") # Title + limits (use only the currently shown slice) ax.set_title(f"Energy slice: {energies_sel[idx] * KILO:.3f} meV") ax.relim() # recompute data limits from all artists ax.autoscale_view() # apply autoscaling + axes.ymargin padding # Suppress warning when a single MDC is plotted warnings.filterwarnings( "ignore", message="Attempting to set identical left == right", category=UserWarning ) # Slider over slice index (0..n_slices-1) slider_ax = fig.add_axes([0.2, 0.08, 0.6, 0.04]) slider = Slider( slider_ax, "Index", 0, n_slices - 1, valinit=idx, valstep=1 ) def update(val): i = int(slider.val) # Update MDC points scatter.set_offsets(np.c_[self.angles, intensities[i]]) # Update individuals if n_individuals: Yi = all_individual_results[i] # (n_individuals, n_angles) for j, ln in enumerate(individual_lines): ln.set_ydata(Yi[j]) # Update fit and residual result_line.set_ydata(all_final_results[i]) resid_scatter.set_offsets(np.c_[self.angles, all_residuals[i]]) ax.relim() ax.autoscale_view() # Update title and redraw ax.set_title(f"Energy slice: " f"{energies_sel[i] * KILO:.3f} meV") fig.canvas.draw_idle() slider.on_changed(update) self._slider = slider self._line = scatter self._individual_lines = individual_lines self._result_line = result_line self._resid_scatter = resid_scatter ax.set_xlabel("Angle (°)") ax.set_ylabel("Counts (-)") ax.legend() self._fig = fig if size_kwargs: fig.set_size_inches(size_kwargs.pop("w"), size_kwargs.pop("h"), **size_kwargs) if title: fig.suptitle(title) if tight_layout: fig.tight_layout() if savefig: fig.savefig(savefig) if ax_grid is not None: for axis in fig.axes: axis.grid(bool(ax_grid)) if ax_annotate: tags = string.ascii_lowercase for i, axis in enumerate(fig.axes): axis.annotate(f"({tags[i]})", xy=(0.05, 0.95), xycoords="axes fraction") is_interactive = hasattr(sys, 'ps1') or 'ipykernel' in sys.modules is_cli = not is_interactive if show: if is_cli: plt.show() if fig_close: plt.close(fig) if not show and (fig_close or is_cli): return None return fig
[docs] @add_fig_kwargs def fit(self, distributions, energy_value=None, matrix_element=None, matrix_args=None, ax=None): r""" """ from copy import deepcopy from lmfit import Minimizer from .functions import construct_parameters, build_distributions, \ residual counts, kinergy = self.energy_check(energy_value) ax, fig, plt = get_ax_fig_plt(ax=ax) ax.set_xlabel('Angle ($\\degree$)') ax.set_ylabel('Counts (-)') ax.set_title(f"Energy slice: " f"{(kinergy - self.hnuminPhi) * KILO:.3f} meV") ax.scatter(self.angles, counts, label='Data') new_distributions = deepcopy(distributions) if matrix_element is not None: parameters, element_names = construct_parameters(distributions, matrix_args) new_distributions = build_distributions(new_distributions, \ parameters) mini = Minimizer( residual, parameters, fcn_args=(self.angles, counts, self.angle_resolution, new_distributions, kinergy, self.hnuminPhi, matrix_element, element_names)) else: parameters = construct_parameters(distributions) new_distributions = build_distributions(new_distributions, parameters) mini = Minimizer(residual, parameters, fcn_args=(self.angles, counts, self.angle_resolution, new_distributions, kinergy, self.hnuminPhi)) outcome = mini.minimize('least_squares') pcov = outcome.covar # If matrix params were fitted, pass the fitted values to plotting if matrix_element is not None: new_matrix_args = {key: outcome.params[key].value for key in matrix_args} else: new_matrix_args = None final_result = self._merge_and_plot(ax=ax, distributions=new_distributions, kinetic_energy=kinergy, matrix_element=matrix_element, matrix_args=new_matrix_args, plot_individual=True) residual_vals = counts - final_result ax.scatter(self.angles, residual_vals, label='Residual') ax.legend() if matrix_element is not None: return fig, new_distributions, pcov, new_matrix_args else: return fig, new_distributions, pcov
def _merge_and_plot(self, ax, distributions, kinetic_energy, matrix_element=None, matrix_args=None, plot_individual=True): r""" Evaluate distributions on the extended grid, apply optional matrix element, smooth, plot individuals and the summed curve. Returns ------- final_result : np.ndarray Smoothed, cropped total distribution aligned with self.angles. """ from scipy.ndimage import gaussian_filter # Build extended grid extend, step, numb = extend_function(self.angles, self.angle_resolution) total_result = np.zeros_like(extend) for dist in distributions: # Special handling for SpectralQuadratic if getattr(dist, 'class_name', None) == 'SpectralQuadratic': if (getattr(dist, 'center_angle', None) is not None) and ( kinetic_energy is None or self.hnuminPhi is None ): raise ValueError( 'Spectral quadratic function is defined in terms ' 'of a center angle. Please provide a kinetic energy ' 'and hnuminPhi.' ) extended_result = dist.evaluate(extend, kinetic_energy, \ self.hnuminPhi) else: extended_result = dist.evaluate(extend) # Optional matrix element (only for components that advertise an index) if matrix_element is not None and hasattr(dist, 'index'): args = matrix_args or {} extended_result *= matrix_element(extend, **args) total_result += extended_result if plot_individual and ax: individual = gaussian_filter(extended_result, sigma=step)\ [numb:-numb if numb else None] ax.plot(self.angles, individual, label=getattr(dist, \ 'label', str(dist))) # Smoothed, cropped total curve aligned to self.angles final_result = gaussian_filter(total_result, sigma=step)[numb:-numb \ if numb else None] if ax: ax.plot(self.angles, final_result, label='Distribution sum') return final_result
[docs] def expose_parameters(self, select_label, fermi_wavevector=None, fermi_velocity=None, bare_mass=None, side=None): r""" Select and return fitted parameters for a given component label, plus a flat export dictionary containing values **and** 1σ uncertainties. Parameters ---------- select_label : str Label to look for among the fitted distributions. fermi_wavevector : float, optional Optional Fermi wave vector to include. fermi_velocity : float, optional Optional Fermi velocity to include. bare_mass : float, optional Optional bare mass to include (used for SpectralQuadratic dispersions). side : {'left','right'}, optional Optional side selector for SpectralQuadratic dispersions. Returns ------- ekin_range : np.ndarray Kinetic-energy grid corresponding to the selected label. hnuminPhi : float Photoelectron work-function offset. energy_resolution : float Energy resolution associated with the extracted self-energy data. temperature : float Temperature [K] associated with the extracted self-energy data. label : str Label of the selected distribution. selected_properties : dict or list of dict Nested dictionary (or list thereof) containing <param> and <param>_sigma arrays. For SpectralQuadratic components, a scalar `center_wavevector` is also present. exported_parameters : dict Flat dictionary of parameters and their uncertainties, plus optional Fermi quantities and `side`. For SpectralQuadratic components, `center_wavevector` is included and taken directly from the fitted distribution. """ if self._ekin_range is None: raise AttributeError( "ekin_range not yet set. Run `.fit_selection()` first." ) store = getattr(self, "_individual_properties", None) if not store or select_label not in store: all_labels = (sorted(store.keys()) if isinstance(store, dict) else []) raise ValueError( f"Label '{select_label}' not found in available labels: " f"{all_labels}" ) # Convert lists → numpy arrays within the selected label’s classes. # Keep scalar center_wavevector as a scalar. per_class_dicts = [] for cls, bucket in store[select_label].items(): dct = {} for k, v in bucket.items(): if k in ("label", "_class"): dct[k] = v elif k == "center_wavevector": # keep scalar as-is, do not wrap in np.asarray dct[k] = v else: dct[k] = np.asarray(v) per_class_dicts.append(dct) selected_properties = ( per_class_dicts[0] if len(per_class_dicts) == 1 else per_class_dicts ) # Flat export dict: simple keys, includes optional extras exported_parameters = { "fermi_wavevector": fermi_wavevector, "fermi_velocity": fermi_velocity, "bare_mass": bare_mass, "side": side, } # Collect parameters without prefixing by class. This will also include # center_wavevector from the fitted SpectralQuadratic class, and since # there is no function argument with that name, it cannot be overridden. if isinstance(selected_properties, dict): for key, val in selected_properties.items(): if key not in ("label", "_class"): exported_parameters[key] = val else: # If multiple classes, merge sequentially # (last overwrites same-name keys). for cls_bucket in selected_properties: for key, val in cls_bucket.items(): if key not in ("label", "_class"): exported_parameters[key] = val return (self._ekin_range, self.hnuminPhi, self.energy_resolution, self.temperature, select_label, selected_properties, exported_parameters)