Spaces:
Runtime error
Runtime error
| import numpy as np | |
| from scipy.sparse.csgraph import connected_components | |
| from scipy.special import softmax | |
| import logging | |
| logger = logging.getLogger(__name__) | |
def degree_centrality_scores(
    similarity_matrix,
    threshold=None,
    increase_power=True,
):
    """Score the nodes of a similarity graph via a Markov-chain stationary vector.

    Args:
        similarity_matrix: Pairwise similarity matrix; it is handed unchanged
            to ``create_markov_matrix`` / ``create_markov_matrix_discrete``
            (presumably a square numpy array -- confirm against callers).
        threshold: ``None`` to use the continuous similarities, or a ``float``
            in ``[0, 1)`` to binarize them first. Non-float values (including
            plain ints) are rejected.
        increase_power: Forwarded to ``stationary_distribution``.

    Returns:
        The un-normalized scores returned by ``stationary_distribution``.

    Raises:
        ValueError: If ``threshold`` is neither ``None`` nor a float in
            ``[0, 1)``.
    """
    if threshold is not None:
        # The range test is only evaluated for floats, so arbitrary objects
        # are rejected without ever being compared against numbers.
        if not isinstance(threshold, float) or not 0 <= threshold < 1:
            raise ValueError(
                '\'threshold\' should be a floating-point number '
                'from the interval [0, 1) or None',
            )
        chain = create_markov_matrix_discrete(similarity_matrix, threshold)
    else:
        chain = create_markov_matrix(similarity_matrix)

    return stationary_distribution(
        chain,
        increase_power=increase_power,
        normalized=False,
    )
def _power_method(transition_matrix, increase_power=True, max_iter=10000):
    """Iterate ``v <- T^T v`` from an all-ones vector until it stops changing.

    Args:
        transition_matrix: Square 2-D numpy array describing the chain
            (assumed row-stochastic -- not checked here).
        increase_power: When true, the operator is squared after every step,
            so successive iterates jump ahead by doubling powers instead of
            advancing one step at a time.
        max_iter: Upper bound on the number of iterations.

    Returns:
        The converged vector, or the most recent iterate if ``max_iter`` was
        exhausted first (a warning is logged in that case). The start vector
        is all ones and is never rescaled here.
    """
    eigenvector = np.ones(len(transition_matrix))

    # A one-state chain has nothing to iterate over.
    if len(eigenvector) == 1:
        return eigenvector

    # Multiplying by the transpose from the left is how the iterate is
    # propagated through the chain.
    transition = transition_matrix.transpose()

    for _ in range(max_iter):
        eigenvector_next = np.dot(transition, eigenvector)

        if np.allclose(eigenvector_next, eigenvector):
            return eigenvector_next

        eigenvector = eigenvector_next

        if increase_power:
            transition = np.dot(transition, transition)

    logger.warning("Maximum number of iterations for power method exceeded without convergence!")
    # `eigenvector` always mirrors the latest iterate and, unlike
    # `eigenvector_next`, is bound even when the loop body never runs
    # (max_iter <= 0), which previously raised UnboundLocalError.
    return eigenvector
def connected_nodes(matrix):
    """Partition node indices by connected component of ``matrix``.

    ``matrix`` is passed to ``scipy.sparse.csgraph.connected_components`` with
    its default arguments.

    Returns:
        A list of integer index arrays, one per component, ordered by
        ascending component label.
    """
    component_of = connected_components(matrix)[1]
    return [
        np.where(component_of == label)[0]
        for label in np.unique(component_of)
    ]
def create_markov_matrix(weights_matrix):
    """Turn a square weight matrix into a matrix whose rows each sum to one.

    Args:
        weights_matrix: Square 2-D numpy array of edge weights.

    Returns:
        ``weights_matrix`` divided row-wise by its row sums when every entry
        is strictly positive; otherwise the row-wise softmax of
        ``weights_matrix``.

    Raises:
        ValueError: If ``weights_matrix`` is not square.
    """
    rows, cols = weights_matrix.shape
    if rows != cols:
        raise ValueError('\'weights_matrix\' should be square')

    totals = weights_matrix.sum(axis=1, keepdims=True)

    # Zero or negative weights cannot be turned into probabilities by plain
    # division, so any such entry switches the whole matrix to softmax.
    needs_softmax = np.min(weights_matrix) <= 0
    return softmax(weights_matrix, axis=1) if needs_softmax else weights_matrix / totals
def create_markov_matrix_discrete(weights_matrix, threshold):
    """Binarize ``weights_matrix`` at ``threshold`` and build a Markov matrix.

    Entries greater than or equal to ``threshold`` become 1 and all others 0;
    the resulting 0/1 matrix is then passed to ``create_markov_matrix``.
    """
    binary = np.zeros(weights_matrix.shape)
    keep = weights_matrix >= threshold
    binary[keep] = 1

    return create_markov_matrix(binary)
def stationary_distribution(
    transition_matrix,
    increase_power=True,
    normalized=True,
):
    """Assemble a stationary vector component by component.

    The matrix is split into connected components via ``connected_nodes``;
    ``_power_method`` is run on each component's sub-matrix and its result is
    written back at that component's indices.

    Args:
        transition_matrix: Square 2-D numpy array.
        increase_power: Forwarded to ``_power_method``.
        normalized: When true, the assembled vector is divided by the number
            of rows before being returned.

    Returns:
        A 1-D numpy array with one entry per row of ``transition_matrix``.

    Raises:
        ValueError: If ``transition_matrix`` is not square.
    """
    rows, cols = transition_matrix.shape
    if rows != cols:
        raise ValueError('\'transition_matrix\' should be square')

    result = np.zeros(rows)

    for members in connected_nodes(transition_matrix):
        # np.ix_ selects the square sub-matrix restricted to this component.
        sub_matrix = transition_matrix[np.ix_(members, members)]
        result[members] = _power_method(sub_matrix, increase_power=increase_power)

    return result / rows if normalized else result