Source code for xarpes.functions

# Copyright (C) 2025 xARPES Developers
# This program is free software under the terms of the GNU GPLv3 license.

"""Separate functions mostly used in conjunction with various classes."""

import numpy as np

[docs] def resolve_param_name(params, label, pname): """ Try to find the lmfit param key corresponding to this component `label` and bare parameter name `pname` (e.g., 'amplitude', 'peak', 'broadening'). Works with common token separators. """ import re names = list(params.keys()) # Fast exact candidates candidates = ( f"{pname}_{label}", f"{label}_{pname}", f"{pname}:{label}", f"{label}:{pname}", f"{label}.{pname}", f"{label}|{pname}", f"{label}-{pname}", f"{pname}-{label}", ) for c in candidates: if c in params: return c # Regex fallback: label and pname as tokens in any order esc_l = re.escape(str(label)) esc_p = re.escape(str(pname)) tok = r"[.:/_\-]" # common separators pat = re.compile(rf"(^|{tok}){esc_l}({tok}|$).*({tok}){esc_p}({tok}|$)") for n in names: if pat.search(n): return n # Last resort: unique tail match on pname that also contains the label somewhere tails = [n for n in names if n.endswith(pname) and str(label) in n] if len(tails) == 1: return tails[0] # Give up return None
[docs] def build_distributions(distributions, parameters): r"""TBD """ for dist in distributions: if dist.class_name == 'Constant': dist.offset = parameters['offset_' + dist.label].value elif dist.class_name == 'Linear': dist.offset = parameters['offset_' + dist.label].value dist.slope = parameters['slope_' + dist.label].value elif dist.class_name == 'SpectralLinear': dist.amplitude = parameters['amplitude_' + dist.label].value dist.peak = parameters['peak_' + dist.label].value dist.broadening = parameters['broadening_' + dist.label].value elif dist.class_name == 'SpectralQuadratic': dist.amplitude = parameters['amplitude_' + dist.label].value dist.peak = parameters['peak_' + dist.label].value dist.broadening = parameters['broadening_' + dist.label].value return distributions
[docs] def construct_parameters(distribution_list, matrix_args=None): r"""TBD """ from lmfit import Parameters parameters = Parameters() for dist in distribution_list: if dist.class_name == 'Constant': parameters.add(name='offset_' + dist.label, value=dist.offset) elif dist.class_name == 'Linear': parameters.add(name='offset_' + dist.label, value=dist.offset) parameters.add(name='slope_' + dist.label, value=dist.slope) elif dist.class_name == 'SpectralLinear': parameters.add(name='amplitude_' + dist.label, value=dist.amplitude, min=0) parameters.add(name='peak_' + dist.label, value=dist.peak) parameters.add(name='broadening_' + dist.label, value=dist.broadening, min=0) elif dist.class_name == 'SpectralQuadratic': parameters.add(name='amplitude_' + dist.label, value=dist.amplitude, min=0) parameters.add(name='peak_' + dist.label, value=dist.peak) parameters.add(name='broadening_' + dist.label, value=dist.broadening, min=0) if matrix_args is not None: element_names = list() for key, value in matrix_args.items(): parameters.add(name=key, value=value) element_names.append(key) return parameters, element_names else: return parameters
[docs] def residual(parameters, xdata, ydata, angle_resolution, new_distributions, kinetic_energy, hnuminPhi, matrix_element=None, element_names=None): r""" """ from scipy.ndimage import gaussian_filter from xarpes.distributions import Dispersion if matrix_element is not None: matrix_parameters = {} for name in element_names: if name in parameters: matrix_parameters[name] = parameters[name].value new_distributions = build_distributions(new_distributions, parameters) extend, step, numb = extend_function(xdata, angle_resolution) model = np.zeros_like(extend) for dist in new_distributions: if getattr(dist, 'class_name', type(dist).__name__) == \ 'SpectralQuadratic': part = dist.evaluate(extend, kinetic_energy, hnuminPhi) else: part = dist.evaluate(extend) if (matrix_element is not None) and isinstance(dist, Dispersion): part *= matrix_element(extend, **matrix_parameters) model += part model = gaussian_filter(model, sigma=step)[numb:-numb if numb else None] return model - ydata
[docs] def extend_function(abscissa_range, abscissa_resolution): r"""TBD """ from .constants import FWHM2STD from . import settings_parameters as xprs step_size = np.abs(abscissa_range[1] - abscissa_range[0]) step = abscissa_resolution / (step_size * FWHM2STD) numb = int(xprs.sigma_extend * step) extend = np.linspace(abscissa_range[0] - numb * step_size, abscissa_range[-1] + numb * step_size, len(abscissa_range) + 2 * numb) return extend, step, numb
[docs] def error_function(p, xdata, ydata, function, resolution, yerr, *extra_args): r"""The error function used inside the fit_least_squares function. Parameters ---------- p : ndarray Array of parameters during the optimization. xdata : ndarray Abscissa values the function is evaluated on. ydata : ndarray Measured values to compare to. function : callable Function or class with __call__ method to evaluate. resolution : float or None Convolution resolution (sigma), if applicable. yerr : ndarray Standard deviations of ydata. extra_args : tuple Additional arguments passed to function. Returns ------- residual : ndarray Normalized residuals between model and ydata. """ from scipy.ndimage import gaussian_filter if resolution: extend, step, numb = extend_function(xdata, resolution) model = gaussian_filter(function(extend, *p, *extra_args), sigma=step) model = model[numb:-numb if numb else None] else: model = function(xdata, *p, *extra_args) residual = (model - ydata) / yerr return residual
[docs] def fit_least_squares(p0, xdata, ydata, function, resolution=None, yerr=None, bounds=None, extra_args=None): r"""Least-squares fit using `scipy.optimize.least_squares`. Default behavior is Levenberg–Marquardt (`method="lm"`) when unbounded. If `bounds` is provided, switches to trust-region reflective (`"trf"`). Returns (pfit, pcov, success) in the same style as the old `leastsq` wrapper, with an additional boolean `success` from SciPy. """ from scipy.optimize import least_squares if yerr is None: yerr = np.ones_like(ydata) if extra_args is None: extra_args = () def _residuals(p): return error_function( p, xdata, ydata, function, resolution, yerr, *extra_args ) if bounds is None: res = least_squares(_residuals, p0, method="lm") else: res = least_squares(_residuals, p0, method="trf", bounds=bounds) pfit = res.x success = bool(getattr(res, "success", False)) m = len(ydata) n = pfit.size if (m > n) and (res.jac is not None) and res.jac.size: resid = res.fun s_sq = (resid ** 2).sum() / (m - n) try: jtj = res.jac.T @ res.jac pcov = np.linalg.inv(jtj) * s_sq except np.linalg.LinAlgError: pcov = np.inf else: pcov = np.inf return pfit, pcov, success
[docs] def download_examples(): """Downloads the examples folder from the main xARPES repository only if it does not already exist in the current directory. Prints executed steps and a final cleanup/failure message. Returns ------- 0 or 1 : int Returns 0 if the execution succeeds, 1 if it fails. """ import requests import zipfile import os import shutil import io import jupytext import tempfile import re # importlib.metadata is stdlib from 3.8; use backport if on 3.7 try: from importlib.metadata import version, PackageNotFoundError except ImportError: # Python 3.7 + backport from importlib_metadata import version, PackageNotFoundError # Main xARPES repo (examples live under /examples there) repo_url = "https://github.com/xARPES/xARPES" output_dir = "." # Directory from which the function is called # Target 'examples' directory in the user's current location final_examples_path = os.path.join(output_dir, "examples") if os.path.exists(final_examples_path): print("Warning: 'examples' folder already exists. " "No download will be performed.") return 1 # Exit the function if 'examples' directory exists # --- Determine version from installed package metadata ------------------- tag_version = None raw_version = None try: raw_version = version("xarpes") raw_version = str(raw_version) # Strip dev/local suffixes so that '0.3.3.dev1' or '0.3.3+0.gHASH' # maps to the tag 'v0.3.3'. If you use plain '0.3.3' already, this is # a no-op. m = re.match(r"(\d+\.\d+\.\d+)", raw_version) if m: tag_version = m.group(1) else: tag_version = raw_version print("Determined installed xARPES version from package metadata: " f"{raw_version} (using tag version '{tag_version}').") except PackageNotFoundError: print("Warning: could not determine installed 'xarpes' package version; " "will skip tag-based download and try the main branch only.") # --- Build refs and use for–else to try them in order ------------------- repo_parts = repo_url.replace("https://github.com/", "").rstrip("/") refs_to_try = [] if tag_version is not None: refs_to_try.append(f"tags/v{tag_version}") # version-matched examples refs_to_try.append("heads/main") # fallback: latest examples response = None selected_ref = None for ref in refs_to_try: zip_url = f"https://github.com/{repo_parts}/archive/refs/{ref}.zip" print(f"Attempting to download examples from '{ref}':\n {zip_url}") response = requests.get(zip_url) if response.status_code == 200: selected_ref = ref if ref.startswith("tags/"): print(f"Successfully downloaded examples from tagged release " f"'v{tag_version}'.") else: print("Tagged release not available; using latest examples " "from the 'main' branch instead.") break else: print("Failed to download from this ref. HTTP status code: " f"{response.status_code}") else: # for–else: only executed if we never hit 'break' print("Error: could not download examples from any ref " f"(tried: {', '.join(refs_to_try)}).") return 1 # At this point, 'response' holds a successful download zip_file_bytes = io.BytesIO(response.content) # --- Extract into a temporary directory to avoid polluting CWD ---------- with tempfile.TemporaryDirectory() as tmpdir: with zipfile.ZipFile(zip_file_bytes, "r") as zip_ref: zip_ref.extractall(tmpdir) # First member gives us the top-level directory in the archive, # typically something like 'xARPES-0.3.3/' or 'xARPES-main/'. first_member = zip_ref.namelist()[0] top_level_dir = first_member.split("/")[0] main_folder_path = os.path.join(tmpdir, top_level_dir) examples_path = os.path.join(main_folder_path, "examples") if not os.path.exists(examples_path): print("Error: downloaded archive does not contain an 'examples' " "directory.") return 1 print(f"Using examples from ref: {selected_ref}") # Move the 'examples' directory to the target location in the CWD shutil.move(examples_path, final_examples_path) print(f"'examples' subdirectory moved to {final_examples_path}") # Convert all .Rmd files in the examples directory to .ipynb # and delete the .Rmd files for dirpath, dirnames, filenames in os.walk(final_examples_path): for filename in filenames: if filename.endswith(".Rmd"): full_path = os.path.join(dirpath, filename) jupytext.write( jupytext.read(full_path), full_path.replace(".Rmd", ".ipynb") ) os.remove(full_path) # Deletes .Rmd file afterwards print(f"Converted and deleted {full_path}") # Temporary directory is cleaned up automatically print("Cleaned up temporary files.") return 0
[docs] def set_script_dir(): r"""This function sets the directory such that the xARPES code can be executed either inside IPython environments or as .py scripts from arbitrary locations. """ import os import inspect try: # This block checks if the script is running in an IPython environment cfg = get_ipython().config script_dir = os.getcwd() except NameError: # If not in IPython, get the caller's file location frame = inspect.stack()[1] module = inspect.getmodule(frame[0]) script_dir = os.path.dirname(os.path.abspath(module.__file__)) except: # If __file__ isn't defined, fall back to current working directory script_dir = os.getcwd() return script_dir
[docs] def MEM_core(dvec, model_in, uvec, mu, aval, wvec, V_Sigma, U, t_criterion, iter_max): r""" Implementation of Bryan's algorithm (not to be confused with Bryan's 'method' for determining the Lagrange multiplier aval. For details, see Eur. Biophys. J. 18, 165 (1990). """ import numpy as np import warnings spectrum_in = model_in * np.exp(U @ uvec) # Eq. 9 avalmu = aval + mu converged = False iter_count = 0 while not converged and iter_count < iter_max: T = V_Sigma @ (U.T @ spectrum_in) # Below Eq. 7 gvec = V_Sigma.T @ (wvec * (T - dvec)) # Eq. 10 M = V_Sigma.T @ (wvec[:, None] * V_Sigma) # Above Eq. 11 K = U.T @ (spectrum_in[:, None] * U) # Above Eq. 11 xi, P = np.linalg.eigh(K) # Eq. 13 sqrt_xi = np.sqrt(xi) P_sqrt_xi = P * sqrt_xi[None, :] A = P_sqrt_xi.T @ (M @ P_sqrt_xi) # Between Eqs. 13 and 14 Lambda, R = np.linalg.eigh(A) # Eq. 14 Y_inv = R.T @ (sqrt_xi[:, None] * P.T) # Below Eq. 15 # From Eq. 16: Y_inv_du = -(Y_inv @ (aval * uvec + gvec)) / (avalmu + Lambda) d_uvec = ( -aval * uvec - gvec - M @ (Y_inv.T @ Y_inv_du) ) / avalmu # Eq. 20 uvec += d_uvec spectrum_in = model_in * np.exp(U @ uvec) # Eq. 9 # Convergence block: Section 2.3 aval_K_u = aval * (K @ uvec) # Skipping the minus sign twice K_g = K @ gvec tcon = ( 2 * np.linalg.norm(aval_K_u + K_g)**2 / (np.linalg.norm(aval_K_u) + np.linalg.norm(K_g))**2 ) converged = (tcon < t_criterion) iter_count += 1 if not converged: with warnings.catch_warnings(): warnings.simplefilter("always", RuntimeWarning) warnings.warn( f"MEM_core did not converge within iter_max={iter_max} " f"(performed {iter_count} iterations).", category=RuntimeWarning, stacklevel=2, ) return spectrum_in, uvec
[docs] def bose_einstein(omega, k_BT): """Bose-Einstein distribution n_B(omega) for k_BT > 0 and omega >= 0.""" x_over = np.log(np.finfo(float).max) # ~709.78 for float64 x = omega / k_BT out = np.empty_like(omega, dtype=float) momega0 = (omega == 0) if np.any(momega0): out[momega0] = np.inf mpos_big = (x > x_over) & (omega != 0) if np.any(mpos_big): out[mpos_big] = 0.0 mnorm = (omega != 0) & ~mpos_big if np.any(mnorm): out[mnorm] = 1.0 / np.expm1(x[mnorm]) return out
[docs] def fermi(omega, k_BT): """Fermi-Dirac distribution f(omega) for k_BT > 0 and omega >= 0. Could potentially be made a core block of the FermiDirac distribution.""" x_over = np.log(np.finfo(float).max) # ~709.78 for float64 x = omega / k_BT out = np.empty_like(omega, dtype=float) mover = x > x_over out[mover] = 0.0 mnorm = ~mover y = np.exp(-x[mnorm]) out[mnorm] = y / (1.0 + y) return out
[docs] def create_kernel_function(enel, omega, k_BT): r"""Kernel function. Eq. 17 from https://arxiv.org/abs/2508.13845. Returns ------- K : ndarray, complex Shape (enel.size, omega.size) if enel and omega are 1D. """ from scipy.special import digamma enel = enel[:, None] # (Ne, 1) omega = omega[None, :] # (1, Nw) denom = 2.0 * np.pi * k_BT K = (digamma(0.5 - 1j * (enel - omega) / denom) - digamma(0.5 - 1j * (enel + omega) / denom) - 2j * np.pi * (bose_einstein(omega, k_BT) + 0.5)) return K
[docs] def singular_value_decomposition(kernel, sigma_svd): r""" Some papers use kernel = U Sigma V^T; we follow Bryan's algorithm. """ V, Sigma, U_transpose = np.linalg.svd(kernel) U = U_transpose.T Sigma = Sigma[Sigma > sigma_svd] s_reduced = Sigma.size V = V[:, :s_reduced] U = U[:, :s_reduced] V_Sigma = V * Sigma[None, :] uvec = np.zeros(s_reduced) print('Dimensionality has been reduced from a matrix of rank ' + str(min(kernel.shape)) + ' to ' + str(int(s_reduced)) + ' in the singular space.') return V_Sigma, U, uvec
[docs] def create_model_function(omega, omega_I, omega_M, omega_S, h_n): r"""Piecewise model m_n(omega) defined on the omega grid. Implements the piecewise definition in the figure, interpreting omega_min/max as omega.min()/omega.max(). Parameters ---------- omega : ndarray Frequency grid (assumed sorted, but only min/max are used). omega_I : float ω_n^I omega_M : float ω_n^M omega_S : float ω_n^S h_n : float h_n in the prefactor m_n(omega) = 2 h_n * ( ... ). Returns ------- model : ndarray m_n(omega) evaluated on the omega grid. """ w_min = omega.min() w_max = omega.max() if omega_I <= 0: raise ValueError("omega_I must be > 0.") denom = w_max + omega_S - omega_M if denom == 0: raise ValueError("omega_max + omega_S - omega_M must be nonzero.") w_I_half = 0.5 * omega_I w_mid = 0.5 * (w_max + omega_S + omega_M) domains = np.empty_like(omega) m1 = (omega >= w_min) & (omega < w_I_half) domains[m1] = (omega[m1] / omega_I) ** 2 m2 = (omega >= w_I_half) & (omega < omega_I) domains[m2] = 0.5 - (omega[m2] / omega_I - 1.0) ** 2 m3 = (omega >= omega_I) & (omega < omega_M) domains[m3] = 0.5 m4 = (omega >= omega_M) & (omega < w_mid) domains[m4] = 0.5 - ((omega[m4] - omega_M) / denom) ** 2 m5 = (omega >= w_mid) & (omega <= w_max) domains[m5] = ((omega[m5] - omega_M) / denom - 1.0) ** 2 return 2.0 * h_n * domains
[docs] def chi2kink_logistic(x, a, b, c, d): """Four-parameter logistic (scaled sigmoid), evaluated stably. Parameters ---------- x : array_like Input values. a : float Lower asymptote. b : float Amplitude (upper - lower). c : float Midpoint (inflection point). d : float Slope parameter (steepness). Returns ------- phi : ndarray Logistic curve evaluated at x. """ z = d * (x - c) phi = np.empty_like(z, dtype=float) mpos = z >= 0 if np.any(mpos): phi[mpos] = a + b / (1.0 + np.exp(-z[mpos])) mneg = ~mpos if np.any(mneg): expz = np.exp(z[mneg]) phi[mneg] = a + b * expz / (1.0 + expz) return phi
def _in_ipython(): try: from IPython import get_ipython except Exception: return False return get_ipython() is not None class _LineBuffer: """Capture text as lines, keeping only head + tail and total count.""" def __init__(self, head=10, tail=10): from collections import deque self.head = int(head) self.tail = int(tail) self._head_lines = [] self._tail_lines = deque(maxlen=self.tail) self._partial = "" self._n_lines = 0 def feed(self, text): if not text: return # Normalize carriage returns (progress bars etc.) text = str(text).replace("\r", "\n") # Prepend any partial line from previous write. text = self._partial + text parts = text.split("\n") # If text does not end with '\n', last part is partial. self._partial = parts.pop() # may be "" if ended with newline for line in parts: # Record completed line self._n_lines += 1 if len(self._head_lines) < self.head: self._head_lines.append(line) else: if self.tail > 0: self._tail_lines.append(line) def finalize(self): # If there is an unfinished line, treat it as a line. if self._partial.strip() != "": self.feed("\n") # forces it into completed lines self._partial = "" def summary(self): self.finalize() n = self._n_lines if n == 0: return "" # If total small, reconstruct from head + tail if possible if n <= self.head + self.tail: # We didn't keep all middle lines, but if n <= head+tail we kept all. # For n <= head, tail may be empty; for n > head, tail contains rest. out = list(self._head_lines) if n > len(self._head_lines): out.extend(list(self._tail_lines)) return "\n".join(out) omitted = n - self.head - self.tail head_txt = "\n".join(self._head_lines) tail_txt = "\n".join(list(self._tail_lines)) return ( f"{head_txt}\n" f"... ({omitted} lines omitted) ...\n" f"{tail_txt}" )
[docs] def trim_notebook_output(print_lines=10, *, enabled=True, clear=True, capture_stderr=True, sleep_s=0.05): """Context manager: show live prints, then keep only head/tail in notebooks. In plain scripts (no IPython), this is a no-op. """ from contextlib import contextmanager @contextmanager def _ctx(): if (not enabled) or (print_lines is None): yield return n = int(print_lines) if n <= 0: yield return # Only do this in IPython environments; scripts should print normally. if not _in_ipython(): yield return import sys import time buf = _LineBuffer(head=n, tail=n) stdout_orig = sys.stdout stderr_orig = sys.stderr class _TeeStream: def __init__(self, real_stream, buffer): self._real = real_stream self._buf = buffer def write(self, text): # Live output try: self._real.write(text) self._real.flush() except Exception: pass # Capture self._buf.feed(text) def flush(self): try: self._real.flush() except Exception: pass # Some libraries ask for these: def isatty(self): return False @property def encoding(self): return getattr(self._real, "encoding", "utf-8") sys.stdout = _TeeStream(stdout_orig, buf) if capture_stderr: sys.stderr = _TeeStream(stderr_orig, buf) try: yield finally: # Restore streams first. sys.stdout = stdout_orig sys.stderr = stderr_orig # Let Jupyter finish rendering any queued output. if sleep_s: time.sleep(float(sleep_s)) # Clear cell output and print summary. summary = buf.summary() if clear: try: from IPython.display import clear_output clear_output(wait=True) except Exception: pass if summary: print(summary) return _ctx()