diff --git a/dispatcher.py b/dispatcher.py
index e0053d4b..5da908d6 100644
--- a/dispatcher.py
+++ b/dispatcher.py
@@ -34,6 +34,7 @@
 from multiprocessing.queues import SimpleQueue
 
 from lib import Options
+from lib.sampler import sampler
 from lib.utils import set_fd_cloexec
 from lib.worker import WorkerTaskResult, WorkerDone
 from lib.colorer import color_stdout
@@ -121,7 +122,7 @@ def __init__(self, task_groups, max_workers_cnt, randomize):
             self.result_queues.append(task_queue_disp.result_queue)
             self.task_queues.append(task_queue_disp.task_queue)
 
-        self.report_timeout = 1.0
+        self.report_timeout = 0.1
 
         self.statistics = None
         self.artifacts = None
@@ -167,6 +168,8 @@ def init_listeners(self):
         self.artifacts = ArtifactsWatcher(log_output_watcher.get_logfile)
         output_watcher = OutputWatcher()
         self.listeners = [self.statistics, log_output_watcher, output_watcher, self.artifacts]
+        if sampler.is_enabled:
+            self.listeners.append(sampler.watcher)
         if watch_fail:
             self.fail_watcher = FailWatcher(self.terminate_all_workers)
             self.listeners.append(self.fail_watcher)
@@ -416,6 +419,7 @@ def _run_worker(self, worker_id, tcp_port_range):
         os.environ['TEST_RUN_TCP_PORT_END'] = str(tcp_port_range[1])
         color_stdout.queue = self.result_queue
         worker = self.gen_worker(worker_id)
+        sampler.set_queue(self.result_queue, worker_id, worker.name)
         worker.run_all(self.task_queue, self.result_queue)
 
     def add_worker(self, worker_id, tcp_port_range):
diff --git a/lib/app_server.py b/lib/app_server.py
index bf816ba5..ba3fd55f 100644
--- a/lib/app_server.py
+++ b/lib/app_server.py
@@ -13,6 +13,7 @@
 from lib.colorer import qa_notice
 from lib.options import Options
 from lib.preprocessor import TestState
+from lib.sampler import sampler
 from lib.server import Server
 from lib.server import DEFAULT_SNAPSHOT_NAME
 from lib.tarantool_server import Test
@@ -31,9 +32,10 @@ def timeout_handler(server_process, test_timeout):
     server_process.kill()
 
 
-def run_server(execs, cwd, server, logfile, retval):
+def run_server(execs, cwd, server, logfile, retval, test_id):
     os.putenv("LISTEN", server.iproto)
     server.process = Popen(execs, stdout=PIPE, stderr=PIPE, cwd=cwd)
+    sampler.register_process(server.process.pid, test_id, server.name)
     test_timeout = Options().args.test_timeout
     timer = Timer(test_timeout, timeout_handler, (server.process, test_timeout))
     timer.start()
@@ -57,7 +59,7 @@ def execute(self, server):
         execs = server.prepare_args()
         retval = dict()
         tarantool = TestRunGreenlet(run_server, execs, server.vardir, server,
-                                    server.logfile, retval)
+                                    server.logfile, retval, self.id)
         self.current_test_greenlet = tarantool
 
         # Copy the snapshot right before starting the server.
diff --git a/lib/sampler.py b/lib/sampler.py
new file mode 100644
index 00000000..eb83d890
--- /dev/null
+++ b/lib/sampler.py
@@ -0,0 +1,182 @@
+import os
+import sys
+import time
+
+from lib.colorer import color_log
+from lib.colorer import qa_notice
+from lib.utils import format_process
+from lib.utils import get_proc_stat_rss
+from lib.utils import proc_stat_rss_supported
+
+
+if sys.version_info[0] == 2:
+    ProcessLookupError = OSError
+
+
+# Don't inherit BaseWorkerMessage to bypass cyclic import.
+class RegisterProcessMessage(object):
+    """Ask the sampler in the main test-run process to register
+    the given process.
+    """
+    def __init__(self, worker_id, worker_name, pid, task_id, server_name):
+        self.worker_id = worker_id
+        self.worker_name = worker_name
+        self.pid = pid
+        self.task_id = task_id
+        self.server_name = server_name
+
+
+# Don't inherit BaseWatcher to bypass cyclic import.
+class SamplerWatcher(object):
+    def __init__(self, sampler):
+        self._sampler = sampler
+        self._last_sample = 0
+        self._sample_interval = 0.1  # seconds
+        self._warn_interval = self._sample_interval * 4
+
+    def process_result(self, obj):
+        if isinstance(obj, RegisterProcessMessage):
+            self._sampler.register_process(
+                obj.pid, obj.task_id, obj.server_name, obj.worker_id,
+                obj.worker_name)
+        self._wakeup()
+
+    def process_timeout(self, delta_seconds):
+        self._wakeup()
+
+    @property
+    def sample_interval(self):
+        return self._sample_interval
+
+    def _wakeup(self):
+        """Invoke Sampler._sample() if enough time has elapsed since
+        the previous call.
+        """
+        now = time.time()
+        delta = now - self._last_sample
+        if self._last_sample > 0 and delta > self._warn_interval:
+            template = 'Low sampling resolution. The expected interval\n' + \
+                       'is {:.2f} seconds ({:.2f} seconds without warnings),\n' + \
+                       'but the last sample was collected {:.2f} seconds ago.'
+            qa_notice(template.format(self._sample_interval, self._warn_interval,
+                                      delta))
+        if delta > self._sample_interval:
+            self._sampler._sample()
+            self._last_sample = now
+
+
+class Sampler:
+    def __init__(self):
+        # The instance is created in the test-run main process.
+
+        # Fields for an instance in a worker.
+        self._worker_id = None
+        self._worker_name = None
+        self._queue = None
+
+        # Field for an instance in the main process.
+        self._watcher = SamplerWatcher(self)
+
+        self._processes = dict()
+        self._rss_summary = dict()
+
+    def set_queue(self, queue, worker_id, worker_name):
+        # Called from a worker process (_run_worker()).
+        self._worker_id = worker_id
+        self._worker_name = worker_name
+        self._queue = queue
+        self._watcher = None
+
+    @property
+    def rss_summary(self):
+        """Task ID to maximum RSS mapping."""
+        return self._rss_summary
+
+    @property
+    def sample_interval(self):
+        return self._watcher.sample_interval
+
+    @property
+    def watcher(self):
+        if not self._watcher:
+            raise RuntimeError('sampler: watcher is available only in the ' +
+                               'main test-run process')
+        return self._watcher
+
+    @property
+    def is_enabled(self):
+        return proc_stat_rss_supported()
+
+    def register_process(self, pid, task_id, server_name, worker_id=None,
+                         worker_name=None):
+        """Register a process for sampling.
+
+        Call it without worker_* arguments from a worker
+        process.
+        """
+        if not self._queue:
+            # In the main test-run process.
+            self._processes[pid] = {
+                'task_id': task_id,
+                'server_name': server_name,
+                'worker_id': worker_id,
+                'worker_name': worker_name,
+            }
+            self._log('register', pid)
+            return
+
+        # Pass to the main test-run process.
+        self._queue.put(RegisterProcessMessage(
+            self._worker_id, self._worker_name, pid, task_id, server_name))
+
+    def unregister_process(self, pid):
+        if self._queue:
+            raise NotImplementedError('sampler: a process unregistration ' +
+                                      'from a test-run worker is not ' +
+                                      'implemented yet')
+        if pid not in self._processes:
+            return
+
+        self._log('unregister', pid)
+        del self._processes[pid]
+
+    def _log(self, event, pid):
+        # These logs are not written due to gh-247.
+        process_def = self._processes[pid]
+        task_id = process_def['task_id']
+        test_name = task_id[0] + ((':' + task_id[1]) if task_id[1] else '')
+        worker_name = process_def['worker_name']
+        server_name = process_def['server_name']
+        color_log('DEBUG: sampler: {} {}\n'.format(
+            event, format_process(pid)), schema='info')
+        color_log(' | worker: {}\n'.format(worker_name))
+        color_log(' | test: {}\n'.format(test_name))
+        color_log(' | server: {}\n'.format(str(server_name)))
+
+    def _sample(self):
+        tasks_rss = dict()
+        for pid in list(self._processes.keys()):
+            # Unregister processes that are gone.
+            # Assume that PIDs are rarely reused.
+            try:
+                os.kill(pid, 0)
+            except ProcessLookupError:
+                self.unregister_process(pid)
+            else:
+                self._sample_process(pid, tasks_rss)
+
+        # Save the current overall RSS value if it is bigger than the saved one.
+        for task_id in tasks_rss:
+            if self.rss_summary.get(task_id, 0) < tasks_rss[task_id]:
+                self.rss_summary[task_id] = tasks_rss[task_id]
+
+    def _sample_process(self, pid, tasks_rss):
+        task_id = self._processes[pid]['task_id']
+        # Count overall RSS per task.
+        tasks_rss[task_id] = get_proc_stat_rss(pid) + tasks_rss.get(task_id, 0)
+
+
+# The 'singleton' sampler instance: created in the main test-run
+# process, but then works differently in the main process and
+# workers.
+sampler = Sampler()
diff --git a/lib/tarantool_server.py b/lib/tarantool_server.py
index 8c8222ae..6a628aab 100644
--- a/lib/tarantool_server.py
+++ b/lib/tarantool_server.py
@@ -32,6 +32,7 @@
 from lib.colorer import qa_notice
 from lib.options import Options
 from lib.preprocessor import TestState
+from lib.sampler import sampler
 from lib.server import Server
 from lib.server import DEFAULT_SNAPSHOT_NAME
 from lib.test import Test
@@ -332,6 +333,10 @@ def exec_loop(self, ts):
 
     def execute(self, server):
         super(LuaTest, self).execute(server)
+
+        # Track the same process metrics as part of another test.
+        sampler.register_process(server.process.pid, self.id, server.name)
+
         cls_name = server.__class__.__name__.lower()
         if 'gdb' in cls_name or 'lldb' in cls_name or 'strace' in cls_name:
             # don't propagate gdb/lldb/strace mixin to non-default servers,
@@ -399,6 +404,10 @@ class PythonTest(Test):
     def execute(self, server):
         super(PythonTest, self).execute(server)
+
+        # Track the same process metrics as part of another test.
+        sampler.register_process(server.process.pid, self.id, server.name)
+
         new_globals = dict(locals(), test_run_current_test=self,
                            **server.__dict__)
         with open(self.name) as f:
             code = compile(f.read(), self.name, 'exec')
@@ -866,6 +875,12 @@ def start(self, silent=True, wait=True, wait_load=True, rais=True, args=[],
         # Restore the actual PWD value.
         os.environ['PWD'] = os.getcwd()
 
+        # Track non-default server metrics as part of the
+        # current test.
+        if self.current_test:
+            sampler.register_process(self.process.pid, self.current_test.id,
+                                     self.name)
+
         # gh-19 crash detection
         self.crash_detector = TestRunGreenlet(self.crash_detect)
         self.crash_detector.info = "Crash detector: %s" % self.process
diff --git a/lib/test_suite.py b/lib/test_suite.py
index 67cdadff..fd8ad008 100644
--- a/lib/test_suite.py
+++ b/lib/test_suite.py
@@ -9,6 +9,7 @@
 import os
 import re
 import sys
+import time
 
 from lib import Options
 from lib.app_server import AppServer
@@ -268,18 +269,20 @@ def run_test(self, test, server, inspector):
             conf = test.conf_name
             color_stdout(just_and_trim(conf, 15) + ' ', schema='test_var')
 
+        start_time = time.time()
         if self.is_test_enabled(test, conf, server):
             short_status, result_checksum = test.run(server)
         else:
             color_stdout("[ disabled ]\n", schema='t_name')
             short_status = 'disabled'
             result_checksum = None
+        duration = time.time() - start_time
 
         # cleanup only if test passed or if --force mode enabled
         if Options().args.is_force or short_status == 'pass':
             inspector.cleanup_nondefault()
 
-        return short_status, result_checksum
+        return short_status, result_checksum, duration
 
     def is_parallel(self):
         return self.ini['is_parallel']
@@ -289,3 +292,6 @@ def fragile_retries(self):
 
     def show_reproduce_content(self):
         return self.ini['show_reproduce_content']
+
+    def test_is_long(self, task_id):
+        return os.path.basename(task_id[0]) in self.ini['long_run']
diff --git a/lib/unittest_server.py b/lib/unittest_server.py
index a3c83d39..affb245d 100644
--- a/lib/unittest_server.py
+++ b/lib/unittest_server.py
@@ -4,6 +4,7 @@
 import glob
 
 from subprocess import Popen, PIPE, STDOUT
+from lib.sampler import sampler
 from lib.server import Server
 from lib.tarantool_server import Test
 from lib.tarantool_server import TarantoolServer
@@ -18,6 +19,7 @@ def execute(self, server):
         server.current_test = self
         execs = server.prepare_args()
         proc = Popen(execs, cwd=server.vardir, stdout=PIPE, stderr=STDOUT)
+        sampler.register_process(proc.pid, self.id, server.name)
         sys.stdout.write_bytes(proc.communicate()[0])
 
 
diff --git a/lib/utils.py b/lib/utils.py
index a92db07c..6d7a21d8 100644
--- a/lib/utils.py
+++ b/lib/utils.py
@@ -233,6 +233,25 @@ def format_process(pid):
     return 'process %d [%s; %s]' % (pid, status, cmdline)
 
 
+def proc_stat_rss_supported():
+    return os.path.isfile('/proc/%d/status' % os.getpid())
+
+
+def get_proc_stat_rss(pid):
+    rss = 0
+    try:
+        with open('/proc/%d/status' % pid, 'r') as f:
+            for line in f:
+                if ':' not in line:
+                    continue
+                key, value = line.split(':', 1)
+                if key == 'VmRSS':
+                    rss = int(value.strip().split()[0])
+    except (OSError, IOError):
+        pass
+    return rss
+
+
 def set_fd_cloexec(socket):
     flags = fcntl.fcntl(socket, fcntl.F_GETFD)
     fcntl.fcntl(socket, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
diff --git a/lib/worker.py b/lib/worker.py
index cf4fc441..adfe3c72 100644
--- a/lib/worker.py
+++ b/lib/worker.py
@@ -145,14 +145,18 @@ class WorkerTaskResult(BaseWorkerMessage):
     the task processed successfully or not, but with little more flexibility
     than binary True/False. The result_checksum (string) field saves the results
     file checksum on test fail. The task_id (any hashable object) field hold ID of
-    the processed task. The show_reproduce_content configuration form suite.ini
+    the processed task. The is_long (boolean) field shows whether the task is in
+    the long test list in suite.ini. The duration (float) field saves the task
+    run time. The show_reproduce_content field mirrors the suite.ini option.
""" def __init__(self, worker_id, worker_name, task_id, - short_status, result_checksum, show_reproduce_content): + short_status, result_checksum, is_long, duration, show_reproduce_content): super(WorkerTaskResult, self).__init__(worker_id, worker_name) self.short_status = short_status self.result_checksum = result_checksum self.task_id = task_id + self.is_long = is_long + self.duration = duration self.show_reproduce_content = show_reproduce_content @@ -218,9 +222,10 @@ def current_task(self, task_id): return WorkerCurrentTask(self.id, self.name, task_name, task_param, task_result, task_tmp_result) - def wrap_result(self, task_id, short_status, result_checksum): + def wrap_result(self, task_id, short_status, result_checksum, duration): return WorkerTaskResult(self.id, self.name, task_id, short_status, result_checksum, + self.suite.test_is_long(task_id), duration, self.suite.show_reproduce_content()) def sigterm_handler(self, signum, frame): @@ -312,7 +317,7 @@ def run_task(self, task_id): with open(self.reproduce_file, 'a') as f: task_id_str = yaml.safe_dump(task.id, default_flow_style=True) f.write('- ' + task_id_str) - short_status, result_checksum = self.suite.run_test( + short_status, result_checksum, duration = self.suite.run_test( task, self.server, self.inspector) except KeyboardInterrupt: self.report_keyboard_interrupt() @@ -322,7 +327,7 @@ def run_task(self, task_id): '\nWorker "%s" received the following error; stopping...\n' % self.name + traceback.format_exc() + '\n', schema='error') raise - return short_status, result_checksum + return short_status, result_checksum, duration def run_loop(self, task_queue, result_queue): """ called from 'run_all' """ @@ -338,6 +343,7 @@ def run_loop(self, task_queue, result_queue): short_status = None result_checksum = None + duration = 0.0 result_queue.put(self.current_task(task_id)) testname = os.path.basename(task_id[0]) fragile_checksums = self.suite.get_test_fragile_checksums(testname) @@ -353,14 +359,14 @@ def run_loop(self, task_queue, result_queue): ' file checksum: "%s", rerunning with server restart ...\n' % (task_id[0], task_id[1], result_checksum), schema='error') # run task and save the result to short_status - short_status, result_checksum = self.run_task(task_id) + short_status, result_checksum, duration = self.run_task(task_id) # check if the results file checksum set on fail and if # the newly created results file is known by checksum if not result_checksum or (result_checksum not in fragile_checksums): break retries_left = retries_left - 1 - result_queue.put(self.wrap_result(task_id, short_status, result_checksum)) + result_queue.put(self.wrap_result(task_id, short_status, result_checksum, duration)) if short_status == 'fail': if Options().args.is_force: self.restart_server() diff --git a/listeners.py b/listeners.py index b6a9b79b..107e57e1 100644 --- a/listeners.py +++ b/listeners.py @@ -6,6 +6,7 @@ from lib import Options from lib.colorer import color_stdout from lib.colorer import decolor +from lib.sampler import sampler from lib.worker import WorkerCurrentTask from lib.worker import WorkerDone from lib.worker import WorkerOutput @@ -33,13 +34,20 @@ def process_timeout(self, delta_seconds): class StatisticsWatcher(BaseWatcher): def __init__(self, get_logfile): self.stats = dict() + self.field_size = 60 + self._sampler = sampler + self.duration_stats = dict() self.failed_tasks = [] self.get_logfile = get_logfile + self.long_tasks = set() def process_result(self, obj): if not isinstance(obj, WorkerTaskResult): return + if 
+            self.long_tasks.add(obj.task_id)
+
         if obj.short_status not in self.stats:
             self.stats[obj.short_status] = 0
         self.stats[obj.short_status] += 1
@@ -50,8 +58,102 @@ def process_result(self, obj):
                                      obj.result_checksum,
                                      obj.show_reproduce_content))
 
+        self.duration_stats[obj.task_id] = obj.duration
+
+    def get_long_mark(self, task):
+        return '(long)' if task in self.long_tasks else ''
+
+    def prettify_task_name(self, task_id):
+        return task_id[0] + ((':' + task_id[1]) if task_id[1] else '')
+
+    # RSS.
+    def print_rss_summary(self, stats_dir):
+        if not self._sampler.is_enabled:
+            return
+
+        rss_summary = self._sampler.rss_summary
+        top_rss = 10
+
+        # Print to stdout RSS statistics for all failed tasks.
+        if self.failed_tasks:
+            color_stdout('Occupied memory in failed tests (RSS, Mb):\n', schema='info')
+            for task in self.failed_tasks:
+                task_id = task[0]
+                if task_id in rss_summary:
+                    color_stdout('* %6.1f %s %s\n' % (float(rss_summary[task_id]) / 1024,
+                                 self.prettify_task_name(task_id).ljust(self.field_size),
+                                 self.get_long_mark(task_id)),
+                                 schema='info')
+            color_stdout('\n')
+
+        # Print to stdout RSS statistics for the most memory-consuming tasks.
+        color_stdout('Top {} tests by occupied memory (RSS, Mb):\n'.format(
+            top_rss), schema='info')
+        results_sorted = sorted(rss_summary.items(), key=lambda x: x[1], reverse=True)
+        for task_id, rss in results_sorted[:top_rss]:
+            color_stdout('* %6.1f %s %s\n' % (float(rss) / 1024,
+                         self.prettify_task_name(task_id).ljust(self.field_size),
+                         self.get_long_mark(task_id)), schema='info')
+        color_stdout('\n')
+
+        color_stdout('(Tests quicker than {} seconds may be missed.)\n'.format(
+            self._sampler.sample_interval), schema='info')
+
+        color_stdout('-' * 81, "\n", schema='separator')
+
+        # Print RSS statistics to '<vardir>/statistics/rss.log' file.
+        filepath = os.path.join(stats_dir, 'rss.log')
+        fd = open(filepath, 'w')
+        for task_id in rss_summary:
+            fd.write("{} {}\n".format(self.prettify_task_name(task_id),
+                                      rss_summary[task_id]))
+        fd.close()
+
+    # Durations.
+    def print_duration(self, stats_dir):
+        top_durations = 10
+
+        # Print to stdout durations for all failed tasks.
+        if self.failed_tasks:
+            color_stdout('Duration of failed tests (seconds):\n',
+                         schema='info')
+            for task in self.failed_tasks:
+                task_id = task[0]
+                if task_id in self.duration_stats:
+                    color_stdout('* %6.2f %s %s\n' % (self.duration_stats[task_id],
+                                 self.prettify_task_name(task_id).ljust(self.field_size),
+                                 self.get_long_mark(task_id)),
+                                 schema='info')
+            color_stdout('\n')
+
+        # Print to stdout durations for the longest tasks.
+        color_stdout('Top {} longest tests (seconds):\n'.format(top_durations),
+                     schema='info')
+        results_sorted = sorted(self.duration_stats.items(), key=lambda x: x[1], reverse=True)
+        for task_id, duration in results_sorted[:top_durations]:
+            color_stdout('* %6.2f %s %s\n' % (duration,
+                         self.prettify_task_name(task_id).ljust(self.field_size),
+                         self.get_long_mark(task_id)), schema='info')
+
+        color_stdout('-' * 81, "\n", schema='separator')
+
+        # Print duration statistics to '<vardir>/statistics/duration.log' file.
+        filepath = os.path.join(stats_dir, 'duration.log')
+        fd = open(filepath, 'w')
+        for task_id in self.duration_stats:
+            fd.write("{} {}\n".format(self.prettify_task_name(task_id),
+                                      self.duration_stats[task_id]))
+        fd.close()
+
     def print_statistics(self):
-        """Returns are there failed tasks."""
+        """Print statistics and results of testing."""
+        # Prepare standalone subpath '<vardir>/statistics' for statistics files.
+        stats_dir = os.path.join(Options().args.vardir, 'statistics')
+        safe_makedirs(stats_dir)
+
+        self.print_rss_summary(stats_dir)
+        self.print_duration(stats_dir)
+
         if self.stats:
             color_stdout('Statistics:\n', schema='test_var')
             for short_status, cnt in self.stats.items():
@@ -121,6 +223,8 @@ def save_artifacts(self):
                         ignore=shutil.ignore_patterns(
                             '*.socket-iproto', '*.socket-admin',
                             '*.sock', '*.control'))
+        shutil.copytree(os.path.join(vardir, 'statistics'),
+                        os.path.join(artifacts_dir, 'statistics'))
 
 
 class LogOutputWatcher(BaseWatcher):
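
Appendix for reviewers: a minimal standalone sketch of the sampling approach
taken by lib/sampler.py together with get_proc_stat_rss() from lib/utils.py.
It reads VmRSS from /proc/<pid>/status (the kernel reports the value in kB)
and keeps a per-task peak across periodic samples. The helper names
(read_vm_rss_kb, sample_once) and the processes mapping are illustrative
only; they are not identifiers from the patch.

import os

def read_vm_rss_kb(pid):
    # VmRSS in /proc/<pid>/status is reported in kB.
    try:
        with open('/proc/%d/status' % pid) as f:
            for line in f:
                if line.startswith('VmRSS:'):
                    return int(line.split(':', 1)[1].strip().split()[0])
    except (OSError, IOError):
        # The process may have exited between two samples.
        pass
    return 0

rss_summary = {}  # task_id -> peak observed RSS, in kB

def sample_once(processes):
    # processes: pid -> task_id, as registered by the workers.
    per_task = {}
    for pid, task_id in processes.items():
        # Sum RSS over all processes that belong to one task ...
        per_task[task_id] = per_task.get(task_id, 0) + read_vm_rss_kb(pid)
    for task_id, rss in per_task.items():
        # ... and remember only the peak total per task.
        if rss > rss_summary.get(task_id, 0):
            rss_summary[task_id] = rss

sample_once({os.getpid(): ('demo/demo.test.lua', None)})
print(rss_summary)

Summing first and taking the maximum afterwards mirrors Sampler._sample() and
Sampler._sample_process(): all registered processes of a test (including its
non-default servers) are counted together, so the reported figure is the peak
total RSS the test had running at some sampling tick, not a sum of
independent per-process peaks.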
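The rss.log and duration.log files written by StatisticsWatcher under
<vardir>/statistics are plain space-separated pairs, one
'<prettified task name> <value>' line per task (RSS in kB, duration in
seconds). Below is a sketch of post-processing them under that format
assumption; prettify_task_name() produces names without spaces, so rsplit()
is safe. The read_stats_file helper is hypothetical, not part of the patch.

def read_stats_file(path):
    # Returns a dict: prettified task name -> numeric value.
    stats = {}
    with open(path) as f:
        for line in f:
            name, value = line.rsplit(' ', 1)
            stats[name] = float(value)
    return stats

# E.g. the ten most memory-hungry tests, converted from kB to MiB:
top = sorted(read_stats_file('statistics/rss.log').items(),
             key=lambda kv: kv[1], reverse=True)[:10]
for name, rss_kb in top:
    print('%8.1f MiB  %s' % (rss_kb / 1024.0, name))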