From 157862f533d13f1e346a1dbe8f2302d37c8d1266 Mon Sep 17 00:00:00 2001 From: TrezorHannes Date: Fri, 22 May 2026 19:58:39 +0200 Subject: [PATCH] fix: database connection auto-healing and controller fail-fast monitoring - Implement Django connection auto-healing in background workers (jobs.py, rebalancer.py, p2p.py, htlc_stream.py) to prevent unrecoverable zombie loops on DB drop. - Implement process monitoring in controller.py to terminate the process group and exit if any worker process dies, allowing systemd restart. --- controller.py | 21 +++++++++++++++++---- htlc_stream.py | 5 +++++ jobs.py | 5 +++++ p2p.py | 4 ++++ rebalancer.py | 14 ++++++++++++++ 5 files changed, 45 insertions(+), 4 deletions(-) diff --git a/controller.py b/controller.py index ae71c0e0..d816dfc3 100644 --- a/controller.py +++ b/controller.py @@ -1,4 +1,5 @@ -import multiprocessing, sys +import multiprocessing, sys, time +from datetime import datetime import jobs, rebalancer, htlc_stream, p2p, manage import logging logger = logging.getLogger('[Controller]') @@ -22,9 +23,21 @@ def main(): processes.append(process) process.start() - for process in processes: - process.join() - logger.info('Stopping all LNDg processes...') + try: + while True: + for process in processes: + if not process.is_alive(): + logger.error(f"Process {process.name} died with exitcode {process.exitcode}. Exiting controller.") + for p in processes: + if p.is_alive(): + p.terminate() + sys.exit(1) + time.sleep(2) + except KeyboardInterrupt: + logger.info('Controller is stopping...') + for p in processes: + if p.is_alive(): + p.terminate() if __name__ == '__main__': main() diff --git a/htlc_stream.py b/htlc_stream.py index ad9185e0..1bccef18 100644 --- a/htlc_stream.py +++ b/htlc_stream.py @@ -61,6 +61,11 @@ def main(): del all_forwards[key] except Exception as e: logger.error(f'Error while running failed HTLC stream: {str(e)}') + try: + from django.db import connections + connections.close_all() + except Exception as db_err: + logger.error(f"Error closing database connections: {str(db_err)}") sleep(20) if __name__ == '__main__': diff --git a/jobs.py b/jobs.py index 13494f63..b47cbe26 100644 --- a/jobs.py +++ b/jobs.py @@ -834,6 +834,11 @@ def main(): agg_failed_htlcs() except Exception as e: logger.error(f'Error processing background data: {str(e)}') + try: + from django.db import connections + connections.close_all() + except Exception as db_err: + logger.error(f"Error closing database connections: {str(db_err)}") logger.info('Data execution completed...sleeping for 20 seconds') sleep(20) diff --git a/p2p.py b/p2p.py index 1055d3d1..f7413197 100644 --- a/p2p.py +++ b/p2p.py @@ -41,6 +41,10 @@ def main(): sleep(2) # polling interval except Exception as e: logger.error(f'Error running p2p service: {str(e)}') + try: + django.db.connections.close_all() + except Exception as db_err: + logger.error(f"Error closing database connections: {str(db_err)}") finally: if 'p2p_thread' in locals() and p2p_thread.is_alive(): logger.info('Removing any remaining processes...') diff --git a/rebalancer.py b/rebalancer.py index 60ccc55d..d99d9052 100644 --- a/rebalancer.py +++ b/rebalancer.py @@ -16,12 +16,20 @@ import logging logger = logging.getLogger('[Rebalancer]') +def close_db_connections(): + try: + from django.db import connections + connections.close_all() + except Exception as e: + logger.error(f"Error closing database connections: {str(e)}") + @sync_to_async def get_out_cans(rebalance, auto_rebalance_channels): try: return list(auto_rebalance_channels.filter(auto_rebalance=False, percent_outbound__gte=F('ar_out_target')).exclude(remote_pubkey=rebalance.last_hop_pubkey).values_list('chan_id', flat=True)) except Exception as e: logger.error(f'Error getting outbound cands: {str(e)}') + close_db_connections() @sync_to_async def save_record(record): @@ -29,6 +37,7 @@ def save_record(record): record.save() except Exception as e: logger.error(f'Error saving database record: {str(e)}') + close_db_connections() @sync_to_async def inbound_cans_len(inbound_cans): @@ -36,6 +45,7 @@ def inbound_cans_len(inbound_cans): return len(inbound_cans) except Exception as e: logger.error(f'Error getting inbound cands: {str(e)}') + close_db_connections() @sync_to_async def check_and_set_allow_multishards(): @@ -265,6 +275,7 @@ def auto_schedule() -> List[Rebalancer]: return to_schedule except Exception as e: logger.error(f'Error scheduling rebalances: {str(e)}') + close_db_connections() return to_schedule @sync_to_async @@ -317,6 +328,7 @@ def auto_enable(): logger.debug('Case 5: Pass') except Exception as e: logger.error(f'Error during auto channel enabling: {str(e)}') + close_db_connections() @sync_to_async def get_pending_rebals(): @@ -325,6 +337,7 @@ def get_pending_rebals(): return rebalances, len(rebalances) except Exception as e: logger.error(f'Error getting pending rebalances: {str(e)}') + close_db_connections() async def async_queue_manager(rebalancer_queue): global scheduled_rebalances, active_rebalances, shutdown_rebalancer @@ -434,6 +447,7 @@ def main(): except Exception as e: error = str(e) logger.error(f'Rebalancer loop error: {error}') + close_db_connections() finally: logger.info('Rebalancer loop has been terminated')