Source code for qumin.predictability.distribution

# -*- coding: utf-8 -*-
# !/usr/bin/env python
"""author: Sacha Beniamine.

Encloses distribution of patterns on paradigms.
"""

import logging
from collections import Counter, defaultdict
from functools import reduce
from itertools import combinations
from operator import mul
import pandas as pd

from . import cond_entropy, cond_entropy_slow, entropy, cond_psuccess
from ..representations.frequencies import Frequencies
from ..utils.metadata import Metadata

# Module-level logger: every message emitted below goes through the "Qumin" logger.
log = logging.getLogger("Qumin")


class PatternDistribution(object):
    """Statistical distribution of patterns.

    Attributes:
        patterns (~qumin.representations.patterns.ParadigmPatterns):
            A dict of :class:`pandas.DataFrame`, where each row
            describes an alternation between two forms belonging
            to different cells of the same lexeme.
            The row also contains the correct pattern
            and the set of applicable patterns.
        frequencies (~qumin.representations.frequencies.Frequencies):
            The frequencies passed to the constructor.
        features (pandas.Series or None): per-lexeme tuples of
            ``"name=value"`` strings, or None when no features were given.
        features_len (int): number of feature columns (0 without features).
        data (pandas.DataFrame): long-format table of results, with the
            columns ``predictor``, ``predicted``, ``measure``, ``value``,
            ``n_pairs``, ``n_preds`` and ``dataset``. Each row holds one
            measure for the prediction
            :math:`P(c_{1}, ..., c_{n} → c_{n+1})`.
        name (str): Name of the dataset.
    """
[docs] def __init__(self, patterns, dataset, frequencies, features=None): """Constructor for PatternDistribution. Arguments: patterns (~qumin.representations.patterns.ParadigmPatterns): A dict of :class:`pandas.DataFrame`, where each row describes an alternation between forms belonging to two different cells of the same lexeme. The row also contains the correct pattern and the set of applicable patterns. dataset (frictionless.Package): Paralex dataset metadata. frequencies (~qumin.representations.frequencies.Frequencies): The frequencies for the paradigms. features: optional table of features """ self.name = dataset.name self.frequencies = Frequencies(dataset) self.patterns = patterns self.frequencies = frequencies if features is not None: # Add feature names features = features.apply(lambda x: x.name + "=" + x.apply(str), axis=0) # To tuples features = features.map(lambda x: (str(x),)) self.features_len = features.shape[1] self.features = pd.DataFrame.sum(features, axis=1) self.features.index.name = "lexeme" self.features.name = "features" else: self.features_len = 0 self.features = None self.data = pd.DataFrame(None, columns=["predictor", "predicted", "measure", "value", "n_pairs", "n_preds", "dataset" ])
def get_results(self, measure=None, n=1):
    """Select stored results by measure name and number of predictors.

    Arguments:
        measure (List or str): measure name(s) to keep.
            Defaults to every measure present in the stored data.
        n (int): number of predictors the selected rows must have.

    Returns:
        pandas.DataFrame: the rows of the stored results that match
            both the measure(s) and the number of predictors.
    """
    table = self.data
    if measure is None:
        wanted = list(table.measure.unique())
    elif isinstance(measure, str):
        wanted = [measure]
    else:
        wanted = measure

    keep = table.loc[:, "measure"].isin(wanted) & (table.loc[:, "n_preds"] == n)
    return table.loc[keep, :]
def check_zeros(self, n):
    """List the predictions already known to have a null entropy.

    Checks whether:

    - We computed entropies for n-1 predictors
    - Some of these are 0s and don't need to be computed for n predictors.

    Arguments:
        n (int): number of predictors currently computed.

    Returns:
        dict: maps each predicted cell to the set of predictor
            combinations (as frozensets of cell names) that led to an
            entropy of zero with n-1 predictors. Returns None if nothing
            was computed for n-1 predictors.
    """
    log.info("Saving time by listing already known 0 entropies...")

    def as_cell_set(pred):
        # Freshly computed predictors are "&"-joined strings, imported
        # ones are tuples. Both must end up as a set of cell names;
        # frozenset("a&b") would be a set of characters.
        if isinstance(pred, str):
            return frozenset(pred.split("&"))
        return frozenset(pred)

    # Compare the values: `x in series` would look at the index labels.
    if not (self.data.loc[:, "n_preds"] == n - 1).any():
        return None

    zero_rows = self.get_results(measure="cond_entropy", n=n - 1).query('value==0')
    zeros = defaultdict(set)
    for predictor, predicted in zip(zero_rows["predictor"], zero_rows["predicted"]):
        zeros[predicted].add(as_cell_set(predictor))
    return dict(zeros)
