-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdq_improvements.py
More file actions
252 lines (216 loc) · 12.7 KB
/
dq_improvements.py
File metadata and controls
252 lines (216 loc) · 12.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
import pandas as pd
import re
from pandas.api.types import is_numeric_dtype
import numpy as np
def impute_missing_values(df, dtypes, imputation_methods):
    """Impute missing values in-place; the user can adjust the imputation method per column.

    Parameters
    ----------
    df : pd.DataFrame
        Dataset to impute. Columns are overwritten in place. Must contain an
        'Original Index' column, used to report coordinates of changed cells.
    dtypes : dict
        Mapping column name -> inferred type label (e.g. 'categorical',
        'boolean', 'floating', 'integer', 'numeric').
    imputation_methods : list
        Single-element list wrapping a dict of column name -> strategy
        ('most frequent', 'mean', 'median' or 'do not impute').

    Returns
    -------
    list of dict
        One entry per imputed cell:
        {"coordinates": (original_index, column_name), "new": imputed_value}.
    """
    df_previous = df.copy()
    imputation_methods = imputation_methods[0]  # unwrap the dict from its list
    for column in dtypes:
        try:
            # Skip columns deleted by an earlier step, or fully empty columns
            # (nothing to learn an imputation value from).
            if column not in df.columns or df[column].empty:
                continue
            method = imputation_methods[column]
            # NOTE: plain assignment instead of fillna(inplace=True) on a
            # column selection — the chained-assignment form is deprecated
            # and silently ineffective under pandas Copy-on-Write.
            if dtypes[column] in ('categorical', 'boolean'):
                if method == 'most frequent':
                    df[column] = df[column].fillna(df[column].mode()[0])
                elif method == 'do not impute':
                    print('No imputation performed for column {}'.format(column))
                elif method in ('mean', 'median'):
                    # SortingHat may mislabel a numeric column as categorical when
                    # it has few distinct values; honour the numeric strategy but
                    # notify the user about the imputation choice.
                    if is_numeric_dtype(df[column]):
                        if method == 'mean':
                            df[column] = df[column].fillna(df[column].mean())
                        else:
                            df[column] = df[column].fillna(df[column].median())
                        print('Column {} imputed using {} strategy, but "most frequent" is recommended as it is a most probably'
                              ' a categorical column with numeric values.'.format(column, method))
                    else:
                        print("Invalid imputation strategy for column {}: use 'most frequent' instead. "
                              "The missing values in this column will not be imputed.".format(column))
            elif dtypes[column] in ('floating', 'integer', 'numeric'):
                if method == 'most frequent':
                    df[column] = df[column].fillna(df[column].mode()[0])
                elif method == 'mean':
                    df[column] = df[column].fillna(df[column].mean())
                elif method == 'median':
                    df[column] = df[column].fillna(df[column].median())
                elif method == 'do not impute':
                    print('No imputation performed for column {}'.format(column))
            else:  # other data types than numeric or categorical
                # mean/median on non-numeric data raises and is reported below
                if method == 'most frequent':
                    df[column] = df[column].fillna(df[column].mode()[0])
                elif method == 'mean':
                    df[column] = df[column].fillna(df[column].mean())
                elif method == 'median':
                    df[column] = df[column].fillna(df[column].median())
                elif method == 'do not impute':
                    print('No imputation performed for column {}'.format(column))
        except Exception as e:
            print('Error occurred while imputing column {}, column was not imputed. Error message: {}'.format(column, str(e)))
    changed_data = []
    # Cells that differ AND are non-NaN afterwards. The notna() filter discards
    # the NaN != NaN false positives for cells that were left unimputed
    # (previously handled via the deprecated stack(dropna=True); iteritems()
    # was removed in pandas 2.0, hence the positional iteration below).
    diff_mask = (df != df_previous) & df.notna()
    oi_pos = df.columns.get_loc('Original Index')
    for row_pos, col_pos in zip(*np.where(diff_mask.to_numpy())):
        changed_data.append({
            # report the position in the original (unfiltered) dataset
            "coordinates": (df.iat[row_pos, oi_pos], df.columns[col_pos]),
            "new": df.iat[row_pos, col_pos]})
    return changed_data
