Source code for talon.core

# -*- coding: utf-8 -*-
from abc import ABC, abstractmethod

import numpy as np
import scipy.sparse as sp
import scipy.sparse.linalg

DATATYPE = np.float64


[docs]def concatenate(operators, axis=0): """Concatenate a sequence of linear operator along axis 0 or 1. This method defines the object that acts as the concatenation of the linear operators contained in the list/tuple `operators` along the chosen `axis`. The syntax is consistent with the one of `np.concatenate`. Args: operators: list or tuple of LinearOperator objects to be concatenated in the same axis. axis: int direction in which we want to concatenate the LinearOperator or ConcatenatedLinearOperator objects that we want to concatenate. Vertical concatenation is obtained for `axis = 0` and horizontal concatenation is obtained for `axis = 1` as in np.concatenate. (Default: 0) Returns: talon.core.ConcatenatedLinearOperator: the concatenated linear operator. """ return ConcatenatedLinearOperator(operators, axis)
def normalize_atoms(generators, indices_of_generators, weights): """ Prepare the input of talon.operator to get a dictionary with normalized atoms. Given a triplet ``(generators, indices_of_generators, weights)``, this function returns a new triplet where the ``weights`` matrix is scaled in such a way that the resulting linear operator has columns with norm equal to 1. Args: generators : np.array where each row is a generator. indices_of_generators : COO sparse matrix that tells which generator is called where in the linear operator. weights : COO sparse matrix that encodes the weight applied to each generator indexed by indices_of_generators. It has the same dimension as indices_of_generators. Returns: generators : the same as the input. indices_of_generators : the same as the input. weights : COO sparse matrix where each column is scaled to get a normalized set of atoms. Raises: ValueError : if there are empty columns in `indices_of_generators`. """ if not (len(np.unique(indices_of_generators.col)) == indices_of_generators.shape[1]): raise ValueError( 'There are empty columns in the `indices_of_generators` matrix.') norms = np.zeros(indices_of_generators.shape[1]) # squared norm of each generator gg = np.square(np.linalg.norm(generators, axis=1)) # squared weights ww = np.square(weights.data) for r, c, i, w in zip( indices_of_generators.row, indices_of_generators.col, indices_of_generators.data, ww): norms[c] += w * gg[i] norms = np.sqrt(norms) new_data = np.zeros(weights.data.size, dtype=DATATYPE) for i, (c, w) in enumerate(zip(weights.col, weights.data)): new_data[i] = w / norms[c] if norms[c] > 0 else 1.0 normalized_weights = sp.coo_matrix( (new_data, (weights.row, weights.col)), shape=weights.shape ) return generators, indices_of_generators, normalized_weights
[docs]class AbstractLinearOperator(ABC, sp.linalg.LinearOperator): """Abstract class for all linear operators This abstract class defines the interface that all linear operators in talon must implement. """ @property @abstractmethod def data_mask(self): """Returns the mask to apply to the data to keep only the entries covered by the linear operator.""" pass @property @abstractmethod def shape(self): """Returns the shape of the matrix.""" pass @property @abstractmethod def todense(self): """Returns a dense matrix representation of the linear operator.""" pass @property @abstractmethod def transpose(self): """Returns the transpose of the linear operator.""" pass @property def T(self): """Returns the transpose of the linear operator.""" return self.transpose @abstractmethod def __matmul__(self, x): """Dot product between a linear operator and a vector. The __matmul__ method is expected to compute the dot product between a linear operator and a vector. It is not required to support matrix matrix product. """ pass
[docs] def convert_x(self, x): """Converts x so that it can be used on the right of a dot product. This method converts x so that it has the right dimensions and type to be used as a right operand of a dot product with a linear operator. That is, it asserts that A @ x will work. Raises exceptions if the input cannot be converted to the correct format. Args: x: The vector to test. Returns: x: A numpy array that can be used in the dot product. Raises: TypeError : If x is not a numpy array. ValueError : If the length of x does not match the number of columns of the linear operator. """ x = np.squeeze(np.asarray(x, dtype=DATATYPE)) # It needs to be a vector. if np.ndim(x) != 1: raise ValueError( f'x must be a 1D vector, but its shape is {x.shape}') if not len(x) == self.shape[1]: raise ValueError( f'Dimension mismatch ({len(x)} != {self.shape[1]})') return x
@property def dtype(self): """Returns the data type of the linear operator""" return DATATYPE def __repr__(self): return f'{self.__class__.__name__}(shape={self.shape})' def _adjoint(self): """Conjugate transpose for scipy compatibility""" # Because we only deal with real matrices, just return the transpose. return self.transpose def _matvec(self, v): """Matrix vector product for scipy compatibility""" return self @ v
[docs]class LinearOperator(AbstractLinearOperator):
[docs] def __init__(self, generators, indices_of_generators, weights): """Linear operator that maps tractography to signal space. The linear operator can be used to compute products with a vector. Args: generators : np.array where each row is a generator. indices_of_generators : COO sparse matrix that tells which generator is called where in the linear operator. weights : COO sparse matrix that encodes the weight applied to each generator indexed by indices_of_generators. It has the same dimension as indices_of_generators. Raises: TypeError: If `generators` is not a numpy ndarray of float. TypeError: If `indices_of_generators` is not a COO scipy matrix. TypeError: If `weights` is not a COO scipy matrix of float64. ValueError: If `weights` does not have the same dimension as indices_of_generators. """ if not isinstance(generators, np.ndarray): raise TypeError('Expected type for "generators" is np.ndarray.') if not generators.dtype == DATATYPE: raise TypeError( 'Expected dtype for "generators" is {}.'.format(str(DATATYPE))) self._generators = generators if not sp.isspmatrix_coo(indices_of_generators): raise (TypeError( 'Expected type for "indices_of_generators" is ' 'scipy.sparse.coo_matrix.')) self._indices_of_generators = indices_of_generators.astype(int) if not sp.isspmatrix_coo(weights): raise (TypeError('Expected type for "weights" is np.ndarray.')) if not weights.dtype == DATATYPE: raise TypeError( 'Expected dtype for "weights" is {}.'.format(str(DATATYPE))) if not weights.shape == indices_of_generators.shape: raise ValueError( '"indices_of_generators" and "weights" must have the same' ' dimension') self._weights = weights mask = np.zeros((self._indices_of_generators.shape[0],), dtype=bool) mask[np.unique(self.rows)] = True self._data_mask = np.kron(mask, np.ones(self.generator_length, dtype=bool))
@property def data_mask(self): """Returns the mask to apply to the data to keep only the entries covered by the linear operator.""" return self._data_mask @property def columns(self): """int: Returns the indices of the nonzero columns.""" return self._indices_of_generators.col @property def nb_generators(self): """int: Number of generators.""" return self._generators.shape[0] @property def generator_length(self): """int: length of each generator (constant across generators).""" return self._generators.shape[1] @property def generators(self): """np.ndarray: Returns the generators of the linear operator.""" return self._generators @property def indices(self): """np.ndarray: Returns the generator indices.""" return self._indices_of_generators.data @property def nb_data(self): """int: Number of data points.""" return self._indices_of_generators.shape[0] @property def nb_atoms(self): """int: Number of atoms (columns) in the linear operator.""" return self._indices_of_generators.shape[1] @property def rows(self): """int: Returns the indices of the nonzero rows.""" return self._indices_of_generators.row @property def shape(self): """:tuple of int: Shape of the linear operator. The shape is given by the number of rows and columns of the linear operator. The number of rows is equal to the number of data points times the length of the generators. The number of columns is equal to the number of atoms. """ return self.nb_data * self.generator_length, self.nb_atoms @property def transpose(self): """_TransposedLinearOperator: the transpose of the linear operator.""" return _TransposedLinearOperator(self) @property def weights(self): """np.ndarray: The weights of the nonzero elements""" return self._weights.data def __matmul__(self, x): """Matrix vector product (A @ x) Args: x: The right operand of the product. It's length must match the number of columns of the linear operator. Raises: TypeError : If x is not a numpy array. ValueError : If the length of x does not match the number of columns of the linear operator. """ x = self.convert_x(x) product = np.zeros(self.shape[0], dtype=DATATYPE) for row, column, weighted_generator in self: tmp = weighted_generator * x[column] product[self.generator_length * row: self.generator_length * (row + 1)] += tmp return product
[docs] def todense(self): """Return the dense matrix associated to the linear operator. Note: The output of this method can be very memory heavy to store. Use at your own risk. Returns: ndarray: full matrix representing the linear operator. """ dense = np.zeros(self.shape, dtype=DATATYPE) length = self.generator_length for row, column, generator in self: dense[length * row: length * (row + 1), column] = generator return dense
def __iter__(self): indices = self._indices_of_generators rows, cols, data = indices.row, indices.col, indices.data weights = self._weights.data for r, c, idx, w in zip(rows, cols, data, weights): yield r, c, self._generators[idx, :] * w
class _TransposedLinearOperator(AbstractLinearOperator): def __init__(self, linear_operator): """Transposed of a LinearOperator object. Args: linear_operator : the LinearOperator object of which the transpose is wanted. """ self._linear_operator = linear_operator self._data_mask = np.ones(self.shape[0], dtype=bool) @property def data_mask(self): """Returns the mask to apply to the data to keep only the entries covered by the linear operator.""" return self._data_mask @property def shape(self): return self._linear_operator.shape[::-1] def __matmul__(self, y): """Matrix vector product (A.T @ y) Args: y: The right operand of the product. It's length must match the number of columns of the transposed linear operator. Raises: TypeError if y is not a numpy array. ValueError if the length of y does not match the number of columns of the transposed linear operator. """ y = self.convert_x(y) genlen = self._linear_operator.generator_length product = np.zeros(self.shape[0], dtype=DATATYPE) for row, col, weighted_generator in self._linear_operator: indices_range = range(genlen * row, genlen * (row + 1)) product[col] += weighted_generator.dot(y[indices_range]) return product def todense(self): """Return the dense matrix associated to the linear operator. Note: The output of this method can be very memory heavy to store. Use at your own risk. Returns: ndarray: full matrix representing the linear operator. """ return self._linear_operator.todense().T @property def transpose(self): """LinearOperator: transpose of the transposed linear operator.""" return self._linear_operator
[docs]class ConcatenatedLinearOperator(AbstractLinearOperator):
[docs] def __init__(self, operators, axis): """Concatenated LinearOperator object The ConcatenatedLinearOperator class implements the vertical or horizontal concatenation of LinearOperator objects. Args: operators: list or tuple of LinearOperator objects to be concatenated in the same axis. axis: int direction in which we want to concatenate the LinearOperator or ConcatenatedLinearOperator objects that we want to concatenate. Vertical concatenation is obtained for `axis = 0` and horizontal concatenation is obtained for `axis = 1` as in np.concatenate. (Default: 0) Raises: TypeError: If any element of `operator` is not an instance of LinearOperator or ConcatenatedLinearOperator. TypeError: If `operators` is not a list or a tuple. ValueError: If `axis` is not 0 or 1. ValueError: If `operators` is an empty list or tuple. ValueError: If the operators do not have compatible dimensions. """ if not type(operators) in [list, tuple]: raise TypeError('Expected type for `operators` is list or tuple.') if axis not in [0, 1]: raise ValueError('Expected value for `axis` is 0 or 1.') if len(operators) < 1: raise ValueError('List of operators must be non-empty.') if not all([isinstance(o, AbstractLinearOperator) for o in operators]): raise TypeError("""All concatenated operators must be linear operators.""") if len(np.unique([op.shape[int(not axis)] for op in operators])) != 1: raise ValueError('Trying to concatenate linear operators with ' 'non compatible dimensions.') self._axis = axis self._operators = operators self._slices = [] self._transposed_operators = [op.T for op in self._operators] start_index = 0 for linear_operator in self._operators: stop_index = start_index + linear_operator.shape[self._axis] self._slices.append(slice(start_index, stop_index)) start_index = stop_index if self.axis == 0: masks = [op.data_mask for op in self.operators] self._data_mask = np.concatenate(masks, axis=0) else: self._data_mask = np.zeros_like(self.operators[0].data_mask, dtype=bool) for op in self.operators: self._data_mask += op.data_mask.astype(bool)
def __matmul__(self, x): """Matrix vector product (A @ x) Args: x: The right operand of the product. It's length must match the number of columns of the concatenated linear operator. Raises: TypeError if x is not a numpy array. ValueError if the length of x does not match the number of columns of the concatenated linear operator. """ x = self.convert_x(x) product = np.zeros(self.shape[0], dtype=DATATYPE) if self._axis == 0: for indices, linear_operator in zip(self._slices, self.operators): product[indices] = linear_operator @ x else: for indices, linear_operator in zip(self._slices, self.operators): product += linear_operator @ x[indices] return product @property def axis(self): """int: axis in which the concatenation was performed.""" return self._axis @property def operators(self): """list: list of concatenated operators.""" return self._operators @property def shape(self): """:tuple of int: Shape of the concatenated linear operator. """ n_rows = np.sum([block.shape[self.axis] for block in self._operators]) n_columns = self._operators[0].shape[int(not self.axis)] the_shape = n_rows, n_columns if self.axis: the_shape = the_shape[::-1] return the_shape @property def transpose(self): """TransposedConcatenatedLinearOperator: transpose of the linear operator.""" return _TransposedConcatenatedLinearOperator( self, self._transposed_operators) @property def data_mask(self): """Returns the mask to apply to the data to keep only the entries covered by the linear operator.""" return self._data_mask
[docs] def todense(self): """Return the dense matrix associated to the linear operator. Note: The output of this method can be very memory heavy to store. Use at your own risk. Returns: ndarray: full matrix representing the linear operator. """ all_dense = [] for op in self.operators: all_dense.append(op.todense()) return np.concatenate(all_dense, self.axis)
class _TransposedConcatenatedLinearOperator(ConcatenatedLinearOperator): def __init__(self, concatenated_operator, transposed_operators): """Transposed of a ConcatenatedLinearOperator object. Args: concatenated_operator: the ConcatenatedLinearOperator object of which the transpose is wanted. """ self._concatenated_linear_operator = concatenated_operator axis = int(not self._concatenated_linear_operator.axis) super().__init__(transposed_operators, axis) @property def transpose(self): """LinearOperator: transpose of the transposed linear operator.""" return self._concatenated_linear_operator