Source code for dynpy.mx

"""Module which provides a consistent interface for working with both dense
arrays and sparse matrices. Also constains some utility functions for working
with matrices.
"""
from __future__ import division, print_function, absolute_import
import six
range = six.moves.range
map   = six.moves.map

import numpy as np
from numpy.core import asarray

import numpy.linalg
import scipy.linalg
import scipy.sparse as ss
import scipy.sparse.linalg

import hashlib
import functools

from .utils import is_int


[docs] class StateBase(object): """Base class for managing objects which hold state of dynamical system """
[docs] @classmethod def format_obj(cls, obj): """Format object `obj` into the current class's preferred type (e.g., convert a dense matrix to sparse, or vice-versa, as appropriate) """ raise NotImplementedError # virtual class, sublcasses should implement
[docs] @classmethod def vstack(cls, data): """Stack states vertically""" raise NotImplementedError # virtual class, sublcasses should implement
[docs] class IntScalar(StateBase):
[docs] @classmethod def format_obj(cls, obj): return obj
[docs] @classmethod def vstack(cls, data): return np.array(data)
[docs] class MxBase(StateBase): """Base class from which sparse and dense matrix operation classes inherit """
[docs] @classmethod def format_obj(cls, obj): return cls.format_mx(obj)
[docs] @classmethod def create_editable_zeros_mx(cls, shape, dtype=None): """Create blank editable transition matrix, of size specified by `shape` """ raise NotImplementedError # virtual class, sublcasses should implement
[docs] @classmethod def finalize_mx(cls, mx): """Finalize processing of editable transition matrix `mx` """ pass
[docs] @classmethod def get_largest_right_eigs(cls, mx): """Get largest right eigenvectors and eigenvalues of matrix `mx` """ raise NotImplementedError # virtual class, sublcasses should implement
[docs] @classmethod def get_largest_left_eigs(cls, mx): """Get largest left eigenvectors and eigenvalues of matrix `mx` Returns ------- 1-dimensional numpy array Eigenvalues 2-dimensional numpy array Eigenvectors """ vals, vecs = cls.get_largest_right_eigs(mx.T) return vals, vecs.T
[docs] @classmethod def pow(cls, mx, exponent): """Raise matrix `mx` to a power `exponent` """ raise NotImplementedError # virtual class, sublcasses should implement
[docs] @classmethod def expm(cls, mx): """Return matrix exponential of matrix `mx` """ raise NotImplementedError # virtual class, sublcasses should implement
[docs] @classmethod def make2d(cls, mx): """Transform matrix `mx` to be 2-dimensional""" raise NotImplementedError # virtual class, sublcasses should implement
[docs] @classmethod def todense(cls, mx): """Convert matrix `mx` to dense format""" raise NotImplementedError # virtual class, sublcasses should implement
[docs] @classmethod def tosparse(cls, mx): """Convert matrix `mx` to sparse format""" raise NotImplementedError # virtual class, sublcasses should implement
[docs] @classmethod def isnan(cls, mx): """Element-wise isnan operation on matrix elements""" raise NotImplementedError # virtual class, sublcasses should implement
[docs] @classmethod def array_equal(cls, mx, other_mx): """Return True if mx and other_mx are equal (including nan's, unlike numpy's array_equal """ raise NotImplementedError # virtual class, sublcasses should implement
[docs] @classmethod def from_coords(cls, rows, cols, data, shape): """Initialize matrix using data and row, column coordinates. Duplicates should be summed""" raise NotImplementedError # virtual class, sublcasses should implement
[docs] @classmethod def get_coords(cls, mx): """Return matrix in terms of data and row, column coordinates""" raise NotImplementedError # virtual class, sublcasses should implement
[docs] @classmethod def diag(cls, data): """Returns matrix with diagonals set to data""" raise NotImplementedError # virtual class, sublcasses should implement
[docs] @classmethod def getdiag(cls, data): """Returns diagonals of matrix """ raise NotImplementedError # virtual class, sublcasses should implement
[docs] @classmethod def multiplyrows(cls, mx, multiplier): """TODO: Document""" return NotImplementedError
[docs] class SparseMatrix(MxBase): """Class for sparse matrix operations. See documentation for :class:`dynpy.mx.MxBase` for description of methods. """
[docs] @classmethod def create_editable_zeros_mx(cls, shape, dtype=None): return ss.lil_matrix(shape, dtype=None)
[docs] @classmethod def format_mx(cls, mx): mx = ss.csc_matrix(mx) return mx
[docs] @classmethod def finalize_mx(cls, mx): #if not ss.issparse(mx): # raise Exception('Transition matrix for this class should be sparse') return cls.format_mx(mx)
[docs] @classmethod def pow(cls, mx, exponent): rMx = ss.eye(mx.shape[0], mx.shape[0]) for i in range(exponent): rMx = rMx.dot(mx) return rMx
[docs] @classmethod def expm(cls, mx): return scipy.sparse.linalg.expm(mx)
[docs] @classmethod def get_largest_right_eigs(cls, mx): vals, vecsR = scipy.sparse.linalg.eigs(mx, k=mx.shape[0]-2, which='LR') return vals, vecsR
[docs] @classmethod def make2d(cls, mx): return mx
[docs] @classmethod def todense(cls, mx): return mx.todense()
[docs] @classmethod def tosparse(cls, mx): return mx
[docs] @classmethod def isnan(cls, mx): r = mx.copy() r.data[:] = np.isnan(r.data) return r
[docs] @classmethod def from_coords(cls, rows, cols, data, shape): return ss.coo_matrix((data, (rows, cols)), shape=shape)
[docs] @classmethod def get_coords(cls, mx): mx = mx.tocoo() return mx.row, mx.col, mx.data
[docs] @classmethod def diag(cls, data): return ss.diags(data, 0)
[docs] @classmethod def getdiag(cls, mx): ix = np.arange(mx.shape[0], dtype='int') return np.ravel(todense(mx[ix, ix]))
[docs] @classmethod def vstack(cls, data): return ss.vstack(data)
[docs] @classmethod def multiplyrows(cls, mx, multiplier): r = cls.diag(np.ravel(multiplier)).dot(mx) r = cls.finalize_mx(r) r[np.ravel(np.isnan(multiplier)),:] = np.nan return r
[docs] class DenseMatrix(MxBase): """Class for dense matrix operations. See documentation for :class:`dynpy.mx.MxBase` for description of methods. """
[docs] @classmethod def expm(cls, mx): return scipy.linalg.expm(mx)
[docs] @classmethod def get_largest_right_eigs(cls, mx): vals, vecsR = scipy.linalg.eig(mx, right=True, left=False) vals, vecsR = scipy.linalg.eig(mx, right=True, left=False) return vals, vecsR
[docs] @classmethod def create_editable_zeros_mx(cls, shape, dtype=None): return np.zeros(shape, dtype=dtype)
[docs] @classmethod def format_mx(cls, mx): if ss.issparse(mx): mx = mx.todense() mx = np.array(mx) return mx
# Convert transition matrix to finalized format
[docs] @classmethod def finalize_mx(cls, mx): #if ss.issparse(mx): # raise Exception('Trans mx for this class should not be sparse') return cls.format_mx(mx)
[docs] @classmethod def pow(cls, mx, exponent): return numpy.linalg.matrix_power(mx, exponent)
[docs] @classmethod def make2d(cls, mx): return np.atleast_2d(mx)
[docs] @classmethod def todense(cls, mx): return mx
[docs] @classmethod def tosparse(cls, mx): return SparseMatrix.format_mx(mx)
[docs] @classmethod def isnan(cls, mx): return np.isnan(mx)
[docs] @classmethod def array_equal(cls, mx, other_mx): try: mx, other_mx = asarray(mx), asarray(other_mx) except: return False if mx.shape != other_mx.shape: return False both_equal = np.equal(mx, other_mx) both_nan = np.logical_and(np.isnan(mx), np.isnan(other_mx)) return bool(np.logical_or(both_equal, both_nan).all())
[docs] @classmethod def from_coords(cls, rows, cols, data, shape): mx = cls.create_editable_zeros_mx(shape, data.dtype) for r, c, d in zip(rows, cols, data): mx[r, c] += d # mx[rows, cols] = data # Doesn't sum duplicates return mx
[docs] @classmethod def get_coords(cls, mx): mx = ss.coo_matrix(mx) return mx.row, mx.col, mx.data
[docs] @classmethod def diag(cls, data): return np.diag(data)
[docs] @classmethod def getdiag(cls, mx): return np.diagonal(mx)
[docs] @classmethod def vstack(cls, data): return np.vstack(data)
[docs] @classmethod def multiplyrows(cls, mx, multiplier): m = np.atleast_2d(multiplier) if len([d for d in m.shape if d>1]) > 1: raise Exception('multiplier must be row or column ' + 'vector') if m.shape[0] == 1: m = m.T return np.multiply(mx, m)
[docs] def issparse(mx): if ss.issparse(mx): return True elif isinstance(mx, (np.ndarray, np.generic) ): return False else: raise ValueError('mx does not appear to be a matrix')
[docs] def get_state_cls(obj): if is_int(obj): return IntScalar else: return get_matrix_cls(obj)
[docs] def get_matrix_cls(mx): if issparse(mx): return SparseMatrix else: return DenseMatrix
[docs] def format_mx(mx): return get_matrix_cls(mx).format_mx(mx)
[docs] def finalize_mx(mx): return get_matrix_cls(mx).finalize_mx(mx)
[docs] def pow(mx, exponent): return get_matrix_cls(mx).pow(mx, exponent)
[docs] def expm(mx): return get_matrix_cls(mx).expm(mx)
[docs] def make2d(mx): return get_matrix_cls(mx).make2d(mx)
[docs] def todense(mx): return get_matrix_cls(mx).todense(mx)
[docs] def tosparse(mx): return get_matrix_cls(mx).tosparse(mx)
[docs] def get_largest_right_eigs(mx): return get_matrix_cls(mx).get_largest_right_eigs(mx)
[docs] def get_largest_left_eigs(mx): return get_matrix_cls(mx).get_largest_left_eigs(mx)
[docs] def isnan(mx): return get_matrix_cls(mx).isnan(mx)
[docs] def array_equal(mx, other_mx): return get_matrix_cls(mx).array_equal(mx, other_mx)
[docs] def getdiag(mx): return get_matrix_cls(mx).getdiag(mx)
[docs] def multiplyrows(mx, multiplier): return get_matrix_cls(mx).multiplyrows(mx, multiplier)
[docs] @functools.total_ordering class hashable_array(np.ndarray): """This class provides a hashable and sortable np.array. This is useful for using np.array as dicitionary keys, for example. Notice that hashable arrays change the default behavior of numpy arrays for equality and comparison operators. Instead of performing element-wise tests, hashable arrays return a value for the array as a whole """ # We inherit from np.ndarray but don't want to include that documentation # These attributes are parsed in Sphinx's conf.py SPHINXDOC_INHERITED_MEMBERS = False SPHINXDOC_UNDOC_MEMBERS = False def __new__(cls, data): r = np.array(data, copy=False).view(type=cls) r.flags.writeable = False return r def __init__(self, values): if len(values) < 1000: if len(values.shape) == 1: tpl = tuple(values.tolist()) else: tpl = tuple(map(tuple, values.tolist())) self.__hash = hash(tpl) else: self.__hash = int(hashlib.sha1(self).hexdigest(), 16) def __hash__(self): return self.__hash def __eq__(self, other): # equality test return DenseMatrix.array_equal(self, other) def __lt__(self, other): # comparison test if self.size < other.size: return True nonequal = ~np.equal(self, other) if not len(nonequal): return False firstnonequal = np.flatnonzero(nonequal)[0] return self[firstnonequal] < other[firstnonequal]
[docs] def hashable_state(x): if not isinstance(x, np.ndarray): return x else: return hashable_array(x)