# -*- coding: utf-8 -*-
# !/usr/bin/env python
"""author: Sacha Beniamine and Jules Bouton.
Paradigms class to represent paralex paradigms.
"""
from collections import defaultdict
from pathlib import Path
from tqdm import tqdm
import random
from .segments import Inventory, Form
from .frequencies import Frequencies
from ..utils import memory_check
import logging
import pandas as pd
from paralex import read_table
# Module-level logger; every message emitted in this file goes through the "Qumin" logger.
log = logging.getLogger("Qumin")
# Register tqdm's progress-bar variants (e.g. ``progress_apply``) on pandas objects.
tqdm.pandas()
class Paradigms(object):
    """
    Paradigms with methods to normalize them, merge and restore columns, etc.

    Attributes:
        default_cols (tuple): names of the lexeme, cell and form columns
            of the long-format forms table.
        data (:class:`pandas:pandas.DataFrame`): the forms table. Once
            :meth:`preprocess` has run, it is indexed by ``form_id`` and holds
            the columns ``lexeme``, ``cell`` and ``form``.
        cells (list): distinct values of the ``cell`` column of ``data``.
        cells_dedup (dict): maps one cell to the list of other cells
            which contain exactly the same (form, lexeme) pairs.
    """

    default_cols = ("lexeme", "cell", "phon_form")
    data = None
    cells = None
    cells_dedup = None

    def __init__(self, dataset, **kwargs):
        """Read paradigms data, and prepare it according to a sounds inventory.

        Arguments:
            dataset (`frictionless.Package`): paralex frictionless Package.
                Its ``sounds`` resource is loaded as the segment inventory
                and its ``forms`` table is read as the paradigms.
            kwargs: additional arguments passed to :meth:`preprocess`.
        """
        self.dataset = dataset

        # Initializing segments
        sounds_file_name = Path(dataset.basepath or "./") / dataset.get_resource("sounds").path
        self.inventory = Inventory.from_file(sounds_file_name)

        # Reading the paradigms.
        # 1. Set the expected dtypes & NA values on each important column.
        form_cols = ['phon_form', 'orth_form',
                     'analyzed_phon_form', 'analyzed_orth_form']
        default_dtype = {'category': ['cell', 'lexeme'],
                         'string': form_cols + ['overabundance_tag'],
                         'float': ['frequency']
                         }
        na_values = {c: ["#DEF#", "#MISSING#"] for c in form_cols}
        na_values.update({'frequency': ''})
        # Any column which is not listed above is read as a plain object.
        dtypes = defaultdict(lambda: 'object',
                             {col: dt for dt, cols in default_dtype.items() for col in cols})

        # 2. Read the data using the paralex utility to handle multifile tables.
        self.data = read_table("forms", dataset, na_values=na_values,
                               dtype=dtypes,
                               keep_default_na=False)

        self.frequencies = Frequencies(dataset)
        self.preprocess(**kwargs)
        self.frequencies.drop_unused(self.data)

    def _known_sounds(self):
        """
        Returns the set of symbols accepted in forms: the inventory classes,
        the normalization keys, the empty string and the space.
        """
        return set(self.inventory._classes) | set(self.inventory._normalization) | {"", " "}

    def _get_unknown_segments(self, row, unknowns, resegment=False, known_sounds=None):
        """
        Checks whether all segments that appear in a form are known.

        Arguments:
            row: a (cell, form) pair.
            unknowns (dict of list): collector, updated in place. Maps each
                unknown segment to the list of "form cell" strings where it occurs.
            resegment (bool): passed to :meth:`Inventory.segment_form`.
            known_sounds (set): precomputed set of known symbols.
                Computed from the inventory if not provided.
        """
        cell, form = row
        if known_sounds is None:
            known_sounds = self._known_sounds()
        for char in self.inventory.segment_form(form, resegment=resegment):
            if char not in known_sounds:
                unknowns[char].append(form + " " + cell)

    def preprocess(self, fillna=True, segcheck=True,
                   defective=False, overabundant=False,
                   cells=None, sample_lexemes=None, sample_cells=None, sample_kws=None, pos=None,
                   resegment=False, lexemes_list=None, **kwargs):
        """
        Preprocess a Paralex paradigms table to meet the requirements of Qumin:

            - Filter by POS and by cells
            - Filter by frequency, sample
            - Filter overabundance and defectivity
            - Merge identical columns
            - Check segments and create Form() objects

        Arguments:
            fillna (bool): Defaults to True. Currently unused by this method:
                missing forms are always replaced by empty strings ("").
            segcheck (bool): Defaults to True. Should I check that all the phonological segments
                in the table are defined in the segments table?
            defective (bool): Defaults to False. Should I keep rows with defective forms?
                If not, every lexeme with at least one missing form is dropped.
            overabundant (bool or object): Defaults to False. Either a boolean
                (should I keep rows with overabundant forms?) or a policy object
                with attributes ``keep``, ``tags`` and ``freq``.
            cells (List[str]): List of cell names to consider. Defaults to all.
            pos (List[str]): List of parts of speech to consider. Defaults to all.
            lexemes_list (path): Path to a file containing one lexeme per row.
            sample_lexemes (int): Defaults to None. Should I sample n lexemes
                (for debug purposes)?
            sample_cells (int): Defaults to None. Should I sample n cells
                (for debug purposes)?
            sample_kws (dict): Dict of keywords passed to :func:`_sample_paradigms`
                and :func:`_get_cells`.
            resegment (bool): Defaults to False. Should I resegment the paradigms?
            kwargs: additional arguments passed to ``memory_check``.

        Raises:
            ValueError: if ``segcheck`` is set and some segments are not defined.
        """
        lexemes, cell_col, form_col = self.default_cols
        paradigms = self.data

        # Check long format conformity
        if not {lexemes, cell_col, form_col} <= set(paradigms.columns):
            log.warning("Please use Paralex-style long-form table "
                        "(http://www.paralex-standard.org).")

        # POS filtering
        if pos:
            self._filter_pos(paradigms, pos)

        sample_kws = {} if sample_kws is None else sample_kws
        cells = self._get_cells(cells=cells, pos=pos, n=sample_cells, **sample_kws)
        if cells is not None:
            self._filter_cells(paradigms, cells, cell_col)

        # Remove defectives
        if not defective:
            defective_lexemes = set(paradigms.loc[paradigms[form_col].isna(), lexemes].unique())
            paradigms.drop(paradigms[paradigms.loc[:, lexemes].isin(defective_lexemes)].index,
                           inplace=True)

        # Check for duplicate overabundant phon_forms and sum the frequencies.
        # This handles cases where the orth_form is different and has two records.
        # For actual handling of truly overabundant phon_form, see below
        subset_cols = ["lexeme", "cell", "phon_form"]
        dup = paradigms.duplicated(subset=subset_cols, keep=False)
        if dup.any():
            if "frequency" in paradigms.columns:
                paradigms.loc[dup, 'frequency'] = paradigms.loc[dup] \
                    .groupby(subset_cols, observed=True).frequency.transform('sum')
            paradigms.drop_duplicates(subset=subset_cols, inplace=True)

        # Remove overabundance if asked.
        # A plain boolean is accepted as well as a policy object with a ``keep`` attribute.
        keep_overabundant = overabundant if isinstance(overabundant, bool) else overabundant.keep
        if not keep_overabundant:
            paradigms = self._drop_overabundant(paradigms, overabundant)

        # Sample lexemes
        if lexemes_list:
            self._keep_lexemes(paradigms, lexemes_list, lexeme_col=lexemes)
        if sample_lexemes:
            self._sample_paradigms(paradigms, lexeme_col=lexemes, n=sample_lexemes, **sample_kws)

        paradigms[form_col] = paradigms[form_col].fillna(value="")

        # Check segment definitions
        if segcheck:
            log.info("Checking we have definitions for all "
                     "the phonological segments in this data...")
            unknowns = defaultdict(list)
            # The set of known sounds does not depend on the row: build it only once.
            paradigms[[cell_col, form_col]].progress_apply(self._get_unknown_segments,
                                                           resegment=resegment,
                                                           unknowns=unknowns,
                                                           known_sounds=self._known_sounds(),
                                                           axis=1)

            if len(unknowns) > 0:
                alert = "Your paradigm has unknown segments: " + "\n ".join(
                    "[{}] (in {} forms:{}) ".format(
                        u, len(unknowns[u]), ", ".join(unknowns[u][:10])) for u in unknowns)
                raise ValueError(alert)

        # Create Form() objects from strings representations.
        keep_cols = ["form_id"] + list(self.default_cols)
        paradigms.drop([c for c in paradigms.columns if c not in keep_cols], axis=1, inplace=True)
        paradigms[form_col] = paradigms[['form_id', form_col]].apply(
            lambda x: Form.from_raw(x[form_col], self.inventory, x.form_id, resegment=resegment),
            axis=1)
        paradigms.rename(columns={form_col: "form"}, inplace=True)
        paradigms.set_index('form_id', inplace=True)
        self.data = paradigms

        # Identify identical (fully syncretic) cell pairs
        self.find_cell_duplicates()

        # Save data
        log.debug(self.data)
        memory_check(self.data, 2, **kwargs)
        self._update_cell()

    def _drop_overabundant(self, paradigms, overabundant):
        """
        Drop overabundant forms in a non-random way:

            - Check first for tags
            - Then use the most frequent form
            - Finally, fall back on the order of the row labels

        Arguments:
            paradigms (pandas.DataFrame): The paradigms to filter.
            overabundant: the policy. Its optional attributes ``tags``
                (ordered list of preferred tags) and ``freq`` (bool) are read.
                When they are missing (e.g. a plain boolean was given),
                no tags are used and forms are ordered by frequency.

        Returns:
            pandas.DataFrame: a new dataframe with a single row for each
            (lexeme, cell) pair.
        """
        log.info("Dropping overabundant entries according to policy: {}".format(overabundant))
        if paradigms.empty:
            # Nothing to rank; a row-wise apply on an empty frame would not return sort keys.
            return paradigms

        preferred_tags = getattr(overabundant, "tags", None)
        use_freq = getattr(overabundant, "freq", True)
        tag_cols = [c for c in paradigms.columns if c.endswith("_tag")]

        def form_sorter(row):
            """Builds the sort key of a row: smaller keys are kept first."""
            tag_sorter = []
            freq_sorter = []
            if preferred_tags:
                # Missing (non-string) tag values carry no tags.
                tags = {t for col in tag_cols if isinstance(row[col], str)
                        for t in row[col].split(";")}
                ranks = [i for i, t in enumerate(preferred_tags) if t in tags]
                if not ranks:
                    # Rows without any preferred tag come after all the tagged ones.
                    ranks.append(len(preferred_tags))
                tag_sorter = [tuple(ranks)]
            if use_freq and "frequency" in row:
                freq = row["frequency"]
                # Negated so that higher frequencies come first; missing values count as 0.
                freq_sorter = [0 if pd.isna(freq) or freq == "" else -float(freq)]
            return tag_sorter + freq_sorter + [row.name]

        lexemes, cell_col, form_col = self.default_cols
        overab_order = paradigms.apply(form_sorter, axis=1).sort_values()

        # Reorder, then keep the best row of each (lexeme, cell) pair.
        # This returns a new dataframe rather than altering a slice in place.
        return paradigms.loc[overab_order.index, :] \
            .drop_duplicates([lexemes, cell_col], keep="first")

    def _filter_pos(self, paradigms, pos):
        """
        Keeps only lexemes with required POS. The dataframe is altered in place.
        If the dataset has no lexemes table, or if this table has no POS column,
        a warning is logged and nothing is dropped.

        Arguments:
            paradigms (pandas.DataFrame): The dataframe to filter.
            pos (str or List(str)): the POS to keep.
        """
        if 'lexemes' not in self.dataset.resource_names:
            log.warning("No lexemes table. Can't filter based on POS.")
            return

        table = read_table('lexemes', self.dataset)
        if 'POS' not in table.columns:
            log.warning('No POS column in the lexemes table.')
            return

        if isinstance(pos, str):
            pos = [pos]
        lexeme_pos = paradigms['lexeme'].map(table.set_index('lexeme_id').POS)
        paradigms.drop(paradigms[~lexeme_pos.isin(pos)].index,
                       inplace=True)

    @staticmethod
    def _keep_lexemes(paradigms, lexemes_list, lexeme_col="lexeme"):
        """
        Keeps only the lexemes provided in a file. The dataframe is altered in place.

        Arguments:
            paradigms (pandas.DataFrame): The dataframe to filter.
            lexemes_list (path): The path to a file containing one lexeme per row.
            lexeme_col (str): The name of the lexemes' column.
        """
        # Identifiers are read as raw strings: no numeric inference, no NA conversion.
        wanted = pd.read_csv(lexemes_list, names=['lexeme_id'],
                             dtype=str, keep_default_na=False)
        missing = wanted[~wanted.lexeme_id.isin(paradigms[lexeme_col])]
        if not missing.empty:
            log.warning('Some lexeme ids in the lexemes file were missing or already dropped:'
                        f' {", ".join(missing.head(n=10).lexeme_id)} (total: {len(missing)})')
        paradigms.drop(paradigms.loc[~paradigms[lexeme_col].isin(wanted.lexeme_id), :].index,
                       inplace=True)

    def _sample_paradigms(self, paradigms, n, force_random=False, lexeme_col="lexeme", seed=1):
        """
        Samples the paradigms to keep only some lexemes. The dataframe is altered in place.
        The most frequent lexemes are kept if lexeme frequencies are available,
        otherwise the lexemes are sampled randomly.

        Arguments:
            paradigms (pandas.DataFrame): The dataframe to sample.
            n (int): The number of lexemes to sample.
            force_random (bool): Whether to force random sampling.
            lexeme_col (str): The name of the lexemes' column.
            seed (int): Random seed to use. Ensures reproducibility between scripts.
        """
        # By frequency, if possible
        if not force_random and self.frequencies.has_frequencies('lexemes'):
            lex_freq = self.frequencies.lexemes
            # Restrict to lexemes we have kept, if we dropped defectives
            inflected = paradigms.loc[:, lexeme_col].unique()
            selected = lex_freq[lex_freq.index.isin(inflected)] \
                .sort_values("value", ascending=False) \
                .iloc[:n, :].index.to_list()
        else:
            # Random sampling
            if not force_random:
                log.warning("You requested frequency sampling but no frequencies "
                            "were available for the lexemes. Falling back to random "
                            "sampling. You could set force_random=True.")
            population = list(paradigms[lexeme_col].unique())
            if n > len(population):
                log.warning(f"You requested more lexemes than I can offer (sample={n}). "
                            f"Using all available lexemes ({len(population)})")
                selected = population
            else:
                random.seed(seed)
                selected = random.sample(population, n)

        paradigms.drop(paradigms.loc[~paradigms[lexeme_col].isin(selected), :].index,
                       inplace=True)

    def _get_cells(self, cells=None, pos=None, n=None, force_random=False, seed=1):
        """
        Returns a list of cells to use based on CLI arguments:

            - If a list of cells was provided, select those cells
            - If a POS, select all cells belonging to this POS
            - If n is provided, sample n cells from the resulting list
              (the most frequent ones if cell frequencies are available,
              random ones otherwise).

        Arguments:
            cells (list): A list of cells
            pos (str or List(str)): The POS to consider
            n (int): The number of cells to sample.
            force_random (bool): Whether to force random sampling.
            seed (int): Random seed to use. Ensures reproducibility between scripts.

        Returns:
            list or None: the selected cells. The ``cells`` argument is returned
            unchanged (possibly None) when nothing restricts the selection.

        Raises:
            ValueError: if both cells and POS are given, or a single cell is given.
        """
        # Ensure that all cells are interpreted as strings (a frequent issue with inf)
        if cells:
            cells = [str(c) for c in cells]

        # POS based selection
        if cells and pos:
            raise ValueError("You can't specify both cells and POS.")
        elif cells:
            if len(cells) == 1:
                raise ValueError("You can't provide only one cell.")
        elif pos:
            if 'cells' in self.dataset.resource_names:
                table = read_table('cells', self.dataset)
                if 'POS' in table.columns:
                    if isinstance(pos, str):
                        pos = [pos]
                    # A plain list, so that truth tests and sampling below behave.
                    cells = table.loc[table['POS'].isin(pos), 'cell_id'].to_list()
                else:
                    log.warning('No POS column in the cells table. The POS filtering will be applied to lexemes only')
                    cells = None
            else:
                log.warning('No cells table. The POS filtering will be applied to lexemes only')
                cells = None

        # Optional sampling
        if n:
            if not cells:
                cells = list(read_table('forms', self.dataset).cell.unique())

            if not force_random and self.frequencies.has_frequencies('cells'):
                cell_freq = self.frequencies.cells
                # Restrict to cells we have kept, if we dropped some
                cells = cell_freq[cell_freq.index.isin(cells)] \
                    .sort_values("value", ascending=False) \
                    .iloc[:n, :].index.to_list()
            else:
                # Random sampling
                if not force_random:
                    log.warning("You requested frequency sampling but no frequencies "
                                "were available for the cells. Falling back to random "
                                "sampling. You could set force_random=True.")
                if n > len(cells):
                    log.warning(f"You requested more cells than I can offer (sample={n}). "
                                f"Using all available cells ({len(cells)})")
                else:
                    random.seed(seed)
                    cells = random.sample(cells, n)
        return cells

    @staticmethod
    def _filter_cells(paradigms, cells, column):
        """ Keeps only the provided cells. The dataframe is altered in place.

        Performs security check before dropping.

        Arguments:
            paradigms (pandas.DataFrame): the paradigms to alter.
            cells (List[Str]): the list of cells to keep.
            column (str): name of the column which contains the cells

        Raises:
            ValueError: if some of the provided cells do not occur in the column.
        """
        col_cells = paradigms[column].unique()
        unknown_cells = set(cells) - set(col_cells)
        if unknown_cells:
            raise ValueError("You specified some cells which aren't "
                             f"in the paradigm : {' '.join(sorted(unknown_cells))}")
        to_drop = set(col_cells) - set(cells)
        if len(to_drop) > 0:
            log.info(f"Dropping rows with following cell values: {', '.join(sorted(to_drop))}")
            paradigms.drop(paradigms[paradigms[column].isin(to_drop)].index,
                           inplace=True)

    def find_cell_duplicates(self):
        """ Identify duplicate cells (same forms everywhere).

        Fills the ``cells_dedup`` attribute, which maps the first cell of each
        group of identical cells to the list of the other cells of the group.
        """
        log.info("Checking for identical columns...")
        names = defaultdict(list)
        self._update_cell()
        for c in tqdm(self.cells):
            content = self.data.loc[self.data.cell == c, ['form', 'lexeme']]
            # Two cells are identical if they have the same sorted (form, lexeme) pairs.
            hashable = tuple(sorted(zip(content['form'], content['lexeme'])))
            names[hashable].append(c)
        self.cells_dedup = {i[0]: i[1:] for i in names.values()}
        for c, dups in self.cells_dedup.items():
            if dups:
                log.debug(f"\tCell(s) {dups} have identical forms to {c}")

    def get_empty_pattern_df(self, a, b):
        """
        Returns an oriented dataframe to store
        patterns for two cells.

        Arguments:
            a (str): cell A name
            b (str): cell B name

        Returns:
            pandas.DataFrame: one row per pair of forms of the same lexeme,
            with columns ``lexeme``, ``form_x`` (cell A) and ``form_y`` (cell B),
            sorted on these columns.
        """
        new = pd.merge(self.data.loc[self.data.cell == a],
                       self.data.loc[self.data.cell == b],
                       on="lexeme")
        new.sort_values(by=['lexeme', 'form_x', 'form_y'], inplace=True)
        new.reset_index(inplace=True)
        return new[['lexeme', 'form_x', 'form_y']]

    def _update_cell(self):
        """
        Updates the ``cells`` attribute based on the cells from the dataframe.
        """
        self.cells = list(self.data.cell.unique())