def get_mean(self, **kwargs):
    """Average the measures of the current run.

    The uniform average is always computed. If cell token frequencies
    are available for every row, a token-weighted average is added.

    Arguments:
        **kwargs: Keyword arguments are passed to `get_results()`

    Returns:
        pandas.DataFrame: one row per measure (index named "Measure"),
            with a "Uniform" column and, when possible,
            a "Weighted with tokens" column.
    """
    def weighted_mean(table):
        contributions = table.value * table.pair_proba
        return contributions.groupby(table.measure).sum()

    results = self.get_results(**kwargs)

    uniform = self.get_weights(results, tokens=False)
    means = weighted_mean(uniform).rename('Uniform').to_frame()

    # If possible we also compute the mean with tokens
    by_tokens = self.get_weights(results, tokens=True)
    every_row_uses_tokens = (by_tokens.proba_source != "uniform").all()
    if every_row_uses_tokens:
        means["Weighted with tokens"] = weighted_mean(by_tokens)

    means.index.name = "Measure"
    return means
def get_weights(self, data, tokens=False):
    r"""
    Returns weights computed from cell frequencies for pairs of cells.

    *The probability of a pair of cells is the product of the probability
    of the predictors with the probability of the target. The target is
    chosen different from the predictors.*

    Let :math:`\{A_1, \dots A_n\}` be the random variables describing the
    drawing of :math:`n` predictors and :math:`B` the random variable
    describing the drawing of a target cell. One can write the following
    generic formula and rewrite it with Bayes' theorem:

    .. math::

        \begin{align}
        P(\textrm{pred} \to \textrm{target}) &= P(\{A_1, \dots A_n\} = \textrm{pred}) P(B = \textrm{target}\mid B \notin \textrm{pred})\\
        &= P(\{A_1, \dots A_n\} = \textrm{pred}) \frac{P(B = \textrm{target}\cap B \notin \textrm{pred})}{P(B \notin \textrm{pred})}
        \end{align}

    Since :math:`B = \textrm{target} \subset B \notin \textrm{pred}`,
    one can write:

    .. math::

        \begin{align}
        P(\textrm{pred} \to \textrm{target}) &= P(\{A_1, \dots A_n\} = \textrm{pred}) \frac{P(B = \textrm{target})}{P(B \notin \textrm{pred})}\\
        &= P(\{A_1, \dots A_n\} = \textrm{pred}) \frac{P(B = \textrm{target})}{1 - \sum_{a\in \textrm{pred}}P(a)}
        \end{align}

    We now need to estimate :math:`P(\textrm{pred})` and
    :math:`P(\textrm{target})`. Let :math:`f_i` be the frequency of cell
    :math:`i` and :math:`f` the cumulated frequency of all cells.

    In the simplest case with **one predictor**, the formula can be
    simplified to:

    .. math::

        \begin{align}
        P(\textrm{pred} \to \textrm{target}) &= P(A = \textrm{pred}) \frac{P(B = \textrm{target})}{P(B \neq \textrm{pred})}\\
        &= \frac{f_\textrm{pred} f_\textrm{target}}{f f_\overline{\textrm{pred}}}
        \end{align}

    In the more complex case with **n predictors**, we need to estimate
    :math:`P(\{A_1, \dots A_n\} = \textrm{pred})`. Let us consider:

    - :math:`S` the set of all cells,
    - :math:`C^n_S` the set of all unordered combinations of :math:`n` cells.

    For instance, if :math:`S=\{A, B, C\}` and :math:`n=2`, then:

    .. math::

        C^n_S = \{\{A, B\}, \{A, C\}, \{B, C\}\}

    If we draw random combinations of :math:`n` cells, how often are we
    going to draw each item of :math:`C^n_S`? This value is:

    .. math::

        \begin{align}
        P(\{A_1, \dots A_n\} = \textrm{pred}) &= \frac{n!\prod_{a\in \textrm{pred}}P(a)}
        {\sum_{c\in C_S^n} n! \prod_{a\in c}P(a)} \\
        &= \frac{\prod_{a\in \textrm{pred}} f_a}{\sum_{c\in C_S^n} \prod_{a\in c}f_a}
        \end{align}

    Finally:

    .. math::

        \begin{align}
        P(\textrm{pred} \to \textrm{target}) &= P(\{A_1, \dots A_n\} = \textrm{pred}) \frac{P(B = \textrm{target})}{1 - \sum_{a\in \textrm{pred}}P(a)}\\
        &= \frac{f_\textrm{target}}{f}\Big(\frac{\prod_{a\in \textrm{pred}} f_a}
        {\sum_{c\in C_S^n} \prod_{a\in c}f_a \times (1-\sum_{a\in \textrm{pred}}\frac{f_a}{f})}\Big)
        \end{align}

    Notice that the second part of the final formula does not depend on
    :math:`B` and can be computed for the predictors beforehand. We then
    compute the product of this with the relative frequency of the
    target cell.

    Arguments:
        data (pandas.DataFrame): the full computation results.
        tokens (boolean): Whether the cell token frequencies should be
            used for weighting. Defaults to False.

    Returns:
        pandas.DataFrame: the rows of ``data`` (the input is copied, not
            modified) with four added columns: ``pair_proba``,
            ``pred_proba``, ``target_proba`` and ``proba_source``
            (``"tokens"`` or ``"uniform"``).
    """

    def compute_weight(x, freq):
        """
        For each group of predictors, compute the constant value.
        Make then the product with the target frequency.

        Arguments:
            x (pandas.DataFrame): the rows sharing one predictor; the
                group key (``x.name``) is the "&"-joined predictor string.
            freq (pandas.DataFrame): relative frequencies; this code reads
                its index (compared to the predictor names) and its
                ``result`` column.
        """
        # NOTE(review): x.name is split as a string; predictors stored as
        # tuples (as import_file produces) would fail here -- confirm callers.
        preds = x.name.split('&')
        is_pred = freq.index.isin(preds)
        pred_freq = x.predicted.map(freq.loc[x.predicted.drop_duplicates()].result.to_dict())

        # Probabilities of the combinations of predictors
        C_sum = sum([reduce(mul, p) for p in combinations(freq.result, len(preds))])

        # Probability of our combination of predictors
        pred_product = reduce(mul, freq.loc[is_pred, 'result'])

        # Constant part for predictor
        constant = pred_product / (C_sum * (1 - freq.loc[is_pred, 'result'].sum()))

        # Vectorized computation for the target
        x['pair_proba'] = (pred_freq * constant).values
        x['pred_proba'] = pred_product / C_sum
        x['target_proba'] = pred_freq
        x['proba_source'] = "tokens"
        return x

    data = data.copy()
    used_cells = self.patterns.cells
    if tokens:
        if self.frequencies.source['cells'] != "empty":
            cell_freq = self.frequencies.get_relative_freq(data="cells", filters={"cell": used_cells})
            # The grouping column is excluded from each group and restored
            # afterwards from the index level created by groupby.
            return data.groupby('predictor').apply(
                compute_weight, cell_freq, include_groups=False).reset_index(level="predictor")
        else:
            log.warning("Couldn't find cell frequencies. "
                        "Falling back on uniform weighting.")

    # Uniform weighting: within each (measure, n_preds) group, every row,
    # every distinct predictor and every distinct target weigh the same.
    measure_grp = data.groupby(['measure', "n_preds"])
    data.loc[:, 'pair_proba'] = 1 / measure_grp.transform('size')
    data.loc[:, 'pred_proba'] = 1 / measure_grp.predictor.transform('nunique')
    data.loc[:, 'target_proba'] = 1 / measure_grp.predicted.transform('nunique')
    data.loc[:, 'proba_source'] = "uniform"
    return data