def fix_string_mismatch(df, df_string_mismatch, value_to_replace):
    """Correct string mismatches: change all variants of one base form to the base form.

    Parameters
    ----------
    df : pd.DataFrame
        Dataset to correct. The caller's frame is NOT mutated (`replace`
        returns a new frame). Must contain an 'Original Index' column.
    df_string_mismatch : mapping
        Has 'Base form' and 'Value' entries keyed alike (dict-of-dicts or
        DataFrame columns): each key pairs a variant value with its base form.
    value_to_replace : str
        The base form selected by the user; only its variants are replaced.

    Returns
    -------
    list of dict
        One entry per changed cell:
        {"coordinates": (original_index, column_name), "new": new_value}.
    """
    # Snapshot before editing; an explicit copy rather than an alias so the
    # diff below never compares a frame against itself.
    df_previous = df.copy()
    # Group the variant values by their base form.
    variants_per_base = {}
    for key, base_form in df_string_mismatch['Base form'].items():
        variants_per_base.setdefault(base_form, []).append(df_string_mismatch['Value'][key])
    # Normalise only the variants of the user-selected base form.
    for variant in variants_per_base.get(value_to_replace, []):
        df = df.replace(variant, value_to_replace)
    changed_data = []
    # Cells that differ AND are non-NaN afterwards; the notna() filter drops
    # NaN != NaN false positives (previously handled via the deprecated
    # stack(dropna=True); iteritems() was removed in pandas 2.0).
    diff_mask = (df != df_previous) & df.notna()
    oi_pos = df.columns.get_loc('Original Index')
    for row_pos, col_pos in zip(*np.where(diff_mask.to_numpy())):
        changed_data.append({
            # report the position in the original (unfiltered) dataset
            "coordinates": (df.iat[row_pos, oi_pos], df.columns[col_pos]),
            "new": df.iat[row_pos, col_pos]})
    return changed_data
def fix_special_characters(df, df_special_characters, replacement_value):
    """Replace cells consisting only of special characters with a user-specified value.

    Parameters
    ----------
    df : pd.DataFrame
        Dataset to correct. The caller's frame is NOT mutated (`replace`
        returns a new frame). Must contain an 'Original Index' column.
    df_special_characters : mapping
        Has a 'Most Common Special-Only Samples' entry: per row, a
        comma-separated string of quoted samples, e.g. "'!!!', '??'".
    replacement_value
        The value that replaces every special-character-only sample.

    Returns
    -------
    list of dict
        One entry per changed cell:
        {"coordinates": (original_index, column_name), "new": new_value}.
    """
    # Snapshot before editing; an explicit copy rather than an alias so the
    # diff below never compares a frame against itself.
    df_previous = df.copy()
    # Flatten the comma-separated, quoted samples into one set of unique values.
    sample_cells = list(df_special_characters['Most Common Special-Only Samples'].values())
    unique_values = {item.strip().strip("'") for cell in sample_cells for item in cell.split(',')}
    df = df.replace(list(unique_values), replacement_value)
    changed_data = []
    # Cells that differ AND are non-NaN afterwards; the notna() filter drops
    # NaN != NaN false positives (previously handled via the deprecated
    # stack(dropna=True); iteritems() was removed in pandas 2.0).
    diff_mask = (df != df_previous) & df.notna()
    oi_pos = df.columns.get_loc('Original Index')
    for row_pos, col_pos in zip(*np.where(diff_mask.to_numpy())):
        changed_data.append({
            # report the position in the original (unfiltered) dataset
            "coordinates": (df.iat[row_pos, oi_pos], df.columns[col_pos]),
            "new": df.iat[row_pos, col_pos]})
    return changed_data
