Source code for src.aequitas.group

```python
import logging

import pandas as pd

# Module-level logger. The original called getLogger() and discarded the
# result, which was a no-op; bind it so the module can log under __name__.
logger = logging.getLogger(__name__)

__author__ = "Rayid Ghani, Pedro Saleiro <saleiro@uchicago.edu>, Benedict Kuester, Loren Hinkson"

class Group(object):
    """
    Computes group crosstabs: score, label, and confusion-matrix metrics for
    every unique value of each attribute column (e.g. race, gender, age
    category) at one or more score thresholds.
    """

    def __init__(self):
        """
        Initialize the label-count helpers and the metric function table.
        """
        # Factories: given a label column name, return a function that counts
        # negative (== 0) / positive (== 1) labels within a grouped frame x.
        self.label_neg_count = lambda label_col: lambda x: \
            (x[label_col] == 0).sum()
        self.label_pos_count = lambda label_col: lambda x: \
            (x[label_col] == 1).sum()
        self.group_functions = self._get_group_functions()

    @staticmethod
    def _get_group_functions():
        """
        Helper function to accumulate the functions used in bias metrics
        calculations.

        Returns a dict mapping metric name -> factory. Each factory takes
        (rank_col, label_col, thres, k) and returns a function suitable for
        ``groupby().apply()``. Rows with ``rank_col <= thres`` are treated as
        predicted positive; rows with ``rank_col > thres`` as predicted
        negative.
        """

        def divide(x, y):
            # Metrics with an empty denominator are undefined (NaN).
            # float('nan') replaces the pd.np alias removed in pandas 2.0.
            return x / y if y != 0 else float('nan')

        def predicted_pos_count(rank_col, label_col, thres, k):
            return lambda x: (x[rank_col] <= thres).sum()

        def predicted_neg_count(rank_col, label_col, thres, k):
            return lambda x: (x[rank_col] > thres).sum()

        def predicted_pos_ratio_k(rank_col, label_col, thres, k):
            # Share of the top-k list that belongs to this group.
            return lambda x: divide((x[rank_col] <= thres).sum(), float(k))

        def predicted_pos_ratio_g(rank_col, label_col, thres, k):
            # Share of this group that is predicted positive.
            return lambda x: divide((x[rank_col] <= thres).sum(), float(len(x)))

        def false_neg_count(rank_col, label_col, thres, k):
            return lambda x: ((x[rank_col] > thres) & (x[label_col] == 1)).sum()

        def false_pos_count(rank_col, label_col, thres, k):
            return lambda x: ((x[rank_col] <= thres) & (x[label_col] == 0)).sum()

        def true_neg_count(rank_col, label_col, thres, k):
            return lambda x: ((x[rank_col] > thres) & (x[label_col] == 0)).sum()

        def true_pos_count(rank_col, label_col, thres, k):
            return lambda x: ((x[rank_col] <= thres) & (x[label_col] == 1)).sum()

        def fpr(rank_col, label_col, thres, k):
            # False positive rate: FP / labeled negative.
            return lambda x: divide(
                ((x[rank_col] <= thres) & (x[label_col] == 0)).sum(),
                float((x[label_col] == 0).sum()))

        def tnr(rank_col, label_col, thres, k):
            # True negative rate: TN / labeled negative.
            return lambda x: divide(
                ((x[rank_col] > thres) & (x[label_col] == 0)).sum(),
                float((x[label_col] == 0).sum()))

        def fnr(rank_col, label_col, thres, k):
            # False negative rate: FN / labeled positive.
            return lambda x: divide(
                ((x[rank_col] > thres) & (x[label_col] == 1)).sum(),
                float((x[label_col] == 1).sum()))

        def tpr(rank_col, label_col, thres, k):
            # True positive rate (recall): TP / labeled positive.
            return lambda x: divide(
                ((x[rank_col] <= thres) & (x[label_col] == 1)).sum(),
                float((x[label_col] == 1).sum()))

        def fomr(rank_col, label_col, thres, k):
            # False omission rate: FN / predicted negative.
            return lambda x: divide(
                ((x[rank_col] > thres) & (x[label_col] == 1)).sum(),
                float((x[rank_col] > thres).sum()))

        def npv(rank_col, label_col, thres, k):
            # Negative predictive value: TN / predicted negative.
            return lambda x: divide(
                ((x[rank_col] > thres) & (x[label_col] == 0)).sum(),
                float((x[rank_col] > thres).sum()))

        def precision(rank_col, label_col, thres, k):
            # Precision: TP / predicted positive.
            return lambda x: divide(
                ((x[rank_col] <= thres) & (x[label_col] == 1)).sum(),
                float((x[rank_col] <= thres).sum()))

        def fdr(rank_col, label_col, thres, k):
            # False discovery rate: FP / predicted positive.
            return lambda x: divide(
                ((x[rank_col] <= thres) & (x[label_col] == 0)).sum(),
                float((x[rank_col] <= thres).sum()))

        return {'tpr': tpr,
                'tnr': tnr,
                'for': fomr,
                'fdr': fdr,
                'fpr': fpr,
                'fnr': fnr,
                'npv': npv,
                'precision': precision,
                'pp': predicted_pos_count,
                'pn': predicted_neg_count,
                'ppr': predicted_pos_ratio_k,
                'pprev': predicted_pos_ratio_g,
                'fp': false_pos_count,
                'fn': false_neg_count,
                'tn': true_neg_count,
                'tp': true_pos_count}

    def get_crosstabs(self, df, score_thresholds=None, model_id=1, attr_cols=None):
        """
        Creates univariate groups and calculates group metrics.

        :param df: a dataframe containing the following required columns
            [score, label_value] plus one or more string attribute columns.
        :param score_thresholds: dictionary
            { 'rank_abs': [], 'rank_pct': [], 'score': [] }. If omitted, the
            scores are assumed binary and the threshold is the count of 1s.
        :param model_id: the model ID to record on the output rows.
        :param attr_cols: optional, list of names of columns corresponding to
            group attributes (i.e., gender, age category, race, etc.).
            Defaults to every column not in the known non-attribute set.

        :return: tuple (groups_df, attr_cols): a dataframe of group score,
            label, and error statistics grouped by unique attribute values,
            and the attribute columns used.
        :raises Exception: if an attribute column is missing or non-string.
        """
        if not attr_cols:
            non_attr_cols = ['id', 'model_id', 'entity_id', 'score',
                             'label_value', 'rank_abs', 'rank_pct']
            # every remaining column is treated as a group attribute
            attr_cols = df.columns[~df.columns.isin(non_attr_cols)]
        # check if all attr_cols exist in df
        if not all(col in df.columns for col in attr_cols):
            raise Exception('get_crosstabs: not all attribute columns provided exist in input dataframe!')
        # attribute columns must have been preprocessed into strings
        non_string_cols = df.columns[
            (df.dtypes != object) & (df.dtypes != str) & (df.columns.isin(attr_cols))]
        if not non_string_cols.empty:
            raise Exception('get_crosstabs: input df was not preprocessed. There are non-string cols within attr_cols!')

        # Work on a copy so the caller's dataframe is never mutated (the
        # original implementation altered the 'score' dtype in place).
        df = df.copy()

        # If no score_thresholds are provided, assume binary scores with
        # rank_abs = number of 1s in the score column. count_ones doubles as
        # the flag for the 'binary 0/1' threshold label below.
        count_ones = None
        if not score_thresholds:
            df['score'] = df['score'].astype(float)
            count_ones = df['score'].value_counts().get(1.0, 0)
            score_thresholds = {'rank_abs': [count_ones]}

        logging.info('model_id, score_thresholds %s %s', model_id, score_thresholds)
        # Rank entities by score, best first; rank_pct is the cumulative
        # fraction of the population at or above each rank.
        df = df.sort_values('score', ascending=False)
        df['rank_abs'] = range(1, len(df) + 1)
        df['rank_pct'] = df['rank_abs'] / len(df)

        dfs = []
        prior_dfs = []
        logging.info('getcrosstabs: attribute columns to perform crosstabs:' + ','.join(attr_cols))
        for col in attr_cols:
            # Missing attribute values are grouped under the literal string
            # 'pd.np.nan' (attr cols are validated as strings above; this
            # placeholder string is kept for backward compatibility).
            col_group = df.fillna({col: 'pd.np.nan'}).groupby(col)
            counts = col_group.size()
            # Priors: per attribute value, label counts and prevalence.
            this_prior_df = pd.DataFrame({
                'model_id': [model_id] * len(counts),
                'attribute_name': [col] * len(counts),
                'attribute_value': counts.index.values,
                'group_label_pos': col_group.apply(
                    self.label_pos_count('label_value')).values,
                'group_label_neg': col_group.apply(
                    self.label_neg_count('label_value')).values,
                'group_size': counts.values,
                'total_entities': [len(df)] * len(counts)
            })
            this_prior_df['prev'] = this_prior_df['group_label_pos'] / this_prior_df['group_size']
            # For each model_id the priors_df has one row per
            # (attribute_name, attribute_value) pair.
            prior_dfs.append(this_prior_df)

            # Metrics are computed per threshold unit/value pair, e.g.
            # YAML ex: thresholds:
            #              rank_abs: [300]
            #              rank_pct: [1.0, 5.0, 10.0]
            for thres_unit, thres_values in score_thresholds.items():
                for thres_val in thres_values:
                    # NOTE(review): k is the count of entities at or above the
                    # threshold; ranks are cumulative (assigned above), so the
                    # sum of the boolean mask equals the cutoff rank here.
                    k = (df[thres_unit] <= thres_val).sum()
                    # Label binary thresholds 'binary 0/1'; otherwise
                    # '<value>_<abs|pct>' from the threshold unit suffix.
                    score_threshold = ('binary 0/1' if count_ones is not None
                                       else str(thres_val) + '_' + thres_unit[-3:])
                    this_group_df = None
                    for name, func_factory in self.group_functions.items():
                        func = func_factory(thres_unit, 'label_value', thres_val, k)
                        feat_bias = col_group.apply(func)
                        metrics_df = pd.DataFrame({
                            'model_id': [model_id] * len(feat_bias),
                            'score_threshold': [score_threshold] * len(feat_bias),
                            'k': [k] * len(feat_bias),
                            'attribute_name': [col] * len(feat_bias),
                            'attribute_value': feat_bias.index.values,
                            name: feat_bias.values
                        })
                        # Accumulate one column per metric by merging on the
                        # shared identifier columns.
                        if this_group_df is None:
                            this_group_df = metrics_df
                        else:
                            this_group_df = this_group_df.merge(metrics_df)
                    dfs.append(this_group_df)
        groups_df = pd.concat(dfs, ignore_index=True)
        priors_df = pd.concat(prior_dfs, ignore_index=True)
        groups_df = groups_df.merge(priors_df, on=['model_id', 'attribute_name',
                                                   'attribute_value'])
        return groups_df, attr_cols

    def list_absolute_metrics(self, df):
        """
        View list of all calculated absolute bias metrics present in df.
        """
        return df.columns.intersection(['fpr', 'fnr', 'tpr', 'tnr', 'for',
                                        'fdr', 'npv', 'precision', 'ppr',
                                        'pprev', 'prev'
                                        ]).tolist()
```