-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsql_utils.py
More file actions
executable file
·168 lines (137 loc) · 6.06 KB
/
sql_utils.py
File metadata and controls
executable file
·168 lines (137 loc) · 6.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
##############################################################################################
# Project : sqlite3 wrapper
# File : sql_utils.py
# Author : Remi Malaquin
# Date : 01/17/2018
# Description : Utils functions for SQL access
##############################################################################################
import logging
import unicodedata
import inspect
from sql_exception import *
class Logger:
    """
    Small wrapper around a named ``logging.Logger`` that prefixes each record
    with a caller-supplied function name.

    Records are emitted through a single handler (see ``define_handler``) using
    the layout ``<asctime> [<name>.<functionName>] <LEVEL> - <message>``.
    """

    def __init__(self, name, severity=logging.DEBUG):
        """
        :param name: name of the underlying ``logging`` logger
        :param severity: level applied to the underlying logger
        """
        # create logger
        self.logger = logging.getLogger(name)
        self.logger.setLevel(severity)
        # logging.getLogger returns the same object for the same name, so
        # handlers attached earlier are removed to avoid duplicated output
        self.logger.handlers.clear()
        # create the handler; its level is left at the set_severity_level default
        self.ch = self.define_handler()
        self.set_severity_level(self.ch)
        # create formatter (no function name yet)
        self.functionName = ""
        self.formatter = self._build_formatter(self.functionName)
        # add formatter to ch
        self.ch.setFormatter(self.formatter)
        # add ch to logger
        self.logger.addHandler(self.ch)

    @staticmethod
    def _build_formatter(functionName):
        """Return the formatter embedding ``functionName`` after the logger name."""
        return logging.Formatter('%(asctime)s [%(name)s.' + functionName
                                 + '] %(levelname)s - %(message)s')

    def define_handler(self):
        """Return the handler used for output (a ``logging.StreamHandler``)."""
        return logging.StreamHandler()

    def set_severity_level(self, handler, severityLevel=logging.DEBUG):
        """Set the level of ``handler`` to ``severityLevel``."""
        handler.setLevel(severityLevel)

    def change_formatter(self, functionName=""):
        """
        Switch the function name shown in the log prefix.

        The formatter is rebuilt only when the name actually changes.

        :param functionName: name to display after the logger name
        """
        if self.functionName != functionName:
            # Remember the active name: without this the comparison above is
            # always made against "" and a switch back to "" is never applied
            self.functionName = functionName
            self.formatter = self._build_formatter(functionName)
            self.ch.setFormatter(self.formatter)

    def debug(self, functionName, message: str):
        """Log ``message`` at DEBUG level, prefixed with ``functionName``."""
        self.change_formatter(functionName)
        self.logger.debug(message)

    def info(self, functionName, message: str):
        """Log ``message`` at INFO level, prefixed with ``functionName``."""
        self.change_formatter(functionName)
        self.logger.info(message)

    def warning(self, functionName, message: str):
        """Log ``message`` at WARNING level, prefixed with ``functionName``."""
        self.change_formatter(functionName)
        self.logger.warning(message)

    def error(self, functionName, message: str):
        """Log ``message`` at ERROR level, prefixed with ``functionName``."""
        self.change_formatter(functionName)
        self.logger.error(message)

    def critical(self, functionName, message: str):
        """Log ``message`` at CRITICAL level, prefixed with ``functionName``."""
        self.change_formatter(functionName)
        self.logger.critical(message)
def sql_type(typename, value):
    """
    Convert ``value`` to the Python type associated with an SQL type name.

    The match is a case-insensitive substring search on ``typename``, tried in
    this order: INTEGER -> int, FLOAT -> float, TEXT -> str, TIMESTAMP -> str,
    TIME -> int. Other SQL types are not handled yet.

    :param typename: SQL type declaration of the column (string)
    :param value: value to convert
    :return: ``value`` converted with the matching Python constructor
    :raises TypeError: when ``typename`` matches none of the known types
    Errors raised by the conversion itself (e.g. ``int("abc")``) propagate.
    """
    declared = typename.lower()
    # Order matters: "timestamp" must be tested before "time", which it contains
    converters = (
        ('integer', int),
        ('float', float),
        ('text', str),
        ('timestamp', str),
        ('time', int),
    )
    for marker, convert in converters:
        if marker in declared:
            return convert(value)

    # Unknown type: the logger is only built here, on the failure path
    frames = inspect.stack()
    # The name is read from the fifth frame counted from the outermost one;
    # on a shallower stack fall back to the outermost frame instead of
    # failing with IndexError and hiding the TypeError below
    frame = frames[-5] if len(frames) >= 5 else frames[-1]
    Logger('SQL_type').critical(functionName=str(frame[3]), message="Type Unknown")
    raise TypeError(str(typename) + " is an Unknown Type: Check sql_type function in SQL/sql_utils.py")
