# -*- coding: utf-8 -*-
# !/usr/bin/env python
"""author: Sacha Beniamine.
Encloses distribution of patterns on paradigms.
"""
import logging
from collections import Counter, defaultdict
from functools import reduce
from itertools import combinations
from operator import mul
import pandas as pd
from . import cond_entropy, cond_entropy_slow, entropy, cond_psuccess
from ..representations.frequencies import Frequencies
from ..utils.metadata import Metadata
# Logger named "Qumin"; the methods below use it for progress (info),
# fallback warnings and detailed debug traces.
log = logging.getLogger("Qumin")
[docs]
class PatternDistribution(object):
"""Statistical distribution of patterns.
Attributes:
patterns (~qumin.representations.patterns.ParadigmPatterns):
A dict of :class:`pandas.DataFrame`, where each row describes an alternation between
two forms belonging to different cells of the same lexeme.
The row also contains the correct pattern and the set of applicable patterns.
frequencies (~qumin.representations.frequencies.Frequencies):
The frequencies passed to the constructor.
features (pandas.Series or None):
Per-lexeme tuples of ``"name=value"`` strings, or None when no
feature table was given.
features_len (int):
Number of feature columns (0 when there are no features).
data (pandas.DataFrame):
Long-format table of results with the columns ``predictor``, ``predicted``,
``measure``, ``value``, ``n_pairs``, ``n_preds`` and ``dataset``.
Rows for the distribution :math:`P(c_{1}, ..., c_{n} → c_{n+1})`
have ``n_preds`` equal to n.
name (str):
Name of the dataset.
"""
[docs]
def __init__(self, patterns, dataset, frequencies, features=None):
"""Constructor for PatternDistribution.
Arguments:
patterns (~qumin.representations.patterns.ParadigmPatterns):
A dict of :class:`pandas.DataFrame`,
where each row describes an alternation between
forms belonging to two different cells of the same lexeme.
The row also contains the correct pattern and the set of applicable patterns.
dataset (frictionless.Package): Paralex dataset metadata.
frequencies (~qumin.representations.frequencies.Frequencies):
The frequencies for the paradigms.
features:
optional table of features
"""
self.name = dataset.name
self.frequencies = Frequencies(dataset)
self.patterns = patterns
self.frequencies = frequencies
if features is not None:
# Add feature names
features = features.apply(lambda x: x.name + "=" + x.apply(str), axis=0)
# To tuples
features = features.map(lambda x: (str(x),))
self.features_len = features.shape[1]
self.features = pd.DataFrame.sum(features, axis=1)
self.features.index.name = "lexeme"
self.features.name = "features"
else:
self.features_len = 0
self.features = None
self.data = pd.DataFrame(None,
columns=["predictor",
"predicted",
"measure",
"value",
"n_pairs",
"n_preds",
"dataset"
])
[docs]
def get_results(self, measure=None, n=1):
"""
Returns computation results from a distribution of patterns.
Arguments:
measure (List or str): measure name. Defaults to all.
n (int): Number of predictors to include in the mean.
Returns:
pandas.DataFrame: a DataFrame of results.
"""
if isinstance(measure, str):
measure = [measure]
elif measure is None:
measure = list(self.data.measure.unique())
is_measure = self.data.loc[:, "measure"].isin(measure)
is_one_pred = self.data.loc[:, "n_preds"] == n
return self.data.loc[is_measure & is_one_pred, :]
[docs]
def check_zeros(self, n):
"""
Check whether:
- We computed entropies for n-1 predictors
- Some of these are 0s and don't need to be computed for n predictors.
Arguments:
n (int): number of predictors currently computed.
Returns:
dict: a dictionary of pairs that lead to an entropy of zero.
"""
log.info("Saving time by listing already known 0 entropies...")
if n - 1 in self.data.loc[:, "n_preds"]:
df = (
self.get_results(measure="cond_entropy", n=n - 1)
.query('value==0')
.groupby("predicted")
)
if n - 1 == 1:
df = df.agg({"predictor": lambda ps: set(frozenset({pred}) for pred in ps)})
else:
df = df.agg({"predictor": lambda ps: set(frozenset(pred) for pred in ps)})
return df.predictor.to_dict()
return None
[docs]
def get_mean(self, **kwargs):
"""
Returns the average measures from the current run.
If cell frequencies are available, they will be used.
Arguments:
**kwargs: Keyword arguments are passed to `get_results()`
Returns: mean (pandas.Series)
"""
def get_from_weights(weights):
return (weights.value * weights.pair_proba)\
.groupby(weights.measure).sum()
results = self.get_results(**kwargs)
weights_uniform = self.get_weights(results, tokens=False)
means = get_from_weights(weights_uniform).rename('Uniform').to_frame()
# If possible we also compute the mean with tokens
weights_token = self.get_weights(results, tokens=True)
if (weights_token.proba_source != "uniform").all():
means["Weighted with tokens"] = get_from_weights(weights_token)
means.index.name = "Measure"
return means
[docs]
def get_weights(self, data, tokens=False):
r""" Returns weights computed from cell frequencies for pairs of cells.
*The probability of a pair of cells is the product of the probability of the predictors
with the probability of the target. The target is chosen different from the predictors.*
Let :math:`\{A_1, \dots A_n\}` be the random variables describing
the drawing of :math:`n` predictors
and :math:`B` the random variable describing the drawing of a target cell.
One can write the following generic formula and rewrite it with Bayes' theorem:
.. math::
\begin{align}
P(\textrm{pred} \to \textrm{target}) &= P(\{A_1, \dots A_n\} = \textrm{pred})
P(B = \textrm{target}\mid B \notin \textrm{pred})\\
&= P(\{A_1, \dots A_n\} = \textrm{pred})
\frac{P(B = \textrm{target}\cap B \notin \textrm{pred})}{P(B \notin \textrm{pred})}
\end{align}
Since :math:`B = \textrm{target} \subset B \notin \textrm{pred}`, one can write:
.. math::
\begin{align}
P(\textrm{pred} \to \textrm{target}) &= P(\{A_1, \dots A_n\} = \textrm{pred})
\frac{P(B = \textrm{target})}{P(B \notin \textrm{pred})}\\
&= P(\{A_1, \dots A_n\} = \textrm{pred})
\frac{P(B = \textrm{target})}{1 - \sum_{a\in \textrm{pred}}P(a)}
\end{align}
We now need to estimate :math:`P(\textrm{pred})` and :math:`P(\textrm{target})`.
Let :math:`f_i` be the frequency of cell :math:`i` and
:math:`f` the cumulated frequency of all cells.
In the simplest case with **one predictor**, the formula can be simplified to:
.. math::
\begin{align}
P(\textrm{pred} \to \textrm{target}) &= P(A = \textrm{pred})
\frac{P(B = \textrm{target})}{P(B \neq \textrm{pred})}\\
&= \frac{f_\textrm{pred} f_\textrm{target}}{f f_\overline{\textrm{pred}}}
\end{align}
In the more complex case with **n predictors**, we need to estimate
:math:`P(\{A_1, \dots A_n\} = \textrm{pred})`.
Let us consider:
- :math:`S` the set of all cells,
- :math:`C^n_S` the set of all unordered combinations of :math:`n` cells.
For instance, if :math:`S=\{A, B, C\}` and :math:`n=2`, then:
.. math::
C^n_S = \{\{A, B\}, \{A, C\}, \{B, C\}\}
If we draw random combinations of :math:`n` cells,
how often are we going to draw each item of :math:`C^n_S`?
This value is:
.. math::
\begin{align}
P(\{A_1, \dots A_n\} = \textrm{pred}) &= \frac{n!\prod_{a\in \textrm{pred}}P(a)}
{\sum_{c\in C_S^n} n! \prod_{a\in c}P(a)} \\
&= \frac{\prod_{a\in \textrm{pred}} f_a}{\sum_{c\in C_S^n} \prod_{a\in c}f_a}
\end{align}
Finally:
.. math::
\begin{align}
P(\textrm{pred} \to \textrm{target}) &= P(\{A_1, \dots A_n\} = \textrm{pred})
\frac{P(B = \textrm{target})}{1 - \sum_{a\in \textrm{pred}}P(a)}\\
&= \frac{f_\textrm{target}}{f}\Big(\frac{\prod_{a\in \textrm{pred}} f_a}
{\sum_{c\in C_S^n} \prod_{a\in c}f_a
\times (1-\sum_{a\in \textrm{pred}}\frac{f_a}{f})}\Big)
\end{align}
Notice that the second part of the final formula does not depend on :math:`B` and can
be computed for the predictors beforehand. We then compute the product of this with
the relative frequency of the target cell.
Arguments:
data (pandas.DataFrame): the full computation results.
tokens (boolean): Whether the cell token frequencies should be used for weighting.
Defaults to False.
Returns:
pandas.DataFrame: the rows of ``data`` with four added columns:
``pair_proba``, ``pred_proba``, ``target_proba`` and ``proba_source``
(``"tokens"`` when cell frequencies were used, ``"uniform"`` otherwise).
The input ``data`` is copied first and is not modified.
"""
def compute_weight(x, freq):
"""
For each group of predictors, compute the constant value.
Make then the product with the target frequency.
Arguments:
x (pandas.DataFrame): the rows sharing one predictor combination;
``x.name`` is the group key, an "&"-joined string of predictor cells.
freq (pandas.DataFrame): relative cell frequencies, indexed by cell,
read from its ``result`` column.
"""
preds = x.name.split('&')
is_pred = freq.index.isin(preds)
# Relative frequency of each row's target cell (despite the variable name).
pred_freq = x.predicted.map(freq.loc[x.predicted.drop_duplicates()].result.to_dict())
# Probabilities of the combinations of predictors
C_sum = sum([reduce(mul, p) for p in combinations(freq.result, len(preds))])
# Probability of our combination of predictors
pred_product = reduce(mul, freq.loc[is_pred, 'result'])
# Constant part for predictor
constant = pred_product / (C_sum * (1 - freq.loc[is_pred, 'result'].sum()))
# Vectorized computation for the target
x['pair_proba'] = (pred_freq * constant).values
x['pred_proba'] = pred_product / C_sum
x['target_proba'] = pred_freq
x['proba_source'] = "tokens"
return x
# Work on a copy so that the caller's table is left untouched.
data = data.copy()
used_cells = self.patterns.cells
if tokens:
if self.frequencies.source['cells'] != "empty":
cell_freq = self.frequencies.get_relative_freq(data="cells",
filters={"cell": used_cells})
# One call of compute_weight per predictor combination; the predictor
# key is then restored as a regular column.
return data.groupby('predictor').apply(
compute_weight, cell_freq, include_groups=False).reset_index(level="predictor")
else:
log.warning("Couldn't find cell frequencies. "
"Falling back on uniform weighting.")
# Uniform weighting, also reached when token frequencies are unavailable:
# every row of a (measure, n_preds) group gets the same weight.
measure_grp = data.groupby(['measure', "n_preds"])
data.loc[:, 'pair_proba'] = 1 / measure_grp.transform('size')
data.loc[:, 'pred_proba'] = 1 / measure_grp.predictor.transform('nunique')
data.loc[:, 'target_proba'] = 1 / measure_grp.predicted.transform('nunique')
data.loc[:, 'proba_source'] = "uniform"
return data
[docs]
def export_file(self, filename):
""" Export the data DataFrame to file
Arguments:
filename: the file's path.
"""
def join_if_multiple(preds):
if type(preds) is tuple:
return "&".join(preds)
return preds
data = self.data.copy()
# Get weights from token frequencies if possible
data = self.get_weights(data, tokens=True)
# Format multiple predictors
data.loc[:, "predictor"] = data.loc[:, "predictor"].apply(join_if_multiple)
# Rounding at 10 significant digits, ensuring positive zeros.
data.loc[:, "value"] = data.loc[:, "value"].astype(float).map(lambda x: round(x, 10)) + 0
data.to_csv(filename, index=False)
[docs]
def import_file(self, filename):
    """Load previously computed entropies and add them to the results.

    Arguments:
        filename (str): path used to build the metadata from which the
            'entropies' resource is located.
    """

    def parse_predictor(preds):
        # "a&b" becomes ("a", "b"); a single predictor stays a string.
        return tuple(preds.split("&")) if "&" in preds else preds

    metadata = Metadata(path=filename)
    table = pd.read_csv(metadata.get_resource_path('entropies'))
    table.loc[:, "predictor"] = table.loc[:, "predictor"].apply(parse_predictor)
    self.add_measures(table)
