Source code for msm_we.nmm

"""
Adapted from the original NMpathAnalysis package,
https://github.com/ZuckermanLab/NMpathAnalysis
"""
import numpy as np

from msm_we.fpt import DirectFPT, MarkovFPT, NonMarkovFPT
from msm_we.ensembles import DiscreteEnsemble, DiscretePathEnsemble
from msm_we.utils import map_to_integers, normalize_markov_matrix
from msm_we.utils import pops_from_nm_tmatrix, pops_from_tmatrix
from msm_we.utils import pseudo_nm_tmatrix, weighted_choice


[docs]class NonMarkovModel(DiscreteEnsemble): """Define a class for analyzing MD trajectories using Markovian or non-Markovian Model from a list of 1D trajectories of integers representing macrostates For example: trajectories = [ [1 , 2, 0, ...], [2, 2, 1, ...], [3, 1, 2, ...], ...] If only one sequence is given in trajectories, the format is the same: trajectories = [ [1 , 2, 0, ...] ] Parameters ---------- lag_time (integer, default: 1) Lag time of the model. sliding_window (boolean) Use a sliding window of length lag_time to compute the count matrix stateA, stateB (python lists) Define the initial and final macrostates in form of python lists for example: stateA=[0,2,5], stateB = [1] Attributes ---------- n_states : int nm_cmatrix: array, with shape (2 n_states, 2 n_states) Stores the number of transitions between states, the i,j element cij stores the number of transitions observed from i to j. populations: array, shape (n_states,) Equilibrium population, the steady state solution of of the transition matrix """ def __init__( self, trajectories, stateA, stateB, lag_time=1, clean_traj=False, sliding_window=True, reversible=True, markovian=False, coarse_macrostates=False, **kwargs ): """Initialize an object for Non Markovian Model Class""" if coarse_macrostates: for traj in trajectories: for i, _ in enumerate(traj): if traj[i] in stateA: traj[i] = stateA[0] elif traj[i] in stateB: traj[i] = stateB[0] stateA = [stateA[0]] stateB = [stateB[0]] self._lag_time = lag_time self.trajectories = trajectories self.stateA = stateA self.stateB = stateB self.sliding_window = sliding_window self.reversible = reversible self.markovian = markovian self.n_variables = 1 # by construction self.discrete = True # by construction if (self._lag_time < 1) or (int(self._lag_time) != int(self._lag_time)): raise ValueError( "The lag time should be an integer \ greater than 1" ) if clean_traj: self.n_states = max([max(traj) for traj in self.trajectories]) + 1 else: self._map_trajectories_to_integers() self.fit() def _map_trajectories_to_integers(self): # Clean the sequences seq_map = {} new_trajs = [] for seq in self.trajectories: newseq, m_dict = map_to_integers(seq, seq_map) new_trajs.append(newseq) self.stateA = [seq_map[i] for i in self.stateA] self.stateB = [seq_map[i] for i in self.stateB] self.n_states = len(seq_map) self.trajectories = new_trajs self.seq_map = seq_map
[docs] def fit(self): """Fits the non-Markovian model from a list of sequences""" # Non-Markovian count matrix nm_cmatrix = np.zeros((2 * self.n_states, 2 * self.n_states)) # Markovian count matrix markov_cmatrix = np.zeros((self.n_states, self.n_states)) lag = self._lag_time if not self.sliding_window: step = lag else: step = 1 for traj in self.trajectories: for start in range(lag, 2 * lag, step): prev_color = None for i in range(start, len(traj), lag): # Color determination if traj[i] in self.stateA: color = "A" elif traj[i] in self.stateB: color = "B" else: color = prev_color # Count matrix for the given lag time if prev_color == "A" and color == "B": nm_cmatrix[2 * traj[i - lag], 2 * traj[i] + 1] += 1.0 elif prev_color == "B" and color == "A": nm_cmatrix[2 * traj[i - lag] + 1, 2 * traj[i]] += 1.0 elif prev_color == "A" and color == "A": nm_cmatrix[2 * traj[i - lag], 2 * traj[i]] += 1.0 elif prev_color == "B" and color == "B": nm_cmatrix[2 * traj[i - lag] + 1, 2 * traj[i] + 1] += 1.0 prev_color = color markov_cmatrix[traj[i - lag], traj[i]] += 1.0 nm_tmatrix = normalize_markov_matrix(nm_cmatrix) markov_tmatrix = normalize_markov_matrix(markov_cmatrix, reversible=True) self.nm_tmatrix = nm_tmatrix self.nm_cmatrix = nm_cmatrix self.markov_cmatrix = markov_cmatrix self.markov_tmatrix = markov_tmatrix
[docs] @classmethod def from_nm_tmatrix(cls, transition_matrix, stateA, stateB, sim_length=None, initial_state=0): """Generates a discrete ensemble from the transition matrix""" if sim_length is None: raise Exception("The simulation length must be given") if not isinstance(transition_matrix, np.ndarray): transition_matrix = np.array(transition_matrix) n_states = len(transition_matrix) assert n_states == len(transition_matrix[0]) current_state = initial_state discrete_traj = [initial_state // 2] for i in range(sim_length): next_state = weighted_choice([k for k in range(n_states)], transition_matrix[current_state, :]) discrete_traj.append(next_state // 2) current_state = next_state return cls([np.array(discrete_traj)], stateA, stateB, clean_traj=True)
@property def lag_time(self): return self._lag_time @lag_time.setter def lag_time(self, lag_time): self._lag_time = lag_time self.fit() def mfpts(self): if self.markovian: return MarkovFPT.mean_fpts(self.markov_tmatrix, self.stateA, self.stateB, lag_time=self._lag_time) else: return NonMarkovFPT.mean_fpts(self.nm_tmatrix, self.stateA, self.stateB, lag_time=self._lag_time)
[docs] def empirical_mfpts(self): return DirectFPT.mean_fpts(self.trajectories, self.stateA, self.stateB, lag_time=self._lag_time)
def empirical_fpts(self): return DirectFPT.fpts(self.trajectories, self.stateA, self.stateB, lag_time=self._lag_time)
[docs] def populations(self): # In this case the results are going to be the same if self.markovian: return pops_from_tmatrix(self.markov_tmatrix) else: return pops_from_nm_tmatrix(self.nm_tmatrix)
@property def popA(self): pop_A = 0 pops = self.populations() for i, p in enumerate(pops): if i in self.stateA: pop_A += p return pop_A @property def popB(self): pop_B = 0 pops = self.populations() for i, p in enumerate(pops): if i in self.stateB: pop_B += p return pop_B def tmatrixAB(self): if self.markovian: return self.markov_tmatrix matrixAB = [] for i in range(0, 2 * self.n_states, 2): for j in range(0, 2 * self.n_states, 2): if (i // 2 in self.stateB) and not (j // 2 in self.stateB): matrixAB.append(0.0) elif (i // 2 in self.stateB) and (j // 2 in self.stateB): if i // 2 == j // 2: matrixAB.append(1.0) else: matrixAB.append(0.0) elif not (i // 2 in self.stateB) and (j // 2 in self.stateB): matrixAB.append(self.nm_tmatrix[i, j + 1]) else: matrixAB.append(self.nm_tmatrix[i, j]) matrixAB = np.array(matrixAB) matrixAB = matrixAB.reshape((self.n_states, self.n_states)) return matrixAB def tmatrixBA(self): if self.markovian: return self.markov_tmatrix matrixBA = [] for i in range(1, 2 * self.n_states + 1, 2): for j in range(1, 2 * self.n_states + 1, 2): if (i // 2 in self.stateA) and not (j // 2 in self.stateA): matrixBA.append(0.0) elif (i // 2 in self.stateA) and (j // 2 in self.stateA): if i // 2 == j // 2: matrixBA.append(1.0) else: matrixBA.append(0.0) elif not (i // 2 in self.stateA) and (j // 2 in self.stateA): matrixBA.append(self.nm_tmatrix[i, j - 1]) else: matrixBA.append(self.nm_tmatrix[i, j]) matrixBA = np.array(matrixBA) matrixBA = matrixBA.reshape((self.n_states, self.n_states)) return matrixBA def fluxAB_distribution_on_B(self): if self.markovian: t_matrix = pseudo_nm_tmatrix(self.markov_tmatrix, self.stateA, self.stateB) else: t_matrix = self.nm_tmatrix distrib_on_B = np.zeros(len(self.stateB)) labeled_pops = pops_from_tmatrix(t_matrix) for i in range(0, 2 * self.n_states, 2): for j in range(2 * self.n_states): if j // 2 in self.stateB: distrib_on_B[self.stateB.index(j // 2)] += labeled_pops[i] * t_matrix[i, j] return distrib_on_B def fluxBA_distribution_on_A(self): if self.markovian: t_matrix = pseudo_nm_tmatrix(self.markov_tmatrix, self.stateA, self.stateB) else: t_matrix = self.nm_tmatrix distrib_on_A = np.zeros(len(self.stateA)) labeled_pops = pops_from_tmatrix(t_matrix) for i in range(1, 2 * self.n_states + 1, 2): for j in range(2 * self.n_states): if j // 2 in self.stateA: distrib_on_A[self.stateA.index(j // 2)] += labeled_pops[i] * t_matrix[i, j] return distrib_on_A def fpt_distrib_AB(self, max_x=1000, dt=1): return MarkovFPT.fpt_distribution( self.tmatrixAB(), self.stateA, self.stateB, self.fluxBA_distribution_on_A(), max_n_lags=max_x, lag_time=self._lag_time, dt=dt, ) def fpt_distrib_BA(self, max_x=1000, dt=1): return MarkovFPT.fpt_distribution( self.tmatrixBA(), self.stateB, self.stateA, self.fluxAB_distribution_on_B(), max_n_lags=max_x, lag_time=self._lag_time, dt=dt, )
[docs] def corr_function(self, times): """Compute the correlation function for a set of times. Parameters ---------- times (list of integers): List of dt values used to compute the correlation function. Returns ------- List of floats with the correlation values for the dt given in times """ pAA = [] pAB = [] pBA = [] pBB = [] t_matrix = self.markov_tmatrix if self.markovian else self.nm_tmatrix tot_n_states = self.n_states if self.markovian else (2 * self.n_states) for dt in times: if dt % self.lag_time != 0: raise ValueError("The times given should be " "multiple of the lag time") n = int(dt / self.lag_time) pops_eq = self.populations() t_matrixT_to_n = np.linalg.matrix_power(t_matrix.T, n) popsA_to_propagate = np.zeros(tot_n_states) popsB_to_propagate = np.zeros(tot_n_states) if self.markovian: for index in self.stateA: popsA_to_propagate[index] = pops_eq[index] for index in self.stateB: popsB_to_propagate[index] = pops_eq[index] final_dist_from_A = np.dot(t_matrixT_to_n, popsA_to_propagate) final_dist_from_B = np.dot(t_matrixT_to_n, popsB_to_propagate) pAA.append(sum([final_dist_from_A[i] for i in self.stateA])) pBB.append(sum([final_dist_from_B[i] for i in self.stateB])) pAB.append(sum([final_dist_from_B[i] for i in self.stateA])) pBA.append(sum([final_dist_from_A[i] for i in self.stateB])) else: for index in self.stateA: popsA_to_propagate[2 * index] = pops_eq[index] for index in self.stateB: popsB_to_propagate[2 * index + 1] = pops_eq[index] final_dist_from_A = np.dot(t_matrixT_to_n, popsA_to_propagate) final_dist_from_B = np.dot(t_matrixT_to_n, popsB_to_propagate) pAA.append(sum([final_dist_from_A[2 * i] for i in self.stateA])) pBB.append(sum([final_dist_from_B[2 * i + 1] for i in self.stateB])) pAB.append(sum([final_dist_from_B[2 * i] for i in self.stateA])) pBA.append(sum([final_dist_from_A[2 * i + 1] for i in self.stateB])) return pAA, pAB, pBA, pBB
def empirical_weighted_FS(self, tmatrix_for_classification=None, symmetric=True): if tmatrix_for_classification is None: tmatrix_for_classification = self.markov_tmatrix ens = DiscretePathEnsemble.from_ensemble(self, self.stateA, self.stateB) return ens.weighted_fundamental_sequences(tmatrix_for_classification, symmetric) def weighted_FS(self, tmatrix_for_classification=None, n_paths=1000, symmetric=True): if tmatrix_for_classification is None: tmatrix_for_classification = self.markov_tmatrix if self.markovian: tmatrix_to_generate_paths = self.markov_tmatrix else: tmatrix_to_generate_paths = self.tmatrixAB() ens = DiscretePathEnsemble.from_transition_matrix(tmatrix_to_generate_paths, self.stateA, self.stateB, n_paths) return ens.weighted_fundamental_sequences(tmatrix_for_classification, symmetric)
[docs]class MarkovPlusColorModel(NonMarkovModel): """Define a class for analyzing MD trajectories using Markovian Plus Color Model""" def __init__(self, trajectories, stateA, stateB, lag_time=1, clean_traj=False, sliding_window=True, hist_length=0, **kwargs): self.hist_length = hist_length super().__init__(trajectories, stateA, stateB, lag_time, clean_traj, sliding_window, **kwargs)
[docs] def fit(self): """Fits the markov plus color model from a list of sequences""" # Non-Markovian count matrix nm_tmatrix = np.zeros((2 * self.n_states, 2 * self.n_states)) # Markovian transition matrix markov_tmatrix = np.zeros((self.n_states, self.n_states)) start = self._lag_time step = 1 lag = self._lag_time hlength = self.hist_length if not self.sliding_window: step = lag # Markov first for traj in self.trajectories: for i in range(start, len(traj), step): markov_tmatrix[traj[i - lag], traj[i]] += 1.0 # counting markov_tmatrix = markov_tmatrix + markov_tmatrix.T markov_tmatrix = normalize_markov_matrix(markov_tmatrix) p_nm_tmatrix = pseudo_nm_tmatrix(markov_tmatrix, self.stateA, self.stateB) pops = pops_from_tmatrix(p_nm_tmatrix) # Pseudo-Markov Flux matrix fmatrix = p_nm_tmatrix for i, _ in enumerate(fmatrix): fmatrix[i] *= pops[i] for traj in self.trajectories: for i in range(start, len(traj), step): # Previous color determination (index i - lag) prev_color = "U" for k in range(i - lag, max(i - lag - hlength, 0) - 1, -1): if traj[k] in self.stateA: prev_color = "A" break elif traj[k] in self.stateB: prev_color = "B" break # Current Color (in index i) if traj[i] in self.stateA: color = "A" elif traj[i] in self.stateB: color = "B" else: color = prev_color if prev_color == "A" and color == "B": nm_tmatrix[2 * traj[i - lag], 2 * traj[i] + 1] += 1.0 elif prev_color == "B" and color == "A": nm_tmatrix[2 * traj[i - lag] + 1, 2 * traj[i]] += 1.0 elif prev_color == "A" and color == "A": nm_tmatrix[2 * traj[i - lag], 2 * traj[i]] += 1.0 elif prev_color == "B" and color == "B": nm_tmatrix[2 * traj[i - lag] + 1, 2 * traj[i] + 1] += 1.0 elif prev_color == "U" and color == "B": temp_sum = fmatrix[2 * traj[i - lag], 2 * traj[i] + 1] + fmatrix[2 * traj[i - lag] + 1, 2 * traj[i] + 1] nm_tmatrix[2 * traj[i - lag], 2 * traj[i] + 1] += fmatrix[2 * traj[i - lag], 2 * traj[i] + 1] / temp_sum nm_tmatrix[2 * traj[i - lag] + 1, 2 * traj[i] + 1] += fmatrix[2 * traj[i - lag] + 1, 2 * traj[i] + 1] / temp_sum elif prev_color == "U" and color == "A": temp_sum = fmatrix[2 * traj[i - lag], 2 * traj[i]] + fmatrix[2 * traj[i - lag] + 1, 2 * traj[i]] nm_tmatrix[2 * traj[i - lag]][2 * traj[i]] += fmatrix[2 * traj[i - lag], 2 * traj[i]] / temp_sum nm_tmatrix[2 * traj[i - lag] + 1][2 * traj[i]] += fmatrix[2 * traj[i - lag] + 1, 2 * traj[i]] / temp_sum elif prev_color == "U" and color == "U": temp_sum = ( fmatrix[2 * traj[i - lag], 2 * traj[i] + 1] + fmatrix[2 * traj[i - lag] + 1, 2 * traj[i] + 1] + fmatrix[2 * traj[i - lag], 2 * traj[i]] + fmatrix[2 * traj[i - lag] + 1, 2 * traj[i]] ) nm_tmatrix[2 * traj[i - lag], 2 * traj[i] + 1] += fmatrix[2 * traj[i - lag], 2 * traj[i] + 1] / temp_sum nm_tmatrix[2 * traj[i - lag] + 1][2 * traj[i] + 1] += fmatrix[2 * traj[i - lag] + 1, 2 * traj[i] + 1] / temp_sum nm_tmatrix[2 * traj[i - lag]][2 * traj[i]] += fmatrix[2 * traj[i - lag], 2 * traj[i]] / temp_sum nm_tmatrix[2 * traj[i - lag] + 1][2 * traj[i]] += fmatrix[2 * traj[i - lag] + 1, 2 * traj[i]] / temp_sum self.nm_cmatrix = nm_tmatrix # not normalized, it is like count matrix nm_tmatrix = normalize_markov_matrix(nm_tmatrix) self.nm_tmatrix = nm_tmatrix self.markov_tmatrix = markov_tmatrix
def populations(self): return NotImplementedError( "You should use a regular Markov model or " "a non-Markovian model for estimating " "populations" )