Source code for namematch.generate_data_rows

import logging
import os

from collections import defaultdict
import multiprocessing as mp
from math import ceil

import numpy as np
import pandas as pd
from namematch.name_probability import nm_prob

import pyarrow.parquet as pq
import pyarrow as pa

from namematch.utils.utils import *
from namematch.comparison_functions import *
from namematch.data_structures.schema import Schema
from namematch.data_structures.parameters import Parameters
from namematch.base import NamematchBase
from namematch.utils.profiler import Profiler

profile = Profiler()
logger = logging.getLogger()


class GenerateDataRows(NamematchBase):

    def __init__(
        self,
        params,
        schema,
        output_dir,
        all_names_file,
        candidate_pairs_file,
        *args,
        **kwargs
    ):
        super(GenerateDataRows, self).__init__(params, schema, *args, **kwargs)

        self.all_names_file = all_names_file
        self.candidate_pairs_file = candidate_pairs_file
        self.output_dir = output_dir

    @property
    def output_files(self):
        return [
            os.path.join(self.output_dir, f'data_rows_{i}.parquet')
            for i in range(self.params.num_workers)
        ]

    # @log_runtime_and_memory
    def main(self, **kw):
        '''Take candidate pairs and merge on the all-names records (twice) to get a dataset at
        the record-pair level. Compute distance metrics between the records in each pair -- these
        are the features for modeling.

        Args:
            params (Parameters object): contains parameter values
            schema (Schema object): contains match schema info (files to match, variables to use, etc.)
            all_names_file (str): path to the all-names file
            candidate_pairs_file (str): path to the candidate-pairs file
            output_dir (str): path to the data-rows dir
        '''
        if not os.path.exists(self.output_dir):
            os.mkdir(self.output_dir)

        an_columns = ['blockstring'] + self.schema.variables.get_an_column_names()
        table = pq.read_table(self.all_names_file)
        an = table.to_pandas()[an_columns]
        an = an[an.drop_from_nm == 0]
        an = an.drop_duplicates(['record_id'])

        logger.info('Building name probability object.')
        name_probs = self.generate_name_probabilities_object(
                an, self.params.first_name_column, self.params.last_name_column)

        logger.info('Generating data rows...')

        # remove previous data rows files if they exist
        for dr_file in os.listdir(self.output_dir):
            os.remove(os.path.join(self.output_dir, dr_file))

        table = pq.read_table(self.candidate_pairs_file)
        cp_df = table.to_pandas()[['blockstring_1', 'blockstring_2', 'covered_pair']]

        # get data rows
        end_points = get_endpoints(len(cp_df), self.params.num_workers)

        if self.params.parallelize:
            jobs = [mp.Process(
                    target=self.generate_data_row_files,
                    args=(self.params, self.schema, an,
                          cp_df.iloc[end_point[0]:end_point[1]], name_probs,
                          end_point[0], end_point[1],
                          os.path.join(self.output_dir, f'data_rows_{i}.parquet'))
                ) for i, end_point in enumerate(end_points)]
            for job in jobs:
                job.start()
            for job in jobs:
                job.join()
            failure_occurred = sum([job.exitcode != 0 for job in jobs])
            if failure_occurred:
                logger.error(f"Error occurred in {failure_occurred} worker(s).")
                raise Exception(f"Error occurred in {failure_occurred} worker(s).")
        else:
            for i, end_point in enumerate(end_points):
                self.generate_data_row_files(
                        self.params, self.schema, an,
                        cp_df.iloc[end_point[0]:end_point[1]], name_probs,
                        end_point[0], end_point[1],
                        os.path.join(self.output_dir, f'data_rows_{i}.parquet'))

        if self.enable_lprof:
            self.write_line_profile_stats(profile.line_profiler)
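    # Illustrative sketch (hypothetical numbers): main() hands each worker one contiguous slice
    # of the candidate-pairs frame. With len(cp_df) == 10 and num_workers == 3, the end_points
    # above would correspond to slices such as
    #
    #     cp_df.iloc[0:4], cp_df.iloc[4:8], cp_df.iloc[8:10]
    #
    # (exact chunk sizes depend on get_endpoints), and worker i writes data_rows_<i>.parquet.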
    # @log_runtime_and_memory
    def generate_name_probabilities_object(self, an, fn_col=None, ln_col=None, **kw):
        '''Use the list of names from the all-names file to create an object containing
        queryable probability information for each name.

        Args:
            an (pd.DataFrame): all-names, just the name columns
            fn_col (str): name of first-name column
            ln_col (str): name of last-name column

        Returns:
            nm_prob.NameProbability object, or None if the name columns are not supplied
        '''
        if (fn_col is None) or (ln_col is None):
            return None

        an['fn'] = an[fn_col].str.replace(' ', '')
        an['ln'] = an[ln_col].str.replace(' ', '')
        an['name_prob_str'] = '*' + an['fn'] + ' ' + an['ln'] + '*'

        np_object = nm_prob.NameProbability(name_list=an.name_prob_str.tolist())

        # percentile rank (rounded to 2 decimals) of how often each full name, first name,
        # and last name appears in the all-names data
        np_object.n_name_appearances_dict = \
            an.groupby('name_prob_str').size().rank(pct=True, method='min').round(2).to_dict()
        np_object.n_firstname_appearances_dict = \
            an.groupby('fn').size().rank(pct=True, method='min').round(2).to_dict()
        np_object.n_lastname_appearances_dict = \
            an.groupby('ln').size().rank(pct=True, method='min').round(2).to_dict()

        return np_object
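    # Hypothetical sketch of the object built above (column and name values are made up):
    #
    #     np_object = self.generate_name_probabilities_object(an, 'first_name', 'last_name')
    #     np_object.n_firstname_appearances_dict   # e.g. {'JOHN': 0.97, 'XIMENA': 0.12, ...}
    #     np_object.n_lastname_appearances_dict    # e.g. {'SMITH': 0.99, ...}
    #     np_object.n_name_appearances_dict        # keyed by '*<fn> <ln>*', e.g. '*JOHN SMITH*'
    #
    # Each value is the percentile rank (rounded to 2 decimals) of how often that name appears
    # in the all-names data, not a raw count.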
    # @log_runtime_and_memory
    def find_valid_training_records(self, an, an_match_criteria, **kw):
        '''Flag the all-names records that satisfy every condition in the match criteria.

        Args:
            an (pd.DataFrame): all-names table
            an_match_criteria (dict): maps a column name to an accepted value (or list of values)

        Returns:
            pd.Series: flag that is truthy only for records meeting all of the match criteria
        '''
        an['meets_match_criteria'] = 1
        for col, accepted_values in an_match_criteria.items():
            if not isinstance(accepted_values, list):
                accepted_values = [accepted_values]
            accepted_values = [str(av) for av in accepted_values]
            an['meets_match_criteria'] = \
                (an['meets_match_criteria']) & (an[col].isin(accepted_values))
        return an['meets_match_criteria']
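    # Hypothetical example of the criteria argument (column names and values are made up):
    #
    #     an_match_criteria = {'file_type': 'new', 'dataset': ['arrests', 'victims']}
    #     mask = self.find_valid_training_records(an, an_match_criteria)
    #     valid_training_an = an[mask.astype(bool)]
    #
    # Scalar criteria are wrapped in a one-element list, and every accepted value is cast to
    # str before the isin() comparison.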
    @profile
    def generate_actual_data_rows(self, params, schema, sbs_df, np_object, first_iter):
        '''Create the modeling dataframe by comparing each variable (via numerous distance metrics).

        Args:
            params (Parameters object): contains matching parameters
            schema (Schema object): contains matching schema (data files and variables)
            sbs_df (pd.DataFrame): side-by-side table (record-pair level, with info from both an records)

                ==============================  =======================================================
                record_id (_1, _2)              unique record identifier
                blockstring (_1, _2)            concatenated version of blocking columns (sep by ::)
                file_type (_1, _2)              either "new" or "existing"
                candidate_pair_ix               index from candidate-pairs list
                covered_pair                    flag, 1 if blockstring pair passed blocking, 0 otherwise
                <fields for matching> (_1, _2)  both for the matching model and for constraint checking
                ==============================  =======================================================

            np_object (nm_prob.NameProbability object): contains information about name probabilities

        Returns:
            pd.DataFrame: chunk of the data-rows file

                =====================  =======================================================
                dr_id                  unique record-pair identifier (record_id_1__record_id_2)
                record_id (_1, _2)     unique record identifiers
                <distance metrics>     how similar the different matching fields are between records
                label                  "1" if the records refer to the same person, "0" if not, "" otherwise
                =====================  =======================================================
        '''
        sbs_df = sbs_df.copy()

        uid_cols = schema.variables.get_variables_where(attr='compare_type', attr_value='UniqueID')

        # don't compare a record to itself
        sbs_df = sbs_df[sbs_df.record_id_1 != sbs_df.record_id_2]

        # if blockstrings are the same, don't compare A to B and then B to A
        sbs_df = sbs_df[
            (sbs_df.blockstring_1 != sbs_df.blockstring_2) |
            (sbs_df.record_id_1 < sbs_df.record_id_2)]

        if params.incremental:
            # don't compare if both are from existing (this would work outside of the
            # if statement, but would always be true -- do it here to save time)
            sbs_df = sbs_df[
                (sbs_df.file_type_1 == 'new') |
                (sbs_df.file_type_2 == 'new')]

        if params.drop_mixed_label_pairs:
            for uid_col in uid_cols:
                sbs_df = sbs_df[(sbs_df[f"{uid_col}_1"] == '') == (sbs_df[f"{uid_col}_2"] == '')]

        if len(sbs_df) == 0:
            return None

        non_feature_cols = ['candidate_pair_ix', 'record_id_1', 'record_id_2', 'covered_pair']
        data_rows_df = sbs_df[non_feature_cols].copy()

        data_rows_df.reset_index(drop=True, inplace=True)
        sbs_df.reset_index(drop=True, inplace=True)

        if np_object is not None:

            # get name probabilities (both as a feature and for knowing which
            # name to switch if a switch is needed)
            sbs_df = get_name_probabilities(
                    sbs_df, np_object, params.first_name_column, params.last_name_column)

            # determine if a name switch needs to happen
            sbs_df = try_switch_first_last_name(
                    sbs_df, params.first_name_column, params.last_name_column)

            if any(data_rows_df.index != sbs_df.index):
                logger.error('Index mismatch when creating data rows.')
                raise ValueError

        # add name probability columns to data_rows_df
        try:
            #data_rows_df['prob_name1'] = sbs_df['prob_name_1']
            #data_rows_df.loc[(sbs_df.switched_name == 1), 'prob_name1'] = sbs_df.prob_rev_name_1
            #data_rows_df['prob_name2'] = sbs_df['prob_name_2']
            #data_rows_df.loc[(sbs_df.switched_name == 2), 'prob_name2'] = sbs_df['prob_rev_name_2']
            #data_rows_df['prob_same_name'] = sbs_df['prob_same_name']
            #data_rows_df.loc[sbs_df.switched_name == 1, 'prob_same_name'] = sbs_df['prob_same_name_rev_1']
            #data_rows_df.loc[sbs_df.switched_name == 2, 'prob_same_name'] = sbs_df['prob_same_name_rev_2']
            #data_rows_df['max_prob_name'] = data_rows_df[['prob_name1', 'prob_name2']].max(axis=1)
            #data_rows_df['count_pctl_name_1'] = sbs_df['count_pctl_name_1']
            #data_rows_df['count_pctl_name_2'] = sbs_df['count_pctl_name_2']
            #data_rows_df['max_count_pctl_name'] = data_rows_df[['count_pctl_name_1', 'count_pctl_name_2']].max(axis=1)
            data_rows_df['diff_count_pctl_name'] = (sbs_df['count_pctl_name_1'] - sbs_df['count_pctl_name_2']).abs()
            data_rows_df['max_count_pctl_name'] = sbs_df[['count_pctl_name_1', 'count_pctl_name_2']].max(axis=1)
            data_rows_df['diff_count_pctl_fn'] = (sbs_df['count_pctl_fn_1'] - sbs_df['count_pctl_fn_2']).abs()
            data_rows_df['max_count_pctl_fn'] = sbs_df[['count_pctl_fn_1', 'count_pctl_fn_2']].max(axis=1)
            data_rows_df['diff_count_pctl_ln'] = (sbs_df['count_pctl_ln_1'] - sbs_df['count_pctl_ln_2']).abs()
            data_rows_df['max_count_pctl_ln'] = sbs_df[['count_pctl_ln_1', 'count_pctl_ln_2']].max(axis=1)
        except:
            if first_iter:
                logger.info('No name probability features generated.')

        for variable in schema.variables.varlist:

            if variable.compare_type == 'String':
                features_df = compare_strings(sbs_df, variable.name)
            elif variable.compare_type == 'Numeric':
                features_df = compare_numbers(sbs_df, variable.name)
            elif variable.compare_type == 'Categorical':
                features_df = compare_categories(sbs_df, variable.name)
            elif variable.compare_type == 'Date':
                features_df = compare_dates(sbs_df, variable.name)
            elif variable.compare_type == 'Geography':
                features_df = compare_geographies(sbs_df, variable.name)
            elif variable.compare_type == 'Address':
                features_df_1 = compare_strings(sbs_df, 'address_street_number')
                features_df_2 = compare_strings(sbs_df, 'address_street_name')
                features_df_3 = compare_categories(sbs_df, 'address_street_type')
                features_df = pd.concat([features_df_1, features_df_2, features_df_3], axis=1)
            elif variable.compare_type == "LastName":
                ##features_df_1 = compare_strings(sbs_df, variable.name)
                ##features_df_2 = compare_last_name(sbs_df, variable.name)
                ##features_df = pd.concat([features_df_1, features_df_2], axis=1)
                # TODO add back in some sort of JR, SR check
                features_df = compare_strings(sbs_df, variable.name)
            else:
                features_df = None

            if features_df is not None:
                data_rows_df = pd.concat([data_rows_df, features_df], axis=1)

        data_rows_df.columns = [
            'var_' + colname if colname not in non_feature_cols else colname
            for colname in data_rows_df.columns]

        try:
            data_rows_df['exactmatch'] = 1
            for exact_match_col in params.exact_match_variables:
                data_rows_df['exactmatch'] = \
                    ((data_rows_df.exactmatch) &
                     (sbs_df[f'{exact_match_col}_1'] == sbs_df[f'{exact_match_col}_2']) &
                     (sbs_df[f'{exact_match_col}_1'] != '')).astype(int)
            data_rows_df['var_exact_match'] = data_rows_df.exactmatch.copy()
            for neg_var in params.negate_exact_match_variables:
                data_rows_df.loc[(sbs_df[f'{neg_var}_1'] != sbs_df[f'{neg_var}_2']) &
                                 (sbs_df[f'{neg_var}_1'] != '') &
                                 (sbs_df[f'{neg_var}_2'] != ''), 'var_exact_match'] = 0
        except:
            pass

        data_rows_df['label'] = generate_label(
                sbs_df, uid_cols, params.leven_thresh)

        if data_rows_df is not None and len(data_rows_df) > 0:
            data_rows_df['dr_id'] = data_rows_df.record_id_1 + '__' + data_rows_df.record_id_2
            data_rows_df.set_index('dr_id', inplace=True)

        return data_rows_df
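    # Illustrative output of generate_actual_data_rows (values are made up; the real feature
    # columns depend on the schema's variables):
    #
    #     dr_id      record_id_1  record_id_2  covered_pair  var_...  label
    #     A12__B07   A12          B07          1             ...      '1'
    #
    # i.e. one row per surviving record pair, indexed by dr_id ("<record_id_1>__<record_id_2>"),
    # with every distance-metric column prefixed by 'var_' and label equal to '1', '0', or ''.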
    # @log_runtime_and_memory
    @profile
    def generate_data_row_files(
            self, params, schema, an, cp_df, name_probs,
            start_ix_worker, end_ix_worker, dr_file, **kw):
        '''Generate the data needed for the random forest (run once per worker, possibly in
        parallel): perform the merge between candidate pairs and all-names and call the
        function that calculates distance metrics.

        Args:
            params (Parameters object): contains matching parameters
            schema (Schema object): contains matching schema (data files and variables)
            an (pd.DataFrame): all-names table (one row per input record)

                =====================  =======================================================
                record_id              unique record identifier
                file_type              either "new" or "existing"
                <fields for matching>  both for the matching model and for constraint checking
                <raw name fields>      pre-cleaning version of first and last name
                blockstring            concatenated version of blocking columns (sep by ::)
                drop_from_nm           flag, 1 if met any "to drop" criteria, 0 otherwise
                =====================  =======================================================

            cp_df (pd.DataFrame): candidate-pairs list

                ======================  =======================================================
                blockstring_1           concatenated version of blocking columns for first element in pair (sep by ::)
                blockstring_2           concatenated version of blocking columns for second element in pair (sep by ::)
                covered_pair            flag; 1 for pairs that made it through blocking, 0 otherwise; all 1s here
                ======================  =======================================================

            name_probs (nm_prob.NameProbability object): contains information about name probabilities
            start_ix_worker (int): starting index of the candidate-pairs chunk to read in this worker
            end_ix_worker (int): end index of the candidate-pairs chunk to read in this worker
            dr_file (str): path to the data-rows file to write (one for each worker)
        '''
        try:
            cp_df = cp_df.copy()

            # sqlite replacement: map each blockstring to the positions of its all-names records
            an_bs = an[['blockstring']].copy()
            an_bs['i'] = np.arange(len(an_bs))
            an_ix_map = defaultdict(lambda: [])
            for tup in an_bs.itertuples():
                an_ix_map[tup.blockstring].append(tup.i)

            # for each candidate pair, get the records associated with each name and compute
            # distances between features like date, first_name, dob, etc.
            cp_df['candidate_pair_ix'] = cp_df.index
            self.num_batches = ceil(len(cp_df) / self.params.data_rows_batch_size)

            pqwriter = None
            start_ix_cp = 0
            while start_ix_cp < len(cp_df):

                if (start_ix_worker == 0) and (params.verbose is not None) and \
                        (start_ix_cp % params.verbose == 0):
                    logger.info(f"  Generated features for {start_ix_cp * params.num_workers} "
                                f"of ~{len(cp_df) * params.num_workers} pairs of blockstrings.")

                end_ix_cp = min(start_ix_cp + self.params.data_rows_batch_size, len(cp_df))
                relevant_cp = cp_df.iloc[start_ix_cp : end_ix_cp].copy()

                relevant_blockstrings = pd.concat(
                        [relevant_cp.blockstring_1, relevant_cp.blockstring_2]).unique().tolist()
                relevant_bs_ix = [an_ix_map[bs] for bs in relevant_blockstrings]
                relevant_bs_ix = [item for sublist in relevant_bs_ix for item in sublist]

                # get relevant records for this batch
                relevant_records = an.iloc[relevant_bs_ix].copy()
                relevant_records.set_index('blockstring', inplace=True)

                # expand the candidate pairs from a blockstring level to a record level
                side_by_side_df = pd.merge(
                        relevant_cp[['blockstring_1', 'blockstring_2', 'candidate_pair_ix', 'covered_pair']],
                        relevant_records.copy(),
                        left_on='blockstring_1',
                        right_index=True).reset_index(drop=True)
                side_by_side_df = pd.merge(
                        side_by_side_df,
                        relevant_records.copy(),
                        left_on='blockstring_2',
                        right_index=True,
                        suffixes=['_1', '_2'])

                data_rows_df = self.generate_actual_data_rows(
                        params, schema, side_by_side_df, name_probs,
                        first_iter=(start_ix_cp == 0))

                if data_rows_df is None:
                    start_ix_cp += self.params.data_rows_batch_size
                    continue

                # outcome for selection model
                data_rows_df['labeled_data'] = (data_rows_df.label != '').astype(int)

                table = pa.Table.from_pandas(data_rows_df)
                # open the writer on the first batch that actually produces rows, so it is
                # always defined before write_table is called
                if pqwriter is None:
                    pqwriter = pq.ParquetWriter(dr_file, table.schema)
                pqwriter.write_table(table)

                start_ix_cp += self.params.data_rows_batch_size

            if pqwriter:
                pqwriter.close()

        except Exception as e:
            if os.path.exists(dr_file):
                os.remove(dr_file)
            logger.error(f"{dr_file} failed.")
            raise e
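# Minimal usage sketch (hypothetical paths; assumes `params` and `schema` objects were already
# built by the earlier stages of the pipeline, which is not shown here):
#
#     task = GenerateDataRows(
#         params,
#         schema,
#         output_dir='nm_output/data_rows',
#         all_names_file='nm_output/all_names.parquet',
#         candidate_pairs_file='nm_output/candidate_pairs.parquet')
#     task.main()
#     task.output_files   # one data_rows_<i>.parquet file per worker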