Source code for survey_enhance.impute

import pandas as pd
from typing import List, Dict
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from tqdm import tqdm
from pathlib import Path


def to_array(values) -> np.ndarray:
    """Return the underlying NumPy array of a pandas Series or DataFrame, or the input unchanged."""
    if isinstance(values, (pd.Series, pd.DataFrame)):
        return values.values
    return values


def get_category_mapping(values: pd.Series) -> Dict[str, int]:
    """Map each unique category in `values` to an integer code, in order of first appearance."""
    return {category: i for i, category in enumerate(values.unique())}
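
A small illustration of the two helpers above (toy values, for exposition only):

    import pandas as pd

    s = pd.Series(["a", "b", "a"])
    to_array(s)              # array(['a', 'b', 'a'], dtype=object)
    get_category_mapping(s)  # {'a': 0, 'b': 1}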


class Imputation:
    """
    An `Imputation` represents a learned function f(`input_variables`) -> `output_variables`.
    """

    models: List["ManyToOneImputation"]
    """Each column of the output variables is predicted by a separate model, stored in this list."""

    X_columns: List[str]
    """The names of the input variables."""

    Y_columns: List[str]
    """The names of the output variables."""

    random_generator: np.random.Generator = None
    """The random generator used to sample from the distribution of the imputation."""

    X_category_mappings: Dict[int, Dict[str, int]] = None
    """The mapping from category names to integer codes for each categorical input variable, keyed by column index (`None` for non-categorical columns)."""
    def encode_categories(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Replace string-valued (categorical) columns of `X` with integer codes,
        building the category mappings on first use.
        """
        if self.X_category_mappings is None:
            self.X_category_mappings = {
                i: get_category_mapping(X[column])
                if X[column].dtype == "object"
                else None
                for i, column in enumerate(X.columns)
            }
        X = X.copy()
        for i, column in enumerate(X.columns):
            if self.X_category_mappings.get(i) is not None:
                X[column] = X[column].map(self.X_category_mappings[i])
        return X
    def train(self, X: pd.DataFrame, Y: pd.DataFrame, num_trees: int = 100):
        """
        Train a random forest model to predict the output variables from the input variables.

        Args:
            X (pd.DataFrame): The dataset containing the input variables.
            Y (pd.DataFrame): The dataset containing the output variables.
            num_trees (int): The number of trees in each random forest.
        """
        self.X_columns = X.columns
        self.Y_columns = Y.columns
        X = self.encode_categories(X)
        self.models = []
        # We train a separate model for each output variable. For example, if
        # X = [income, age] and Y = [height, weight], we train two models:
        # 1. Predict height from income and age.
        # 2. Predict weight from income, age and (predicted) height.
        for i in tqdm(range(len(Y.columns)), desc="Training models"):
            Y_columns = Y.columns[:i]
            if i == 0:
                X_ = to_array(X)
            else:
                X_ = to_array(pd.concat([X, Y[Y_columns]], axis=1))
            y_ = to_array(Y[Y.columns[i]])
            model = ManyToOneImputation()
            model.encode_categories = self.encode_categories
            model.train(X_, y_, num_trees=num_trees)
            self.models.append(model)
    def predict(
        self,
        X: pd.DataFrame,
        mean_quantile: float = 0.5,
        verbose: bool = False,
    ) -> pd.DataFrame:
        """
        Predict the output variables for the input dataset.

        Args:
            X (pd.DataFrame): The dataset to predict on.
            mean_quantile (float): The beta parameter for the imputation.
            verbose (bool): Whether to print progress for each imputed variable.

        Returns:
            pd.DataFrame: The predicted dataset.
        """
        # Accept lists or DataFrames; align columns with the training inputs.
        X = pd.DataFrame(X, columns=self.X_columns)
        if self.random_generator is None:
            self.random_generator = np.random.default_rng()
        X = to_array(self.encode_categories(X))
        Y = np.zeros((X.shape[0], len(self.models)))
        for i, model in enumerate(self.models):
            if verbose:
                print(f"Imputing {self.Y_columns[i]}...")
            if isinstance(mean_quantile, list):
                quantile = mean_quantile[i]
            else:
                quantile = mean_quantile
            # Each model sees the original inputs plus the variables imputed so far.
            X_ = np.concatenate([X, Y[:, :i]], axis=1)
            model.encode_categories = self.encode_categories
            Y[:, i] = model.predict(X_, quantile, self.random_generator)
        return pd.DataFrame(Y, columns=self.Y_columns)
    def save(self, path: str):
        """
        Save the imputation model to disk.

        Args:
            path (str): The path to save the model to.
        """
        import pickle

        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, "wb") as f:
            # Store only the attributes needed to reconstruct the imputation.
            data = dict(
                models=self.models,
                X_columns=self.X_columns,
                X_category_mappings=self.X_category_mappings,
                Y_columns=self.Y_columns,
            )
            pickle.dump(data, f)
    @staticmethod
    def load(path: str) -> "Imputation":
        """
        Load the imputation model from disk.

        Args:
            path (str): The path to load the model from.

        Returns:
            Imputation: The imputation model.
        """
        import pickle

        imputation = Imputation()
        with open(path, "rb") as f:
            data = pickle.load(f)
            imputation.models = data["models"]
            imputation.X_columns = data["X_columns"]
            imputation.X_category_mappings = data["X_category_mappings"]
            imputation.Y_columns = data["Y_columns"]
        for model in imputation.models:
            model.encode_categories = imputation.encode_categories
            model.X_category_mappings = imputation.X_category_mappings
        return imputation
    def solve_for_mean_quantiles(
        self, targets: list, input_data: pd.DataFrame, weights: pd.Series
    ):
        """
        Solve for the mean quantile of each output variable that reproduces
        the corresponding weighted aggregate target.

        Args:
            targets (list): The target aggregate for each output variable.
            input_data (pd.DataFrame): The input dataset.
            weights (pd.Series): The sample weights.

        Returns:
            list: The solved mean quantile for each output variable.
        """
        mean_quantiles = []
        input_data = input_data.copy()
        for i, model in enumerate(self.models):
            mean_quantiles.append(
                model.solve_for_mean_quantile(
                    target=targets[i],
                    input_df=input_data,
                    weights=weights,
                    verbose=True,
                )
            )
            # Feed the newly imputed column into the inputs for the next model.
            predicted_column = model.predict(input_data, mean_quantiles[-1])
            input_data[self.Y_columns[i]] = predicted_column
        return mean_quantiles
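
A minimal usage sketch of the chained workflow (the column names, values, and file path here are illustrative, not part of the library):

    import pandas as pd
    from survey_enhance.impute import Imputation

    # Train on a toy dataset: impute height, then weight, from income and region.
    X = pd.DataFrame({"income": [10_000, 20_000, 30_000, 40_000], "region": ["a", "b", "a", "b"]})
    Y = pd.DataFrame({"height": [160.0, 170.0, 180.0, 175.0], "weight": [60.0, 70.0, 80.0, 75.0]})

    imputation = Imputation()
    imputation.train(X, Y, num_trees=10)

    # mean_quantile=0.5 centres the sampled quantiles on the median.
    new_X = pd.DataFrame({"income": [15_000], "region": ["b"]})
    print(imputation.predict(new_X, mean_quantile=0.5))

    # Round-trip through disk.
    imputation.save("models/height_weight.pkl")
    restored = Imputation.load("models/height_weight.pkl")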
class ManyToOneImputation:
    """
    An `Imputation` consists of a set of `ManyToOneImputation` models, one for each output variable.
    """

    model: RandomForestRegressor
    """The random forest model."""
    def train(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        sample_weight: pd.Series = None,
        num_trees: int = 100,
    ):
        """
        Train a random forest model to predict the output variable from the input variables.

        Args:
            X (pd.DataFrame): The dataset containing the input variables.
            y (pd.Series): The dataset containing the output variable.
            sample_weight (pd.Series): The sample weights.
            num_trees (int): The number of trees in the forest.
        """
        X = to_array(X)
        y = to_array(y)
        # Each tree is fit on a 1% bootstrap sample of the rows, so the forest
        # yields a spread of per-tree predictions to sample from at predict time.
        self.model = RandomForestRegressor(
            n_estimators=num_trees, bootstrap=True, max_samples=0.01
        )
        self.model.fit(X, y, sample_weight=sample_weight)
    def predict(
        self,
        X: pd.DataFrame,
        mean_quantile: float = 0.5,
        random_generator: np.random.Generator = None,
    ) -> np.ndarray:
        """
        Predict the output variable for the input dataset.

        Args:
            X (pd.DataFrame): The dataset to predict on.
            mean_quantile (float): The mean quantile under the Beta distribution.
            random_generator (np.random.Generator): The random generator.

        Returns:
            np.ndarray: The predicted value for each input row, sampled at a
                random quantile of that row's tree predictions.
        """
        if isinstance(X, pd.DataFrame) and any(
            X[column].dtype == "O" for column in X.columns
        ):
            X = self.encode_categories(X)
        X = to_array(X)
        # Collect each tree's prediction for each row: shape (rows, trees).
        tree_predictions = np.array(
            [tree.predict(X) for tree in self.model.estimators_]
        ).transpose()
        if mean_quantile is None:
            mean_quantile = 0.5
        # Sample a quantile per row from Beta(a, 1), whose mean is
        # a / (a + 1) = mean_quantile.
        a = mean_quantile / (1 - mean_quantile)
        if random_generator is None:
            random_generator = np.random.default_rng()
        input_quantiles = random_generator.beta(
            a, 1, size=tree_predictions.shape[0]
        )
        # For each row, take the sampled percentile of that row's tree predictions.
        x = np.apply_along_axis(
            lambda row: np.percentile(row[1:], row[0]),
            1,
            np.concatenate(
                [input_quantiles[:, np.newaxis] * 100, tree_predictions],
                axis=1,
            ),
        )
        return x
    def solve_for_mean_quantile(
        self,
        target: float,
        input_df: pd.DataFrame,
        weights: np.ndarray,
        max_iterations: int = 10,
        verbose: bool = False,
    ):
        """
        Solve for the mean quantile that produces the target value.

        Args:
            target (float): The target value.
            input_df (pd.DataFrame): The input dataset.
            weights (np.ndarray): The sample weights.
            max_iterations (int, optional): The maximum number of iterations. Defaults to 10.
            verbose (bool, optional): Whether to print the loss at each iteration. Defaults to False.

        Returns:
            float: The mean quantile.
        """

        def loss(mean_quantile):
            pred_values = self.predict(input_df, mean_quantile)
            pred_aggregate = (pred_values * weights).sum()
            if verbose:
                print(
                    f"PREDICTED: {pred_aggregate / 1e9:.1f} (target: {target / 1e9:.1f})"
                )
            return (pred_aggregate - target) ** 2, pred_aggregate

        min_quantile = 0
        max_quantile = 1
        # Binary search for the mean quantile: raising the quantile raises the
        # predicted aggregate, so we can halve the interval on each iteration.
        for i in range(max_iterations):
            mean_quantile = (min_quantile + max_quantile) / 2
            loss_value, pred_agg = loss(mean_quantile)
            if verbose:
                print(
                    f"Iteration {i}: {mean_quantile:.4f} (loss: {loss_value:.4f})"
                )
            if pred_agg < target:
                min_quantile = mean_quantile
            else:
                max_quantile = mean_quantile
        return mean_quantile
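
To see why `mean_quantile` acts as a mean in `ManyToOneImputation.predict`: each row's quantile is drawn from Beta(a, 1) with a = q / (1 - q), and the mean of Beta(a, 1) is a / (a + 1) = q, so the sampled quantiles average to `mean_quantile` while still varying from row to row. A quick numeric check (the seed is arbitrary):

    import numpy as np

    q = 0.7
    a = q / (1 - q)
    draws = np.random.default_rng(0).beta(a, 1, size=100_000)
    print(draws.mean())  # ~0.7, since the Beta(a, 1) mean is a / (a + 1) = q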