Add RSS statistics and timings collecting #298

Merged · 3 commits · May 23, 2021
6 changes: 5 additions & 1 deletion dispatcher.py
@@ -34,6 +34,7 @@
from multiprocessing.queues import SimpleQueue

from lib import Options
from lib.sampler import sampler
from lib.utils import set_fd_cloexec
from lib.worker import WorkerTaskResult, WorkerDone
from lib.colorer import color_stdout
@@ -121,7 +122,7 @@ def __init__(self, task_groups, max_workers_cnt, randomize):
self.result_queues.append(task_queue_disp.result_queue)
self.task_queues.append(task_queue_disp.task_queue)

self.report_timeout = 1.0
self.report_timeout = 0.1

self.statistics = None
self.artifacts = None
@@ -167,6 +168,8 @@ def init_listeners(self):
self.artifacts = ArtifactsWatcher(log_output_watcher.get_logfile)
output_watcher = OutputWatcher()
self.listeners = [self.statistics, log_output_watcher, output_watcher, self.artifacts]
if sampler.is_enabled:
self.listeners.append(sampler.watcher)
if watch_fail:
self.fail_watcher = FailWatcher(self.terminate_all_workers)
self.listeners.append(self.fail_watcher)
@@ -416,6 +419,7 @@ def _run_worker(self, worker_id, tcp_port_range):
os.environ['TEST_RUN_TCP_PORT_END'] = str(tcp_port_range[1])
color_stdout.queue = self.result_queue
worker = self.gen_worker(worker_id)
sampler.set_queue(self.result_queue, worker_id, worker.name)
worker.run_all(self.task_queue, self.result_queue)

def add_worker(self, worker_id, tcp_port_range):
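
Note: the dispatcher delivers each queued worker message to every listener's process_result() and calls process_timeout() when report_timeout expires with nothing to read; SamplerWatcher implements exactly this pair of hooks. A minimal, self-contained sketch of that contract (not the actual test-run loop; the listener and messages are illustrative):

```python
# Toy listener following the same interface as SamplerWatcher: the dispatcher
# fans every received message out to process_result() and reports idle polls
# via process_timeout().
class EchoListener(object):
    def process_result(self, obj):
        print('message: %r' % (obj,))

    def process_timeout(self, delta_seconds):
        print('idle for %.1f seconds' % delta_seconds)


listeners = [EchoListener()]
for msg in ['worker 1 started', None, 'worker 1 done']:  # None models a poll timeout
    for listener in listeners:
        if msg is None:
            listener.process_timeout(0.1)
        else:
            listener.process_result(msg)
```
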
6 changes: 4 additions & 2 deletions lib/app_server.py
@@ -13,6 +13,7 @@
from lib.colorer import qa_notice
from lib.options import Options
from lib.preprocessor import TestState
from lib.sampler import sampler
from lib.server import Server
from lib.server import DEFAULT_SNAPSHOT_NAME
from lib.tarantool_server import Test
@@ -31,9 +32,10 @@ def timeout_handler(server_process, test_timeout):
server_process.kill()


def run_server(execs, cwd, server, logfile, retval):
def run_server(execs, cwd, server, logfile, retval, test_id):
os.putenv("LISTEN", server.iproto)
server.process = Popen(execs, stdout=PIPE, stderr=PIPE, cwd=cwd)
sampler.register_process(server.process.pid, test_id, server.name)
test_timeout = Options().args.test_timeout
timer = Timer(test_timeout, timeout_handler, (server.process, test_timeout))
timer.start()
@@ -57,7 +59,7 @@ def execute(self, server):
execs = server.prepare_args()
retval = dict()
tarantool = TestRunGreenlet(run_server, execs, server.vardir, server,
server.logfile, retval)
server.logfile, retval, self.id)
self.current_test_greenlet = tarantool

# Copy the snapshot right before starting the server.
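
On the worker side, sampler._queue is set by set_queue() (see dispatcher.py above), so the register_process() call in run_server() does not sample locally but forwards a RegisterProcessMessage to the main process. A rough sketch of that worker-side path, using a plain Queue instead of the real multiprocessing result queue; the PID, worker ID, and names are made up:

```python
try:
    from queue import Queue          # Python 3
except ImportError:
    from Queue import Queue          # Python 2

from lib.sampler import sampler, RegisterProcessMessage

# Pretend we are inside a worker: set_queue() switches the singleton into
# "forward to the main process" mode.
queue = Queue()
sampler.set_queue(queue, worker_id=1, worker_name='001_app')
sampler.register_process(4242, ('app/foo.test.lua', None), 'foo')

msg = queue.get()                    # what the main process would receive
assert isinstance(msg, RegisterProcessMessage)
print(msg.worker_name, msg.pid, msg.task_id)
```
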
182 changes: 182 additions & 0 deletions lib/sampler.py
@@ -0,0 +1,182 @@
import os
import sys
import time

from lib.colorer import color_log
from lib.colorer import qa_notice
from lib.utils import format_process
from lib.utils import get_proc_stat_rss
from lib.utils import proc_stat_rss_supported


if sys.version_info[0] == 2:
ProcessLookupError = OSError


# Don't inherit BaseWorkerMessage to bypass cyclic import.
class RegisterProcessMessage(object):
"""Ask the sampler in the main test-run process to register
given process.
"""
def __init__(self, worker_id, worker_name, pid, task_id, server_name):
self.worker_id = worker_id
self.worker_name = worker_name
self.pid = pid
self.task_id = task_id
self.server_name = server_name


# Don't inherit BaseWatcher to bypass cyclic import.
class SamplerWatcher(object):
def __init__(self, sampler):
self._sampler = sampler
self._last_sample = 0
self._sample_interval = 0.1 # seconds
self._warn_interval = self._sample_interval * 4

def process_result(self, obj):
if isinstance(obj, RegisterProcessMessage):
self._sampler.register_process(
obj.pid, obj.task_id, obj.server_name, obj.worker_id,
obj.worker_name)
self._wakeup()

def process_timeout(self, delta_seconds):
self._wakeup()

@property
def sample_interval(self):
return self._sample_interval

def _wakeup(self):
"""Invoke Sampler.sample() if enough time elapsed since
the previous call.
"""
now = time.time()
delta = now - self._last_sample
if self._last_sample > 0 and delta > self._warn_interval:
template = 'Low sampling resolution. The expected interval\n' + \
'is {:.2f} seconds ({:.2f} seconds without warnings),\n' + \
'but the last sample was collected {:.2f} seconds ago.'
qa_notice(template.format(self._sample_interval, self._warn_interval,
delta))
if delta > self._sample_interval:
self._sampler._sample()
self._last_sample = now


class Sampler:
def __init__(self):
# The instance is created in the test-run main process.

# Field for an instance in a worker.
self._worker_id = None
self._worker_name = None
self._queue = None

# Field for an instance in the main process.
self._watcher = SamplerWatcher(self)

self._processes = dict()
self._rss_summary = dict()

def set_queue(self, queue, worker_id, worker_name):
# Called from a worker process (_run_worker()).
self._worker_id = worker_id
self._worker_name = worker_name
self._queue = queue
self._watcher = None

@property
def rss_summary(self):
"""Task ID to maximum RSS mapping."""
return self._rss_summary

@property
def sample_interval(self):
return self._watcher.sample_interval

@property
def watcher(self):
if not self._watcher:
raise RuntimeError('sampler: watcher is available only in the ' +
'main test-run process')
return self._watcher

@property
def is_enabled(self):
return proc_stat_rss_supported()

def register_process(self, pid, task_id, server_name, worker_id=None,
worker_name=None):
"""Register a process to sampling.

Call it without worker_* arguments from a worker
process.
"""
if not self._queue:
# In main test-run process.
self._processes[pid] = {
'task_id': task_id,
'server_name': server_name,
'worker_id': worker_id,
'worker_name': worker_name,
}
self._log('register', pid)
return

# Pass to the main test-run process.
self._queue.put(RegisterProcessMessage(
self._worker_id, self._worker_name, pid, task_id, server_name))

def unregister_process(self, pid):
if self._queue:
raise NotImplementedError('sampler: a process unregistration ' +
'from a test-run worker is not ' +
'implemented yet')
if pid not in self._processes:
return

self._log('unregister', pid)
del self._processes[pid]

def _log(self, event, pid):
# Those logs are not written due to gh-247.
process_def = self._processes[pid]
task_id = process_def['task_id']
test_name = task_id[0] + ((':' + task_id[1]) if task_id[1] else '')
worker_name = process_def['worker_name']
server_name = process_def['server_name']
color_log('DEBUG: sampler: {} {}\n'.format(
event, format_process(pid)), schema='info')
color_log(' | worker: {}\n'.format(worker_name))
color_log(' | test: {}\n'.format(test_name))
color_log(' | server: {}\n'.format(str(server_name)))

def _sample(self):
tasks_rss = dict()
for pid in list(self._processes.keys()):
# Unregister processes that're gone.
# Assume that PIDs are rarely reused.
try:
os.kill(pid, 0)
except ProcessLookupError:
self.unregister_process(pid)
else:
self._sample_process(pid, tasks_rss)

# Save current overall RSS value if it is bigger than saved.
for task_id in tasks_rss:
if self.rss_summary.get(task_id, 0) < tasks_rss[task_id]:
self.rss_summary[task_id] = tasks_rss[task_id]

def _sample_process(self, pid, tasks_rss):
task_id = self._processes[pid]['task_id']
# Count overall RSS per task.
tasks_rss[task_id] = get_proc_stat_rss(pid) + tasks_rss.get(task_id, 0)


# The 'singleton' sampler instance: created in the main test-run
# process, but then work differently in the main process and
# workers.
sampler = Sampler()
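
The singleton above is imported both in the main process and in workers; only the main-process instance keeps the SamplerWatcher and the rss_summary. A hypothetical end-to-end use in the main test-run process (the task ID is made up; normally registration and sampling are driven by the dispatcher):

```python
import os

from lib.sampler import sampler

# Hypothetical usage in the main test-run process: register the current
# process under a made-up task ID, take one sample, read the summary.
if sampler.is_enabled:                       # True only when /proc/<pid>/status exists
    task_id = ('box/tx.test.lua', None)      # (test name, configuration)
    sampler.register_process(os.getpid(), task_id, 'tarantool')
    sampler.watcher.process_timeout(0.1)     # normally called by the dispatcher loop
    print(sampler.rss_summary.get(task_id))  # peak RSS observed so far, in kB
```
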
15 changes: 15 additions & 0 deletions lib/tarantool_server.py
@@ -32,6 +32,7 @@
from lib.colorer import qa_notice
from lib.options import Options
from lib.preprocessor import TestState
from lib.sampler import sampler
from lib.server import Server
from lib.server import DEFAULT_SNAPSHOT_NAME
from lib.test import Test
@@ -332,6 +333,10 @@ def exec_loop(self, ts):

def execute(self, server):
super(LuaTest, self).execute(server)

# Track the same process metrics as part of another test.
sampler.register_process(server.process.pid, self.id, server.name)

cls_name = server.__class__.__name__.lower()
if 'gdb' in cls_name or 'lldb' in cls_name or 'strace' in cls_name:
# don't propagate gdb/lldb/strace mixin to non-default servers,
@@ -399,6 +404,10 @@ class PythonTest(Test):

def execute(self, server):
super(PythonTest, self).execute(server)

# Track the same process metrics as part of another test.
sampler.register_process(server.process.pid, self.id, server.name)

new_globals = dict(locals(), test_run_current_test=self, **server.__dict__)
with open(self.name) as f:
code = compile(f.read(), self.name, 'exec')
@@ -866,6 +875,12 @@ def start(self, silent=True, wait=True, wait_load=True, rais=True, args=[],
# Restore the actual PWD value.
os.environ['PWD'] = os.getcwd()

# Track non-default server metrics as part of current
# test.
if self.current_test:
sampler.register_process(self.process.pid, self.current_test.id,
self.name)

# gh-19 crash detection
self.crash_detector = TestRunGreenlet(self.crash_detect)
self.crash_detector.info = "Crash detector: %s" % self.process
8 changes: 7 additions & 1 deletion lib/test_suite.py
@@ -9,6 +9,7 @@
import os
import re
import sys
import time

from lib import Options
from lib.app_server import AppServer
@@ -268,18 +269,20 @@ def run_test(self, test, server, inspector):
conf = test.conf_name
color_stdout(just_and_trim(conf, 15) + ' ', schema='test_var')

start_time = time.time()
if self.is_test_enabled(test, conf, server):
short_status, result_checksum = test.run(server)
else:
color_stdout("[ disabled ]\n", schema='t_name')
short_status = 'disabled'
result_checksum = None
duration = time.time() - start_time

# cleanup only if test passed or if --force mode enabled
if Options().args.is_force or short_status == 'pass':
inspector.cleanup_nondefault()

return short_status, result_checksum
return short_status, result_checksum, duration

def is_parallel(self):
return self.ini['is_parallel']
@@ -289,3 +292,6 @@ def fragile_retries(self):

def show_reproduce_content(self):
return self.ini['show_reproduce_content']

def test_is_long(self, task_id):
return os.path.basename(task_id[0]) in self.ini['long_run']
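
run_test() now measures and returns the wall-clock duration as a third value alongside the status and checksum. The same timing pattern in isolation, just to illustrate what the added start_time/duration lines do (not test-run code):

```python
import time


def timed(fn, *args):
    # Run fn(*args) and return its result together with the wall-clock
    # duration in seconds, exactly as run_test() does around test.run().
    start_time = time.time()
    result = fn(*args)
    return result, time.time() - start_time


result, duration = timed(sum, [1, 2, 3])
print('%s computed in %.6f seconds' % (result, duration))
```
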
2 changes: 2 additions & 0 deletions lib/unittest_server.py
@@ -4,6 +4,7 @@
import glob
from subprocess import Popen, PIPE, STDOUT

from lib.sampler import sampler
from lib.server import Server
from lib.tarantool_server import Test
from lib.tarantool_server import TarantoolServer
@@ -18,6 +19,7 @@ def execute(self, server):
server.current_test = self
execs = server.prepare_args()
proc = Popen(execs, cwd=server.vardir, stdout=PIPE, stderr=STDOUT)
sampler.register_process(proc.pid, self.id, server.name)
sys.stdout.write_bytes(proc.communicate()[0])


19 changes: 19 additions & 0 deletions lib/utils.py
@@ -233,6 +233,25 @@ def format_process(pid):
return 'process %d [%s; %s]' % (pid, status, cmdline)


def proc_stat_rss_supported():
return os.path.isfile('/proc/%d/status' % os.getpid())


def get_proc_stat_rss(pid):
rss = 0
try:
with open('/proc/%d/status' % pid, 'r') as f:
for line in f:
if ':' not in line:
continue
key, value = line.split(':', 1)
if key == 'VmRSS':
rss = int(value.strip().split()[0])
except (OSError, IOError):
pass
return rss


def set_fd_cloexec(socket):
flags = fcntl.fcntl(socket, fcntl.F_GETFD)
fcntl.fcntl(socket, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
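
The RSS helpers read the kernel's /proc/<pid>/status file, where the VmRSS field reports the resident set size in kB; the values collected into rss_summary are therefore in kB as reported by the kernel. A quick self-check, assuming the new helpers are importable from lib.utils as added above:

```python
import os

from lib.utils import get_proc_stat_rss
from lib.utils import proc_stat_rss_supported

# On Linux, /proc/self/status contains a line such as "VmRSS:  9876 kB";
# get_proc_stat_rss() returns that number, or 0 if the field is missing.
if proc_stat_rss_supported():
    print('current RSS: %d kB' % get_proc_stat_rss(os.getpid()))
else:
    print('/proc/<pid>/status is unavailable; RSS sampling is disabled')
```
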