Source code for msm_we.ensembles

"""
Adapted from the original NMpathAnalysis package,
https://github.com/ZuckermanLab/NMpathAnalysis
"""
import numpy as np
from copy import deepcopy
import networkx as nx
from math import log

from msm_we.utils import Interval
from msm_we.utils import get_shape, weighted_choice
from msm_we.utils import reverse_sort_lists
from msm_we.fpt import DirectFPT, NonMarkovFPT


[docs]class Ensemble: """ Define a base class to store a list of space-continuous trajectories e.g.: [ trajectory1, trajectory2,...], each trajectory is just a matrix where each row is a snapshot, and each column represents the evolution of the corresponding variable. """ def __init__(self, trajectories=None, verbose=False, dtype="float32", discrete=False, lag_time=1, **kwargs): """Initialize an object of Ensemble class""" super().__init__(**kwargs) self.dtype = dtype self.discrete = discrete self.verbose = verbose self._lag_time = lag_time if (trajectories is None) or (trajectories == []): self.trajectories = [] self.n_variables = 0 if verbose: print("\nEmpty ensemble generated") else: # we have a list/array of trajectories _n_snapshots, _n_variables = get_shape(trajectories[0]) traj_length = 0.0 # Average trajectory length for element in trajectories: traj_length += len(element) n_snapshots, n_variables = get_shape(element) if n_variables != _n_variables: raise Exception("Error: All the trajectories must have the same " "number of variables") self.n_variables = _n_variables self.trajectories = trajectories n_trajs = len(trajectories) traj_length /= n_trajs # Average if verbose: print("Read {} ({}-dimensional) trajectories of " "average length {}.".format(n_trajs, n_variables, traj_length))
[docs] def add_trajectory(self, trajectory): """Add a single trajectory to the ensemble""" if not isinstance(trajectory, np.ndarray): trajectory = np.array(trajectory, dtype=self.dtype) _n_snapshots, _n_variables = get_shape(trajectory) if self.n_variables == 0: # Empty ensemble self.trajectories = [trajectory] self.n_variables = _n_variables if self.verbose: print(self) else: if self.n_variables != _n_variables: raise Exception("All the trajectories in the same ensemble must " "have the same number of variables") else: self.trajectories.append(trajectory) if self.verbose: print(self)
def __len__(self): # returns the number of trajectories in the ensemble return len(self.trajectories) def __str__(self): if self.discrete: feature = "Discrete, " else: feature = "Continuous, " return ( "\n" + feature + "{} with {} ({}-dimensional) trajectories".format(self.__class__.__name__, self.__len__(), self.n_variables) + "\nTotal number of snapshots: {}".format(sum([len(traj) for traj in self])) ) def __add__(self, other): ensemble_sum = deepcopy(self) for traj in other.trajectories: ensemble_sum.add_trajectory(traj) return ensemble_sum def __iadd__(self, other): return self.__add__(other) def __iter__(self): return iter(self.trajectories) def __getitem__(self, arg): return self.trajectories[arg]
[docs] def empirical_mfpts(self, stateA, stateB): """Calculate mean first passage time using direct counting method""" return DirectFPT.mean_fpts( self.trajectories, stateA, stateB, discrete=self.discrete, n_variables=self.n_variables, lag_time=self._lag_time, )
def _count_matrix(self, n_states=None, map_function=None): """Built the count matrix""" if (map_function is None) or (n_states is None): raise Exception("The number of states and a map function " "have to be given as argument") count_matrix = np.zeros((n_states, n_states)) for traj in self.trajectories: previous_state = "Unknown" for snapshot in traj: current_state = map_function(snapshot) if previous_state != "Unknown": count_matrix[previous_state, current_state] += 1.0 previous_state = current_state return count_matrix def _mle_transition_matrix(self, n_states, map_function): """Generate transition matrix from count matrix""" count_matrix = self._count_matrix(n_states, map_function) transition_matrix = count_matrix.copy() # Transforming the count matrix to a transition matrix for i in range(n_states): row_sum = sum(transition_matrix[i, :]) if row_sum != 0.0: transition_matrix[i, :] = transition_matrix[i, :] / row_sum return transition_matrix
[docs] def empirical_corr_function(self, stateA, stateB, times, symmetric=True): """Calculate correlation function for trajectories""" n_dim = self.n_variables stateA = Interval(stateA, n_dim) if not self.discrete else stateA stateB = Interval(stateB, n_dim) if not self.discrete else stateB corr_values = [] for delay in times: assert type(delay) == int assert delay >= 1 sum_ = 0 counts = 0 for traj in self.trajectories: for i in range(len(traj) - delay): sum_ += (traj[i] in stateA) * (traj[i + delay] in stateB) counts += 1 if symmetric: sum_ += (traj[i] in stateB) * (traj[i + delay] in stateA) counts += 1 corr_values.append(sum_ / counts) return corr_values
[docs]class PathEnsemble(Ensemble): """ Derive a class to store trajectories for path analysis""" def __init__( self, trajectories=None, verbose=False, dtype="float32", discrete=False, lag_time=1, stateA=None, stateB=None, **kwargs ): super().__init__(trajectories, verbose, dtype, discrete, lag_time, **kwargs) if (stateA is None) or (stateB is None): raise Exception( "The initial state (stateA) and final state (stateB) \ have to be specified" ) self.stateA = stateA self.stateB = stateB
[docs] @classmethod def from_ensemble( cls, ensemble, stateA=None, stateB=None, map_function=None, discrete=False, dtype="float32", ): """Build a PathEnsemble from an ensemble object or a set of trajectories""" list_of_pathsAB = [] if np.size(ensemble[0][0]): n_variables = np.size(ensemble[0][0]) else: n_variables = 1 if (stateA is None) or (stateB is None): raise Exception( "The initial state (stateA) and final state (stateB) \ have to be specified" ) for traj in ensemble.trajectories: previous_color = "Unknown" pathAB = [] for _snapshot in traj: if map_function is not None: snapshot = map_function(_snapshot) else: snapshot = _snapshot # color determination if not discrete: if snapshot in Interval(stateA, n_variables): color = "A" elif snapshot in Interval(stateB, n_variables): color = "B" else: color = previous_color else: if snapshot in stateA: color = "A" elif snapshot in stateB: color = "B" else: color = previous_color if color == "A": pathAB.append(snapshot) elif (color == "B") and (previous_color == "A"): pathAB.append(snapshot) list_of_pathsAB.append(np.array(pathAB, dtype=dtype)) pathAB = [] previous_color = color return cls(list_of_pathsAB, stateA=stateA, stateB=stateB, dtype=dtype, discrete=discrete,)
def cluster(self, distance_metric, n_cluster=10, method="K-means"): raise NotImplementedError("Not implemented yet")
[docs]class DiscreteEnsemble(Ensemble): """Define a base class to store a list of space-discrete trajectories""" def __init__(self, trajectories=None, verbose=False, dtype="int32", discrete=True, lag_time=1, **kwargs): """Initialize an object of Discrete Ensemble""" super().__init__(trajectories, verbose, dtype, discrete, lag_time, **kwargs) if (self.n_variables != 1) and (self.n_variables != 0): raise Exception( "A discrete trajectory must have a one-dimensional \ index/variable unless is empty" ) self.n_variables = 1 # by definition
[docs] @classmethod def from_ensemble(cls, ens, map_function=None, dtype="int32"): """Build a DiscreteEnsemble from an ensemble object or a set of trajectories""" # TODO: Check the case when ens is a list of trajs if map_function is None: raise ValueError("A map function has to be given as argument") discrete_trajs_list = [] if isinstance(ens, Ensemble): # it is an Ensemble object for traj in ens.trajectories: d_traj = np.array([], dtype=dtype) for snapshot in traj: d_traj = np.append(d_traj, np.array([map_function(snapshot)]), axis=0) discrete_trajs_list.append(d_traj) return cls(discrete_trajs_list) else: # it is a list of trajectories d_traj = [] for traj in ens: d_traj += [map_function(snapshot)] d_traj = np.array(d_traj, dtype=dtype) return cls([d_traj])
[docs] @classmethod def from_transition_matrix(cls, transition_matrix, sim_length=None, initial_state=0): """Generates a DiscreteEnsemble from the transition matrix""" if sim_length is None: raise Exception("The simulation length must be given") if not isinstance(transition_matrix, np.ndarray): transition_matrix = np.array(transition_matrix) n_states = len(transition_matrix) assert n_states == len(transition_matrix[0]) current_state = initial_state discrete_traj = [initial_state] for i in range(sim_length): next_state = weighted_choice([k for k in range(n_states)], transition_matrix[current_state, :]) discrete_traj.append(next_state) current_state = next_state return cls([np.array(discrete_traj)])
[docs]class DiscretePathEnsemble(PathEnsemble, DiscreteEnsemble): """Derive a class to store a list of discrete trajectories for path analysis""" def __init__( self, trajectories=None, verbose=False, dtype="int32", discrete=True, lag_time=1, stateA=None, stateB=None, **kwargs ): super().__init__(trajectories, verbose, dtype, discrete, lag_time, stateA, stateB, **kwargs)
[docs] @classmethod def from_transition_matrix( cls, transition_matrix, stateA=None, stateB=None, n_paths=1000, ini_pops=None, max_iters=1000000000, ): """ Construct a path ensemble from a transition matrix stateA: list, intitial state stateB: list, final state n_paths: integer, with default value of 1000 number of paths to generate ini_pops: list or label, probability distribution over the initial state used to generate the path possible values: ---------------- a) None Use a uniform distribution over the states in stateA c) list A list with the explicit values of the populations in stateA that should be used to generate the ensemble max_iters: integer, with default value of 1000000000 maximum of iterations for generating path trajectories """ if ini_pops is None: ini_pops = [1 / float(len(stateA)) for i in range(len(stateA))] elif ini_pops == "ss": raise NotImplementedError("Sorry: not yet implemented") n_states = len(transition_matrix) assert n_states == len(transition_matrix[0]) d_trajectories = [] for i in range(n_paths): current_state = weighted_choice(stateA, ini_pops) # Initial state path = [current_state] for j in range(max_iters): next_state = weighted_choice([k for k in range(n_states)], transition_matrix[current_state, :]) path += [next_state] current_state = next_state if j + 1 == max_iters: print("\nWARNING: max iteration reached when generating " "the path ensemble, consider to increase max_iters") if current_state in stateB: break path = np.array(path) d_trajectories.append(path) return cls(d_trajectories, stateA=stateA, stateB=stateB)
[docs] @classmethod def from_ensemble(cls, ensemble, stateA, stateB, map_function=None): """Build a DiscreteEnsemble from an ensemble object or a set of trajectories""" ens = PathEnsemble.from_ensemble(ensemble, stateA, stateB, map_function, discrete=True, dtype="int32") return cls(ens.trajectories, stateA=stateA, stateB=stateB)
[docs] def nm_mfpt(self, ini_probs=None, n_states=None): """Compute the mean-first passage time from the transition matrix""" t_matrix = self._mle_transition_matrix(n_states) ini_state = list(self.stateA) final_state = sorted(list(self.stateB)) return NonMarkovFPT.directional_mfpt(t_matrix, ini_state, final_state, ini_probs)
def _fundamental_sequences(self, transition_matrix, symmetric=True): """Divide/classify the path ensemble into fundamental sequences""" fundamental_seqs = [] for path in self.trajectories: if symmetric: cmatrix = self._connectivity_matrix(path, transition_matrix * transition_matrix.T) else: cmatrix = self._connectivity_matrix(path, transition_matrix) path_graph = self._graph_from_matrix(cmatrix) shortest_path = nx.dijkstra_path(path_graph, path[0], path[-1], "distance") fundamental_seqs.append(shortest_path) return fundamental_seqs
[docs] def weighted_fundamental_sequences(self, transition_matrix=None, symmetric=True): """Generate sorted fundamental sequences with weights""" fs_list = self._fundamental_sequences(transition_matrix, symmetric) element_count = {} tot_count = 0 for element in fs_list: pseudo_index = tuple(element) tot_count += 1 if pseudo_index not in element_count: element_count[pseudo_index] = 1 else: element_count[pseudo_index] += 1 weights = [] new_fs_list = [] for key, value in element_count.items(): new_fs_list.append(key) weights.append(value / float(tot_count)) reversed_sorted_weights, reversed_sorted_new_fs_list = reverse_sort_lists(weights, new_fs_list) return reversed_sorted_new_fs_list, reversed_sorted_weights, tot_count
@staticmethod def _graph_from_matrix(matrix): """Builds a directed Graph from a matrix like a transition matrix""" size = len(matrix) assert size == len(matrix[0]) matrix = np.array(matrix) G = nx.DiGraph() for node in range(size): G.add_node(node) for i in range(size): for j in range(size): if (i != j) and (matrix[i, j] != 0.0): G.add_edge(i, j, distance=-log(matrix[i, j])) return G @staticmethod def _connectivity_matrix(path, matrix): """From a given path and a matrix construct a connectivity matrix whose elements ij are zero if the transition i->j is not observed in the path or (i=j), while keep the rest of the elements in the input matrix. This way, from the connectivity matrix we could later create a graph that represents the path, being the "distance" between nodes equal to -log(Tij) Tij --> i,j element in the transition matrix the path must be 1D array of indexes """ matrix = np.array(matrix) path = np.array(path, dtype="int32") n_states = len(matrix) assert n_states == len(matrix[0]) c_matrix = np.zeros((n_states, n_states)) for i in range(len(path) - 1): c_matrix[path[i], path[i + 1]] = matrix[path[i], path[i + 1]] return c_matrix