-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdata_processor.py
More file actions
98 lines (76 loc) · 3.01 KB
/
data_processor.py
File metadata and controls
98 lines (76 loc) · 3.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import os
import sys
import csv
class DataProcessor(object):
    """Filter employee CSV records down to one record per employee.

    Records are handled as raw comma-separated text lines. For every
    employee id the line with the highest integer salary is kept, and the
    surviving lines can be written back out under a header row.
    """

    # Exact number of comma-separated fields a record must have to be used.
    LEGAL_LINE_LENGTH = 7
    # Zero-based position of the salary field within a record.
    INDEX_OF_SALARY = 5
    # Zero-based position of the employee id field within a record.
    INDEX_OF_EMPLOYEE_ID = 1
    # Header row written by export_result when no fieldnames are supplied.
    DEFAULT_FILE_HEADER = ['RecordId', 'EmployID', 'Name', 'Age', 'Year', 'Salary', 'Type']

    @staticmethod
    def load_file(input_file):
        """Read every line of ``input_file``.

        Args:
            input_file: Path of the file to read.

        Returns:
            A list with the file's lines (line endings included), or ``None``
            when ``input_file`` is not an existing regular file.
        """
        if not os.path.isfile(input_file):
            return None
        # The ``with`` block closes the file even if reading raises.
        with open(input_file) as csv_file:
            return csv_file.readlines()

    def filter_info(self, data):
        """Keep, for each employee id, the record with the highest salary.

        The first element of ``data`` is treated as the header and skipped.
        Lines that do not split into exactly ``LEGAL_LINE_LENGTH`` fields are
        ignored. When several records of one employee share the highest
        salary, the earliest one is kept.

        Args:
            data: List of record lines; non-string items are converted with
                ``str``.

        Returns:
            A dict mapping the employee id (as the string found in the line)
            to the full, unmodified record line.

        Raises:
            TypeError: If ``data`` is not a list.
            ValueError: If a well-formed line has a non-integer employee id.
        """
        if not isinstance(data, list):
            raise TypeError("The type of the data is not list.")

        result_dict = {}
        # data[0] is the header row, so start from the second element.
        for line in data[1:]:
            if not isinstance(line, str):
                line = str(line)
            words = line.split(",")
            # Skip lines with too few or too many fields.
            if len(words) != self.LEGAL_LINE_LENGTH:
                continue

            employee_id = words[self.INDEX_OF_EMPLOYEE_ID]
            if not self.is_int(employee_id):
                raise ValueError("The type of the employee id is not integer.")

            if employee_id not in result_dict:
                result_dict[employee_id] = line
                continue

            salary = words[self.INDEX_OF_SALARY]
            if not self.is_int(salary):
                # A record without a usable salary can never beat the stored one.
                continue
            old_salary = result_dict[employee_id].split(",")[self.INDEX_OF_SALARY]
            # A stored record whose salary is not an integer must not block a
            # later record that has a valid salary.
            if not self.is_int(old_salary) or int(salary) > int(old_salary):
                result_dict[employee_id] = line
        return result_dict

    @staticmethod
    def export_result(result_dict, output_file,
                      fieldnames=DEFAULT_FILE_HEADER):
        """Write a header row followed by the filtered records.

        Args:
            result_dict: Dict whose values are the record lines to write, as
                returned by ``filter_info``.
            output_file: Path of the file to overwrite. It must already exist
                as a regular file.
            fieldnames: Column names for the header row.

        Raises:
            TypeError: If ``output_file`` is not an existing regular file.
            DictTypeException: If ``result_dict`` is not a dict.
        """
        if not os.path.isfile(output_file):
            raise TypeError("The output file is not valid.")
        # Validate before opening: mode 'w' truncates the file immediately, and
        # a bad argument should not destroy the existing content.
        if not isinstance(result_dict, dict):
            raise DictTypeException("The type of the result_dict is not dictionary.")

        with open(output_file, 'w') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
            writer.writeheader()
            for record in result_dict.values():
                # The last line of an input file may lack a line ending; add
                # one so consecutive records are not fused together.
                if not record.endswith("\n"):
                    record += "\n"
                csv_file.write(record)

    @staticmethod
    def is_int(value):
        """Return True when ``int(value)`` succeeds, otherwise False."""
        try:
            int(value)
        except (TypeError, ValueError):
            # TypeError covers None and other objects int() cannot accept.
            return False
        return True
class DictTypeException(Exception):
    """Raised when a value that must be a dict is of another type."""
def main(argv):
    """Filter the input CSV and export the highest-salary record per employee.

    Args:
        argv: Command-line arguments without the program name. ``argv[0]``,
            when present, overrides the input path ("./testInput.csv") and
            ``argv[1]`` overrides the output path ("./filteredOutput.csv").

    Raises:
        IOError: If the input path is not an existing regular file.
    """
    argv = argv or []
    input_file = argv[0] if len(argv) > 0 else "./testInput.csv"
    output_file = argv[1] if len(argv) > 1 else "./filteredOutput.csv"

    data_p = DataProcessor()
    data = data_p.load_file(input_file)
    if data is None:
        # load_file signals a missing file with None; fail here with a clear
        # message instead of letting filter_info report a wrong data type.
        raise IOError("The input file is not valid: %s" % input_file)
    data_p.export_result(data_p.filter_info(data), output_file)
# Run the processor only when executed as a script, not when imported.
if __name__ == "__main__":
    main(sys.argv[1:])