def export_file(self, filename):
    """Write the stored results, with their weights, to a CSV file.

    Arguments:
        filename: the file's path.
    """
    def as_joined(preds):
        return "&".join(preds) if type(preds) is tuple else preds

    # Get weights from token frequencies if possible
    table = self.get_weights(self.data.copy(), tokens=True)

    # Format multiple predictors
    table.loc[:, "predictor"] = table.loc[:, "predictor"].apply(as_joined)

    # Rounding to 10 decimal places; adding 0 turns -0.0 into 0.0.
    rounded = table.loc[:, "value"].astype(float).map(lambda v: round(v, 10))
    table.loc[:, "value"] = rounded + 0

    table.to_csv(filename, index=False)
def import_file(self, filename):
    """Load previously computed entropies and add them to the results.

    Predictors written as "&"-joined strings are turned back into tuples.

    Arguments:
        filename (str): the file's path.
    """
    def as_tuple_if_joined(preds):
        return tuple(preds.split("&")) if "&" in preds else preds

    metadata = Metadata(path=filename)
    table = pd.read_csv(metadata.get_resource_path('entropies'))
    table.loc[:, "predictor"] = table.loc[:, "predictor"].apply(as_tuple_if_joined)
    self.add_measures(table)
def add_features(self, group):
    """Append the lexeme features, if any, to the applicable patterns.

    Arguments:
        group (pandas.DataFrame): a dataframe with a column named
            "applicable" and a "lexeme" column or index level.

    Returns:
        pandas.Series: the "applicable" column, concatenated with the
            features of each lexeme when features are available.
    """
    applicable = group.applicable
    if self.features is None:
        return applicable

    with_features = group.merge(self.features, on="lexeme", how="inner", sort=False)
    return applicable + with_features.features
def add_measures(self, *args, **kwargs):
    """Append new results to the stored measures.

    Arguments:
        args (:class:`pandas:pandas.DataFrame`): DataFrames to add,
            concatenated after the existing data in the given order.
        kwargs: optional keyword arguments to pass to `pandas.concat()`.
    """
    frames = [self.data]
    frames.extend(args)
    self.data = pd.concat(frames, **kwargs)
