From 4c9c84345dbb78dcf1c9a4f1f30dd5b0fd87940f Mon Sep 17 00:00:00 2001
From: Alexander Turenko
Date: Thu, 6 May 2021 23:25:22 +0300
Subject: [PATCH 1/3] sampler: add simple sampling infrastructure

Track tarantool and unit test executables that are run using test-run
together with metainformation: worker, test, test configuration and
server name. Add a function that will be called every 0.1 second for
each tracked process.

The implementation tracks non-default servers and re-registers default
servers that execute several tests (the 'core = tarantool' case).

Part of #277
---
 dispatcher.py           |   7 +-
 lib/app_server.py       |   6 +-
 lib/sampler.py          | 155 ++++++++++++++++++++++++++++++++++++++++
 lib/tarantool_server.py |  15 ++++
 lib/unittest_server.py  |   2 +
 5 files changed, 181 insertions(+), 4 deletions(-)
 create mode 100644 lib/sampler.py

diff --git a/dispatcher.py b/dispatcher.py
index e0053d4b..a58ba30d 100644
--- a/dispatcher.py
+++ b/dispatcher.py
@@ -34,6 +34,7 @@
 from multiprocessing.queues import SimpleQueue
 
 from lib import Options
+from lib.sampler import sampler
 from lib.utils import set_fd_cloexec
 from lib.worker import WorkerTaskResult, WorkerDone
 from lib.colorer import color_stdout
@@ -121,7 +122,7 @@ def __init__(self, task_groups, max_workers_cnt, randomize):
             self.result_queues.append(task_queue_disp.result_queue)
             self.task_queues.append(task_queue_disp.task_queue)
 
-        self.report_timeout = 1.0
+        self.report_timeout = 0.1
 
         self.statistics = None
         self.artifacts = None
@@ -166,7 +167,8 @@ def init_listeners(self):
         self.statistics = StatisticsWatcher(log_output_watcher.get_logfile)
         self.artifacts = ArtifactsWatcher(log_output_watcher.get_logfile)
         output_watcher = OutputWatcher()
-        self.listeners = [self.statistics, log_output_watcher, output_watcher, self.artifacts]
+        self.listeners = [self.statistics, log_output_watcher, output_watcher, self.artifacts,
+                          sampler.watcher]
         if watch_fail:
             self.fail_watcher = FailWatcher(self.terminate_all_workers)
             self.listeners.append(self.fail_watcher)
@@ -416,6 +418,7 @@ def _run_worker(self, worker_id, tcp_port_range):
         os.environ['TEST_RUN_TCP_PORT_END'] = str(tcp_port_range[1])
         color_stdout.queue = self.result_queue
         worker = self.gen_worker(worker_id)
+        sampler.set_queue(self.result_queue, worker_id, worker.name)
         worker.run_all(self.task_queue, self.result_queue)
 
     def add_worker(self, worker_id, tcp_port_range):
diff --git a/lib/app_server.py b/lib/app_server.py
index bf816ba5..ba3fd55f 100644
--- a/lib/app_server.py
+++ b/lib/app_server.py
@@ -13,6 +13,7 @@
 from lib.colorer import qa_notice
 from lib.options import Options
 from lib.preprocessor import TestState
+from lib.sampler import sampler
 from lib.server import Server
 from lib.server import DEFAULT_SNAPSHOT_NAME
 from lib.tarantool_server import Test
@@ -31,9 +32,10 @@ def timeout_handler(server_process, test_timeout):
     server_process.kill()
 
 
-def run_server(execs, cwd, server, logfile, retval):
+def run_server(execs, cwd, server, logfile, retval, test_id):
     os.putenv("LISTEN", server.iproto)
     server.process = Popen(execs, stdout=PIPE, stderr=PIPE, cwd=cwd)
+    sampler.register_process(server.process.pid, test_id, server.name)
     test_timeout = Options().args.test_timeout
     timer = Timer(test_timeout, timeout_handler, (server.process, test_timeout))
     timer.start()
@@ -57,7 +59,7 @@ def execute(self, server):
         execs = server.prepare_args()
         retval = dict()
         tarantool = TestRunGreenlet(run_server, execs, server.vardir, server,
-                                    server.logfile, retval)
+                                    server.logfile, retval, self.id)
         self.current_test_greenlet = tarantool
 
         # Copy the snapshot right before starting the server.
diff --git a/lib/sampler.py b/lib/sampler.py
new file mode 100644
index 00000000..54263bf9
--- /dev/null
+++ b/lib/sampler.py
@@ -0,0 +1,155 @@
+import os
+import sys
+import time
+
+from lib.colorer import color_log
+from lib.colorer import qa_notice
+from lib.utils import format_process
+
+
+if sys.version_info[0] == 2:
+    ProcessLookupError = OSError
+
+
+# Don't inherit BaseWorkerMessage to bypass cyclic import.
+class RegisterProcessMessage(object):
+    """Ask the sampler in the main test-run process to register
+    the given process.
+    """
+    def __init__(self, worker_id, worker_name, pid, task_id, server_name):
+        self.worker_id = worker_id
+        self.worker_name = worker_name
+        self.pid = pid
+        self.task_id = task_id
+        self.server_name = server_name
+
+
+# Don't inherit BaseWatcher to bypass cyclic import.
+class SamplerWatcher(object):
+    def __init__(self, sampler):
+        self._sampler = sampler
+        self._last_sample = 0
+        self._sample_interval = 0.1  # seconds
+        self._warn_interval = self._sample_interval * 4
+
+    def process_result(self, obj):
+        if isinstance(obj, RegisterProcessMessage):
+            self._sampler.register_process(
+                obj.pid, obj.task_id, obj.server_name, obj.worker_id,
+                obj.worker_name)
+        self._wakeup()
+
+    def process_timeout(self, delta_seconds):
+        self._wakeup()
+
+    def _wakeup(self):
+        """Invoke Sampler._sample() if enough time elapsed since
+        the previous call.
+        """
+        now = time.time()
+        delta = now - self._last_sample
+        if self._last_sample > 0 and delta > self._warn_interval:
+            template = 'Low sampling resolution. The expected interval\n' + \
+                       'is {:.2f} seconds ({:.2f} seconds without warnings),\n' + \
+                       'but the last sample was collected {:.2f} seconds ago.'
+            qa_notice(template.format(self._sample_interval, self._warn_interval,
+                                      delta))
+        if delta > self._sample_interval:
+            self._sampler._sample()
+            self._last_sample = now
+
+
+class Sampler:
+    def __init__(self):
+        # The instance is created in the test-run main process.
+
+        # Fields for an instance in a worker.
+        self._worker_id = None
+        self._worker_name = None
+        self._queue = None
+
+        # Field for an instance in the main process.
+        self._watcher = SamplerWatcher(self)
+
+        self._processes = dict()
+
+    def set_queue(self, queue, worker_id, worker_name):
+        # Called from a worker process (_run_worker()).
+        self._worker_id = worker_id
+        self._worker_name = worker_name
+        self._queue = queue
+        self._watcher = None
+
+    @property
+    def watcher(self):
+        if not self._watcher:
+            raise RuntimeError('sampler: watcher is available only in the ' +
+                               'main test-run process')
+        return self._watcher
+
+    def register_process(self, pid, task_id, server_name, worker_id=None,
+                         worker_name=None):
+        """Register a process for sampling.
+
+        Call it without worker_* arguments from a worker
+        process.
+        """
+        if not self._queue:
+            # In the main test-run process.
+            self._processes[pid] = {
+                'task_id': task_id,
+                'server_name': server_name,
+                'worker_id': worker_id,
+                'worker_name': worker_name,
+            }
+            self._log('register', pid)
+            return
+
+        # Pass to the main test-run process.
+        self._queue.put(RegisterProcessMessage(
+            self._worker_id, self._worker_name, pid, task_id, server_name))
+
+    def unregister_process(self, pid):
+        if self._queue:
+            raise NotImplementedError('sampler: a process unregistration ' +
+                                      'from a test-run worker is not ' +
+                                      'implemented yet')
+        if pid not in self._processes:
+            return
+
+        self._log('unregister', pid)
+        del self._processes[pid]
+
+    def _log(self, event, pid):
+        # Those logs are not written due to gh-247.
+        process_def = self._processes[pid]
+        task_id = process_def['task_id']
+        test_name = task_id[0] + ((':' + task_id[1]) if task_id[1] else '')
+        worker_name = process_def['worker_name']
+        server_name = process_def['server_name']
+        color_log('DEBUG: sampler: {} {}\n'.format(
+            event, format_process(pid)), schema='info')
+        color_log(' | worker: {}\n'.format(worker_name))
+        color_log(' | test: {}\n'.format(test_name))
+        color_log(' | server: {}\n'.format(str(server_name)))
+
+    def _sample(self):
+        for pid in list(self._processes.keys()):
+            # Unregister processes that are gone.
+            # Assume that PIDs are rarely reused.
+            try:
+                os.kill(pid, 0)
+            except ProcessLookupError:
+                self.unregister_process(pid)
+            else:
+                self._sample_process(pid)
+
+    def _sample_process(self, pid):
+        # Your sampling code here.
+        pass
+
+
+# The 'singleton' sampler instance: created in the main test-run
+# process, but then works differently in the main process and
+# workers.
+sampler = Sampler()
diff --git a/lib/tarantool_server.py b/lib/tarantool_server.py
index 8c8222ae..6a628aab 100644
--- a/lib/tarantool_server.py
+++ b/lib/tarantool_server.py
@@ -32,6 +32,7 @@
 from lib.colorer import qa_notice
 from lib.options import Options
 from lib.preprocessor import TestState
+from lib.sampler import sampler
 from lib.server import Server
 from lib.server import DEFAULT_SNAPSHOT_NAME
 from lib.test import Test
@@ -332,6 +333,10 @@ def exec_loop(self, ts):
 
     def execute(self, server):
         super(LuaTest, self).execute(server)
+
+        # Track the same process metrics as part of another test.
+        sampler.register_process(server.process.pid, self.id, server.name)
+
         cls_name = server.__class__.__name__.lower()
         if 'gdb' in cls_name or 'lldb' in cls_name or 'strace' in cls_name:
             # don't propagate gdb/lldb/strace mixin to non-default servers,
@@ -399,6 +404,10 @@ class PythonTest(Test):
 
     def execute(self, server):
         super(PythonTest, self).execute(server)
+
+        # Track the same process metrics as part of another test.
+        sampler.register_process(server.process.pid, self.id, server.name)
+
         new_globals = dict(locals(), test_run_current_test=self,
                            **server.__dict__)
         with open(self.name) as f:
             code = compile(f.read(), self.name, 'exec')
@@ -866,6 +875,12 @@ def start(self, silent=True, wait=True, wait_load=True, rais=True, args=[],
         # Restore the actual PWD value.
         os.environ['PWD'] = os.getcwd()
 
+        # Track non-default server metrics as part of the current
+        # test.
+        if self.current_test:
+            sampler.register_process(self.process.pid, self.current_test.id,
+                                     self.name)
+
         # gh-19 crash detection
         self.crash_detector = TestRunGreenlet(self.crash_detect)
         self.crash_detector.info = "Crash detector: %s" % self.process
diff --git a/lib/unittest_server.py b/lib/unittest_server.py
index a3c83d39..affb245d 100644
--- a/lib/unittest_server.py
+++ b/lib/unittest_server.py
@@ -4,6 +4,7 @@
 import glob
 from subprocess import Popen, PIPE, STDOUT
 
+from lib.sampler import sampler
 from lib.server import Server
 from lib.tarantool_server import Test
 from lib.tarantool_server import TarantoolServer
@@ -18,6 +19,7 @@ def execute(self, server):
         server.current_test = self
         execs = server.prepare_args()
         proc = Popen(execs, cwd=server.vardir, stdout=PIPE, stderr=STDOUT)
+        sampler.register_process(proc.pid, self.id, server.name)
         sys.stdout.write_bytes(proc.communicate()[0])

From 28c508359e3fbef51502a025be394f80058c9fa7 Mon Sep 17 00:00:00 2001
From: "Alexander V. Tikhonov"
Date: Tue, 13 Apr 2021 07:26:19 +0300
Subject: [PATCH 2/3] Add RSS statistics collecting

Found that some tests may fail due to a lack of memory. Mostly it
happens in CI on remote hosts. To be able to collect memory usage
statistics, add the RSS collecting routine get_proc_stat_rss(),
which parses /proc/<pid>/status files for the 'VmRSS' value: the
size of the resident memory portions. VmRSS consists of the three
following parts (VmRSS = RssAnon + RssFile + RssShmem) [1]; see the
example fragment below:

  RssAnon  - size of resident anonymous memory
  RssFile  - size of resident file mappings
  RssShmem - size of resident shmem memory (includes SysV shm,
             mapping of tmpfs and shared anonymous mappings)

Decided that the best way for CI is not to run this RSS collecting
routine on each command sent from test tasks, but to run it every
0.1 second after a test task has started, to catch the maximum RSS
value during the task run. This delay is the interval at which the
'SamplerWatcher' listener invokes its routines. Also found that a
delay of 0.1 second is completely enough to catch an RSS increase,
checked with:

  tarantool> require('clock').bench(function()
      local t = {}
      for i = 1, 1024^2 * 100 do t[i] = true end
  end)

which showed that 100 Mb of data is allocated in:

 - 3.153877479 seconds on a CI test host;
 - 0.54504489 seconds on a local fast host.

The main idea is to check all processes that belong to a test task
at some point in time and to save the maximum RSS value the task
reached. The '_sample_process()' routine gets the RSS of a single
currently alive process, and the '_sample()' routine sums the RSS
of all alive processes of a task and checks whether this sum is
bigger than the previously saved value for the task. Both routines
are in the 'Sampler' class, which is driven by the
'process_timeout()' routine of the 'SamplerWatcher' listener.

Also used the print_statistics() routine in the 'StatisticsWatcher'
listener, which prints statistics to stdout after testing. It
prints the RSS usage for failed tasks and for up to 10 tasks with
the highest RSS usage. Due to the sampler delay of 0.1 second,
tests that run faster may have no RSS values at all and won't be
printed.

Created a new 'statistics' subdirectory in the 'vardir' path to
save statistics files.
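For reference, a fragment of /proc/<pid>/status that
get_proc_stat_rss() has to parse looks as follows (the values here
are illustrative; they are reported in kB and the exact set of Rss*
fields depends on the kernel version):

  VmRSS:      1708 kB
  RssAnon:     600 kB
  RssFile:    1108 kB
  RssShmem:      0 kB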
The current patch uses that subdirectory to save the 'rss.log' file
with the maximum RSS value per tested task, one task per line, in
the format:

  <test task name> <maximum RSS in kB>

Closes #277

[1]: https://www.kernel.org/doc/html/latest/filesystems/proc.html
---
 dispatcher.py     |  5 ++--
 lib/sampler.py    | 35 ++++++++++++++++++++++---
 lib/test_suite.py |  3 +++
 lib/utils.py      | 19 ++++++++++++++
 lib/worker.py     |  7 +++--
 listeners.py      | 66 ++++++++++++++++++++++++++++++++++++++++++++++-
 6 files changed, 126 insertions(+), 9 deletions(-)

diff --git a/dispatcher.py b/dispatcher.py
index a58ba30d..5da908d6 100644
--- a/dispatcher.py
+++ b/dispatcher.py
@@ -167,8 +167,9 @@ def init_listeners(self):
         self.statistics = StatisticsWatcher(log_output_watcher.get_logfile)
         self.artifacts = ArtifactsWatcher(log_output_watcher.get_logfile)
         output_watcher = OutputWatcher()
-        self.listeners = [self.statistics, log_output_watcher, output_watcher, self.artifacts,
-                          sampler.watcher]
+        self.listeners = [self.statistics, log_output_watcher, output_watcher, self.artifacts]
+        if sampler.is_enabled:
+            self.listeners.append(sampler.watcher)
         if watch_fail:
             self.fail_watcher = FailWatcher(self.terminate_all_workers)
             self.listeners.append(self.fail_watcher)
diff --git a/lib/sampler.py b/lib/sampler.py
index 54263bf9..eb83d890 100644
--- a/lib/sampler.py
+++ b/lib/sampler.py
@@ -5,6 +5,8 @@
 from lib.colorer import color_log
 from lib.colorer import qa_notice
 from lib.utils import format_process
+from lib.utils import get_proc_stat_rss
+from lib.utils import proc_stat_rss_supported
 
 
 if sys.version_info[0] == 2:
@@ -42,6 +44,10 @@ def process_result(self, obj):
     def process_timeout(self, delta_seconds):
         self._wakeup()
 
+    @property
+    def sample_interval(self):
+        return self._sample_interval
+
     def _wakeup(self):
         """Invoke Sampler._sample() if enough time elapsed since
         the previous call.
@@ -72,6 +78,7 @@ def __init__(self):
         self._watcher = SamplerWatcher(self)
 
         self._processes = dict()
+        self._rss_summary = dict()
 
     def set_queue(self, queue, worker_id, worker_name):
         # Called from a worker process (_run_worker()).
@@ -80,6 +87,15 @@ def set_queue(self, queue, worker_id, worker_name):
         self._queue = queue
         self._watcher = None
 
+    @property
+    def rss_summary(self):
+        """Task ID to maximum RSS mapping."""
+        return self._rss_summary
+
+    @property
+    def sample_interval(self):
+        return self._watcher.sample_interval
+
     @property
     def watcher(self):
         if not self._watcher:
@@ -87,6 +103,10 @@ def set_queue(self, queue, worker_id, worker_name):
                                'main test-run process')
         return self._watcher
 
+    @property
+    def is_enabled(self):
+        return proc_stat_rss_supported()
+
     def register_process(self, pid, task_id, server_name, worker_id=None,
                          worker_name=None):
         """Register a process for sampling.
@@ -134,6 +154,7 @@ def _log(self, event, pid):
         color_log(' | server: {}\n'.format(str(server_name)))
 
     def _sample(self):
+        tasks_rss = dict()
         for pid in list(self._processes.keys()):
             # Unregister processes that are gone.
             # Assume that PIDs are rarely reused.
@@ -142,11 +163,17 @@ def _sample(self):
             except ProcessLookupError:
                 self.unregister_process(pid)
             else:
-                self._sample_process(pid)
+                self._sample_process(pid, tasks_rss)
+
+        # Save current overall RSS value if it is bigger than saved.
+        for task_id in tasks_rss:
+            if self.rss_summary.get(task_id, 0) < tasks_rss[task_id]:
+                self.rss_summary[task_id] = tasks_rss[task_id]
 
-    def _sample_process(self, pid):
-        # Your sampling code here.
-        pass
+    def _sample_process(self, pid, tasks_rss):
+        task_id = self._processes[pid]['task_id']
+        # Count overall RSS per task.
+        tasks_rss[task_id] = get_proc_stat_rss(pid) + tasks_rss.get(task_id, 0)
 
 
 # The 'singleton' sampler instance: created in the main test-run
diff --git a/lib/test_suite.py b/lib/test_suite.py
index 67cdadff..826aa7d0 100644
--- a/lib/test_suite.py
+++ b/lib/test_suite.py
@@ -289,3 +289,6 @@ def fragile_retries(self):
 
     def show_reproduce_content(self):
         return self.ini['show_reproduce_content']
+
+    def test_is_long(self, task_id):
+        return os.path.basename(task_id[0]) in self.ini['long_run']
diff --git a/lib/utils.py b/lib/utils.py
index a92db07c..6d7a21d8 100644
--- a/lib/utils.py
+++ b/lib/utils.py
@@ -233,6 +233,25 @@ def format_process(pid):
     return 'process %d [%s; %s]' % (pid, status, cmdline)
 
 
+def proc_stat_rss_supported():
+    return os.path.isfile('/proc/%d/status' % os.getpid())
+
+
+def get_proc_stat_rss(pid):
+    rss = 0
+    try:
+        with open('/proc/%d/status' % pid, 'r') as f:
+            for line in f:
+                if ':' not in line:
+                    continue
+                key, value = line.split(':', 1)
+                if key == 'VmRSS':
+                    rss = int(value.strip().split()[0])
+    except (OSError, IOError):
+        pass
+    return rss
+
+
 def set_fd_cloexec(socket):
     flags = fcntl.fcntl(socket, fcntl.F_GETFD)
     fcntl.fcntl(socket, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
diff --git a/lib/worker.py b/lib/worker.py
index cf4fc441..399ead2f 100644
--- a/lib/worker.py
+++ b/lib/worker.py
@@ -145,14 +145,16 @@ class WorkerTaskResult(BaseWorkerMessage):
 the task processed successfully or not, but with little more flexibility
 than binary True/False. The result_checksum (string) field saves the results
 file checksum on test fail. The task_id (any hashable object) field hold ID of
-the processed task. The show_reproduce_content configuration form suite.ini
+the processed task. The is_long (boolean) field shows if task is in long test
+list in suite.ini. The show_reproduce_content configuration from suite.ini.
""" def __init__(self, worker_id, worker_name, task_id, - short_status, result_checksum, show_reproduce_content): + short_status, result_checksum, is_long, show_reproduce_content): super(WorkerTaskResult, self).__init__(worker_id, worker_name) self.short_status = short_status self.result_checksum = result_checksum self.task_id = task_id + self.is_long = is_long self.show_reproduce_content = show_reproduce_content @@ -221,6 +223,7 @@ def current_task(self, task_id): def wrap_result(self, task_id, short_status, result_checksum): return WorkerTaskResult(self.id, self.name, task_id, short_status, result_checksum, + self.suite.test_is_long(task_id), self.suite.show_reproduce_content()) def sigterm_handler(self, signum, frame): diff --git a/listeners.py b/listeners.py index b6a9b79b..f4bd9b7a 100644 --- a/listeners.py +++ b/listeners.py @@ -6,6 +6,7 @@ from lib import Options from lib.colorer import color_stdout from lib.colorer import decolor +from lib.sampler import sampler from lib.worker import WorkerCurrentTask from lib.worker import WorkerDone from lib.worker import WorkerOutput @@ -33,13 +34,19 @@ def process_timeout(self, delta_seconds): class StatisticsWatcher(BaseWatcher): def __init__(self, get_logfile): self.stats = dict() + self.field_size = 60 + self._sampler = sampler self.failed_tasks = [] self.get_logfile = get_logfile + self.long_tasks = set() def process_result(self, obj): if not isinstance(obj, WorkerTaskResult): return + if obj.is_long: + self.long_tasks.add(obj.task_id) + if obj.short_status not in self.stats: self.stats[obj.short_status] = 0 self.stats[obj.short_status] += 1 @@ -50,8 +57,63 @@ def process_result(self, obj): obj.result_checksum, obj.show_reproduce_content)) + def get_long_mark(self, task): + return '(long)' if task in self.long_tasks else '' + + def prettify_task_name(self, task_id): + return task_id[0] + ((':' + task_id[1]) if task_id[1] else '') + + # RSS. + def print_rss_summary(self, stats_dir): + if not self._sampler.is_enabled: + return + + rss_summary = self._sampler.rss_summary + top_rss = 10 + + # Print to stdout RSS statistics for all failed tasks. + if self.failed_tasks: + color_stdout('Occupied memory in failed tests (RSS, Mb):\n', schema='info') + for task in self.failed_tasks: + task_id = task[0] + if task_id in rss_summary: + color_stdout('* %6.1f %s %s\n' % (float(rss_summary[task_id]) / 1024, + self.prettify_task_name(task_id).ljust(self.field_size), + self.get_long_mark(task_id)), + schema='info') + color_stdout('\n') + + # Print to stdout RSS statistics for some number of most it used tasks. + color_stdout('Top {} tests by occupied memory (RSS, Mb):\n'.format( + top_rss), schema='info') + results_sorted = sorted(rss_summary.items(), key=lambda x: x[1], reverse=True) + for task_id, rss in results_sorted[:top_rss]: + color_stdout('* %6.1f %s %s\n' % (float(rss) / 1024, + self.prettify_task_name(task_id).ljust(self.field_size), + self.get_long_mark(task_id)), schema='info') + color_stdout('\n') + + color_stdout('(Tests quicker than {} seconds may be missed.)\n'.format( + self._sampler.sample_interval), schema='info') + + color_stdout('-' * 81, "\n", schema='separator') + + # Print RSS statistics to '/statistics/rss.log' file. 
+        filepath = os.path.join(stats_dir, 'rss.log')
+        fd = open(filepath, 'w')
+        for task_id in rss_summary:
+            fd.write("{} {}\n".format(self.prettify_task_name(task_id),
+                                      rss_summary[task_id]))
+        fd.close()
+
     def print_statistics(self):
-        """Returns are there failed tasks."""
+        """Print statistics and results of testing."""
+        # Prepare a standalone subpath '<vardir>/statistics' for statistics files.
+        stats_dir = os.path.join(Options().args.vardir, 'statistics')
+        safe_makedirs(stats_dir)
+
+        self.print_rss_summary(stats_dir)
+
         if self.stats:
             color_stdout('Statistics:\n', schema='test_var')
             for short_status, cnt in self.stats.items():
@@ -121,6 +183,8 @@ def save_artifacts(self):
                             ignore=shutil.ignore_patterns(
                                 '*.socket-iproto', '*.socket-admin',
                                 '*.sock', '*.control'))
+        shutil.copytree(os.path.join(vardir, 'statistics'),
+                        os.path.join(artifacts_dir, 'statistics'))
 
 
 class LogOutputWatcher(BaseWatcher):

From b80762a9e3f289fd1256d24822e7798e1944323d Mon Sep 17 00:00:00 2001
From: "Alexander V. Tikhonov"
Date: Wed, 5 May 2021 09:42:15 +0000
Subject: [PATCH 3/3] Add tests duration collecting

Decided to collect the run duration of tests in a standalone file
and to print it to stdout after testing has finished: the duration
of failed tasks and of up to 10 longest tasks.

For duration collecting the 'StatisticsWatcher' listener is used,
with the following routines involved:

 process_result()
   Uses the 'WorkerTaskResult' queue message to collect failed
   tasks and the final duration of finished tasks. The duration
   is measured in the worker running the test and is passed to
   the listener through the 'WorkerTaskResult' message via
   'result_queue'.

 print_statistics()
   Prints statistics to stdout after testing: the duration of
   failed tasks and of up to 10 longest tasks.

We use the standalone 'statistics' directory in the 'vardir' path
to save the 'duration.log' file with the duration of each tested
task in the format:

  <test task name> <duration in seconds>

Closes #286
---
 lib/test_suite.py |  5 ++++-
 lib/worker.py     | 19 +++++++++++--------
 listeners.py      | 40 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 55 insertions(+), 9 deletions(-)

diff --git a/lib/test_suite.py b/lib/test_suite.py
index 826aa7d0..fd8ad008 100644
--- a/lib/test_suite.py
+++ b/lib/test_suite.py
@@ -9,6 +9,7 @@
 import os
 import re
 import sys
+import time
 
 from lib import Options
 from lib.app_server import AppServer
@@ -268,18 +269,20 @@ def run_test(self, test, server, inspector):
             conf = test.conf_name
         color_stdout(just_and_trim(conf, 15) + ' ', schema='test_var')
 
+        start_time = time.time()
         if self.is_test_enabled(test, conf, server):
             short_status, result_checksum = test.run(server)
         else:
             color_stdout("[ disabled ]\n", schema='t_name')
             short_status = 'disabled'
             result_checksum = None
+        duration = time.time() - start_time
 
         # cleanup only if test passed or if --force mode enabled
         if Options().args.is_force or short_status == 'pass':
             inspector.cleanup_nondefault()
 
-        return short_status, result_checksum
+        return short_status, result_checksum, duration
 
     def is_parallel(self):
         return self.ini['is_parallel']
diff --git a/lib/worker.py b/lib/worker.py
index 399ead2f..adfe3c72 100644
--- a/lib/worker.py
+++ b/lib/worker.py
@@ -146,15 +146,17 @@ class WorkerTaskResult(BaseWorkerMessage):
 than binary True/False. The result_checksum (string) field saves the results
 file checksum on test fail. The task_id (any hashable object) field hold ID of
 the processed task. The is_long (boolean) field shows if task is in long test
-list in suite.ini. The show_reproduce_content configuration from suite.ini.
+list in suite.ini. The duration (float) field saves the task run time. The
+show_reproduce_content configuration from suite.ini.
     """
     def __init__(self, worker_id, worker_name, task_id,
-                 short_status, result_checksum, is_long, show_reproduce_content):
+                 short_status, result_checksum, is_long, duration, show_reproduce_content):
         super(WorkerTaskResult, self).__init__(worker_id, worker_name)
         self.short_status = short_status
         self.result_checksum = result_checksum
         self.task_id = task_id
         self.is_long = is_long
+        self.duration = duration
         self.show_reproduce_content = show_reproduce_content
 
 
@@ -220,10 +222,10 @@ def current_task(self, task_id):
         return WorkerCurrentTask(self.id, self.name, task_name, task_param,
                                  task_result, task_tmp_result)
 
-    def wrap_result(self, task_id, short_status, result_checksum):
+    def wrap_result(self, task_id, short_status, result_checksum, duration):
         return WorkerTaskResult(self.id, self.name, task_id,
                                 short_status, result_checksum,
-                                self.suite.test_is_long(task_id),
+                                self.suite.test_is_long(task_id), duration,
                                 self.suite.show_reproduce_content())
 
     def sigterm_handler(self, signum, frame):
@@ -315,7 +317,7 @@ def run_task(self, task_id):
             with open(self.reproduce_file, 'a') as f:
                 task_id_str = yaml.safe_dump(task.id, default_flow_style=True)
                 f.write('- ' + task_id_str)
-            short_status, result_checksum = self.suite.run_test(
+            short_status, result_checksum, duration = self.suite.run_test(
                 task, self.server, self.inspector)
         except KeyboardInterrupt:
             self.report_keyboard_interrupt()
@@ -325,7 +327,7 @@
                 '\nWorker "%s" received the following error; stopping...\n'
                 % self.name + traceback.format_exc() + '\n', schema='error')
             raise
-        return short_status, result_checksum
+        return short_status, result_checksum, duration
 
     def run_loop(self, task_queue, result_queue):
         """ called from 'run_all' """
@@ -341,6 +343,7 @@
 
             short_status = None
             result_checksum = None
+            duration = 0.0
             result_queue.put(self.current_task(task_id))
             testname = os.path.basename(task_id[0])
             fragile_checksums = self.suite.get_test_fragile_checksums(testname)
@@ -356,14 +359,14 @@
                         ' file checksum: "%s", rerunning with server restart ...\n'
                         % (task_id[0], task_id[1], result_checksum), schema='error')
                 # run task and save the result to short_status
-                short_status, result_checksum = self.run_task(task_id)
+                short_status, result_checksum, duration = self.run_task(task_id)
 
                 # check if the results file checksum set on fail and if
                 # the newly created results file is known by checksum
                 if not result_checksum or (result_checksum not in fragile_checksums):
                     break
                 retries_left = retries_left - 1
 
-        result_queue.put(self.wrap_result(task_id, short_status, result_checksum))
+        result_queue.put(self.wrap_result(task_id, short_status, result_checksum, duration))
         if short_status == 'fail':
             if Options().args.is_force:
                 self.restart_server()
diff --git a/listeners.py b/listeners.py
index f4bd9b7a..107e57e1 100644
--- a/listeners.py
+++ b/listeners.py
@@ -36,6 +36,7 @@ def __init__(self, get_logfile):
         self.stats = dict()
         self.field_size = 60
         self._sampler = sampler
+        self.duration_stats = dict()
         self.failed_tasks = []
         self.get_logfile = get_logfile
         self.long_tasks = set()
@@ -57,6 +58,8 @@ def process_result(self, obj):
                                       obj.result_checksum,
                                       obj.show_reproduce_content))
 
+        self.duration_stats[obj.task_id] = obj.duration
+
     def get_long_mark(self, task):
         return '(long)' if task in self.long_tasks else ''
 
@@ -106,6 +109,42 @@ def print_rss_summary(self, stats_dir):
                                       rss_summary[task_id]))
         fd.close()
 
+    # Durations.
+    def print_duration(self, stats_dir):
+        top_durations = 10
+
+        # Print to stdout durations for all failed tasks.
+        if self.failed_tasks:
+            color_stdout('Duration of failed tests (seconds):\n',
+                         schema='info')
+            for task in self.failed_tasks:
+                task_id = task[0]
+                if task_id in self.duration_stats:
+                    color_stdout('* %6.2f %s %s\n' % (self.duration_stats[task_id],
+                                 self.prettify_task_name(task_id).ljust(self.field_size),
+                                 self.get_long_mark(task_id)),
+                                 schema='info')
+            color_stdout('\n')
+
+        # Print to stdout durations for the longest tasks.
+        color_stdout('Top {} longest tests (seconds):\n'.format(top_durations),
+                     schema='info')
+        results_sorted = sorted(self.duration_stats.items(), key=lambda x: x[1], reverse=True)
+        for task_id, duration in results_sorted[:top_durations]:
+            color_stdout('* %6.2f %s %s\n' % (duration,
+                         self.prettify_task_name(task_id).ljust(self.field_size),
+                         self.get_long_mark(task_id)), schema='info')
+
+        color_stdout('-' * 81, "\n", schema='separator')
+
+        # Print duration statistics to the '<vardir>/statistics/duration.log' file.
+        filepath = os.path.join(stats_dir, 'duration.log')
+        fd = open(filepath, 'w')
+        for task_id in self.duration_stats:
+            fd.write("{} {}\n".format(self.prettify_task_name(task_id),
+                                      self.duration_stats[task_id]))
+        fd.close()
+
     def print_statistics(self):
         """Print statistics and results of testing."""
         # Prepare a standalone subpath '<vardir>/statistics' for statistics files.
@@ -113,6 +152,7 @@ def print_statistics(self):
         safe_makedirs(stats_dir)
 
         self.print_rss_summary(stats_dir)
+        self.print_duration(stats_dir)
 
         if self.stats:
             color_stdout('Statistics:\n', schema='test_var')
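As a usage illustration, the two statistics files introduced by
patches 2 and 3 can be post-processed with a trivial script. Below is
a minimal sketch; the 'test/var' default path and the merge logic are
illustrative and not part of the patches, while the '<name> <value>'
line format and the '<vardir>/statistics' location are those defined
above:

  #!/usr/bin/env python
  import os
  import sys

  def read_stats_file(path):
      # Parse '<test task name> <value>' lines written by
      # StatisticsWatcher ('rss.log' values are in kB,
      # 'duration.log' values are in seconds).
      result = {}
      with open(path) as f:
          for line in f:
              name, value = line.rsplit(' ', 1)
              result[name] = float(value)
      return result

  vardir = sys.argv[1] if len(sys.argv) > 1 else 'test/var'
  stats_dir = os.path.join(vardir, 'statistics')
  rss = read_stats_file(os.path.join(stats_dir, 'rss.log'))
  durations = read_stats_file(os.path.join(stats_dir, 'duration.log'))

  # Show the top 10 tasks by RSS together with their durations.
  for name in sorted(rss, key=rss.get, reverse=True)[:10]:
      print('%8.1f Mb %8.2f s  %s' % (rss[name] / 1024.0,
                                      durations.get(name, 0.0), name))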