Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions controller.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import multiprocessing, sys
import multiprocessing, sys, time
from datetime import datetime
import jobs, rebalancer, htlc_stream, p2p, manage
import logging
logger = logging.getLogger('[Controller]')
Expand All @@ -22,9 +23,21 @@ def main():
processes.append(process)
process.start()

for process in processes:
process.join()
logger.info('Stopping all LNDg processes...')
try:
while True:
for process in processes:
if not process.is_alive():
logger.error(f"Process {process.name} died with exitcode {process.exitcode}. Exiting controller.")
for p in processes:
if p.is_alive():
p.terminate()
sys.exit(1)
time.sleep(2)
except KeyboardInterrupt:
logger.info('Controller is stopping...')
for p in processes:
if p.is_alive():
p.terminate()

if __name__ == '__main__':
main()
5 changes: 5 additions & 0 deletions htlc_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ def main():
del all_forwards[key]
except Exception as e:
logger.error(f'Error while running failed HTLC stream: {str(e)}')
try:
from django.db import connections
connections.close_all()
except Exception as db_err:
logger.error(f"Error closing database connections: {str(db_err)}")
sleep(20)

if __name__ == '__main__':
Expand Down
5 changes: 5 additions & 0 deletions jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,11 @@ def main():
agg_failed_htlcs()
except Exception as e:
logger.error(f'Error processing background data: {str(e)}')
try:
from django.db import connections
connections.close_all()
except Exception as db_err:
logger.error(f"Error closing database connections: {str(db_err)}")
logger.info('Data execution completed...sleeping for 20 seconds')
sleep(20)

Expand Down
4 changes: 4 additions & 0 deletions p2p.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ def main():
sleep(2) # polling interval
except Exception as e:
logger.error(f'Error running p2p service: {str(e)}')
try:
django.db.connections.close_all()
except Exception as db_err:
logger.error(f"Error closing database connections: {str(db_err)}")
finally:
if 'p2p_thread' in locals() and p2p_thread.is_alive():
logger.info('Removing any remaining processes...')
Expand Down
14 changes: 14 additions & 0 deletions rebalancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,36 @@
import logging
logger = logging.getLogger('[Rebalancer]')

def close_db_connections():
try:
from django.db import connections
connections.close_all()
except Exception as e:
logger.error(f"Error closing database connections: {str(e)}")

@sync_to_async
def get_out_cans(rebalance, auto_rebalance_channels):
try:
return list(auto_rebalance_channels.filter(auto_rebalance=False, percent_outbound__gte=F('ar_out_target')).exclude(remote_pubkey=rebalance.last_hop_pubkey).values_list('chan_id', flat=True))
except Exception as e:
logger.error(f'Error getting outbound cands: {str(e)}')
close_db_connections()

@sync_to_async
def save_record(record):
try:
record.save()
except Exception as e:
logger.error(f'Error saving database record: {str(e)}')
close_db_connections()

@sync_to_async
def inbound_cans_len(inbound_cans):
try:
return len(inbound_cans)
except Exception as e:
logger.error(f'Error getting inbound cands: {str(e)}')
close_db_connections()

@sync_to_async
def check_and_set_allow_multishards():
Expand Down Expand Up @@ -265,6 +275,7 @@ def auto_schedule() -> List[Rebalancer]:
return to_schedule
except Exception as e:
logger.error(f'Error scheduling rebalances: {str(e)}')
close_db_connections()
return to_schedule

@sync_to_async
Expand Down Expand Up @@ -317,6 +328,7 @@ def auto_enable():
logger.debug('Case 5: Pass')
except Exception as e:
logger.error(f'Error during auto channel enabling: {str(e)}')
close_db_connections()

@sync_to_async
def get_pending_rebals():
Expand All @@ -325,6 +337,7 @@ def get_pending_rebals():
return rebalances, len(rebalances)
except Exception as e:
logger.error(f'Error getting pending rebalances: {str(e)}')
close_db_connections()

async def async_queue_manager(rebalancer_queue):
global scheduled_rebalances, active_rebalances, shutdown_rebalancer
Expand Down Expand Up @@ -434,6 +447,7 @@ def main():
except Exception as e:
error = str(e)
logger.error(f'Rebalancer loop error: {error}')
close_db_connections()
finally:
logger.info('Rebalancer loop has been terminated')

Expand Down