def prepare_data(self, n=1, debug=False, legacy=False):
    """Build the empty table that will receive the results of a computation.

    Arguments:
        n (int): number of predictors to consider
        debug (bool): Whether the computation is a standard one
            or a debug one. Debug measures get a "_debug" suffix.
        legacy (bool): With one predictor, whether to leave out the
            success probability measure.

    Returns:
        pandas.DataFrame: a dataframe indexed by (predictor, predicted),
            with an empty "value" column and some metadata.
    """
    def is_new_cell(row):
        return row.predicted not in row.predictor.split('&')

    cells = self.patterns.cells
    suffix = "_debug" if debug else ""
    predictor_labels = ["&".join(combo) for combo in combinations(cells, n)]

    # One row per combination of predictors and per candidate target.
    grid = pd.DataFrame(index=predictor_labels, columns=cells)
    grid = grid.reset_index(drop=False, names="predictor")
    data = grid.melt(id_vars="predictor", var_name="predicted", value_name="value")

    # drop A -> A cases
    data = data[data.apply(is_new_cell, axis=1)].copy()

    data.loc[:, "n_pairs"] = None
    data.loc[:, "n_preds"] = n
    if n != 1:
        data.loc[:, "measure"] = "cond_entropy" + suffix
    else:
        # With one predictor, each row holds the list of its measures.
        measures = ["cond_entropy" + suffix]
        if not legacy:
            measures.append('cond_p_success' + suffix)
        data.loc[:, "measure"] = [measures] * data.shape[0]
    data.loc[:, "dataset"] = self.name

    return data.set_index(['predictor', 'predicted'])
def one_pred_metrics(self, legacy=False, debug=False, **kwargs):
    r"""Compute unary entropies and counts of lexemes, and store them.

    The result contains entropy :math:`H(c_{1} \to c_{2})`.

    Values are computed for all unordered pairs of columns
    :math:`(c_{1}, c_{2})` where :math:`c_{1} != c_{2}`
    in the :attr:`PatternDistribution.patterns`'s keys.

    Nothing is returned: the results are appended to the stored
    measures through `add_measures()`.

    Example:
        For two cells c1, c2, entropy of c1 → c2,
        noted :math:`H(c_{1} \to c_{2})` is:

        .. math::

            H( \textrm{patterns}_{c1, c2} | \textrm{classes}_{c1, c2} )

    The probability distribution of the patterns, on which this entropy
    is computed, is established on the probability distribution of the
    pairs of forms that instantiate the pattern. For the mathematical
    formalism, refer to the appendix of Bouton & Bonami 2026.

    Arguments:
        debug (bool): Whether to print a debug log. Defaults to False
        legacy (bool): Whether to use legacy computations.
            This necessarily disables token frequencies.
        kwargs (dict): settings to retrieve frequencies.
    """
    log.info("Computing c1 → c2 entropies")
    log.debug("Logging one predictor probabilities")
    log.debug(" P(x → y) = P(x~y | Class(x))")

    # For faster access
    patterns = self.patterns
    data = self.prepare_data(debug=debug, legacy=legacy)

    if not legacy:
        patterns.add_frequencies(self.frequencies, **kwargs)

    # Compute conditional entropy
    for pair, df in patterns.items():

        # Defective rows can't be kept here.
        selector = df.pattern.notna()
        df = df[selector].copy()

        # Legacy computations ignore frequencies: every row weighs 1.
        if legacy:
            df['f_pred'] = 1
            df['f_pair'] = 1

        # We compute the number of pairs affected by this calculation.
        data.loc[pair, "n_pairs"] = sum(selector)

        # We aggregate features and applicable patterns.
        # Lexemes that share these properties belong to similar classes.
        classes = self.add_features(df)

        if legacy and not debug:
            data.loc[pair, "value"] = cond_entropy(df.pattern.apply(lambda x: (x,)),
                                                   classes,
                                                   subset=selector)
        elif not debug:
            # One value per measure listed for this row by prepare_data.
            data.at[pair, "value"] = [cond_entropy_slow(df, classes),
                                      cond_psuccess(df, classes)]
        else:
            data.at[pair, "value"] = self.cond_metrics_log(df, classes, pair,
                                                           legacy=legacy,
                                                           subset=selector)

    # Rows hold parallel lists of values and measures: one row per measure.
    data = data.explode(['value', "measure"])
    self.add_measures(data.reset_index())
