Source code for dynpy.dynsys

"""Module implementing some base classes useful for implementing dynamical
systems"""

from __future__ import division, print_function, absolute_import
import six
range = six.moves.range
map   = six.moves.map

import numpy as np

from . import mx
from . import caching
from .mx import hashable_state
from .utils import readonlydict

# Constants for finding attractors
MAX_ATTRACTOR_LENGTH = 5
TRANSIENT_LENGTH = 30

[docs] class DynamicalSystem(object): """Base class for dynamical systems. Parameters ---------- discrete_time : bool, optional Whether updating should be done using discrete (default) or continuous time dynamics. """ #: Whether the dynamical system obeys discrete- or continuous-time dynamics discrete_time = True def __init__(self, discrete_time=True): self.discrete_time = discrete_time if self.discrete_time: self.iterate = self._iterate_discrete self.iterate_1step = self._iterate_1step_discrete else: self.iterate = self._iterate_continuous
[docs] def iterate(self, start_state, max_time): """This method runs the dynamical system for `max_time` starting from `start_state` and returns the result. In fact, this method is set at run-time by the constructor to either `_iterateDiscrete` or `_iterateContinuous` depending on whether the dynamical system object is initialized with `discrete_time=True` or `discrete_time=False`. Thus, sub-classes should override `_iterateDiscrete` and `_iterateContinuous` instead of this method. See also :meth:`dynpy.dynsys.DynamicalSystem.iterateOneStep` Parameters ---------- start_state : numpy array or scipy.sparse matrix Which state to start from max_time : float Until which point to run the dynamical system (number of iterations for discrete-time systems or time limit for continuous-time systems) Returns ------- numpy array or scipy.sparse matrix End state """ raise NotImplementedError
[docs] def iterate_1step(self, start_state): """ This method runs a discrete-time dynamical system for 1 timestep. At run-time, the construct either repoints this method to `_iterateOneStep` for discrete-time systems, or removes it for continuous time systems. Parameters ---------- start_state : numpy array or scipy.sparse matrix Which state to start from Returns ------- numpy array or scipy.sparse matrix Iterated state """ raise NotImplementedError
def _iterate_1step_discrete(self, start_state): raise NotImplementedError def _iterate_continuous(self, start_state, max_time=1.0): raise NotImplementedError def _iterate_discrete(self, start_state, max_time=1.0): if max_time == 1.0: return self.iterate_1step(start_state) elif max_time == 0.0: return start_state else: cur_state = start_state for i in range(int(max_time)): cur_state = self.iterate_1step(cur_state) return cur_state
[docs] def get_trajectory(self, start_state, max_time, num_points=None, logscale=False): """This method get a trajectory of a dynamical system starting from a particular starting state. Parameters ---------- start_state : object Which state to start from max_time : float Until which point to run the dynamical system (number of iterations for discrete-time systems or time limit for continuous-time systems) num_points : int, optional How many timepoints to sample the trajectory at. This determines how big each 'step size' is. By default, equal to ``int(max_time)`` logscale : bool, optional Whether to sample the timepoints on a logscale or not (default) Returns ------- trajectory: numpy array Array of states corresponding to trajectory """ # TODO: would this accumulate error for continuous case? if num_points is None: num_points = int(max_time) if logscale: timepoints = np.logspace(0, np.log10(max_time), num=num_points, endpoint=True, base=10.0) else: timepoints = np.linspace(0, max_time, num=num_points, endpoint=True) cur_state = start_state trajectory = [cur_state,] start_state_cls = mx.get_state_cls(start_state) for t in range(1, len(timepoints)): run_time = timepoints[t]-timepoints[t-1] next_state = self.iterate(cur_state, max_time=run_time) cur_state = start_state_cls.format_obj(next_state) trajectory.append( cur_state ) return start_state_cls.vstack(trajectory)
[docs] class DeterministicDynamicalSystem(DynamicalSystem): pass
[docs] class StochasticDynamicalSystem(DynamicalSystem): pass
[docs] class DiscreteStateDynamicalSystem(DynamicalSystem):
[docs] def states(self): raise NotImplementedError
[docs] def get_attractor_basins(self, sort=False, start_state_iter=None): """Computes the attractors and basins of the current discrete-state dynamical system. Parameters ---------- sort : bool, optional Whether to sort attractors and basin states (slower). start_state_iter : iterator, optional Iterator to indicate which start-states to start from. If not specified, tries all states. Returns ------- basin_atts : list of lists A list of the the attractor states for each basin (basin order is from largest basin to smallest). basin_states : list of lists A list of all the states in each basin (basin order is from largest basin to smallest). """ state_basins = {} attractors = {} iteratefunc = self.iterate if start_state_iter is None: start_state_iter = self.states() for raw_startstate in start_state_iter: startstate = hashable_state(raw_startstate) if startstate in state_basins: continue traj = set() cstate = startstate while True: traj.add(cstate) cstate = hashable_state(iteratefunc(cstate)) if cstate in traj: # cycle closed cur_cycle = [] cyclestate = cstate while True: cur_cycle.append(cyclestate) cyclestate = hashable_state(iteratefunc(cyclestate)) if cyclestate == cstate: break cur_cycle = tuple(sorted(cur_cycle)) if cur_cycle not in attractors: cndx = len(attractors) attractors[cur_cycle] = cndx state_basins[cstate] = attractors[cur_cycle] if cstate in state_basins: for s in traj: state_basins[s] = state_basins[cstate] break basins = [ [] for _ in range(len(attractors))] for state, basin in six.iteritems(state_basins): basins[basin].append(state) keyfunc = lambda k: (-len(basins[attractors[k]]),k) attractor_states = attractors.keys() if sort: attractor_states = sorted(attractor_states, key=keyfunc) basins_states = [] for att in attractor_states: cbasin = basins[attractors[att]] if sort: cbasin = sorted(cbasin) basins_states.append(cbasin) return attractor_states, basins_states
[docs] def print_attractor_basins(self): """Prints the attractors and basin of the dynamical system >>> import dynpy >>> rules = [ ['a',['a','b'],[1,1,1,0]],['b',['a','b'],[1,0,0,0]]] >>> bn = dynpy.bn.BooleanNetwork(rules=rules) >>> bn.print_attractor_basins() * BASIN 1 : 2 States ATTRACTORS: a b 1 0 -------------------------------------------------------------------------------- * BASIN 2 : 1 States ATTRACTORS: a b 0 0 -------------------------------------------------------------------------------- * BASIN 3 : 1 States ATTRACTORS: a b 1 1 -------------------------------------------------------------------------------- """ basin_atts, basin_states = self.get_attractor_basins(sort=True) for cur_basin_ndx in range(len(basin_atts)): print("* BASIN %d : %d States" % (cur_basin_ndx+1, len(basin_states[cur_basin_ndx]))) print("ATTRACTORS:") print(self._get_state_row_title()) for att in basin_atts[cur_basin_ndx]: print(self._get_state_row_repr(att)) print("".join(['-', ] * 80))
def _get_state_row_repr(self, state): return state def _get_state_row_title(self): return 'State'
[docs] class VectorDynamicalSystem(DynamicalSystem): """Mix-in for classes implementing dynamics over multivariate systems. Parameters ---------- num_vars : int, optional How many variables (i.e., how many 'dimensions' or 'nodes' are in the dynamical system). Default is 1 var_names : list, optional Names for the variables (optional). Default is simply the numeric indexes of the variables. discrete_time : bool, optional Whether dynamical system is discrete or continuous time. """ #: The number of variables in the dynamical system num_vars = None def __init__(self, num_vars, var_names=None, discrete_time=True): super(VectorDynamicalSystem,self).__init__(discrete_time) self.num_vars = num_vars self.var_names = tuple(var_names if var_names is not None else range(self.num_vars)) """The names of the variables in the dynamical system""" # Make this a cached property so its not necessarily run every time a # dynamical systems object is created, whether we need it or not @caching.cached_data_prop def var_name_ndxs(self): """A mapping from variables names to their indexes """ return dict((l, ndx) for ndx, l in enumerate(self.var_names)) def _get_state_row_repr(self, state): row_format = "{:>7}" * self.num_vars return row_format.format(*state) def _get_state_row_title(self): row_format = "{:>7}" * self.num_vars return row_format.format(*self.var_names)
[docs] class DiscreteStateVectorDynamicalSystem(VectorDynamicalSystem, DiscreteStateDynamicalSystem):
[docs] @caching.cached_data_method def get_ndx2state_mx(self): #: ``(num_states, num_vars)``-shaped matrix that maps from state indexes #: to representations in terms of activations of the variables. return np.vstack(list(self.states()))
[docs] @caching.cached_data_method def get_ndx2state_map(self): return readonlydict( enumerate(self.states()) )
[docs] @caching.cached_data_method def get_state2ndx_map(self): return readonlydict( { hashable_state(v):k for k, v in six.iteritems(self.get_ndx2state_map()) })
[docs] class ProjectedStateSpace(DiscreteStateVectorDynamicalSystem): # TODO: Document def __init__(self, keep_vars, base_sys, *kargs, **kwargs): self.keep_vars = keep_vars self.base_sys = base_sys var_names = None if base_sys.var_names is not None: var_names = [base_sys.var_names[i] for i in keep_vars] super(ProjectedStateSpace, self).__init__( num_vars=len(keep_vars), var_names=var_names, *kargs, **kwargs)
[docs] def states(self): done = set() n2smx = self.base_sys.get_ndx2state_mx() for c in map(hashable_state, n2smx[:,self.keep_vars]): if c not in done: done.add(c) yield c
[docs] class LinearDynamicalSystem(VectorDynamicalSystem): # TODO: TESTS """This class implements linear dynamical systems, whether continuous or discrete-time. It is also used by :class:`dynpy.dynsys.MarkovChain` to implement Markov Chain (discrete-case) or master equation (continuous-case) dynamics. For attribute definitions, see documentation of :class:`dynpy.dynsys.DynamicalSystem`. Parameters ---------- transition_matrix : numpy array or scipy.sparse matrix Matrix defining the evolution of the dynamical system, i.e. the :math:`\\mathbf{A}` in :math:`\\mathbf{x_{t+1}} = \\mathbf{x_{t}}\\mathbf{A}` (in the discrete-time case) or :math:`\\dot{\\mathbf{x}} = \\mathbf{x}\\mathbf{A}` (in the continuous-time case) discrete_time : bool, optional Whether updating should be done using discrete (default) or continuous time dynamics. """ #: Transition matrix for linear system. transition_matrix = None def __init__(self, transition_matrix, discrete_time=True): super(LinearDynamicalSystem, self).__init__( num_vars=transition_matrix.shape[0], discrete_time=discrete_time) self.transition_matrix = transition_matrix self.stable_eigenvalue = 1.0 if discrete_time else 0.0
[docs] def get_equilibrium_distribution(self): """Get equilibrium state of dynamical system using eigen-decomposition Returns ------- numpy array or scipy.sparse matrix Equilibrium state """ vals, vecs = mx.get_largest_left_eigs(self.transition_matrix) equil_evals = np.flatnonzero(np.abs(vals-self.stable_eigenvalue) < 1e-8) if len(equil_evals) != 1: raise Exception("Expected one stable eigenvalue, but found " + "%d instead (%s)" % (len(equil_evals), equil_evals)) dist = np.real_if_close(np.ravel(vecs[equil_evals, :])) if np.any(np.iscomplex(equil_evals)): raise Exception("Expect equilibrium state to be real! %s" % equil_evals) return mx.format_mx(dist)
def _iterate_1step_discrete(self, start_state): # For discrete time systems, one step r = mx.format_mx(start_state).dot(self.transition_matrix) return r def _iterate_discrete(self, start_state, max_time=1.0): # For discrete time systems if max_time == 0.0: return start_state cls = mx.get_matrix_cls(self.transition_matrix) r = cls.format_mx(start_state).dot( cls.pow(self.transition_matrix, int(max_time))) return r def _iterate_continuous(self, start_state, max_time=1.0): if max_time == 0.0: return start_state cls = mx.get_matrix_cls(self.transition_matrix) curStartStates = cls.format_mx(start_state) r = curStartStates.dot( cls.expm(max_time * (self.transition_matrix))) return r
# TODO # def getMultistepDynsys(self, num_iters): # import copy # rObj = copy.copy(self) # rObj.trans = self.transition_matrixCls.pow(self.transition_matrix, num_iters) # return rObj