#!/usr/bin/env python3 import argparse import threading import subprocess import os import sys from typing import List # Dict, Set, , Tuple, Optional from datetime import datetime from pathlib import Path from abc import ABC, abstractmethod # from typing import ForwardRef try: from typing import ForwardRef # type: ignore except ImportError: # python 3.6 from typing import _ForwardRef as ForwardRef assert sys.version_info >= (3, 5, 0), 'this code requires at least python 3.5' # type hints in arguments class Run(): def __init__(self, run_id: int, worker_id: int): self.id = run_id self.worker_id = worker_id # the worker used for this run (number of workers = number of parallel runs) self.pid = None self.return_code = 0 self.start_time = datetime.now() self.end_time = None def has_finished(self): return self.end_time is not None def get_duration(self): assert self.has_finished() return (self.end_time - self.start_time).total_seconds() StarBencher = ForwardRef('StarBencher') class IStarBencherStopCondition(ABC): @abstractmethod def should_stop(self, star_bencher: StarBencher): pass class StopAfterSingleRun(IStarBencherStopCondition): def __init__(self): pass def should_stop(self, star_bencher: StarBencher): # never start a new run return True class StopWhenConverged(IStarBencherStopCondition): def __init__(self, max_error: float = 0.01): self.max_error = max_error self._last_mean_duration = None def should_stop(self, star_bencher: StarBencher): do_stop = False mean_duration, num_runs = star_bencher._get_run_mean_duration() print('mean_duration = %f' % mean_duration) if self._last_mean_duration is not None: diff = abs(mean_duration - self._last_mean_duration) print('diff = %f' % diff) if diff < self.max_error: do_stop = True self._last_mean_duration = mean_duration return do_stop class StarBencher(): ''' the 'star' term comes from hpl's stadgemm benchmark, where we launch `n` independent programs on `n cores` ''' def __init__(self, run_command: List[str], num_cores_per_run: int, num_parallel_runs: int, max_num_cores: int, stop_condition: IStarBencherStopCondition, stop_on_error=True, run_command_cwd: Path = None, stdout_filepath: Path = None, stderr_filepath: Path = None): assert num_cores_per_run * num_parallel_runs <= max_num_cores self.run_command = run_command # in python3.6+, replace with self.run_command: List[str] = run_command self.run_command_cwd = run_command_cwd self.stdout_filepath = stdout_filepath self.stderr_filepath = stderr_filepath self.num_cores_per_run = num_cores_per_run self.num_parallel_runs = num_parallel_runs self.max_num_cores = max_num_cores # in python3.6+, replace with self.max_num_cores: int = max_num_cores self.stop_condition = stop_condition # in python3.6+, replace with self.stop_condition: IStarBencherStopCondition = stop_condition self.stop_on_error = stop_on_error self._next_run_id = 0 # in python3.6+, replace with self._next_run_id: int = 0 self._runs = {} # in python3.6+, replace with self._runs: Dict(int, Run) = {} self._last_mean_duration = None self._num_runs = 0 self._runs_lock = threading.Lock() self._finished_event = threading.Event() def popen_and_call(self, popen_args, on_exit, run_id: int, cwd: Path, stdout_filepath: Path = None, stderr_filepath: Path = None): """ Runs the given args in a subprocess.Popen, and then calls the function on_exit when the subprocess completes. on_exit is a callable object, and popen_args is a list/tuple of args that would give to subprocess.Popen. """ def run_in_thread(popen_args, on_exit): stdout = None stderr = None if stdout_filepath is not None: stdout = open(stdout_filepath, 'w') if stderr_filepath is not None: stderr = open(stderr_filepath, 'w') env = os.environ.copy() # restrict the number of threads used by openmp env['OMP_NUM_THREADS'] = '%d' % self.num_cores_per_run # restrict the nu,ber of threads used by intel math kernel library env['MKL_NUM_THREADS'] = '%d' % self.num_cores_per_run proc = subprocess.Popen(popen_args, cwd=cwd, stdout=stdout, stderr=stderr, env=env) proc.wait() if stderr is not None: stderr.close() if stdout is not None: stdout.close() on_exit(proc.pid, proc.returncode, run_id) return thread = threading.Thread(target=run_in_thread, args=(popen_args, on_exit)) thread.start() # returns immediately after the thread starts return thread def _get_run_mean_duration(self): duration_sums = 0.0 # in python3.6+, replace with duration_sums: float = 0.0 num_finished_runs = 0 # in python3.6+, replace with num_finished_runs: int = 0 with self._runs_lock: for run in self._runs.values(): if run.has_finished(): num_finished_runs += 1 duration_sums += run.get_duration() assert num_finished_runs > 0 return duration_sums / num_finished_runs, num_finished_runs def _all_runs_have_finished(self): with self._runs_lock: for run in self._runs.values(): if not run.has_finished(): return False return True def on_exit(self, pid: int, return_code: int, run_id: int): end_time = datetime.now() # print(self, pid, run_id) run = self._runs[run_id] run.pid = pid run.end_time = end_time run.return_code = return_code do_stop = False if self.stop_on_error and run.return_code != 0: do_stop = True else: do_stop = self.stop_condition.should_stop(self) if not do_stop: # print('adding a run') self._start_run(run.worker_id) # reuse the same worker as the run that has just finished if self._all_runs_have_finished(): # tell the main thread that all the runs have finished self._finished_event.set() def _start_run(self, worker_id: int): worker_as_str = '%03d' % worker_id run_command = [str(s).replace('', worker_as_str) for s in self.run_command] run_command_cwd = str(self.run_command_cwd).replace('', worker_as_str) stdout_filepath = None if self.stdout_filepath is not None: stdout_filepath = str(self.stdout_filepath).replace('', worker_as_str) stderr_filepath = None if self.stderr_filepath is not None: stderr_filepath = str(self.stderr_filepath).replace('', worker_as_str) run_command_cwd = str(self.run_command_cwd).replace('', worker_as_str) with self._runs_lock: run = Run(self._next_run_id, worker_id) self._next_run_id += 1 run_thread = self.popen_and_call(popen_args=run_command, on_exit=self.on_exit, run_id=run.id, cwd=run_command_cwd, stdout_filepath=stdout_filepath, stderr_filepath=stderr_filepath) # noqa:F841 self._runs[run.id] = run def run(self): print("executing the following command in parallel (%d parallel runs) : '%s'" % (self.num_parallel_runs, str(self.run_command))) for worker_id in range(self.num_parallel_runs): self._start_run(worker_id) # wait until all runs have finished self._finished_event.wait() with self._runs_lock: workers_success = [run.return_code == 0 for run in self._runs.values()] if not all(workers_success): raise Exception('at least one run failed (workers_success = %s)' % workers_success) mean_duration, num_runs = self._get_run_mean_duration() print('mean duration : %.3f s (%d runs)' % (mean_duration, num_runs)) return mean_duration def test_starbencher(): if False: stop_condition = StopAfterSingleRun() # stop_condition = StopWhenConverged(max_error=0.0001) bench = StarBencher(run_command=['sleep', '0.1415927'], num_cores_per_run=1, num_parallel_runs=2, max_num_cores=2, stop_condition=stop_condition) mean_duration = bench.run() print(mean_duration) if False: bench = StarBencher(run_command=['ls', '/tmp'], num_cores_per_run=1, num_parallel_runs=2, max_num_cores=2, max_error=0.0001) mean_duration = bench.run() print(mean_duration) # end of starbencher def starbench_cmake_app(git_repos_url: str, code_version: str, tmp_dir: Path, num_cores: int, git_user: str, git_password: str, benchmark_command: List[str], cmake_options: List[str] = None, cmake_exe_location: Path = None): """ tests_to_run : regular expression as understood by ctest's -L option. eg '^arch4_quick$' """ tmp_dir.mkdir(exist_ok=True) git_credentials = [] if git_user: git_credentials.append(git_user) if git_password: git_credentials.append(git_password) if len(git_credentials) != 0: git_repos_url = git_repos_url.replace('https://', 'https://%s@' % ':'.join(git_credentials)) src_dir = tmp_dir / 'source.git' # src_dir.mkdir(exist_ok=True) subprocess.run(['git', 'clone', '%s' % (str(git_repos_url)), str(src_dir)], cwd=str(tmp_dir), check=True) if code_version: subprocess.run(['git', 'checkout', '%s' % (code_version)], cwd=str(src_dir), check=True) # we need one build for each parallel run, otherwise running ctest on parallel would overwrite the same file, which causes the test to randomly fail depnding on race conditions worker_dir = tmp_dir / 'worker' build_dir = worker_dir / 'build' print('creating build directory %s' % worker_dir) create_build_dir = StarBencher( run_command=['mkdir', '-p', build_dir], num_cores_per_run=1, num_parallel_runs=num_cores, max_num_cores=num_cores, stop_condition=StopAfterSingleRun(), run_command_cwd=Path('/tmp'), stdout_filepath=None) create_build_dir_duration = create_build_dir.run() # noqa: F841 # build_dir.mkdir(exist_ok=True) print('configuring %s into %s ...' % (src_dir, build_dir)) cmake_prog = 'cmake' if cmake_exe_location: cmake_prog = str(cmake_exe_location) configure = StarBencher( run_command=[cmake_prog] + cmake_options + [src_dir], num_cores_per_run=1, num_parallel_runs=num_cores, max_num_cores=num_cores, stop_condition=StopAfterSingleRun(), run_command_cwd=build_dir, stdout_filepath=worker_dir / 'configure_stdout.txt', stderr_filepath=worker_dir / 'configure_stderr.txt') configure_duration = configure.run() # noqa: F841 print('building %s ...' % (build_dir)) build = StarBencher( run_command=['make'], num_cores_per_run=1, num_parallel_runs=num_cores, max_num_cores=num_cores, stop_condition=StopAfterSingleRun(), run_command_cwd=build_dir, stdout_filepath=worker_dir / 'build_stdout.txt', stderr_filepath=worker_dir / 'build_stderr.txt') build_duration = build.run() # noqa: F841 print('benchmarking %s ...' % (build_dir)) stop_condition = StopAfterSingleRun() bench = StarBencher( run_command=benchmark_command, num_cores_per_run=1, num_parallel_runs=num_cores, max_num_cores=num_cores, stop_condition=stop_condition, run_command_cwd=build_dir, stdout_filepath=worker_dir / 'bench_stdout.txt', stderr_filepath=worker_dir / 'bench_stderr.txt') mean_duration = bench.run() print('duration : %.3f s' % (mean_duration)) if __name__ == '__main__': example_text = '''example: %(prog)s --git-repos-url https://github.com/hibridon/hibridon --code-version a3bed1c3ccfbca572003020d3e3d3b1ff3934fad --git-user g-raffy --git-pass-file "$HOME/.github/personal_access_tokens/bench.hibridon.cluster.ipr.univ-rennes1.fr.pat" --num-cores 2 --output-dir=/tmp/hibench --cmake-path=/opt/cmake/cmake-3.23.0/bin/cmake --cmake-option=-DCMAKE_BUILD_TYPE=Release --cmake-option=-DBUILD_TESTING=ON --benchmark-command='ctest --output-on-failure -L ^arch4_quick$' ''' parser = argparse.ArgumentParser(description='performs a benchmark on a cmake buildable app hosted on a git repository', epilog=example_text, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument('--git-repos-url', required=True, help='the url of the code to benchmark (eg https://github.com/hibridon/hibridon)') parser.add_argument('--code-version', help='the version of the code to use; either a branch or a commit id (eg a3bed1c3ccfbca572003020d3e3d3b1ff3934fad)') parser.add_argument('--git-user', help='the git user to use to clone the code repository') password_group = parser.add_mutually_exclusive_group() password_group.add_argument('--git-pass-file', help='the path to a file containing the password (or personal access token)') password_group.add_argument('--git-pass', type=str, help='the password (or personal access token) to use (not recommended for security reasons)') parser.add_argument('--num-cores', type=int, required=True, help='the number of cores that the benchmark will use') parser.add_argument('--output-dir', type=Path, required=True, help='where the output files will be placed') parser.add_argument('--cmake-path', type=Path, help='the path to the cmake executable to use in case a specific cmake is wanted') parser.add_argument('--cmake-option', type=str, action='append', help='additional option passed to cmake in the configure step (use this flag multiple times if you need more than one cmake option)') parser.add_argument('--benchmark-command', required=True, type=str, help='the command to benchmark') args = parser.parse_args() git_user = args.git_user git_repos_url = args.git_repos_url git_password = None if args.git_pass: git_password = args.git_pass elif args.git_pass_file: with open(args.git_pass_file, 'r') as f: git_password = f.readline().replace('\n', '') # os.environ['HIBRIDON_REPOS_PAT'] starbench_cmake_app(git_repos_url=git_repos_url, code_version=args.code_version, tmp_dir=args.output_dir, num_cores=args.num_cores, git_user=git_user, git_password=git_password, cmake_options=args.cmake_option, benchmark_command=args.benchmark_command.split(' '), cmake_exe_location=args.cmake_path)