def cond_metrics_log(self, group, classes, cells, subset=None, legacy=False):
    """Print a log of the probability distribution for one predictor.

    Writes down the distributions
    :math:`P( patterns_{c1, c2} | classes_{c1, c2} )`
    for all unordered pairs of columns in :attr:`.patterns`.
    Also writes the entropy of the distributions.

    Arguments:
        group (pandas.DataFrame): rows of one pair of cells. This code
            reads its columns ``pattern``, ``lexeme``, ``form_x``,
            ``form_y``, ``f_pair`` and ``f_pred``.
        classes (pandas.Series): the class of each row of ``group``.
        cells (tuple): the two cells, only used in the log title.
        subset (pandas.Series): boolean selector applied to both
            ``group`` and ``classes``.
        legacy (bool): Whether to leave the success probability
            out of the log and of the returned values.

    Returns:
        numpy.ndarray: the frequency-weighted average of the entropy of
            each class, followed (unless ``legacy``) by the weighted
            average of the success probability.
    """

    def subclass_summary(subgroup, total):
        """ Produces a nice summary of the available patterns for a subclass"""
        ex = subgroup.iloc[0, :]
        freq = subgroup.f_pair.sum() / total if total is not None else subgroup.shape[0]
        freq = 0 if pd.isna(freq) else freq
        return pd.Series([
            f"{ex.lexeme}: {ex.form_x}{ex.form_y}",
            freq,
        ], index=["example", 'subclass_size'])

    def success_table(subgroup, patterns):
        """
        Create a table which tells for each set of words behaving in a
        similar way, which patterns could apply and what would be their
        probability of success.
        """
        ex = subgroup.iloc[0, :]
        forms_y = (str(y) for y in subgroup[subgroup.form_x == ex.form_x].form_y.values)
        series = {'example': f"{ex.lexeme}: {ex.form_x}{', '.join(forms_y)}"}
        pats = patterns.copy()
        pats.proba = pats.proba.astype(str)
        # Blank out the patterns that this set of words does not follow.
        pats.loc[~pats.index.isin(ex.pattern_pred), "proba"] = ""
        series.update(pats.set_index('id').proba.to_dict())
        series['f_pred'] = subgroup.f_pred.sum()
        series['psuccess'] = subgroup[(subgroup.form_x == ex.form_x) &
                                      (subgroup.lexeme == ex.lexeme)].psuccess.sum()
        return pd.Series(series)

    log.debug("\n# Distribution of {}{} \n".format(cells[0], cells[1]))

    # NOTE(review): the default subset=None is used directly as an indexer
    # below -- callers in this class always pass a selector; confirm.
    A = group[subset]
    B = classes[subset]
    cond_events = A.groupby(B, sort=False)

    log.debug("Showing distributions for "
              + str(len(cond_events))
              + " classes")
    summary = []

    # Classes are logged from the largest to the smallest.
    for i, (classe, members) in enumerate(sorted(cond_events,
                                                 key=lambda x: len(x[1]),
                                                 reverse=True)):
        # Group by patterns and build a summary
        p_table = members.groupby('pattern').apply(subclass_summary, members.f_pair.sum())

        # Log features
        if self.features is not None:
            # The features are the last items of the class tuple.
            feature_log = (
                "Features: "
                + ", ".join(str(x) for x in classe[-self.features_len:]))
            classe = classe[:-self.features_len]

        # List possible patterns that are not used in this class.
        for pattern in classe:
            if pattern not in p_table.index:
                p_table.loc[str(pattern), :] = ["-", 0]

        # Create an ID for the patterns
        p_table['id'] = range(p_table.shape[0])
        p_table.id = "p_" + p_table.id.astype(str)

        # Group by patterns for the predictor only (i.e. allow for overabundance)
        members['pattern_pred'] = members.groupby(['lexeme', 'form_x'], observed=False)\
            .pattern.transform(lambda x: [tuple(x)]*x.shape[0])

        # Compute the pattern probabilities
        p_table['proba'] = p_table.subclass_size / p_table.subclass_size.sum()
        p_table.fillna(0, inplace=True)
        members['psuccess'] = members.pattern.map(p_table.proba)

        # Get nice table with examples.
        table = members.groupby('pattern_pred', group_keys=False)\
            .apply(success_table, patterns=p_table)\
            .reset_index(drop=True)

        # Compute metrics
        # Adding 0 avoids displaying a negative zero.
        ent = 0 + entropy(p_table.proba)
        psuccess = table.f_pred @ table.psuccess / table.f_pred.sum()\
            if table.f_pred.sum() > 0 else 0
        summary.append([members.f_pred.sum(), ent, psuccess])

        # Log the subclass properties
        headers = ("Pattern", "Example", "Frequency", "P(Pattern|class)")
        p_table = p_table.reset_index().set_index("id")
        p_table.columns = headers
        table.rename(columns={"example": "Example",
                              "f_pred": "Frequency",
                              "psuccess": "P(success|class)"},
                     inplace=True)
        table.set_index('Example', inplace=True)

        stats = f"\n## Class n°{i} ({len(members)} members), H={ent:.3f}"
        if not legacy:
            stats += f", P(success)={psuccess:.3f}"
        log.debug(stats)
        if self.features is not None:
            log.debug(feature_log)
        if legacy:
            p_table = p_table.iloc[:, :-1]
        log.debug("\nPatterns found\n\n" + p_table.to_markdown())
        if not legacy:
            log.debug("\nDistribution of the forms\n\n" + table.to_markdown())

    log.debug('\n## Class summary')
    summary = pd.DataFrame(summary, columns=['Frequency', 'H(pattern|class)', 'P(success|class)'])
    summary.index.name = "Class"
    if legacy:
        summary = summary.iloc[:, :-1]
    # Average of each metric over the classes, weighted by class frequency.
    sums = summary.iloc[:, 0].T @ summary.iloc[:, 1::] / summary.iloc[:, 0].sum()
    log.debug(f'\nAv. conditional entropy: H(pattern|class)={sums.iloc[0]}')
    if not legacy:
        log.debug(f'\nAv. success probability: P(success|class)={sums.iloc[1]}')
    log.debug("\n" + summary.to_markdown())
    return sums.values
