Source code for talon.opencl

from collections import namedtuple
from typing import Iterable
from typing import Optional
from typing import Tuple
from typing import Type

import numpy as np
import pyopencl as cl

import talon.core
from talon.core import AbstractLinearOperator
from talon.core import DATATYPE


# The size of the device data type.
TYPE_SIZE = 4

# Create the global OpenCL context.
_context = cl.create_some_context(interactive=False)
_queue = cl.CommandQueue(_context)

# Get the number of compute units for the selected device.
_device = _context.devices[0]
_nb_units = _device.max_compute_units

# The OpenCL program used by the product.
_programs = cl.Program(_context, """
__kernel void fast_dot(__global const float *x,
                       __global const uint *x_indices,
                       __global const float *weights,
                       __global const float *generators, 
                       __global const uint *generator_indices,
                       __global const uint *row_ends,
                       __global float *b,
                       uint generator_length, 
                       uint b_offset)
{
    uint gid = get_global_id(0);
    uint group_id = (gid + b_offset) / generator_length;
    uint lid = (gid + b_offset) % generator_length;
    
    uint x_end = row_ends[group_id];
    uint x_start = 0;
    if (group_id != 0) {
        x_start = row_ends[group_id - 1];
    }
    
    float current_result = 0;
    uint generator_id;
    
    for (uint j = x_start; j < x_end; j++) {
        generator_id = generator_indices[j];
        current_result += 
            generators[lid + generator_id * generator_length] *
            x[x_indices[j]] * weights[j];
    }
    b[gid] = current_result;
}

__kernel void transpose_dot(__global const float4 *y,
                            __global const uint *y_indices,
                            __global const float4 *generators, 
                            __global const uint *generator_indices,
                            __global const float *weights,
                            __global const uint *row_starts,
                            __global float *x,
                            uint generator_length,
                            uint y_offset)
{
    uint gid = get_global_id(0);
    uint group_id = gid / 8;
    uint lid = get_local_id(0);
    
    uint start = 0;
    uint end = row_starts[group_id];
    
    // If we are not on the first row, use the previous end as a start.
    if (group_id != 0) {
        start = row_starts[group_id - 1];
    }
    
    // The number of products per local thread.
    uint nb_products = generator_length / 4 / 8 + 1;
    
    float current_result = 0;
    local float current_gen_result[8];
    uint generator_start;
    uint y_start;
    uint k;
    for (uint i = start; i < end; i++) {
    
        current_gen_result[lid] = 0;
        generator_start = generator_indices[i] * generator_length / 4;
        y_start = (y_indices[i] - y_offset) / 4;
        for (uint j = 0; j < nb_products; j++) {
            k = lid * nb_products + j;
            if (k < generator_length / 4) {
                current_gen_result[lid] += 
                    dot(generators[generator_start + k], y[y_start + k]);
            }
        }
        
        barrier(CLK_LOCAL_MEM_FENCE);
        
        if (lid == 0) {
            float weight = weights[i];
            for (uint m = 0; m < 8; m++) {
                current_result += current_gen_result[m] * weight;
            }
        }
    }
    
    if (lid == 0) { 
        x[group_id] = current_result;
    }
}
""").build()


