-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdemeter_run_utils.py
More file actions
61 lines (50 loc) · 1.86 KB
/
demeter_run_utils.py
File metadata and controls
61 lines (50 loc) · 1.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from config import *
import psycopg2.extras
from psycopg2.extras import Json
from functools import wraps
import time
def retry(fn=None):
@wraps(fn)
def wrapper(*args, **kw):
cls = args[0]
while True:
try:
return fn(*args, **kw)
except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
print(e)
print ("\nDatabase Connection [InterfaceError or OperationalError]")
print ("Idle for %s seconds" % (2))
time.sleep(2)
cls._connect()
return wrapper
class demeter_run_utils:
def __init__(self,connname="PoolTool"):
self.conn = None
self.cur1 = None
self.connname=connname
self._connect()
def _connect(self):
self.conn = psycopg2.connect(application_name=self.connname, database=demeter_run_db_sync_database, user=demeter_run_db_sync_user, password=demeter_run_db_sync_password, host=demeter_run_db_sync_instance, port=demeter_run_db_sync_port)
self.cur1 = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
def __del__(self):
# Maybe there is a connection but no cursor, whatever close silently!
try:
self.conn.close()
except:
pass
self.conn = None
@retry
def cur1_create(self):
if self.cur1 == None or self.cur1.closed:
self.cur1 = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
return True
def cur1_close(self):
self.cur1.close()
self.cur1=None
return True
@retry
def cur1_execute(self,sql,params=None): # pass here required params to get data from DB
return self.cur1.execute(sql,params)
@retry
def cur1_fetchone(self): # pass here required params to get data from DB
return self.cur1.fetchone()