def n_preds_entropy(self, n, paradigms, debug=False):
    r"""Wrapper to prepare the computation of n-ary entropies.

    Loops through the cells and runs the computations
    for every set of predictors. The results are appended
    to the stored measures.

    Arguments:
        n (int): number of predictors. With ``n == 1``, the computation
            is delegated to `one_pred_metrics()`.
        paradigms (pandas.DataFrame): a DataFrame of paradigms
        debug (bool): Whether to run a debug computation with full log.
            Defaults to False.

    Raises:
        ValueError: if ``n`` is not lower than the number of cells.
    """
    if n == 1:
        # The unary case has a single entry point, which handles the debug
        # mode itself. The former calls to one_pred_entropy() and
        # one_pred_entropy_log() targeted methods this class does not define.
        return self.one_pred_metrics(debug=debug)

    if n >= len(paradigms.cells):
        raise ValueError(f"There are {len(paradigms.cells)} cells and "
                         f"you requested {n} predictors, which is equal or higher. "
                         "Please set a lower value for entropies.n.")

    log.info("Computing (c1, ..., c{!s}) → c{!s} entropies".format(n, n + 1))
    log.debug(f"Logging n preds probabilities, with n = {n}")
    log.debug(" P(x, y → z) = P(x~z, y~z | Class(x), Class(y), x~y)")

    data = self.prepare_data(n=n, debug=debug).reset_index(drop=False)

    # Get the measures
    if not debug:
        # Predictions already certain with n-1 predictors are not recomputed.
        zeros = self.check_zeros(n)
        data = data.groupby('predictor').apply(
            self.n_preds_condent,
            paradigms.data,
            zeros,
            n,
        )
    else:
        data = data.groupby('predictor').apply(
            self.n_preds_condent_log,
            paradigms.data,
            n,
        )

    # Add to previous results
    self.add_measures(data.reset_index(drop=True))
