"""Utility functions.
The ``phasorpy.utils`` module provides auxiliary and convenience functions
that do not naturally fit into other modules.
"""
from __future__ import annotations
__all__ = [
'anscombe_transformation',
'anscombe_transformation_inverse',
'number_threads',
'spectral_vector_denoise',
]
import math
import os
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ._typing import Any, NDArray, ArrayLike, DTypeLike, Literal, Sequence
import numpy
from ._phasorpy import (
_anscombe,
_anscombe_inverse,
_anscombe_inverse_approx,
_phasor_from_signal_vector,
_signal_denoise_vector,
)
from ._utils import parse_harmonic
[docs]
def spectral_vector_denoise(
signal: ArrayLike,
/,
spectral_vector: ArrayLike | None = None,
*,
axis: int = -1,
harmonic: int | Sequence[int] | Literal['all'] | str | None = None,
sigma: float = 0.05,
vmin: float | None = None,
dtype: DTypeLike | None = None,
num_threads: int | None = None,
) -> NDArray[Any]:
"""Return spectral-vector-denoised signal.
The spectral vector denoising algorithm is based on a Gaussian weighted
average calculation, with weights obtained in n-dimensional Chebyshev or
Fourier space [4]_.
Parameters
----------
signal : array_like
Hyperspectral data to be denoised.
A minimum of three samples are required along `axis`.
The samples must be uniformly spaced.
spectral_vector : array_like, optional
Spectral vector.
For example, phasor coordinates, PCA projected phasor coordinates,
or Chebyshev coefficients.
Must be of same shape as `signal` with `axis` removed and axis
containing spectral space appended.
If None (default), phasor coordinates are calculated at specified
`harmonic`.
axis : int, optional, default: -1
Axis over which `spectral_vector` is computed if not provided.
The default is the last axis (-1).
harmonic : int, sequence of int, or 'all', optional
Harmonics to include in calculating `spectral_vector`.
If `'all'`, include all harmonics for `signal` samples along `axis`.
Else, harmonics must be at least one and no larger than half the
number of `signal` samples along `axis`.
The default is the first harmonic (fundamental frequency).
A minimum of `harmonic * 2 + 1` samples are required along `axis`
to calculate correct phasor coordinates at `harmonic`.
sigma : float, default: 0.05
Width of Gaussian filter in spectral vector space.
Weighted averages are calculated using the spectra of signal items
within an spectral vector Euclidean distance of `3 * sigma` and
intensity above `vmin`.
vmin : float, optional
Signal intensity along `axis` below which not to include in denoising.
dtype : dtype_like, optional
Data type of output arrays. Either float32 or float64.
The default is float64 unless the `signal` is float32.
num_threads : int, optional
Number of OpenMP threads to use for parallelization.
By default, multi-threading is disabled.
If zero, up to half of logical CPUs are used.
OpenMP may not be available on all platforms.
Returns
-------
ndarray
Denoised signal of `dtype`.
Spectra with integrated intensity below `vmin` are unchanged.
References
----------
.. [4] Harman RC, Lang RT, Kercher EM, Leven P, and Spring BQ.
`Denoising multiplexed microscopy images in n-dimensional spectral space
<https://doi.org/10.1364/BOE.463979>`_.
*Biomedical Optics Express*, 13(8): 4298-4309 (2022)
Examples
--------
Denoise a hyperspectral image with a Gaussian filter width of 0.1 in
spectral vector space using first and second harmonic:
>>> signal = numpy.random.randint(0, 255, (8, 16, 16))
>>> spectral_vector_denoise(signal, axis=0, sigma=0.1, harmonic=[1, 2])
array([[[...]]])
"""
num_threads = number_threads(num_threads)
signal = numpy.asarray(signal)
if axis == -1 or axis == signal.ndim - 1:
axis = -1
else:
signal = numpy.moveaxis(signal, axis, -1)
shape = signal.shape
samples = shape[-1]
if harmonic is None:
harmonic = 1
harmonic, _ = parse_harmonic(harmonic, samples // 2)
num_harmonics = len(harmonic)
if vmin is None or vmin < 0.0:
vmin = 0.0
sincos = numpy.empty((num_harmonics, samples, 2))
for i, h in enumerate(harmonic):
phase = numpy.linspace(
0,
h * math.pi * 2.0,
samples,
endpoint=False,
dtype=numpy.float64,
)
sincos[i, :, 0] = numpy.cos(phase)
sincos[i, :, 1] = numpy.sin(phase)
signal = numpy.ascontiguousarray(signal).reshape(-1, samples)
size = signal.shape[0]
if dtype is None:
if signal.dtype.char == 'f':
dtype = signal.dtype
else:
dtype = numpy.float64
dtype = numpy.dtype(dtype)
if dtype.char not in {'d', 'f'}:
raise ValueError('dtype is not floating point')
if spectral_vector is None:
spectral_vector = numpy.zeros((size, num_harmonics * 2), dtype=dtype)
_phasor_from_signal_vector(
spectral_vector, signal, sincos, num_threads
)
else:
spectral_vector = numpy.ascontiguousarray(spectral_vector, dtype=dtype)
if spectral_vector.shape[:-1] != shape[:-1]:
raise ValueError('signal and spectral_vector shape mismatch')
spectral_vector = spectral_vector.reshape(
-1, spectral_vector.shape[-1]
)
if dtype == signal.dtype:
denoised = signal.copy()
else:
denoised = numpy.zeros(signal.shape, dtype=dtype)
denoised[:] = signal
integrated = numpy.zeros(size, dtype=dtype)
_signal_denoise_vector(
denoised, integrated, signal, spectral_vector, sigma, vmin, num_threads
)
denoised = denoised.reshape(shape)
if axis != -1:
denoised = numpy.moveaxis(denoised, -1, axis)
return denoised
[docs]
def number_threads(
num_threads: int | None = None,
max_threads: int | None = None,
/,
) -> int:
"""Return number of threads for parallel computations on CPU cores.
This function is used to parse ``num_threads`` parameters.
Parameters
----------
num_threads : int, optional
Number of threads to use for parallel computations on CPU cores.
If None (default), return 1, disabling multi-threading.
If greater than zero, return value up to `max_threads` if set.
If zero, return the value of the ``PHASORPY_NUM_THREADS`` environment
variable if set, else half the CPU cores up to `max_threads` or 32.
max_threads : int, optional
Maximum number of threads to return.
Examples
--------
>>> number_threads()
1
>>> number_threads(0) # doctest: +SKIP
8
"""
if num_threads is None or num_threads < 0:
# disable multi-threading by default
return 1
if num_threads == 0:
# return default number of threads
if max_threads is None:
max_threads = 32
else:
max_threads = max(max_threads, 1)
if 'PHASORPY_NUM_THREADS' in os.environ:
return min(
max_threads, max(1, int(os.environ['PHASORPY_NUM_THREADS']))
)
cpu_count: int | None
if hasattr(os, 'sched_getaffinity'):
cpu_count = len(os.sched_getaffinity(0))
else:
# sched_getaffinity not available on Windows
cpu_count = os.cpu_count()
if cpu_count is None:
return 1
return min(max_threads, max(1, cpu_count // 2))
# return num_threads up to max_threads
if max_threads is None:
return num_threads
return min(num_threads, max(max_threads, 1))