def check_param_char(ref_param, test_param, test='111'):
    """
    Validate ``test_param`` against the reference description ``ref_param``.

    ``test`` is a three-character flag string; a ``'1'`` at a position enables
    the corresponding check:

    * ``test[0]`` - both mappings must have the same number of entries
    * ``test[1]`` - every key of ``test_param`` must exist in ``ref_param``
    * ``test[2]`` - every value of ``test_param`` is converted with
      ``sql_type`` using the type name stored in ``ref_param``; the converted
      value is written back into ``test_param`` (the dict is modified in place)

    :param ref_param: mapping of parameter name -> SQL type name
    :param test_param: mapping of parameter name -> value to check
    :param test: flag string selecting the checks to run
    :raises SqlLengthParameterError: the number of entries differs
    :raises SqlNameParameterError: a key of ``test_param`` is not in ``ref_param``
    :raises SqlTypeParameterError: a value cannot be converted to its reference type
    """
    log = Logger(name='Check_param_char')

    #####################################################################################################
    # check the length of parameter
    if test[0] == '1' and len(ref_param) != len(test_param):
        log.error(functionName='check_param_length',
                  message="Length of parameter defined at the creation of the table "
                          "differ from the one used to insert data in the table")
        raise SqlLengthParameterError("ref_param = " + str(len(ref_param)) +
                                      " | test_param = " + str(len(test_param)))

    #####################################################################################################
    # Check parameter name
    if test[1] == '1':
        for key in test_param:
            if key not in ref_param:
                log.error(functionName='check_param_name',
                          message="The following parameter does not exist in the reference: " + str(key))
                raise SqlNameParameterError("This parameter does not exist in the SQL table " + str(key))

    #####################################################################################################
    # Check the type of each parameter
    if test[2] == '1':
        # Only existing keys are reassigned, so the dict size never changes
        # while it is being iterated
        for key, val in test_param.items():
            try:
                test_param[key] = sql_type(ref_param[key], val)
            except Exception as err:
                # "except Exception" instead of a bare except: KeyboardInterrupt
                # and SystemExit must not be turned into a type error.
                # .get() keeps this handler from raising KeyError itself when
                # the name check is disabled and the key is unknown.
                expected = ref_param.get(key)
                log.error(functionName='check_param_type',
                          message="The following parameter does not match its reference type: " + str(key))
                raise SqlTypeParameterError("parameterName = " + str(key) +
                                            " | exp = " + str(expected) +
                                            " | get : " + str(type(val).__name__)) from err
def check_for_double_items(param, table, query_info, auth):
    """
    Raise when the rows of ``table`` already contain the values of ``param``.

    Each entry of ``query_info`` is indexed as ``(column index, column name, ...)``.
    For every column whose name is a key of ``param``, the cell of each row is
    compared with the value given for that column. Strings are compared
    case-insensitively, other values with ``==``.

    * ``auth`` truthy: a duplicate is reported only when every compared cell
      of every row matches.
    * ``auth`` falsy: a duplicate is reported as soon as any compared cell
      matches.

    :param param: mapping of column name -> value to look for
    :param table: iterable of rows (indexable by column index)
    :param query_info: iterable of column descriptions
    :param auth: selects the "all cells" (truthy) or "any cell" (falsy) rule
    :raises SqlDoubleItemsOccurs: when a duplicate is detected
    """

    def same_value(cell, wanted):
        # Case-insensitive for text; plain equality otherwise so that numeric
        # or NULL cells do not crash on .lower()
        if isinstance(cell, str) and isinstance(wanted, str):
            return cell.lower() == wanted.lower()
        return cell == wanted

    # Pair each filtered column index with the value expected in THAT column.
    # Pairing by name keeps the comparison correct when the key order of param
    # differs from the column order of the table.
    expected = [(info[0], param[info[1]]) for info in query_info if info[1] in param]

    # One boolean per compared cell, over all rows
    matches = [same_value(row[column], wanted)
               for row in table
               for column, wanted in expected]

    if auth:
        # Mixed or all-False results, or no comparison at all, mean no duplicate
        double = bool(matches) and all(matches)
    else:
        double = any(matches)

    if table and double:
        raise SqlDoubleItemsOccurs("Item already exist in the database")
def translate_no_accent_nocase_sensitive(string_exemple):
    """
    Return a lower-case, ASCII-only version of ``string_exemple``.

    The input is converted with ``str()``, decomposed with NFKD normalization,
    and every character that cannot be encoded as ASCII (such as the combining
    accents produced by the decomposition) is dropped.

    :param string_exemple: value to simplify
    :return: accent-free lower-case string
    """
    decomposed = unicodedata.normalize('NFKD', str(string_exemple))
    # 'ignore' silently discards whatever has no ASCII representation
    ascii_only = decomposed.encode('ASCII', 'ignore')
    plain_text = ascii_only.decode('utf-8')
    return plain_text.lower()