Source code for msm_we.utils

"""
Adapted from the original NMpathAnalysis package,
https://github.com/ZuckermanLab/NMpathAnalysis
"""
from copy import deepcopy
import numpy as np
import operator


[docs]class Interval: """Intervals are in general defined as half-open interval [start,end). in any case, in each dimension the interval is specified using a list [a,b] where a < b - For 1D (single) interval a single list in the form [a,b] has to be given - The union of multiple (1D) intervals can be specified as: [[a,b],[c,d],...] - A list of lists [[a,b],[c,d],...] are used for a n-dimensional intervals, one for each dimension, i.e, len(interval) = n_variables - A list-of-lists-of-lists for the mathematical union of n-dimensional intervals' [ [[a,b],[c,d],...], [[e,f],[g,h],...], ... ] """ def __init__(self, interval_set, n_variables): self.interval_set = interval_set self.n_variables = n_variables def __contains__(self, item): shape = np.array(self.interval_set).shape len_shape = len(shape) if (self.n_variables == 1) and (len_shape == 1): # single 1D interval return self.interval_set[0] <= item < self.interval_set[1] elif (self.n_variables == 1) and (len_shape == 2): # union of multiple 1D intervals return any([(item in Interval(self.interval_set[i], 1)) for i in range(shape[0])]) elif (self.n_variables > 1) and len_shape == 2: # n-dimensional interval return all([(item[i] in Interval(self.interval_set[i], 1)) for i in range(shape[0])]) elif len(shape) == 3: # union of n-dimensional intervals return any([(item in Interval(self.interval_set[i], self.n_variables)) for i in range(shape[0])]) else: raise Exception("The given interval has not the expected shape")
[docs]def reverse_sort_lists(list_1, list_2): """Reverse sorting two list based on the first one""" list_1_sorted, list_2_sorted = zip(*sorted(zip(list_1, list_2), key=operator.itemgetter(0), reverse=True)) return list_1_sorted, list_2_sorted
[docs]def weighted_choice(list_, weights=None): """Select an element from a list with probability from weights""" size = len(list_) if weights is not None: assert size == len(weights) if weights is None: probs = np.array([1 / float(size) for i in range(size)]) else: probs = np.array(weights) / sum(weights) # just in case rand = np.random.random() _sum = 0 for i in range(size): if _sum <= rand < _sum + probs[i]: choice = i break else: _sum += probs[i] return list_[choice]
[docs]def get_shape(trajectory): """Get the shape of a trajectory array in tuple (n_snapshots, n_variables)""" shape = np.array(trajectory).shape if len(shape) == 1: n_snapshots = shape[0] n_variables = 1 if n_variables == 0: raise Exception( "The shape {} of the trajectory/array \ given is not as expected".format( shape ) ) elif len(shape) == 2: n_snapshots = shape[0] n_variables = shape[1] else: raise Exception( "The shape {} of the trajectory/array given is not \ as expected".format( shape ) ) return n_snapshots, n_variables
[docs]def num_of_nonzero_elements(my_vector): """Returns the number of non-zero elements in a vector""" counter = 0 for element in my_vector: if element != 0: counter += 1 return counter
[docs]def normalize_markov_matrix(transition_matrix, reversible=False): """Transform a matrix of positive elements to a markov-like matrix by dividing each row by the sum of the elements of the row. """ t_matrix = np.array(transition_matrix, dtype=np.float64) if reversible: t_matrix = t_matrix.T + t_matrix n_states = len(t_matrix) assert n_states == len(t_matrix[0]) for i in range(n_states): if (t_matrix[i, :] < 0).any(): raise ValueError( "All the elements in the input \ matrix must be non-negative" ) t_matrix[i, :] = normalize(t_matrix[i, :]) return t_matrix
[docs]def normalize(my_vector): """Normalize a vector by dividing each element by the total sum of all its elements """ my_vector = np.array(my_vector) size = len(my_vector) sum_ = sum(my_vector) if sum_ != 0.0: for i in range(size): my_vector[i] = my_vector[i] / sum_ return my_vector
[docs]def random_markov_matrix(n_states=5, seed=None): """Returns a random transition markov matrix""" if seed is not None: np.random.seed(seed) t_matrix = np.random.random((n_states, n_states)) return normalize_markov_matrix(t_matrix)
[docs]def check_tmatrix(t_matrix, accept_null_rows=True): """Check if the given matrix is actually a row-stochastic transition matrix i.e, all the elements are non-negative and the rows add to one. If the keyword argument accept_null_rows is True, is going to accept rows where all the elements are zero. Those "problematic" states are going to be removed later if necessary by clean_tmatrix. """ def value_error(): raise ValueError("The object given is not a transition matrix") n_states = len(t_matrix) if not (n_states == len(t_matrix[0])): value_error() for index, row in enumerate(t_matrix): sum_ = 0.0 for element in row: if element < 0.0: value_error() sum_ += element if accept_null_rows: if not (np.isclose(sum_, 1.0, atol=1e-6) or sum_ == 0.0): value_error() else: if not np.isclose(sum_, 1.0, atol=1e-6): value_error() return False
[docs]def clean_tmatrix(transition_matrix, rm_absorbing=True): """Removes the states/indexes with no transitions and that are absorbing if the the keyword argument rm_absorbing is true Returns the "clean" transition matrix and a list with the removed states/indexes (clean_tmatrix, removed_states) """ t_matrix = deepcopy(transition_matrix) n_states = len(transition_matrix) # Removing the non-visited states and absorbing states removed_states = [] for index in range(n_states - 1, -1, -1): if not any(t_matrix[index]): # non-visited t_matrix = np.delete(t_matrix, index, axis=1) t_matrix = np.delete(t_matrix, index, axis=0) removed_states.append(index) elif t_matrix[index, index] == 1.0: # absorbing state if not all([t_matrix[index, j] == 0.0 for j in range(n_states) if j != index]): raise ValueError( "The sum of the elements in a row of the \ transition matrix must be one" ) t_matrix = np.delete(t_matrix, index, axis=1) t_matrix = np.delete(t_matrix, index, axis=0) removed_states.append(index) # Renormalizing just in case t_matrix = normalize_markov_matrix(t_matrix) return t_matrix, removed_states
[docs]def pops_from_tmatrix(transition_matrix): """Calculate the eigen values and eigen vectors of the transposed transition matrix Parameters ---------- transition_matrix : ndarray with shape = (n_states, n_states) Returns ------- the solution, p, of K.T p = p where K.T is the transposed transition matrix """ check_tmatrix(transition_matrix) n_states = len(transition_matrix) # Cleaning the transition matrix cleaned_matrix, removed_states = clean_tmatrix(transition_matrix) # Computing eig_vals, eig_vecs = np.linalg.eig(cleaned_matrix.T) eig_vecs = eig_vecs.T # for convenience, now every row is an eig_vector eig_vals_close_to_one = np.isclose(eig_vals, 1.0, atol=1e-6) real_eig_vecs = [not np.iscomplex(row).any() for row in eig_vecs] new_n_states = n_states - len(removed_states) ss_solution = np.zeros(new_n_states) # steady-state solution for is_close_to_one, is_real, eigv in zip(eig_vals_close_to_one, real_eig_vecs, eig_vecs): if ( is_close_to_one and is_real and num_of_nonzero_elements(eigv) > num_of_nonzero_elements(ss_solution) and ((eigv <= 0).all() or (eigv >= 0).all()) ): ss_solution = eigv if (ss_solution == 0.0).all(): raise Exception( "No steady-state solution found for \ the given transition matrix" ) ss_solution = normalize(ss_solution).real # Now we have to insert back in the solution, the missing # elements with zero probabilities for index in sorted(removed_states): ss_solution = np.insert(ss_solution, index, 0.0) return ss_solution
[docs]def pops_from_nm_tmatrix(transition_matrix): """Computes the populations of the real/physical states from a non-Markovian transtion matrix with shape (2*n_states, 2*n_states) """ check_tmatrix(transition_matrix, accept_null_rows=True) size = len(transition_matrix) if size % 2 != 0: raise ValueError("The non-Markovian transition matrix has to " "have an even number of columns/rows") n_states = size // 2 # Real/physical microstates pops_nm = pops_from_tmatrix(transition_matrix) pops = np.zeros(n_states) for i in range(n_states): pops[i] = pops_nm[2 * i] + pops_nm[2 * i + 1] return pops
[docs]def map_to_integers(sequence, mapping_dict=None): """Map a sequence of elements to a sequence of integers for intance, maps [1, 'a', 1, 'b', 2.2] to [0, 1, 0, 2, 3] """ if mapping_dict is None: mapping_dict = {} new_sequence = np.zeros(len(sequence), dtype="int64") counter = 0 for i, element in enumerate(sequence): if element not in mapping_dict.keys(): mapping_dict[element] = counter counter += 1 new_sequence[i] = mapping_dict[element] return new_sequence, mapping_dict
[docs]def pseudo_nm_tmatrix(markovian_tmatrix, stateA, stateB): """Obtain a pseudo non-Markovian transition matrix from a Markovian transiton matrix The pseudo Markovian matrix has a shape of (2 n_states, 2 n_states) """ check_tmatrix(markovian_tmatrix) n_states = len(markovian_tmatrix) # pseudo non-Markovian transition matrix p_nm_tmatrix = np.zeros((2 * n_states, 2 * n_states)) for i in range(2 * n_states): for j in range(2 * n_states): p_nm_tmatrix[i, j] = markovian_tmatrix[i // 2, j // 2] for i in range(n_states): for j in range(n_states): if (i in stateB) or (j in stateB): p_nm_tmatrix[2 * i, 2 * j] = 0.0 if (i in stateA) or (j in stateA): p_nm_tmatrix[2 * i + 1, 2 * j + 1] = 0.0 if (not (j in stateA)) or (i in stateA): p_nm_tmatrix[2 * i + 1, 2 * j] = 0.0 if (not (j in stateB)) or (i in stateB): p_nm_tmatrix[2 * i, 2 * j + 1] = 0.0 check_tmatrix(p_nm_tmatrix) # just in case return p_nm_tmatrix