def n_preds_condent(self, df, paradigms, zeros, n):
    r"""Computes the conditional entropies for one set of n predictors.

    Relies on the distributions:

    .. math::

        P( patterns_{c1, c3}, \; \; patterns_{c2, c3} \; \; |
           classes_{c1, c3}, \; \; \; \; classes_{c2, c3},
           \; \; patterns_{c1, c2} )

    The result contains entropy :math:`H(c_{1}, ..., c_{n} \to c_{n+1} )`.

    Values are computed for all unordered combinations of
    :math:`(c_{1}, ..., c_{n+1})` in the :attr:`paradigms`'s columns.
    Indexes are tuples :math:`(c_{1}, ..., c_{n})`
    and columns are the predicted cells :math:`c_{n+1}`.

    Example:
        For three cells c1, c2, c3, (n=2)
        entropy of c1, c2 → c3,
        noted :math:`H(c_{1}, c_{2} \to c_{3})` is:

        .. math::

            H( patterns_{c1, c3}, \; \; patterns_{c2, c3}\; \; |
               classes_{c1, c3}, \; \; \; \; classes_{c2, c3},
               \; \; patterns_{c1, c2} )

    Arguments:
        df (pandas.DataFrame): the rows to fill for one set of
            predictors; the "&"-joined predictors are read from
            ``df.name`` and each row names a predicted cell.
        paradigms (pandas.DataFrame): a DataFrame of paradigms.
        zeros (dict): a dictionary of pairs that lead to an entropy of zero.
        n (int): number of predictors.

    Returns:
        pandas.DataFrame: ``df`` with ``n_pairs`` and ``value`` filled.
    """

    def already_zero(predictors, out, zeros):
        # A prediction that is certain from n-1 of the predictors
        # stays certain with one more predictor.
        if zeros is None or not zeros:
            return False
        for preds_subset in combinations(predictors, n - 1):
            if frozenset(preds_subset) in zeros.get(out, {}):
                return True
        return False

    # For faster access
    patterns = self.patterns
    predictors = df.name.split('&')
    pairs_of_predictors = list(combinations(predictors, 2))
    set_predictors = set(predictors)

    # Patterns of alternation between the predictors, one column per pair.
    known_patterns = pd.concat([patterns[k]
                                .set_index('lexeme')
                                .pattern
                                for k in pairs_of_predictors],
                               axis=1)
    # Lexemes for which every pair of predictors has a pattern.
    predlexemes = known_patterns.notna().all(axis=1)
    # Values become tuples, so that the row sum concatenates them.
    known_patterns = known_patterns.map(lambda x: (x,) if not isinstance(x, tuple) else x)\
        .sum(axis=1)

    def row_condent(x):
        """ Computes the conditional entropy for a given
        set of predictors and a target.

        Arguments:
            x (pandas.Series): a Series containing information
                for the computation.
        """
        out = x.predicted
        # Lexemes that have a non defective form in the target cell.
        outlexemes = paradigms[(paradigms.cell == out) &
                               ~(paradigms.form.apply(lambda x: x.is_defective()))]
        selector = predlexemes & predlexemes.index.isin(outlexemes.lexeme)
        x.n_pairs = sum(selector)

        if already_zero(set_predictors, out, zeros):
            x.value = 0
        else:
            # Under the pattern column, getting intersection of patterns events for each
            # predictor: x~z, y~z
            # Under the applicable column, getting
            # - Known classes Class(x), Class(y)
            # - known patterns x~y
            # - plus all features
            pattern_pairs = [patterns[(pred, out)]
                             .set_index('lexeme')
                             [selector][['pattern', 'applicable']]
                             .map(lambda x: (x,) if not isinstance(x, tuple) else x)
                             for pred in predictors]
            pattern_pairs = reduce(lambda x, y: x+y, pattern_pairs)
            pattern_pairs.applicable += known_patterns[selector]
            classes = self.add_features(pattern_pairs)

            # Prediction of H(A|B)
            x.value = cond_entropy(pattern_pairs.pattern,
                                   classes,
                                   subset=selector)
        return x

    return df.apply(row_condent, axis=1)
