diff --git a/starbench.py b/starbench.py
index 0c5fea7..5840db8 100755
--- a/starbench.py
+++ b/starbench.py
@@ -1,16 +1,19 @@
 #!/usr/bin/env python3
+'''starbench is an application that measures the execution time of a user software suite under various conditions (different build modes and different execution modes)
+
+'''
 import argparse
 import threading
 import subprocess
 import os
 import sys
-from typing import List  # Dict, Set, , Tuple, Optional
+from typing import List, Dict, Optional, Tuple, Callable
 from datetime import datetime
 from pathlib import Path
 from abc import ABC, abstractmethod
 # from typing import ForwardRef
 try:
-    from typing import ForwardRef  # type: ignore
+    from typing import ForwardRef  # type: ignore  # pylint: disable=ungrouped-imports
 except ImportError:
     # python 3.6
     from typing import _ForwardRef as ForwardRef
@@ -18,105 +21,152 @@ except ImportError:
 assert sys.version_info >= (3, 5, 0), 'this code requires at least python 3.5'  # type hints in arguments


-class Run():
+class StarBenchException(Exception):
+    '''base exception for user errors detected by starbench'''

-    def __init__(self, run_id: int, worker_id: int):
+
+RunId = int  # identifier of a run
+WorkerId = int  # identifier of a worker (a run is performed on a worker)
+DurationInSeconds = float
+ProcessId = int
+ReturnCode = int
+
+
+class Run():
+    """represents a run of the benchmarked command within its CommandPerfEstimator
+    """
+    id: RunId  # uniquely identifies a run within its CommandPerfEstimator instance
+    worker_id: WorkerId  # the worker used for this run (number of workers = number of parallel runs)
+    pid: Optional[ProcessId]  # the process identifier of the process used by the command
+    start_time: datetime  # the time at which the command process has started
+    return_code: ReturnCode  # the exit code of the command process
+    end_time: Optional[datetime]  # the time at which the command process has ended. None if the process is still running
+
+    def __init__(self, run_id: RunId, worker_id: WorkerId):
         self.id = run_id
-        self.worker_id = worker_id  # the worker used for this run (number of workers = number of parallel runs)
+        self.worker_id = worker_id
         self.pid = None
         self.return_code = 0
         self.start_time = datetime.now()
         self.end_time = None

-    def has_finished(self):
+    def has_finished(self) -> bool:
+        """indicates if this run has finished"""
         return self.end_time is not None

-    def get_duration(self):
+    def get_duration(self) -> DurationInSeconds:
+        """returns the duration of this run, provided it has finished
+        """
         assert self.has_finished()
         return (self.end_time - self.start_time).total_seconds()


-StarBencher = ForwardRef('StarBencher')
+CommandPerfEstimator = ForwardRef('CommandPerfEstimator')


 class IStarBencherStopCondition(ABC):
+    """abstract handler that decides if the given CommandPerfEstimator has enough runs to estimate the performance or should trigger new runs
+    """
     @abstractmethod
-    def should_stop(self, star_bencher: StarBencher):
-        pass
+    def should_stop(self, star_bencher: CommandPerfEstimator) -> bool:
+        """decides if the given CommandPerfEstimator instance should trigger new runs
+
+        This method is called at the end of each run, to decide if another run should be triggered or not.
+        """
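+
+# an illustrative sketch (comment only): a custom stop condition just implements
+# should_stop(); for instance, a hypothetical StopAfterNRuns class could stop
+# once a fixed number of runs have completed:
+#
+# class StopAfterNRuns(IStarBencherStopCondition):
+#     def __init__(self, max_num_runs: int):
+#         self.max_num_runs = max_num_runs
+#
+#     def should_stop(self, star_bencher: CommandPerfEstimator) -> bool:
+#         _mean_duration, num_runs = star_bencher.get_run_mean_duration()
+#         return num_runs >= self.max_num_runs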
+ """ def __init__(self): pass - def should_stop(self, star_bencher: StarBencher): + def should_stop(self, star_bencher: CommandPerfEstimator): # never start a new run return True class StopWhenConverged(IStarBencherStopCondition): - + """a stop condition that triggers when the just completed run doesn't have much effect on the average run's duration + """ def __init__(self, max_error: float = 0.01): self.max_error = max_error self._last_mean_duration = None - def should_stop(self, star_bencher: StarBencher): + def should_stop(self, star_bencher: CommandPerfEstimator) -> bool: do_stop = False - mean_duration, num_runs = star_bencher._get_run_mean_duration() - print('mean_duration = %f' % mean_duration) + mean_duration, _num_runs = star_bencher.get_run_mean_duration() + print(f'mean_duration = {mean_duration}') if self._last_mean_duration is not None: diff = abs(mean_duration - self._last_mean_duration) - print('diff = %f' % diff) + print(f'diff = {diff}') if diff < self.max_error: do_stop = True self._last_mean_duration = mean_duration return do_stop -class StarBencher(): # pylint: disable=function-redefined (false positive) - ''' - the 'star' term comes from hpl's stadgemm benchmark, where we launch `n` independent programs on `n cores` +class CommandPerfEstimator(): # (false positive) pylint: disable=function-redefined + '''a command runner that runs a given command multiple times and measures the average execution duration + + the 'star' term comes from hpl's stadgemm benchmark, where we launch `n` independent programs on `n` cores ''' + run_command: List[str] # the command that this instance of CommandPerfEstimator is expected to run (eg: ['ctest', '--output-on-failure', '-L', '^arch4_quick$']). The command supports the following tags: + run_command_cwd: Path # the current directory to use when executing run_command + stdout_filepath: Path # the path of the file that records the standard output of run_command + stderr_filepath: Path # the path of the file that records the standard error of run_command + num_cores_per_run: int # the max number of threads used by each run + num_parallel_runs: int # how many times run_command is run simultaneously + max_num_cores: int # the maximum allowed number of cores for this CommandPerfEstimator + stop_condition: IStarBencherStopCondition # the condition that is used so that this CommandPerfEstimator can decide to stop launching commands + stop_on_error: bool + _next_run_id: int + _runs: Dict[int, Run] + _last_mean_duration: Optional[DurationInSeconds] + _num_runs: int + _runs_lock: threading.Lock + _finished_event: threading.Event def __init__(self, run_command: List[str], num_cores_per_run: int, num_parallel_runs: int, max_num_cores: int, stop_condition: IStarBencherStopCondition, stop_on_error=True, run_command_cwd: Path = None, stdout_filepath: Path = None, stderr_filepath: Path = None): assert num_cores_per_run * num_parallel_runs <= max_num_cores - self.run_command = run_command # in python3.6+, replace with self.run_command: List[str] = run_command + self.run_command = run_command self.run_command_cwd = run_command_cwd self.stdout_filepath = stdout_filepath self.stderr_filepath = stderr_filepath self.num_cores_per_run = num_cores_per_run self.num_parallel_runs = num_parallel_runs - self.max_num_cores = max_num_cores # in python3.6+, replace with self.max_num_cores: int = max_num_cores - self.stop_condition = stop_condition # in python3.6+, replace with self.stop_condition: IStarBencherStopCondition = stop_condition + self.max_num_cores = 
     '''
+    run_command: List[str]  # the command that this instance of CommandPerfEstimator is expected to run (eg: ['ctest', '--output-on-failure', '-L', '^arch4_quick$']). The command supports the following tags: '<worker_id>' (replaced with the run's worker id, zero-padded to 3 digits)
+    run_command_cwd: Path  # the current directory to use when executing run_command
+    stdout_filepath: Path  # the path of the file that records the standard output of run_command
+    stderr_filepath: Path  # the path of the file that records the standard error of run_command
+    num_cores_per_run: int  # the max number of threads used by each run
+    num_parallel_runs: int  # how many times run_command is run simultaneously
+    max_num_cores: int  # the maximum allowed number of cores for this CommandPerfEstimator
+    stop_condition: IStarBencherStopCondition  # the condition that decides when this CommandPerfEstimator stops launching new runs
+    stop_on_error: bool
+    _next_run_id: int
+    _runs: Dict[RunId, Run]
+    _last_mean_duration: Optional[DurationInSeconds]
+    _num_runs: int
+    _runs_lock: threading.Lock
+    _finished_event: threading.Event

     def __init__(self, run_command: List[str], num_cores_per_run: int, num_parallel_runs: int, max_num_cores: int, stop_condition: IStarBencherStopCondition, stop_on_error=True, run_command_cwd: Path = None, stdout_filepath: Path = None, stderr_filepath: Path = None):
         assert num_cores_per_run * num_parallel_runs <= max_num_cores
-        self.run_command = run_command  # in python3.6+, replace with self.run_command: List[str] = run_command
+        self.run_command = run_command
         self.run_command_cwd = run_command_cwd
         self.stdout_filepath = stdout_filepath
         self.stderr_filepath = stderr_filepath
         self.num_cores_per_run = num_cores_per_run
         self.num_parallel_runs = num_parallel_runs
-        self.max_num_cores = max_num_cores  # in python3.6+, replace with self.max_num_cores: int = max_num_cores
-        self.stop_condition = stop_condition  # in python3.6+, replace with self.stop_condition: IStarBencherStopCondition = stop_condition
+        self.max_num_cores = max_num_cores
+        self.stop_condition = stop_condition
         self.stop_on_error = stop_on_error
-        self._next_run_id = 0  # in python3.6+, replace with self._next_run_id: int = 0
-        self._runs = {}  # in python3.6+, replace with self._runs: Dict(int, Run) = {}
+        self._next_run_id = 0
+        self._runs = {}
         self._last_mean_duration = None
         self._num_runs = 0
         self._runs_lock = threading.Lock()
         self._finished_event = threading.Event()

-    def popen_and_call(self, popen_args, on_exit, run_id: int, cwd: Path, stdout_filepath: Path = None, stderr_filepath: Path = None):
+    def popen_and_call(self, popen_args: List[str], on_exit: Callable[[ProcessId, ReturnCode, RunId], None], run_id: RunId, cwd: Path, stdout_filepath: Path = None, stderr_filepath: Path = None):
         """
         Runs the given args in a subprocess.Popen, and then calls the function on_exit when the subprocess completes.
         on_exit is a callable object, and popen_args is a list/tuple of args that one would pass to subprocess.Popen.
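+
+        illustrative usage (made-up argument values):
+            thread = self.popen_and_call(['sleep', '1'], on_exit=self.on_exit, run_id=0, cwd=Path('/tmp'))
+            thread.join()  # waits for both the command and the on_exit callback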
         """
-        def run_in_thread(popen_args, on_exit):
+        def run_in_thread(popen_args: List[str], on_exit: Callable[[ProcessId, ReturnCode, RunId], None]):
             stdout = None
             stderr = None
             if stdout_filepath is not None:
-                stdout = open(stdout_filepath, 'w')
+                stdout = open(stdout_filepath, 'w', encoding='utf8')
             if stderr_filepath is not None:
-                stderr = open(stderr_filepath, 'w')
+                stderr = open(stderr_filepath, 'w', encoding='utf8')
             env = os.environ.copy()
             # restrict the number of threads used by openmp
-            env['OMP_NUM_THREADS'] = '%d' % self.num_cores_per_run
+            env['OMP_NUM_THREADS'] = f'{self.num_cores_per_run}'
             # restrict the number of threads used by intel math kernel library
-            env['MKL_NUM_THREADS'] = '%d' % self.num_cores_per_run
+            env['MKL_NUM_THREADS'] = f'{self.num_cores_per_run}'
             proc = subprocess.Popen(popen_args, cwd=cwd, stdout=stdout, stderr=stderr, env=env)
             proc.wait()
             if stderr is not None:
@@ -130,7 +180,9 @@ class StarBencher():  # pylint: disable=function-redefined (false positive)
         # returns immediately after the thread starts
         return thread

-    def _get_run_mean_duration(self):
+    def get_run_mean_duration(self) -> Tuple[DurationInSeconds, int]:
+        """returns the average duration of all completed runs of this CommandPerfEstimator instance
+        """
         duration_sums = 0.0  # in python3.6+, replace with duration_sums: float = 0.0
         num_finished_runs = 0  # in python3.6+, replace with num_finished_runs: int = 0
         with self._runs_lock:
@@ -148,7 +200,13 @@ class StarBencher():  # pylint: disable=function-redefined (false positive)
             return False
         return True

-    def on_exit(self, pid: int, return_code: int, run_id: int):
+    def on_exit(self, pid: ProcessId, return_code: ReturnCode, run_id: RunId):
+        """method called when the command executed by a run ends. Unless the stop condition is met, a new run is started.
+
+        pid: the process identifier of the process of the run that just finished
+        return_code: the return code of the process of the run that just finished
+        run_id: the identifier of the run that just completed
+        """
         end_time = datetime.now()
         # print(self, pid, run_id)
         run = self._runs[run_id]
@@ -168,25 +226,35 @@ class StarBencher():  # pylint: disable=function-redefined (false positive)
             # tell the main thread that all the runs have finished
             self._finished_event.set()

-    def _start_run(self, worker_id: int):
-        worker_as_str = '%03d' % worker_id
-        run_command = [str(s).replace('<worker_id>', worker_as_str) for s in self.run_command]
-        run_command_cwd = str(self.run_command_cwd).replace('<worker_id>', worker_as_str)
+    @staticmethod
+    def _interpret_tags(tagged_string: str, tags_value: Dict[str, str]) -> str:
+        untagged_string = tagged_string
+        for tag_id, tag_value in tags_value.items():
+            untagged_string = untagged_string.replace(tag_id, tag_value)
+        return untagged_string
+
+    def _start_run(self, worker_id: WorkerId):
+        """starts a run using the given worker"""
+        tags_value = {
+            '<worker_id>': f'{worker_id:03d}'
+        }
+        run_command = [CommandPerfEstimator._interpret_tags(s, tags_value) for s in self.run_command]
+        run_command_cwd = CommandPerfEstimator._interpret_tags(str(self.run_command_cwd), tags_value)
         stdout_filepath = None
         if self.stdout_filepath is not None:
-            stdout_filepath = str(self.stdout_filepath).replace('<worker_id>', worker_as_str)
+            stdout_filepath = CommandPerfEstimator._interpret_tags(str(self.stdout_filepath), tags_value)
         stderr_filepath = None
         if self.stderr_filepath is not None:
-            stderr_filepath = str(self.stderr_filepath).replace('<worker_id>', worker_as_str)
-            run_command_cwd = str(self.run_command_cwd).replace('<worker_id>', worker_as_str)
+            stderr_filepath = CommandPerfEstimator._interpret_tags(str(self.stderr_filepath), tags_value)
         with self._runs_lock:
             run = Run(self._next_run_id, worker_id)
             self._next_run_id += 1
-            run_thread = self.popen_and_call(popen_args=run_command, on_exit=self.on_exit, run_id=run.id, cwd=run_command_cwd, stdout_filepath=stdout_filepath, stderr_filepath=stderr_filepath)  # noqa:F841
+            _run_thread = self.popen_and_call(popen_args=run_command, on_exit=self.on_exit, run_id=run.id, cwd=run_command_cwd, stdout_filepath=stdout_filepath, stderr_filepath=stderr_filepath)  # noqa:F841
             self._runs[run.id] = run

-    def run(self):
-        print("executing the following command in parallel (%d parallel runs) : '%s'" % (self.num_parallel_runs, str(self.run_command)))
+    def run(self) -> DurationInSeconds:
+        '''performs the runs of the command and returns the runs' average duration'''
+        print(f"executing the following command in parallel ({self.num_parallel_runs} parallel runs) : '{self.run_command}'")
         for worker_id in range(self.num_parallel_runs):
             self._start_run(worker_id)
         # wait until all runs have finished
@@ -194,24 +262,25 @@ class StarBencher():  # pylint: disable=function-redefined (false positive)
         with self._runs_lock:
             workers_success = [run.return_code == 0 for run in self._runs.values()]
             if not all(workers_success):
-                raise Exception('at least one run failed (workers_success = %s)' % workers_success)
-        mean_duration, num_runs = self._get_run_mean_duration()
-        print('mean duration : %.3f s (%d runs)' % (mean_duration, num_runs))
+                raise StarBenchException(f'at least one run failed (workers_success = {workers_success})')
+        mean_duration, num_runs = self.get_run_mean_duration()
+        print(f'mean duration : {mean_duration:.3f} s ({num_runs} runs)')
         return mean_duration

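+# an illustrative example (comment only) of the '<worker_id>' tag expansion
+# performed by CommandPerfEstimator._interpret_tags, here for worker 2:
+#     _interpret_tags('/tmp/worker<worker_id>/build', {'<worker_id>': '002'})
+#     returns '/tmp/worker002/build'
+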
-def test_starbencher():
-    if False:
-        stop_condition = StopAfterSingleRun()
-        # stop_condition = StopWhenConverged(max_error=0.0001)
-        bench = StarBencher(run_command=['sleep', '0.1415927'], num_cores_per_run=1, num_parallel_runs=2, max_num_cores=2, stop_condition=stop_condition)
-        mean_duration = bench.run()
-        print(mean_duration)
+# def test_starbencher():
+#     if False:
+#         stop_condition = StopAfterSingleRun()
+#         # stop_condition = StopWhenConverged(max_error=0.0001)
+#         bench = StarBencher(run_command=['sleep', '0.1415927'], num_cores_per_run=1, num_parallel_runs=2, max_num_cores=2, stop_condition=stop_condition)
+#         mean_duration = bench.run()
+#         print(mean_duration)

-    # if False:
-    #     bench = StarBencher(run_command=['ls', '/tmp'], num_cores_per_run=1, num_parallel_runs=2, max_num_cores=2, max_error=0.0001)
-    #     mean_duration = bench.run()
-    #     print(mean_duration)
+#     if False:
+#         bench = StarBencher(run_command=['ls', '/tmp'], num_cores_per_run=1, num_parallel_runs=2, max_num_cores=2, max_error=0.0001)
+#         mean_duration = bench.run()
+#         print(mean_duration)
+#     pass
 # end of starbencher
@@ -227,18 +296,18 @@ def starbench_cmake_app(git_repos_url: str, code_version: str, tmp_dir: Path, nu
     if git_password:
         git_credentials.append(git_password)
     if len(git_credentials) != 0:
-        git_repos_url = git_repos_url.replace('https://', 'https://%s@' % ':'.join(git_credentials))
+        git_repos_url = git_repos_url.replace('https://', f"https://{':'.join(git_credentials)}@")
     src_dir = tmp_dir / 'source.git'
     # src_dir.mkdir(exist_ok=True)
-    subprocess.run(['git', 'clone', '%s' % (str(git_repos_url)), str(src_dir)], cwd=str(tmp_dir), check=True)
+    subprocess.run(['git', 'clone', str(git_repos_url), str(src_dir)], cwd=str(tmp_dir), check=True)
     if code_version:
-        subprocess.run(['git', 'checkout', '%s' % (code_version)], cwd=str(src_dir), check=True)
+        subprocess.run(['git', 'checkout', code_version], cwd=str(src_dir), check=True)

     # we need one build for each parallel run, otherwise running ctest in parallel would overwrite the same files, which causes the tests to randomly fail depending on race conditions
     worker_dir = tmp_dir / 'worker<worker_id>'
     build_dir = worker_dir / 'build'
-    print('creating build directory %s' % worker_dir)
-    create_build_dir = StarBencher(
+    print(f'creating build directory {worker_dir}')
+    create_build_dir = CommandPerfEstimator(
         run_command=['mkdir', '-p', build_dir],
         num_cores_per_run=1,
         num_parallel_runs=num_cores,
@@ -246,14 +315,14 @@ def starbench_cmake_app(git_repos_url: str, code_version: str, tmp_dir: Path, nu
         stop_condition=StopAfterSingleRun(),
         run_command_cwd=Path('/tmp'),
         stdout_filepath=None)
-    create_build_dir_duration = create_build_dir.run()  # noqa: F841
+    _create_build_dir_duration = create_build_dir.run()  # noqa: F841
     # build_dir.mkdir(exist_ok=True)

-    print('configuring %s into %s ...' % (src_dir, build_dir))
+    print(f'configuring {src_dir} into {build_dir} ...')
     cmake_prog = 'cmake'
     if cmake_exe_location:
         cmake_prog = str(cmake_exe_location)
-    configure = StarBencher(
+    configure = CommandPerfEstimator(
         run_command=[cmake_prog] + cmake_options + [src_dir],
         num_cores_per_run=1,
         num_parallel_runs=num_cores,
@@ -262,10 +331,10 @@ def starbench_cmake_app(git_repos_url: str, code_version: str, tmp_dir: Path, nu
         run_command_cwd=build_dir,
         stdout_filepath=worker_dir / 'configure_stdout.txt',
         stderr_filepath=worker_dir / 'configure_stderr.txt')
-    configure_duration = configure.run()  # noqa: F841
+    _configure_duration = configure.run()  # noqa: F841
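+    # illustrative note: with num_cores=2 the configure step above runs two cmake
+    # processes at once; the '<worker_id>' tag makes each run work in its own
+    # directory and write its own logs, eg <tmp_dir>/worker000/configure_stdout.txt
+    # and <tmp_dir>/worker001/configure_stdout.txt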

-    print('building %s ...' % (build_dir))
-    build = StarBencher(
+    print(f'building {build_dir} ...')
+    build = CommandPerfEstimator(
         run_command=['make'],
         num_cores_per_run=1,
         num_parallel_runs=num_cores,
@@ -274,11 +343,11 @@ def starbench_cmake_app(git_repos_url: str, code_version: str, tmp_dir: Path, nu
         run_command_cwd=build_dir,
         stdout_filepath=worker_dir / 'build_stdout.txt',
         stderr_filepath=worker_dir / 'build_stderr.txt')
-    build_duration = build.run()  # noqa: F841
+    _build_duration = build.run()  # noqa: F841

-    print('benchmarking %s ...' % (build_dir))
+    print(f'benchmarking {build_dir} ...')
     stop_condition = StopAfterSingleRun()
-    bench = StarBencher(
+    bench = CommandPerfEstimator(
         run_command=benchmark_command,
         num_cores_per_run=1,
         num_parallel_runs=num_cores,
@@ -288,10 +357,11 @@ def starbench_cmake_app(git_repos_url: str, code_version: str, tmp_dir: Path, nu
         stdout_filepath=worker_dir / 'bench_stdout.txt',
         stderr_filepath=worker_dir / 'bench_stderr.txt')
     mean_duration = bench.run()
-    print('duration : %.3f s' % (mean_duration))
+    print(f'duration : {mean_duration:.3f} s')


-if __name__ == '__main__':
+def main():
+    '''main program'''

     example_text = '''example:
@@ -320,7 +390,11 @@ if __name__ == '__main__':
     if args.git_pass:
         git_password = args.git_pass
     elif args.git_pass_file:
-        with open(args.git_pass_file, 'r') as f:
+        with open(args.git_pass_file, 'r', encoding='utf8') as f:
             git_password = f.readline().replace('\n', '')  # os.environ['HIBRIDON_REPOS_PAT']
     starbench_cmake_app(git_repos_url=git_repos_url, code_version=args.code_version, tmp_dir=args.output_dir, num_cores=args.num_cores, git_user=git_user, git_password=git_password, cmake_options=args.cmake_option, benchmark_command=args.benchmark_command.split(' '), cmake_exe_location=args.cmake_path)
+
+
+if __name__ == '__main__':
+    main()
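+
+# illustrative programmatic use of starbench_cmake_app (all values below are
+# made up; in practice they come from the command line arguments parsed in main()):
+#
+# starbench_cmake_app(
+#     git_repos_url='https://github.com/<your-org>/<your-app>',
+#     code_version='master',
+#     tmp_dir=Path('/tmp/starbench'),
+#     num_cores=4,
+#     git_user='john.doe',
+#     git_password='secret',
+#     cmake_options=['-DCMAKE_BUILD_TYPE=Release'],
+#     benchmark_command=['ctest', '--output-on-failure', '-L', '^arch4_quick$'],
+#     cmake_exe_location=None)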