def obtain_indices_with_issues(df, check_results, settings_dict):
    """Collect the row indices / column names affected by each detected DQ issue,
    so the problematic rows can be displayed in the datatable.

    Parameters
    ----------
    df : pd.DataFrame
        The dataset under inspection.
    check_results : dict
        Mapping check name -> result DataFrame of that data-quality check. A
        result containing a 'Check notification' column means the check found
        nothing to report.
    settings_dict : dict
        User settings; 'advanced_settings_correlation' is the threshold above
        which feature-feature correlations are flagged.

    Returns
    -------
    dict
        Issue name -> list of row indices; 'redundant_columns' instead holds a
        flat, de-duplicated list of column names.
    """
    dq_issue_indices = {'redundant_columns': []}
    for check_res in check_results:
        if check_res == 'df_missing_values':
            missing_value_mask = df.isna()
            # rows with at least one missing cell (empty list when none)
            dq_issue_indices['missing_values'] = \
                missing_value_mask[missing_value_mask.any(axis=1)].index.tolist()
        elif check_res == 'df_duplicates':
            if 'Check notification' in check_results[check_res].columns:
                dq_issue_indices['duplicate_instances'] = []
            else:
                # Keep the first occurrence of each duplicate group, flag the rest.
                # BUGFIX: split into a local Series instead of assigning the split
                # back into check_results — the old in-place mutation broke any
                # repeated call on the same check_results dict.
                split_indexes = check_results['df_duplicates']['indexes'].str.split(',')
                dq_issue_indices['duplicate_instances'] = [
                    int(idx) for group in split_indexes for idx in group[1:]]
        elif check_res == 'df_duplicate_columns':
            if 'Check notification' not in check_results[check_res].columns:
                duplicate_cols = check_results[check_res]['Duplicate column'].values
                if len(duplicate_cols) > 0:
                    dq_issue_indices['redundant_columns'].append(duplicate_cols)
        elif check_res == 'df_amount_of_diff_values':
            df_single_value = check_results['df_amount_of_diff_values']
            # columns with exactly one distinct value carry no information
            single_value_cols = df_single_value.columns[df_single_value.iloc[0] == 1].tolist()
            if len(single_value_cols) > 0:
                dq_issue_indices['redundant_columns'].append(single_value_cols)
        elif check_res == 'df_mixed_data_types':
            mixed_data_types_df = check_results['df_mixed_data_types']
            # convert string percentages to numeric; non-numeric cells become NaN
            first_row_numeric = pd.to_numeric(mixed_data_types_df.iloc[0], errors='coerce')
            # strictly between 0 and 1 => less than 100% of one data type
            mask = (first_row_numeric > 0) & (first_row_numeric < 1)
            mixed_columns = mixed_data_types_df.columns[mask]
            if not mixed_columns.empty:
                dq_issue_indices['redundant_columns'].append(mixed_columns.tolist())
        elif check_res == 'df_special_characters':
            if 'Check notification' not in check_results[check_res].columns:
                columns_with_specials = check_results[check_res]['Column'].values
                # samples arrive as comma-separated, quoted strings per row
                special_samples = set(
                    value.strip("' ")
                    for s in check_results[check_res]['Most Common Special-Only Samples']
                    for value in s.split(','))
                indices = []
                for col in columns_with_specials:
                    indices.extend(df.index[df[col].isin(special_samples)].tolist())
                dq_issue_indices['special_characters'] = indices
            else:
                dq_issue_indices['special_characters'] = []
        elif check_res == 'df_string_mismatch':  # MINOR ISSUE
            if 'Check notification' not in check_results[check_res].columns:
                mismatch_values = check_results[check_res]['Value'].tolist()
                row_has_mismatch = df.isin(mismatch_values).any(axis=1)
                dq_issue_indices['string_mismatch'] = df[row_has_mismatch].index.tolist()
            else:
                dq_issue_indices['string_mismatch'] = []
        elif check_res == 'df_outliers':  # MINOR ISSUE
            if 'Check notification' in check_results[check_res].columns:
                dq_issue_indices['outlier_instances'] = []
            else:
                dq_issue_indices['outlier_instances'] = [
                    int(i) for i in check_results[check_res]['Row number'].values]
        elif check_res == 'df_feature_feature_correlation':  # MINOR ISSUE
            feature_feature_correlation_df = check_results['df_feature_feature_correlation'].copy()
            if 'Column' in feature_feature_correlation_df.columns:
                feature_feature_correlation_df.drop(columns=['Column'], inplace=True)
            # keep only the strict upper triangle so each pair is inspected once
            upper = feature_feature_correlation_df.where(
                np.triu(np.ones(feature_feature_correlation_df.shape), k=1).astype(bool))
            upper = upper.apply(pd.to_numeric, errors='coerce')
            high_corr_cols = set()
            threshold = settings_dict['advanced_settings_correlation']
            # check which column pairs correlate more strongly than the user allows
            for col in upper.columns:
                for row in upper.index:
                    if abs(upper.loc[row, col]) > threshold:
                        high_corr_cols.add(row)
                        high_corr_cols.add(col)
            # BUGFIX: the computed set was previously discarded (dead code);
            # highly correlated features are redundant-column candidates.
            if high_corr_cols:
                dq_issue_indices['redundant_columns'].append(list(high_corr_cols))
        elif check_res == 'df_conflicting_labels':  # MODERATE ISSUE
            if 'Check notification' in check_results[check_res].columns:
                dq_issue_indices['conflicting_labels'] = []
            else:
                # 'Instances' holds comma-separated index strings; sum() of lists concatenates
                dq_issue_indices['conflicting_labels'] = check_results[check_res][
                    'Instances'].str.split(',').apply(lambda x: list(map(int, x))).sum()
    # flatten the per-check column groups and de-duplicate
    dq_issue_indices['redundant_columns'] = list(
        {item for sublist in dq_issue_indices['redundant_columns'] for item in sublist})
    return dq_issue_indices