def n_preds_condent_log(self, df, paradigms, n):
    r"""Computes the conditional entropies for one set of n predictors
    and logs the details of the computations.

    Writes down the distributions:

    .. math::

        P( patterns_{c1, c3}, \; \; patterns_{c2, c3} \; \; |
           classes_{c1, c3}, \; \; \; \; classes_{c2, c3},
           \; \; patterns_{c1, c2} )

    The result contains entropy :math:`H(c_{1}, ..., c_{n} \to c_{n+1} )`.

    Values are computed for all unordered combinations of
    :math:`(c_{1}, ..., c_{n+1})` in the :attr:`paradigms`'s columns.
    Indexes are tuples :math:`(c_{1}, ..., c_{n})`
    and columns are the predicted cells :math:`c_{n+1}`.

    Example:
        For three cells c1, c2, c3, (n=2)
        entropy of c1, c2 → c3,
        noted :math:`H(c_{1}, c_{2} \to c_{3})` is:

        .. math::

            H( patterns_{c1, c3}, \; \; patterns_{c2, c3}\; \; |
               classes_{c1, c3}, \; \; \; \; classes_{c2, c3},
               \; \; patterns_{c1, c2} )

    Arguments:
        df (pandas.DataFrame): the rows to fill for one set of
            predictors; the "&"-joined predictors are read from
            ``df.name`` and each row names a predicted cell.
        paradigms (pandas.DataFrame): a DataFrame of paradigms.
        n (int): number of predictors.

    Returns:
        pandas.DataFrame: ``df`` with ``n_pairs`` and ``value`` filled.
    """

    def count_with_examples(row, counter, examples, paradigms, pred, out):
        # Counts one lexeme for its pattern, and keeps it as the example.
        lemma, pattern = row
        predictors = "; ".join(paradigms.loc[(paradigms.lexeme == lemma) &
                                             (paradigms.cell == c)]
                               .form.values[0] for c in pred)
        predicted = paradigms.loc[(paradigms.lexeme == lemma) &
                                  (paradigms.cell == out)].form.values[0]
        example = f"{lemma}: ({predictors}) → {predicted}"
        counter[pattern] += 1
        examples[pattern] = example

    def format_patterns(series, string):
        patterns = ("; ".join(str(pattern)
                              for pattern in pair)
                    for pair in series)
        return string.format(*patterns)

    # Templates used to render each row, with one "{}" slot per item.
    pred_numbers = list(range(1, n + 1))
    patterns_string = "\n".join(f"{pred}~{n + 1}" + "= {}" for pred in pred_numbers)
    applicable_string = "\n * " + "\n * ".join(f"Class({pred}, {n + 1})" + "= {}"
                                               for pred in pred_numbers)
    # The "+" matters: two adjacent literals are merged into a single
    # separator, which dropped the first bullet and doubled the others.
    known_pat_string = "\n * " + "\n * ".join("{!s}~{!s}".format(*preds) + "= {}"
                                              for preds in combinations(pred_numbers, 2))

    def format_features(features):
        return "\n* Features:\n * " + "\n * ".join(str(x) for x in features)

    def formatting_local_patterns(x):
        return format_patterns(x, patterns_string)

    def formatting_applicable_patterns(x):
        return format_patterns(x, applicable_string)

    def formatting_known_patterns(x):
        return format_patterns(x, known_pat_string)

    # For faster access
    patterns = self.patterns
    predictors = df.name.split('&')
    pairs_of_predictors = list(combinations(predictors, 2))

    # Patterns of alternations between the predictors.
    known_patterns = pd.concat([patterns[k]
                                .set_index('lexeme')
                                .pattern
                                .rename('&'.join(k))
                                for k in pairs_of_predictors],
                               axis=1)
    # Lexemes for which every pair of predictors has a pattern.
    predlexemes = known_patterns.notna().all(axis=1)
    known_patterns = known_patterns.map(lambda x: (x,) if not isinstance(x, tuple) else x)

    def row_condent(x, known_patterns):
        """
        Computes and logs the conditional entropy for one target cell.

        Arguments:
            x: a Series with results and information about the cells.
            known_patterns: the patterns of alternation between
                the predictors, which are considered known.
        """
        patterns = self.patterns
        out = x.predicted
        # Lexemes that have a non defective form in the target cell.
        outlexemes = paradigms[(paradigms.cell == out) &
                               ~(paradigms.form.apply(lambda x: x.is_defective()))]
        selector = predlexemes & predlexemes.index.isin(outlexemes.lexeme)
        x.n_pairs = sum(selector)

        log.debug(f"\n# Distribution of ({', '.join(predictors)}) → {out} \n")

        applicable_patterns = [
            patterns[(pred, out)]
            .set_index('lexeme')[selector]
            .applicable
            .rename(f"{pred}&{out}")
            .map(lambda x: (x,) if not isinstance(x, tuple) else x)
            for pred in predictors
        ]
        applicable_patterns = pd.concat(applicable_patterns, axis=1)

        gold_patterns = [patterns[(pred, out)]
                         .set_index('lexeme')[selector]
                         .pattern
                         .rename(f"{pred}&{out}")
                         .map(lambda x: (x,) if not isinstance(x, tuple) else x)
                         for pred in predictors]
        gold_patterns = pd.concat(gold_patterns, axis=1)

        # Getting intersection of patterns events for each predictor:
        # x~z, y~z
        A = gold_patterns.apply(formatting_local_patterns, axis=1)

        # Known classes Class(x), Class(y) and known patterns x~y
        applicable_patterns = applicable_patterns.apply(formatting_applicable_patterns, axis=1)
        known_patterns = known_patterns.apply(formatting_known_patterns, axis=1)
        B = applicable_patterns + known_patterns

        if self.features is not None:
            known_features = self.features[selector].apply(format_features)
            B = B + known_features

        cond_events = A.groupby(B, sort=False)

        log.debug("Showing distributions for "
                  + str(len(cond_events))
                  + " classes")
        summary = []

        # Classes are logged from the largest to the smallest.
        for i, (classe, members) in enumerate(sorted(cond_events,
                                                     key=lambda x: len(x[1]),
                                                     reverse=True)):
            log.debug("\n## Class n°%s (%s members).", i, len(members))
            counter = Counter()
            examples = defaultdict()
            members.reset_index().apply(count_with_examples,
                                        args=(counter, examples, paradigms,
                                              predictors, out),
                                        axis=1)
            total = sum(list(counter.values()))
            log.debug("* Total: %s", total)

            table = []
            for my_pattern in counter:
                row = (my_pattern,
                       examples[my_pattern],
                       counter[my_pattern],
                       counter[my_pattern] / total)
                table.append(row)

            headers = ("Patterns", "Example", "Size", "P(Pattern|class)")
            table = pd.DataFrame(table, columns=headers)

            # Get the slow computation results
            # Adding 0 avoids displaying a negative zero.
            summary.append([table.Size.sum(), 0 + entropy(table.iloc[:, -1])])
            log.debug("\n" + table.to_markdown())

        log.debug('\n## Class summary')
        summary = pd.DataFrame(summary, columns=['Size', 'H(pattern|class)'])
        summary.index.name = "Class"
        # Average of the class entropies, weighted by the class sizes.
        x.value = (summary.iloc[:, -2] * summary.iloc[:, -1] / summary.iloc[:, -2].sum()).sum()
        log.debug(f'\nAv. conditional entropy: H(pattern|class)={x.value}')
        log.debug("\n" + summary.to_markdown())
        return x

    return df.apply(row_condent, args=[known_patterns], axis=1)