Source code for talon.core

# -*- coding: utf-8 -*-
from abc import ABC, abstractmethod

import numpy as np
import scipy.sparse as sp

DATATYPE = np.float64


[docs]def concatenate(operators, axis=0): """Concatenate a sequence of linear operator along axis 0 or 1. This method defines the object that acts as the concatenation of the linear operators contained in the list/tuple `operators` along the chosen `axis`. The syntax is consistent with the one of `np.concatenate`. Args: operators: list or tuple of LinearOperator objects to be concatenated in the same axis. axis: int direction in which we want to concatenate the LinearOperator or ConcatenatedLinearOperator objects that we want to concatenate. Vertical concatenation is obtained for `axis = 0` and horizontal concatenation is obtained for `axis = 1` as in np.concatenate. (Default: 0) Returns: talon.core.ConcatenatedLinearOperator: the concatenated linear operator. """ return ConcatenatedLinearOperator(operators, axis)
def normalize_atoms(generators, indices_of_generators, weights): """ Prepare the input of talon.operator to get a dictionary with normalized atoms. Given a triplet ``(generators, indices_of_generators, weights)``, this function returns a new triplet where the ``weights`` matrix is scaled in such a way that the resulting linear operator has columns with norm equal to 1. Args: generators : np.array where each row is a generator. indices_of_generators : COO sparse matrix that tells which generator is called where in the linear operator. weights : COO sparse matrix that encodes the weight applied to each generator indexed by indices_of_generators. It has the same dimension as indices_of_generators. Returns: generators : the same as the input. indices_of_generators : the same as the input. weights : COO sparse matrix where each column is scaled to get a normalized set of atoms. Raises: ValueError : if there are empty columns in `indices_of_generators`. """ if not (len(np.unique(indices_of_generators.col)) == indices_of_generators.shape[1]): raise ValueError( 'There are empty columns in the `indices_of_generators` matrix.') norms = np.zeros(indices_of_generators.shape[1]) # squared norm of each generator gg = np.square(np.linalg.norm(generators, axis=1)) # squared weights ww = np.square(weights.data) for r, c, i, w in zip( indices_of_generators.row, indices_of_generators.col, indices_of_generators.data, ww): norms[c] += w * gg[i] norms = np.sqrt(norms) new_data = np.zeros(weights.data.size, dtype=DATATYPE) for i, (c, w) in enumerate(zip(weights.col, weights.data)): new_data[i] = w / norms[c] if norms[c] > 0 else 1.0 normalized_weights = sp.coo_matrix( (new_data, (weights.row, weights.col)), shape=weights.shape ) return generators, indices_of_generators, normalized_weights
[docs]def operator(generators, indices_of_generators, weights, operator_type='fast'): """Create a LinearOperator object. This method defines the object that describes the linear operator by means of its fundamental components. These components are a set of generators, a table that encodes the non-zero entries of the operator and indexes the proper generator in each entry and another table that encodes the weight applied to each called generator in the linear operator. Each block of entries of the linear operator A is given by .. math:: A[k\cdot i\dots k\cdot(i+1), j] = g_{T_{i,j}} \cdot w_{i,j} where `k` is the length of the generators, `T` is the table of indices and `w` is the table of weights. Args: generators : np.array where each row is a generator. indices_of_generators : COO sparse matrix that tells which generator is called where in the linear operator. weights : COO sparse matrix that encodes the weight applied to each generator indexed by indices_of_generators. It has the same dimension as indices_of_generators. operator_type (optional): string Operator type to use. Accepted values are ``'fast'`` and ``'reference'``. The latter is intended to be used only for testing purposes. (default = `fast`). Returns: talon.core.LinearOperator: the wanted linear operator. Raises: ValueError: If `reference_type` is not ``'fast'`` or ``'reference'``. """ args = (generators, indices_of_generators, weights) if operator_type == 'fast': return FastLinearOperator(*args) elif operator_type == 'reference': return LinearOperator(*args) raise ValueError('Invalid reference type {}. Should be "fast" or ' '"reference"'.format(operator_type))
class AbstractLinearOperator(ABC): """Abstract class for all linear operators This abstract class defines the interface that all linear operators in talon must implement. """ @property @abstractmethod def shape(self): """Returns the shape of the matrix.""" pass @property @abstractmethod def todense(self): """Returns a dense matrix representation of the linear operator.""" pass @property @abstractmethod def transpose(self): """Returns the transpose of the linear operator.""" pass @property def T(self): """Returns the transpose of the linear operator.""" return self.transpose @abstractmethod def __matmul__(self, x): """Dot product between a linear operator and a vector. The __matmul__ method is expected to compute the dot product between a linear operator and a vector. It is not required to support matrix matrix product. """ pass def convert_x(self, x): """Converts x so that it can be used on the right of a dot product. This method converts x so that it has the right dimensions and type to be used as a right operand of a dot product with a linear operator. That is, it asserts that A @ x will work. Raises exceptions if the input cannot be converted to the correct format. Args: x: The vector to test. Returns: x: A numpy array that can be used in the dot product. Raises: TypeError : If x is not a numpy array. ValueError : If the length of x does not match the number of columns of the linear operator. """ x = np.squeeze(np.asarray(x, dtype=DATATYPE)) # It needs to be a vector. if np.ndim(x) != 1: raise ValueError( f'x must be a 1D vector, but its shape is {x.shape}') if not len(x) == self.shape[1]: raise ValueError( f'Dimension mismatch ({len(x)} != {self.shape[1]})') return x
[docs]class LinearOperator(AbstractLinearOperator):
[docs] def __init__(self, generators, indices_of_generators, weights): """Linear operator that maps tractography to signal space. The linear operator can be used to compute products with a vector. Args: generators : np.array where each row is a generator. indices_of_generators : COO sparse matrix that tells which generator is called where in the linear operator. weights : COO sparse matrix that encodes the weight applied to each generator indexed by indices_of_generators. It has the same dimension as indices_of_generators. Raises: TypeError: If `generators` is not a numpy ndarray of float. TypeError: If `indices_of_generators` is not a COO scipy matrix. TypeError: If `weights` is not a COO scipy matrix of float64. ValueError: If `weights` does not have the same dimension as indices_of_generators. ValueError: If `weights` and `indices_of_generators` don't have the same sparsity pattern. """ if not isinstance(generators, np.ndarray): raise TypeError('Expected type for "generators" is np.ndarray.') if not generators.dtype == DATATYPE: raise TypeError( 'Expected dtype for "generators" is {}.'.format(str(DATATYPE))) self._generators = generators if not sp.isspmatrix_coo(indices_of_generators): raise (TypeError( 'Expected type for "indices_of_generators" is ' 'scipy.sparse.coo_matrix.')) self._indices_of_generators = indices_of_generators.astype(int) if not sp.isspmatrix_coo(weights): raise (TypeError('Expected type for "weights" is np.ndarray.')) if not weights.dtype == DATATYPE: raise TypeError( 'Expected dtype for "weights" is {}.'.format(str(DATATYPE))) if not weights.shape == indices_of_generators.shape: raise ValueError( '"indices_of_generators" and "weights" must have the same' ' dimension') if not ( len(weights.data) == len(indices_of_generators.data) and np.array_equal( sorted(zip(weights.row, weights.col)), sorted(zip(indices_of_generators.row, indices_of_generators.col)))): raise ValueError( '"indices_of_generators" and "weights" must have the same' ' sparsity pattern') self._weights = weights
@property def columns(self): """int: Returns the indices of the nonzero columns.""" return self._indices_of_generators.col @property def nb_generators(self): """int: Number of generators.""" return self._generators.shape[0] @property def generator_length(self): """int: length of each generator (constant across generators).""" return self._generators.shape[1] @property def generators(self): """np.ndarray: Returns the generators of the linear operator.""" return self._generators @property def indices(self): """np.ndarray: Returns the generator indices.""" return self._indices_of_generators.data @property def nb_data(self): """int: Number of data points.""" return self._indices_of_generators.shape[0] @property def nb_atoms(self): """int: Number of atoms (columns) in the linear operator.""" return self._indices_of_generators.shape[1] @property def rows(self): """int: Returns the indices of the nonzero rows.""" return self._indices_of_generators.row @property def shape(self): """:tuple of int: Shape of the linear operator. The shape is given by the number of rows and columns of the linear operator. The number of rows is equal to the number of data points times the length of the generators. The number of columns is equal to the number of atoms. """ return self.nb_data * self.generator_length, self.nb_atoms @property def transpose(self): """TransposedLinearOperator: the transpose of the linear operator.""" return TransposedLinearOperator(self) @property def weights(self): """np.ndarray: The weights of the nonzero elements""" return self._weights.data def __matmul__(self, x): """Matrix vector product (A @ x) Args: x: The right operand of the product. It's length must match the number of columns of the linear operator. Raises: TypeError : If x is not a numpy array. ValueError : If the length of x does not match the number of columns of the linear operator. """ x = self.convert_x(x) product = np.zeros(self.shape[0], dtype=DATATYPE) for row, column, weighted_generator in self: tmp = weighted_generator * x[column] product[self.generator_length * row: self.generator_length * (row + 1)] += tmp return product
[docs] def todense(self): """Return the dense matrix associated to the linear operator. Note: The output of this method can be very memory heavy to store. Use at your own risk. Returns: ndarray: full matrix representing the linear operator. """ dense = np.zeros(self.shape, dtype=DATATYPE) length = self.generator_length for row, column, generator in self: dense[length * row: length * (row + 1), column] = generator return dense
def __iter__(self): indices = self._indices_of_generators rows, cols, data = indices.row, indices.col, indices.data weights = self._weights.data for r, c, idx, w in zip(rows, cols, data, weights): yield r, c, self._generators[idx, :] * w
class FastLinearOperator(LinearOperator): def __init__(self, generators, indices_of_generators, weights): """A LinearOperator that computes products quickly. The FastLinearOperator class implements a linear operator optimized to compute matrix-vector products quickly. It is single threaded and written in pure Python, which makes it a good default choice for linear operators. Args: generators : np.array where each row is a generator. indices_of_generators : COO sparse matrix that tells which generator is called where in the linear operator. weights : COO sparse matrix that encodes the weight applied to each generator indexed by indices_of_generators. It has the same dimension as indices_of_generators. Raises: TypeError: If generators is not a numpy ndarray of float64. TypeError: If indices_of_generators is not a COO scipy matrix. TypeError: If weights is not a COO scipy matrix of float64. ValueError: if weights does not have the same dimension as indices_of_generators. ValueError: if weights and indices_of_generators don't have the same sparsity pattern. """ super().__init__(generators, indices_of_generators, weights) # Find the indices of the row which are not empty. This allows the # linear performance to be independent of the number of empty rows. row_indices = np.unique(self.rows) # The product is computed row by row. Here, we precompute which # generators are multiplied by which weight and x, and where the # result is placed. row_elements = [[] for _ in range(self.nb_data)] for i, r in enumerate(self.rows): row_elements[r].append(i) row_elements = [np.array(re) for re in row_elements if len(re) != 0] # The indices of the generator, for each row. row_generators = [self.indices[r] for r in row_elements] # The indices of nonzero columns for each row. row_columns = [self.columns[r] for r in row_elements] # The weights of the nonzero elements for each row. row_weights = [self.weights[r] for r in row_elements] length = self.generator_length def row_slice(row): return slice(length * row, length * (row + 1)) row_slices = [row_slice(r) for r in row_indices] self._row = list(zip(row_columns, row_generators, row_weights, row_slices)) @property def transpose(self): """TransposedFastLinearOperator: transpose of the linear operator.""" return TransposedFastLinearOperator(self) def __matmul__(self, x): """Matrix vector product (A @ x) Args: x: The right operand of the product. It's length must match the number of columns of the linear operator. Raises: TypeError : If x is not a numpy array. ValueError : If the length of x does not match the number of columns of the linear operator. """ x = self.convert_x(x) product = np.zeros(self.shape[0], dtype=DATATYPE) for elements, generator_indices, weights, row_slice in self._row: row_x = x[elements] * weights row_generators = self.generators[generator_indices, :] product[row_slice] = np.dot(row_generators.T, row_x) return product class TransposedLinearOperator(AbstractLinearOperator): def __init__(self, linear_operator): """Transposed of a LinearOperator object. Args: linear_operator : the LinearOperator object of which the transpose is wanted. """ self._linear_operator = linear_operator @property def shape(self): return self._linear_operator.shape[::-1] def __matmul__(self, y): """Matrix vector product (A.T @ y) Args: y: The right operand of the product. It's length must match the number of columns of the transposed linear operator. Raises: TypeError if y is not a numpy array. ValueError if the length of y does not match the number of columns of the transposed linear operator. """ y = self.convert_x(y) genlen = self._linear_operator.generator_length product = np.zeros(self.shape[0], dtype=DATATYPE) for row, col, weighted_generator in self._linear_operator: indices_range = range(genlen * row, genlen * (row + 1)) product[col] += weighted_generator.dot(y[indices_range]) return product def todense(self): """Return the dense matrix associated to the linear operator. Note: The output of this method can be very memory heavy to store. Use at your own risk. Returns: ndarray: full matrix representing the linear operator. """ return self._linear_operator.todense().T @property def transpose(self): """LinearOperator: transpose of the transposed linear operator.""" return self._linear_operator class TransposedFastLinearOperator(TransposedLinearOperator): def __init__(self, linear_operator): """Transposed of a LinearOperator object. Args: linear_operator : the LinearOperator object of which the transpose is wanted. """ super().__init__(linear_operator) def __matmul__(self, y): """Matrix vector product (A.T @ y) Args: y: The right operand of the product. It's length must match the number of columns of the transposed linear operator. Raises: TypeError if y is not a numpy array. ValueError if the length of y does not match the number of columns of the transposed linear operator. """ y = self.convert_x(y) product = np.zeros(self.shape[0], dtype=DATATYPE) for (elements, generator_indices, weights, row_slice) in self._linear_operator._row: row_y = y[row_slice] row_generators = self._linear_operator.generators[ generator_indices, :] product[elements] += row_generators.dot(row_y) * weights return product
[docs]class ConcatenatedLinearOperator(AbstractLinearOperator):
[docs] def __init__(self, operators, axis): """Concatenated LinearOperator object The ConcatenatedLinearOperator class implements the vertical or horizontal concatenation of LinearOperator objects. It is endowed with the multiplication operation (@). Args: operators: list or tuple of LinearOperator objects to be concatenated in the same axis. axis: int direction in which we want to concatenate the LinearOperator or ConcatenatedLinearOperator objects that we want to concatenate. Vertical concatenation is obtained for `axis = 0` and horizontal concatenation is obtained for `axis = 1` as in np.concatenate. (Default: 0) Raises: TypeError: If any element of `operator` is not an instance of LinearOperator or ConcatenatedLinearOperator. TypeError: If `operators` is not a list or a tuple. ValueError: If `axis` is not 0 or 1. ValueError: If `operators` is an empty list or tuple. ValueError: If the operators do not have compatible dimensions. """ if not type(operators) in [list, tuple]: raise TypeError('Expected type for `operators` is list or tuple.') if axis not in [0, 1]: raise ValueError('Expected value for `axis` is 0 or 1.') if len(operators) < 1: raise ValueError('List of operators must be non-empty.') def good_instance(op): return (isinstance(op, LinearOperator) or isinstance(op, TransposedLinearOperator) or isinstance(op, ConcatenatedLinearOperator)) if not all(map(good_instance, operators)): raise TypeError('All concatenated operators must be either ' 'LinearOperator objects or ' 'ConcatenatedLinearOperator objects.') if len(np.unique([op.shape[int(not axis)] for op in operators])) != 1: raise ValueError('Trying to concatenate linear operators with ' 'non compatible dimensions.') self._axis = axis self._operators = operators self._slices = [] self._transposed_operators = [op.T for op in self._operators] start_index = 0 for linear_operator in self._operators: stop_index = start_index + linear_operator.shape[self._axis] self._slices.append(slice(start_index, stop_index)) start_index = stop_index
def __matmul__(self, x): """Matrix vector product (A @ x) Args: x: The right operand of the product. It's length must match the number of columns of the concatenated linear operator. Raises: TypeError if x is not a numpy array. ValueError if the length of x does not match the number of columns of the concatenated linear operator. """ x = self.convert_x(x) product = np.zeros(self.shape[0], dtype=DATATYPE) if self._axis == 0: for indices, linear_operator in zip(self._slices, self.operators): product[indices] = linear_operator @ x else: for indices, linear_operator in zip(self._slices, self.operators): product += linear_operator @ x[indices] return product @property def axis(self): """int: axis in which the concatenation was performed.""" return self._axis @property def operators(self): """list: list of concatenated operators.""" return self._operators @property def shape(self): """:tuple of int: Shape of the concatenated linear operator. """ n_rows = np.sum([block.shape[self.axis] for block in self._operators]) n_columns = self._operators[0].shape[int(not self.axis)] the_shape = n_rows, n_columns if self.axis: the_shape = the_shape[::-1] return the_shape @property def transpose(self): """TransposedConcatenatedLinearOperator: transpose of the linear operator.""" return TransposedConcatenatedLinearOperator( self, self._transposed_operators)
[docs] def todense(self): """Return the dense matrix associated to the linear operator. Note: The output of this method can be very memory heavy to store. Use at your own risk. Returns: ndarray: full matrix representing the linear operator. """ all_dense = [] for op in self.operators: all_dense.append(op.todense()) return np.concatenate(all_dense, self.axis)
class TransposedConcatenatedLinearOperator(ConcatenatedLinearOperator): def __init__(self, concatenated_operator, transposed_operators): """Transposed of a ConcatenatedLinearOperator object. Args: concatenated_operator: the ConcatenatedLinearOperator object of which the transpose is wanted. """ self._concatenated_linear_operator = concatenated_operator axis = int(not self._concatenated_linear_operator.axis) super().__init__(transposed_operators, axis) @property def transpose(self): """LinearOperator: transpose of the transposed linear operator.""" return self._concatenated_linear_operator