Spaces:
Runtime error
Runtime error
# Copyright 2021 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
| import json | |
| import logging | |
| import numpy as np | |
| import os | |
| import pandas as pd | |
| import plotly.graph_objects as go | |
| import powerlaw | |
| from os.path import join as pjoin | |
| import utils | |
| from scipy.stats import ks_2samp | |
| from scipy.stats import zipf as zipf_lib | |
# Module-level logger; created first so the option fallback below can report.
logs = utils.prepare_logging(__file__)

# Treat +/-inf values as NaN as well.
# NOTE(review): "use_inf_as_na" is deprecated in recent pandas releases and an
# unknown option makes set_option raise OptionError (a KeyError subclass).
# Guard the call so importing this module does not fail on such versions.
try:
    pd.set_option("use_inf_as_na", True)
except KeyError:
    logs.warning(
        "pandas option 'use_inf_as_na' is unavailable; "
        "inf values will not be treated as NaN."
    )
class Zipf:
    """Fit a Zipfian (discrete power-law) distribution to vocabulary counts.

    The instance is built either from a dataframe of word counts (then
    ``calc_fit`` computes the fit) or from ``None`` followed by ``load`` to
    restore previously computed statistics from a dictionary.
    """

    def __init__(self, vocab_counts_df, count_str="count",
                 proportion_str="prop"):
        """
        :param vocab_counts_df: dataframe holding one row per word with a
            count column, or None when results will be restored via ``load``.
        :param count_str: name of the count column in ``vocab_counts_df``.
        :param proportion_str: name of the column ``_make_rank_column``
            writes the count proportions to.
        """
        self.vocab_counts_df = vocab_counts_df
        # Column names used in the input dataframe.
        self.cnt_str = count_str
        self.prop_str = proportion_str
        # Fit results; populated by calc_fit() or load().
        self.alpha = None
        self.xmin = None
        self.xmax = None
        self.p = None
        self.ks_distance = None
        # Raw ks_2samp result; only available after calc_fit().
        self.ks_test = None
        self.observed_counts = None
        self.word_counts_unique = None
        self.word_ranks_unique = None
        if self.vocab_counts_df is not None:
            self.observed_counts = self.vocab_counts_df[self.cnt_str].values
            # Sort descending so that rank 1 is the highest count, matching
            # the descending dense rank built in _make_rank_column(). A bare
            # set has no guaranteed order.
            self.word_counts_unique = sorted(
                set(self.observed_counts), reverse=True)
            self.word_ranks_unique = list(
                np.arange(1, len(self.word_counts_unique) + 1))
        self.zipf_dict = {"xmin": None, "xmax": None, "alpha": None,
                          "ks_distance": None, "p-value": None,
                          "word_ranks_unique": self.word_ranks_unique,
                          "word_counts_unique": self.word_counts_unique}
        self.fit = None
        self.predicted_counts = None

    def load(self, zipf_dict):
        """Restore fit statistics from a dictionary shaped like the one
        returned by ``get_zipf_dict``.

        :param zipf_dict: dict with the keys "xmin", "xmax", "alpha",
            "ks_distance", "p-value", "word_ranks_unique" and
            "word_counts_unique".
        """
        self.zipf_dict = zipf_dict
        self.xmin = zipf_dict["xmin"]
        self.xmax = zipf_dict["xmax"]
        self.alpha = zipf_dict["alpha"]
        self.ks_distance = zipf_dict["ks_distance"]
        self.p = zipf_dict["p-value"]
        self.word_ranks_unique = zipf_dict["word_ranks_unique"]
        self.word_counts_unique = zipf_dict["word_counts_unique"]

    def get_zipf_dict(self):
        """Return the fit statistics as plain Python ints/floats/lists.

        Works both after ``calc_fit`` and after ``load``: the p-value is read
        from ``self.p``, which both code paths set, rather than from the raw
        KS test result that only exists after fitting.

        :return: dict with the same keys that ``load`` consumes.
        :raises TypeError: if the statistics are still None (no fit/load yet).
        """
        zipf_dict = {"xmin": int(self.xmin), "xmax": int(self.xmax),
                     "alpha": float(self.alpha),
                     "ks_distance": float(self.ks_distance),
                     "p-value": float(self.p),
                     "word_counts_unique": [int(count) for count in
                                            self.word_counts_unique],
                     "word_ranks_unique": [int(rank) for rank in
                                           self.word_ranks_unique]}
        return zipf_dict

    def calc_fit(self):
        """
        Uses the powerlaw package to fit the observed frequencies
        to a zipfian distribution.
        We use the KS-distance to fit, as that seems more appropriate than MLE.
        """
        logs.info("Fitting based on input vocab counts.")
        self._make_rank_column()
        # Note another method for determining alpha might be defined by
        # (Newman, 2005): alpha = 1 + n * sum(ln( xi / xmin )) ^ -1
        self.fit = powerlaw.Fit(self.observed_counts, fit_method="KS",
                                discrete=True)
        # This should probably be a pmf (not pdf); using discrete=True above.
        # original_data=False uses only the fitted data (within xmin and xmax).
        # pdf_bin_edges: The portion of the data within the bin.
        # observed_pdf: The probability density function (normalized histogram)
        # of the data.
        pdf_bin_edges, observed_pdf = self.fit.pdf(original_data=False)
        # See the 'Distribution' class described here for info:
        # https://pythonhosted.org/powerlaw/#powerlaw.Fit.pdf
        theoretical_distro = self.fit.power_law
        # The probability density function (normalized histogram) of the
        # theoretical distribution.
        predicted_pdf = theoretical_distro.pdf()
        self._set_fit_vars(observed_pdf, predicted_pdf, theoretical_distro)

    def _set_fit_vars(self, observed_pdf, predicted_pdf, theoretical_distro):
        """Copy the fit results onto the instance and derive predicted counts.

        :param observed_pdf: pdf of the observed data from the fit.
        :param predicted_pdf: pdf of the fitted power-law distribution.
        :param theoretical_distro: the fitted power-law distribution object;
            its alpha, xmin, xmax and KS() are read here.
        """
        # !!!! CRITICAL VALUE FOR ZIPF !!!!
        self.alpha = theoretical_distro.alpha
        # Exclusive xmin: The optimal xmin *beyond which* the scaling regime of
        # the power law fits best.
        self.xmin = int(theoretical_distro.xmin)
        self.xmax = theoretical_distro.xmax
        # Can be None if there isn't an xmax returned;
        # this handles that.
        self._set_xmax()
        self.ks_distance = theoretical_distro.KS()
        self.ks_test = ks_2samp(observed_pdf, predicted_pdf)
        # Second element of the KS result is the p-value.
        self.p = self.ks_test[1]
        logs.info("KS test:")
        logs.info(self.ks_test)
        self.predicted_counts = self._calc_zipf_counts()

    def _make_rank_column(self):
        """Add the proportion column and a dense, descending "rank" column
        (highest count gets rank 1) to ``vocab_counts_df`` in place."""
        # TODO: These proportions may have already been calculated.
        prop_denom = float(sum(self.vocab_counts_df[self.cnt_str]))
        count_prop = self.vocab_counts_df[self.cnt_str] / prop_denom
        self.vocab_counts_df[self.prop_str] = count_prop
        rank_column = self.vocab_counts_df[self.cnt_str].rank(
            method="dense", numeric_only=True, ascending=False
        )
        self.vocab_counts_df["rank"] = rank_column.astype("int64")

    def _calc_zipf_counts(self):
        """
        The fit is based on an optimal xmin (minimum rank)
        Let's use this to make count estimates for the zipf fit,
        by multiplying the fitted pmf value by the sum of counts above xmin.
        :return: array of count values following the fitted pmf.
        """
        logs.info("Getting predicted counts.")
        if not self.alpha:
            logs.warning("Have not yet fit -- need the alpha value.")
            logs.warning("Fitting now...")
            self.calc_fit()
        logs.info(self.word_counts_unique)
        logs.info(self.xmin)
        logs.info(self.xmax)
        # The subset of words that fit.
        # NOTE(review): xmin/xmax are used as list positions here -- confirm
        # this matches the units powerlaw reports them in.
        word_counts_fit_unique = self.word_counts_unique[
                                 self.xmin + 1: self.xmax]
        pmf_mass = float(sum(word_counts_fit_unique))
        zipf_counts = np.array(
            [self._estimate_count(rank, pmf_mass) for rank in
             self.word_ranks_unique]
        )
        return zipf_counts

    def _estimate_count(self, rank, pmf_mass):
        """Return the rounded count predicted for ``rank``: the Zipf pmf at
        that rank (with the fitted alpha) scaled by ``pmf_mass``."""
        return int(round(zipf_lib.pmf(rank, self.alpha) * pmf_mass))

    def _set_xmax(self):
        """
        xmax is usually None, so we add some handling to set it as the
        maximum rank in the dataset.

        When an xmax is already present it is only coerced to int; otherwise
        it falls back to the number of unique counts (or unique ranks).
        If none of these are available, xmax is left as None.
        """
        if self.xmax is not None:
            self.xmax = int(self.xmax)
        elif self.word_counts_unique:
            self.xmax = int(len(self.word_counts_unique))
        elif self.word_ranks_unique:
            self.xmax = int(len(self.word_ranks_unique))