def _assert_enough_memory(generators, indices, chunk_size) -> None:
    """Asserts that the device has enough memory to hold a linear operator

    Asserts that the device has enough memory to hold a linear operator and
    its transpose (some memory is shared by the two). If there is not enough
    memory an exception is raised.

    Args:
        generators: The generators of the linear operator.
        indices: The generator indices of the linear operator.
        chunk_size : The product is computed by splitting the linear
            operator into chunks. This parameter determines the approximate
            chunk size. Reducing this value reduces the amount of memory
            required on the device.

    Raises:
        MemoryError: If the memory on the device is not sufficient.

    """

    # The input and output (x and y) are shared between the linear operator
    # and its transpose.
    nb_rows, nb_columns = indices.shape
    nb_generators, gen_length = generators.shape
    x_memory = nb_rows * TYPE_SIZE
    y_chunks = _chunk_sizes(chunk_size, gen_length, nb_rows) * TYPE_SIZE
    y_memory = np.sum(y_chunks)
    y_chunk_memory = np.max(y_chunks)

    # The memory for the generators is also shared.
    gen_memory = generators.size * TYPE_SIZE

    # The indices, weights, and the [x y] indices have the same size. They are
    # kept once for the linear operator and once for the transpose.
    indices_memory = len(indices.data) * TYPE_SIZE

    # We need to keep where each row starts (cols for the transpose).
    rows_start_memory = indices.shape[0] * TYPE_SIZE
    nb_chunks = len(y_chunks)
    cols_start_memory = indices.shape[1] * nb_chunks * TYPE_SIZE

    # The total amount of memory needed.
    required_memory = (
        x_memory + y_memory + gen_memory + indices_memory * 6 +
        rows_start_memory + cols_start_memory)

    # Verify if the buffer can be created individually.
    single_object_max = _device.max_mem_alloc_size
    if x_memory > single_object_max:
        raise MemoryError(
            f'The memory required for the coefficient vector is greater '
            f'than the memory than can be allocated to a single object on '
            f'your device ({x_memory} > {single_object_max}).')

    # The memory for the y is always split. We make sure we can store each
    # chunk.
    if y_chunk_memory > single_object_max:
        raise MemoryError(
            f'The memory required for the result of the product is greater '
            f'than the memory than can be allocated to a single object on '
            f'your device ({y_memory} > {single_object_max}). This may be '
            f'fixed by reducing the chunk size.')

    if indices_memory > single_object_max:
        raise MemoryError(
            f'The memory required for the index array is greater that the '
            f'memory than can be allocated to a single object on your device '
            f'({indices_memory} > {single_object_max}).')

    # There is no need to check the rows and cols start because they are always
    # smaller or equal to indices_memory.

    # Verify the global memory.
    if required_memory > _device.global_mem_size:
        raise MemoryError(
            f'Your device does not have enough memory to store the matrix and'
            f'its transpose ({required_memory} > {_device.global_mem_size}).')


