# Source code for qumin.representations.paradigms

# -*- coding: utf-8 -*-
# !/usr/bin/env python
"""author: Sacha Beniamine and Jules Bouton.

Paradigms class to represent paralex paradigms.
"""

from collections import defaultdict
from pathlib import Path
from tqdm import tqdm
import random

from .segments import Inventory, Form
from .frequencies import Frequencies
from ..utils import memory_check

import logging
import pandas as pd
from paralex import read_table

# Module-level logger; every message in this module goes through the "Qumin" logger.
log = logging.getLogger("Qumin")
# Enable the tqdm/pandas integration so that ``progress_apply`` (used in
# ``Paradigms.preprocess``) is available on pandas objects.
tqdm.pandas()


class Paradigms(object):
    """
    Paradigms with methods to normalize them, merge and restore columns, etc.

    Attributes:
        dataset: the paralex package the paradigms were read from.
        inventory: the sound inventory read from the dataset's ``sounds`` resource.
        frequencies: the :class:`Frequencies` built from the dataset.
        data (:class:`pandas:pandas.DataFrame`): long-format paradigms table.
        cells (list): cells currently present in ``data``.
        cells_dedup (dict): maps one representative cell to the list of cells
            which have exactly the same forms for every lexeme.
    """

    # Names of the lexeme, cell and form columns of the forms table, in this order.
    default_cols = ("lexeme", "cell", "phon_form")
    data = None
    cells = None
    cells_dedup = None

    def __init__(self, dataset, **kwargs):
        """Read paradigms data, and prepare it according to the sound inventory.

        The resulting long-format table is stored in ``self.data``.

        Arguments:
            dataset (`frictionless.Package`): paralex frictionless Package.
                Its ``sounds`` resource is used to build the inventory and
                its ``forms`` table provides the paradigms.
            kwargs: additional arguments passed to :func:`Paradigms.preprocess`
        """
        self.dataset = dataset

        # Initializing segments
        sounds_file_name = Path(dataset.basepath or "./") / dataset.get_resource("sounds").path
        self.inventory = Inventory.from_file(sounds_file_name)

        # Reading the paradigms.
        # 1. Set the expected dtypes & NA values on each important column.
        form_cols = ['phon_form', 'orth_form', 'analyzed_phon_form', 'analyzed_orth_form']
        default_dtype = {'category': ['cell', 'lexeme'],
                         'string': form_cols + ['overabundance_tag'],
                         'float': ['frequency']
                         }
        na_values = {c: ["#DEF#", "#MISSING#"] for c in form_cols}
        na_values.update({'frequency': ''})

        # 2. Read the data using the paralex utility to handle multifile tables.
        # Any column without an explicit dtype is read as 'object'.
        self.data = read_table("forms", dataset,
                               na_values=na_values,
                               dtype=defaultdict(lambda: 'object',
                                                 {col: dt
                                                  for dt, cols in default_dtype.items()
                                                  for col in cols}),
                               keep_default_na=False)

        self.frequencies = Frequencies(dataset)
        self.preprocess(**kwargs)
        self.frequencies.drop_unused(self.data)

    def _get_unknown_segments(self, row, unknowns, resegment=False, known_sounds=None):
        """
        Checks whether all segments that appear in a form are known.

        Arguments:
            row: a pair ``(cell, form)``.
            unknowns (dict): maps each unknown segment to a list of
                ``"form cell"`` strings where it occurs. Updated in place.
            resegment (bool): passed to :func:`Inventory.segment_form`.
            known_sounds (set): the set of known segments. Computed from the
                inventory when not provided; pass it to avoid rebuilding the
                set for every row.
        """
        cell, form = row
        if known_sounds is None:
            known_sounds = set(self.inventory._classes) | set(self.inventory._normalization) | {"", " "}
        tokens = self.inventory.segment_form(form, resegment=resegment)
        for char in tokens:
            if char not in known_sounds:
                unknowns[char].append(form + " " + cell)

    @staticmethod
    def _overabundance_policy(overabundant):
        """
        Normalize an overabundance policy to a ``(keep, tags, freq)`` triple.

        A policy is normally an object with ``keep``, ``tags`` and ``freq``
        attributes. A plain boolean (or None) is also accepted and only states
        whether overabundant forms are kept: it carries no tag preference, and
        frequencies are used as a tie-breaker when available.
        """
        if overabundant is None or isinstance(overabundant, bool):
            return bool(overabundant), None, True
        return (bool(getattr(overabundant, "keep", False)),
                getattr(overabundant, "tags", None),
                bool(getattr(overabundant, "freq", False)))

    def preprocess(self, fillna=True, segcheck=True, defective=False, overabundant=False,
                   cells=None, sample_lexemes=None, sample_cells=None, sample_kws=None,
                   pos=None, resegment=False, lexemes_list=None, **kwargs):
        """
        Preprocess a Paralex paradigms table to meet the requirements of Qumin:
            - Filter by POS and by cells
            - Filter by frequency, sample
            - Filter overabundance and defectivity
            - Merge identical columns
            - Check segments and create Form() objects

        Arguments:
            fillna (bool): Currently unused: missing forms are always replaced
                by empty strings ("") before segmentation.
            segcheck (bool): Defaults to True. Should I check that all the
                phonological segments in the table are defined in the segments table?
            defective (bool): Defaults to False. Should I keep lexemes
                which have defective forms?
            overabundant: Overabundance policy: either an object with ``keep``,
                ``tags`` and ``freq`` attributes, or a plain boolean stating
                whether overabundant forms are kept. Defaults to False.
            cells (List[str]): List of cell names to consider. Defaults to all.
            pos (List[str]): List of parts of speech to consider. Defaults to all.
            lexemes_list (path): Path to a file containing one lexeme per row.
            sample_lexemes (int): Defaults to None. Should I sample n lexemes (for debug purposes)?
            sample_cells (int): Defaults to None. Should I sample n cells (for debug purposes)?
            sample_kws (dict): Dict of keywords passed to :func:`_get_cells`
                and :func:`_sample_paradigms`.
            resegment (bool): Defaults to False. Should I resegment the paradigms?
            kwargs: additional arguments passed to :func:`memory_check`.

        Raises:
            ValueError: if ``segcheck`` is set and some segments are unknown,
                or if the cell selection is invalid.
        """
        lexemes, cell_col, form_col = self.default_cols
        paradigms = self.data

        # Check long format conformity (having exactly the required columns is fine).
        if not {lexemes, cell_col, form_col} <= set(paradigms.columns):
            log.warning("Please use Paralex-style long-form table "
                        "(http://www.paralex-standard.org).")

        # POS filtering
        if pos:
            self._filter_pos(paradigms, pos)

        sample_kws = {} if sample_kws is None else sample_kws
        cells = self._get_cells(cells=cells, pos=pos, n=sample_cells, **sample_kws)
        if cells is not None:
            self._filter_cells(paradigms, cells, cell_col)

        # Remove defectives: any lexeme with at least one missing form is dropped.
        if not defective:
            defective_lexemes = set(paradigms.loc[paradigms[form_col].isna(), lexemes].unique())
            paradigms.drop(paradigms[paradigms.loc[:, lexemes].isin(defective_lexemes)].index,
                           inplace=True)

        # Check for duplicate overabundant phon_forms and sum the frequencies.
        # This handles cases where the orth_form is different and has two records.
        # For actual handling of truly overabundant phon_form, see below
        subset_cols = ["lexeme", "cell", "phon_form"]
        dup = paradigms.duplicated(subset=subset_cols, keep=False)
        if dup.any():
            if "frequency" in paradigms.columns:
                paradigms.loc[dup, 'frequency'] = paradigms.loc[dup] \
                    .groupby(subset_cols, observed=True).frequency.transform('sum')
            paradigms.drop_duplicates(subset=subset_cols, inplace=True)

        # Remove overabundance if asked
        keep_overabundant, _, _ = self._overabundance_policy(overabundant)
        if not keep_overabundant:
            paradigms = self._drop_overabundant(paradigms, overabundant)

        # Sample lexemes
        if lexemes_list:
            self._keep_lexemes(paradigms, lexemes_list, lexeme_col=lexemes)

        if sample_lexemes:
            self._sample_paradigms(paradigms, lexeme_col=lexemes,
                                   n=sample_lexemes, **sample_kws)

        paradigms[form_col] = paradigms[form_col].fillna(value="")

        # Check segment definitions
        if segcheck:
            log.info("Checking we have definitions for all "
                     "the phonological segments in this data...")
            unknowns = defaultdict(list)
            # Built once here rather than once per row.
            known_sounds = set(self.inventory._classes) | set(self.inventory._normalization) | {"", " "}
            paradigms[[cell_col, form_col]].progress_apply(self._get_unknown_segments,
                                                           resegment=resegment,
                                                           unknowns=unknowns,
                                                           known_sounds=known_sounds,
                                                           axis=1)

            if len(unknowns) > 0:
                alert = "Your paradigm has unknown segments: " + "\n ".join(
                    "[{}] (in {} forms:{}) ".format(
                        u, len(unknowns[u]), ", ".join(unknowns[u][:10]))
                    for u in unknowns)
                raise ValueError(alert)

        # Create Form() objects from strings representations.
        keep_cols = ["form_id"] + list(self.default_cols)
        paradigms.drop([c for c in paradigms.columns if c not in keep_cols],
                       axis=1, inplace=True)
        paradigms[form_col] = paradigms[['form_id', form_col]].apply(
            lambda x: Form.from_raw(x[form_col], self.inventory, x.form_id, resegment=resegment),
            axis=1)
        paradigms.rename(columns={form_col: "form"}, inplace=True)
        paradigms.set_index('form_id', inplace=True)
        self.data = paradigms

        # Identify identical (fully syncretic) cell pairs
        self.find_cell_duplicates()

        # Save data
        log.debug(self.data)
        memory_check(self.data, 2, **kwargs)
        self._update_cell()

    def _drop_overabundant(self, paradigms, overabundant):
        """
        Drop overabundant forms in a non-random way:
            - Check first for tags
            - Then use the most frequent form
            - Finally fall back on the row label, so the choice is deterministic.

        Arguments:
            paradigms (pandas.DataFrame): the paradigms to filter.
            overabundant: the overabundance policy (see :func:`preprocess`).

        Returns:
            pandas.DataFrame: a new table with a single row per (lexeme, cell).
        """
        log.info("Dropping overabundant entries according to policy: {}".format(overabundant))
        if paradigms.empty:
            # Nothing to rank; ``apply`` on an empty frame would not return a Series.
            return paradigms

        _, policy_tags, use_freq = self._overabundance_policy(overabundant)
        policy_tags = list(policy_tags) if policy_tags else []
        tag_cols = [c for c in paradigms.columns if c.endswith("_tag")]

        def form_sorter(row):
            tag_sorter = []
            freq_sorter = []
            if policy_tags:
                # Missing (non-string) tag values simply carry no tag.
                tags = [t for col in tag_cols if isinstance(row[col], str)
                        for t in row[col].split(";")]
                s = [i for i, t in enumerate(policy_tags) if t in tags]
                if not s:
                    # Rows without any listed tag rank after all tagged rows.
                    s.append(len(policy_tags))
                tag_sorter = [tuple(s)]
            if use_freq and "frequency" in row:
                # Negated so that the most frequent form sorts first;
                # missing frequencies need an explicit 0.
                freq_sorter = [-float(row["frequency"])
                               if not pd.isna(row["frequency"]) and row["frequency"] != ""
                               else 0]
            return tag_sorter + freq_sorter + [row.name]

        lexemes, cell_col, form_col = self.default_cols

        overab_order = paradigms.apply(form_sorter, axis=1).sort_values()

        # This is difficult to do in place, hence returning a new table.
        return paradigms.loc[overab_order.index, :] \
            .drop_duplicates([lexemes, cell_col], keep="first")

    def _filter_pos(self, paradigms, pos):
        """
        Keeps only lexemes with required POS. The table is altered in place.

        Arguments:
            paradigms (pandas.DataFrame): The dataframe to filter.
            pos (str or List(str)): the POS to keep.
        """
        if 'lexemes' in self.dataset.resource_names:
            table = read_table('lexemes', self.dataset)
            if 'POS' not in table.columns:
                log.warning('No POS column in the lexemes table.')
            else:
                if isinstance(pos, str):
                    pos = [pos]
                paradigms.drop(paradigms[~paradigms['lexeme']
                               .map(table.set_index('lexeme_id').POS)
                               .isin(pos)].index,
                               inplace=True)
        else:
            log.warning("No lexemes table. Can't filter based on POS.")

    @staticmethod
    def _keep_lexemes(paradigms, lexemes_list, lexeme_col="lexeme"):
        """
        Keeps only the lexemes provided in a file. The table is altered in place.

        Arguments:
            paradigms (pandas.DataFrame): The dataframe to filter.
            lexemes_list (path): The path to a file containing one lexeme per row.
            lexeme_col (str): The name of the lexemes' column.
        """
        # Identifiers are read verbatim as strings, so that numeric or
        # NA-looking identifiers still match the lexeme column.
        wanted = pd.read_csv(lexemes_list, names=['lexeme_id'],
                             dtype=str, keep_default_na=False)
        missing = wanted[~wanted.lexeme_id.isin(paradigms[lexeme_col])]
        if not missing.empty:
            log.warning('Some lexeme ids in the lexemes file were missing or already dropped:'
                        f' {", ".join(missing.head(n=10).lexeme_id)} (total: {len(missing)})')
        paradigms.drop(paradigms.loc[~paradigms[lexeme_col].isin(wanted.lexeme_id), :].index,
                       inplace=True)

    def _sample_paradigms(self, paradigms, n, force_random=False, lexeme_col="lexeme", seed=1):
        """
        Samples the paradigms to keep only some lexemes. The table is altered in place.

        The most frequent lexemes are kept when lexeme frequencies are
        available, otherwise lexemes are sampled randomly.

        Arguments:
            paradigms (pandas.DataFrame): The dataframe to sample.
            n (int): The number of lexemes to sample.
            force_random (bool): Whether to force random sampling.
            lexeme_col (str): The name of the lexemes' column.
            seed (int): Random seed to use. Ensures reproducibility between scripts.
        """
        # By frequency, if possible
        if not force_random and self.frequencies.has_frequencies('lexemes'):
            lex_freq = self.frequencies.lexemes
            # Restrict to lexemes we have kept, if we dropped defectives
            inflected = paradigms.loc[:, lexeme_col].unique()
            selected = lex_freq[lex_freq.index.isin(inflected)] \
                .sort_values("value", ascending=False) \
                .iloc[:n, :].index.to_list()
        else:
            # Random sampling
            if not force_random:
                log.warning("You requested frequency sampling but no frequencies "
                            "were available for the lexemes. Falling back to random "
                            "sampling. You could set force_random=True.")
            population = list(paradigms[lexeme_col].unique())
            if n > len(population):
                log.warning(f"You requested more lexemes than I can offer (sample={n}). "
                            f"Using all available lexemes ({len(population)})")
                selected = population
            else:
                random.seed(seed)
                selected = random.sample(population, n)
        paradigms.drop(paradigms.loc[~paradigms[lexeme_col].isin(selected), :].index,
                       inplace=True)

    def _get_cells(self, cells=None, pos=None, n=None, force_random=False, seed=1):
        """
        Returns a list of cells to use based on CLI arguments.

        Configurations:
            - If a list of cells was provided, select those cells
            - If a POS, select all cells belonging to this POS
            - If n is provided, sample n cells from the resulting list
              (by frequency if available, randomly otherwise).

        Arguments:
            cells (list): A list of cells. Empty or None means all cells.
            pos (str or List[str]): The POS to consider
            n (int): The number of cells to sample.
            force_random (bool): Whether to force random sampling.
            seed (int): Random seed to use. Ensures reproducibility between scripts.

        Returns:
            list or None: the selected cells, or None if no restriction applies.

        Raises:
            ValueError: if both cells and POS are given, or a single cell is given.
        """
        # Ensure that all cells are interpreted as strings (a frequent issue with inf).
        # An empty selection means "no restriction", not "drop every cell".
        cells = [str(c) for c in cells] if cells else None

        # POS based selection
        if cells and pos:
            raise ValueError("You can't specify both cells and POS.")
        elif cells:
            if len(cells) == 1:
                raise ValueError("You can't provide only one cell.")
        elif pos:
            if 'cells' in self.dataset.resource_names:
                table = read_table('cells', self.dataset)
                if 'POS' not in table.columns:
                    log.warning('No POS column in the cells table. '
                                'The POS filtering will be applied to lexemes only')
                else:
                    if isinstance(pos, str):
                        pos = [pos]
                    # A plain list, so that truth tests and sampling below work.
                    cells = table.loc[table['POS'].isin(pos), 'cell_id'].to_list()
            else:
                log.warning('No cells table. '
                            'The POS filtering will be applied to lexemes only')

        # Optional sampling
        if n:
            if cells is None:
                cells = list(read_table('forms', self.dataset).cell.unique())
            if not force_random and self.frequencies.has_frequencies('cells'):
                cell_freq = self.frequencies.cells
                # Restrict to cells we have kept, if we dropped some
                cells = cell_freq[cell_freq.index.isin(cells)] \
                    .sort_values("value", ascending=False) \
                    .iloc[:n, :].index.to_list()
            else:
                # Random sampling
                if not force_random:
                    log.warning("You requested frequency sampling but no frequencies "
                                "were available for the cells. Falling back to random "
                                "sampling. You could set force_random=True.")
                if n > len(cells):
                    log.warning(f"You requested more cells than I can offer (sample={n}). "
                                f"Using all available cells ({len(cells)})")
                else:
                    random.seed(seed)
                    cells = random.sample(cells, n)
        return cells

    @staticmethod
    def _filter_cells(paradigms, cells, column):
        """
        Keeps only the provided cells. Performs security check before dropping.
        The table is altered in place.

        Arguments:
            paradigms (pandas.DataFrame): the paradigms to alter.
            cells (List[Str]): the list of cells to keep.
            column (str): name of the column which contains the cells

        Raises:
            ValueError: if some of the requested cells are not in the paradigms.
        """
        col_cells = paradigms[column].unique()
        unknown_cells = set(cells) - set(col_cells)
        if unknown_cells:
            raise ValueError("You specified some cells which aren't "
                             f"in the paradigm : {' '.join(unknown_cells)}")
        to_drop = set(col_cells) - set(cells)
        if len(to_drop) > 0:
            log.info(f"Dropping rows with following cell values: {', '.join(sorted(to_drop))}")
            paradigms.drop(paradigms[paradigms[column].isin(to_drop)].index,
                           inplace=True)

    def find_cell_duplicates(self):
        """
        Identify duplicate cells (same forms everywhere).

        Fills ``cells_dedup``, which maps the first cell of each group of
        identical cells to the list of the other cells of the group.
        """
        log.info("Checking for identical columns...")
        names = defaultdict(list)
        self._update_cell()
        for c in tqdm(self.cells):
            # A cell is characterized by the sorted (form, lexeme) pairs it contains.
            hashable = tuple(sorted(self.data.loc[self.data.cell == c, ['form', 'lexeme']]
                                    .apply(tuple, axis=1).to_list()))
            names[hashable].append(c)
        self.cells_dedup = {i[0]: i[1:] for i in names.values()}
        for c, dups in self.cells_dedup.items():
            if dups:
                log.debug(f"\tCell(s) {dups} have identical forms to {c}")

    def get_empty_pattern_df(self, a, b):
        """
        Returns an oriented dataframe to store patterns for two cells.

        Arguments:
            a (str): cell A name
            b (str): cell B name

        Returns:
            pandas.DataFrame: one row per pair of forms of the same lexeme,
            with columns ``lexeme``, ``form_x`` (cell A) and ``form_y`` (cell B).
        """
        new = pd.merge(self.data.loc[self.data.cell == a],
                       self.data.loc[self.data.cell == b],
                       on="lexeme")
        new.sort_values(by=['lexeme', 'form_x', 'form_y'], inplace=True)
        new.reset_index(inplace=True)
        return new[['lexeme', 'form_x', 'form_y']]

    def _update_cell(self):
        """
        Updates the ``cells`` attribute based on the cells from the dataframe.
        """
        self.cells = list(self.data.cell.unique())