# TODO: This might fit better in its own file handling class?
def get_zipf_fids(cache_path):
    """Return the cache file paths used for the Zipf results.

    Ensures the "zipf" sub-directory of ``cache_path`` exists, then builds the
    paths of the three cache files inside it.

    :param cache_path: base cache directory.
    :return: tuple (stats json path, figure json path, figure html path).
    """
    cache_dir = pjoin(cache_path, "zipf")
    os.makedirs(cache_dir, exist_ok=True)
    # Zipf cache files, in the order they are returned.
    file_names = ("zipf_basic_stats.json", "zipf_fig.json", "zipf_fig.html")
    return tuple(pjoin(cache_dir, file_name) for file_name in file_names)
def make_unique_rank_word_list(z):
    """
    Function to help with the figure, creating strings for the hovertext.

    For each (count, rank) pair taken from ``z.word_counts_unique`` and
    ``z.word_ranks_unique``, joins the index labels of every row in
    ``z.vocab_counts_df`` whose count equals that value.

    :param z: object exposing vocab_counts_df, cnt_str, word_counts_unique
        and word_ranks_unique (a Zipf instance).
    :return: list of comma-joined label strings, ordered by rank.
    """
    counts_column = z.vocab_counts_df[z.cnt_str]
    ranked_words = {}
    for count, rank in zip(z.word_counts_unique, z.word_ranks_unique):
        # Boolean mask computed once per count. The previous chained
        # assignment into a filtered copy never modified the dataframe, so
        # it was dropped.
        has_count = counts_column == count
        ranked_words[rank] = ",".join(
            z.vocab_counts_df[has_count].index.astype(str)
        )  # Use the hovertext kw argument for hover text
    ranked_words_list = [wrds for rank, wrds in
                         sorted(ranked_words.items())]
    return ranked_words_list