def _chunk_sizes(
        desired_chunk_size: int,
        gen_length: int,
        nb_rows: int) -> np.ndarray:
    """Generate chunk sizes to split linear operator

    To reduce memory consumption, the linear operators may be split into
    chunks. However, these chunks should not cut generators and should cover
    the entire operator. This function generates valid chunk sizes which
    respect these requirements an are as close as possible to the desired
    chunk size.

    Args:
        desired_chunk_size: The target chunk size in number of rows. The
            returned chunk sizes will be close to this value.
        gen_length: The length of the generators.
        nb_rows: The number of rows of the linear operator.

    """

    # Make sure the chunks to do not cut a generator.
    chunk_size = (desired_chunk_size // gen_length + 1) * gen_length

    # Determine the number of full chunks.
    nb_chunks = int(nb_rows // chunk_size)

    # Add the remainder if any.
    remainder = int(nb_rows % chunk_size)
    chunk_sizes = [chunk_size] * nb_chunks
    if remainder != 0:
        chunk_sizes += [remainder]

    return np.array(chunk_sizes, dtype=np.int)


def _read_buffer(
        data: np.ndarray,
        dtype: Type = np.uint32) -> Optional[cl.Buffer]:
    """Creates a read only buffer from a data array.

    Args:
        data: The data to store in the buffer.
        dtype: The data type of the buffer.

    Return:
        buffer: The created buffer. If the size of the data is zero, no buffer
            is created and None is returned.
    """

    flags = cl.mem_flags.READ_ONLY
    size = data.size * TYPE_SIZE

    # Buffers cannot be created with a size of 0.
    if size == 0:
        return None

    buffer = cl.Buffer(_context, flags, size=size)
    cl.enqueue_copy(_queue, buffer, data.astype(dtype))

    return buffer


[docs]class LinearOperator(talon.core.LinearOperator):
[docs] def __init__(self, generators, indices_of_generators, weights, chunk_size=100000000): """Linear operator implemented with OpenCL A linear operator that has a sparse vector structure. The product between this operator and a vector is implemented using OpenCL. Args: generators : np.array where each row is a generator. indices_of_generators : COO sparse matrix that tells which generator is called where in the linear operator. weights : COO sparse matrix that encodes the weight applied to each generator indexed by indices_of_generators. It has the same dimension as indices_of_generators. chunk_size : The product is computed by splitting the linear operator into chunks. This parameter determines the approximate chunk size. Reducing this value reduces the amount of memory required on the device. Raises: TypeError: If `generators` is not a numpy array of float. TypeError: If `indices_of_generators` is not a COO scipy matrix. TypeError: If `weights` is not a COO scipy matrix of float64. ValueError: If `weights` does not have the same dimension as indices_of_generators. ValueError: If `weights` and `indices_of_generators` don't have the same sparsity pattern. """ super().__init__(generators, indices_of_generators, weights) # This implementation requires that the generator length be a multiple # of 4. This allows the use of float4 in the kernel. if generators.shape[1] % 4 != 0: raise ValueError( f'The length of the generators ({generators.shape[1]}) must ' f'be a multiple of 4 to use this linear operator. Consider ' f'padding the generators and the data with zeros.') _assert_enough_memory(generators, indices_of_generators, chunk_size) nb_generators, generator_length = generators.shape nb_data, nb_columns = indices_of_generators.shape self._generator_length = generator_length self._indices = indices_of_generators # Create a buffer of the x and y vectors and for the generators. They # are shared between the linear operator and its transpose and are # thus read/write. flags = cl.mem_flags.READ_WRITE self._x_buffer = cl.Buffer( _context, flags, size=self.shape[1] * TYPE_SIZE) self._chunk_sizes = _chunk_sizes( chunk_size, generator_length, self.shape[0]) self._y_buffers = [cl.Buffer( _context, flags, size=c * TYPE_SIZE) for c in self._chunk_sizes] expanded_generators = generators.ravel().astype(np.float32) self._generators_buffer = _read_buffer(expanded_generators, np.float32) # Flatten all the 2D arrays. sorting_indices = np.argsort(indices_of_generators.row) x_indices = indices_of_generators.col[sorting_indices] generator_indices = indices_of_generators.data[sorting_indices] expanded_weights = weights.data[sorting_indices] # Find where the rows start. sorted_rows = indices_of_generators.row[sorting_indices] bin_count = np.bincount(sorted_rows, minlength=nb_data) row_starts = np.cumsum(bin_count).astype(np.uint32) # Create buffers for each data array used for the product. self._x_indices_buffer = _read_buffer(x_indices) self._generator_indices_buffer = _read_buffer(generator_indices) self._weights_buffer = _read_buffer(expanded_weights, np.float32) self._row_buffer = _read_buffer(row_starts) self._transpose = _TransposedLinearOperator( self, generators, indices_of_generators, weights, self._x_buffer, self._y_buffers, self._generators_buffer, self._chunk_sizes)
@property def dtype(self): """Returns the data type of the linear operator""" return np.float32 @property def transpose(self) -> '_TransposedLinearOperator': """TransposedFastLinearOperator: transpose of the linear operator.""" return self._transpose def __matmul__(self, x: np.ndarray) -> np.ndarray: """Matrix vector product (A @ x) Args: x: The right operand of the product. It's length must match the number of columns of the linear operator. Raises: TypeError : If x is not a numpy array. ValueError : If the length of x does not match the number of columns of the linear operator. """ x = self.convert_x(x) # Transfer the data to the device. cl.enqueue_copy(_queue, self._x_buffer, x.astype(np.float32)) # Prepare the output product = np.zeros((self.shape[0],), dtype=np.float32) # Compute the probabilities on the device. start = 0 for chunk_size, y_buffer in zip(self._chunk_sizes, self._y_buffers): stop = start + chunk_size _programs.fast_dot( _queue, (chunk_size,), None, self._x_buffer, self._x_indices_buffer, self._weights_buffer, self._generators_buffer, self._generator_indices_buffer, self._row_buffer, y_buffer, np.uint32(self._generator_length), np.uint32(start)) cl.enqueue_copy(_queue, product[start:stop], y_buffer) start = stop return product
[docs] def todense(self) -> np.ndarray: """Return the dense matrix associated with the linear operator. Note: The output of this method can be very memory heavy to store. Use at your own risk. Returns: Full matrix representing the linear operator. """ dense = np.zeros(self.shape, dtype=DATATYPE) length = self._generator_length zipped = zip( self._indices.row, self._indices.col, self._indices.data, self.weights.data) for row, column, index, weight in zipped: generator = self.generators[index, :] * weight dense[length * row: length * (row + 1), column] = generator return dense
class _TransposedLinearOperator(AbstractLinearOperator): def __init__(self, linear_operator: LinearOperator, generators, indices_of_generators, weights, x_buffer: cl.Buffer, y_buffers: Iterable[cl.Buffer], generators_buffer: cl.Buffer, chunk_sizes: np.ndarray): self._linear_operator = linear_operator nb_rows, nb_columns = indices_of_generators.shape nb_generators, gen_length = generators.shape self._chunk_sizes = chunk_sizes self._shape = nb_columns, nb_rows * gen_length self._generator_length = gen_length # Reuse some buffers from the linear operator. self._generators_buffer = generators_buffer self._y_buffers = y_buffers self._x_buffer = x_buffer sorting_indices = np.argsort(indices_of_generators.col) generator_indices = indices_of_generators.data[sorting_indices] sorted_columns = indices_of_generators.col[sorting_indices] expanded_weights = weights.data[sorting_indices] y_indices = indices_of_generators.row[sorting_indices] * gen_length # Split the linear operator along the columns. fields = ('size', 'y', 'y_indices', 'generator_indices', 'weights', 'rows', 'start', 'stop') ChunkData = namedtuple('ChunkData', fields) self._chunk_data = [] start = 0 for chunk_size, y_buffer in zip(chunk_sizes, y_buffers): # Filter the y indices to keep only those of the current chunk. If # there are none, move on to the next chunk. stop = start + chunk_size keep = np.logical_and(y_indices >= start, y_indices < stop) if not np.any(keep): start = stop continue chunk_generator_indices = generator_indices[keep] chunk_y_indices = y_indices[keep] chunk_weights = expanded_weights[keep] kept_columns = sorted_columns[keep] bin_count = np.bincount(kept_columns, minlength=self.shape[0]) chunk_row_starts = np.cumsum(bin_count).astype(np.uint32) generator_indices_buffer = _read_buffer(chunk_generator_indices) y_indices_buffer = _read_buffer(chunk_y_indices) weights_buffer = _read_buffer(chunk_weights, np.float32) row_buffer = _read_buffer(chunk_row_starts) data = ChunkData( chunk_size, y_buffer, y_indices_buffer, generator_indices_buffer, weights_buffer, row_buffer, start, stop) self._chunk_data.append(data) start = stop self._data_mask = np.ones(self.shape[0], dtype=bool) @property def data_mask(self): """Returns the mask to apply to the data to keep only the entries covered by the linear operator.""" return self._data_mask @property def generator_length(self) -> int: """Returns the length of the generators.""" return self._generator_length @property def shape(self) -> Tuple[int, int]: return self._shape @property def transpose(self) -> LinearOperator: """Returns the transpose of the linear operator.""" return self._linear_operator def __matmul__(self, y: np.ndarray) -> np.ndarray: """""" # Reserve space of the final solution and for each y chunk. x = np.zeros((self.shape[0],), dtype=np.float32) x_chunk = np.empty((self.shape[0],), dtype=np.float32) # Compute the product chunk by chunk. for chunk in self._chunk_data: chunk_y = y[chunk.start:chunk.stop].astype(np.float32) cl.enqueue_copy(_queue, chunk.y, chunk_y) _programs.transpose_dot( _queue, (self._shape[0] * 8,), (8,), chunk.y, chunk.y_indices, self._generators_buffer, chunk.generator_indices, chunk.weights, chunk.rows, self._x_buffer, np.uint32(self.generator_length), np.uint32(chunk.start)) cl.enqueue_copy(_queue, x_chunk, self._x_buffer) x += x_chunk return x def todense(self) -> np.ndarray: """Return the dense matrix associated to the linear operator. Note: The output of this method can be very memory heavy to store. Use at your own risk. Returns: Full matrix representing the linear operator. """ return self._linear_operator.todense().T