[docs]
def add_features(self, group):
"""
Adds lexeme features if available to a DataFrame containing a column named "applicable"
and lexemes as indexes.
Arguments:
group (pandas.DataFrame): a dataframe of lexemes and applicable patterns.
"""
if self.features is not None:
ret = group.applicable + group.merge(self.features, on="lexeme", how="inner", sort=False).features
return ret
else:
return group.applicable
[docs]
def add_measures(self, *args, **kwargs):
""" Adds data to the existing measures.
Arguments:
args (:class:`pandas:pandas.DataFrame`): DataFrames to add.
kwargs: optional keyword arguments to pass to `pandas.concat()`.
"""
self.data = pd.concat([self.data, *args], **kwargs)
[docs]
def prepare_data(self, n=1, debug=False, legacy=False):
"""
Prepares the dataframe to store the results for an entropy computation.
Attributes:
n (int): number of predictors to consider
debug (bool): Whether the computation is a standard one or a debug one.
Returns:
pandas.DataFrame: a dataframe with the predictors and the predicted cells,
as well as some metadata.
"""
rows = self.patterns.cells
idx = ["&".join(x) for x in combinations(rows, n)]
data = pd.DataFrame(index=idx,
columns=rows).reset_index(drop=False,
names="predictor").melt(id_vars="predictor",
var_name="predicted",
value_name="value")
suffix = "_debug" if debug else ""
# drop A -> A cases
data = data[data.apply(lambda x: x.predicted not in x.predictor.split('&'), axis=1)].copy()
data.loc[:, "n_pairs"] = None
data.loc[:, "n_preds"] = n
if n == 1:
measures = ["cond_entropy" + suffix]
if not legacy:
measures += ['cond_p_success' + suffix]
data.loc[:, "measure"] = [measures] * data.shape[0]
else:
data.loc[:, "measure"] = "cond_entropy" + suffix
data.loc[:, "dataset"] = self.name
data.set_index(['predictor', 'predicted'], inplace=True)
return data
[docs]
def one_pred_metrics(self, legacy=False, debug=False, **kwargs):
r"""Compute unary entropies and counts of lexemes, and add them to the results.
The rows appended to :attr:`PatternDistribution.data` contain the entropy
:math:`H(c_{1} \to c_{2})` and, unless ``legacy`` is set, the probability of success.
Values are computed for every pair of cells :math:`(c_{1}, c_{2})`
found in the :attr:`PatternDistribution.patterns`'s keys.
Nothing is returned.
Example:
For two cells c1, c2, entropy of c1 → c2,
noted :math:`H(c_{1} \to c_{2})` is:
.. math::
H( \textrm{patterns}_{c1, c2} | \textrm{classes}_{c1, c2} )
The probability distribution of the patterns, on which this entropy
is computed, is established on the probability distribution of the
pairs of forms that instantiate the pattern. For the mathematical
formalism, refer to the appendix of Bouton & Bonami 2026.
Arguments:
debug (bool): Whether to print a debug log. Defaults to False
legacy (bool): Whether to use legacy computations.
This necessarily disables token frequencies.
kwargs (dict): settings to retrieve frequencies.
"""
log.info("Computing c1 → c2 entropies")
log.debug("Logging one predictor probabilities")
log.debug(" P(x → y) = P(x~y | Class(x))")
# For faster access
patterns = self.patterns
data = self.prepare_data(debug=debug, legacy=legacy)
if not legacy:
patterns.add_frequencies(self.frequencies, **kwargs)
# Compute conditional entropy
for pair, df in patterns.items():
# Defective rows can't be kept here.
selector = df.pattern.notna()
df = df[selector].copy()
if legacy:
# Legacy mode ignores frequencies: every pair and predictor weighs 1.
df['f_pred'] = 1
df['f_pair'] = 1
# We compute the number of pairs affected by this calculation.
data.loc[pair, "n_pairs"] = sum(selector)
# We aggregate features and applicable patterns.
# Lexemes that share these properties belong to similar classes.
classes = self.add_features(df)
if legacy and not debug:
data.loc[pair, "value"] = cond_entropy(df.pattern.apply(lambda x: (x,)),
classes,
subset=selector)
elif not debug:
# One value per measure, in the same order as the row's "measure" list.
data.at[pair, "value"] = [cond_entropy_slow(df, classes), cond_psuccess(df, classes)]
else:
data.at[pair, "value"] = self.cond_metrics_log(df,
classes,
pair,
legacy=legacy,
subset=selector)
# Unfold the per-row lists into one row per (pair, measure).
data = data.explode(['value', "measure"])
self.add_measures(data.reset_index())
[docs]
def cond_metrics_log(self, group, classes, cells, subset=None, legacy=False):
"""Print a log of the probability distribution for one predictor.
Writes down the distributions
:math:`P( patterns_{c1, c2} | classes_{c1, c2} )`
for the pair of cells given in ``cells``.
Also writes the entropy of the distributions.
Arguments:
group (pandas.DataFrame): pairs of forms for the two cells. The columns
``pattern``, ``lexeme``, ``form_x``, ``form_y``, ``f_pair`` and ``f_pred`` are read.
classes (pandas.Series): the class of each row of ``group``, used as grouping key.
When features are available, the last ``features_len`` items of a class
are logged as features and removed from the list of patterns.
cells (tuple): the predictor cell and the predicted cell, used in the log header.
subset: boolean mask applied to both ``group`` and ``classes``.
NOTE(review): the default None is used directly for indexing;
the caller in this file always passes a mask.
legacy (bool): if True, the success probabilities are left out
of the log and of the returned values.
Returns:
numpy.ndarray: the averages over classes, weighted by class frequency:
the conditional entropy, followed by the success probability unless ``legacy``.
"""
def subclass_summary(subgroup, total):
""" Produces a nice summary of the available patterns for a subclass"""
ex = subgroup.iloc[0, :]
freq = subgroup.f_pair.sum() / total if total is not None else subgroup.shape[0]
freq = 0 if pd.isna(freq) else freq
return pd.Series([
f"{ex.lexeme}: {ex.form_x} → {ex.form_y}",
freq,
],
index=["example", 'subclass_size'])
def success_table(subgroup, patterns):
"""
Create a table which tells for each set of words behaving in a similar way,
which patterns could apply and what would be their probability of success.
"""
ex = subgroup.iloc[0, :]
forms_y = (str(y) for y in subgroup[subgroup.form_x == ex.form_x].form_y.values)
series = {'example': f"{ex.lexeme}: {ex.form_x} → {', '.join(forms_y)}"}
pats = patterns.copy()
pats.proba = pats.proba.astype(str)
# Blank out the probability of patterns that this example does not use.
pats.loc[~pats.index.isin(ex.pattern_pred), "proba"] = ""
series.update(pats.set_index('id').proba.to_dict())
series['f_pred'] = subgroup.f_pred.sum()
series['psuccess'] = subgroup[(subgroup.form_x == ex.form_x) &
(subgroup.lexeme == ex.lexeme)].psuccess.sum()
return pd.Series(series)
log.debug("\n# Distribution of {}→{} \n".format(cells[0], cells[1]))
A = group[subset]
B = classes[subset]
cond_events = A.groupby(B, sort=False)
log.debug("Showing distributions for "
+ str(len(cond_events))
+ " classes")
summary = []
# Classes are logged from the largest to the smallest.
for i, (classe, members) in enumerate(sorted(cond_events,
key=lambda x: len(x[1]),
reverse=True)):
# Group by patterns and build a summary
p_table = members.groupby('pattern').apply(subclass_summary, members.f_pair.sum())
# Log features
if self.features is not None:
feature_log = (
"Features: "
+ ", ".join(str(x) for x in classe[-self.features_len:]))
classe = classe[:-self.features_len]
# List possible patterns that are not used in this class.
for pattern in classe:
if pattern not in p_table.index:
p_table.loc[str(pattern), :] = ["-", 0]
# Create an ID for the patterns
p_table['id'] = range(p_table.shape[0])
p_table.id = "p_" + p_table.id.astype(str)
# Group by patterns for the predictor only (i.e. allow for overabundance)
members['pattern_pred'] = members.groupby(['lexeme', 'form_x'], observed=False)\
.pattern.transform(lambda x: [tuple(x)]*x.shape[0])
# Compute the pattern probabilities
p_table['proba'] = p_table.subclass_size / p_table.subclass_size.sum()
p_table.fillna(0, inplace=True)
members['psuccess'] = members.pattern.map(p_table.proba)
# Get nice table with examples.
table = members.groupby('pattern_pred', group_keys=False)\
.apply(success_table, patterns=p_table)\
.reset_index(drop=True)
# Compute metrics
# Adding 0 avoids displaying a negative zero.
ent = 0 + entropy(p_table.proba)
psuccess = table.f_pred @ table.psuccess / table.f_pred.sum()\
if table.f_pred.sum() > 0 else 0
summary.append([members.f_pred.sum(), ent, psuccess])
# Log the subclass properties
headers = ("Pattern", "Example",
"Frequency", "P(Pattern|class)")
p_table = p_table.reset_index().set_index("id")
p_table.columns = headers
table.rename(columns={"example": "Example",
"f_pred": "Frequency",
"psuccess": "P(success|class)"}, inplace=True)
table.set_index('Example', inplace=True)
stats = f"\n## Class n°{i} ({len(members)} members), H={ent:.3f}"
if not legacy:
stats += f", P(success)={psuccess:.3f}"
log.debug(stats)
if self.features is not None:
log.debug(feature_log)
if legacy:
# Drop the last column, "P(Pattern|class)", from the logged table.
p_table = p_table.iloc[:, :-1]
log.debug("\nPatterns found\n\n" + p_table.to_markdown())
if not legacy:
log.debug("\nDistribution of the forms\n\n" + table.to_markdown())
log.debug('\n## Class summary')
summary = pd.DataFrame(summary, columns=['Frequency',
'H(pattern|class)',
'P(success|class)'])
summary.index.name = "Class"
if legacy:
# Drop the success probability column.
summary = summary.iloc[:, :-1]
# Frequency-weighted average of every metric column.
sums = summary.iloc[:, 0].T @ summary.iloc[:, 1::] / summary.iloc[:, 0].sum()
log.debug(f'\nAv. conditional entropy: H(pattern|class)={sums.iloc[0]}')
if not legacy:
log.debug(f'\nAv. success probability: P(success|class)={sums.iloc[1]}')
log.debug("\n" + summary.to_markdown())
return sums.values
[docs]
def n_preds_entropy(self, n, paradigms, debug=False):
r"""
Wrapper to prepare the computation of n-ary entropies.
Loops through the cells and runs the computations for every set of predictors.
Arguments:
n (int): number of predictors.
paradigms (pandas.DataFrame): a DataFrame of paradigms
debug (bool): Whether to run a debug computation with full log. Defaults to False.
"""
if n == 1 and not debug:
return self.one_pred_entropy()
elif n == 1 and debug:
return self.one_pred_entropy_log()
if n >= len(paradigms.cells):
raise ValueError(f"There are {len(paradigms.cells)} cells and "
f"you requested {n} predictors, which is equal or higher. "
"Please set a lower value for entropies.n.")
log.info("Computing (c1, ..., c{!s}) → c{!s} entropies".format(n, n + 1))
log.debug(f"Logging n preds probabilities, with n = {n}")
log.debug(" P(x, y → z) = P(x~z, y~z | Class(x), Class(y), x~y)")
data = self.prepare_data(n=n, debug=debug).reset_index(drop=False)
# Get the measures
if not debug:
zeros = self.check_zeros(n)
data = data.groupby('predictor').apply(
self.n_preds_condent,
paradigms.data, zeros, n,
)
else:
data = data.groupby('predictor').apply(
self.n_preds_condent_log,
paradigms.data, n,
)
# Add to previous results
self.add_measures(data.reset_index(drop=True))
[docs]
def n_preds_condent(self, df, paradigms, zeros, n):
r"""
Computes the probability distribution for n predictors.
Writes down the distributions:
.. math::
P( patterns_{c1, c3}, \; \; patterns_{c2, c3} \; \; |
classes_{c1, c3}, \; \; \; \; classes_{c2, c3},
\; \; patterns_{c1, c2} )
The result contains entropy :math:`H(c_{1}, ..., c_{n} \to c_{n+1} )`.
Values are computed for all unordered combinations of
:math:`(c_{1}, ..., c_{n+1})` in the
:attr:`paradigms`'s columns.
Indexes are tuples :math:`(c_{1}, ..., c_{n})`
and columns are the predicted cells :math:`c_{n+1}`.
Example:
For three cells c1, c2, c3, (n=2)
entropy of c1, c2 → c3,
noted :math:`H(c_{1}, c_{2} \to c_{3})` is:
.. math::
H( patterns_{c1, c3}, \; \; patterns_{c2, c3}\; \;
| classes_{c1, c3}, \; \; \; \;
classes_{c2, c3}, \; \; patterns_{c1, c2} )
Arguments:
df (pandas.DataFrame): the result rows of one combination of predictors.
``df.name`` is the "&"-joined string of the predictor cells,
and each row names its target cell in ``predicted``.
paradigms (pandas.DataFrame): a DataFrame of paradigms. The columns
``lexeme``, ``cell`` and ``form`` are read.
zeros (dict): a dictionary of pairs that lead to an entropy of zero,
mapping a predicted cell to a set of frozensets of predictors. May be None.
n (int): number of predictors.
Returns:
pandas.DataFrame: the rows of ``df`` with ``value`` and ``n_pairs`` filled in.
"""
def already_zero(predictors, out, zeros):
# True if some subset of n-1 of the predictors already predicts
# `out` with a null entropy.
if zeros is None or not zeros:
return False
for preds_subset in combinations(predictors, n - 1):
if frozenset(preds_subset) in zeros.get(out, {}):
return True
return False
# For faster access
patterns = self.patterns
predictors = df.name.split('&')
pairs_of_predictors = list(combinations(predictors, 2))
set_predictors = set(predictors)
# Patterns of alternation between the predictors, one column per pair.
known_patterns = pd.concat([patterns[k]
.set_index('lexeme')
.pattern
for k in pairs_of_predictors],
axis=1)
# Lexemes for which every pair of predictors has a pattern.
predlexemes = known_patterns.notna().all(axis=1)
# Wrap in tuples so that the row-wise sum concatenates the patterns.
known_patterns = known_patterns.map(lambda x: (x,) if not isinstance(x, tuple) else x)\
.sum(axis=1)
def row_condent(x):
"""
Computes the conditional entropy for a given set of predictors
and a target.
Arguments:
x (pandas.Series): a Series containing information for the computation.
"""
out = x.predicted
# Lexemes with a non-defective form in the target cell.
outlexemes = paradigms[(paradigms.cell == out) &
~(paradigms.form.apply(lambda x: x.is_defective()))]
selector = predlexemes & predlexemes.index.isin(outlexemes.lexeme)
x.n_pairs = sum(selector)
if already_zero(set_predictors, out, zeros):
x.value = 0
else:
# Under the pattern column, getting intersection of patterns events for each
# predictor: x~z, y~z
# Under the applicable column, getting
# - Known classes Class(x), Class(y)
# - known patterns x~y
# - plus all features
pattern_pairs = [patterns[(pred, out)]
.set_index('lexeme')
[selector][['pattern', 'applicable']]
.map(lambda x: (x,) if not isinstance(x, tuple) else x)
for pred in predictors]
# Adding the tables concatenates the tuples of every predictor.
pattern_pairs = reduce(lambda x, y: x+y, pattern_pairs)
pattern_pairs.applicable += known_patterns[selector]
classes = self.add_features(pattern_pairs)
# Prediction of H(A|B)
x.value = cond_entropy(pattern_pairs.pattern,
classes,
subset=selector)
return x
return df.apply(row_condent, axis=1)
[docs]
def n_preds_condent_log(self, df, paradigms, n):
    r"""
    Computes the probability distribution for n predictors
    and logs the details of the computations.

    Writes down the distributions:

    .. math::

        P( patterns_{c1, c3}, \; \; patterns_{c2, c3} \; \; |
           classes_{c1, c3}, \; \; \; \; classes_{c2, c3},
           \; \;  patterns_{c1, c2} )

    The result contains entropy :math:`H(c_{1}, ..., c_{n} \to c_{n+1} )`.

    Values are computed for all unordered combinations of
    :math:`(c_{1}, ..., c_{n+1})` in the
    :attr:`paradigms`'s columns.
    Indexes are tuples :math:`(c_{1}, ..., c_{n})`
    and columns are the predicted cells :math:`c_{n+1}`.

    Example:
        For three cells c1, c2, c3, (n=2)
        entropy of c1, c2 → c3,
        noted :math:`H(c_{1}, c_{2} \to c_{3})` is:

        .. math::

            H( patterns_{c1, c3}, \; \; patterns_{c2, c3}\; \;
            | classes_{c1, c3}, \; \; \; \;
            classes_{c2, c3}, \; \; patterns_{c1, c2} )

    Arguments:
        df (pandas.DataFrame): the result rows of one combination of predictors.
            ``df.name`` is the "&"-joined string of the predictor cells,
            and each row names its target cell in ``predicted``.
        paradigms (pandas.DataFrame): a DataFrame of paradigms. The columns
            ``lexeme``, ``cell`` and ``form`` are read.
        n (int): number of predictors.

    Returns:
        pandas.DataFrame: the rows of ``df`` with ``value`` and ``n_pairs`` filled in.
    """

    def count_with_examples(row, counter, examples, paradigms, pred, out):
        # Count one occurrence of the row's pattern and remember an example for it.
        lemma, pattern = row
        predictors = "; ".join(paradigms.loc[(paradigms.lexeme == lemma) &
                                             (paradigms.cell == c)]
                               .form.values[0]
                               for c in pred)
        predicted = paradigms.loc[(paradigms.lexeme == lemma) &
                                  (paradigms.cell == out)].form.values[0]
        example = f"{lemma}: ({predictors}) → {predicted}"
        counter[pattern] += 1
        examples[pattern] = example

    def format_patterns(series, string):
        # Fill the template with one "; "-joined item per column of the row.
        patterns = ("; ".join(str(pattern)
                              for pattern in pair)
                    for pair in series)
        return string.format(*patterns)

    # Templates with one "{}" slot per predictor, or per pair of predictors.
    pred_numbers = list(range(1, n + 1))
    patterns_string = "\n".join(f"{pred}~{n + 1}" + "= {}" for pred in pred_numbers)
    applicable_string = "\n    * " + "\n    * ".join(f"Class({pred}, {n + 1})" + "= {}"
                                                   for pred in pred_numbers)
    # The "+" matters: adjacent literals would be merged into a single
    # separator, leaving the first item without its bullet.
    known_pat_string = "\n    * " + "\n    * ".join("{!s}~{!s}".format(*preds) +
                                                  "= {}" for preds
                                                  in combinations(pred_numbers, 2))

    def format_features(features):
        return "\n* Features:\n    * " + "\n    * ".join(str(x) for x in features)

    def formatting_local_patterns(x):
        return format_patterns(x, patterns_string)

    def formatting_applicable_patterns(x):
        return format_patterns(x, applicable_string)

    def formatting_known_patterns(x):
        return format_patterns(x, known_pat_string)

    # For faster access
    patterns = self.patterns
    predictors = df.name.split('&')
    pairs_of_predictors = list(combinations(predictors, 2))

    # Patterns of alternations between the predictors.
    known_patterns = pd.concat([patterns[k]
                               .set_index('lexeme')
                               .pattern
                               .rename('&'.join(k))
                                for k in pairs_of_predictors],
                               axis=1)
    # Lexemes for which every pair of predictors has a pattern.
    predlexemes = known_patterns.notna().all(axis=1)
    known_patterns = known_patterns.map(lambda x: (x,) if not isinstance(x, tuple) else x)

    def row_condent(x, known_patterns):
        """
        Compute and log the conditional entropy for one target cell.

        Arguments:
            x: a Series with results and information about the cells.
            known_patterns: the patterns of alternation
                between the predictors, which are considered known.
        """
        patterns = self.patterns
        out = x.predicted
        # Lexemes with a non-defective form in the target cell.
        outlexemes = paradigms[(paradigms.cell == out) &
                               ~(paradigms.form.apply(lambda x: x.is_defective()))]
        selector = predlexemes & predlexemes.index.isin(outlexemes.lexeme)
        x.n_pairs = sum(selector)

        log.debug(f"\n# Distribution of ({', '.join(predictors)}) → {out} \n")

        applicable_patterns = [
            patterns[(pred, out)]
            .set_index('lexeme')[selector]
            .applicable
            .rename(f"{pred}&{out}")
            .map(lambda x: (x,) if not isinstance(x, tuple) else x)
            for pred in predictors
        ]
        applicable_patterns = pd.concat(applicable_patterns, axis=1)

        gold_patterns = [patterns[(pred, out)]
                         .set_index('lexeme')[selector]
                         .pattern
                         .rename(f"{pred}&{out}")
                         .map(lambda x: (x,) if not isinstance(x, tuple) else x)
                         for pred in predictors]
        gold_patterns = pd.concat(gold_patterns, axis=1)

        # Getting intersection of patterns events for each predictor:
        # x~z, y~z
        A = gold_patterns.apply(formatting_local_patterns, axis=1)

        # Known classes Class(x), Class(y) and known patterns x~y
        applicable_patterns = applicable_patterns.apply(formatting_applicable_patterns,
                                                        axis=1)
        known_patterns = known_patterns.apply(formatting_known_patterns,
                                              axis=1)
        B = applicable_patterns + known_patterns

        if self.features is not None:
            known_features = self.features[selector].apply(format_features)
            B = B + known_features

        cond_events = A.groupby(B, sort=False)

        log.debug("Showing distributions for "
                  + str(len(cond_events))
                  + " classes")

        summary = []

        # Classes are logged from the largest to the smallest.
        for i, (classe, members) in enumerate(sorted(cond_events,
                                                     key=lambda x: len(x[1]),
                                                     reverse=True)):
            log.debug("\n## Class n°%s (%s members).", i, len(members))
            counter = Counter()
            examples = {}
            members.reset_index().apply(count_with_examples,
                                        args=(counter, examples,
                                              paradigms,
                                              predictors, out), axis=1)
            total = sum(counter.values())
            log.debug("* Total: %s", total)

            table = []
            for my_pattern in counter:
                row = (my_pattern,
                       examples[my_pattern],
                       counter[my_pattern],
                       counter[my_pattern] / total)
                table.append(row)

            headers = ("Patterns", "Example",
                       "Size", "P(Pattern|class)")
            table = pd.DataFrame(table, columns=headers)

            # Get the slow computation results
            # (adding 0 avoids displaying a negative zero)
            summary.append([table.Size.sum(),
                            0 + entropy(table.iloc[:, -1])])
            log.debug("\n" + table.to_markdown())

        log.debug('\n## Class summary')
        summary = pd.DataFrame(summary, columns=['Size', 'H(pattern|class)'])
        summary.index.name = "Class"
        # Average of the class entropies, weighted by class size.
        x.value = (summary.iloc[:, -2] * summary.iloc[:, -1] / summary.iloc[:, -2].sum()).sum()
        log.debug(f'\nAv. conditional entropy: H(pattern|class)={x.value}')
        log.debug("\n" + summary.to_markdown())
        return x

    return df.apply(row_condent, args=[known_patterns], axis=1)