You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

133 lines
4.5 KiB

#!/usr/bin/env python
import os
import sys
import time
import platform
from multiprocessing import Process
import psutil
import cexc
from util.constants import HOWTO_CONFIGURE_MLSPL_LIMITS
from util.chunk_util import write_chunk
MONITORING_INTERVAL = 1.0
class Watchdog(object):
def __init__(
self, time_limit=60, memory_limit=100 * 1024 * 1024, finalize_file=None, pid=None
):
self.time_limit = time_limit
self.memory_limit = memory_limit
self.finalize_file = finalize_file
self.started = False
self._process = None
self.start_time = None
self._pid = pid or os.getpid()
self._victim = None # do not access directly. Use self._get_victim()
self._init_victim()
def _get_current_process(self):
return psutil.Process(pid=self._pid)
def _init_victim(self):
# On Windows psutil.Process is not pickle-able (https://bugs.python.org/issue29168)
# whereas this class needs to be pickleale. As such, on Windows
# we do not instantiate an instance during class initializatoin but instead
# create an instance each time we need one.
if platform.system() in ('Windows', 'Darwin'):
return
self._victim = self._get_current_process()
def _get_victim(self):
return self._victim or self._get_current_process()
def start(self):
self._process = Process(target=self.main, name="ML-SPL Watchdog")
self._process.daemon = True
self._process.start()
self.started = True
def __del__(self):
if hasattr(self, '_process') and isinstance(self._process, Process):
self.join()
def join(self):
self._process.terminate()
self._process.join(1)
def main(self):
logger = cexc.setup_logging.get_logger('mlspl_watchdog')
self.start_time = time.time()
while True:
# Check to see if parent is still running...
if not self._get_victim().is_running():
logger.info(
'Watchdog exiting because parent %s disappeared.', self._get_victim()
)
return 0
delta = time.time() - self.start_time
# Check time_limit
if self.time_limit >= 0 and delta > self.time_limit:
logger.info(
'Terminate %s: exceeded time limit (%d > %d)',
self._get_victim(),
delta,
self.time_limit,
)
# Note: this chunk output may race with our parent...
write_chunk( # pylint: disable=W0212
sys.stdout.buffer,
{
'error': 'Time limit exceeded (> {} seconds). {}'.format(
self.time_limit, HOWTO_CONFIGURE_MLSPL_LIMITS
)
},
)
self._get_victim().terminate()
return 1
# Check memory limit
rss = self._get_victim().memory_info().rss
if self.memory_limit >= 0 and rss > self.memory_limit:
logger.info(
'Terminating %s: exceeded memory limit (%d > %d)',
self._get_victim(),
rss,
self.memory_limit,
)
# Note: this chunk output may race with our parent...
write_chunk( # pylint: disable=W0212
sys.stdout.buffer,
{
'error': 'Memory limit exceeded (> {} bytes). {}'.format(
self.memory_limit, HOWTO_CONFIGURE_MLSPL_LIMITS
)
},
)
self._get_victim().terminate()
return 2
# Check if finalize file exists
if self.finalize_file and os.path.exists(self.finalize_file):
logger.info('Terminating %s: finalize file detected', self._get_victim())
# Note: this chunk output may race with our parent...
write_chunk( # pylint: disable=W0212
sys.stdout.buffer,
{'error': 'Aborting because job finalization was requested.'},
)
self._get_victim().terminate()
return 3
logger.debug(
'Monitoring %s: Running for %.1f secs, rss %d', self._get_victim(), delta, rss
)
time.sleep(MONITORING_INTERVAL)
return 0