Source code for talon.optimization

# -*- coding: utf-8 -*-
from abc import ABC
from enum import Enum

import numpy as np
from pyunlocbox import acceleration, functions, solvers
from scipy.optimize import OptimizeResult

import talon

# These attributes of the original pyunlocbox.functions.func class hold
# lambda functions and must be removed before serialization.
PYUNLOCBOX_KEYS_TO_REMOVE = ['y', 'A', 'At']


class RegularizationTerm(functions.func, ABC):
    def __init__(self, regularization_parameter: float):
        r"""Abstract base class for all regularization terms.

        The optimization problem solved by `talon` is

        .. math:: \min_x 0.5 \|A x - y\|^2 + \Omega(x)

        where :math:`\Omega` is a regularization term. This class is the
        base for all concrete implementations of this term.

        Args:
            regularization_parameter: float
                The scaling factor of the regularization term. Must be a
                float greater than or equal to zero.

        Raises:
            TypeError: If the regularization parameter is not a float and
                cannot be converted to a float.
            ValueError: If the regularization parameter is negative.
        """
        super().__init__()

        # Verify that the regularization parameter is a non-negative float.
        try:
            regularization_parameter = float(regularization_parameter)
        except (TypeError, ValueError):
            raise TypeError(
                f'The regularization parameter must be a float, not a '
                f'{type(regularization_parameter)}.')
        if regularization_parameter < 0:
            raise ValueError(
                f'The regularization parameter must be non-negative '
                f'({regularization_parameter} < 0).')
        self._regularization_parameter = regularization_parameter

        self._groups = None
        self._non_negativity = False
        self._weights = None
    def __call__(self, x: np.ndarray) -> float:
        """Call the function that defines the regularization term.

        Args:
            x: np.ndarray
                The vector on which the function must be evaluated.

        Returns:
            float
                The value of the regularization term at ``x``.
        """
        return self._eval(x)

    def __getstate__(self):
        clean_dict = self.__dict__.copy()
        for k in PYUNLOCBOX_KEYS_TO_REMOVE:
            clean_dict.pop(k, None)
        return clean_dict

    def __repr__(self):
        return f'{self.__class__.__name__}()'

    @property
    def groups(self) -> list:
        """Get the group structure associated to the regularization term.

        Returns:
            list
                List of lists of streamline indices.
        """
        return self._groups

    @property
    def non_negativity(self) -> bool:
        """Get the activation of the non-negativity constraint.

        Returns:
            bool
                True if the non-negativity constraint is employed, False
                otherwise.
        """
        return self._non_negativity

    @property
    def regularization_parameter(self) -> float:
        """Get the regularization parameter.

        Returns:
            float
                The value of the regularization parameter.
        """
        return self._regularization_parameter

    @property
    def weights(self) -> np.ndarray:
        """Get the weight associated to each group.

        Returns:
            np.ndarray
                1-dimensional numpy array with one weight per group.
        """
        return self._weights
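# A minimal sketch (not part of talon) of how a custom term could be written
# by subclassing RegularizationTerm: an L1 penalty whose proximal operator is
# element-wise soft-thresholding. The name `L1Sparsity` is hypothetical.
#
# >>> class L1Sparsity(RegularizationTerm):
# ...     def _eval(self, x):
# ...         return self._regularization_parameter * np.sum(np.abs(x))
# ...     def _prox(self, x, mu):
# ...         r = mu * self._regularization_parameter
# ...         return np.sign(x) * np.maximum(np.abs(x) - r, 0.0)
# >>> term = L1Sparsity(0.5)
# >>> float(term(np.array([1.0, -2.0])))
# 1.5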
class NoRegularization(RegularizationTerm):
    def __init__(self):
        r"""Instantiates the zero-valued regularization term.

        .. math:: \Omega(x) = 0
        """
        super().__init__(0.0)
    def _eval(self, x: np.ndarray):
        return 0.0

    def _grad(self, x: np.ndarray):
        raise NotImplementedError(
            'The gradient is not available for this regularization term.')

    def _prox(self, x: np.ndarray, _=None):
        return x
class NonNegativity(NoRegularization):
    def __init__(self):
        r"""Instantiates the non-negativity constraint regularization
        function.

        .. math:: \Omega(x) = \iota_{\ge 0}(x)
        """
        super().__init__()
        self._non_negativity = True
    def _grad(self, x: np.ndarray):
        raise NotImplementedError(
            'The gradient is not available for this regularization term.')

    def _prox(self, x: np.ndarray, _=None):
        return np.maximum(x, 0.0)
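# Illustrative check (a sketch using the private `_prox` directly): the prox
# of the non-negativity indicator is the projection onto the non-negative
# orthant, i.e. element-wise clipping at zero.
#
# >>> term = NonNegativity()
# >>> term._prox(np.array([-1.0, 0.0, 2.0]))
# array([0., 0., 2.])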
class StructuredSparsity(RegularizationTerm):
    def __init__(self, regularization_parameter: float, groups: list,
                 weights: np.ndarray):
        r"""Instantiates the structured sparsity regularization term.

        .. math:: \Omega(x) = \lambda \cdot \sum_{g \in G} w_g \cdot
            \|x_g\|_2

        Args:
            regularization_parameter: float
                Value of the regularization parameter.
            groups: list
                List of lists of streamline indices.
            weights: np.ndarray
                1-dimensional numpy array with one weight per group.
        """
        super().__init__(regularization_parameter)
        self._groups = groups
        self._weights = weights
    def _eval(self, x: np.ndarray):
        costs = [w * np.sqrt(np.sum(x[g] ** 2))
                 for g, w in zip(self._groups, self._weights)]
        return self._regularization_parameter * np.sum(costs)

    def _grad(self, x: np.ndarray):
        raise NotImplementedError(
            'The gradient is not available for this regularization term.')

    def _prox(self, x: np.ndarray, mu: float):
        v = x.copy()
        for g, w in zip(self._groups, self._weights):
            xn = np.sqrt(np.dot(v[g], v[g]))
            r = mu * self.regularization_parameter * w
            if xn > r:
                # Shrink the group norm by r, keeping the direction.
                v[g] -= v[g] * r / xn
            else:
                # The whole group falls below the threshold and is zeroed.
                v[g] = 0.0
        return v

    def __repr__(self):
        return f'{self.__class__.__name__}(' \
               f'regularization_parameter={self.regularization_parameter}, ' \
               f'n_groups={len(self._groups)})'
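# A worked example of the group prox (group-wise soft-thresholding), again
# calling the private `_prox` for illustration: with one group [0, 1] of
# weight 1, lambda = 1 and step mu = 1, the threshold is r = 1; a group of
# norm 5 keeps its direction and loses r from its norm, ending with norm 4.
#
# >>> term = StructuredSparsity(1.0, [[0, 1]], np.array([1.0]))
# >>> term._prox(np.array([3.0, 4.0]), 1.0)
# array([2.4, 3.2])
# >>> float(term._eval(np.array([3.0, 4.0])))
# 5.0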
class NonNegativeStructuredSparsity(StructuredSparsity):
    def __init__(self, regularization_parameter: float, groups: list,
                 weights: np.ndarray):
        r"""Instantiates the non-negative structured sparsity regularization
        term.

        .. math:: \Omega(x) = \iota_{\ge 0}(x) +
            \lambda \cdot \sum_{g \in G} w_g \cdot \|x_g\|_2

        Args:
            regularization_parameter: float
                Value of the regularization parameter.
            groups: list
                List of lists of streamline indices.
            weights: np.ndarray
                1-dimensional numpy array with one weight per group.
        """
        super().__init__(regularization_parameter, groups, weights)
        self._non_negativity = True
    def _grad(self, x: np.ndarray):
        raise NotImplementedError(
            'The gradient is not available for this regularization term.')

    def _prox(self, x: np.ndarray, mu: float):
        # Project onto the non-negative orthant, then apply the group prox.
        v = np.maximum(x, 0.0)
        return super()._prox(v, mu)

    def __repr__(self):
        return f'{self.__class__.__name__}(' \
               f'regularization_parameter={self.regularization_parameter}, ' \
               f'n_groups={len(self._groups)}, ' \
               f'non_negativity=True)'
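# Illustrative: negative entries are clipped before the group shrinkage, so
# the threshold acts on the projected vector [0, 4] rather than on [-3, 4].
#
# >>> term = NonNegativeStructuredSparsity(1.0, [[0, 1]], np.array([1.0]))
# >>> term._prox(np.array([-3.0, 4.0]), 1.0)
# array([0., 3.])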
def regularization(non_negativity=False, regularization_parameter=None,
                   groups=None, weights=None):
    r"""Get the regularization term for the optimization problem.

    By default this method returns an object encoding the regularization
    term

    .. math:: \Omega(x) = 0 .

    If `regularization_parameter`, `groups` and `weights` are all not None,
    it returns the structured sparsity regularization

    .. math:: \Omega(x) = \lambda \sum_{g \in G} w_g \|x_g\|_2

    where :math:`\lambda` is `regularization_parameter`, :math:`w_g` is the
    entry of `weights` associated to :math:`g`, :math:`x_g` is the subset of
    entries of `x` indexed by `g` and :math:`G` is the list of groups.

    If `non_negativity` is True, the non-negativity constraint is added to
    the regularization term.

    .. math:: \Omega(x) \leftarrow \Omega(x) + \iota_{\ge 0}(x) .

    Args:
        non_negativity: boolean (default = False)
        regularization_parameter: float. Must be >= 0 (default = None).
        groups: list of lists where each element encodes the indices of the
            streamlines belonging to a single group. (default = None).
            E.g.: ``groups = [[0, 2, 5], [1, 3, 4, 6], [7, 8, 9]]``.
        weights: ndarray of the same length as `groups`. Weight associated
            to each group. (default = None)

    Returns:
        An instance of one of

        * ``talon.optimization.NoRegularization``;
        * ``talon.optimization.NonNegativity``;
        * ``talon.optimization.StructuredSparsity``;
        * ``talon.optimization.NonNegativeStructuredSparsity``.

    Raises:
        ValueError: If `weights` and `groups` do not have the same length.
        ValueError: If `regularization_parameter` < 0.
    """
    if regularization_parameter is None or groups is None or weights is None:
        is_structured_sparsity = False
    elif regularization_parameter == 0.0:
        is_structured_sparsity = False
    elif regularization_parameter < 0.0:
        raise ValueError('Negative regularization parameter not allowed.')
    elif len(groups) == len(weights):
        is_structured_sparsity = True
    else:
        raise ValueError('`groups` and `weights` are not coherent objects. '
                         'Check the type and the length.')

    if non_negativity:
        if is_structured_sparsity:
            return NonNegativeStructuredSparsity(regularization_parameter,
                                                 groups, weights)
        else:
            return NonNegativity()
    else:
        if is_structured_sparsity:
            return StructuredSparsity(regularization_parameter, groups,
                                      weights)
        else:
            return NoRegularization()
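# Usage sketch of the factory: the returned type depends on which arguments
# are provided.
#
# >>> regularization()
# NoRegularization()
# >>> regularization(non_negativity=True)
# NonNegativity()
# >>> regularization(regularization_parameter=1.0,
# ...                groups=[[0, 1], [2, 3]], weights=np.ones(2))
# StructuredSparsity(regularization_parameter=1.0, n_groups=2)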
class ExitStatus(Enum):
    """Exit criteria of the optimization routine."""
    UNKNOWN = -1
    MAXIMUM_NUMBER_ITERATIONS = 0
    ABSOLUTE_TOLERANCE_COST = 1
    RELATIVE_TOLERANCE_COST = 2
    ABSOLUTE_TOLERANCE_X = 3
def _pyunlocbox_to_scipy_result(pub_result: dict, **kwargs):
    sp_result = OptimizeResult()

    # Map the pyunlocbox stopping criterion onto the ExitStatus enumeration.
    if pub_result['crit'] == 'DTOL':
        new_status = ExitStatus.ABSOLUTE_TOLERANCE_COST
    elif pub_result['crit'] == 'RTOL':
        new_status = ExitStatus.RELATIVE_TOLERANCE_COST
    elif pub_result['crit'] == 'XTOL':
        new_status = ExitStatus.ABSOLUTE_TOLERANCE_X
    elif pub_result['crit'] == 'MAXIT':
        new_status = ExitStatus.MAXIMUM_NUMBER_ITERATIONS
    else:
        new_status = ExitStatus.UNKNOWN
    sp_result['status'] = new_status
    sp_result['success'] = sp_result['status'].value > 0
    sp_result['message'] = pub_result['crit']
    sp_result['fun'] = pub_result['objective'][-1]
    sp_result['nit'] = pub_result['niter']
    sp_result['x'] = pub_result['sol']

    for key, value in kwargs.items():
        sp_result[key] = value

    return sp_result
def solve(linear_operator: talon.core.LinearOperator, data: np.ndarray,
          reg_term: RegularizationTerm = None, cost_reltol: float = 1e-6,
          x_abstol: float = 1e-6, max_nit: int = 1000, x0: np.ndarray = None,
          verbose: str = 'LOW') -> OptimizeResult:
    r"""Fit the solution.

    This routine finds the `x` that solves the problem

    .. math:: \min_x 0.5 \|A x - y\|^2 + \Omega(x)

    where `x` is the vector of coefficients to be retrieved, `A` is the
    linear operator, `y` is the data vector and :math:`\Omega` is defined as
    in ``talon.regularization``.

    Args:
        linear_operator: linear operator endowed with the @ operation.
        data: ndarray of data to be fit. Its first dimension must be
            compatible with the second dimension of `linear_operator`.
        reg_term: regularization term defined by talon.regularization.
            (default: :math:`\Omega(x) = 0.0`)
        cost_reltol: float relative tolerance on the cost (default = 1e-6).
        x_abstol: float mean absolute tolerance on the variable
            (default = 1e-6).
        max_nit: int maximum number of iterations (default = 1000).
        x0: ndarray starting value for the optimization. Its length must be
            equal to the second dimension of `linear_operator`.
            (default = zeros)
        verbose: {'NONE', 'LOW', 'HIGH', 'ALL'}
            The log level: ``'NONE'`` for no log, ``'LOW'`` for a summary at
            convergence, ``'HIGH'`` for info at all solving steps, ``'ALL'``
            for all possible outputs, including at each step of the proximal
            operator computation (default = 'LOW').

    Returns:
        scipy.optimize.OptimizeResult: dictionary with the following fields

        * x : estimated minimizer of the cost function.
        * status : attribute of the talon.optimization.ExitStatus
          enumeration.
        * message : string that explains the reason for termination.
        * fun : evaluation of each term at the minimizer.
        * nit : number of performed iterations.
        * regularization_term : the regularization term used for the fit.
    """
    # The data-fit term 0.5 * ||A x - y||^2, with A applied lazily through
    # the linear operator.
    fit_term = functions.norm_l2(y=data, lambda_=0.5)
    fit_term.A = lambda x: linear_operator @ x
    fit_term.At = lambda x: linear_operator.T @ x

    if reg_term is None:
        reg_term = regularization()

    my_acceleration = acceleration.fista_backtracking()
    the_solver = solvers.forward_backward(accel=my_acceleration, step=0.5)

    if x0 is None:
        x0 = np.zeros(linear_operator.shape[1], dtype=talon.core.DATATYPE)

    solution = solvers.solve([fit_term, reg_term], x0, the_solver,
                             rtol=cost_reltol, xtol=x_abstol, maxit=max_nit,
                             verbosity=verbose)

    return _pyunlocbox_to_scipy_result(solution,
                                       regularization_term=reg_term)
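# End-to-end sketch: a dense numpy array stands in for the linear operator,
# since `solve` only relies on `@`, `.T` and `.shape`; real use would pass a
# `talon.core.LinearOperator` built with the talon API.
#
# >>> rng = np.random.default_rng(0)
# >>> A = rng.standard_normal((20, 5))
# >>> y = A @ np.abs(rng.standard_normal(5))
# >>> res = solve(A, y, reg_term=NonNegativity(), verbose='NONE')
# >>> res.x.shape
# (5,)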