def make_zipf_fig(z):
    """Build the Plotly figure of observed counts and the Zipf prediction.

    The observed counts are drawn as bars over the unique word ranks; the
    predicted counts are drawn as a line starting at position ``z.xmin``.

    :param z: object exposing xmin, word_ranks_unique, observed_counts and
        predicted_counts (a Zipf instance that has been fit).
    :return: the assembled plotly Figure.
    """
    fit_start = z.xmin
    ranks = z.word_ranks_unique
    counts = z.observed_counts
    predicted = z.predicted_counts
    hover_words = make_unique_rank_word_list(z)

    # Bars: observed counts, x axis initially limited to [0, 100].
    observed_bars = go.Bar(
        x=ranks,
        y=counts,
        hovertext=hover_words,
        name="Word Rank Frequency",
    )
    fig = go.Figure(
        data=[observed_bars],
        layout=go.Layout(xaxis={"range": [0, 100]}),
    )

    # Line: predicted counts, restricted to positions from xmin onwards.
    fit_end = len(ranks)
    fitted_line = go.Scatter(
        x=ranks[fit_start:fit_end],
        y=predicted[fit_start:fit_end],
        hovertext=hover_words[fit_start:fit_end],
        line=go.scatter.Line(color="crimson", width=3),
        name="Zipf Predicted Frequency",
    )
    fig.add_trace(fitted_line)

    # Titles and legend placement.
    fig.update_layout(
        title_text="Word Counts, Observed and Predicted by Zipf",
        xaxis_title="Word Rank",
        yaxis_title="Frequency",
        legend={"yanchor": "top", "y": 0.99, "xanchor": "left", "x": 0.10},
    )
    return fig